Skip to main content

Ultra-simple parallel processing

Project description

English description follows Japanese.


概要

forkx は、Python で「超」簡単に並行処理を書くための小さなユーティリティです。

複数の「引数なし関数(呼び出し可能オブジェクト)」を forkx に渡すと、それらが同一プロセス内で並行実行され、すべての処理が終わったあとで、結果がリストとして返されます。結果の順番は、渡した関数の順番と一致します。

ポイント:

  • 複雑なワーカー管理なしで複数処理を同時に走らせたい
  • オブジェクトをコピーせずに共有したい
  • 書き込み競合のときだけロックで守りたい

といった用途に向きます。

注意: これはプロセス間の「並列処理」ではなく、同一プロセス内での「並行処理」です。

特徴

  • とにかく簡単な API
    • forkx(fn1, fn2, ...) と書くだけ
    • 返り値は [fn1の結果, fn2の結果, ...]
  • 共通オブジェクトをそのまま共有
    • 同じメモリ空間上で、オブジェクトを複製せずにアクセスできます
  • ロック機能付き
    • forkx.Lock() で、書き込み競合を避けるためのロックを簡単に利用できます
  • 例外の扱い
    • 各タスク内で発生した例外は呼び出し元に伝播します
    • ただし「すべてのタスクが終了したあと」でスローされます(途中で即中断しない)

基本的な使い方

import time
import forkx

def proc_a():
	time.sleep(1)
	return 2**2

def proc_b():
	time.sleep(1)
	return 3**3

# proc_a, proc_b が並行に実行される
results = forkx(proc_a, proc_b)
print(results)  # -> [4, 27]

動作:

  • proc_aproc_b がほぼ同時に開始されます。
  • forkx は両方の関数が終了するまで待ちます。
  • 返ってくるリストの順番は、渡した順 (proc_a, proc_b) と同じです。

共通オブジェクトとロック

全タスクは同一プロセス・同一メモリ空間を共有するので、Python オブジェクトをそのまま共通で扱えます。 複数タスクから同じオブジェクトへ書き込みが発生する場合は、forkx.Lock で保護してください。

import time
import forkx

common = {"cnt": 0}
lock = forkx.Lock()  # ロックオブジェクト

def cnt_up():
	time.sleep(1)
	# ロックで書き込み競合を防ぐ
	with lock:
		common["cnt"] += 1

# 同一関数も並行実行可能
forkx(*[cnt_up for _ in range(10)])

print(common)  # 例: {'cnt': 10}

動作:

  • すべての cnt_up が同じ common を共有します。
  • with lock: により、common["cnt"] の更新は1回ずつ順番に行われます。
  • 全タスク完了後、common["cnt"] には矛盾のない最終値が入ります。

引数付きの処理を並行実行する

forkx に渡すのは「引数なし」の呼び出し可能オブジェクトです。 引数が必要な場合は、外側で引数をキャプチャするラッパ関数や lambda を使います。

import forkx

def gen_func(i):
	return lambda:i**2

funcs = [gen_func(i) for i in range(5)]
print(forkx(*funcs))  # -> [0, 1, 4, 9, 16]

例外処理

タスク内で例外が発生した場合:

  • その例外は forkx 呼び出し元に伝播します。
  • ただし、他のタスクも含めて「すべての処理が終わってから」例外がスローされます。

つまり:

  • どれか1つが失敗しても、他のタスクは最後まで実行されます。
  • 呼び出し側から見ると、「全タスクが終わった段階」で成功か失敗かを判断できます。

(どの例外がどのような形で伝播するかは実装に依存します。)


備考

  • forkx は、重厚な並列処理フレームワークではなく、「ちょっと並行実行したい」場面向けの軽量ツールです。
  • 特に以下のようなケースに向きます:
    • time.sleep や I/O 待ちなど、待ち時間の長い処理をまとめて走らせたい
    • 共有オブジェクトをシンプルに扱いたい
    • 成功・失敗を「すべての処理が終わった時点」でまとめて確認したい

forkx Documentation


English

Overview

forkx is a tiny utility for ultra-simple concurrent execution in Python.

You pass multiple *callable- objects to forkx, and it runs them concurrently in a single process while sharing the same memory space. After all tasks complete, forkx returns a list of results in the same order as the callables you passed in.

