Async Processes and Pipelines
shellous provides a concise API for running subprocesses using asyncio. It is similar to and inspired by sh.
import asyncio
from shellous import sh

async def main():
    result = await sh("echo", "hello")
    print(result)

asyncio.run(main())
Benefits
- Run programs asynchronously in a single line.
- Redirect stdin, stdout and stderr to files, memory buffers, async streams or loggers.
- Iterate asynchronously over subprocess output.
- Set timeouts and reliably cancel running processes.
- Run a program with a pseudo-terminal (pty).
- Use send() and expect() to manually control a subprocess.
- Construct pipelines and use process substitution directly from Python (no shell required).
- Runs on Linux, macOS, FreeBSD and Windows.
- Monitor processes being started and stopped with the audit_callback API.
Requirements
- Requires Python 3.9 or later.
- Requires an asyncio event loop.
- Pseudo-terminals require a Unix system.
- Process substitution requires a Unix system with /dev/fd support.
Running a Command
The tutorial in this README uses the asyncio REPL built into Python. In these examples, >>> is the REPL prompt.
Start the asyncio REPL by typing python3 -m asyncio, and import sh from the shellous module:
>>> from shellous import sh
Here's a command that runs echo "hello, world".
>>> await sh("echo", "hello, world")
'hello, world\n'
The first argument to sh is the program name. It is followed by zero or more arguments. Each argument will be converted to a string. If an argument is a list or tuple, it is flattened recursively.
>>> await sh("echo", 1, 2, [3, 4, (5, 6)])
'1 2 3 4 5 6\n'
A command does not run until you await it. When you run a command using await, it returns the value of the standard output interpreted as a UTF-8 string. It is safe to await the same command object more than once.
Here, we create our own echo command with "-n" to omit the newline. Note that echo("abc") will run the same command as echo -n "abc".
>>> echo = sh("echo", "-n")
>>> await echo("abc")
'abc'
Commands are immutable objects that represent a program invocation: program name, arguments, environment variables, redirection operators and other settings. When you use a method to modify a Command, the method returns a new Command object. The original object is unchanged.
You can wrap your commands in a function to improve type safety:
>>> from shellous import Command
>>> def exclaim(word: str) -> Command[str]:
...     return sh("echo", "-n", f"{word}!!")
...
>>> await exclaim("Oh")
'Oh!!'
The type hint Command[str] indicates that the command returns a str.
Arguments
Commands use positional arguments only; keyword arguments are not supported.
In most cases, shellous automatically converts Python objects passed as command arguments to str or bytes. As described above, the list and tuple types are an exception; they are recursively flattened before their elements are converted to strings.
Dicts, sets, and generator types are not supported as arguments. Their string format doesn't make sense as a command line argument.
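The flattening rule can be pictured with a short pure-Python sketch. This is not shellous's implementation: flatten_args is a hypothetical helper, and rejecting unsupported types with TypeError is an assumption made for illustration.

```python
import inspect
from typing import Any, Iterable, Iterator, Union


def flatten_args(args: Iterable[Any]) -> Iterator[Union[str, bytes]]:
    """Yield command-line arguments, recursively flattening lists and tuples.

    Hypothetical helper for illustration only; not part of shellous.
    """
    for arg in args:
        if isinstance(arg, (list, tuple)):
            # Lists and tuples are expanded in place, depth first.
            yield from flatten_args(arg)
        elif isinstance(arg, (str, bytes)):
            yield arg
        elif isinstance(arg, (dict, set, frozenset)) or inspect.isgenerator(arg):
            # Assumption: unsupported types are rejected with TypeError.
            raise TypeError(f"unsupported argument type: {type(arg).__name__}")
        else:
            # Everything else (int, float, Path, ...) goes through str().
            yield str(arg)


print(list(flatten_args(["echo", 1, 2, [3, 4, (5, 6)]])))
```

The printed list mirrors the echo example above: nested containers disappear and every element becomes a separate argument.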
Results
When a command completes successfully, it returns the standard output (or "" if stdout is redirected). For a more detailed response, you can specify that the command should return a Result object by using the .result modifier:
>>> await echo.result("abc")
Result(exit_code=0, output_bytes=b'abc', error_bytes=b'', cancelled=False, encoding='utf-8')
A Result object contains the command's exit_code in addition to its output. A Result evaluates to True if the command's exit_code is zero. You can access the string value of the output using the .output property:
if result := await sh.result("cat", "some-file"):
    output = result.output
else:
    print(f"Command failed with exit_code={result.exit_code}")
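The truthiness rule can be sketched with a small stand-in class. MiniResult is hypothetical and only models the three fields used here; it is not the shellous Result class, and the way the real class implements this behavior may differ.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MiniResult:
    """Hypothetical stand-in for illustration; not the shellous Result class."""

    exit_code: int
    output_bytes: bytes
    encoding: str = "utf-8"

    @property
    def output(self) -> str:
        # Decode the captured bytes on demand.
        return self.output_bytes.decode(self.encoding)

    def __bool__(self) -> bool:
        # Success is defined by the exit code, not by whether output is empty.
        return self.exit_code == 0


ok = MiniResult(exit_code=0, output_bytes=b"abc")
failed = MiniResult(exit_code=1, output_bytes=b"")

if ok:
    print(ok.output)
if not failed:
    print(f"Command failed with exit_code={failed.exit_code}")
```

Because truthiness follows the exit code, a successful command with empty output still passes the `if` test in this sketch.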
You can retrieve the string value of the standard error using the .error property. (By default, only the first 1024 bytes of standard error are stored.)
If a command was terminated by a signal, the exit_code will be the negative signal number.
The return value of sh.result("cmd", ...) uses the type hint Command[Result].
ResultError
If you are not using the .result modifier and a command fails, it raises a ResultError exception:
>>> await sh("cat", "does_not_exist")
Traceback (most recent call last):
...
shellous.result.ResultError: Result(exit_code=1, output_bytes=b'', error_bytes=b'cat: does_not_exist: No such file or directory\n', cancelled=False, encoding='utf-8')
The ResultError exception contains a Result object with the exit_code and the first 1024 bytes of standard error.
In some cases, you want to ignore certain exit code values. That is, you want to treat them as if they are normal. To do this, you can set the exit_codes option:
>>> await sh("cat", "does_not_exist").set(exit_codes={0,1})
''
If there is a problem launching a process, shellous can also raise a separate FileNotFoundError or PermissionError exception.
Async For
Using await to run a command collects the entire output of the command in memory before returning it. You can also iterate over the output lines as they arrive using async for.
>>> [line async for line in echo("hi\n", "there")]
['hi\n', ' there']
Use an async for loop when you want to examine the stream of output from a command, line by line. For example, suppose you want to run tail on a log file.
async for line in sh("tail", "-f", "/var/log/syslog"):
    if "ERROR" in line:
        print(line.rstrip())
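For comparison, the same line-by-line pattern can be sketched with plain asyncio, without shellous. The matching_lines helper and the CHILD program are hypothetical names used only for this illustration; the child prints three lines and exits, standing in for a long-running tail.

```python
import asyncio
import sys

# A small child program that prints three lines, standing in for `tail -f`.
CHILD = [sys.executable, "-c", "print('ok'); print('ERROR: disk full'); print('done')"]


async def matching_lines(argv, needle):
    """Return the lines of a subprocess's stdout that contain `needle`."""
    proc = await asyncio.create_subprocess_exec(*argv, stdout=asyncio.subprocess.PIPE)
    matches = []
    # StreamReader supports `async for`, yielding one line (as bytes) at a time.
    async for raw in proc.stdout:
        line = raw.decode("utf-8")
        if needle in line:
            matches.append(line.rstrip())
    # Reap the child once its output is exhausted.
    await proc.wait()
    return matches


print(asyncio.run(matching_lines(CHILD, "ERROR")))
```

Each line is handled as soon as it arrives, so memory use stays flat regardless of how much output the child produces.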
Async With
You can use a command as an asynchronous context manager. There are two ways to run a program using a context manager: a low-level API and a high-level API.
Byte-by-Byte (Low Level)
Use async with directly when you need byte-by-byte control over the individual streams: stdin, stdout and stderr. To control a standard stream, you must tell shellous to "capture" it. (For more on this, see Redirection.)
cmd = sh("cat").stdin(sh.CAPTURE).stdout(sh.CAPTURE)
async with cmd as run:
    run.stdin.write(b"abc")
    run.stdin.close()
    print(await run.stdout.readline())
result = run.result()
The streams run.stdout and run.stderr are asyncio.StreamReader objects. The stream run.stdin is an asyncio.StreamWriter object. If we didn't specify that stdin/stdout are sh.CAPTURE, the streams run.stdin and run.stdout would be None.
The return value of run.result() is a Result object. Depending on the command settings, this function may raise a ResultError on a non-zero exit code.
Warning: When reading or writing individual streams, you are responsible for managing reads and writes so they don't deadlock. You may use run.create_task to schedule a concurrent task.
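The deadlock risk comes from pipe buffers: if the parent writes a large input without reading, the child can block on a full stdout pipe while the parent blocks on a full stdin pipe. A minimal sketch of the usual remedy, using plain asyncio rather than the shellous API (round_trip and CAT are hypothetical names for this illustration), writes in a separate task while reading in the main one.

```python
import asyncio
import sys

# A portable stand-in for `cat`: copy stdin to stdout as raw bytes.
CAT = [
    sys.executable,
    "-c",
    "import shutil, sys; shutil.copyfileobj(sys.stdin.buffer, sys.stdout.buffer)",
]


async def round_trip(data: bytes) -> bytes:
    """Send `data` through the child and return what comes back."""
    proc = await asyncio.create_subprocess_exec(
        *CAT, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )

    async def feed() -> None:
        proc.stdin.write(data)
        await proc.stdin.drain()  # may wait while the child's pipe is full
        proc.stdin.close()  # signal EOF so the child can finish

    # Write in a separate task so the read below can make progress concurrently.
    feeder = asyncio.create_task(feed())
    output = await proc.stdout.read()  # read until EOF
    await feeder
    await proc.wait()
    return output


payload = b"x" * 1_000_000
print(len(asyncio.run(round_trip(payload))))
```

With shellous, the analogous step would be scheduling the writer through run.create_task, as mentioned above.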
You can also use async with to run a server. When you do so, you must tell the server to stop using run.cancel(). Otherwise, the context manager will wait forever for the process to exit.
async with sh("some-server") as run:
    # Send commands to the server here...
    # Manually signal the server to stop.
    run.cancel()
Prompt with Send/Expect (High Level API)
Use the prompt() method to control a process using send and expect. The prompt() method returns an asynchronous context manager (the Prompt class) that facilitates reading and writing strings and matching regular expressions.
cmd = sh("cat").set(pty=True)
async with cmd.prompt() as client:
    await client.send("abc")
    output, _ = await client.expect("\r\n")
    print(output)
The Prompt API automatically captures stdin and stdout.
Here is another example of controlling a bash co-process running in a docker container.
import re

async def list_packages():
    "Run bash in an ubuntu docker container and list packages."
    bash_prompt = re.compile("root@[0-9a-f]+:/[^#]*# ")
    cmd = sh("docker", "run", "-it", "--rm", "-e", "TERM=dumb", "ubuntu")

    async with cmd.set(pty=True).prompt(bash_prompt, timeout=3) as cli:
        # Read up to first prompt.
        await cli.expect()

        # Disable echo. The `command()` method combines send *and* expect methods.
        await cli.command("stty -echo")

        # Return list of packages.
        result = await cli.command("apt-cache pkgnames")
        return result.strip().split("\r\n")

    # You can check the result object's exit code. You can only
    # access `cli.result` outside the `async with` block.
    assert cli.result.exit_code == 0
The prompt() API does not raise a ResultError when a command exits with an error status. Typically, you'll see an EOFError when you were expecting to read a response. You can check the exit status by retrieving the Prompt's result property outside of the async with block.
Redirection
shellous supports the redirection operators | and >>. They work similarly to their counterparts in the Unix shell. Shellous does not support use of < or > for redirection. Instead, replace these with |.
To redirect to or from a file, use a pathlib.Path object. Alternatively, you can redirect input/output to a StringIO object, an open file, a Logger, or use a special redirection constant like sh.DEVNULL.
Warning: When combining the redirect operators with await, you must use parentheses; await has higher precedence than | and >>.
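The precedence rule can be checked with Python's own parser, independent of shellous. In this sketch, returned_expression is a hypothetical helper, and cmd and output_file are placeholder names that are parsed but never evaluated.

```python
import ast


def returned_expression(source: str) -> ast.expr:
    """Parse `source` as the return value of an async function and return its AST node."""
    module = ast.parse(f"async def f():\n    return {source}\n")
    return module.body[0].body[0].value


unparenthesized = returned_expression("await cmd | output_file")
parenthesized = returned_expression("await (cmd | output_file)")

# Without parentheses, `await` binds to `cmd` alone and `|` is applied afterwards.
print(type(unparenthesized).__name__, type(unparenthesized.left).__name__)
# With parentheses, the redirection is built first and the whole thing is awaited.
print(type(parenthesized).__name__, type(parenthesized.value).__name__)
```

The first line prints `BinOp Await` and the second prints `Await BinOp`, which is why the unparenthesized form would run the command first and then try to combine its string output with the target.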
Redirecting Standard Input
To redirect standard input, use the pipe operator | with the argument on the left-hand side.
Here is an example that passes the string "abc" as standard input.
>>> cmd = "abc" | sh("wc", "-c")
>>> await cmd
' 3\n'
To read input from a file, use a Path object from pathlib.
>>> from pathlib import Path
>>> cmd = Path("LICENSE") | sh("wc", "-l")
>>> await cmd
' 201\n'
Shellous supports different STDIN behavior when using different Python types.
Python Type | Behavior as STDIN |
---|---|
str | Read input from string object. |
bytes, bytearray | Read input from bytes object. |
Path | Read input from file specified by Path. |
File, StringIO, BytesIO | Read input from open file object. |
int | Read input from existing file descriptor. |
asyncio.StreamReader | Read input from StreamReader. |
sh.DEVNULL | Read input from /dev/null. |
sh.INHERIT | Read input from existing sys.stdin. |
sh.CAPTURE | You will write to stdin interactively. |
Redirecting Standard Output
To redirect standard output, use the pipe operator | with the argument on the right-hand side. Here is an example that writes to a temporary file.
>>> output_file = Path("/tmp/output_file")
>>> cmd = sh("echo", "abc") | output_file
>>> await cmd
''
>>> output_file.read_bytes()
b'abc\n'
To redirect standard output with append, use the >> operator.
>>> cmd = sh("echo", "def") >> output_file
>>> await cmd
''
>>> output_file.read_bytes()
b'abc\ndef\n'
Shellous supports different STDOUT behavior when using different Python types.
Python Type | Behavior as STDOUT/STDERR |
---|---|
Path | Write output to the file path specified by Path. |
bytearray | Write output to a mutable byte array. |
File, StringIO, BytesIO | Write output to an open file object. |
int | Write output to existing file descriptor at its current position. ◆ |
logging.Logger | Log each line of output. ◆ |
asyncio.StreamWriter | Write output to StreamWriter. ◆ |
sh.CAPTURE | Capture output for async with. ◆ |
sh.DEVNULL | Write output to /dev/null. ◆ |
sh.INHERIT | Write output to existing sys.stdout or sys.stderr. ◆ |
◆ For these types, there is no difference between using | and >>.
Shellous does not support redirecting standard output/error to a plain str or bytes object. If you intend to redirect output to a file, you must use a pathlib.Path object.
Redirecting Standard Error
By default, the first 1024 bytes read from standard error are stored in the Result object.
Any further bytes are discarded. You can change the 1024-byte limit using the error_limit option.
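The "keep the first bytes, discard the rest" behavior can be sketched with a plain asyncio.StreamReader. The read_limited function is a hypothetical helper, not shellous code; it only illustrates why the stream is still read to the end even after the limit is reached.

```python
import asyncio


async def read_limited(reader: asyncio.StreamReader, limit: int = 1024) -> bytes:
    """Drain `reader` to EOF, keeping only the first `limit` bytes."""
    kept = bytearray()
    while True:
        chunk = await reader.read(4096)
        if not chunk:
            break  # EOF
        room = limit - len(kept)
        if room > 0:
            kept += chunk[:room]
        # Bytes beyond the limit are read and dropped, so the writer never stalls.
    return bytes(kept)


async def demo() -> int:
    # Feed 5000 bytes into an in-memory reader, standing in for a child's stderr.
    reader = asyncio.StreamReader()
    reader.feed_data(b"e" * 5000)
    reader.feed_eof()
    return len(await read_limited(reader))


print(asyncio.run(demo()))
```

Continuing to read after the limit matters: a child that writes a lot to standard error would otherwise block once the pipe buffer fills.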
To redirect standard error, use the stderr method. Standard error supports the same Python types as standard output. To append, set append=True in the stderr method.
To redirect stderr to the same place as stdout, use the sh.STDOUT constant. If you also redirect stdout to sh.DEVNULL, you will only receive the standard error.
>>> cmd = sh("cat", "does_not_exist").stderr(sh.STDOUT)
>>> await cmd.set(exit_codes={0,1})
'cat: does_not_exist: No such file or directory\n'
To redirect standard error to the hosting program's sys.stderr, use the sh.INHERIT redirect option.
>>> cmd = sh("cat", "does_not_exist").stderr(sh.INHERIT)
>>> await cmd
cat: does_not_exist: No such file or directory
Traceback (most recent call last):
...
shellous.result.ResultError: Result(exit_code=1, output_bytes=b'', error_bytes=b'', cancelled=False, encoding='utf-8')
If you redirect stderr, it will no longer be stored in the Result object, and the error_limit option will not apply.
Default Redirections
For regular commands, the default redirections are:
- Standard input is read from the empty string ("").
- Standard output is buffered and stored in the Result object (BUFFER).
- The first 1024 bytes of standard error are buffered and stored in the Result object (BUFFER).
However, the default redirections are adjusted when using a pseudo-terminal (pty):
- Standard input is captured and ignored (CAPTURE).
- Standard output is buffered and stored in the Result object (BUFFER).
- Standard error is redirected to standard output (STDOUT).
When you use the Prompt API, the standard input and standard output are automatically redirected to CAPTURE.
Pipelines
You can create a pipeline by combining commands using the | operator. A pipeline feeds the standard output of one process into the next process as standard input. Here is the shellous equivalent to the bash command ls | grep README:
>>> pipe = sh("ls") | sh("grep", "README")
>>> await pipe
'README.md\n'
A pipeline returns a Result if the last command in the pipeline has the .result modifier. To set other options like encoding for a Pipeline, set them on the last command.
>>> pipe = sh("ls") | sh("grep", "README").result
>>> await pipe
Result(exit_code=0, output_bytes=b'README.md\n', error_bytes=b'', cancelled=False, encoding='utf-8')
Error reporting for a pipeline is similar to using the -o pipefail shell option.
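As a reminder of what pipefail means in bash: the pipeline's status is the exit status of the rightmost command that failed, or zero if every command succeeded. The sketch below expresses that rule as a pure function. pipefail_status is a hypothetical helper, and the source only says shellous is "similar" to this option, so treat the exact correspondence as an assumption.

```python
from typing import Sequence


def pipefail_status(exit_codes: Sequence[int]) -> int:
    """Return the rightmost non-zero exit code, or 0 if every command succeeded."""
    for code in reversed(exit_codes):
        if code != 0:
            return code
    return 0


print(pipefail_status([0, 0]))     # every command succeeded
print(pipefail_status([1, 0]))     # first command failed, last succeeded
print(pipefail_status([2, 0, 3]))  # rightmost failure wins
```

The practical effect is that a failure early in the pipeline is reported even when the last command exits cleanly.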
Pipelines support the same await/async for/async with operations that work on a single command, including the Prompt API.
>>> [line.strip() async for line in pipe]
['README.md']
Process Substitution (Unix Only)
You can pass a shell command as an argument to another. Here is the shellous equivalent to the bash command grep README <(ls).
>>> cmd = sh("grep", "README", sh("ls"))
>>> await cmd
'README.md\n'
Use .writable to write to a command instead.
>>> buf = bytearray()
>>> cmd = sh("ls") | sh("tee", sh("grep", "README").writable | buf) | sh.DEVNULL
>>> await cmd
''
>>> buf
bytearray(b'README.md\n')
The above example is equivalent to ls | tee >(grep README > buf) > /dev/null.
Timeouts
You can specify a timeout using the timeout option. If the timeout expires, shellous will raise a TimeoutError.
>>> await sh("sleep", 60).set(timeout=0.1)
Traceback (most recent call last):
...
TimeoutError
Timeouts are just a special case of cancellation. When a command is cancelled, shellous terminates the running process and raises a CancelledError.
>>> t = asyncio.create_task(sh("sleep", 60).coro())
>>> t.cancel()
True
>>> await t
Traceback (most recent call last):
...
CancelledError
By default, shellous will send a SIGTERM signal to the process to tell it to exit. If the process does not exit within 3 seconds, shellous will send a SIGKILL signal. You can change these defaults with the cancel_signal and cancel_timeout settings. A command is not considered fully cancelled until the process exits.
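The terminate-then-kill escalation can be sketched with plain asyncio. This is an illustration of the general pattern, not shellous's implementation; stop_process and SLEEPER are hypothetical names, and the 3-second default simply mirrors the cancel_timeout default described above.

```python
import asyncio
import sys

# A child that would otherwise run for a minute.
SLEEPER = [sys.executable, "-c", "import time; time.sleep(60)"]


async def stop_process(proc: asyncio.subprocess.Process, cancel_timeout: float = 3.0) -> int:
    """Ask `proc` to exit, then force it if it does not exit in time."""
    proc.terminate()  # sends SIGTERM on Unix
    try:
        return await asyncio.wait_for(proc.wait(), timeout=cancel_timeout)
    except asyncio.TimeoutError:
        proc.kill()  # sends SIGKILL on Unix
        return await proc.wait()


async def demo() -> int:
    proc = await asyncio.create_subprocess_exec(*SLEEPER)
    return await stop_process(proc)


print(asyncio.run(demo()))
```

On Unix, the printed exit code is normally the negative signal number, which matches the exit_code convention described in the Results section. Either way, the function returns only after the process has actually exited.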
Pseudo-Terminal Support (Unix Only)
To run a command through a pseudo-terminal, set the pty option to True.
>>> await sh("echo", "in a pty").set(pty=True)
'in a pty\r\n'
Alternatively, you can pass a pty function to configure the tty mode and size.
>>> import shellous
>>> ls = sh("ls").set(pty=shellous.cooked(cols=40, rows=10, echo=False))
>>> await ls("README.md", "CHANGELOG.md")
'CHANGELOG.md\tREADME.md\r\n'
Shellous provides three built-in helper functions: shellous.cooked(), shellous.raw() and shellous.cbreak().
Context Objects
You can store shared command settings in an immutable context object (CmdContext). To create a new context object, specify your changes to the default context sh:
>>> auditor = lambda phase, info: print(phase, info["runner"].name)
>>> sh_audit = sh.set(audit_callback=auditor)
Now all commands created with sh_audit will log their progress using the audit callback.
>>> await sh_audit("echo", "goodbye")
start echo
stop echo
'goodbye\n'
You can also create a context object that specifies that all return values are Result objects.
>>> rsh = sh.result
>>> await rsh("echo", "whatever")
Result(exit_code=0, output_bytes=b'whatever\n', error_bytes=b'', cancelled=False, encoding='utf-8')
Options
Both Command and CmdContext support options to control their runtime behavior. Some of these options (timeout, pty, audit_callback, and exit_codes) have been described above. See the shellous.Options class for more information.
You can retrieve an option from cmd with cmd.options.<option>. For example, use cmd.options.encoding to obtain the encoding:
>>> cmd = sh("echo").set(encoding="latin1")
>>> cmd.options.encoding
'latin1'
Command and CmdContext use the .set() method to specify most options:
Option | Description |
---|---|
path | Search path to use instead of the PATH environment variable. (Default=None) |
env | Additional environment variables to pass to the command. (Default={}) |
inherit_env | True if command should inherit the environment variables from the current process. (Default=True) |
encoding | Text encoding of input/output streams. You can specify an error handling scheme by including it after a space, e.g. "ascii backslashreplace". (Default="utf-8 strict") |
exit_codes | Set of exit codes that do not raise a ResultError. (Default={0}) |
timeout | Timeout in seconds to wait before cancelling the process. (Default=None) |
cancel_timeout | Timeout in seconds to wait for a cancelled process to exit before forcefully terminating it. (Default=3s) |
cancel_signal | The signal sent to a process when it is cancelled. (Default=SIGTERM) |
alt_name | Alternate name for the process used for debug logging. (Default=None) |
pass_fds | Additional file descriptors to pass to the process. (Default={}) |
pass_fds_close | True if descriptors in pass_fds should be closed after the child process is launched. (Default=False) |
pty | Used to allocate a pseudo-terminal (PTY). (Default=False) |
close_fds | True if process should close all file descriptors when it starts. This setting defaults to False to align with posix_spawn requirements. (Default=False) |
audit_callback | Provide function to audit stages of process execution. (Default=None) |
coerce_arg | Provide function to coerce Command arguments to strings when str() is not sufficient. For example, you can provide your own function that converts a dictionary argument to a sequence of strings. (Default=None) |
error_limit | Maximum number of initial bytes of STDERR to store in Result object. (Default=1024) |
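The encoding option packs two values into one string: a codec name and an optional error handling scheme. A minimal sketch of how such a value could be split and applied with the standard bytes.decode call follows. split_encoding is a hypothetical helper, and how shellous parses the option internally is an assumption.

```python
from typing import Tuple


def split_encoding(option: str) -> Tuple[str, str]:
    """Split an option such as "ascii backslashreplace" into (encoding, errors)."""
    encoding, _, errors = option.partition(" ")
    # Assumption: a missing error scheme falls back to "strict".
    return encoding, errors or "strict"


encoding, errors = split_encoding("ascii backslashreplace")
# Non-ASCII bytes are rendered as backslash escapes instead of raising an error.
print(b"caf\xc3\xa9".decode(encoding, errors))
print(split_encoding("utf-8"))
```

With "strict", the same decode call would raise UnicodeDecodeError, so choosing a lenient scheme is useful when a program may emit bytes outside the expected encoding.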
env
Use the env() method to add to the list of environment variables. The env() method supports keyword parameters. You can call env() more than once and the effect is additive.
>>> cmd = sh("echo").env(ENV1="a", ENV2="b").env(ENV2=3)
>>> cmd.options.env
{'ENV1': 'a', 'ENV2': '3'}
Use the env option with set() when you want to replace all the environment variables.
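One way to picture how the env and inherit_env options could combine into the child's environment is the sketch below. build_env is a hypothetical helper, and the merge order shown (inherited variables first, then the extras on top) is an assumption rather than documented shellous behavior.

```python
import os
from typing import Any, Dict, Mapping


def build_env(extra: Mapping[str, Any], inherit_env: bool = True) -> Dict[str, str]:
    """Combine the current environment with `extra`; values are converted to str."""
    # Assumption: inherited variables form the base and extras override them.
    base = dict(os.environ) if inherit_env else {}
    base.update({name: str(value) for name, value in extra.items()})
    return base


print(build_env({"ENV1": "a", "ENV2": 3}, inherit_env=False))
```

The str() conversion mirrors the env() example above, where ENV2=3 is stored as the string '3'.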
input, output, error
When you apply a redirection operator to a Command or CmdContext, the redirection targets are also stored in the Options object. To change these, use the .stdin(), .stdout(), or .stderr() methods or the redirection operator |.
Option | Description |
---|---|
input | The redirection target for standard input. |
input_close | True if standard input should be closed after the process is launched. |
output | The redirection target for standard output. |
output_append | True if standard output should be open for append. |
output_close | True if standard output should be closed after the process is launched. |
error | The redirection target for standard error. |
error_append | True if standard error should be open for append. |
error_close | True if standard error should be closed after the process is launched. |
Type Checking
Shellous fully supports PEP 484 type hints.
Commands
Commands are generic on the return type, either str or Result. Specify the type of a command object as Command[str] or Command[Result].
Use the result modifier to obtain a Command[Result] from a Command[str].
from shellous import sh, Command, Result
cmd1: Command[str] = sh("echo", "abc")
# When you `await cmd1`, the result is a `str` object.
cmd2: Command[Result] = sh.result("echo", "abc")
# When you `await cmd2`, the result is a `Result` object.
CmdContext
The CmdContext class is also generic on either str or Result.
from shellous import sh, CmdContext, Result
sh1: CmdContext[str] = sh.set(path="/bin:/usr/bin")
# When you use `sh1` to create commands, it produces `Command[str]` objects with the given path.
sh2: CmdContext[Result] = sh.result.set(path="/bin:/usr/bin")
# When you use `sh2` to create commands, it produces `Command[Result]` objects with the given path.
Logging
For verbose logging, shellous supports a SHELLOUS_TRACE environment variable. Set the value of SHELLOUS_TRACE to a comma-delimited list of options:
- detail: Enables detailed logging used to trace the steps of running a command.
- prompt: Enables logging in the Prompt class when controlling a program using send/expect.
- all: Enables all logging options.
Shellous uses the built-in Python logging module. After enabling these options, the shellous logger will display log messages at the INFO level.
Without these options enabled, Shellous generates almost no log messages.