Async Processes and Pipelines

shellous provides a concise API for running subprocesses using asyncio. It is similar to and inspired by sh.

import asyncio
from shellous import sh

async def main():
    result = await sh("echo", "hello")
    print(result)

asyncio.run(main())

Benefits

  • Run programs asynchronously in a single line.
  • Redirect stdin, stdout and stderr to files, memory buffers or loggers.
  • Construct pipelines and use process substitution.
  • Set timeouts and reliably cancel running processes.
  • Run a program with a pseudo-terminal (pty).
  • Runs on Linux, macOS, FreeBSD and Windows.
  • Monitor processes being started and stopped with the audit_callback API.

Requirements

  • Requires Python 3.9 or later.
  • Requires an asyncio event loop.
  • Process substitution requires a Unix system with /dev/fd support.
  • Pseudo-terminals require a Unix system.

Running a Command

The tutorial in this README uses the asyncio REPL built into Python. In these examples, >>> is the REPL prompt.

Start the asyncio REPL by typing python3 -m asyncio, and import sh from the shellous module:

>>> from shellous import sh

Here's a command that runs echo "hello, world".

>>> await sh("echo", "hello, world")
'hello, world\n'

The first argument to sh is the program name. It is followed by zero or more arguments. Each argument will be converted to a string. If an argument is a list or tuple, it is flattened recursively.

>>> await sh("echo", 1, 2, [3, 4, (5, 6)])
'1 2 3 4 5 6\n'
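
The flattening rule can be pictured with a small standalone function. This is only an illustration of the behavior described above, not shellous's own implementation; flatten_args is a hypothetical name used for this sketch.

```python
def flatten_args(args):
    """Recursively flatten lists/tuples and convert every item to a string.

    Hypothetical helper that mirrors the rule described in the text;
    it is not part of shellous.
    """
    flat = []
    for arg in args:
        if isinstance(arg, (list, tuple)):
            # Nested sequences are expanded in place, depth first.
            flat.extend(flatten_args(arg))
        else:
            flat.append(str(arg))
    return flat


print(flatten_args(["echo", 1, 2, [3, 4, (5, 6)]]))
# → ['echo', '1', '2', '3', '4', '5', '6']
```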

A command does not run until you await it. When you run a command using await, it returns the value of the standard output interpreted as a UTF-8 string.

Here, we create our own echo command with "-n" to omit the newline. Note that echo("abc") runs the same command as echo -n "abc".

>>> echo = sh("echo", "-n")
>>> await echo("abc")
'abc'

Commands are immutable objects that represent a program invocation: program name, arguments, environment variables, redirection operators and other settings. A method that modifies a Command returns a new Command object. The original object is unchanged.

In this example, we use the set() modifier to change the output encoding. The new command echob will return standard output as bytes (encoding=None).

>>> echob = echo.set(encoding=None)
>>> await echob("def")
b'def'
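
To make the "modifier returns a new object" idea concrete, here is a minimal sketch built on a frozen dataclass. SketchCommand and its fields are hypothetical stand-ins chosen for this illustration; the real Command class has more settings and may be implemented differently.

```python
from dataclasses import dataclass, replace
from typing import Optional, Tuple


@dataclass(frozen=True)
class SketchCommand:
    """Hypothetical immutable command description (not the shellous class)."""

    args: Tuple[str, ...]
    encoding: Optional[str] = "utf-8"

    def __call__(self, *more):
        # Calling a command appends arguments and returns a NEW object.
        return replace(self, args=self.args + tuple(str(a) for a in more))

    def set(self, **options):
        # Changing an option also returns a NEW object.
        return replace(self, **options)


echo = SketchCommand(("echo", "-n"))
echob = echo.set(encoding=None)

print(echo.encoding)      # the original is untouched
print(echob.encoding)     # only the copy has the new setting
print(echo("abc").args)   # arguments are appended on a copy as well
```

Because the dataclass is frozen, an accidental assignment such as echo.encoding = None raises an error instead of silently changing a command that other code may share.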

Async For

Using await to run a command collects the entire output of the command before returning it. You can also iterate over the output lines as they arrive using async for.

>>> [line async for line in echo("hi\n", "there")]
['hi\n', ' there']

Use an async for loop when you want to examine the stream of output from a command, line by line. For example, suppose you want to run tail on a log file.

async for line in sh("tail", "-f", "/var/log/syslog"):
    if "ERROR" in line:
        print(line.rstrip())
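
For comparison, the same line-by-line pattern can be sketched with only the standard library. This is not how shellous is implemented; iter_lines is a hypothetical helper, and the child process is a short Python script so the example does not depend on tail or a log file being present.

```python
import asyncio
import sys


async def iter_lines(*args, encoding="utf-8"):
    """Yield decoded stdout lines from a child process as they arrive.

    Hypothetical helper using asyncio's subprocess support directly.
    """
    proc = await asyncio.create_subprocess_exec(
        *args, stdout=asyncio.subprocess.PIPE
    )
    finished = False
    try:
        # asyncio.StreamReader supports `async for`, one line at a time.
        async for raw_line in proc.stdout:
            yield raw_line.decode(encoding)
        finished = True
    finally:
        # If the consumer stopped early, do not leave the child running.
        if not finished and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # the process exited in the meantime
        await proc.wait()


async def main():
    script = "print('ok'); print('ERROR: disk full')"
    async for line in iter_lines(sys.executable, "-c", script):
        if "ERROR" in line:
            print(line.rstrip())


if __name__ == "__main__":
    asyncio.run(main())
```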

Async With

You can use a command as an asynchronous context manager. Use async with when you need byte-by-byte control over the individual process streams: stdin, stdout and stderr. To control standard input, we need to explicitly tell shellous to "capture" standard input. (For more on this, see Redirection.)

async with sh("cat").stdin(sh.CAPTURE) as run:
    run.stdin.write(b"abc")
    run.stdin.close()
    print(await run.stdout.readline())

result = run.result()

When reading or writing individual streams, you are responsible for managing reads and writes so they don't deadlock. The streams on the run object are asyncio.StreamReader and asyncio.StreamWriter objects.
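
One common way to avoid a deadlock is to feed standard input and drain standard output concurrently, so neither side blocks on a full pipe. The sketch below shows that pattern with plain asyncio subprocess streams, which are the same StreamReader and StreamWriter types mentioned above. send_and_receive is a hypothetical helper, and the child is a tiny Python script standing in for a real program.

```python
import asyncio
import sys


async def send_and_receive(args, data):
    """Write `data` to a child's stdin while reading its stdout concurrently."""
    proc = await asyncio.create_subprocess_exec(
        *args,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )

    async def feed():
        proc.stdin.write(data)
        await proc.stdin.drain()  # wait until the write buffer is flushed
        proc.stdin.close()        # signal end-of-file to the child

    # Run the writer and the reader together so neither can stall the other.
    _, output = await asyncio.gather(feed(), proc.stdout.read())
    await proc.wait()
    return output


async def main():
    upper = "import sys; sys.stdout.write(sys.stdin.read().upper())"
    print(await send_and_receive([sys.executable, "-c", upper], b"abc"))


if __name__ == "__main__":
    asyncio.run(main())
```

Writing everything first and only then reading works for small inputs, but it can hang once the child's output fills the pipe buffer before all input has been consumed.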

ResultError

When a command fails, it raises a ResultError exception:

>>> await sh("cat", "does_not_exist")
Traceback (most recent call last):
  ...
shellous.result.ResultError: Result(output_bytes=b'', exit_code=1, cancelled=False, encoding='utf-8', extra=None)

The ResultError exception contains a Result object with the exit_code.

In some cases, you want to ignore certain exit code values. That is, you want to treat them as if they are normal. To do this, you can set the exit_codes option:

>>> await sh("cat", "does_not_exist").set(exit_codes={0,1})
''

If there is a problem launching a process, shellous can also raise a separate FileNotFoundError or PermissionError.

Results

When a command completes successfully, it returns the standard output (unless it is redirected). For a more detailed response, you can specify that the command should return a Result object by using the .result modifier:

>>> await echo("abc").result
Result(output_bytes=b'abc', exit_code=0, cancelled=False, encoding='utf-8', extra=None)

A Result object contains the command's exit_code in addition to its output. A Result is True if the command's exit_code is zero. You can access the string value of the output using the .output property:

if result := await sh("cat", "some-file").result:
    output = result.output
else:
    print(f"Command failed with exit_code={result.exit_code}")
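
The truthiness rule and the .output property can be modeled in a few lines. SketchResult below is a hypothetical stand-in that borrows the field names shown in the Result repr above. It assumes the encoding is always a string and is not the library's actual class.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class SketchResult:
    """Hypothetical model of a result object (not the shellous class)."""

    output_bytes: bytes
    exit_code: int
    cancelled: bool = False
    encoding: str = "utf-8"  # assumption: always a text encoding here
    extra: Any = None

    def __bool__(self):
        # A result is "true" only when the exit code is zero.
        return self.exit_code == 0

    @property
    def output(self):
        # Decode the captured bytes using the stored encoding.
        return self.output_bytes.decode(self.encoding)


ok = SketchResult(output_bytes=b"abc", exit_code=0)
failed = SketchResult(output_bytes=b"", exit_code=1)

print(bool(ok), ok.output)
print(bool(failed), failed.exit_code)
```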

Redirection

shellous supports the redirection operators | and >>. They work much as they do in a Unix shell.

To redirect to or from a file, use a pathlib.Path object. Alternatively, you can redirect input/output to a StringIO object, an open file, a Logger, or use a special redirection constant like sh.DEVNULL.

Redirecting Standard Input

To redirect standard input, use the pipe operator | with the argument on the left side. Here is an example that passes the string "abc" as standard input.

>>> cmd = "abc" | sh("wc", "-c")
>>> await cmd
'       3\n'

To read input from a file, use a Path object from pathlib.

>>> from pathlib import Path
>>> cmd = Path("LICENSE") | sh("wc", "-l")
>>> await cmd
'     201\n'

Shellous supports different STDIN behavior when using different Python types.

Python Type              Behavior as STDIN
str                      Read input from string object.
bytes, bytearray         Read input from bytes object.
Path                     Read input from file specified by Path.
File, StringIO, BytesIO  Read input from open file object.
int                      Read input from existing file descriptor.
asyncio.StreamReader     Read input from StreamReader.
sh.DEVNULL               Read input from /dev/null.
sh.INHERIT               Read input from existing sys.stdin.
sh.CAPTURE               You will write to stdin interactively.

Redirecting Standard Output

To redirect standard output, use the pipe operator | with the argument on the right side. Here is an example that writes to a temporary file.

>>> output_file = Path("/tmp/output_file")
>>> cmd = sh("echo", "abc") | output_file
>>> await cmd
''
>>> output_file.read_bytes()
b'abc\n'

To redirect standard output with append, use the >> operator.

>>> cmd = sh("echo", "def") >> output_file
>>> await cmd
''
>>> output_file.read_bytes()
b'abc\ndef\n'

Shellous supports different STDOUT behavior when using different Python types.

Python Type              Behavior as STDOUT/STDERR                               append=True
Path                     Write output to file path specified by Path.            Open file for append
bytearray                Write output to mutable byte array.                     TypeError
File, StringIO, BytesIO  Write output to open file object.                       TypeError
int                      Write output to existing file descriptor.               TypeError
logging.Logger           Log each line of output.                                TypeError
asyncio.StreamWriter     Write output to StreamWriter.                           TypeError
sh.CAPTURE               Return standard output or error. See Multiple Capture.  TypeError
sh.DEVNULL               Write output to /dev/null.                              TypeError
sh.INHERIT               Write output to existing sys.stdout or sys.stderr.      TypeError
sh.STDOUT                Redirect stderr to same place as stdout.                TypeError
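
Operators like these are usually built on Python's operator overloading. The following sketch shows one way the three forms (value | cmd, cmd | target, and cmd >> target) could be mapped onto an immutable object, including the rule from the table that only a Path may be appended to. SketchCmd and its fields are hypothetical; this is an illustration of the mechanism, not shellous's source.

```python
from dataclasses import dataclass, replace
from pathlib import Path
from typing import Any, Tuple


@dataclass(frozen=True)
class SketchCmd:
    """Hypothetical command object that records its redirections."""

    args: Tuple[str, ...]
    stdin: Any = None
    stdout: Any = None
    append: bool = False

    def __ror__(self, source):
        # `source | cmd`: Python falls back to __ror__ when the left
        # operand (str, bytes, Path, ...) does not handle `|` itself.
        return replace(self, stdin=source)

    def __or__(self, target):
        # `cmd | target`: redirect standard output, truncating.
        return replace(self, stdout=target, append=False)

    def __rshift__(self, target):
        # `cmd >> target`: redirect standard output, appending.
        if not isinstance(target, Path):
            raise TypeError("append is only supported for Path targets")
        return replace(self, stdout=target, append=True)


wc = "abc" | SketchCmd(("wc", "-c"))
to_file = SketchCmd(("echo", "abc")) | Path("/tmp/output_file")
appended = SketchCmd(("echo", "def")) >> Path("/tmp/output_file")

print(wc.stdin)
print(to_file.stdout, to_file.append)
print(appended.stdout, appended.append)
```

Nothing is executed or written here; the objects only describe what a runner would later do with each stream.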

Redirecting Standard Error

To redirect standard error, use the stderr method. Standard error supports the same Python types as standard output. To redirect stderr to the same place as stdout, use the sh.STDOUT constant.

>>> cmd = sh("cat", "does_not_exist").stderr(sh.STDOUT)
>>> await cmd.set(exit_codes={0,1})
'cat: does_not_exist: No such file or directory\n'

To redirect standard error to the hosting program's sys.stderr, use the sh.INHERIT redirect option.

>>> cmd = sh("cat", "does_not_exist").stderr(sh.INHERIT)
>>> await cmd
cat: does_not_exist: No such file or directory
Traceback (most recent call last):
  ...
shellous.result.ResultError: Result(output_bytes=b'', exit_code=1, cancelled=False, encoding='utf-8', extra=None)

Default Redirections

For regular commands, the default redirections are:

  • Standard input is read from the empty string ("").
  • Standard output is captured by the program and returned (CAPTURE).
  • Standard error is discarded (DEVNULL).

However, the default redirections are adjusted when using a pseudo-terminal (pty):

  • Standard input is captured and ignored (CAPTURE).
  • Standard output is captured by the program and returned (CAPTURE).
  • Standard error is redirected to standard output (STDOUT).

Pipelines

You can create a pipeline by combining commands using the | operator.

>>> pipe = sh("ls") | sh("grep", "README")
>>> await pipe
'README.md\n'
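
Under the hood, a pipeline connects the standard output of one process to the standard input of the next. A rough standard-library sketch of that wiring is shown below, assuming an OS-level pipe created with os.pipe. run_pipeline is a hypothetical helper and says nothing about how shellous itself connects processes. Small Python scripts stand in for ls and grep.

```python
import asyncio
import os
import sys


async def run_pipeline(first, second):
    """Run `first | second` and return the bytes written by `second`."""
    read_fd, write_fd = os.pipe()
    try:
        producer = await asyncio.create_subprocess_exec(*first, stdout=write_fd)
        consumer = await asyncio.create_subprocess_exec(
            *second, stdin=read_fd, stdout=asyncio.subprocess.PIPE
        )
    finally:
        # The children hold their own copies of the pipe ends. The parent
        # must close both, or the consumer never sees end-of-file.
        os.close(read_fd)
        os.close(write_fd)
    output, _ = await consumer.communicate()
    await producer.wait()
    return output


async def main():
    list_files = "print('LICENSE'); print('README.md')"
    keep_readme = (
        "import sys; "
        "sys.stdout.writelines(l for l in sys.stdin if 'README' in l)"
    )
    out = await run_pipeline(
        [sys.executable, "-c", list_files],
        [sys.executable, "-c", keep_readme],
    )
    print(out)


if __name__ == "__main__":
    asyncio.run(main())
```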

Process Substitution (Unix Only)

You can pass a shell command as an argument to another.

>>> cmd = sh("grep", "README", sh("ls"))
>>> await cmd
'README.md\n'

Use .writable to write to a command instead.

>>> buf = bytearray()
>>> cmd = sh("ls") | sh("tee", sh("grep", "README").writable | buf) | sh.DEVNULL
>>> await cmd
''
>>> buf
bytearray(b'README.md\n')

Timeouts

You can specify a timeout using the timeout option. If the timeout expires, shellous will raise a TimeoutError.

>>> await sh("sleep", 60).set(timeout=0.1)
Traceback (most recent call last):
  ...
TimeoutError

Timeouts are just a special case of cancellation. When a command is cancelled, shellous terminates the running process and raises a CancelledError.

>>> t = asyncio.create_task(sh("sleep", 60).coro())
>>> t.cancel()
True
>>> await t
Traceback (most recent call last):
  ...
CancelledError

By default, shellous will send a SIGTERM signal to the process to tell it to exit. If the process does not exit within 3 seconds, shellous will send a SIGKILL signal. You can change these defaults with the cancel_signal and cancel_timeout settings. A command is not considered fully cancelled until the process exits.
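
The terminate-then-kill escalation described above can be sketched with the standard library alone. stop_process is a hypothetical helper written for this illustration, with a cancel_timeout parameter named after the setting mentioned in the text. It is not shellous's implementation, and on Windows terminate() and kill() behave identically.

```python
import asyncio
import sys


async def stop_process(proc, cancel_timeout=3.0):
    """Ask a process to exit, then force it if it does not comply in time."""
    if proc.returncode is not None:
        return proc.returncode  # already finished

    proc.terminate()  # SIGTERM on Unix
    try:
        return await asyncio.wait_for(proc.wait(), timeout=cancel_timeout)
    except asyncio.TimeoutError:
        proc.kill()  # SIGKILL on Unix
        # Cancellation is complete only once the process has exited.
        return await proc.wait()


async def main():
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import time; time.sleep(60)"
    )
    return await stop_process(proc, cancel_timeout=3.0)


if __name__ == "__main__":
    print("exit status:", asyncio.run(main()))
```

Waiting for the process after the final kill matters: returning earlier would leave a child that is still running, or not yet reaped, behind the caller's back.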

Pseudo-Terminal Support (Unix Only)

To run a command through a pseudo-terminal, set the pty option to True. Alternatively, you can pass a function to configure the tty mode and size.

>>> import shellous
>>> ls = sh("ls").set(pty=shellous.cooked(cols=40, rows=10, echo=False))
>>> await ls("README.md", "CHANGELOG.md")
'CHANGELOG.md\tREADME.md\r\n'

Context Objects

You can store shared command settings in an immutable context object. To create a new context object, specify your changes to the default context sh:

>>> auditor = lambda phase, info: print(phase, info["runner"].name)
>>> sh_audit = sh.set(audit_callback=auditor)

Now all commands created with sh_audit will log their progress using the audit callback.

>>> await sh_audit("echo", "goodbye")
start echo
stop echo
'goodbye\n'
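
An audit callback does not have to print. As a sketch, the callback below records events in a list so they can be inspected later. It relies only on what the example above shows: the callback receives a phase and an info mapping whose "runner" entry has a name. make_auditor is a hypothetical helper, and the demonstration drives it with a stand-in object rather than a real shellous runner.

```python
from types import SimpleNamespace


def make_auditor(events):
    """Return a callback that appends (phase, program name) to `events`."""

    def auditor(phase, info):
        events.append((phase, info["runner"].name))

    return auditor


events = []
auditor = make_auditor(events)

# Stand-in for the info mapping; a real one would come from shellous,
# e.g. after `sh_audit = sh.set(audit_callback=auditor)`.
fake_info = {"runner": SimpleNamespace(name="echo")}
auditor("start", fake_info)
auditor("stop", fake_info)

print(events)
# → [('start', 'echo'), ('stop', 'echo')]
```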
