Skip to main content

Lightweight progress utilities for sync/async workloads

Project description

processit

A lightweight progress utility for Python --- built for both synchronous and asynchronous iteration.

processit provides a simple, dependency-free progress bar for loops that may be either regular iterables or async iterables.


Installation

pip install processit

Quick Example

progress (sequential iteration)

import asyncio
import time

from processit import progress

def numbers():
    for i in range(10):
        time.sleep(0.3)
        yield i

async def main():
    async for _ in progress(numbers(), total=10, desc="Numbers"):
        await asyncio.sleep(0)

asyncio.run(main())

⚠️ progress does not create concurrency.
It simply instruments iteration and renders a progress bar.


Using progress with context manager

import asyncio
import time

from processit import progress

def numbers():
    for i in range(10):
        time.sleep(0.3)
        yield i

async def main():
    async with progress(numbers(), total=10, desc='Numbers') as p:
        log = p.log_stream()
        async for n in p:
            print(f'value: {n}', file=log)
            await asyncio.sleep(0.5)

asyncio.run(main())

Concurrency & Parallelism

progress(...) does not parallelize work.

If you want true concurrency (e.g., HTTP calls, async DB operations, async file IO), you must create tasks yourself and use track_as_completed(...).


track_as_completed (parallel tasks)

import asyncio
import random

from processit import track_as_completed

async def work(n: int) -> int:
    await asyncio.sleep(1.5 + random.random())
    return n * 2

async def main():
    tasks = [asyncio.create_task(work(i)) for i in range(10)]

    async for task in track_as_completed(tasks, total=len(tasks), desc="Parallel work"):
        result = await task
        print(result)

asyncio.run(main())

Limiting concurrency with a semaphore

import asyncio
from processit import track_as_completed

async def fetch(i: int) -> int:
    await asyncio.sleep(1)
    return i * 2

async def main():
    sem = asyncio.Semaphore(5)

    async def bounded(i: int):
        async with sem:
            return await fetch(i)

    tasks = [asyncio.create_task(bounded(i)) for i in range(20)]

    async for task in track_as_completed(tasks, total=len(tasks), desc="HTTP"):
        result = await task
        # process result

asyncio.run(main())

Important Notes

1. Blocking code

If your iterable performs blocking operations (e.g. time.sleep, synchronous file writes, synchronous DB access), the event loop will still be blocked.

To avoid blocking:

  • Use async libraries (aiohttp, async DB drivers, aiofiles, etc.)
  • Or move blocking work to a thread:
await asyncio.to_thread(blocking_function, arg1, arg2)

2. When to use each utility

Use case Recommended utility


Sequential iteration progress(...) Parallel async tasks track_as_completed(...) Need concurrency limit Semaphore + track_as_completed(...)


3. TTY vs non-TTY output

When the output stream is a real TTY, processit redraws the same line.

When the output stream is not a TTY (for example CI logs, redirected output, or StringIO in tests), processit emits line-based snapshots instead:

  • It respects refresh_interval without periodic duplicate frames
  • It prints a final 100% snapshot before the summary when total is known
  • Messages written with p.write(...) or file=p.log_stream() stay above the live bar; the bar is re-rendered as the last line in TTY mode

Timing starts when iteration actually begins, not when the Progress instance is created.


More Examples

Asynchronous iteration over a data source

import asyncio
from processit import progress

async def fetch_items():
    for i in range(20):
        await asyncio.sleep(0.05)
        yield f"item-{i}"

async def main():
    async for item in progress(fetch_items(), total=20, desc="Fetching"):
        await asyncio.sleep(0.02)

asyncio.run(main())

Processing without a defined total

import asyncio
from processit import progress

items = [x ** 2 for x in range(100)]

async def main():
    async for value in progress(items, desc="Squaring"):
        await asyncio.sleep(0.01)

asyncio.run(main())

Features

  • Works with both async and sync iterables
  • Displays elapsed time, rate, and ETA (when total is known)
  • Automatically cleans up and prints a final summary
  • No dependencies --- pure Python, fully type-hinted
  • Easy to use drop-in function: progress(iterable, ...)

API

progress(iterable, total=None, *, desc='Processing', width=30, refresh_interval=0.1, show_summary=True)

Creates and returns a Progress instance.

Name Type Description


iterable Iterable[T] \| AsyncIterable[T] Iterable to track total int \| None Total number of iterations desc str Text prefix shown before the bar width int Width of the progress bar refresh_interval float Time between updates show_summary bool Whether to print final summary


Progress helpers inside async with progress(...) as p

  • p.write("message"): prints a message above the live bar and re-renders it
  • p.log_stream(): returns a file-like stream for print(..., file=...) or logging.StreamHandler

If a command mixes progress output with logs, prefer one of those two interfaces instead of calling print(...) directly on stdout/stderr.


track_as_completed(tasks, total=None, *, desc='Processing', width=30, refresh_interval=0.1, show_summary=True)

Tracks a collection of awaitables or tasks as they complete.

Name Type Description


tasks Iterable[Awaitable[T]] Tasks or coroutines to monitor total int \| None Total number of tasks desc str Text prefix shown before the bar width int Width of the progress bar refresh_interval float Time between updates show_summary bool Whether to print final summary


Design Philosophy

processit is intentionally minimal:

  • No external dependencies\
  • No hidden concurrency\
  • Clear separation between instrumentation (progress) and concurrency (track_as_completed)

It focuses purely on progress rendering while leaving execution strategy under your control.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

processit-0.2.2.tar.gz (15.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

processit-0.2.2-py3-none-any.whl (9.9 kB view details)

Uploaded Python 3

File details

Details for the file processit-0.2.2.tar.gz.

File metadata

  • Download URL: processit-0.2.2.tar.gz
  • Upload date:
  • Size: 15.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for processit-0.2.2.tar.gz
Algorithm Hash digest
SHA256 9aac189a54ba2947fa915941cc15a795f2a224ebaa8125abe1613264a52bb9ce
MD5 76f317d238ee0ed35dd4e9b75ff932e5
BLAKE2b-256 850d2d47bd1695c7f87038b002ca0d0c54deda2eeda286f581d5bf31c65ddde3

See more details on using hashes here.

File details

Details for the file processit-0.2.2-py3-none-any.whl.

File metadata

  • Download URL: processit-0.2.2-py3-none-any.whl
  • Upload date:
  • Size: 9.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for processit-0.2.2-py3-none-any.whl
Algorithm Hash digest
SHA256 f4bd7bf4918605631f43ea7a17e0b7bbdf61527fbab9174a1fa09ceb29a50b82
MD5 72b69e5b26b4feadd40ffa36b70ccba5
BLAKE2b-256 ccf5575b2d9c7c9c2f7a06890678598c96b1550add667d9f4db1f297f5e1f62d

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page