

Project description

procnexus

Provides tools for multiprocessing.

procnexus offers a tiny, explicit interface for collecting function calls and executing them concurrently with Python's multiprocessing.Pool.

🛠️ Installation

$ pip install procnexus

✨ Features

  • Simple task submission (submit) API.
  • Batch execution with process pools.
  • Asynchronous execution with start(), join(), and get().
  • Ordered results (same order as submitted tasks).
  • Lightweight wrapper around the standard library.

🚀 Quick Start

from procnexus import nexus


def add(a: int, b: int) -> int:
    return a + b


job = nexus(add, processes=4)
job.submit(1, 2)
job.submit(10, 5)
job.submit(-1, 8)

results = job.run()
print(results)  # [3, 15, 7]

# Or start the work asynchronously and collect it later.
job = nexus(add, processes=4)
job.submit(1, 2)
job.submit(10, 5)
job.start()
# Do other work here, and optionally submit more tasks before joining.
job.submit(-1, 8)
job.join()
results = job.get()
print(results)  # [3, 15, 7]

🧩 API

nexus(func, processes=-1) -> ProcNexus

Create a ProcNexus runner from a callable.

  • func: target function for each task.
  • processes: worker-process setting.
    • < 0: use os.cpu_count().
    • = 0: do not create a process pool; run with normal in-process mapping.
    • > 0: pass directly to multiprocessing.Pool.
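
A minimal sketch of how these three settings might map onto the standard library; resolve_processes is an illustrative helper under that assumption, not part of the procnexus API:

```python
import os


def resolve_processes(processes: int):
    """Map the documented `processes` values onto a Pool worker count."""
    if processes < 0:
        return os.cpu_count()  # < 0: use os.cpu_count()
    if processes == 0:
        return None            # = 0: no pool; run in-process
    return processes           # > 0: passed to multiprocessing.Pool


print(resolve_processes(4))   # 4
print(resolve_processes(0))   # None
```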

ProcNexus.submit(*args, **kwargs) -> None

Queue one invocation of func. Before start(), the invocation is stored for later execution. After start() and before join(), the invocation is scheduled immediately and is included in the ordered get() result.

ProcNexus.start() -> None

Start executing all queued tasks. With processes=0, this computes immediately in the current process; otherwise it starts a process pool asynchronously.

ProcNexus.join(timeout=None) -> None

Wait for a previously started run to finish. Results are stored on the runner instead of being returned directly. For process-pool runs, timeout is passed to each task result wait; if it expires, unfinished workers are terminated and multiprocessing.TimeoutError is raised.

ProcNexus.get() -> list

Return results in submission order, including tasks submitted after start(). If the runner is still active, get() raises RuntimeError; call join() before retrieving results.

ProcNexus.run() -> list

Execute all currently queued tasks in parallel and return results in submission order. This one-shot convenience method leaves the runner in the pending state and keeps submitted tasks queued, so it can be called repeatedly before start().
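
The submit()/run() contract above (ordered results, queue preserved across calls) can be sketched with a minimal in-process stand-in, the processes=0 path; MiniNexus is illustrative, not the real class:

```python
class MiniNexus:
    """Illustrative stand-in for the processes=0 path; not the real class."""

    def __init__(self, func):
        self.func = func
        self.calls = []  # queued (args, kwargs) pairs, in submission order

    def submit(self, *args, **kwargs):
        self.calls.append((args, kwargs))

    def run(self):
        # Ordered results; the queue is not consumed, so run() may be
        # called repeatedly before an eventual start().
        return [self.func(*args, **kwargs) for args, kwargs in self.calls]


mini = MiniNexus(lambda a, b: a + b)
mini.submit(1, 2)
mini.submit(10, 5)
print(mini.run())  # [3, 15]
print(mini.run())  # [3, 15] again: run() leaves tasks queued
```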

📝 Notes

  • The submitted callable should be picklable by multiprocessing.
  • Arguments must also be serializable for inter-process communication.
  • Exceptions from worker processes propagate when calling join() or run().
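
The picklability notes can be checked directly with the stdlib pickle module: module-level functions pickle by reference to their qualified name, while lambdas (and other unnamed callables) do not.

```python
import pickle


def add(a: int, b: int) -> int:  # module-level: picklable by reference
    return a + b


payload = pickle.dumps(add)
assert pickle.loads(payload)(1, 2) == 3

try:
    pickle.dumps(lambda a, b: a + b)  # no importable name -> not picklable
except (pickle.PicklingError, AttributeError) as exc:
    print(f"lambda rejected: {type(exc).__name__}")
```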

🔗 See Also

GitHub repository

PyPI project

⚖️ License

This project is licensed under the BSD 3-Clause License.

🕒 History

v0.0.3

  • Changed get() to reject calls while a nexus is still running, making join() the explicit synchronization point before result retrieval.
  • Added join(timeout=None) support for process-pool runs, terminating unfinished workers and propagating multiprocessing.TimeoutError when a task wait expires.

v0.0.2

  • Made run() a non-mutating convenience API to better align with Python conventions: it returns results without implicitly advancing the asynchronous start()/join() lifecycle or consuming queued tasks.
  • Updated process-pool run() execution to use multiprocessing.Pool.starmap, preserving ordered results and keyword-argument handling while keeping queued tasks available for a later async run.
  • Added unit coverage for repeated run() calls, process-pool execution, keyword arguments, and rejecting run() after start().

v0.0.1

  • Added asynchronous execution with start(), join(), and get(), while keeping run() as the one-shot convenience API.
  • Allowed submit() calls after start() and before join(), preserving submission-order results across queued and late-submitted tasks.
  • Expanded README/API documentation and added unit coverage for async lifecycle, ordered results, and invalid state transitions.

v0.0.0

  • Initial release.

Download files

Download the file for your platform.

Source Distribution

procnexus-0.0.3.tar.gz (9.0 kB)

Uploaded Source

Built Distribution

procnexus-0.0.3-py3-none-any.whl (7.7 kB)

Uploaded Python 3

File details

Details for the file procnexus-0.0.3.tar.gz.

File metadata

  • Download URL: procnexus-0.0.3.tar.gz
  • Upload date:
  • Size: 9.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.7

File hashes

Hashes for procnexus-0.0.3.tar.gz

  • SHA256: ed2cb017d390f7b2acf03182ec5fcf040798940e4fe7d5051cba13e276674eb2
  • MD5: fb87146077c3a4acd370489fa7008c59
  • BLAKE2b-256: 4f413180a744ebc74886285213dfd35c0df8f8fb886c448337f955da522c7c9f

File details

Details for the file procnexus-0.0.3-py3-none-any.whl.

File metadata

  • Download URL: procnexus-0.0.3-py3-none-any.whl
  • Upload date:
  • Size: 7.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.7

File hashes

Hashes for procnexus-0.0.3-py3-none-any.whl

  • SHA256: 67fe94793c2f507f1f023547776cf56a60099d2d25d2dfa0ddbdc2ea230639ea
  • MD5: 5954ee169634692e2cae59fd4388be91
  • BLAKE2b-256: 68c989c8de9c7e80c898aff7d4a950db796863c2aa3f4dd48e0e17239d8a7db1
