Async Processes and Pipelines
shellous provides a concise API for running subprocesses using asyncio. It is similar to and inspired by sh.
import asyncio
import shellous

sh = shellous.context()

async def main():
    result = await (sh("ls") | sh("grep", "README"))
    print(result)

asyncio.run(main())
Benefits
- Run programs asynchronously in a single line.
- Easily capture output or redirect stdin, stdout and stderr to files, memory buffers or loggers.
- Easily construct pipelines and use process substitution.
- Easily set timeouts and reliably cancel running processes.
- Run a program with a pseudo-terminal (pty).
- Runs on Linux, macOS, FreeBSD and Windows.
- Monitor processes being started and stopped with the audit_callback API.
Requirements
- Requires Python 3.9 or later.
- Requires an asyncio event loop.
- Process substitution requires a Unix system with /dev/fd support.
- Pseudo-terminals require a Unix system.
Basic Usage
Start the asyncio REPL by typing python3 -m asyncio, and import the shellous module:
>>> import shellous
Before we can do anything else, we need to create a context. Store the context in a short variable name like sh because we'll be typing it a lot.
>>> sh = shellous.context()
Now, we're ready to run our first command. Here's one that runs echo "hello, world".
>>> await sh("echo", "hello, world")
'hello, world\n'
The first argument is the program name. It is followed by zero or more separate arguments.
A command does not run until you await it. Here, we create our own echo command with "-n" to omit the newline. Note that echo("abc") is the same as echo -n "abc".
>>> echo = sh("echo", "-n")
>>> await echo("abc")
'abc'
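The two ideas above (calling a command only adds arguments, and nothing runs until it is awaited) can be modelled with a small standard-library sketch. ToyCommand is a hypothetical name invented for this illustration and is not how shellous is implemented. The echo substitute uses the current Python interpreter so the sketch does not depend on a system echo binary.

```python
import asyncio
import sys
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyCommand:
    """Hypothetical stand-in for a command object (not shellous's class)."""

    args: tuple

    def __call__(self, *more):
        # Calling a command returns a NEW command with the arguments appended;
        # nothing is executed at this point.
        return ToyCommand(self.args + tuple(str(a) for a in more))

    def __await__(self):
        # The subprocess is started only when the object is awaited.
        return self._run().__await__()

    async def _run(self):
        proc = await asyncio.create_subprocess_exec(
            *self.args, stdout=asyncio.subprocess.PIPE
        )
        out, _ = await proc.communicate()
        return out.decode()


# A portable substitute for `echo -n`, built on the current interpreter.
echo = ToyCommand(
    (sys.executable, "-c", "import sys; sys.stdout.write(' '.join(sys.argv[1:]))")
)


async def demo():
    # echo("abc") builds a new command; awaiting it runs the process.
    return await echo("abc")
```

Because the toy object is frozen, echo itself is never modified, so it can be reused to build many different invocations.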
Results and Exit Codes
If a command exits with a non-zero exit code, it raises a ResultError exception that contains the Result object. The Result object contains the exit code for the command, among other details.
>>> await sh("cat", "does_not_exist")
Traceback (most recent call last):
...
shellous.result.ResultError: Result(output_bytes=b'', exit_code=1, cancelled=False, encoding='utf-8', extra=None)
To always return a Result object (and not raise an error for a non-zero exit status), add the .result modifier.
>>> await echo("abc").result
Result(output_bytes=b'abc', exit_code=0, cancelled=False, encoding='utf-8', extra=None)
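For comparison, the "raise on non-zero exit unless asked not to" behaviour could be sketched with plain asyncio roughly as follows. ToyResultError, run and the check parameter are hypothetical names for this illustration only; they are not part of shellous.

```python
import asyncio
import sys


class ToyResultError(Exception):
    """Hypothetical error that carries the exit code and captured output."""

    def __init__(self, exit_code, output):
        super().__init__(f"exit_code={exit_code}")
        self.exit_code = exit_code
        self.output = output


async def run(*args, check=True):
    proc = await asyncio.create_subprocess_exec(
        *args, stdout=asyncio.subprocess.PIPE
    )
    output, _ = await proc.communicate()
    if check and proc.returncode != 0:
        # Default: a non-zero exit status becomes an exception.
        raise ToyResultError(proc.returncode, output)
    # With check=False the caller inspects the exit code itself.
    return proc.returncode, output


async def demo():
    failing = (sys.executable, "-c", "import sys; sys.exit(1)")
    try:
        await run(*failing)
    except ToyResultError as err:
        raised = err.exit_code
    code, _ = await run(*failing, check=False)
    return raised, code
```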
Redirecting Standard Input
You can change the standard input of a command by using the | operator.
>>> cmd = "abc" | sh("wc", "-c")
>>> await cmd
' 3\n'
To redirect stdin using a file's contents, use a Path object from pathlib.
>>> from pathlib import Path
>>> cmd = Path("LICENSE") | sh("wc", "-l")
>>> await cmd
' 201\n'
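As a point of reference, feeding a string to a child's standard input with only the standard library looks roughly like this. It is a sketch of the general technique, not of shellous internals, and the character-counting child is a stand-in for wc -c written with the current Python interpreter.

```python
import asyncio
import sys


async def count_chars(text):
    proc = await asyncio.create_subprocess_exec(
        sys.executable,
        "-c",
        "import sys; print(len(sys.stdin.read()))",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    # communicate() writes the input, closes stdin and collects stdout.
    out, _ = await proc.communicate(input=text.encode())
    return int(out.decode())
```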
Redirecting Standard Output
To redirect standard output, use the | operator.
>>> output_file = Path("/tmp/output_file")
>>> cmd = sh("echo", "abc") | output_file
>>> await cmd
''
>>> output_file.read_bytes()
b'abc\n'
To redirect standard output with append, use the >> operator.
>>> cmd = sh("echo", "def") >> output_file
>>> await cmd
''
>>> output_file.read_bytes()
b'abc\ndef\n'
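The difference between overwriting and appending corresponds to the mode used to open the target file. A minimal standard-library sketch, assuming a hypothetical helper named echo_to_file and a temporary directory in place of /tmp/output_file:

```python
import asyncio
import sys
import tempfile
from pathlib import Path


async def echo_to_file(text, path, append=False):
    # "wb" truncates the file first; "ab" keeps existing content.
    mode = "ab" if append else "wb"
    with open(path, mode) as f:
        proc = await asyncio.create_subprocess_exec(
            sys.executable, "-c", "import sys; print(sys.argv[1])", text,
            stdout=f,
        )
        await proc.wait()


async def demo():
    with tempfile.TemporaryDirectory() as tmp:
        out = Path(tmp) / "output_file"
        await echo_to_file("abc", out)               # overwrite
        await echo_to_file("def", out, append=True)  # append
        return out.read_bytes().splitlines()
```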
Redirecting Standard Error
By default, standard error is not captured. To redirect standard error, use the stderr method.
>>> cmd = sh("cat", "does_not_exist").stderr(shellous.STDOUT)
>>> await cmd.set(exit_codes={0,1})
'cat: does_not_exist: No such file or directory\n'
You can redirect standard error to a file or path.
To redirect standard error to the hosting program's sys.stderr, use the INHERIT redirect option.
>>> cmd = sh("cat", "does_not_exist").stderr(shellous.INHERIT)
>>> await cmd
cat: does_not_exist: No such file or directory
Traceback (most recent call last):
...
shellous.result.ResultError: Result(output_bytes=b'', exit_code=1, cancelled=False, encoding='utf-8', extra=None)
Pipelines
You can create a pipeline by combining commands using the | operator.
>>> pipe = sh("ls") | sh("grep", "README")
>>> await pipe
'README.md\n'
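To show what the | operator saves you from writing, here is a rough two-process pipeline built by hand with os.pipe and asyncio. It illustrates the general technique and is not a description of how shellous connects processes. PRODUCER and FILTER are stand-ins for ls and grep written as small Python scripts.

```python
import asyncio
import os
import sys

# Stand-ins for `ls` and `grep README`.
PRODUCER = "print('LICENSE'); print('README.md'); print('setup.py')"
FILTER = """
import sys
for line in sys.stdin:
    if "README" in line:
        sys.stdout.write(line)
"""


async def pipeline():
    read_fd, write_fd = os.pipe()
    producer = await asyncio.create_subprocess_exec(
        sys.executable, "-c", PRODUCER, stdout=write_fd
    )
    # Close the parent's copy of the write end; otherwise the consumer
    # would never see end-of-file.
    os.close(write_fd)
    consumer = await asyncio.create_subprocess_exec(
        sys.executable, "-c", FILTER,
        stdin=read_fd, stdout=asyncio.subprocess.PIPE,
    )
    os.close(read_fd)
    out, _ = await consumer.communicate()
    await producer.wait()
    return out.decode().splitlines()
```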
Process Substitution (Unix Only)
You can pass a shell command as an argument to another.
>>> cmd = sh("grep", "README", sh("ls"))
>>> await cmd
'README.md\n'
Use .writable to write to a command instead.
>>> buf = bytearray()
>>> cmd = sh("ls") | sh("tee", sh("grep", "README").writable | buf) | shellous.DEVNULL
>>> await cmd
''
>>> buf
bytearray(b'README.md\n')
Async With & For
You can loop over a command's output by using the context manager as an iterator.
>>> async with pipe as run:
...     async for line in run:
...         print(line.rstrip())
...
README.md
⚠️ You can also acquire an async iterator directly from the command or pipeline object. This is discouraged because you will have less control over the final clean up of the command invocation than with a context manager.
>>> async for line in pipe: # Use caution!
...     print(line.rstrip())
...
README.md
You can use async with to interact with the process streams directly. You have to be careful; you are responsible for correctly reading and writing multiple streams at the same time.
>>> async with pipe as run:
...     data = await run.stdout.readline()
...     print(data)
...
b'README.md\n'
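The cleanup concern mentioned above can be seen in a plain asyncio version of line-by-line reading, where the caller has to make sure the child is stopped and reaped even if the loop exits early. This is a sketch of that responsibility under stated assumptions, not shellous's implementation. The child here is a small Python script standing in for a real command.

```python
import asyncio
import contextlib
import sys


async def read_lines():
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "print('one'); print('two')",
        stdout=asyncio.subprocess.PIPE,
    )
    lines = []
    try:
        # StreamReader supports async iteration, yielding one line at a time.
        async for raw in proc.stdout:
            lines.append(raw.decode().rstrip())
    except BaseException:
        # Leaving the loop early (error or cancellation): stop the child.
        with contextlib.suppress(ProcessLookupError):
            proc.kill()
        raise
    finally:
        # Always reap the process so it does not linger as a zombie.
        await proc.wait()
    return lines
```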
Timeouts
You can specify a timeout using the timeout option. If the timeout expires, shellous will raise an asyncio.TimeoutError.
>>> await sh("sleep", 60).set(timeout=0.1)
Traceback (most recent call last):
...
asyncio.exceptions.TimeoutError
Timeouts are just a special case of cancellation.
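A hand-written equivalent shows why a timeout is a form of cancellation: the wait is cancelled, and the child then has to be stopped explicitly. The sketch below uses only the standard library. run_with_timeout is a hypothetical helper and the slow child is a Python script standing in for sleep 60.

```python
import asyncio
import sys


async def run_with_timeout(args, timeout):
    proc = await asyncio.create_subprocess_exec(*args)
    try:
        # wait_for cancels the inner wait when the timeout expires.
        return await asyncio.wait_for(proc.wait(), timeout)
    except asyncio.TimeoutError:
        # Cancelling the wait does not stop the child; do that explicitly.
        proc.kill()
        await proc.wait()
        raise


async def demo():
    slow = (sys.executable, "-c", "import time; time.sleep(60)")
    try:
        await run_with_timeout(slow, 0.1)
    except asyncio.TimeoutError:
        return "timed out"
    return "finished"
```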
Cancellation
When a command is cancelled, shellous terminates the process and raises a CancelledError.
You can retrieve the partial result by setting incomplete_result to True. Shellous will then raise a ResultError when the specified command is cancelled (or timed out).
>>> sleep = sh("sleep", 60).set(incomplete_result=True)
>>> t = asyncio.create_task(sleep.coro())
>>> t.cancel()
True
>>> await t
Traceback (most recent call last):
...
shellous.result.ResultError: Result(output_bytes=b'', exit_code=-15, cancelled=True, encoding='utf-8', extra=None)
When you use incomplete_result, your code should respect the cancelled attribute in the Result object. Otherwise, your code may swallow the CancelledError.
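The risk of swallowing a cancellation is general to asyncio, not specific to shellous. A minimal standard-library sketch of the safe pattern (clean up the child, then re-raise) might look like this. sleeper and demo are hypothetical names, and the child is a Python script standing in for sleep 60.

```python
import asyncio
import sys


async def sleeper(started, info):
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import time; time.sleep(60)"
    )
    info["proc"] = proc
    started.set()
    try:
        await proc.wait()
    except asyncio.CancelledError:
        # Clean up the child, then re-raise so the cancellation is
        # not swallowed.
        proc.terminate()
        await proc.wait()
        raise


async def demo():
    started = asyncio.Event()
    info = {}
    task = asyncio.create_task(sleeper(started, info))
    await started.wait()  # make sure the child exists before cancelling
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Here the CancelledError belongs to the awaited task, not to
        # demo() itself, so handling it is fine.
        return "cancelled", info["proc"].returncode
    return "not cancelled", info["proc"].returncode
```

If sleeper returned normally from its except block instead of re-raising, the task would appear to finish successfully and the caller would have no way to tell it had been cancelled.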
Pseudo-Terminal Support (Unix Only)
To run a command through a pseudo-terminal, set the pty option to True. Alternatively, you can pass a function to configure the tty mode and size.
>>> ls = sh("ls").set(pty=shellous.cooked(cols=40, rows=10, echo=False))
>>> await ls("README.md", "CHANGELOG.md")
'CHANGELOG.md\tREADME.md\r\n'
Context Objects
All commands are created using a Context object. To create a new context object, use the shellous.context function.
>>> sh = shellous.context()
You can specify shared command settings in a context object. Context objects are immutable, so you must store the result of your changes in a new variable.
>>> auditor = lambda phase, info: print(phase, info["runner"].name)
>>> sh_audit = sh.set(audit_callback=auditor)
Now all commands created with sh_audit will log their progress using the audit callback.
>>> await sh_audit("echo", "goodbye")
start echo
stop echo
'goodbye\n'
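A callback that records events instead of printing them can be easier to test. The sketch below assumes only what the example above shows: the callback receives a phase string and an info mapping whose "runner" entry has a name attribute. make_auditor is a hypothetical helper, and the SimpleNamespace object is a fake runner used so the snippet runs without starting any process.

```python
from types import SimpleNamespace


def make_auditor(log):
    """Return a callback that appends (phase, program name) to `log`."""

    def auditor(phase, info):
        log.append((phase, info["runner"].name))

    return auditor


events = []
auditor = make_auditor(events)

# Simulate the calls seen in the example output, using a fake runner.
fake_info = {"runner": SimpleNamespace(name="echo")}
auditor("start", fake_info)
auditor("stop", fake_info)
```

With a real context, the callback would be passed the same way as the lambda above, for example sh.set(audit_callback=make_auditor(events)).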