Skip to main content

Ultra-simple parallel processing

Project description

English description follows Japanese.


概要

forkx は、Python で「超」簡単に並行処理を書くための小さなユーティリティです。

複数の「引数なし関数(呼び出し可能オブジェクト)」を forkx に渡すと、それらが同一プロセス内で並行実行され、すべての処理が終わったあとで、結果がリストとして返されます。結果の順番は、渡した関数の順番と一致します。

ポイント:

  • 複雑なワーカー管理なしで複数処理を同時に走らせたい
  • オブジェクトをコピーせずに共有したい
  • 書き込み競合のときだけロックで守りたい

といった用途に向きます。

注意: これはプロセス間の「並列処理」ではなく、同一プロセス内での「並行処理」です。

特徴

  • とにかく簡単な API
    • forkx(fn1, fn2, ...) と書くだけ
    • 返り値は [fn1の結果, fn2の結果, ...]
  • 共通オブジェクトをそのまま共有
    • 同じメモリ空間上で、オブジェクトを複製せずにアクセスできます
  • ロック機能付き
    • forkx.Lock() で、書き込み競合を避けるためのロックを簡単に利用できます
  • 例外の扱い
    • 各タスク内で発生した例外は呼び出し元に伝播します
    • ただし「すべてのタスクが終了したあと」でスローされます(途中で即中断しない)

基本的な使い方

import time
import forkx

def proc_a():
	time.sleep(1)
	return 2**2

def proc_b():
	time.sleep(1)
	return 3**3

# proc_a, proc_b が並行に実行される
results = forkx(proc_a, proc_b)
print(results)  # -> [4, 27]

動作:

  • proc_aproc_b がほぼ同時に開始されます。
  • forkx は両方の関数が終了するまで待ちます。
  • 返ってくるリストの順番は、渡した順 (proc_a, proc_b) と同じです。

分岐機能

  • 並列の枝を1本追加する事ができます
import time
import forkx

def proc_c():
	for _ in range(3):
		print("hoge")
		time.sleep(1)

# 関数の実行を開始するが、次行からの実行も継続する [forkx]
forkx.branch(proc_c)

print("fuga")	# これをやっている間もproc_cは動き続ける

動作:

  • proc_c の実行が開始され、それの終了を待たずにbranch()の次行からも実行され続けます
  • 注意点としては、forkx()とは異なり、branch内での例外は捕捉できません

バッチ実行

引数付きの関数を、最大並行数を制限しながらまとめて実行できます。 forkx.batch に「1引数の関数」「引数のイテラブル」「バッチサイズ」を渡すと、バッチサイズで指定された数ずつ並行実行し、すべての結果をリストで返します。

import forkx

def func(arg):
	return arg ** 2

res_ls = forkx.batch(func, range(10), batch_size=3)
print(res_ls)  # -> [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

動作:

  • range(10) の各要素に対して func が呼ばれます。
  • 一度に最大 batch_size(この例では3)個ずつ並行実行されます。
  • すべてのバッチが順に処理され、結果は引数の順番どおりにリストで返されます。
  • batch_size=1 のときは逐次実行と同じ動作になります。

共通オブジェクトとロック

全タスクは同一プロセス・同一メモリ空間を共有するので、Python オブジェクトをそのまま共通で扱えます。 複数タスクから同じオブジェクトへ書き込みが発生する場合は、forkx.Lock で保護してください。

import time
import forkx

common = {"cnt": 0}
lock = forkx.Lock()  # ロックオブジェクト

def cnt_up():
	time.sleep(1)
	# ロックで書き込み競合を防ぐ
	with lock:
		common["cnt"] += 1

# 同一関数も並行実行可能
forkx(*[cnt_up for _ in range(10)])

print(common)  # 例: {'cnt': 10}

動作:

  • すべての cnt_up が同じ common を共有します。
  • with lock: により、common["cnt"] の更新は1回ずつ順番に行われます。
  • 全タスク完了後、common["cnt"] には矛盾のない最終値が入ります。

引数付きの処理を並行実行する

forkx に渡すのは「引数なし」の呼び出し可能オブジェクトです。 引数が必要な場合は、外側で引数をキャプチャするラッパ関数や lambda を使います。

import forkx

def gen_func(i):
	return lambda:i**2

funcs = [gen_func(i) for i in range(5)]
print(forkx(*funcs))  # -> [0, 1, 4, 9, 16]

例外処理

タスク内で例外が発生した場合:

  • その例外は forkx 呼び出し元に伝播します。
  • ただし、他のタスクも含めて「すべての処理が終わってから」例外がスローされます。

つまり:

  • どれか1つが失敗しても、他のタスクは最後まで実行されます。
  • 呼び出し側から見ると、「全タスクが終わった段階」で成功か失敗かを判断できます。

(どの例外がどのような形で伝播するかは実装に依存します。)


備考

  • forkx は、重厚な並列処理フレームワークではなく、「ちょっと並行実行したい」場面向けの軽量ツールです。
  • 特に以下のようなケースに向きます:
    • time.sleep や I/O 待ちなど、待ち時間の長い処理をまとめて走らせたい
    • 共有オブジェクトをシンプルに扱いたい
    • 成功・失敗を「すべての処理が終わった時点」でまとめて確認したい

Overview

forkx is a small utility that lets you write extremely simple concurrent code in Python.

