Intercept and capture progress information from tqdm progress bars via callbacks without modifying existing code.

cjm-tqdm-capture

Install

pip install cjm-tqdm-capture

Project Structure

nbs/
├── job_runner.ipynb       # Executes functions in background threads with automatic tqdm progress capture.
├── patch_tqdm.ipynb       # Provides the patching mechanism that intercepts tqdm and emits callbacks with its progress information.
├── progress_info.ipynb    # Defines the data structure used to represent progress state.
├── progress_monitor.ipynb # Thread-safe monitor for tracking and aggregating progress from multiple concurrent jobs.
└── streaming.ipynb        # Server-Sent Events (SSE) generator for real-time progress streaming to web clients.

Total: 5 notebooks

Module Dependencies

graph LR
    job_runner[job_runner<br/>job runner]
    patch_tqdm[patch_tqdm<br/>patch tqdm]
    progress_info[progress_info<br/>progress info]
    progress_monitor[progress_monitor<br/>progress monitor]
    streaming[streaming<br/>streaming]

    job_runner --> patch_tqdm
    job_runner --> progress_monitor
    job_runner --> progress_info
    patch_tqdm --> progress_info
    progress_monitor --> patch_tqdm
    progress_monitor --> progress_info
    streaming --> progress_monitor
    streaming --> job_runner

8 cross-module dependencies detected

CLI Reference

No CLI commands found in this project.

Module Overview

Detailed documentation for each module in the project:

job runner (job_runner.ipynb)

Executes functions in background threads with automatic tqdm progress capture.

Import

from cjm_tqdm_capture.job_runner import (
    JobRunner
)

Classes

class JobRunner:
    """
    Runs a callable in a background thread, patches tqdm inside the job,
    and forwards ProgressInfo updates to a ProgressMonitor under job_id.
    """
    
    def __init__(
            self,
            monitor: ProgressMonitor  # Progress monitor instance to receive updates
        )
        "Initialize a job runner with a progress monitor"
    
    def start(
            self,
            job_id: str,  # Unique identifier for this job
            fn: Callable[..., Any],
            *args,
            patch_kwargs: Optional[Dict[str, Any]] = None,
            **kwargs
        ) -> threading.Thread:  # The thread running the job
        "Start a job in a background thread with automatic tqdm patching"
    
    def is_alive(
            self,
            job_id: str  # Unique identifier of the job to check
        ) -> bool:  # True if the job thread is still running
        "Check if a job's thread is still running"
    
    def join(
            self,
            job_id: str,  # Unique identifier of the job to wait for
            timeout: Optional[float] = None  # Maximum seconds to wait (None for indefinite)
        ) -> None:  # Returns when thread completes or timeout expires
        "Wait for a job's thread to complete"

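The start / is_alive / join lifecycle described above can be pictured with a standard-library sketch. `MiniRunner` and its `results` attribute are hypothetical names used only for illustration; they are not part of cjm-tqdm-capture, and the sketch leaves out tqdm patching and forwarding to a ProgressMonitor.

```python
import threading
from typing import Any, Callable, Dict, Optional


class MiniRunner:
    """Hypothetical stand-in: tracks one background thread per job_id."""

    def __init__(self) -> None:
        self._threads: Dict[str, threading.Thread] = {}
        self.results: Dict[str, Any] = {}  # illustration only, not a library attribute

    def start(self, job_id: str, fn: Callable[..., Any],
              *args: Any, **kwargs: Any) -> threading.Thread:
        def _target() -> None:
            # Store the return value so callers can read it after join().
            self.results[job_id] = fn(*args, **kwargs)

        thread = threading.Thread(target=_target, name=f"job-{job_id}", daemon=True)
        self._threads[job_id] = thread
        thread.start()
        return thread

    def is_alive(self, job_id: str) -> bool:
        # Unknown job ids are reported as not running.
        thread = self._threads.get(job_id)
        return thread is not None and thread.is_alive()

    def join(self, job_id: str, timeout: Optional[float] = None) -> None:
        thread = self._threads.get(job_id)
        if thread is not None:
            thread.join(timeout)


runner = MiniRunner()
runner.start("sum-job", sum, range(10))
runner.join("sum-job", timeout=5.0)
print(runner.results["sum-job"], runner.is_alive("sum-job"))
```

Keying threads by `job_id` is what lets a caller check or wait on a job later without holding on to the thread object returned by `start`.
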
patch tqdm (patch_tqdm.ipynb)

Provides the patching mechanism that intercepts tqdm and emits callbacks with its progress information.

Import

from cjm_tqdm_capture.patch_tqdm import (
    patch_tqdm
)

Functions

def _make_callback_class(
    BaseTqdm,    # Base tqdm class to extend with callback functionality
    default_cb: Optional[Callable[[ProgressInfo], None]],
    min_update_interval: float = 0.1,  # Minimum time between callback invocations (seconds)
    min_delta_pct: float = 1.0,      # emit only if pct moves by >= this
    emit_initial: bool = False       # whether to emit at 0%
)
    "Create a tqdm subclass that emits progress callbacks during iteration"

@contextmanager
def patch_tqdm(
    progress_callback: Optional[Callable[[ProgressInfo], None]],  # Function to call with progress updates
    min_update_interval: float = 0.1,  # Minimum time between callback invocations (seconds)
    min_delta_pct: float = 10.0,   # e.g., only every ~10%
    emit_initial: bool = False  # Whether to emit callback at 0% progress
)
    "Context manager that patches tqdm to emit progress callbacks"

Variables

_BAR_COUNTER
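
The three throttling parameters (`min_update_interval`, `min_delta_pct`, `emit_initial`) can be read as a gate in front of the callback. The sketch below is one plausible interpretation, not the library's code: `EmitThrottle` is a hypothetical helper, and it assumes that both the time threshold and the percentage threshold must be met before an update is emitted.

```python
from typing import Optional


class EmitThrottle:
    """Hypothetical helper: decides whether a progress update should be emitted."""

    def __init__(self, min_update_interval: float = 0.1,
                 min_delta_pct: float = 10.0,
                 emit_initial: bool = False) -> None:
        self.min_update_interval = min_update_interval
        self.min_delta_pct = min_delta_pct
        self.emit_initial = emit_initial
        self._last_time: Optional[float] = None
        self._last_pct: Optional[float] = None

    def should_emit(self, pct: float, now: float) -> bool:
        if self._last_time is None or self._last_pct is None:
            # First observation becomes the baseline; emit it only if requested.
            self._last_time, self._last_pct = now, pct
            return self.emit_initial
        enough_time = (now - self._last_time) >= self.min_update_interval
        enough_progress = (pct - self._last_pct) >= self.min_delta_pct
        if enough_time and enough_progress:
            # Assumption: both thresholds must pass; the emitted update is the new baseline.
            self._last_time, self._last_pct = now, pct
            return True
        return False


throttle = EmitThrottle(min_update_interval=0.1, min_delta_pct=10.0)
# Simulated (percent, seconds-since-start) updates from a progress bar.
updates = [(0.0, 0.0), (5.0, 0.2), (12.0, 0.25), (15.0, 0.3),
           (30.0, 0.32), (30.0, 0.5), (45.0, 0.75)]
emitted = [pct for pct, t in updates if throttle.should_emit(pct, t)]
print(emitted)  # [12.0, 30.0, 45.0]
```

A real implementation would likely also force a final update at completion so consumers always see 100%; this sketch does not.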

progress info (progress_info.ipynb)

Defines the data structure used to represent progress state.

Import

from cjm_tqdm_capture.progress_info import (
    ProgressInfo
)

Classes

@dataclass
class ProgressInfo:
    "Structured progress information"
    
    progress: float  # Percentage completion (0-100)
    current: Optional[int]  # Current iteration count
    total: Optional[int]  # Total iterations expected
    rate: Optional[str]  # Processing rate (e.g., "50.5 it/s")
    elapsed: Optional[str]  # Time elapsed since start
    remaining: Optional[str]  # Estimated time remaining
    description: Optional[str]  # Progress bar description/label
    raw_output: str = ''  # Raw output string (if any)
    timestamp: float  # Unix timestamp when created
    bar_id: Optional[str]  # Unique identifier for this progress bar
    position: Optional[int]  # Display position for multi-bar scenarios
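
As a rough illustration of how such a record could be built and serialized, the sketch below uses a hypothetical `MiniProgress` dataclass holding a subset of the fields listed above. The `from_counts` helper and the rule that the percentage is `current / total` (0.0 when the total is unknown) are assumptions, not documented behavior of `ProgressInfo`.

```python
import time
from dataclasses import asdict, dataclass, field
from typing import Optional


@dataclass
class MiniProgress:
    """Hypothetical subset of the fields listed above."""
    progress: float                       # Percentage completion (0-100)
    current: Optional[int] = None         # Current iteration count
    total: Optional[int] = None           # Total iterations expected
    description: Optional[str] = None     # Progress bar description/label
    timestamp: float = field(default_factory=time.time)  # Unix timestamp when created


def from_counts(current: int, total: Optional[int],
                description: Optional[str] = None) -> MiniProgress:
    # Assumption: percentage is current/total, or 0.0 when total is unknown or zero.
    pct = 100.0 * current / total if total else 0.0
    return MiniProgress(progress=min(pct, 100.0), current=current,
                        total=total, description=description)


info = from_counts(25, 200, description="Downloading")
payload = asdict(info)  # plain dict, ready for json.dumps
print(payload["progress"], payload["description"])
```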
    

progress monitor (progress_monitor.ipynb)

Thread-safe monitor for tracking and aggregating progress from multiple concurrent jobs.

Import

from cjm_tqdm_capture.progress_monitor import (
    ProgressMonitor
)

Classes

class ProgressMonitor:
    "Thread-safe monitor for tracking progress of multiple concurrent jobs"
    
    def __init__(
            self,
            keep_history: bool = False,  # Whether to maintain a history of progress updates
            history_limit: int = 500  # Maximum number of historical updates to keep per job
        )
        "Initialize a new progress monitor with optional history tracking"
    
    def update(
            self,
            job_id: str,  # Unique identifier for the job being tracked
            info: ProgressInfo  # Progress information update for the job
        )
    
    def snapshot(
            self,
            job_id: str  # Unique identifier of the job to snapshot
        ) -> Optional[Dict[str, Any]]:  # Job state dictionary or None if job not found
        "Get a point-in-time snapshot of a specific job's progress state"
    
    def all(
            self
        ) -> Dict[str, Dict[str, Any]]:  # Dictionary mapping job IDs to their state snapshots
        "Get snapshots of all tracked jobs"
    
    def clear_completed(
            self,
            older_than_seconds: float = 3600  # Age threshold in seconds for removing completed jobs
        )
        "Remove completed jobs that finished more than the specified seconds ago"

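One common way to get the thread safety and bounded history described above is a lock around a per-job dictionary plus a `deque` with `maxlen`. The sketch below shows that pattern with a hypothetical `MiniMonitor`; the snapshot shape (`"latest"` and `"history"` keys) is invented for the example and may differ from what `ProgressMonitor.snapshot` returns.

```python
import threading
from collections import deque
from typing import Any, Deque, Dict, Optional


class MiniMonitor:
    """Hypothetical stand-in for a lock-protected, per-job progress store."""

    def __init__(self, keep_history: bool = False, history_limit: int = 500) -> None:
        self._lock = threading.Lock()
        self._latest: Dict[str, Dict[str, Any]] = {}
        self._history: Dict[str, Deque[Dict[str, Any]]] = {}
        self._keep_history = keep_history
        self._history_limit = history_limit

    def update(self, job_id: str, info: Dict[str, Any]) -> None:
        with self._lock:
            self._latest[job_id] = dict(info)
            if self._keep_history:
                # deque(maxlen=...) drops the oldest entry once the limit is reached.
                hist = self._history.setdefault(
                    job_id, deque(maxlen=self._history_limit))
                hist.append(dict(info))

    def snapshot(self, job_id: str) -> Optional[Dict[str, Any]]:
        with self._lock:
            latest = self._latest.get(job_id)
            if latest is None:
                return None
            # Copy under the lock so callers never see a half-updated state.
            snap: Dict[str, Any] = {"latest": dict(latest)}
            if self._keep_history:
                snap["history"] = list(self._history[job_id])
            return snap


monitor = MiniMonitor(keep_history=True, history_limit=3)


def worker(job_id: str) -> None:
    for pct in (25.0, 50.0, 75.0, 100.0):
        monitor.update(job_id, {"progress": pct})


threads = [threading.Thread(target=worker, args=(f"job-{i}",)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(monitor.snapshot("job-0"))
```

Because each job has its own history deque, a chatty job cannot evict the history of a quiet one.
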
streaming (streaming.ipynb)

Server-Sent Events (SSE) generator for real-time progress streaming to web clients.

Import

from cjm_tqdm_capture.streaming import (
    sse_stream
)

Functions

def sse_stream(
    monitor: ProgressMonitor,  # Progress monitor instance to read job updates from
    job_id: str,  # Unique identifier of the job to stream
    interval: float = 0.25,  # Polling interval in seconds for checking progress updates
    heartbeat: float = 15.0,  # Seconds between keep-alive messages when no updates
    wait_for_start: bool = True,  # Whether to wait for job to start before ending stream
    start_timeout: float = 5.0,  # Maximum seconds to wait for job to start if wait_for_start is True
) -> Iterator[str]:  # SSE-formatted strings ready to send to client
    """
    Framework-agnostic SSE generator.
    - Yields 'data: {json}\n\n' when progress changes.
    - Sends ': keep-alive' comments every `heartbeat` seconds when idle.
    - If `wait_for_start` is True, it will wait up to `start_timeout` for
      the first snapshot before ending (avoids race at job startup).
    """

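The framing described in the docstring can be sketched without any web framework. In the example below, `format_sse_data`, `sse_frames`, and `heartbeat_polls` are hypothetical names; the heartbeat is counted in idle polls rather than seconds so the example stays deterministic, and the exact trailing newlines on the keep-alive comment are an assumption.

```python
import json
from typing import Any, Dict, Iterable, Iterator, Optional

KEEP_ALIVE = ": keep-alive\n\n"  # SSE comment line; clients ignore it


def format_sse_data(payload: Dict[str, Any]) -> str:
    # One SSE event: a "data:" line followed by a blank line.
    return f"data: {json.dumps(payload)}\n\n"


def sse_frames(snapshots: Iterable[Optional[Dict[str, Any]]],
               heartbeat_polls: int = 3) -> Iterator[str]:
    """Yield a data frame when the snapshot changes, a keep-alive when idle."""
    last: Optional[Dict[str, Any]] = None
    idle = 0
    for snap in snapshots:
        if snap is not None and snap != last:
            last = snap
            idle = 0
            yield format_sse_data(snap)
        else:
            idle += 1
            if idle >= heartbeat_polls:
                idle = 0
                yield KEEP_ALIVE


# Simulated poll results: None means the job has not reported anything yet.
polls = [None, {"progress": 10.0}, {"progress": 10.0}, {"progress": 10.0},
         {"progress": 10.0}, {"progress": 50.0}]
frames = list(sse_frames(polls, heartbeat_polls=3))
for frame in frames:
    print(repr(frame))
```

Emitting only on change keeps the stream quiet between updates, while the periodic comment keeps intermediaries from closing an idle connection.
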