Skip to main content

A lightweight, strict flow controller for enforcing thread-safe Read (P2) and Write (P1) priority in concurrent Python applications.

Project description

StrictFlow 🔒


A strict, thread-safe flow controller for Python concurrency, enforcing Read (P2) and Write (P1) priority.

💡 The Problem

In high-concurrency, multi-threaded applications (e.g., financial systems, industrial IoT, or server configurations), simultaneous reads and writes to a single piece of global state can lead to data corruption, inconsistent reads, or fatal race conditions. This is especially true for data that cannot fail or be read mid-update (e.g., blockchain transaction hashes, live configuration objects).

Standard Python locking mechanisms often ensure safety but lack priority and ordered execution.

✨ The StrictFlow Solution

StrictFlow implements a single-thread, priority-based task queue that provides exclusive access for critical operations, ensuring data integrity is never compromised.

It enforces two critical priority levels:

P1 Write (Critical): Tasks decorated with @write are the highest priority. When a P1 task is executed, all P2 Read tasks are paused and blocked until the write is complete and stable. This guarantees the writer has exclusive access.

P2 Read (High-Concurrency): Tasks decorated with @read can run highly concurrent logic internally (using asyncio), but they must wait synchronously for any active P1 Write task to finish.

This pattern ensures readers never see unstable or incomplete data.

📦 Installation

pip install strictflow


(Note: Once uploaded to PyPI, this command will work.)

🚀 Quick Start Example

This example demonstrates how a P1 Write task (updating config) blocks P2 Read tasks to prevent them from reading the old version (V0) or an unstable version (V1 mid-write).

import threading
import time
import asyncio
import logging
from strictflow import StrictFlow, read, write, setup_logging
from strictflow.core import logger # Use the internal logger for application logs

# 1. The user MUST activate logging to see output
setup_logging(logging.DEBUG)

GLOBAL_CONFIG = {"version": 0, "status": "stable"}

def main():
flow = StrictFlow()
loop_thread = threading.Thread(target=flow.run_loop, daemon=True, name="FlowLoop")
loop_thread.start()

@write(flow)
def update_config(new_version: int, new_status: str):
"""P1 Write: Simulates a critical, blocking update."""
global GLOBAL_CONFIG
print(f" [P1] Working: Simulating 1.5s write for V{new_version}...")
time.sleep(1.5)

GLOBAL_CONFIG['version'] = new_version
GLOBAL_CONFIG['status'] = new_status
print(f" [P1] Complete: Config updated to V{new_version}.")


@read(flow)
async def fetch_data_async(reader_id: str):
"""P2 Read: Can run concurrently but MUST wait for P1."""
for i in range(3):
await asyncio.sleep(0.3)
current_config = GLOBAL_CONFIG.copy()
print(f" [P2] Reader {reader_id} read | V{current_config['version']} | Iteration {i+1}")

# --- SIMULATION ---
fetch_data_async(reader_id="R-A") # P2 task submitted

time.sleep(0.5)

update_config(new_version=1, new_status="deploying") # P1 task submitted (Blocks R-A)

fetch_data_async(reader_id="R-B") # P2 task submitted (Will also wait for P1)

time.sleep(7.5) # Wait for all tasks to finish
flow.stop_loop()
loop_thread.join()

if __name__ == "__main__":
main()


🛠️ Key Concepts

1. Thread-Safe Execution

All tasks (P1 and P2) are submitted to and executed sequentially by a single, dedicated worker thread (SF-LOOP). This single-threaded execution context eliminates concurrency issues at the data access level.

2. Priority Control (P1 vs P2)

While all tasks execute in the dedicated thread, the P1/P2 mechanism controls when P2 tasks are allowed to begin. P2 tasks actively check a status flag set by the P1 writer. If the flag is clear, they block the SF-LOOP thread until the P1 task is complete.

3. Asynchronous Reads

The @read decorator allows the wrapped function to be async. When the P2 task is executed by the SF-LOOP thread, it runs the async function using asyncio.run(), allowing the P2 task to achieve high internal concurrency (multiple fast reads) without requiring locks on the shared data outside the loop.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

strictflow-0.1.0.tar.gz (10.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

strictflow-0.1.0-py3-none-any.whl (9.7 kB view details)

Uploaded Python 3

File details

Details for the file strictflow-0.1.0.tar.gz.

File metadata

  • Download URL: strictflow-0.1.0.tar.gz
  • Upload date:
  • Size: 10.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.10

File hashes

Hashes for strictflow-0.1.0.tar.gz
Algorithm Hash digest
SHA256 6771e84060a0db49ed372fa21ce4256b6cb1c81bed0b8d3d204bc277f0c6f46e
MD5 405f6310a71aee35b2fc71f7a766f77b
BLAKE2b-256 3a609f837682aedba499c8841f3dbceaadec8631eb1b9f1e54bde3ce31404239

See more details on using hashes here.

File details

Details for the file strictflow-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: strictflow-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 9.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.10

File hashes

Hashes for strictflow-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 2285c33abe5e74f21386daa1be31d3928aa582fbbf765b0d38fb13bd513da61f
MD5 bb08f4962c9a28aab6ac1fd8e510d47a
BLAKE2b-256 6bb8b9c825d3e9d4ad630af1b03244501c90a3483f9da91628e0137964f7246c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page