High-throughput I/O coordinator for feeding persistent subprocesses.
subfeed
subfeed is a Python library for feeding streaming data into swarms of persistent subprocesses. Unlike standard process pools, which execute discrete function calls, subfeed streams data into long-running external processes through multiple pipes (stdin plus side channels) without deadlocking.
It handles the complex mechanics of non-blocking I/O, file descriptor management, and graceful shutdowns, allowing you to focus on your data flow.
Features
- Deadlock-Free I/O: Safely writes to multiple input pipes (e.g., stdin and file descriptor 3) simultaneously using threaded workers.
- Persistent Processes: "Stoke" long-running processes rather than spawning a new process for every item.
- Multi-Channel Support: Feed distinct data streams to different file descriptors on the same subprocess.
- Lazy Startup: Workers initialize in parallel and come online as they become ready.
- Graceful Shutdown: Handles the complex sequence of draining buffers, closing pipes, and reaping processes.
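The deadlock mentioned above arises when a parent writes its pipes one at a time: if the child is blocked reading pipe B while the parent is blocked on a full pipe A, neither side makes progress. Below is a minimal standard-library sketch of the general fix (one writer thread per pipe, then close, drain, and wait). It is not subfeed's implementation; the inline child script, the SIDE_FD environment variable, and the function names are hypothetical.

```python
import os
import subprocess
import sys
import threading
from typing import BinaryIO, Iterable

# Hypothetical child: reads the whole side channel first, then stdin.
CHILD = r"""
import os, sys
side = os.fdopen(int(os.environ["SIDE_FD"]), "rb")
side_bytes = len(side.read())
stdin_bytes = len(sys.stdin.buffer.read())
print(side_bytes, stdin_bytes)
"""


def _pump(pipe: BinaryIO, chunks: Iterable[bytes]) -> None:
    """Write every chunk to one pipe, then close it so the reader sees EOF."""
    try:
        for chunk in chunks:
            pipe.write(chunk)
    finally:
        pipe.close()


def feed_two_pipes(stdin_chunks: Iterable[bytes], side_chunks: Iterable[bytes]) -> str:
    """Feed stdin and one side-channel pipe concurrently; return the child's stdout."""
    read_fd, write_fd = os.pipe()  # both ends are non-inheritable by default
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        pass_fds=(read_fd,),  # only the read end reaches the child
        env={**os.environ, "SIDE_FD": str(read_fd)},
    )
    os.close(read_fd)  # the child has its own copy; the parent keeps the write end
    side_pipe = os.fdopen(write_fd, "wb")

    # One thread per input pipe, so a full buffer on one pipe never stalls the other
    threads = [
        threading.Thread(target=_pump, args=(proc.stdin, stdin_chunks)),
        threading.Thread(target=_pump, args=(side_pipe, side_chunks)),
    ]
    for t in threads:
        t.start()

    out = proc.stdout.read()  # drain stdout while the writer threads run
    proc.stdout.close()
    for t in threads:
        t.join()
    proc.wait()  # reap the child
    return out.decode().strip()


print(feed_two_pipes([b"x" * 65536] * 16, [b"y" * 65536] * 16))
```

Written sequentially (all of stdin first), the same 1 MiB payloads would hang: stdin's pipe buffer (typically 64 KiB on Linux) fills while the child is still waiting for EOF on the side channel.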
Installation
pip install subfeed
Usage Example
In this example, we feed a subprocess that expects data both on stdin and on a side channel (an extra file descriptor, such as 3).
1. The Child Process (child.py)
A script that reads Fibonacci numbers from stdin and line numbers from a side channel, then prints each pair separated by a tab.
import os
import sys

# Read Fibonacci numbers from stdin
fibonaccis = sys.stdin

# Read line numbers from the side channel; its file descriptor
# number is passed in the "line_numbers" environment variable
fd = int(os.environ["line_numbers"])
line_numbers = open(fd, "r")

# Pair each line number with the matching Fibonacci number
for line_number, fib in zip(line_numbers, fibonaccis):
    print(f"{line_number.strip()}\t{fib.strip()}")
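To check child.py on its own before handing it to subfeed, you can launch it with regular files standing in for both pipes, passing the side channel's descriptor number in the line_numbers environment variable just as the script expects. This is a hypothetical test harness built on the standard library (run_child is not part of subfeed); plain files are used so no concurrent writers are needed.

```python
import os
import subprocess
import sys
import tempfile


def run_child(script_path: str, fibs: list[int]) -> list[str]:
    """Run the child script once with temp files as stdin and the side channel."""
    with tempfile.TemporaryFile() as fib_file, tempfile.TemporaryFile() as num_file:
        fib_file.write("".join(f"{f}\n" for f in fibs).encode())
        num_file.write("".join(f"{i}\n" for i in range(1, len(fibs) + 1)).encode())
        # seek() flushes the buffered writes and rewinds the shared file offset
        fib_file.seek(0)
        num_file.seek(0)

        fd = num_file.fileno()
        result = subprocess.run(
            [sys.executable, script_path],
            stdin=fib_file,
            pass_fds=(fd,),  # the child sees the same descriptor number
            env={**os.environ, "line_numbers": str(fd)},
            capture_output=True,
            text=True,
            check=True,
        )
    return result.stdout.splitlines()


# Usage, assuming child.py is in the current directory:
# run_child("child.py", [0, 1, 1, 2, 3])
```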
2. The Coordinator (main.py)
Using subfeed to manage the swarm:
from subfeed import AnonChannel, Coordinator, FileChannel, TaskTemplate, Writer, WriterSpec

# Writers turn each batch passed to feed() into the bytes written to one pipe
class LineNumbersWriter(Writer):
    def filter(self, batch):
        # batch[0] is the line number; it goes to the side channel
        line_number = str(batch[0]) + "\n"
        return line_number.encode()

class TextWriter(Writer):
    def filter(self, batch):
        # batch[1] is the Fibonacci number; it goes to stdin
        fib_line = str(batch[1]) + "\n"
        return fib_line.encode()

# 1. Define the Task Template
# We want to run "python child.py"
# We need stdin (implicit) and a side channel named "line_numbers"
template = TaskTemplate(
    args="python child.py",
    stdout=FileChannel("{id}.out"),  # Write output to 0.out, 1.out, ...
    sidein={"line_numbers": AnonChannel()},
)

# 2. Map each channel to the Writer that formats its data
specs = {
    "stdin": WriterSpec(TextWriter),
    "line_numbers": WriterSpec(LineNumbersWriter),
}

# 3. Start the Swarm (2 concurrent processes)
with Coordinator(template, count=2, writer_specs=specs) as swarm:
    # Generate the first 100 Fibonacci numbers
    fibs = [0, 1]
    for _ in range(98):
        fibs.append(fibs[-2] + fibs[-1])

    expected = {}
    for line_number, fib in enumerate(fibs, start=1):
        expected[line_number] = fib
        # feed() accepts any batch object that your Writers understand
        swarm.feed([line_number, fib])

# Read the output only after the with block exits and the swarm has shut down
output = []
for name in ("0.out", "1.out"):
    with open(name) as f:
        output.extend(f.readlines())

# Every line must come back, paired with the correct Fibonacci number
recovered = [[int(i) for i in line.split()] for line in output]
assert len(recovered) == len(expected)
for line_number, fib in recovered:
    assert expected[line_number] == fib
File details
Details for the file subfeed-0.1.1.tar.gz.
File metadata
- Download URL: subfeed-0.1.1.tar.gz
- Upload date:
- Size: 8.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.8.0 colorama/0.4.4 importlib-metadata/7.0.1 keyring/24.3.0 pkginfo/1.9.6 readme-renderer/34.0 requests-toolbelt/1.0.0 requests/2.31.0 rfc3986/1.5.0 tqdm/4.66.1 urllib3/1.26.5 CPython/3.10.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 00d9da120cd02c733de41195d79362eef9d007d5784d60875d6b83a0aa0dec5f |
| MD5 | 36d8f66ac3d243d71f4bfeaa08abd0e4 |
| BLAKE2b-256 | cc441608042da5ff0562260fe32dd5270a1db10bbe3964be4c78adb321c28bcc |
File details
Details for the file subfeed-0.1.1-py3-none-any.whl.
File metadata
- Download URL: subfeed-0.1.1-py3-none-any.whl
- Upload date:
- Size: 9.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.8.0 colorama/0.4.4 importlib-metadata/7.0.1 keyring/24.3.0 pkginfo/1.9.6 readme-renderer/34.0 requests-toolbelt/1.0.0 requests/2.31.0 rfc3986/1.5.0 tqdm/4.66.1 urllib3/1.26.5 CPython/3.10.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 86849b38d773c85a7d016cf292e88f49cfe413698a82ef0042554dfb7743160d |
| MD5 | 485cd88b13779eef36c04eea234a4da1 |
| BLAKE2b-256 | 8996792ff46dbc5acf6cb315c32a4a6313251188952f211f4121795f3fb62e5f |
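To check a downloaded file against the SHA256 digests listed above, the standard library's hashlib is enough. The helper names below are illustrative and not part of subfeed or pip; the digests in EXPECTED_SHA256 are copied from the hash tables above.

```python
import hashlib
import os

# SHA256 digests copied from the file-hash tables above
EXPECTED_SHA256 = {
    "subfeed-0.1.1.tar.gz": "00d9da120cd02c733de41195d79362eef9d007d5784d60875d6b83a0aa0dec5f",
    "subfeed-0.1.1-py3-none-any.whl": "86849b38d773c85a7d016cf292e88f49cfe413698a82ef0042554dfb7743160d",
}


def sha256_of(path: str, chunk_size: int = 1 << 16) -> str:
    """Return the hex SHA256 digest of a file, read in fixed-size chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify(path: str) -> bool:
    """True if the file's digest matches the published one for its file name."""
    expected = EXPECTED_SHA256.get(os.path.basename(path))
    return expected is not None and sha256_of(path) == expected


# Usage: verify("subfeed-0.1.1.tar.gz")
```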