
streamish


Iterator and async iterator utilities for Python 3.12+.

Features

  • Hybrid API: Fluent chains or standalone functions
  • Unified sync/async: Same functions work with both iterators and async iterators
  • Type safe: Full pyright strict mode support
  • Zero dependencies: stdlib only

Installation

pip install streamish

Quick Start

import streamish as st

# Fluent API
result = list(
    st.stream([1, 2, 3, 4, 5])
    .map(lambda x: x * 2)
    .filter(lambda x: x > 4)
    .take(2)
)
# [6, 8]

# Standalone functions
result = list(
    st.take(2, st.filter(lambda x: x > 4, st.map(lambda x: x * 2, [1, 2, 3, 4, 5])))
)
# [6, 8]

Async Support

Functions automatically detect async iterables and async functions:

import streamish as st

async def fetch(url: str) -> Response:  # Response: your HTTP client's response type
    ...

# Async source
async def urls():
    yield "https://example.com/1"
    yield "https://example.com/2"

# Automatically async
async for response in st.stream(urls()).map(fetch):
    print(response)

# Concurrent execution with map_async
async for response in st.map_async(fetch, urls(), concurrency=10):
    print(response)
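How the detection works is not specified here; one common approach is to test for the async iterator protocol. A minimal sketch of such dispatch (an illustration, not streamish's actual implementation):

```python
from collections.abc import AsyncIterable

def is_async_iterable(obj: object) -> bool:
    # Async iterables implement __aiter__; the collections.abc check
    # detects the protocol without consuming the object.
    return isinstance(obj, AsyncIterable)

async def agen():
    yield 1

print(is_async_iterable(agen()))     # True
print(is_async_iterable([1, 2, 3]))  # False
```

A library can branch on a check like this to return either a sync or an async pipeline.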

Operations

Transform

Operation                         Description
map(fn, it)                       Apply function to each element
filter(pred, it)                  Keep elements satisfying predicate
flatten(it)                       Flatten one level of nesting
flat_map(fn, it)                  Map then flatten
enumerate(it, start=0)            Add index to elements
scan(fn, it, initial=x)           Cumulative reduce, yielding intermediate values
map_async(fn, it, concurrency=1)  Concurrent async map, preserving order
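For scan, the stdlib's itertools.accumulate demonstrates the same cumulative semantics; whether streamish emits the initial value may differ, so treat this only as an illustration:

```python
from itertools import accumulate
from operator import add

# Running sum that yields every intermediate total (scan semantics).
print(list(accumulate([1, 2, 3, 4], add)))             # [1, 3, 6, 10]
print(list(accumulate([1, 2, 3, 4], add, initial=0)))  # [0, 1, 3, 6, 10]
```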

Filter

Operation                          Description
take(n, it)                        Take first n elements
skip(n, it)                        Skip first n elements
take_while(pred, it)               Take while predicate is true
skip_while(pred, it)               Skip while predicate is true
distinct(it, window=N, timeout=T)  Remove duplicates (with optional LRU window or expiry)
distinct_by(key_fn, it)            Remove duplicates by key
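The semantics of distinct_by can be sketched in plain Python with a set of seen keys (the unbounded case, without the window/timeout options streamish's distinct offers):

```python
def unique_by(key_fn, it):
    # Yield each element whose key has not been seen before.
    seen = set()
    for item in it:
        key = key_fn(item)
        if key not in seen:
            seen.add(key)
            yield item

print(list(unique_by(str.lower, ["a", "A", "b", "B", "b"])))  # ['a', 'b']
```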

Group

Operation                      Description
batch(size, it, timeout=None)  Group into batches by size or timeout
window(size, it, step=1)       Sliding window
partition(pred, it)            Split into (matches, non_matches)
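window's sliding behavior (with step=1) corresponds to a fixed-size deque over the stream; a stdlib sketch of the idea:

```python
from collections import deque

def sliding_window(size, it):
    # Keep the last `size` elements; yield a snapshot once the buffer fills.
    buf = deque(maxlen=size)
    for item in it:
        buf.append(item)
        if len(buf) == size:
            yield tuple(buf)

print(list(sliding_window(3, [1, 2, 3, 4, 5])))
# [(1, 2, 3), (2, 3, 4), (3, 4, 5)]
```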

Combine

Operation                Description
zip(*iterables)          Zip iterables together
chain(*iterables)        Chain iterables sequentially
interleave(*iterables)   Alternate elements round-robin
merge(*async_iterables)  Merge async iterables, emit as they arrive
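interleave's round-robin order can be reproduced for finite inputs with itertools; a sketch using zip_longest and a sentinel to skip exhausted iterables:

```python
from itertools import chain, zip_longest

_MISSING = object()

def round_robin(*iterables):
    # Take one element from each iterable in turn, skipping exhausted ones.
    for item in chain.from_iterable(zip_longest(*iterables, fillvalue=_MISSING)):
        if item is not _MISSING:
            yield item

print(list(round_robin([1, 2, 3], "ab")))  # [1, 'a', 2, 'b', 3]
```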

Examples

Processing a file line by line

import streamish as st

with open("data.txt") as f:
    result = list(
        st.stream(f)
        .map(str.strip)
        .filter(bool)  # skip empty lines
        .distinct()
        .take(100)
    )

Batching API requests

from collections.abc import AsyncIterable

import streamish as st

async def send_batch(items: list[Item]) -> None:  # Item: your own record type
    ...

async def process(items: AsyncIterable[Item]) -> None:
    async for batch in st.stream(items).batch(100, timeout=5.0):
        await send_batch(batch)

Concurrent HTTP requests

import httpx
import streamish as st

async def fetch(client: httpx.AsyncClient, url: str) -> httpx.Response:
    return await client.get(url)

async def main():
    urls = ["https://example.com/1", "https://example.com/2", ...]

    async with httpx.AsyncClient() as client:
        async for response in st.map_async(
            lambda url: fetch(client, url),
            urls,
            concurrency=10,
        ):
            print(response.status_code)
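The bounded concurrency that map_async provides can be approximated in the stdlib with a semaphore; a sketch of the general pattern (not streamish's internals — note that gather buffers all results instead of streaming them):

```python
import asyncio

async def bounded_map(fn, items, concurrency):
    # The semaphore caps the number of in-flight coroutines;
    # gather preserves the input order of results.
    sem = asyncio.Semaphore(concurrency)

    async def run(item):
        async with sem:
            return await fn(item)

    return await asyncio.gather(*(run(i) for i in items))

async def double(x):
    await asyncio.sleep(0)
    return x * 2

print(asyncio.run(bounded_map(double, [1, 2, 3], concurrency=2)))  # [2, 4, 6]
```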

Windowed statistics

import streamish as st

# Moving average over last 5 values
values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

averages = list(
    st.stream(values)
    .window(5)
    .map(lambda w: sum(w) / len(w))
)
# [3.0, 4.0, 5.0, 6.0, 7.0, 8.0]

Merging async streams

import asyncio

import streamish as st

async def stream_a():
    for i in range(3):
        await asyncio.sleep(0.1)
        yield f"a{i}"

async def stream_b():
    for i in range(3):
        await asyncio.sleep(0.15)
        yield f"b{i}"

# Items emitted as they arrive
async for item in st.merge(stream_a(), stream_b()):
    print(item)
# a0, b0, a1, a2, b1, b2 (order depends on timing)

License

MIT
