
streamish


Iterator and async iterator utilities for Python 3.12+.

Features

  • Hybrid API: Fluent chains or standalone functions
  • Unified sync/async: Same functions work with both iterators and async iterators
  • Type safe: Full pyright strict mode support
  • Zero dependencies: stdlib only

Installation

pip install streamish

Quick Start

import streamish as st

# Fluent API
result = list(
    st.stream([1, 2, 3, 4, 5])
    .map(lambda x: x * 2)
    .filter(lambda x: x > 4)
    .take(2)
)
# [6, 8]

# Standalone functions
result = list(st.take(2, st.filter(lambda x: x > 4, st.map(lambda x: x * 2, [1, 2, 3, 4, 5]))))
# [6, 8]

Async Support

Functions automatically detect async iterables and async functions:

import streamish as st

# `Response` stands in for your HTTP client's response type
async def fetch(url: str) -> Response:
    ...

# Async source
async def urls():
    yield "https://example.com/1"
    yield "https://example.com/2"

# Automatically async
async for response in st.stream(urls()).map(fetch):
    print(response)

# Concurrent execution with map_async
async for response in st.map_async(fetch, urls(), concurrency=10):
    print(response)

Operations

Transform

Operation                          Description
map(fn, it)                        Apply function to each element
filter(pred, it)                   Keep elements satisfying predicate
flatten(it)                        Flatten one level of nesting
flat_map(fn, it)                   Map then flatten
enumerate(it, start=0)             Add index to elements
scan(fn, it, initial=x)            Cumulative reduce, yielding intermediate values
map_async(fn, it, concurrency=1)   Concurrent async map, preserving order
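For a feel of what scan yields, the stdlib's itertools.accumulate produces the same kind of running intermediate values. The sketch below illustrates accumulate's behavior; whether streamish's scan also emits the initial value first is not specified above, so treat the exact output as an assumption about accumulate, not a claim about scan:

```python
from itertools import accumulate

# accumulate yields each intermediate reduce value, much like
# scan(fn, it, initial=x) with a running sum.
running = list(accumulate([1, 2, 3, 4], lambda acc, x: acc + x, initial=0))
# accumulate emits the initial value, then each partial sum:
# [0, 1, 3, 6, 10]
```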

Filter

Operation                           Description
take(n, it)                         Take first n elements
skip(n, it)                         Skip first n elements
take_while(pred, it)                Take while predicate is true
skip_while(pred, it)                Skip while predicate is true
distinct(it, window=N, timeout=T)   Remove duplicates (with optional LRU window or expiry)
distinct_by(key_fn, it)             Remove duplicates by key
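The exact LRU semantics of distinct's window parameter are not spelled out above. The plain-Python sketch below shows one plausible reading (evict the least-recently-seen key once more than N distinct keys are tracked); the function name and eviction details are illustrative assumptions, not streamish's implementation:

```python
from collections import OrderedDict

def distinct_window(items, window):
    # Track recently seen keys in insertion/access order; evict the
    # least-recently-seen key once more than `window` keys are held.
    seen = OrderedDict()
    for item in items:
        if item in seen:
            seen.move_to_end(item)  # refresh recency, drop the duplicate
            continue
        seen[item] = None
        if len(seen) > window:
            seen.popitem(last=False)  # evict least-recently-seen
        yield item

deduped = list(distinct_window([1, 2, 1, 3, 1], window=2))
# [1, 2, 3] under these semantics: the second 1 is a duplicate, and the
# final 1 is still in the window because it was refreshed
```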

Group

Operation                       Description
batch(size, it, timeout=None)   Group into batches by size or timeout
window(size, it, step=1)        Sliding window
partition(pred, it)             Split into (matches, non_matches)
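A sliding window with a step can be sketched in plain Python with a bounded deque. This is a reference implementation of the general technique under the assumption that windows are emitted only once full; it is not streamish's code:

```python
from collections import deque

def sliding_window(items, size, step=1):
    # Keep the last `size` elements; emit a snapshot every `step`
    # elements once the buffer is full.
    buf = deque(maxlen=size)
    for i, item in enumerate(items, 1):
        buf.append(item)
        if len(buf) == size and (i - size) % step == 0:
            yield tuple(buf)

windows = list(sliding_window([1, 2, 3, 4, 5], size=3))
# [(1, 2, 3), (2, 3, 4), (3, 4, 5)]
```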

Combine

Operation                 Description
zip(*iterables)           Zip iterables together
chain(*iterables)         Chain iterables sequentially
interleave(*iterables)    Alternate elements round-robin
merge(*async_iterables)   Merge async iterables, emit as they arrive
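Round-robin interleaving can be sketched with stdlib itertools. The sketch below assumes exhausted inputs are simply skipped while the others continue; streamish's exact behavior for unequal-length inputs is not documented above:

```python
from itertools import chain, zip_longest

_MISSING = object()  # sentinel marking exhausted inputs

def interleave(*iterables):
    # Take one element from each input in turn, skipping inputs
    # that have run out.
    for item in chain.from_iterable(zip_longest(*iterables, fillvalue=_MISSING)):
        if item is not _MISSING:
            yield item

mixed = list(interleave([1, 2, 3], "ab"))
# [1, 'a', 2, 'b', 3]
```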

Examples

Processing a file line by line

import streamish as st

with open("data.txt") as f:
    result = list(
        st.stream(f)
        .map(str.strip)
        .filter(bool)  # skip empty lines
        .distinct()
        .take(100)
    )

Batching API requests

from collections.abc import AsyncIterable

import streamish as st

# `Item` stands in for your own element type
async def send_batch(items: list[Item]) -> None:
    ...

async def process(items: AsyncIterable[Item]) -> None:
    async for batch in st.stream(items).batch(100, timeout=5.0):
        await send_batch(batch)

Concurrent HTTP requests

import httpx
import streamish as st

async def fetch(client: httpx.AsyncClient, url: str) -> httpx.Response:
    return await client.get(url)

async def main():
    urls = ["https://example.com/1", "https://example.com/2", ...]

    async with httpx.AsyncClient() as client:
        async for response in st.map_async(
            lambda url: fetch(client, url),
            urls,
            concurrency=10,
        ):
            print(response.status_code)

Windowed statistics

import streamish as st

# Moving average over last 5 values
values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

averages = list(
    st.stream(values)
    .window(5)
    .map(lambda w: sum(w) / len(w))
)
# [3.0, 4.0, 5.0, 6.0, 7.0, 8.0]

Merging async streams

import asyncio

import streamish as st

async def stream_a():
    for i in range(3):
        await asyncio.sleep(0.1)
        yield f"a{i}"

async def stream_b():
    for i in range(3):
        await asyncio.sleep(0.15)
        yield f"b{i}"

# Items emitted as they arrive
async for item in st.merge(stream_a(), stream_b()):
    print(item)
# a0, b0, a1, a2, b1, b2 (order depends on timing)

License

MIT

