Async Processes and Pipelines
shellous provides a concise API for running subprocesses using asyncio. It is similar to, and inspired by, the sh package.
```python
import asyncio
import shellous

sh = shellous.context()

async def main():
    result = await (sh("ls") | sh("grep", "README"))
    print(result)

asyncio.run(main())
```
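For comparison, here is a rough equivalent of the pipeline above written against asyncio's built-in subprocess API. This is only a sketch of the idea, not shellous's implementation: shellous connects processes with real pipes and handles errors and cleanup for you, whereas this version buffers the first command's entire output in memory.

```python
import asyncio


async def run_pipeline(cmd1, cmd2):
    """Feed cmd1's stdout into cmd2's stdin and return cmd2's decoded output."""
    # Run the first command and capture its full output.
    p1 = await asyncio.create_subprocess_exec(
        *cmd1, stdout=asyncio.subprocess.PIPE)
    out1, _ = await p1.communicate()

    # Feed that output to the second command's stdin.
    p2 = await asyncio.create_subprocess_exec(
        *cmd2,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE)
    out2, _ = await p2.communicate(out1)
    return out2.decode()
```

For example, `asyncio.run(run_pipeline(["ls"], ["grep", "README"]))` mirrors `sh("ls") | sh("grep", "README")`.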
Benefits
- Run programs asynchronously in a single line.
- Easily capture output or redirect stdin, stdout and stderr to files, memory buffers or loggers.
- Easily construct pipelines and use process substitution.
- Run a program with a pseudo-terminal (pty).
- Runs on Linux, macOS, FreeBSD and Windows.
- Monitor processes being started and stopped with the `audit_callback` API.
Requirements
- Requires Python 3.9 or later.
- Requires an asyncio event loop.
- Process substitution requires a Unix system with /dev/fd support.
- Pseudo-terminals require a Unix system. Ptys do not work with uvloop.
- FastChildWatcher is not supported.
Basic Usage
Start the asyncio REPL by typing `python3 -m asyncio`, and import the shellous module:
>>> import shellous
Before we can do anything else, we need to create a context. Store the context in a short variable name like `sh` because we'll be typing it a lot.
>>> sh = shellous.context()
Now, we're ready to run our first command. Here's one that runs `echo "hello, world"`.
>>> await sh("echo", "hello, world")
'hello, world\n'
The first argument is the program name. It is followed by zero or more separate arguments.
A command does not run until you `await` it. Here, we create our own echo command with `-n` to omit the newline. Note that `echo("abc")` is the same as `echo -n "abc"`.
>>> echo = sh("echo", "-n")
>>> await echo("abc")
'abc'
Results and Exit Codes
When you `await` a command, it captures the standard output and returns it. You can optionally have the command return a `Result` object. The `Result` object contains more information about the command execution, including the `exit_code`. To return a result object, set the `return_result` option to `True`.
>>> await echo("abc").set(return_result=True)
Result(output_bytes=b'abc', exit_code=0, cancelled=False, encoding='utf-8', extra=None)
The above command had an exit_code of 0.
If a command exits with a non-zero exit code, it raises a `ResultError` exception that contains the `Result` object.
>>> await sh("cat", "does_not_exist")
Traceback (most recent call last):
...
shellous.result.ResultError: Result(output_bytes=b'', exit_code=1, cancelled=False, encoding='utf-8', extra=None)
Redirecting Standard Input
You can change the standard input of a command by using the `|` operator.
>>> cmd = "abc" | sh("wc", "-c")
>>> await cmd
' 3\n'
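Under the hood, piping a string into a command amounts to writing it to the child's stdin. A minimal stdlib sketch of what `"abc" | sh("wc", "-c")` does (this is an illustration, not shellous's code):

```python
import asyncio


async def count_bytes(data: str) -> str:
    """Feed a string to `wc -c` via stdin and return wc's output."""
    proc = await asyncio.create_subprocess_exec(
        "wc", "-c",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE)
    # communicate() writes the bytes, closes stdin, and reads stdout.
    out, _ = await proc.communicate(data.encode())
    return out.decode()
```

`asyncio.run(count_bytes("abc"))` yields wc's count of 3 bytes (the exact whitespace padding in wc's output varies by platform).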
To redirect stdin using a file's contents, use a `Path` object from `pathlib`.
>>> from pathlib import Path
>>> cmd = Path("LICENSE") | sh("wc", "-l")
>>> await cmd
' 201\n'
Redirecting Standard Output
To redirect standard output, use the `|` operator.
>>> output_file = Path("/tmp/output_file")
>>> cmd = sh("echo", "abc") | output_file
>>> await cmd
''
>>> output_file.read_bytes()
b'abc\n'
To redirect standard output with append, use the `>>` operator.
>>> cmd = sh("echo", "def") >> output_file
>>> await cmd
''
>>> output_file.read_bytes()
b'abc\ndef\n'
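The two redirects above correspond to opening the destination file in write or append mode and handing it to the child process as stdout. A stdlib sketch of the idea (not shellous's implementation):

```python
import asyncio


async def echo_to_file(text, path, append=False):
    """Run `echo text` with stdout redirected to a file.

    append=False overwrites (like `| path`); append=True appends (like `>> path`).
    """
    mode = "ab" if append else "wb"
    with open(path, mode) as f:
        # The child writes directly to the file descriptor.
        proc = await asyncio.create_subprocess_exec("echo", text, stdout=f)
        await proc.wait()
```

Calling it twice, first without and then with `append=True`, reproduces the `b'abc\ndef\n'` result shown above.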
Redirecting Standard Error
By default, standard error is not captured. To redirect standard error, use the `stderr` method.
>>> cmd = sh("cat", "does_not_exist").stderr(shellous.STDOUT)
>>> await cmd.set(exit_codes={0,1})
'cat: does_not_exist: No such file or directory\n'
You can redirect standard error to a file or path.
To redirect standard error to the hosting program's `sys.stderr`, use the `INHERIT` redirect option.
>>> cmd = sh("cat", "does_not_exist").stderr(shellous.INHERIT)
>>> await cmd
cat: does_not_exist: No such file or directory
Traceback (most recent call last):
...
shellous.result.ResultError: Result(output_bytes=b'', exit_code=1, cancelled=False, encoding='utf-8', extra=None)
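Redirecting stderr into the captured output, as `stderr(shellous.STDOUT)` does, maps directly onto asyncio's `stderr=asyncio.subprocess.STDOUT`. A small stdlib sketch (an illustration, not shellous's code):

```python
import asyncio


async def capture_merged(*args):
    """Run a command with stderr merged into captured stdout.

    Returns (decoded output, exit code).
    """
    proc = await asyncio.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT)  # merge stderr into stdout
    out, _ = await proc.communicate()
    return out.decode(), proc.returncode
```

For example, `asyncio.run(capture_merged("cat", "does_not_exist"))` returns cat's error message along with exit code 1.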
Pipelines
You can create a pipeline by combining commands using the `|` operator.
>>> pipe = sh("ls") | sh("grep", "README")
>>> await pipe
'README.md\n'
Process Substitution (Unix Only)
You can pass one command as an argument to another.
>>> cmd = sh("grep", "README", sh("ls"))
>>> await cmd
'README.md\n'
Use `.writable` to write to a command instead.
>>> buf = bytearray()
>>> cmd = sh("ls") | sh("tee", sh("grep", "README").writable | buf) | shellous.DEVNULL
>>> await cmd
''
>>> buf
bytearray(b'README.md\n')
Async With & For
You can loop over a command's output by using the context manager as an iterator.
>>> async with pipe as run:
... async for line in run:
... print(line.rstrip())
...
README.md
⚠️ You can also acquire an async iterator directly from the command or pipeline object. This is discouraged because you will have less control over the final clean up of the command invocation than with a context manager.
>>> async for line in pipe: # Use caution!
... print(line.rstrip())
...
README.md
You can use `async with` to interact with the process streams directly. You have to be careful; you are responsible for correctly reading and writing multiple streams at the same time.
>>> async with pipe as run:
... data = await run.stdout.readline()
... print(data)
...
b'README.md\n'
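The line-by-line iteration shown above rests on asyncio's stream readers, which support async iteration themselves. A stdlib sketch of the pattern (not shellous's API, which adds proper cleanup on exit):

```python
import asyncio


async def collect_lines(*args):
    """Start a process and gather its stdout line by line as it arrives."""
    proc = await asyncio.create_subprocess_exec(
        *args, stdout=asyncio.subprocess.PIPE)
    lines = []
    # asyncio.StreamReader supports `async for`, yielding bytes lines.
    async for line in proc.stdout:
        lines.append(line.decode().rstrip("\n"))
    await proc.wait()
    return lines
```

For example, `asyncio.run(collect_lines("ls"))` returns the directory listing as a list of strings.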
Cancellation
When a command is cancelled, shellous terminates the process and raises a CancelledError
.
You can retrieve the partial result by setting incomplete_result
to True. Shellous will return a
ResultError
when the specified command is cancelled.
>>> sleep = sh("sleep", 60).set(incomplete_result=True)
>>> t = asyncio.create_task(sleep.coro())
>>> t.cancel()
True
>>> await t
Traceback (most recent call last):
...
shellous.result.ResultError: Result(output_bytes=b'', exit_code=-15, cancelled=True, encoding='utf-8', extra=None)
When you use `incomplete_result`, your code should respect the `cancelled` attribute in the `Result` object. Otherwise, your code may swallow the `CancelledError`.
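The exit code of -15 above reflects SIGTERM: on cancellation the child is terminated before the `CancelledError` propagates. A stdlib sketch of that cleanup pattern (an illustration of the idea, not shellous's implementation):

```python
import asyncio


async def run_sleep():
    """Run `sleep 60`, terminating the child if the task is cancelled."""
    proc = await asyncio.create_subprocess_exec("sleep", "60")
    try:
        await proc.wait()
    except asyncio.CancelledError:
        proc.terminate()      # send SIGTERM, hence an exit code of -15
        await proc.wait()     # reap the child before re-raising
        raise


async def demo():
    task = asyncio.create_task(run_sleep())
    await asyncio.sleep(0.1)  # give the subprocess time to start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        return "cancelled"
```

Running `asyncio.run(demo())` cancels the task shortly after startup and returns `"cancelled"` once the child has been cleaned up.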
Pseudo-Terminal Support (Unix Only)
To run a command through a pseudo-terminal, set the `pty` option to `True`. Alternatively, you can pass a function to configure the tty mode and size.
>>> ls = sh("ls").set(pty=shellous.cooked(cols=40, rows=10, echo=False))
>>> await ls("README.md", "CHANGELOG.md")
'CHANGELOG.md\tREADME.md\r\n'
Context Objects
You can specify shared command settings in a context object. Context objects are immutable, so you must store the result of your changes in a new variable.
>>> auditor = lambda phase, info: print(phase, info["runner"].name)
>>> sh1 = sh.set(audit_callback=auditor)
Now, all commands run with `sh1` will log their progress using the audit callback.
>>> await sh1("echo", "goodbye")
start echo
stop echo
'goodbye\n'