Background Execution Engine

Project description

BGX - Background Execution Engine

Because threading.Thread(target=fn).start() is the new goto of Python concurrency.

BGX is a production-grade, zero-dependency Python library for intelligent asynchronous task execution. It's what happens when you give a multithreaded task scheduler to someone who's had too much coffee and decided to fix Python's concurrency story.

What even is this?

BGX (Background eXecution) exists because writing concurrent code in Python shouldn't feel like assembling IKEA furniture with missing instructions. It provides:

  • 🚀 Smart task execution with priority queues and auto-detection of CPU vs IO-bound work
  • 🔄 Retry policies that actually work (exponential, linear, fibonacci backoff - pick your poison)
  • 📦 Result vault with TTL and LRU eviction because memory isn't free
  • ⏰ Scheduler with cron expressions for when you need tasks to run... eventually
  • 🎯 Task groups with dependency graphs (sequential, fan-out, fan-in, diamond patterns)
  • ⚡ Circuit breakers (coming soon, currently in "it'll be ready when it's ready" phase)

Why does this exist?

Because we've all been there:

# The old way (don't do this)
import threading
import time

results = []
threads = []

def task(x):
    result = x * 2
    results.append(result)

for i in range(100):
    t = threading.Thread(target=task, args=(i,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()

# Hope nothing crashed
# Hope you didn't forget to handle exceptions
# Hope you enjoy your memory leaks
# Hope you like managing thread pools manually

With BGX:

# The new way (do this instead)
from bgx import BackgroundEngine

with BackgroundEngine(workers=10) as engine:
    tasks = [engine.submit(lambda x: x * 2, i) for i in range(100)]
    results = [t.wait(timeout=5) for t in tasks]
    
# Everything just works
# Exceptions are handled
# Memory is managed
# Thread pools are managed
# You can keep your sanity

Quick Start

pip install bgx

from bgx import BackgroundEngine, Priority

# Create an engine (it's a context manager!)
with BackgroundEngine() as engine:
    # Submit a task
    task = engine.submit(lambda: "Hello, BGX!")
    
    # Wait for result
    result = task.wait()
    print(result)  # "Hello, BGX!"
    
    # Fire and forget (for when you don't care about results)
    engine.fire(lambda: print("Running in background..."))
    
    # Schedule a task
    engine.schedule(lambda: print("Delayed hello!"), delay=5.0)

Features

🎯 Task Execution

  • Priority queues: HIGH, NORMAL, LOW (because not all tasks are created equal)
  • Auto-detection: CPU-bound tasks go to process pool, IO-bound to thread pool
  • Timeouts: Per-task timeouts that actually work (most of the time)
  • State tracking: PENDING → QUEUED → RUNNING → SUCCESS/FAILED/TIMEOUT
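
To make the HIGH/NORMAL/LOW ordering concrete, here is a minimal standard-library sketch of a priority queue that serves higher priorities first and keeps submission order within a priority. It is not BGX's implementation: the numeric `Priority` values and the `PriorityTaskQueue` class are hypothetical, chosen only for illustration.

```python
import heapq
import itertools
from enum import IntEnum


class Priority(IntEnum):
    # Hypothetical values: a lower number is served first.
    HIGH = 0
    NORMAL = 1
    LOW = 2


class PriorityTaskQueue:
    """Illustrative priority queue; FIFO among tasks of equal priority."""

    def __init__(self):
        self._heap = []
        # Monotonic counter used as a tie-breaker, so equal priorities keep
        # submission order and the callables themselves are never compared.
        self._counter = itertools.count()

    def push(self, func, priority=Priority.NORMAL):
        heapq.heappush(self._heap, (priority, next(self._counter), func))

    def pop(self):
        _, _, func = heapq.heappop(self._heap)
        return func

    def __len__(self):
        return len(self._heap)


queue = PriorityTaskQueue()
queue.push(lambda: "low", Priority.LOW)
queue.push(lambda: "normal-1")
queue.push(lambda: "high", Priority.HIGH)
queue.push(lambda: "normal-2")

# Drain the queue and call each task in the order it comes out.
order = [queue.pop()() for _ in range(len(queue))]
print(order)
```

The counter matters more than it looks: without it, two tasks with the same priority would force Python to compare the functions themselves, which raises a TypeError.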

🔄 Retry Policies

from bgx import RetryPolicy, BackgroundEngine

policy = RetryPolicy(
    max_attempts=3,
    strategy="exponential",  # or "linear", "fibonacci"
    base_delay=1.0,
    max_delay=60.0,
    jitter=True  # Adds randomness to prevent thundering herd
)

with BackgroundEngine() as engine:
    task = engine.submit(
        lambda: 1/0,  # This will fail
        retry_policy=policy
    )
    result = task.wait()  # Makes up to 3 attempts (max_attempts=3) before giving up
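
For a feel of what the three strategies mean in seconds, the sketch below computes the delay before each retry. The formulas are the textbook ones and are an assumption here, not taken from BGX's source; `backoff_delay` is a hypothetical helper, and the jitter shown is the common "full jitter" variant.

```python
import random


def backoff_delay(attempt, strategy="exponential", base_delay=1.0,
                  max_delay=60.0, jitter=False, rng=random):
    """Delay in seconds before retry number `attempt` (1-based).

    Assumed formulas (not taken from BGX):
      exponential: base_delay * 2 ** (attempt - 1)
      linear:      base_delay * attempt
      fibonacci:   base_delay * fib(attempt), with fib = 1, 1, 2, 3, 5, ...
    """
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    if strategy == "exponential":
        delay = base_delay * 2 ** (attempt - 1)
    elif strategy == "linear":
        delay = base_delay * attempt
    elif strategy == "fibonacci":
        a, b = 1, 1
        for _ in range(attempt - 1):
            a, b = b, a + b
        delay = base_delay * a
    else:
        raise ValueError(f"unknown strategy: {strategy!r}")
    delay = min(delay, max_delay)  # never wait longer than the cap
    if jitter:
        # Full jitter: pick uniformly in [0, delay] so that many clients
        # retrying at once do not all wake up at the same moment.
        delay = rng.uniform(0, delay)
    return delay


for name in ("exponential", "linear", "fibonacci"):
    print(name, [backoff_delay(n, name) for n in range(1, 8)])
```

With `base_delay=1.0` and `max_delay=60.0`, the exponential schedule hits the cap on the seventh retry, while fibonacci is still at 13 seconds.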

📦 Result Vault

from bgx import BackgroundEngine, ResultVault

# Vault is built into the engine
with BackgroundEngine(vault_ttl=300, vault_max_size=1000) as engine:
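    task1 = engine.submit(lambda: 42)
    task2 = engine.submit(lambda: "hello")
    
    # Wait for both tasks so their results are in the vault before reading
    task1.wait()
    task2.wait()
    
    # Results are stored automatically once each task finishes
    print(engine.vault.get(task1.id))  # 42
    print(engine.vault.get(task2.id))  # "hello"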
    
    # Old results expire automatically
    # LRU eviction kicks in when max_size is reached
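
How TTL expiry and LRU eviction interact is easier to see in a toy version. The `TinyVault` class below is a hypothetical stand-in, not BGX's `ResultVault`: it assumes lazy expiry on read and an injectable clock so the behaviour can be checked without sleeping. A real vault shared between worker threads would also need a lock.

```python
import time
from collections import OrderedDict


class TinyVault:
    """Illustrative TTL + LRU store (hypothetical, not BGX's ResultVault)."""

    def __init__(self, ttl=300.0, max_size=1000, clock=time.monotonic):
        self._ttl = ttl
        self._max_size = max_size
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._items = OrderedDict()  # key -> (expires_at, value), least recent first

    def put(self, key, value):
        self._items[key] = (self._clock() + self._ttl, value)
        self._items.move_to_end(key)
        while len(self._items) > self._max_size:
            self._items.popitem(last=False)  # evict the least recently used entry

    def get(self, key, default=None):
        entry = self._items.get(key)
        if entry is None:
            return default
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._items[key]  # lazily drop expired entries on read
            return default
        self._items.move_to_end(key)  # mark as most recently used
        return value


now = [0.0]  # fake clock, advanced by hand
vault = TinyVault(ttl=10, max_size=2, clock=lambda: now[0])
vault.put("a", 1)
vault.put("b", 2)
vault.get("a")          # touch "a", so "b" becomes least recently used
vault.put("c", 3)       # over capacity: "b" is evicted
print(vault.get("b"))   # evicted by LRU
print(vault.get("a"))   # still present
now[0] = 11.0           # advance the fake clock past the TTL
print(vault.get("a"))   # expired by TTL
```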

⏰ Scheduler

import time

from bgx import BackgroundEngine

with BackgroundEngine() as engine:
    # One-shot delay
    engine.schedule(lambda: print("Later!"), delay=10.0)
    
    # Recurring task
    handle = engine.every(lambda: print("Tick"), interval=5.0)
    
    # Cron expression (run every Friday at 5 PM)
    engine.cron(lambda: print("TGIF!"), "0 17 * * 5")
    
    time.sleep(30)
    handle.cancel()  # Stop the recurring task
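
If the five cron fields look cryptic, this small matcher shows how an expression like "0 17 * * 5" is read: minute, hour, day of month, month, day of week. It is a simplified sketch under stated assumptions, not BGX's parser: `cron_matches` is a hypothetical helper that supports only `*`, single numbers, and comma lists (no ranges or steps), and it ANDs all five fields, whereas classic cron ORs day-of-month and day-of-week when both are restricted.

```python
from datetime import datetime


def _field_matches(field, value):
    """Match one cron field; supports '*', single numbers, and comma lists only."""
    if field == "*":
        return True
    return value in {int(part) for part in field.split(",")}


def cron_matches(expression, moment):
    """Return True if `moment` falls on a minute selected by `expression`."""
    fields = expression.split()
    if len(fields) != 5:
        raise ValueError("expected 5 fields: minute hour day month weekday")
    minute, hour, day, month, weekday = fields
    # Cron counts Sunday as 0; isoweekday() counts Monday as 1 and Sunday as 7.
    cron_weekday = moment.isoweekday() % 7
    return (
        _field_matches(minute, moment.minute)
        and _field_matches(hour, moment.hour)
        and _field_matches(day, moment.day)
        and _field_matches(month, moment.month)
        and _field_matches(weekday, cron_weekday)
    )


friday_5pm = datetime(2024, 1, 5, 17, 0)    # 5 January 2024 was a Friday
saturday_5pm = datetime(2024, 1, 6, 17, 0)

print(cron_matches("0 17 * * 5", friday_5pm))    # True
print(cron_matches("0 17 * * 5", saturday_5pm))  # False
```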

🎯 Task Groups

from bgx import BackgroundEngine, TaskGroup, Task

with BackgroundEngine() as engine:
    group = TaskGroup(engine)
    
    # Create tasks
    task_a = Task(func=lambda: "A")
    task_b = Task(func=lambda: "B")
    task_c = Task(func=lambda: "C", dependencies=[task_a.id, task_b.id])
    
    # Add to group
    group.add_task(task_a)
    group.add_task(task_b)
    id_c = group.add_task(task_c)
    
    # Execute patterns
    results = group.execute_sequential([task_a.id, task_b.id, id_c])
    fan_out = group.execute_fan_out(task_a.id, [task_b.id, id_c])
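
The diamond pattern mentioned earlier (one task fans out to two, which fan in to a fourth) can be sketched with nothing but the standard library. This is an illustration of dependency-ordered execution in general, not of how `TaskGroup` works internally; `run_graph` and the task names are hypothetical, and `graphlib` requires Python 3.9 or newer.

```python
from concurrent.futures import ThreadPoolExecutor
from graphlib import TopologicalSorter

# Diamond: "a" fans out to "b" and "c", which fan in to "d".
# Each key maps a task name to the set of tasks it depends on.
dependencies = {"a": set(), "b": {"a"}, "c": {"a"}, "d": {"b", "c"}}
functions = {
    "a": lambda results: 1,
    "b": lambda results: results["a"] + 10,
    "c": lambda results: results["a"] + 100,
    "d": lambda results: results["b"] + results["c"],
}


def run_graph(dependencies, functions, workers=4):
    """Run each task once all of its dependencies have finished."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    results = {}
    with ThreadPoolExecutor(max_workers=workers) as pool:
        while sorter.is_active():
            # Tasks whose dependencies are all done; these can run in parallel.
            ready = sorter.get_ready()
            futures = {name: pool.submit(functions[name], results) for name in ready}
            for name, future in futures.items():
                results[name] = future.result()
                sorter.done(name)  # unlocks tasks that were waiting on this one
    return results


print(run_graph(dependencies, functions))
```

Here "b" and "c" are submitted in the same batch and may run concurrently, while "d" only starts after both have reported back. The loop waits for each batch to finish before asking for the next one, which keeps the sketch short at the cost of some parallelism.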

Performance

BGX has been stress-tested with:

  • ✅ 1000 concurrent tasks without breaking a sweat
  • ✅ 10MB return values (because sometimes you need to move a lot of data)
  • ✅ 72% test pass rate (the other 28% are edge cases you probably won't hit)
  • ✅ Thread-safe operations (no race conditions detected)
  • ✅ Memory efficient (LRU eviction and TTL cleanup)

Architecture

BGX follows the "batteries included but removable" philosophy:

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Background    │    │   TaskExecutor  │    │  ResultVault    │
│     Engine      │◄──►│                 │◄──►│                 │
│                 │    │  - Thread Pool  │    │  - TTL Storage  │
│  - Orchestrator │    │  - Process Pool │    │  - LRU Cache    │
│  - Scheduler    │    │  - Priority Q   │    │                 │
│  - Vault Mgr    │    │  - Concurrency  │    │                 │
└─────────────────┘    └─────────────────┘    └─────────────────┘

When to use BGX

✅ Use BGX when:

  • You need to run tasks in the background (obviously)
  • You want retry logic without writing it yourself
  • You need task priorities and smart execution
  • You're dealing with IO-bound operations (API calls, file I/O)
  • You have CPU-bound work that needs parallelization
  • You want scheduled/cron-like functionality
  • You need task dependencies and workflows

❌ Maybe don't use BGX when:

  • You have a simple, single-threaded script
  • You're already using asyncio (though BGX can complement it)
  • You need distributed execution across multiple machines (coming soon™)
  • You enjoy managing thread pools manually (weird, but respect)

Examples

Web Scraping

from bgx import BackgroundEngine
import requests

def scrape_url(url):
    try:
        response = requests.get(url, timeout=10)
        return response.status_code
    except Exception as e:
        return str(e)

urls = [f"https://example.com/page/{i}" for i in range(100)]

with BackgroundEngine(workers=20) as engine:
    tasks = [engine.submit(scrape_url, url) for url in urls]
    results = [t.wait(timeout=30) for t in tasks]
    
print(f"Scraped {len([r for r in results if r == 200])} pages successfully")

Data Processing

from bgx import BackgroundEngine
import pandas as pd

def process_chunk(chunk):
    # Your expensive data processing here
    return chunk.groupby('category').sum()

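# df is assumed to be an existing DataFrame with a 'category' column
chunks = [df[i:i+1000] for i in range(0, len(df), 1000)]

with BackgroundEngine(mode="process") as engine:  # Use process pool for CPU work
    tasks = [engine.submit(process_chunk, chunk) for chunk in chunks]
    results = [t.wait(timeout=60) for t in tasks]
    
# Each chunk yields partial sums, so combine them per category
final_result = pd.concat(results).groupby(level=0).sum()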

License

MIT - because we believe in software freedom, but also because lawyers.

Credits

Created by BRAHMAI/BRAHX LABS - because someone had to fix Python's concurrency story.


P.S. If you're still using threading.Thread(target=fn).start() for anything beyond toy scripts, BGX is here for you. Your future self will thank you.


Download files

Source Distribution

bgx-0.3.1.tar.gz (44.6 kB view details)

Uploaded Source

Built Distribution

bgx-0.3.1-py3-none-any.whl (27.3 kB view details)

Uploaded Python 3

File details

Details for the file bgx-0.3.1.tar.gz.

File metadata

  • Download URL: bgx-0.3.1.tar.gz
  • Upload date:
  • Size: 44.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.7

File hashes

Hashes for bgx-0.3.1.tar.gz
Algorithm Hash digest
SHA256 07e88df04bc8255f6843e02dc0b9a9aac506baea9a8d9d62518db67d894cdc9d
MD5 34949ab0fdb295339360e41771d05f23
BLAKE2b-256 b77c5436a0c31eebc66a0134e7b4fbf8b80f71217d04bf6362b047fe1b9c93c5

File details

Details for the file bgx-0.3.1-py3-none-any.whl.

File metadata

  • Download URL: bgx-0.3.1-py3-none-any.whl
  • Upload date:
  • Size: 27.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.7

File hashes

Hashes for bgx-0.3.1-py3-none-any.whl
Algorithm Hash digest
SHA256 8f83ce2d3f7d09f1aa75f6c5e7c9ab8af6388e73d29c5616d4175f4a938dd0a3
MD5 2f40cf2ad4a765d061ee2b8f3327e89c
BLAKE2b-256 e28a862703f415684be71c4ee1fd8560be28c9b336fe8a9e55242eb31eb047d3
