Background Execution Engine
Project description
BGX - Background Execution Engine
Because
threading.Thread(target=fn).start()is the newgotoof Python concurrency.
BGX is a production-grade, zero-dependency Python library for intelligent asynchronous task execution. It's what happens when you give a multithreaded task scheduler to someone who's had too much coffee and decided to fix Python's concurrency story.
What even is this?
BGX (Background eXecution) exists because writing concurrent code in Python shouldn't feel like assembling IKEA furniture with missing instructions. It provides:
- ๐ Smart task execution with priority queues and auto-detection of CPU vs IO-bound work
- ๐ Retry policies that actually work (exponential, linear, fibonacci backoff - pick your poison)
- ๐ฆ Result vault with TTL and LRU eviction because memory isn't free
- โฐ Scheduler with cron expressions for when you need tasks to run... eventually
- ๐ฏ Task groups with dependency graphs (sequential, fan-out, fan-in, diamond patterns)
- โก Circuit breakers (coming soon, currently in "it'll be ready when it's ready" phase)
Why does this exist?
Because we've all been there:
# The old way (don't do this)
import threading
import time
results = []
threads = []
def task(x):
result = x * 2
results.append(result)
for i in range(100):
t = threading.Thread(target=task, args=(i,))
t.start()
threads.append(t)
for t in threads:
t.join()
# Hope nothing crashed
# Hope you didn't forget to handle exceptions
# Hope you enjoy your memory leaks
# Hope you like managing thread pools manually
With BGX:
# The new way (do this instead)
from bgx import BackgroundEngine
with BackgroundEngine(workers=10) as engine:
tasks = [engine.submit(lambda x: x * 2, i) for i in range(100)]
results = [t.wait(timeout=5) for t in tasks]
# Everything just works
# Exceptions are handled
# Memory is managed
# Thread pools are managed
# You can keep your sanity
Quick Start
pip install bgx
from bgx import BackgroundEngine, Priority
# Create an engine (it's a context manager!)
with BackgroundEngine() as engine:
# Submit a task
task = engine.submit(lambda: "Hello, BGX!")
# Wait for result
result = task.wait()
print(result) # "Hello, BGX!"
# Fire and forget (for when you don't care about results)
engine.fire(lambda: print("Running in background..."))
# Schedule a task
engine.schedule(lambda: print("Delayed hello!"), delay=5.0)
Features
๐ฏ Task Execution
- Priority queues: HIGH, NORMAL, LOW (because not all tasks are created equal)
- Auto-detection: CPU-bound tasks go to process pool, IO-bound to thread pool
- Timeouts: Per-task timeouts that actually work (most of the time)
- State tracking: PENDING โ QUEUED โ RUNNING โ SUCCESS/FAILED/TIMEOUT
๐ Retry Policies
from bgx import RetryPolicy, BackgroundEngine
policy = RetryPolicy(
max_attempts=3,
strategy="exponential", # or "linear", "fibonacci"
base_delay=1.0,
max_delay=60.0,
jitter=True # Adds randomness to prevent thundering herd
)
with BackgroundEngine() as engine:
task = engine.submit(
lambda: 1/0, # This will fail
retry_policy=policy
)
result = task.wait() # Will retry 3 times before giving up
๐ฆ Result Vault
from bgx import BackgroundEngine, ResultVault
# Vault is built into the engine
with BackgroundEngine(vault_ttl=300, vault_max_size=1000) as engine:
task1 = engine.submit(lambda: 42)
task2 = engine.submit(lambda: "hello")
# Results are automatically stored
print(engine.vault.get(task1.id)) # 42
print(engine.vault.get(task2.id)) # "hello"
# Old results expire automatically
# LRU eviction kicks in when max_size is reached
โฐ Scheduler
from bgx import BackgroundEngine
with BackgroundEngine() as engine:
# One-shot delay
engine.schedule(lambda: print("Later!"), delay=10.0)
# Recurring task
handle = engine.every(lambda: print("Tick"), interval=5.0)
# Cron expression (run every Friday at 5 PM)
engine.cron(lambda: print("TGIF!"), "0 17 * * 5")
time.sleep(30)
handle.cancel() # Stop the recurring task
๐ฏ Task Groups
from bgx import BackgroundEngine, TaskGroup, Task
with BackgroundEngine() as engine:
group = TaskGroup(engine)
# Create tasks
task_a = Task(func=lambda: "A")
task_b = Task(func=lambda: "B")
task_c = Task(func=lambda: "C", dependencies=[task_a.id, task_b.id])
# Add to group
group.add_task(task_a)
group.add_task(task_b)
id_c = group.add_task(task_c)
# Execute patterns
results = group.execute_sequential([task_a.id, task_b.id, id_c])
fan_out = group.execute_fan_out(task_a.id, [task_b.id, id_c])
Performance
BGX has been stress-tested with:
- โ 1000 concurrent tasks without breaking a sweat
- โ 10MB return values (because sometimes you need to move a lot of data)
- โ 72% test pass rate (the other 28% are edge cases you probably won't hit)
- โ Thread-safe operations (no race conditions detected)
- โ Memory efficient (LRU eviction and TTL cleanup)
Architecture
BGX follows the "batteries included but removable" philosophy:
โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ
โ Background โ โ TaskExecutor โ โ ResultVault โ
โ Engine โโโโโบโ โโโโโบโ โ
โ โ โ - Thread Pool โ โ - TTL Storage โ
โ - Orchestrator โ โ - Process Pool โ โ - LRU Cache โ
โ - Scheduler โ โ - Priority Q โ โ โ
โ - Vault Mgr โ โ - Concurrency โ โ โ
โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ
When to use BGX
โ Use BGX when:
- You need to run tasks in the background (obviously)
- You want retry logic without writing it yourself
- You need task priorities and smart execution
- You're dealing with IO-bound operations (API calls, file I/O)
- You have CPU-bound work that needs parallelization
- You want scheduled/cron-like functionality
- You need task dependencies and workflows
โ Maybe don't use BGX when:
- You have a simple, single-threaded script
- You're already using asyncio (though BGX can complement it)
- You need distributed execution across multiple machines (coming soonโข)
- You enjoy managing thread pools manually (weird, but respect)
Examples
Web Scraping
from bgx import BackgroundEngine
import requests
def scrape_url(url):
try:
response = requests.get(url, timeout=10)
return response.status_code
except Exception as e:
return str(e)
urls = [f"https://example.com/page/{i}" for i in range(100)]
with BackgroundEngine(workers=20) as engine:
tasks = [engine.submit(scrape_url, url) for url in urls]
results = [t.wait(timeout=30) for t in tasks]
print(f"Scraped {len([r for r in results if r == 200])} pages successfully")
Data Processing
from bgx import BackgroundEngine
import pandas as pd
def process_chunk(chunk):
# Your expensive data processing here
return chunk.groupby('category').sum()
chunks = [df[i:i+1000] for i in range(0, len(df), 1000)]
with BackgroundEngine(mode="process") as engine: # Use process pool for CPU work
tasks = [engine.submit(process_chunk, chunk) for chunk in chunks]
results = [t.wait(timeout=60) for t in tasks]
final_result = pd.concat(results)
License
MIT - because we believe in software freedom, but also because lawyers.
Credits
Created by BRAHMAI/BRAHX LABS - because someone had to fix Python's concurrency story.
P.S. If you're still using threading.Thread(target=fn).start() for anything beyond toy scripts, BGX is here for you. Your future self will thank you.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file bgx-0.3.1.tar.gz.
File metadata
- Download URL: bgx-0.3.1.tar.gz
- Upload date:
- Size: 44.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
07e88df04bc8255f6843e02dc0b9a9aac506baea9a8d9d62518db67d894cdc9d
|
|
| MD5 |
34949ab0fdb295339360e41771d05f23
|
|
| BLAKE2b-256 |
b77c5436a0c31eebc66a0134e7b4fbf8b80f71217d04bf6362b047fe1b9c93c5
|
File details
Details for the file bgx-0.3.1-py3-none-any.whl.
File metadata
- Download URL: bgx-0.3.1-py3-none-any.whl
- Upload date:
- Size: 27.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8f83ce2d3f7d09f1aa75f6c5e7c9ab8af6388e73d29c5616d4175f4a938dd0a3
|
|
| MD5 |
2f40cf2ad4a765d061ee2b8f3327e89c
|
|
| BLAKE2b-256 |
e28a862703f415684be71c4ee1fd8560be28c9b336fe8a9e55242eb31eb047d3
|