cjm-tqdm-capture
Intercept and capture progress information from tqdm progress bars via callbacks, without modifying existing code.
Install
pip install cjm-tqdm-capture
Project Structure
nbs/
├── job_runner.ipynb # Executes functions in background threads with automatic tqdm progress capture.
├── patch_tqdm.ipynb # Provides the patching mechanism to intercept tqdm and emit progress information via callbacks.
├── progress_info.ipynb # Defines the data structure used to represent progress state
├── progress_monitor.ipynb # Thread-safe monitor for tracking and aggregating progress from multiple concurrent jobs.
└── streaming.ipynb # Server-Sent Events (SSE) generator for real-time progress streaming to web clients.
Total: 5 notebooks
Module Dependencies
graph LR
job_runner[job_runner<br/>job runner]
patch_tqdm[patch_tqdm<br/>patch tqdm]
progress_info[progress_info<br/>progress info]
progress_monitor[progress_monitor<br/>progress monitor]
streaming[streaming<br/>streaming]
job_runner --> patch_tqdm
job_runner --> progress_monitor
job_runner --> progress_info
patch_tqdm --> progress_info
progress_monitor --> patch_tqdm
progress_monitor --> progress_info
streaming --> progress_monitor
streaming --> job_runner
8 cross-module dependencies detected
CLI Reference
No CLI commands found in this project.
Module Overview
Detailed documentation for each module in the project:
job runner (job_runner.ipynb)
Executes functions in background threads with automatic tqdm progress capture.
Import
from cjm_tqdm_capture.job_runner import (
JobRunner
)
Classes
class JobRunner:
def __init__(
self,
monitor: ProgressMonitor # Progress monitor instance to receive updates
)
"""
Runs a callable in a background thread, patches tqdm inside the job,
and forwards ProgressInfo updates to a ProgressMonitor under job_id.
"""
def __init__(
self,
monitor: ProgressMonitor # Progress monitor instance to receive updates
)
"Initialize a job runner with a progress monitor"
def start(
self,
job_id: str, # Unique identifier for this job
fn: Callable[..., Any], # Function to execute in the background thread
*args, # Positional arguments passed to fn
patch_kwargs: Optional[Dict[str, Any]] = None, # Keyword arguments forwarded to patch_tqdm
**kwargs # Keyword arguments passed to fn
) -> threading.Thread: # The thread running the job
"Start a job in a background thread with automatic tqdm patching"
def is_alive(
self,
job_id: str # Unique identifier of the job to check
) -> bool: # True if the job thread is still running
"Check if a job's thread is still running"
def join(
self,
job_id: str, # Unique identifier of the job to wait for
timeout: Optional[float] = None # Maximum seconds to wait (None for indefinite)
) -> None: # Returns when thread completes or timeout expires
"Wait for a job's thread to complete"
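The run-in-a-thread-and-forward-progress pattern that JobRunner implements can be sketched with the standard library alone. Everything below (FakeMonitor, MiniJobRunner, the explicit report callback) is illustrative scaffolding, not the package's actual implementation; in the real JobRunner, patch_tqdm intercepts tqdm bars inside the job so no explicit callback is needed.

```python
import threading
from typing import Any, Callable, Dict

class FakeMonitor:
    """Stand-in for ProgressMonitor: records the latest update per job."""
    def __init__(self):
        self._lock = threading.Lock()
        self._jobs: Dict[str, Any] = {}

    def update(self, job_id: str, info: Any) -> None:
        with self._lock:
            self._jobs[job_id] = info

class MiniJobRunner:
    """Runs a callable in a background thread and forwards its progress."""
    def __init__(self, monitor: FakeMonitor):
        self.monitor = monitor
        self._threads: Dict[str, threading.Thread] = {}

    def start(self, job_id: str, fn: Callable[..., Any], *args, **kwargs) -> threading.Thread:
        def _run():
            # The real JobRunner wraps this call in patch_tqdm so any tqdm
            # bar inside fn reports to the monitor automatically; here the
            # job receives a report callback explicitly instead.
            fn(lambda info: self.monitor.update(job_id, info), *args, **kwargs)
        t = threading.Thread(target=_run, daemon=True)
        self._threads[job_id] = t
        t.start()
        return t

    def join(self, job_id: str, timeout=None) -> None:
        self._threads[job_id].join(timeout)

def work(report):
    for i in range(1, 6):
        report({"progress": i * 20.0, "current": i, "total": 5})

monitor = FakeMonitor()
runner = MiniJobRunner(monitor)
runner.start("demo", work)
runner.join("demo")
print(monitor._jobs["demo"])  # last update recorded for job "demo"
```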
patch tqdm (patch_tqdm.ipynb)
Provides the patching mechanism to intercept tqdm and emit progress information via callbacks.
Import
from cjm_tqdm_capture.patch_tqdm import (
patch_tqdm
)
Functions
def _make_callback_class(
BaseTqdm: type, # Base tqdm class to extend with callback functionality
default_cb: Optional[Callable[[ProgressInfo], None]], # Default callback invoked with progress updates
min_update_interval: float = 0.1, # Minimum time between callback invocations (seconds)
min_delta_pct: float = 1.0, # Minimum percentage change required to emit a callback
emit_initial: bool = False # Whether to emit a callback at 0% progress
)
"Create a tqdm subclass that emits progress callbacks during iteration"
@contextmanager
def patch_tqdm(
progress_callback: Optional[Callable[[ProgressInfo], None]], # Function to call with progress updates
min_update_interval: float = 0.1, # Minimum time between callback invocations (seconds)
min_delta_pct: float = 10.0, # Minimum percentage change required to emit a callback (e.g., roughly every 10%)
emit_initial: bool = False # Whether to emit callback at 0% progress
)
"Context manager that patches tqdm to emit progress callbacks"
Variables
_BAR_COUNTER
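The min_update_interval, min_delta_pct, and emit_initial parameters describe a two-gate throttle: a callback fires only when enough time and enough percentage change have both accumulated. The emitter below is a hypothetical stand-in (not the package's _make_callback_class) that shows the gating logic in isolation.

```python
import time
from typing import Callable

class ThrottledEmitter:
    """Emit a progress callback only when both time and percentage gates pass."""
    def __init__(self, cb: Callable[[float], None],
                 min_update_interval: float = 0.1,  # seconds between emits
                 min_delta_pct: float = 1.0,        # minimum % change between emits
                 emit_initial: bool = False):       # whether 0% may be emitted
        self.cb = cb
        self.min_update_interval = min_update_interval
        self.min_delta_pct = min_delta_pct
        self._last_time = 0.0
        # Starting at 0.0 suppresses the 0% emit unless emit_initial is set.
        self._last_pct = float("-inf") if emit_initial else 0.0

    def update(self, pct: float) -> bool:
        now = time.monotonic()
        if (now - self._last_time >= self.min_update_interval
                and pct - self._last_pct >= self.min_delta_pct):
            self._last_time = now
            self._last_pct = pct
            self.cb(pct)
            return True
        return False

emitted = []
emitter = ThrottledEmitter(emitted.append, min_update_interval=0.0, min_delta_pct=10.0)
for pct in range(0, 101):
    emitter.update(float(pct))
print(emitted)  # roughly every 10%: [10.0, 20.0, ..., 100.0]
```

With the time gate disabled (interval 0.0), only the percentage gate filters updates, which is why the demo emits exactly once per 10-point step.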
progress info (progress_info.ipynb)
Defines the data structure used to represent progress state
Import
from cjm_tqdm_capture.progress_info import (
ProgressInfo,
serialize_job_snapshot,
serialize_all_jobs
)
Functions
def serialize_job_snapshot(
snapshot: Optional[Dict[str, Any]] # Job snapshot dictionary from ProgressMonitor
) -> Optional[Dict[str, Any]]: # JSON-serializable dictionary or None if input is None
"Convert a job snapshot with ProgressInfo objects to a JSON-serializable format."
def serialize_all_jobs(
jobs: Dict[str, Dict[str, Any]] # Dictionary mapping job IDs to job snapshots
) -> Dict[str, Optional[Dict[str, Any]]]: # Dictionary mapping job IDs to serialized snapshots
"Convert all jobs from monitor.all() to JSON-serializable format."
Classes
@dataclass
class ProgressInfo:
"Structured progress information"
progress: float # Percentage completion (0-100)
current: Optional[int] # Current iteration count
total: Optional[int] # Total iterations expected
rate: Optional[str] # Processing rate (e.g., "50.5 it/s")
elapsed: Optional[str] # Time elapsed since start
remaining: Optional[str] # Estimated time remaining
description: Optional[str] # Progress bar description/label
raw_output: str = '' # Raw output string (if any)
timestamp: float # Unix timestamp when created
bar_id: Optional[str] # Unique identifier for this progress bar
position: Optional[int] # Display position for multi-bar scenarios
def to_dict(
self
) -> dict: # JSON-serializable dictionary of all fields
"Convert to dictionary for JSON serialization"
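The field list above translates directly into a serialization shape. MiniProgressInfo below is an illustrative mirror of ProgressInfo's documented fields, showing what a to_dict() payload looks like when fed to json.dumps; it is not the package's own class.

```python
import json
import time
from dataclasses import dataclass, field, asdict
from typing import Optional

@dataclass
class MiniProgressInfo:
    """Illustrative mirror of ProgressInfo's documented fields."""
    progress: float                      # percentage completion (0-100)
    current: Optional[int] = None        # current iteration count
    total: Optional[int] = None          # total iterations expected
    rate: Optional[str] = None           # e.g. "50.5 it/s"
    elapsed: Optional[str] = None        # time elapsed since start
    remaining: Optional[str] = None      # estimated time remaining
    description: Optional[str] = None    # progress bar description/label
    raw_output: str = ""                 # raw output string (if any)
    timestamp: float = field(default_factory=time.time)  # creation time
    bar_id: Optional[str] = None         # unique identifier for this bar
    position: Optional[int] = None       # display position for multi-bar use

    def to_dict(self) -> dict:
        "Convert to a JSON-serializable dictionary"
        return asdict(self)

info = MiniProgressInfo(progress=42.0, current=42, total=100, description="epoch 1")
payload = json.dumps(info.to_dict())
print(payload)
```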
progress monitor (progress_monitor.ipynb)
Thread-safe monitor for tracking and aggregating progress from multiple concurrent jobs.
Import
from cjm_tqdm_capture.progress_monitor import (
ProgressMonitor
)
Classes
class ProgressMonitor:
def __init__(
self,
keep_history: bool = False, # Whether to maintain a history of progress updates
history_limit: int = 500 # Maximum number of historical updates to keep per job
)
"Thread-safe monitor for tracking progress of multiple concurrent jobs"
def __init__(
self,
keep_history: bool = False, # Whether to maintain a history of progress updates
history_limit: int = 500 # Maximum number of historical updates to keep per job
)
"Initialize a new progress monitor with optional history tracking"
def update(
self,
job_id: str, # Unique identifier for the job being tracked
info: ProgressInfo # Progress information update for the job
)
"Record a progress update for the specified job"
def snapshot(
self,
job_id: str # Unique identifier of the job to snapshot
) -> Optional[Dict[str, Any]]: # Job state dictionary or None if job not found
"Get a point-in-time snapshot of a specific job's progress state"
def all(
self
) -> Dict[str, Dict[str, Any]]: # Dictionary mapping job IDs to their state snapshots
"Get snapshots of all tracked jobs"
def clear_completed(
self,
older_than_seconds: float = 3600 # Age threshold in seconds for removing completed jobs
)
"Remove completed jobs that finished more than the specified seconds ago"
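A monitor with this interface is straightforward to sketch: a lock guards a per-job dict, and snapshot/all return deep copies so callers never observe state mutated mid-read. MiniMonitor below is an illustrative stand-in, not the package's ProgressMonitor.

```python
import copy
import threading
import time
from typing import Any, Dict, Optional

class MiniMonitor:
    """Sketch of a thread-safe progress monitor with snapshot semantics."""
    def __init__(self, keep_history: bool = False, history_limit: int = 500):
        self._lock = threading.Lock()
        self._jobs: Dict[str, Dict[str, Any]] = {}
        self.keep_history = keep_history
        self.history_limit = history_limit

    def update(self, job_id: str, info: Dict[str, Any]) -> None:
        with self._lock:
            job = self._jobs.setdefault(
                job_id, {"latest": None, "history": [], "updated_at": None})
            job["latest"] = info
            job["updated_at"] = time.time()
            if self.keep_history:
                job["history"].append(info)
                # Trim to the most recent history_limit entries
                del job["history"][:-self.history_limit]

    def snapshot(self, job_id: str) -> Optional[Dict[str, Any]]:
        with self._lock:
            job = self._jobs.get(job_id)
            return copy.deepcopy(job) if job is not None else None

    def all(self) -> Dict[str, Dict[str, Any]]:
        with self._lock:
            return copy.deepcopy(self._jobs)

m = MiniMonitor(keep_history=True, history_limit=3)
for pct in (25, 50, 75, 100):
    m.update("job-1", {"progress": float(pct)})
snap = m.snapshot("job-1")
print(snap["latest"], len(snap["history"]))  # {'progress': 100.0} 3
```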
streaming (streaming.ipynb)
Server-Sent Events (SSE) generator for real-time progress streaming to web clients.
Import
from cjm_tqdm_capture.streaming import (
sse_stream
)
Functions
def sse_stream(
monitor: ProgressMonitor, # Progress monitor instance to read job updates from
job_id: str, # Unique identifier of the job to stream
interval: float = 0.25, # Polling interval in seconds for checking progress updates
heartbeat: float = 15.0, # Seconds between keep-alive messages when no updates
wait_for_start: bool = True, # Whether to wait for job to start before ending stream
start_timeout: float = 5.0, # Maximum seconds to wait for job to start if wait_for_start is True
) -> Iterator[str]: # SSE-formatted strings ready to send to client
"""
Framework-agnostic SSE generator.
- Yields 'data: {json}\n\n' when progress changes.
- Sends ': keep-alive' comments every `heartbeat` seconds when idle.
- If `wait_for_start` is True, it will wait up to `start_timeout` for
the first snapshot before ending (avoids race at job startup).
"""
Project details
Download files
Source Distribution
Built Distribution
File details
Details for the file cjm_tqdm_capture-0.0.4.tar.gz.
File metadata
- Download URL: cjm_tqdm_capture-0.0.4.tar.gz
- Upload date:
- Size: 16.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 861263e9a2dae858bda06c8475cd986942769d3d507ad02807bf3a9ff2f05ae4 |
| MD5 | 2532d65f0168a32b12ed6ca5c71d9059 |
| BLAKE2b-256 | 0d21eb34e3712bd188b5ee72d4aee1dcc47ca224a56402ca50fd9b7a1ed72c84 |
File details
Details for the file cjm_tqdm_capture-0.0.4-py3-none-any.whl.
File metadata
- Download URL: cjm_tqdm_capture-0.0.4-py3-none-any.whl
- Upload date:
- Size: 17.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 0d994697203174e952b6fe4a3827976d7e7a87176c9730b02b30d5825caed663 |
| MD5 | c83ed7636acc785c9c355428e32ac2eb |
| BLAKE2b-256 | c3c6b048efe10a671e03a92cf436547b2aad0764c6bb528f114c7012c809b412 |