Intercept and capture progress information from tqdm progress bars via callbacks without modifying existing code.

Project description

cjm-tqdm-capture

Install

pip install cjm-tqdm-capture

Project Structure

nbs/
├── job_runner.ipynb       # Executes functions in background threads with automatic tqdm progress capture.
├── patch_tqdm.ipynb       # Provides the patching mechanism that intercepts tqdm and emits progress information via callbacks.
├── progress_info.ipynb    # Defines the data structure used to represent progress state.
├── progress_monitor.ipynb # Thread-safe monitor for tracking and aggregating progress from multiple concurrent jobs.
└── streaming.ipynb        # Server-Sent Events (SSE) generator for real-time progress streaming to web clients.

Total: 5 notebooks

Module Dependencies

graph LR
    job_runner[job_runner<br/>job runner]
    patch_tqdm[patch_tqdm<br/>patch tqdm]
    progress_info[progress_info<br/>progress info]
    progress_monitor[progress_monitor<br/>progress monitor]
    streaming[streaming<br/>streaming]

    job_runner --> patch_tqdm
    job_runner --> progress_monitor
    job_runner --> progress_info
    patch_tqdm --> progress_info
    progress_monitor --> patch_tqdm
    progress_monitor --> progress_info
    streaming --> progress_monitor
    streaming --> job_runner

8 cross-module dependencies detected

CLI Reference

No CLI commands found in this project.

Module Overview

Detailed documentation for each module in the project:

job runner (job_runner.ipynb)

Executes functions in background threads with automatic tqdm progress capture.

Import

from cjm_tqdm_capture.job_runner import (
    JobRunner
)

Classes

class JobRunner:
    def __init__(
        self,
        monitor: ProgressMonitor  # Progress monitor instance to receive updates
    )
    """
    Runs a callable in a background thread, patches tqdm inside the job,
    and forwards ProgressInfo updates to a ProgressMonitor under job_id.
    """
    
    def __init__(
            self,
            monitor: ProgressMonitor  # Progress monitor instance to receive updates
        )
        "Initialize a job runner with a progress monitor"
    
    def start(
            self,
            job_id: str,  # Unique identifier for this job
            fn: Callable[..., Any],
            *args,
            patch_kwargs: Optional[Dict[str, Any]] = None,
            **kwargs
        ) -> threading.Thread:  # The thread running the job
        "Start a job in a background thread with automatic tqdm patching"
    
    def is_alive(
            self,
            job_id: str  # Unique identifier of the job to check
        ) -> bool:  # True if the job thread is still running
        "Check if a job's thread is still running"
    
    def join(
            self,
            job_id: str,  # Unique identifier of the job to wait for
            timeout: Optional[float] = None  # Maximum seconds to wait (None for indefinite)
        ) -> None:  # Returns when thread completes or timeout expires
        "Wait for a job's thread to complete"

patch tqdm (patch_tqdm.ipynb)

Provides the patching mechanism that intercepts tqdm and emits progress information via callbacks.

Import

from cjm_tqdm_capture.patch_tqdm import (
    patch_tqdm
)

Functions

def _make_callback_class(
    BaseTqdm: type,  # Base tqdm class to extend with callback functionality
    default_cb: Optional[Callable[[ProgressInfo], None]],
    min_update_interval: float = 0.1,  # Minimum time between callback invocations (seconds)
    min_delta_pct: float = 1.0,  # Minimum percentage change required to emit another callback
    emit_initial: bool = False  # Whether to emit a callback at 0% progress
)
    "Create a tqdm subclass that emits progress callbacks during iteration"
@contextmanager
def patch_tqdm(
    progress_callback: Optional[Callable[[ProgressInfo], None]],  # Function to call with progress updates
    min_update_interval: float = 0.1,  # Minimum time between callback invocations (seconds)
    min_delta_pct: float = 10.0,  # Minimum percentage change between callbacks (e.g., roughly every 10%)
    emit_initial: bool = False  # Whether to emit callback at 0% progress
)
    "Context manager that patches tqdm to emit progress callbacks"

Variables

_BAR_COUNTER
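
Example

A sketch of using the context manager directly; the callback, loop, and throttling values are illustrative.

import time
from tqdm import tqdm

from cjm_tqdm_capture.patch_tqdm import patch_tqdm
from cjm_tqdm_capture.progress_info import ProgressInfo

def on_progress(info: ProgressInfo) -> None:
    # Each callback receives a ProgressInfo snapshot of the active bar.
    print(f"{info.description}: {info.progress:.0f}% ({info.current}/{info.total})")

# Inside the context manager, existing tqdm loops emit callbacks without modification.
with patch_tqdm(on_progress, min_update_interval=0.1, min_delta_pct=10.0):
    for _ in tqdm(range(100), desc="work"):
        time.sleep(0.02)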

progress info (progress_info.ipynb)

Defines the data structure used to represent progress state.

Import

from cjm_tqdm_capture.progress_info import (
    ProgressInfo,
    serialize_job_snapshot,
    serialize_all_jobs
)

Functions

def serialize_job_snapshot(
    snapshot: Optional[Dict[str, Any]]  # Job snapshot dictionary from ProgressMonitor
) -> Optional[Dict[str, Any]]:  # JSON-serializable dictionary or None if input is None
    "Convert a job snapshot with ProgressInfo objects to a JSON-serializable format."
def serialize_all_jobs(
    jobs: Dict[str, Dict[str, Any]]  # Dictionary mapping job IDs to job snapshots
) -> Dict[str, Optional[Dict[str, Any]]]:  # Dictionary mapping job IDs to serialized snapshots
    "Convert all jobs from monitor.all() to JSON-serializable format."

Classes

@dataclass
class ProgressInfo:
    "Structured progress information"
    
    progress: float  # Percentage completion (0-100)
    current: Optional[int]  # Current iteration count
    total: Optional[int]  # Total iterations expected
    rate: Optional[str]  # Processing rate (e.g., "50.5 it/s")
    elapsed: Optional[str]  # Time elapsed since start
    remaining: Optional[str]  # Estimated time remaining
    description: Optional[str]  # Progress bar description/label
    raw_output: str = ''  # Raw output string (if any)
    timestamp: float  # Unix timestamp when created
    bar_id: Optional[str]  # Unique identifier for this progress bar
    position: Optional[int]  # Display position for multi-bar scenarios
    
    def to_dict(self):
        "Convert to dictionary for JSON serialization"

progress monitor (progress_monitor.ipynb)

Thread-safe monitor for tracking and aggregating progress from multiple concurrent jobs.

Import

from cjm_tqdm_capture.progress_monitor import (
    ProgressMonitor
)

Classes

class ProgressMonitor:
    def __init__(
        self,
        keep_history: bool = False,  # Whether to maintain a history of progress updates
        history_limit: int = 500  # Maximum number of historical updates to keep per job
    )
    "Thread-safe monitor for tracking progress of multiple concurrent jobs"
    
    def __init__(
            self,
            keep_history: bool = False,  # Whether to maintain a history of progress updates
            history_limit: int = 500  # Maximum number of historical updates to keep per job
        )
        "Initialize a new progress monitor with optional history tracking"
    
    def update(
            self,
            job_id: str,  # Unique identifier for the job being tracked
            info: ProgressInfo  # Progress information update for the job
        )
        "TODO: Add function description"
    
    def snapshot(
            self,
            job_id: str  # Unique identifier of the job to snapshot
        ) -> Optional[Dict[str, Any]]:  # Job state dictionary or None if job not found
        "Get a point-in-time snapshot of a specific job's progress state"
    
    def all(
            self
        ) -> Dict[str, Dict[str, Any]]:  # Dictionary mapping job IDs to their state snapshots
        "Get snapshots of all tracked jobs"
    
    def clear_completed(
            self,
            older_than_seconds: float = 3600  # Age threshold in seconds for removing completed jobs
        )
        "Remove completed jobs that finished more than the specified seconds ago"

streaming (streaming.ipynb)

Server-Sent Events (SSE) generator for real-time progress streaming to web clients.

Import

from cjm_tqdm_capture.streaming import (
    sse_stream
)

Functions

def sse_stream(
    monitor: ProgressMonitor,  # Progress monitor instance to read job updates from
    job_id: str,  # Unique identifier of the job to stream
    interval: float = 0.25,  # Polling interval in seconds for checking progress updates
    heartbeat: float = 15.0,  # Seconds between keep-alive messages when no updates
    wait_for_start: bool = True,  # Whether to wait for job to start before ending stream
    start_timeout: float = 5.0,  # Maximum seconds to wait for job to start if wait_for_start is True
) -> Iterator[str]:  # SSE-formatted strings ready to send to client
    """
    Framework-agnostic SSE generator.
    - Yields 'data: {json}\n\n' when progress changes.
    - Sends ': keep-alive' comments every `heartbeat` seconds when idle.
    - If `wait_for_start` is True, it will wait up to `start_timeout` for
      the first snapshot before ending (avoids race at job startup).
    """

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

cjm_tqdm_capture-0.0.4.tar.gz (16.5 kB)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

cjm_tqdm_capture-0.0.4-py3-none-any.whl (17.3 kB)

Uploaded Python 3

File details

Details for the file cjm_tqdm_capture-0.0.4.tar.gz.

File metadata

  • Download URL: cjm_tqdm_capture-0.0.4.tar.gz
  • Size: 16.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.11.13

File hashes

Hashes for cjm_tqdm_capture-0.0.4.tar.gz
Algorithm Hash digest
SHA256 861263e9a2dae858bda06c8475cd986942769d3d507ad02807bf3a9ff2f05ae4
MD5 2532d65f0168a32b12ed6ca5c71d9059
BLAKE2b-256 0d21eb34e3712bd188b5ee72d4aee1dcc47ca224a56402ca50fd9b7a1ed72c84

See more details on using hashes here.

File details

Details for the file cjm_tqdm_capture-0.0.4-py3-none-any.whl.

File metadata

File hashes

Hashes for cjm_tqdm_capture-0.0.4-py3-none-any.whl
Algorithm Hash digest
SHA256 0d994697203174e952b6fe4a3827976d7e7a87176c9730b02b30d5825caed663
MD5 c83ed7636acc785c9c355428e32ac2eb
BLAKE2b-256 c3c6b048efe10a671e03a92cf436547b2aad0764c6bb528f114c7012c809b412

See more details on using hashes here.