This makes it easy to:

  • Run independent tasks concurrently
  • Share mutable objects between tasks without manual worker setup
  • Control write conflicts with a simple lock

Note: This is *concurrent- execution within one process, not multi-process parallelism with separate memory.


Key Features

  • Ultra-simple API

    • Just call forkx(fn1, fn2, ...)
    • Returns [fn1_result, fn2_result, ...]
  • Shared memory

    • All tasks can access and modify common objects directly (no copying)
  • Lock support

    • forkx.Lock() provides a simple context manager to avoid write conflicts
  • Exception propagation

    • Exceptions inside tasks are propagated to the caller
    • Propagation happens *after- all tasks have finished (no early abort)

Basic Usage

import time
import forkx

def proc_a():
	time.sleep(1)
	return 2**2

def proc_b():
	time.sleep(1)
	return 3**3

# proc_a and proc_b run concurrently
results = forkx(proc_a, proc_b)
print(results)  # -> [4, 27]

Behavior:

  • Both proc_a and proc_b start roughly at the same time.
  • forkx waits until *all- functions finish.
  • The return value is a list of results in the order of the arguments (proc_a, proc_b).

Shared State and Lock

Because all tasks run in the same process, you can safely share Python objects without serialization. When multiple tasks write to shared data, you should use forkx.Lock to avoid race conditions.

import time
import forkx

common = {"cnt": 0}
lock = forkx.Lock()

def cnt_up():
	time.sleep(1)
	with lock:  # prevent concurrent writes
		common["cnt"] += 1

# The same function can be run concurrently
forkx(*[cnt_up for _ in range(10)])

print(common)  # e.g. {'cnt': 10}

Behavior:

  • All cnt_up executions share the same common dict.
  • with lock: ensures that increments are applied one by one without conflicts.
  • After all tasks complete, common["cnt"] has the consistent final value.

Using Arguments via Closures

forkx expects zero-argument callables. To pass arguments, create small wrapper functions or use lambdas that capture variables from the outer scope.

import forkx

def gen_func(i):
	return lambda:i**2

funcs = [gen_func(i) for i in range(5)]
print(forkx(*funcs))  # -> [0, 1, 4, 9, 16]

Exception Handling

If a task raises an exception:

  • The exception will propagate out of forkx.
  • forkx waits until all tasks (including non-failing ones) are done, then raises.

This means:

  • No task is silently abandoned because another failed.
  • You can rely on "all tasks finished or failed" before handling the exception.

(The exact form of the propagated exception is implementation-dependent.)


Notes

  • forkx is intended for simple, lightweight concurrency scenarios.

  • It is especially useful when:

    • Functions are I/O bound or have waits (sleep, network, disk).
    • You want a minimal API for "run these functions concurrently and give me all results, or raise if something went wrong."

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

forkx-0.1.1.tar.gz (6.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

forkx-0.1.1-py3-none-any.whl (5.8 kB view details)

Uploaded Python 3

File details

Details for the file forkx-0.1.1.tar.gz.

File metadata

  • Download URL: forkx-0.1.1.tar.gz
  • Upload date:
  • Size: 6.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.4.2 importlib_metadata/3.10.0 pkginfo/1.7.0 requests/2.32.3 requests-toolbelt/0.9.1 tqdm/4.64.1 CPython/3.8.8

File hashes

Hashes for forkx-0.1.1.tar.gz
Algorithm Hash digest
SHA256 9673a0f2e9a415dd9677616e804074d1d9ca475c4a5f954131107003ad483456
MD5 014f8c1fc30a50f8af7527c711c5b7ec
BLAKE2b-256 23a0036f844c862826af659545efdcb4cf59f39f0d80a2b75b868f67a8b52ae8

See more details on using hashes here.

File details

Details for the file forkx-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: forkx-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 5.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.4.2 importlib_metadata/3.10.0 pkginfo/1.7.0 requests/2.32.3 requests-toolbelt/0.9.1 tqdm/4.64.1 CPython/3.8.8

File hashes

Hashes for forkx-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 18f4685364a41a8afc7566fdd6838c2e83afe1d0daefbf6fdd448165aebc5381
MD5 7f22f47ac01f77379ad34a22f1d960f5
BLAKE2b-256 1f14fa2619d1354829f8c644ce5d74fe21fb9e466562201cde58580c716da974

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page