
Frontrun

A library for deterministic concurrency testing that helps you reliably reproduce and test race conditions.

pip install frontrun

Overview

Frontrun provides tools for controlling thread interleaving at a fine-grained level, allowing you to:

  • Deterministically reproduce race conditions - Force specific execution ordering to make race conditions happen reliably in tests
  • Test concurrent code exhaustively - Explore different execution orders to find bugs
  • Verify synchronization correctness - Ensure that proper locking prevents race conditions

Instead of relying on timing-based race detection (which is unreliable), Frontrun lets you control exactly when threads execute, making concurrency testing deterministic and reproducible.
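
The difference is easy to see with plain threading. A timing-based test hopes two threads happen to interleave badly; the sketch below (standard library only, no Frontrun) forces the bad interleaving by hand with a `threading.Barrier`, so both threads are guaranteed to read the stale value before either writes:

```python
import threading

class Counter:
    def __init__(self):
        self.value = 0

counter = Counter()
barrier = threading.Barrier(2)  # both threads must arrive before either proceeds

def racy_increment():
    temp = counter.value      # read
    barrier.wait()            # hold here until the other thread has also read
    counter.value = temp + 1  # write a stale result

threads = [threading.Thread(target=racy_increment) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter.value)  # 1, not 2: one increment was lost
```

The manual approach only scales to one hand-crafted interleaving per test; Frontrun generalizes the idea with named markers and schedules.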

Quick Start: Bank Account Race Condition

Here's a pytest test that uses Frontrun to trigger a race condition:

from frontrun.trace_markers import Schedule, Step, TraceExecutor

class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance

    def transfer(self, amount):
        current = self.balance  # frontrun: read_balance
        new_balance = current + amount
        self.balance = new_balance  # frontrun: write_balance

def test_transfer_lost_update():
    account = BankAccount(balance=100)

    # Both threads read before either writes
    schedule = Schedule([
        Step("thread1", "read_balance"),    # T1 reads 100
        Step("thread2", "read_balance"),    # T2 reads 100 (both see same value!)
        Step("thread1", "write_balance"),   # T1 writes 150
        Step("thread2", "write_balance"),   # T2 writes 150 (overwrites T1's update!)
    ])

    executor = TraceExecutor(schedule)
    executor.run("thread1", lambda: account.transfer(50))
    executor.run("thread2", lambda: account.transfer(50))
    executor.wait(timeout=5.0)

    # One update was lost: balance is 150, not 200
    assert account.balance == 150
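
For contrast, here is one way the race could be fixed with a standard `threading.Lock` (a plain sketch, not Frontrun API). With the read-modify-write made atomic, no interleaving can lose an update, so the schedule above could no longer produce 150:

```python
import threading

class SafeBankAccount:
    def __init__(self, balance=0):
        self.balance = balance
        self._lock = threading.Lock()

    def transfer(self, amount):
        with self._lock:
            # The read-modify-write sequence is now atomic: a second
            # thread blocks here until the first has written.
            current = self.balance
            self.balance = current + amount

account = SafeBankAccount(balance=100)
threads = [threading.Thread(target=account.transfer, args=(50,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(account.balance)  # 200: no update is lost
```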

Case Studies

See detailed case studies of searching for concurrency bugs in ten libraries: TPool, threadpoolctl, cachetools, PyDispatcher, pydis, pybreaker, urllib3, SQLAlchemy, amqtt, and pykka. Run the test suites with: PYTHONPATH=frontrun python frontrun/docs/tests/run_external_tests.py

Usage Approaches

Frontrun provides two different ways to control thread interleaving:

1. Trace Markers

Trace markers are special comments (# frontrun: <marker-name>) that mark synchronization points in multithreaded or async code. They make it easier to reproduce race conditions in test cases and to check whether particular race conditions are possible.

Execution ordering is controlled by a Schedule object that specifies the order in which the threads should run through their markers.

Each thread runs with a sys.settrace callback that pauses at markers and waits for a schedule to grant the next execution turn. This gives deterministic control over execution order without modifying code semantics — markers are just comments. A marker gates the code that follows it: the thread pauses at the marker and only executes the gated code after the scheduler grants it a turn. Name markers after the operation they gate (e.g. read_value, write_balance) rather than with temporal prefixes like before_ or after_.
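
The detection mechanism can be sketched in a few lines of standard-library code (an illustration of the idea, not Frontrun's implementation): a sys.settrace callback fires once per source line, and linecache lets it read that line and look for a marker comment. The function is written to a temporary file so the source-line lookup works:

```python
import linecache
import os
import sys
import tempfile
import textwrap

# Write the traced function to a real file so linecache can read its source.
src = textwrap.dedent("""\
    def increment(counter):
        temp = counter["value"]  # frontrun: read_value
        counter["value"] = temp + 1  # frontrun: write_value
""")
fd, path = tempfile.mkstemp(suffix=".py")
with os.fdopen(fd, "w") as f:
    f.write(src)

namespace = {}
exec(compile(src, path, "exec"), namespace)

seen = []

def tracer(frame, event, arg):
    # "line" fires before each source line runs; a real scheduler would
    # pause here until this marker's turn is granted.
    if event == "line":
        line = linecache.getline(frame.f_code.co_filename, frame.f_lineno)
        if "# frontrun:" in line:
            seen.append(line.split("# frontrun:", 1)[1].strip())
    return tracer

sys.settrace(tracer)
namespace["increment"]({"value": 0})
sys.settrace(None)
os.unlink(path)

print(seen)  # ['read_value', 'write_value']
```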

Markers can be placed inline or on a separate line before the operation:

from frontrun.trace_markers import Schedule, Step, TraceExecutor

class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        temp = self.value  # frontrun: read_value
        temp += 1
        self.value = temp  # frontrun: write_value

def test_counter_lost_update():
    counter = Counter()

    schedule = Schedule([
        Step("thread1", "read_value"),
        Step("thread2", "read_value"),
        Step("thread1", "write_value"),
        Step("thread2", "write_value"),
    ])

    executor = TraceExecutor(schedule)
    executor.run("thread1", counter.increment)
    executor.run("thread2", counter.increment)
    executor.wait(timeout=5.0)

    assert counter.value == 1  # One increment lost
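
Under the hood the idea is a turn-granting scheduler. The toy version below (a hand-rolled sketch built on threading.Condition, not Frontrun's actual internals) shows the core mechanic: each thread blocks at a named step until the schedule says it is that step's turn:

```python
import threading

class MiniSchedule:
    """Toy turn-granting scheduler (not Frontrun's API): threads call
    step(thread_name, marker) and block until that pair is next."""
    def __init__(self, steps):
        self.steps = steps  # ordered list of (thread_name, marker)
        self.index = 0
        self.cond = threading.Condition()

    def step(self, thread_name, marker):
        with self.cond:
            self.cond.wait_for(
                lambda: self.steps[self.index] == (thread_name, marker)
            )
            self.index += 1
            self.cond.notify_all()

counter = {"value": 0}
sched = MiniSchedule([
    ("t1", "read"), ("t2", "read"), ("t1", "write"), ("t2", "write"),
])

def increment(name):
    sched.step(name, "read")
    temp = counter["value"]
    sched.step(name, "write")
    counter["value"] = temp + 1

threads = [threading.Thread(target=increment, args=(n,)) for n in ("t1", "t2")]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter["value"])  # 1: the lost update is forced deterministically
```

Frontrun's trace-marker approach achieves the same effect without the explicit step() calls, by pausing at the marker comments from inside a trace callback.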

2. Bytecode Manipulation (Experimental)

⚠️ Experimental: Bytecode instrumentation is experimental and may change. It requires monkey-patching concurrency primitives and relies on f_trace_opcodes (Python 3.7+). Use with caution.

Automatically instrument functions using bytecode rewriting — no markers needed. Each thread fires a sys.settrace callback at every bytecode instruction, pausing at each one to wait for its scheduler turn. This gives fine-grained control but requires monkey-patching standard threading primitives (Lock, Semaphore, Event, Queue, etc.) to prevent deadlocks.
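
The opcode-level hook itself is standard CPython: a trace function that sets frame.f_trace_opcodes receives an "opcode" event before every bytecode instruction. A minimal counting sketch (no scheduling, just the hook this approach builds on):

```python
import sys

opcode_count = 0

def tracer(frame, event, arg):
    global opcode_count
    frame.f_trace_opcodes = True  # ask CPython for per-opcode events (3.7+)
    if event == "opcode":
        # A scheduler would pause here to wait for this thread's turn.
        opcode_count += 1
    return tracer

def increment(c):
    temp = c["value"]
    c["value"] = temp + 1

sys.settrace(tracer)
increment({"value": 0})
sys.settrace(None)

print(opcode_count)  # many opcode events even for a two-line function
```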

explore_interleavings() does property-based exploration in the style of Hypothesis: it generates random opcode-level schedules and checks that an invariant holds under each one, returning any counterexample schedule.

from frontrun.bytecode import explore_interleavings

class Counter:
    def __init__(self, value=0):
        self.value = value

    def increment(self):
        temp = self.value
        self.value = temp + 1

def test_counter_no_race():
    result = explore_interleavings(
        setup=lambda: Counter(value=0),
        threads=[
            lambda c: c.increment(),
            lambda c: c.increment(),
        ],
        invariant=lambda c: c.value == 2,
        max_attempts=200,
        max_ops=200,
        seed=42,
    )

    assert not result.property_holds, "Expected a race condition"
    assert result.counterexample.value == 1
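
The exploration idea can also be run without threads at all. The sketch below (plain Python, not Frontrun's API) models increment as two atomic steps and exhaustively enumerates every interleaving of two such threads in a sequential simulation, confirming that both the correct result (2) and the lost-update result (1) are reachable:

```python
def interleavings(n1, n2):
    """Yield every merge order of n1 steps from thread A and n2 from thread B."""
    if n1 == 0:
        yield ["B"] * n2
        return
    if n2 == 0:
        yield ["A"] * n1
        return
    for rest in interleavings(n1 - 1, n2):
        yield ["A"] + rest
    for rest in interleavings(n1, n2 - 1):
        yield ["B"] + rest

# Model increment as two atomic steps: read, then write.
def run(order):
    state = {"value": 0}
    local = {"A": None, "B": None}
    step = {"A": 0, "B": 0}
    for who in order:
        if step[who] == 0:
            local[who] = state["value"]       # read
        else:
            state["value"] = local[who] + 1   # write
        step[who] += 1
    return state["value"]

results = {run(order) for order in interleavings(2, 2)}
print(sorted(results))  # [1, 2]: both the lost update and the correct result
```

explore_interleavings() samples schedules randomly at opcode granularity instead, since exhaustive enumeration blows up quickly as step counts grow.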

Async Support

Both approaches have async variants.

Async Trace Markers

from frontrun.async_trace_markers import AsyncTraceExecutor
from frontrun.common import Schedule, Step

class AsyncCounter:
    def __init__(self):
        self.value = 0

    async def get_value(self):
        return self.value

    async def set_value(self, new_value):
        self.value = new_value

    async def increment(self):
        # frontrun: read_value
        temp = await self.get_value()
        # frontrun: write_value
        await self.set_value(temp + 1)

def test_async_counter_lost_update():
    counter = AsyncCounter()

    schedule = Schedule([
        Step("task1", "read_value"),
        Step("task2", "read_value"),
        Step("task1", "write_value"),
        Step("task2", "write_value"),
    ])

    executor = AsyncTraceExecutor(schedule)
    executor.run({
        "task1": counter.increment,
        "task2": counter.increment,
    })

    assert counter.value == 1  # One increment lost
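
Coroutines make this kind of control simpler than threads: a task only yields at an await, so an explicit checkpoint between read and write is enough to force the lost-update interleaving without any tracing. A standard-library sketch (assuming asyncio's usual FIFO scheduling of ready tasks):

```python
import asyncio

class AsyncCounter:
    def __init__(self):
        self.value = 0

    async def increment(self):
        temp = self.value
        await asyncio.sleep(0)  # yield to the event loop between read and write
        self.value = temp + 1

async def main():
    counter = AsyncCounter()
    # Both tasks read before either writes, because each yields after its read.
    await asyncio.gather(counter.increment(), counter.increment())
    return counter.value

print(asyncio.run(main()))  # 1: one increment lost
```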

Development

Running Tests

make test