When you pass multiple “argument-less functions (callables)” to forkx, they run concurrently within the same process, and once all tasks finish, their return values are collected into a list. The order of the results matches the order in which the functions were provided.

Key points:

  • You want to run multiple tasks simultaneously without complex worker management
  • You want to share objects without copying
  • You want to protect only the write-conflicting parts with locks

This tool is designed for such cases.

Note: This is concurrency within a single process, not multi-process parallelism.


Features

  • Extremely simple API

    • Just write forkx(fn1, fn2, ...)
    • The return value is [result_of_fn1, result_of_fn2, ...]
  • Shared objects without duplication

    • All tasks run in the same memory space and can access shared objects directly
  • Lock support

    • Use forkx.Lock() to easily guard write operations and avoid race conditions
  • Exception handling

    • Exceptions raised inside tasks are propagated to the caller
    • However, they are thrown after all tasks have finished (execution does not stop in the middle)

Basic Usage

import time
import forkx

def proc_a():
	time.sleep(1)
	return 2**2

def proc_b():
	time.sleep(1)
	return 3**3

# proc_a and proc_b run concurrently
results = forkx(proc_a, proc_b)
print(results)  # -> [4, 27]

Behavior:

  • proc_a and proc_b start almost simultaneously.
  • forkx waits until both functions complete.
  • The returned list preserves the call order (proc_a, proc_b).

Branching

  • You can add one extra concurrent “branch”.
import time
import forkx

def proc_c():
	for _ in range(3):
		print("hoge")
		time.sleep(1)

# Starts a function concurrently while continuing the next lines immediately
forkx.branch(proc_c)

print("fuga")  # proc_c keeps running while this executes

Behavior:

  • proc_c begins execution, and the caller continues immediately without waiting for its completion.
  • Unlike forkx(), exceptions inside a branch are not captured.

Batch Execution

You can run a function over many arguments with a limit on how many tasks run concurrently. Pass a single-argument function, an iterable of arguments, and a batch_size to forkx.batch. It splits the work into batches of the specified size, runs each batch concurrently, and returns all results as a list.

import forkx

def func(arg):
	return arg ** 2

res_ls = forkx.batch(func, range(10), batch_size=3)
print(res_ls)  # -> [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Behavior:

  • func is called for each element in range(10).
  • At most batch_size (3 in this example) tasks run concurrently at a time.
  • Batches are processed in order, and the results are returned in the same order as the input arguments.
  • When batch_size=1, it behaves the same as sequential execution.

Shared Objects and Locks

Because all tasks share the same process and memory space, Python objects can be shared directly. If multiple tasks write to the same object, protect the operation using forkx.Lock.

import time
import forkx

common = {"cnt": 0}
lock = forkx.Lock()  # lock object

def cnt_up():
	time.sleep(1)
	# prevent write conflicts
	with lock:
		common["cnt"] += 1

# the same function can be run concurrently
forkx(*[cnt_up for _ in range(10)])

print(common)  # e.g. {'cnt': 10}

Behavior:

  • All cnt_up functions share the same common object.
  • with lock: ensures that updates to common["cnt"] occur one at a time.
  • After all tasks finish, the final value is consistent.

Running Functions With Arguments

forkx accepts callables without arguments. If you need to pass arguments, wrap them with a closure or lambda.

import forkx

def gen_func(i):
	return lambda: i**2

funcs = [gen_func(i) for i in range(5)]
print(forkx(*funcs))  # -> [0, 1, 4, 9, 16]

Exception Handling

If a task raises an exception:

  • The exception is propagated back to the caller.
  • However, it is thrown only after all tasks have completed.

This means:

  • Even if one task fails, the others continue until the end.
  • From the caller’s perspective, you determine success or failure after all tasks finish.

(How exactly exceptions are returned depends on the implementation.)


Notes

  • forkx is not a heavy-duty parallel processing framework; it is a lightweight tool for “a bit of concurrency.”

  • Particularly suitable for:

    • Running many tasks that spend time in time.sleep, I/O waits, or other blocking operations
    • Handling shared objects simply
    • Checking overall success/failure once all tasks have completed

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

forkx-0.3.0.tar.gz (7.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

forkx-0.3.0-py3-none-any.whl (7.7 kB view details)

Uploaded Python 3

File details

Details for the file forkx-0.3.0.tar.gz.

File metadata

  • Download URL: forkx-0.3.0.tar.gz
  • Upload date:
  • Size: 7.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.11.3

File hashes

Hashes for forkx-0.3.0.tar.gz
Algorithm Hash digest
SHA256 d5ce50069b352097e07b8eaa11df38a7252c464bbacc7b9162b1fd2be6204208
MD5 989d8e059dd42ec3ad51d191251510c3
BLAKE2b-256 b0f5039f12fca7748f3efc9dbdf970b864e88cdb0f9c42fc05d87995c1349251

See more details on using hashes here.

File details

Details for the file forkx-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: forkx-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 7.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.11.3

File hashes

Hashes for forkx-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 07b7f0deb6ae37071a6f1d1d6f6f539db09df7360b97cf2185d91e80dabbee70
MD5 0d3a1c69f9ce274b4be7fab57d528c95
BLAKE2b-256 92969f194d45c9695b6a3b49e1c12a749a5ea0b1a8840a4d8280e249af976be7

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page