
High-throughput I/O coordinator for feeding persistent subprocesses.


subfeed


subfeed is a Python library for feeding streaming data into swarms of persistent subprocesses. Unlike standard process pools, which execute discrete function calls, subfeed streams data into long-running external processes through multiple pipes (stdin plus side channels) without deadlocking.

It handles the complex mechanics of non-blocking I/O, file descriptor management, and graceful shutdowns, allowing you to focus on your data flow.

Features

  • Deadlock-Free I/O: Safely writes to multiple input pipes (e.g., stdin and file descriptor 3) simultaneously using threaded workers.
  • Persistent Processes: "Stoke" long-running processes rather than spawning a new process for every item.
  • Multi-Channel Support: Feed distinct data streams to different file descriptors on the same subprocess.
  • Lazy Startup: Workers initialize in parallel and come online as they become ready.
  • Graceful Shutdown: Handles the complex sequence of draining buffers, closing pipes, and reaping processes.
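Why threads matter for multiple pipes can be shown with plain `subprocess`. If the parent wrote all of stdin before touching the side channel, the child could block reading the side channel while the parent blocks on a full stdin pipe, and neither would make progress. The following is a minimal standard-library sketch, not subfeed's implementation: it gives each input pipe its own writer thread, drains stdout on the main thread, then closes the pipes and reaps the child. It is POSIX-only because it relies on `pass_fds`, and the inline child script, the `feed_two_pipes` helper, and reuse of the usage example's `line_numbers` environment-variable convention are illustrative assumptions.

```python
import os
import subprocess
import sys
import threading

# Hypothetical inline child mirroring child.py: line numbers arrive on a side
# channel whose fd number is in the "line_numbers" env var, values on stdin.
CHILD_SOURCE = r'''
import os, sys
with open(int(os.environ["line_numbers"]), "r") as side:
    for n, v in zip(side, sys.stdin):
        print(f"{n.strip()}\t{v.strip()}")
'''


def _write_all(pipe, chunks):
    """Write every chunk to one pipe, then close it so the child sees EOF."""
    try:
        for chunk in chunks:
            pipe.write(chunk)
    finally:
        pipe.close()


def feed_two_pipes(values):
    """Send values on stdin and 1-based line numbers on a side pipe (POSIX only)."""
    side_read, side_write = os.pipe()
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD_SOURCE],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        pass_fds=(side_read,),  # the child inherits the read end under the same number
        env=dict(os.environ, line_numbers=str(side_read)),
    )
    os.close(side_read)  # the parent only needs the write end

    stdin_chunks = [f"{v}\n".encode() for v in values]
    side_chunks = [f"{i}\n".encode() for i in range(1, len(values) + 1)]
    writers = [
        threading.Thread(target=_write_all, args=(proc.stdin, stdin_chunks)),
        threading.Thread(target=_write_all, args=(os.fdopen(side_write, "wb"), side_chunks)),
    ]
    for t in writers:
        t.start()

    # Drain stdout on the main thread while the writers run, so the child
    # never blocks on a full stdout pipe either.
    output = proc.stdout.read()
    for t in writers:
        t.join()
    proc.stdout.close()
    proc.wait()  # reap the child
    return output.decode().splitlines()
```

With one thread per pipe, a full buffer on stdin only stalls the stdin writer; the side-channel writer keeps going, so the child can always make progress. Writing the same data sequentially from one thread works only while everything fits in the OS pipe buffers.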

Installation

pip install subfeed

Usage Example

In this example, we feed a subprocess that expects data on stdin AND on a side channel (an extra file descriptor, such as 3, whose number is passed to the child through an environment variable).

1. The Child Process (child.py)

A script that reads Fibonacci numbers from stdin and line numbers from a side channel, and prints each pair.

import sys
import os

# Read from stdin (the Fibonacci sequence)
fibonaccis = sys.stdin

# Read from the side channel; the coordinator passes its file descriptor
# number in the "line_numbers" environment variable
fd = int(os.environ["line_numbers"])
with open(fd, "r") as line_numbers:
    # Pair each line number with the matching Fibonacci number
    for line_number, fib in zip(line_numbers, fibonaccis):
        print(f"{line_number.strip()}\t{fib.strip()}")
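child.py pairs the two streams with `zip()`, which silently stops at the shorter one, so a line lost on either pipe would truncate the output without any error. If you would rather fail loudly, a variant like this could replace the final loop; `pair_lines` is a hypothetical helper, not part of subfeed. On Python 3.10+, `zip(line_numbers, fibonaccis, strict=True)` performs the same check.

```python
from itertools import zip_longest

_MISSING = object()  # sentinel: no real line can be this object


def pair_lines(line_numbers, values):
    """Yield "n<TAB>value" strings, raising if the streams differ in length."""
    for n, v in zip_longest(line_numbers, values, fillvalue=_MISSING):
        if n is _MISSING or v is _MISSING:
            raise ValueError("stdin and the side channel have different lengths")
        yield f"{n.strip()}\t{v.strip()}"
```

In child.py, the loop would become `for out in pair_lines(line_numbers, fibonaccis): print(out)`.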

2. The Coordinator (main.py)

Uses subfeed to manage the swarm.

from subfeed import AnonChannel, Coordinator, FileChannel, TaskTemplate, Writer, WriterSpec

# Define how each batch ([line_number, fib]) is encoded for each pipe
class LineNumbersWriter(Writer):
    def filter(self, batch):
        line_number = str(batch[0]) + "\n"
        return line_number.encode()

class TextWriter(Writer):
    def filter(self, batch):
        fib_line = str(batch[1]) + "\n"
        return fib_line.encode()

# 1. Define the Task Template
# We want to run "python child.py"
# We need stdin (implicit) and a side channel named "line_numbers"
template = TaskTemplate(
    args="python child.py",
    stdout=FileChannel("{id}.out"),  # Write output to 0.out, 1.out...
    sidein={"line_numbers": AnonChannel()}
)

# 2. Map data types to channels
specs = {
    "stdin": WriterSpec(TextWriter),
    "line_numbers": WriterSpec(LineNumbersWriter)
}

# 3. Start the Swarm (2 concurrent processes)
with Coordinator(template, count=2, writer_specs=specs) as swarm:
    # Generate the first 100 Fibonacci numbers
    fibs = [0, 1]
    for _ in range(98):
        fibs.append(fibs[-2] + fibs[-1])

    expected = {}
    for line_number, fib in enumerate(fibs, start=1):
        expected[line_number] = fib
        # feed() accepts any data object your Writers understand;
        # here each batch is a [line_number, fib] pair
        batch = [line_number, fib]
        swarm.feed(batch)

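# Read the results after the with-block exits and the swarm has shut down
output = []
for path in ("0.out", "1.out"):
    with open(path) as f:
        output.extend(f.readlines())
recovered = [[int(i) for i in line.split()] for line in output]
assert len(recovered) == len(expected)
for line_number, fib in recovered:
    assert expected[line_number] == fib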
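The verification in main.py reads `0.out` and `1.out` by name, which only fits `count=2`. A sketch of a more general check follows, assuming worker ids run from 0 to `count - 1` as the `{id}.out` comment in the template suggests; `collect_outputs` and `check_results` are hypothetical helpers, not subfeed API.

```python
import os


def collect_outputs(pattern, count, directory="."):
    """Concatenate the output lines of workers 0..count-1.

    `pattern` uses the same "{id}" placeholder as FileChannel("{id}.out").
    Assumes ids run from 0 to count - 1.
    """
    lines = []
    for worker_id in range(count):
        path = os.path.join(directory, pattern.format(id=worker_id))
        with open(path) as f:
            lines.extend(f.readlines())
    return lines


def check_results(lines, expected):
    """Assert every expected line number came back exactly once, with its value."""
    recovered = {}
    for line in lines:
        line_number, fib = (int(field) for field in line.split())
        assert line_number not in recovered, f"duplicate line {line_number}"
        recovered[line_number] = fib
    assert recovered == expected, "missing or mismatched lines"
```

Usage: `check_results(collect_outputs("{id}.out", count=2), expected)`. Comparing whole dictionaries also catches lines that were duplicated or never came back, not just mismatched values.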

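As the example implies, every `swarm.feed(batch)` call hands the same batch to each channel's writer, and each writer's `filter` turns it into the bytes for its pipe. The plain-Python model below reproduces only that fan-out step under this reading; it is not subfeed's code, and `fan_out`, `FILTERS` and the two filter functions are hypothetical stand-ins for the `Writer` subclasses in main.py.

```python
# Stand-ins for LineNumbersWriter.filter and TextWriter.filter in main.py.
def line_numbers_filter(batch):
    return (str(batch[0]) + "\n").encode()


def text_filter(batch):
    return (str(batch[1]) + "\n").encode()


# Channel name -> filter, mirroring the `specs` dict passed as writer_specs.
FILTERS = {
    "stdin": text_filter,
    "line_numbers": line_numbers_filter,
}


def fan_out(batch, filters=FILTERS):
    """Return the bytes each channel would receive for a single batch."""
    return {channel: encode(batch) for channel, encode in filters.items()}


print(fan_out([3, 1]))  # {'stdin': b'1\n', 'line_numbers': b'3\n'}
```

Because both channels are derived from the same batch, the child receives matching lines on stdin and on the side channel, which is what lets child.py pair them with `zip()`.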


Download files


Source Distribution

subfeed-0.1.1.tar.gz (8.6 kB)

Built Distribution


subfeed-0.1.1-py3-none-any.whl (9.9 kB)

File details

Details for the file subfeed-0.1.1.tar.gz.

File metadata

  • Download URL: subfeed-0.1.1.tar.gz
  • Size: 8.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.8.0 colorama/0.4.4 importlib-metadata/7.0.1 keyring/24.3.0 pkginfo/1.9.6 readme-renderer/34.0 requests-toolbelt/1.0.0 requests/2.31.0 rfc3986/1.5.0 tqdm/4.66.1 urllib3/1.26.5 CPython/3.10.12

File hashes

Hashes for subfeed-0.1.1.tar.gz
Algorithm Hash digest
SHA256 00d9da120cd02c733de41195d79362eef9d007d5784d60875d6b83a0aa0dec5f
MD5 36d8f66ac3d243d71f4bfeaa08abd0e4
BLAKE2b-256 cc441608042da5ff0562260fe32dd5270a1db10bbe3964be4c78adb321c28bcc


File details

Details for the file subfeed-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: subfeed-0.1.1-py3-none-any.whl
  • Size: 9.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.8.0 colorama/0.4.4 importlib-metadata/7.0.1 keyring/24.3.0 pkginfo/1.9.6 readme-renderer/34.0 requests-toolbelt/1.0.0 requests/2.31.0 rfc3986/1.5.0 tqdm/4.66.1 urllib3/1.26.5 CPython/3.10.12

File hashes

Hashes for subfeed-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 86849b38d773c85a7d016cf292e88f49cfe413698a82ef0042554dfb7743160d
MD5 485cd88b13779eef36c04eea234a4da1
BLAKE2b-256 8996792ff46dbc5acf6cb315c32a4a6313251188952f211f4121795f3fb62e5f

