Async Processes and Pipelines

shellous provides a concise API for running subprocesses using asyncio. It is similar to and inspired by sh.

import asyncio
import shellous

sh = shellous.context()

async def main():
    result = await (sh("ls") | sh("grep", "README"))
    print(result)

asyncio.run(main())

Benefits

  • Run programs asynchronously in a single line.
  • Easily capture output or redirect stdin, stdout and stderr to files, memory buffers or loggers.
  • Easily construct pipelines and use process substitution.
  • Run a program with a pseudo-terminal (pty).
  • Runs on Linux, macOS, FreeBSD and Windows.
  • Monitor processes being started and stopped with the audit_callback API.

Requirements

  • Requires Python 3.9 or later.
  • Requires an asyncio event loop.
  • Process substitution requires a Unix system with /dev/fd support.
  • Pseudo-terminals require a Unix system. Ptys do not work on uvloop.
  • FastChildWatcher is not supported.

Basic Usage

Start the asyncio REPL by typing python3 -m asyncio, and import the shellous module:

>>> import shellous

Before we can do anything else, we need to create a context. Store the context in a short variable name like sh because we'll be typing it a lot.

>>> sh = shellous.context()

Now, we're ready to run our first command. Here's one that runs echo "hello, world".

>>> await sh("echo", "hello, world")
'hello, world\n'

The first argument is the program name. It is followed by zero or more separate arguments.

A command does not run until you await it. Here, we create our own echo command with "-n" to omit the newline. Note that echo("abc") runs the same command as echo -n "abc".

>>> echo = sh("echo", "-n")
>>> await echo("abc")
'abc'
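
The idea that a command is only a description until it is awaited can be illustrated with a small standard-library sketch. The LazyCommand class below is hypothetical and is not how shellous is implemented; it assumes only asyncio.create_subprocess_exec and uses the running Python interpreter as a portable stand-in for echo -n.

```python
import asyncio
import sys
from dataclasses import dataclass


@dataclass(frozen=True)
class LazyCommand:
    """Hypothetical stand-in for a command object (not part of shellous)."""

    args: tuple = ()

    def __call__(self, *more):
        # Calling returns a new command with extra arguments; nothing runs yet.
        return LazyCommand(self.args + tuple(str(arg) for arg in more))

    def __await__(self):
        # Awaiting is the step that actually starts the process.
        return self._run().__await__()

    async def _run(self):
        proc = await asyncio.create_subprocess_exec(
            *self.args, stdout=asyncio.subprocess.PIPE
        )
        stdout, _ = await proc.communicate()
        return stdout.decode()


# Portable substitute for `echo -n`: print the arguments without a newline.
ECHO_N = (sys.executable, "-c", "import sys; print(*sys.argv[1:], end='')")


async def demo():
    echo = LazyCommand(ECHO_N)  # no process has been started here
    return await echo("abc")    # the process runs only at this await


output = asyncio.run(demo())
print(repr(output))
```

Because the object is frozen and each call returns a new instance, a partially built command can be reused safely, which mirrors how echo is reused in the example above.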

Results and Exit Codes

When you await a command, it captures the standard output and returns it. You can optionally have the command return a Result object instead. The Result object contains more information about the command execution, including the exit_code. To return a Result object, set the return_result option to True.

>>> await echo("abc").set(return_result=True)
Result(output_bytes=b'abc', exit_code=0, cancelled=False, encoding='utf-8', extra=None)

The above command had an exit_code of 0.

If a command exits with a non-zero exit code, it raises a ResultError exception that contains the Result object.

>>> await sh("cat", "does_not_exist")
Traceback (most recent call last):
  ...
shellous.result.ResultError: Result(output_bytes=b'', exit_code=1, cancelled=False, encoding='utf-8', extra=None)

Redirecting Standard Input

You can change the standard input of a command by using the | operator.

>>> cmd = "abc" | sh("wc", "-c")
>>> await cmd
'       3\n'

To redirect stdin using a file's contents, use a Path object from pathlib.

>>> from pathlib import Path
>>> cmd = Path("LICENSE") | sh("wc", "-l")
>>> await cmd
'     201\n'
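
For comparison, here is a rough standard-library version of feeding data to a command's standard input. It is a sketch rather than what shellous does internally: it assumes asyncio.create_subprocess_exec with pipes, and the child is a small Python one-liner that stands in for wc -c so the example does not depend on the platform's tools.

```python
import asyncio
import sys

# Child program: count the bytes received on standard input (like `wc -c`).
COUNT_BYTES = "import sys; print(len(sys.stdin.buffer.read()))"


async def count_bytes(data: bytes) -> str:
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", COUNT_BYTES,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    # communicate() writes the input, closes stdin, and collects stdout.
    stdout, _ = await proc.communicate(data)
    return stdout.decode()


count = asyncio.run(count_bytes(b"abc"))
print(count.strip())
```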

Redirecting Standard Output

To redirect standard output, use the | operator.

>>> output_file = Path("/tmp/output_file")
>>> cmd = sh("echo", "abc") | output_file
>>> await cmd
''
>>> output_file.read_bytes()
b'abc\n'

To redirect standard output with append, use the >> operator.

>>> cmd = sh("echo", "def") >> output_file
>>> await cmd
''
>>> output_file.read_bytes()
b'abc\ndef\n'

Redirecting Standard Error

By default, standard error is not captured. To redirect standard error, use the stderr method.

>>> cmd = sh("cat", "does_not_exist").stderr(shellous.STDOUT)
>>> await cmd.set(exit_codes={0,1})
'cat: does_not_exist: No such file or directory\n'

You can redirect standard error to a file or path.

To redirect standard error to the hosting program's sys.stderr, use the INHERIT redirect option.

>>> cmd = sh("cat", "does_not_exist").stderr(shellous.INHERIT)
>>> await cmd
cat: does_not_exist: No such file or directory
Traceback (most recent call last):
  ...
shellous.result.ResultError: Result(output_bytes=b'', exit_code=1, cancelled=False, encoding='utf-8', extra=None)

Pipelines

You can create a pipeline by combining commands using the | operator.

>>> pipe = sh("ls") | sh("grep", "README")
>>> await pipe
'README.md\n'
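
To see what the | operator saves you from writing, the sketch below wires two processes together by hand with the standard library. It is an illustration under stated assumptions, not shellous's implementation: it assumes a Unix-like system, uses os.pipe to connect the processes, and replaces ls and grep with Python one-liners (PRODUCER and FILTER are made-up names).

```python
import asyncio
import os
import sys

# Stand-ins for `ls` and `grep README`.
PRODUCER = "print('README.md'); print('setup.py')"
FILTER = (
    "import sys; "
    "sys.stdout.writelines(line for line in sys.stdin if 'README' in line)"
)


async def pipeline() -> str:
    read_fd, write_fd = os.pipe()

    # First process writes its stdout into the pipe.
    first = await asyncio.create_subprocess_exec(
        sys.executable, "-c", PRODUCER, stdout=write_fd
    )
    # The parent must close its copy of the write end so the reader sees EOF.
    os.close(write_fd)

    # Second process reads its stdin from the pipe; we capture its stdout.
    second = await asyncio.create_subprocess_exec(
        sys.executable, "-c", FILTER,
        stdin=read_fd, stdout=asyncio.subprocess.PIPE,
    )
    os.close(read_fd)

    stdout, _ = await second.communicate()
    await first.wait()
    return stdout.decode()


piped = asyncio.run(pipeline())
print(repr(piped))
```

Forgetting either os.close call is a common source of hangs in hand-written pipelines, which is part of the motivation for a higher-level API.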

Process Substitution (Unix Only)

You can pass a shell command as an argument to another.

>>> cmd = sh("grep", "README", sh("ls"))
>>> await cmd
'README.md\n'

Use .writable to write to a command instead.

>>> buf = bytearray()
>>> cmd = sh("ls") | sh("tee", sh("grep", "README").writable | buf) | shellous.DEVNULL
>>> await cmd
''
>>> buf
bytearray(b'README.md\n')

Async With & For

You can loop over a command's output by using the context manager as an iterator.

>>> async with pipe as run:
...   async for line in run:
...     print(line.rstrip())
... 
README.md

⚠️ You can also acquire an async iterator directly from the command or pipeline object. This is discouraged because you will have less control over the final clean up of the command invocation than with a context manager.

>>> async for line in pipe:   # Use caution!
...   print(line.rstrip())
... 
README.md

You can use async with to interact with the process streams directly. You have to be careful; you are responsible for correctly reading and writing multiple streams at the same time.

>>> async with pipe as run:
...   data = await run.stdout.readline()
...   print(data)
... 
b'README.md\n'
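
The line-by-line pattern in this section has a close analogue in the standard library, shown here as a sketch for comparison. It assumes asyncio.create_subprocess_exec with a piped stdout, whose StreamReader can be iterated with async for; the child is a Python one-liner standing in for a real command, and the try/finally plays the role that the context manager plays above.

```python
import asyncio
import sys

# Stand-in child that prints two lines.
CHILD = "print('README.md'); print('LICENSE')"


async def read_lines() -> list:
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", CHILD, stdout=asyncio.subprocess.PIPE
    )
    lines = []
    try:
        # StreamReader yields one line (as bytes) per iteration.
        async for raw in proc.stdout:
            lines.append(raw.decode().rstrip())
    finally:
        # Always reap the process, even if the loop exits early.
        await proc.wait()
    return lines


lines = asyncio.run(read_lines())
print(lines)
```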

Cancellation

When a command is cancelled, shellous terminates the process and raises a CancelledError.

You can retrieve the partial result by setting incomplete_result to True. Shellous will raise a ResultError when the specified command is cancelled.

>>> sleep = sh("sleep", 60).set(incomplete_result=True)
>>> t = asyncio.create_task(sleep.coro())
>>> t.cancel()
True
>>> await t
Traceback (most recent call last):
  ...
shellous.result.ResultError: Result(output_bytes=b'', exit_code=-15, cancelled=True, encoding='utf-8', extra=None)

When you use incomplete_result, your code should respect the cancelled attribute in the Result object. Otherwise, your code may swallow the CancelledError.
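
The rule about not swallowing CancelledError applies to asyncio code in general. The sketch below shows the same discipline using only the standard library; it is not shellous code. The run_sleeper coroutine and the started event are hypothetical names: the coroutine terminates its child process when cancelled and then re-raises, so the caller still observes the cancellation.

```python
import asyncio
import sys

# Stand-in for `sleep 60`.
SLEEPER = "import time; time.sleep(60)"


async def run_sleeper(started: asyncio.Event) -> int:
    proc = await asyncio.create_subprocess_exec(sys.executable, "-c", SLEEPER)
    started.set()  # tell the caller the process is running
    try:
        return await proc.wait()
    except asyncio.CancelledError:
        # Clean up the child, then propagate the cancellation.
        proc.terminate()
        await proc.wait()
        raise


async def main() -> str:
    started = asyncio.Event()
    task = asyncio.create_task(run_sleeper(started))
    await started.wait()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        return "cancelled"
    return "finished"


outcome = asyncio.run(main())
print(outcome)
```

If the except block returned a value instead of re-raising, the task would appear to finish normally and the caller would never learn that it had been cancelled.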

Pseudo-Terminal Support (Unix Only)

To run a command through a pseudo-terminal, set the pty option to True. Alternatively, you can pass a function to configure the tty mode and size.

>>> ls = sh("ls").set(pty=shellous.cooked(cols=40, rows=10, echo=False))
>>> await ls("README.md", "CHANGELOG.md")
'CHANGELOG.md\tREADME.md\r\n'

Context Objects

You can specify shared command settings in a context object. Context objects are immutable, so you must store the result of your changes in a new variable.

>>> auditor = lambda phase, info: print(phase, info["runner"].name)
>>> sh1 = sh.set(audit_callback=auditor)

Now all commands run with sh1 will log their progress using the audit callback.

>>> await sh1("echo", "goodbye")
start echo
stop echo
'goodbye\n'
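
The immutable-settings pattern described in this section can be sketched with a frozen dataclass. The Settings class below is hypothetical and only illustrates why the result of set must be stored in a new variable; it makes no claim about how shellous stores its options.

```python
from dataclasses import dataclass, replace
from typing import Callable, Optional


@dataclass(frozen=True)
class Settings:
    """Hypothetical immutable settings holder (not part of shellous)."""

    audit_callback: Optional[Callable] = None
    return_result: bool = False

    def set(self, **changes):
        # Return a modified copy; the original object is left untouched.
        return replace(self, **changes)


base = Settings()
audited = base.set(audit_callback=print)

print(base.audit_callback)            # the original is unchanged
print(audited.audit_callback is print)
```

Because nothing is mutated in place, a shared base object can be handed to several parts of a program without one caller's changes leaking into another's.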
