Stream subprocess output with timeouts, tree-kill, and sane defaults.
Project description
procstream
Stream subprocess output in Python — with timeouts, tree-kill, stdin, check, merge_stderr, and sane defaults. Sync and async. Stdlib only.
from procstream import run
for line in run(["pytest", "-q"]).stream():
print("!!" if line.is_stderr else " ", line.text)
Why
subprocess.run() buffers everything until the child exits. subprocess.Popen
with pipes works, but reading stdout and stderr concurrently without deadlock
is one of those things you have to get right every single time, across every
project. Timeouts that kill the whole process tree (not just the direct
child) are another recurring footgun. Async? Different API again.
procstream wraps those patterns into one small, typed, stdlib-only library
with matching sync and async surfaces.
Install
pip install procstream
Python 3.9+. No runtime dependencies.
What it gives you
- Streaming output — iterate lines as they arrive, tagged by stream (stdout / stderr), in arrival order.
- Matching sync and async APIs —
run()/arun()with nearly identical signatures. - Timeouts that actually clean up — the whole process group gets SIGTERM, then SIGKILL if ignored. No orphaned grandchildren.
- Tree-kill —
terminate_tree()/kill_tree()for manual cancel, POSIX and Windows. stdin— feedstr,bytes, or an IO object; default is DEVNULL.check=True— raiseCalledProcessErroron non-zero exit.merge_stderr=True— interleave stderr into stdout with one combined stream.env_add=— overlay env vars on top ofos.environwithout replacing it.- Per-line callbacks —
on_stdout=printif you don't want to write the iterator loop yourself. - Context manager —
with run(...) as p:(orasync with) guarantees the tree dies if your code raises. - Result buffering —
wait()always returns captured stdout / stderr, even if you streamed them. - Stdlib only — no
psutil, no surprise deps. - Fully typed — PEP 561
py.typed.
Usage — sync
Simple: capture output
from procstream import run
r = run(["node", "--version"]).wait()
print(r.returncode, r.stdout.strip())
Streaming: react to lines as they arrive
for line in run(["npm", "install"]).stream():
if line.is_stderr:
log.warning(line.text)
else:
log.info(line.text)
Callbacks
run(
["make", "build"],
on_stdout=print,
on_stderr=lambda t: print("!!", t),
).wait()
Timeout — kills the whole tree
from procstream import run, TimeoutExpired
try:
run(["flaky-script"], timeout=30).wait()
except TimeoutExpired as e:
print("killed after", e.timeout, "seconds")
print(e.result.stdout) # partial output still available
check=True — raise on non-zero exit
from procstream import run, CalledProcessError
try:
run(["rspec"], check=True).wait()
except CalledProcessError as e:
print("tests failed:", e.returncode)
print(e.result.combined)
stdin
# str
run(["grep", "foo"], stdin="foo\nbar\nfoobar\n").wait()
# bytes
run(["gzip", "-d"], stdin=open("data.gz", "rb").read()).wait()
# a file object
with open("input.txt") as f:
run(["sort"], stdin=f).wait()
Merge stderr into stdout
for line in run(["cmake", "--build", "."], merge_stderr=True).stream():
# Every line.stream is "stdout", but the child's stderr is included too.
print(line.text)
Overlay env vars
r = run(
["node", "-e", "console.log(process.env.TOKEN)"],
env_add={"TOKEN": "shhh"},
).wait()
Manual cancel
p = run(["long-running-server", "--port", "8080"])
# ... elsewhere
p.terminate_tree(grace=5.0) # SIGTERM, then SIGKILL if still alive
# or
p.kill_tree() # immediate SIGKILL
Context manager
with run(["watcher"]) as p:
for line in p.stream():
if "ERROR" in line.text:
raise RuntimeError("bailing")
# Tree is terminated on the way out.
Usage — async
import asyncio
from procstream import arun, TimeoutExpired
async def main():
# streaming
proc = await arun(["pytest", "-q"])
async for line in proc.stream():
print(line.text)
# capture
r = await (await arun(["node", "--version"])).wait()
print(r.stdout)
# timeout
try:
await (await arun(["sleep", "30"], timeout=1.0)).wait()
except TimeoutExpired as e:
print("killed:", e.pid)
# context manager
async with await arun(["watcher"]) as p:
async for line in p.stream():
if "done" in line.text:
break
asyncio.run(main())
Async flags mirror sync: stdin, check, merge_stderr, env, env_add, cwd, timeout.
API reference
run(cmd, *, ...) → Process, arun(cmd, *, ...) → AsyncProcess
| parameter | type | description |
|---|---|---|
cmd |
list[str] or str |
command (with shell=True for string form on sync) |
cwd |
str | Path | None |
working directory |
env |
Mapping[str, str] | None |
full environment (replaces inheritance) |
env_add |
Mapping[str, str] | None |
overlay on os.environ |
timeout |
float | None |
seconds before tree kill |
stdin |
str | bytes | IO | None |
input to feed the child |
merge_stderr |
bool |
redirect stderr into stdout |
check |
bool |
raise CalledProcessError on non-zero exit |
encoding, errors |
str |
applied to stdout/stderr (sync only) |
on_stdout, on_stderr |
Callable[[str], None] |
per-line callbacks (sync only) |
Process / AsyncProcess
| member | description |
|---|---|
pid |
child's pid |
returncode |
None while running, int once finished |
running |
True while the child is alive |
stream() |
(async) iterator of Line in arrival order, consume once |
wait() |
block/await completion → Result; raises on timeout or check |
terminate_tree(grace=5.0) |
SIGTERM group, escalate to SIGKILL |
kill_tree() |
SIGKILL group immediately |
| context manager | with (sync) / async with (async) — terminates on exit |
Line
text: str, stream: "stdout" \| "stderr", is_stderr: bool.
Result
returncode: int, stdout: str, stderr: str, combined: str, ok: bool, raise_for_returncode(cmd=...).
Exceptions
procstream.ProcessError— base class.procstream.TimeoutExpired(pid, timeout, result)— raised on timeout.procstream.CalledProcessError(returncode, cmd, result)— raised whencheck=Trueand exit is non-zero.
Platform notes
- POSIX (macOS, Linux): child starts in a new session via
start_new_session=True. Signals go to the whole process group withos.killpg. - Windows: child starts with
CREATE_NEW_PROCESS_GROUP.terminate_tree()sendsCTRL_BREAK_EVENT; force-kill falls back totaskkill /F /T /PID <pid>.
Development
git clone https://github.com/f4rkh4d/procstream
cd procstream
python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]"
pytest
ruff check src tests
mypy src/procstream
License
MIT.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file procstream-0.2.0.tar.gz.
File metadata
- Download URL: procstream-0.2.0.tar.gz
- Upload date:
- Size: 15.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.6
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
cd6692a5b04d4c339f99b0880e3e5315257c8f65aa57319eaff0b6bfc5ddaa2c
|
|
| MD5 |
61db973201362e3d88906eb9cac2da58
|
|
| BLAKE2b-256 |
d20d9c35a57b1b5b081a3706817182383ef6e47545f2fe68f258ae1047117c3f
|
File details
Details for the file procstream-0.2.0-py3-none-any.whl.
File metadata
- Download URL: procstream-0.2.0-py3-none-any.whl
- Upload date:
- Size: 15.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.6
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
cf06bc5cb300fd9a3a947aa3e1408c10f811618f5f7bd0220f20b3674992151e
|
|
| MD5 |
1453d72ae36bb99a508057e0d428fe9d
|
|
| BLAKE2b-256 |
a647fc3961fbaf1123ba070587001e84a10118edc973130cdc2c8b8e6fc40083
|