
Intercept and capture progress information from tqdm progress bars via callbacks without modifying existing code.

cjm-tqdm-capture

Install

pip install cjm-tqdm-capture

Project Structure

nbs/
├── job_runner.ipynb       # Executes functions in background threads with automatic tqdm progress capture.
├── patch_tqdm.ipynb       # Provides the patching mechanism that intercepts tqdm and emits progress callbacks.
├── progress_info.ipynb    # Defines the data structure used to represent progress state.
├── progress_monitor.ipynb # Thread-safe monitor for tracking and aggregating progress from multiple concurrent jobs.
└── streaming.ipynb        # Server-Sent Events (SSE) generator for real-time progress streaming to web clients.

Total: 5 notebooks

Module Dependencies

graph LR
    job_runner[job_runner<br/>job runner]
    patch_tqdm[patch_tqdm<br/>patch tqdm]
    progress_info[progress_info<br/>progress info]
    progress_monitor[progress_monitor<br/>progress monitor]
    streaming[streaming<br/>streaming]

    job_runner --> progress_info
    job_runner --> progress_monitor
    job_runner --> patch_tqdm
    patch_tqdm --> progress_info
    progress_monitor --> progress_info
    progress_monitor --> patch_tqdm
    streaming --> progress_monitor
    streaming --> job_runner

8 cross-module dependencies detected
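The graph is acyclic, so the modules have a valid import order. The sketch below recovers that order from the edges using the standard library's `graphlib` (Python 3.9+) and counts the edges. The `DEPENDENCIES` mapping and the helper names are illustrative and are not part of the package.

```python
from graphlib import TopologicalSorter

# Edges copied from the diagram: module -> modules it depends on.
DEPENDENCIES = {
    "job_runner": {"progress_info", "progress_monitor", "patch_tqdm"},
    "patch_tqdm": {"progress_info"},
    "progress_info": set(),
    "progress_monitor": {"progress_info", "patch_tqdm"},
    "streaming": {"progress_monitor", "job_runner"},
}


def import_order(deps):
    """Return modules ordered so that each one follows everything it depends on."""
    # TopologicalSorter takes node -> predecessors, which matches this mapping.
    return list(TopologicalSorter(deps).static_order())


def edge_count(deps):
    """Count cross-module dependency edges."""
    return sum(len(targets) for targets in deps.values())


print(import_order(DEPENDENCIES))
print(edge_count(DEPENDENCIES))
```

Here the order is fully determined (progress_info, patch_tqdm, progress_monitor, job_runner, streaming) because each module depends on the one before it.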

CLI Reference

No CLI commands found in this project.

Module Overview

Detailed documentation for each module in the project:

job runner (job_runner.ipynb)

Executes functions in background threads with automatic tqdm progress capture.

Import

from cjm_tqdm_capture.job_runner import (
    JobRunner
)

Classes

class JobRunner:
    "Runs functions in background threads with automatic tqdm progress capture"
    
    def __init__(
            self,
            monitor: ProgressMonitor  # Progress monitor instance to receive updates
        )
        "Initialize a job runner with a progress monitor"
    
    def start(
            self,
            job_id: str,  # Unique identifier for this job
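            fn: Callable[..., Any],  # Function to run in the background thread
            *args,  # Positional arguments passed to fn
            patch_kwargs: Optional[Dict[str, Any]] = None,  # Optional keyword arguments for patch_tqdm
            **kwargs  # Keyword arguments passed to fn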
        ) -> threading.Thread:  # The thread running the job
        "Start a job in a background thread with automatic tqdm patching"
    
    def is_alive(
            self,
            job_id: str  # Unique identifier of the job to check
        ) -> bool:  # True if the job thread is still running
        "Check if a job's thread is still running"
    
    def join(
            self,
            job_id: str,  # Unique identifier of the job to wait for
            timeout: Optional[float] = None  # Maximum seconds to wait (None for indefinite)
        ) -> None:  # Returns when thread completes or timeout expires
        "Wait for a job's thread to complete"

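`JobRunner.start` returns a `threading.Thread`, and `is_alive` and `join` look that thread up by `job_id`. The sketch below shows that bookkeeping using only the standard `threading` module. `MiniJobRunner` is a hypothetical stand-in. It does not patch tqdm or report to a `ProgressMonitor`, which the real class does according to its docstrings.

```python
import threading
from typing import Any, Callable, Dict, Optional


class MiniJobRunner:
    """Hypothetical stand-in for JobRunner: runs callables in background threads keyed by job_id.

    Unlike the real class, it does not patch tqdm or forward progress to a monitor.
    """

    def __init__(self) -> None:
        self._threads: Dict[str, threading.Thread] = {}
        self._lock = threading.Lock()  # guards _threads across caller threads

    def start(self, job_id: str, fn: Callable[..., Any], *args, **kwargs) -> threading.Thread:
        # daemon=True in this sketch so an unfinished job does not block interpreter exit
        thread = threading.Thread(target=fn, args=args, kwargs=kwargs, name=job_id, daemon=True)
        with self._lock:
            self._threads[job_id] = thread
        thread.start()
        return thread

    def is_alive(self, job_id: str) -> bool:
        with self._lock:
            thread = self._threads.get(job_id)
        return thread is not None and thread.is_alive()

    def join(self, job_id: str, timeout: Optional[float] = None) -> None:
        with self._lock:
            thread = self._threads.get(job_id)
        if thread is not None:
            thread.join(timeout)  # returns on completion or when timeout expires


results = []
runner = MiniJobRunner()
runner.start("job-1", results.append, 42)
runner.join("job-1")
print(runner.is_alive("job-1"), results)
```
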
patch tqdm (patch_tqdm.ipynb)

Provides the patching mechanism that intercepts tqdm and emits progress callbacks.

Import

from cjm_tqdm_capture.patch_tqdm import (
    patch_tqdm
)

Functions

def _make_callback_class(
    BaseTqdm: type,  # Base tqdm class to extend with callback functionality
    default_cb: Optional[Callable[[ProgressInfo], None]],  # Default callback that receives ProgressInfo updates
    min_update_interval: float = 0.1,  # Minimum time between callback invocations (seconds)
    min_delta_pct: float = 1.0,  # Minimum change in percent completion before emitting
    emit_initial: bool = False  # Whether to emit a callback at 0% progress
) -> type:  # Extended tqdm class with callback support
    "Create a tqdm subclass that emits progress callbacks during iteration"
@contextmanager
def patch_tqdm(
    progress_callback: Optional[Callable[[ProgressInfo], None]],  # Function to call with progress updates
    min_update_interval: float = 0.1,  # Minimum time between callback invocations (seconds)
    min_delta_pct: float = 10.0,  # Minimum change in percent completion between callbacks (default: roughly every 10%)
    emit_initial: bool = False  # Whether to emit a callback at 0% progress
):  # Context manager that temporarily patches tqdm modules
    "Context manager that patches tqdm to emit progress callbacks"

Variables

_BAR_COUNTER

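`patch_tqdm` temporarily patches tqdm with a subclass that emits progress callbacks, throttled by `min_update_interval` and `min_delta_pct`. The sketch below demonstrates that monkeypatching technique on a tiny hypothetical `FakeTqdm` class stored in a stand-in module (`types.SimpleNamespace`), so it runs without tqdm installed. The names `FakeTqdm`, `make_callback_class`, and `patch_attr`, the dict passed to the callback, and the rule that 100% is always reported are assumptions made for this sketch. None of them comes from the library's code.

```python
import time
import types
from contextlib import contextmanager
from typing import Any, Callable, Dict, Iterable, Optional


class FakeTqdm:
    """Hypothetical minimal stand-in for tqdm.tqdm: counts iterations, prints nothing."""

    def __init__(self, iterable: Iterable, total: Optional[int] = None, desc: str = ""):
        self.iterable = iterable
        # This stand-in requires a sized iterable when total is omitted
        self.total = total if total is not None else len(iterable)
        self.desc = desc
        self.n = 0

    def update(self, n: int = 1) -> None:
        self.n += n

    def __iter__(self):
        for item in self.iterable:
            yield item
            self.update(1)


def make_callback_class(base: type, callback: Callable[[Dict[str, Any]], None],
                        min_update_interval: float = 0.1, min_delta_pct: float = 10.0,
                        emit_initial: bool = False) -> type:
    """Subclass `base` so that update() reports throttled progress to `callback`."""

    class CallbackBar(base):
        def __init__(self, *args, **kwargs):
            self._last_time = float("-inf")  # monotonic time of the last emission
            self._last_pct = 0.0             # progress at the last emission (0% baseline)
            super().__init__(*args, **kwargs)
            if emit_initial:
                self._emit(0.0, force=True)

        def _emit(self, pct: float, force: bool = False) -> None:
            now = time.monotonic()
            moved = pct - self._last_pct >= min_delta_pct
            waited = now - self._last_time >= min_update_interval
            # Assumption of this sketch: always report completion, even if throttled
            if force or pct >= 100.0 or (moved and waited):
                self._last_time, self._last_pct = now, pct
                callback({"description": self.desc, "current": self.n,
                          "total": self.total, "progress": pct})

        def update(self, n: int = 1) -> None:
            super().update(n)
            if self.total:  # skip bars with an unknown or zero total
                self._emit(100.0 * self.n / self.total)

    return CallbackBar


@contextmanager
def patch_attr(module: Any, name: str, callback: Callable[[Dict[str, Any]], None], **throttle):
    """Temporarily replace module.<name> with a callback-emitting subclass."""
    original = getattr(module, name)
    setattr(module, name, make_callback_class(original, callback, **throttle))
    try:
        yield
    finally:
        setattr(module, name, original)  # restore even if the body raises


# "Existing" code looks the class up through the module at call time.
fake_module = types.SimpleNamespace(tqdm=FakeTqdm)


def existing_code() -> None:
    for _ in fake_module.tqdm(range(100), desc="work"):
        pass


events = []
with patch_attr(fake_module, "tqdm", events.append, min_update_interval=0.0):
    existing_code()
print([e["progress"] for e in events])
```

The example sets `min_update_interval=0.0` so that the output does not depend on timing. Patching a module attribute only affects code that looks the class up through that module at call time. A name bound earlier with `from tqdm import tqdm` still refers to the original class, and this listing does not say how the library handles that case.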
progress info (progress_info.ipynb)

Defines the data structure used to represent progress state.

Import

from cjm_tqdm_capture.progress_info import (
    ProgressInfo,
    serialize_job_snapshot,
    serialize_all_jobs
)

Functions

def serialize_job_snapshot(
    snapshot: Optional[Dict[str, Any]]  # Job snapshot dictionary from ProgressMonitor
) -> Optional[Dict[str, Any]]:  # JSON-serializable dictionary or None if input is None
    "Convert a job snapshot to JSON-serializable format"
def serialize_all_jobs(
    jobs: Dict[str, Dict[str, Any]]  # Dictionary mapping job IDs to job snapshots
) -> Dict[str, Optional[Dict[str, Any]]]:  # Dictionary mapping job IDs to serialized snapshots
    "Convert all jobs from monitor.all() to JSON-serializable format"

Classes

@dataclass
class ProgressInfo:
    "Structured progress information"
    
    progress: float  # Percentage completion (0-100)
    current: Optional[int]  # Current iteration count
    total: Optional[int]  # Total iterations expected
    rate: Optional[str]  # Processing rate (e.g., "50.5 it/s")
    elapsed: Optional[str]  # Time elapsed since start
    remaining: Optional[str]  # Estimated time remaining
    description: Optional[str]  # Progress bar description/label
    raw_output: str = ''  # Raw output string (if any)
    timestamp: float  # Unix timestamp when created
    bar_id: Optional[str]  # Unique identifier for this progress bar
    position: Optional[int]  # Display position for multi-bar scenarios
    
    def to_dict(
            self
        ) -> dict: # Dictionary with all progress fields for JSON serialization
        "Convert to dictionary for JSON serialization"

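A Python dataclass cannot declare a field without a default after a field that has one (unless the fields are keyword-only). The field listing for `ProgressInfo` puts `raw_output: str = ''` before `timestamp`, so the listing presumably omits some defaults. The sketch below mirrors the fields and assigns defaults that are assumptions. It implements `to_dict` with `dataclasses.asdict` and shows the kind of conversion that `serialize_job_snapshot` describes. `ProgressInfoSketch`, `serialize_snapshot_sketch`, and the snapshot keys `completed` and `last` are hypothetical.

```python
import json
import time
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Optional


@dataclass
class ProgressInfoSketch:
    """Hypothetical mirror of ProgressInfo; every default here is an assumption."""
    progress: float
    current: Optional[int] = None
    total: Optional[int] = None
    rate: Optional[str] = None
    elapsed: Optional[str] = None
    remaining: Optional[str] = None
    description: Optional[str] = None
    raw_output: str = ""
    timestamp: float = field(default_factory=time.time)  # set when the instance is created
    bar_id: Optional[str] = None
    position: Optional[int] = None

    def to_dict(self) -> dict:
        # asdict copies every field into a plain dict that json.dumps accepts
        return asdict(self)


def serialize_snapshot_sketch(snapshot: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Replace ProgressInfoSketch values with dicts so the snapshot is JSON-serializable."""
    if snapshot is None:
        return None
    return {key: value.to_dict() if isinstance(value, ProgressInfoSketch) else value
            for key, value in snapshot.items()}


info = ProgressInfoSketch(progress=50.0, current=5, total=10, description="download")
snapshot = {"completed": False, "last": info}  # hypothetical snapshot shape
serialized = serialize_snapshot_sketch(snapshot)
print(json.dumps(serialized))
```
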
progress monitor (progress_monitor.ipynb)

Thread-safe monitor for tracking and aggregating progress from multiple concurrent jobs.

Import

from cjm_tqdm_capture.progress_monitor import (
    ProgressMonitor
)

Classes

class ProgressMonitor:
    "Thread-safe monitor for tracking progress of multiple concurrent jobs"
    
    def __init__(
            self,
            keep_history: bool = False,  # Whether to maintain a history of progress updates
            history_limit: int = 500  # Maximum number of historical updates to keep per job
        )
        "Initialize a new progress monitor with optional history tracking"
    
    def update(
            self,
            job_id: str,  # Unique identifier for the job being tracked
            info: ProgressInfo  # Progress information update for the job
        )
        "Record a progress update for a job and recompute its completion status"
    
    def snapshot(
            self,
            job_id: str  # Unique identifier of the job to snapshot
        ) -> Optional[Dict[str, Any]]:  # Job state dictionary or None if job not found
        "Get a point-in-time snapshot of a specific job's progress state"
    
    def all(
            self
        ) -> Dict[str, Dict[str, Any]]:  # Dictionary mapping job IDs to their state snapshots
        "Get snapshots of all tracked jobs"
    
    def clear_completed(
            self,
            older_than_seconds: float = 3600  # Age threshold in seconds for removing completed jobs
        )
        "Remove completed jobs that finished more than the specified seconds ago"

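`ProgressMonitor` is documented as thread-safe. It tracks many jobs, can keep a bounded history, and recomputes completion on every update. A common way to get those properties is a single lock around a dict of jobs, with `collections.deque(maxlen=...)` holding the history, and the sketch below takes that approach. `MiniMonitor`, the dict-shaped updates, and the completion rule (every bar reported so far is at 100%) are assumptions. They do not describe the library's implementation.

```python
import threading
import time
from collections import deque
from typing import Any, Dict, Optional


class MiniMonitor:
    """Hypothetical thread-safe job monitor; updates are dicts with 'bar_id' and 'progress'."""

    def __init__(self, keep_history: bool = False, history_limit: int = 500):
        self._lock = threading.Lock()
        self._jobs: Dict[str, Dict[str, Any]] = {}
        self.keep_history = keep_history
        self.history_limit = history_limit

    def update(self, job_id: str, info: Dict[str, Any]) -> None:
        with self._lock:
            job = self._jobs.setdefault(job_id, {
                "bars": {}, "completed": False, "completed_at": None,
                "history": deque(maxlen=self.history_limit),  # oldest entries drop off
            })
            job["bars"][info["bar_id"]] = info  # keep only the latest state per bar
            if self.keep_history:
                job["history"].append(info)
            # Assumed rule: complete once every bar reported so far is at 100%
            done = all(bar["progress"] >= 100.0 for bar in job["bars"].values())
            if done and not job["completed"]:
                job["completed_at"] = time.time()
            job["completed"] = done

    @staticmethod
    def _copy(job: Dict[str, Any]) -> Dict[str, Any]:
        # Shallow copies taken under the lock, so callers never see a half-applied update
        return {"bars": dict(job["bars"]), "completed": job["completed"],
                "completed_at": job["completed_at"], "history": list(job["history"])}

    def snapshot(self, job_id: str) -> Optional[Dict[str, Any]]:
        with self._lock:
            job = self._jobs.get(job_id)
            return None if job is None else self._copy(job)

    def all(self) -> Dict[str, Dict[str, Any]]:
        with self._lock:
            return {job_id: self._copy(job) for job_id, job in self._jobs.items()}

    def clear_completed(self, older_than_seconds: float = 3600) -> None:
        cutoff = time.time() - older_than_seconds
        with self._lock:
            stale = [job_id for job_id, job in self._jobs.items()
                     if job["completed"] and job["completed_at"] <= cutoff]
            for job_id in stale:
                del self._jobs[job_id]


monitor = MiniMonitor(keep_history=True, history_limit=5)


def worker(bar_id: str) -> None:
    for pct in range(0, 101, 10):
        monitor.update("job-1", {"bar_id": bar_id, "progress": float(pct)})


threads = [threading.Thread(target=worker, args=(f"bar-{i}",)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
snap = monitor.snapshot("job-1")
print(snap["completed"], len(snap["bars"]), len(snap["history"]))
```

This sketch judges completion only over bars reported so far. A job that opens a second bar after its first bar finishes therefore looks complete for a moment, so a real implementation may need a stricter rule.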
streaming (streaming.ipynb)

Server-Sent Events (SSE) generator for real-time progress streaming to web clients.

Import

from cjm_tqdm_capture.streaming import (
    sse_stream,
    sse_stream_async
)

Functions

def sse_stream(
    monitor: ProgressMonitor,  # Progress monitor instance to read job updates from
    job_id: str,  # Unique identifier of the job to stream
    interval: float = 0.25,  # Polling interval in seconds for checking progress updates
    heartbeat: float = 15.0,  # Seconds between keep-alive messages when no updates
    wait_for_start: bool = True,  # If the job has not started yet, wait for it (up to start_timeout) instead of ending the stream
    start_timeout: float = 5.0,  # Maximum seconds to wait for job to start if wait_for_start is True
) -> Iterator[str]:  # SSE-formatted strings ready to send to client
    "Framework-agnostic SSE generator for streaming job progress"
async def sse_stream_async(
    monitor: ProgressMonitor,  # Progress monitor instance to read job updates from
    job_id: str,  # Unique identifier of the job to stream
    interval: float = 0.25,  # Polling interval in seconds for checking progress updates
    heartbeat: float = 15.0,  # Seconds between keep-alive messages when no updates
    wait_for_start: bool = True,  # If the job has not started yet, wait for it (up to start_timeout) instead of ending the stream
    start_timeout: float = 5.0,  # Maximum seconds to wait for job to start if wait_for_start is True
) -> AsyncIterator[str]:  # SSE-formatted strings ready to send to client
    "Async version of SSE generator for streaming job progress"

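An SSE response is a stream of text messages. Each message consists of `field: value` lines followed by a blank line. Lines that start with `:` are comments, which clients ignore, so they work well as a keep-alive heartbeat. The sketch below polls a snapshot function in the way the parameters of `sse_stream` suggest (`interval`, `heartbeat`, `wait_for_start`, `start_timeout`). The event names `progress`, `done`, and `error` and the polling logic are assumptions, because this listing does not show the real generator's message format.

```python
import json
import time
from typing import Any, Callable, Dict, Iterator, Optional


def sse_event(data: Dict[str, Any], event: Optional[str] = None) -> str:
    """Format one SSE message: optional 'event:' line, one 'data:' line, then a blank line."""
    lines = [f"event: {event}"] if event else []
    lines.append(f"data: {json.dumps(data)}")  # json.dumps emits no newlines by default
    return "\n".join(lines) + "\n\n"


def sse_stream_sketch(
    get_snapshot: Callable[[str], Optional[Dict[str, Any]]],  # e.g. a monitor's snapshot method
    job_id: str,
    interval: float = 0.25,
    heartbeat: float = 15.0,
    wait_for_start: bool = True,
    start_timeout: float = 5.0,
) -> Iterator[str]:
    """Poll get_snapshot(job_id) and yield SSE strings until the job completes."""
    started = last_sent = time.monotonic()
    last_snapshot = None
    while True:
        snap = get_snapshot(job_id)
        if snap is None:
            # Job not registered yet: give up now, or after start_timeout when waiting
            if not wait_for_start or time.monotonic() - started > start_timeout:
                yield sse_event({"job_id": job_id, "error": "job not found"}, event="error")
                return
        elif snap != last_snapshot:
            last_snapshot, last_sent = snap, time.monotonic()
            yield sse_event(snap, event="progress")
            if snap.get("completed"):
                yield sse_event({"job_id": job_id}, event="done")
                return
        if time.monotonic() - last_sent >= heartbeat:
            # ':' lines are SSE comments: clients ignore them, but they keep idle connections open
            last_sent = time.monotonic()
            yield ": keep-alive\n\n"
        time.sleep(interval)


states = iter([None, {"progress": 50.0, "completed": False},
               {"progress": 50.0, "completed": False},
               {"progress": 100.0, "completed": True}])
events = list(sse_stream_sketch(lambda job_id: next(states), "job-1", interval=0.0))
for message in events:
    print(repr(message))
```

In a web framework, the generator would typically be returned as a streaming response with the `text/event-stream` content type.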
Download files

Source Distribution

cjm_tqdm_capture-0.0.6.tar.gz (16.7 kB)

Built Distribution

cjm_tqdm_capture-0.0.6-py3-none-any.whl (17.3 kB)

File details

Details for the file cjm_tqdm_capture-0.0.6.tar.gz.

File metadata

  • Download URL: cjm_tqdm_capture-0.0.6.tar.gz
  • Size: 16.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.11.13

File hashes

Hashes for cjm_tqdm_capture-0.0.6.tar.gz
Algorithm Hash digest
SHA256 f7df467f6b79d012f69502e66c37e6e05c9758727143005438c9d967dfa06af1
MD5 3ff8a98e2495a2ab86c5e471c310f81b
BLAKE2b-256 d862a18ceb171d0127b73c3916fa91b22bc8044ae9856192c3a86524b670ef2f
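These digests let you check a downloaded file before you install it. pip can enforce the check itself in hash-checking mode (`--require-hashes`, with `--hash=sha256:...` entries in a requirements file). The sketch below makes the same comparison by hand with `hashlib`. The helper names are illustrative.

```python
import hashlib
import hmac
from pathlib import Path
from typing import Iterable, Iterator


def file_chunks(path: Path, chunk_size: int = 1 << 16) -> Iterator[bytes]:
    """Yield a file's bytes in fixed-size chunks so large files need little memory."""
    with open(path, "rb") as fh:
        yield from iter(lambda: fh.read(chunk_size), b"")


def sha256_hex(chunks: Iterable[bytes]) -> str:
    """Hex SHA-256 digest of the concatenation of chunks."""
    digest = hashlib.sha256()
    for chunk in chunks:
        digest.update(chunk)
    return digest.hexdigest()


def matches(actual_hex: str, expected_hex: str) -> bool:
    """Compare hex digests case-insensitively with a constant-time comparison."""
    return hmac.compare_digest(actual_hex.lower(), expected_hex.lower())


# Example, after downloading the source distribution listed on this page:
# ok = matches(sha256_hex(file_chunks(Path("cjm_tqdm_capture-0.0.6.tar.gz"))),
#              "f7df467f6b79d012f69502e66c37e6e05c9758727143005438c9d967dfa06af1")
```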

File details

Details for the file cjm_tqdm_capture-0.0.6-py3-none-any.whl.

File hashes

Hashes for cjm_tqdm_capture-0.0.6-py3-none-any.whl
Algorithm Hash digest
SHA256 e5507c0f321de8d6163390ac90892224bc59636dcff43369465196b2603033cc
MD5 d96b917cd1f663fa153a80dc1933135e
BLAKE2b-256 1bdfdc7a58a8c6ade736f2603e7beefb738662b19ed664920ba40affef9e8c4d
