streamish


Iterator and async iterator utilities for Python 3.12+.

Features

  • Hybrid API: Fluent chains or standalone functions
  • Unified sync/async: Same functions work with both iterators and async iterators
  • Type safe: Full pyright strict mode support
  • Zero dependencies: stdlib only

Installation

pip install streamish

Quick Start

import streamish as st

# Fluent API
result = list(
    st.stream([1, 2, 3, 4, 5])
    .map(lambda x: x * 2)
    .filter(lambda x: x > 4)
    .take(2)
)
# [6, 8]

# Standalone functions
result = list(
    st.take(2, st.filter(lambda x: x > 4, st.map(lambda x: x * 2, [1, 2, 3, 4, 5])))
)
# [6, 8]

Async Support

Functions automatically detect async iterables and async functions:

import streamish as st

async def fetch(url: str) -> Response:
    ...

# Async source
async def urls():
    yield "https://example.com/1"
    yield "https://example.com/2"

# Automatically async
async for response in st.stream(urls()).map(fetch):
    print(response)

# Concurrent execution with map_async
async for response in st.map_async(fetch, urls(), concurrency=10):
    print(response)
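The concurrency-limited, order-preserving behavior described above can be sketched with the stdlib alone. The sketch below is an illustration of the semantics, not streamish's implementation (`bounded_map` is a hypothetical name); unlike map_async it starts all calls eagerly and only handles sync sources:

```python
import asyncio

async def bounded_map(fn, items, concurrency):
    # Cap the number of in-flight calls with a semaphore.
    sem = asyncio.Semaphore(concurrency)

    async def run(item):
        async with sem:
            return await fn(item)

    # Start one task per item; awaiting the tasks in input order
    # preserves the order of results regardless of completion order.
    tasks = [asyncio.ensure_future(run(item)) for item in items]
    for task in tasks:
        yield await task
```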

Operations

Transform

Operation                          Description
map(fn, it)                        Apply function to each element
filter(pred, it)                   Keep elements satisfying predicate
flatten(it)                        Flatten one level of nesting
flat_map(fn, it)                   Map then flatten
enumerate(it, start=0)             Add index to elements
scan(fn, it, initial=x)            Cumulative reduce, yielding intermediate values
map_async(fn, it, concurrency=1)   Concurrent async map, preserving order
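Two of these transforms mirror stdlib building blocks. The snippet below uses only itertools to illustrate the semantics named in the table; it does not use streamish itself:

```python
from itertools import accumulate, chain

values = [1, 2, 3, 4]

# scan: a cumulative reduce that yields every intermediate value,
# analogous to itertools.accumulate with an initial seed.
running = list(accumulate(values, lambda acc, x: acc + x, initial=0))
# running == [0, 1, 3, 6, 10]

# flat_map: map each element to an iterable, then flatten one level.
pairs = list(chain.from_iterable((x, -x) for x in values))
# pairs == [1, -1, 2, -2, 3, -3, 4, -4]
```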

Filter

Operation                Description
take(n, it)              Take first n elements
skip(n, it)              Skip first n elements
take_while(pred, it)     Take while predicate is true
skip_while(pred, it)     Skip while predicate is true
distinct(it)             Remove duplicates
distinct_by(key_fn, it)  Remove duplicates by key
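distinct_by keeps the first element seen for each key. A minimal generator with equivalent semantics (a sketch, not streamish's implementation):

```python
def distinct_by_sketch(key_fn, it):
    # Track keys already yielded; the first occurrence of each key wins.
    seen = set()
    for item in it:
        k = key_fn(item)
        if k not in seen:
            seen.add(k)
            yield item

words = ["apple", "avocado", "banana", "blueberry", "cherry"]
firsts = list(distinct_by_sketch(lambda w: w[0], words))
# firsts == ["apple", "banana", "cherry"]
```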

Group

Operation                      Description
batch(size, it, timeout=None)  Group into batches by size or timeout
window(size, it, step=1)       Sliding window
partition(pred, it)            Split into (matches, non_matches)
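The window and partition semantics can be sketched in a few lines of plain Python (illustrative only; streamish's versions also handle async sources):

```python
from collections import deque
from itertools import islice

def window_sketch(size, it, step=1):
    # Sliding window: emit a tuple whenever the window is full,
    # advancing `step` elements between emissions.
    it = iter(it)
    win = deque(islice(it, size), maxlen=size)
    if len(win) == size:
        yield tuple(win)
    for i, item in enumerate(it, 1):
        win.append(item)
        if i % step == 0:
            yield tuple(win)

def partition_sketch(pred, it):
    # Split eagerly into (matches, non_matches).
    matches, rest = [], []
    for item in it:
        (matches if pred(item) else rest).append(item)
    return matches, rest

windows = list(window_sketch(3, [1, 2, 3, 4, 5]))
# windows == [(1, 2, 3), (2, 3, 4), (3, 4, 5)]
evens, odds = partition_sketch(lambda x: x % 2 == 0, range(6))
# evens == [0, 2, 4], odds == [1, 3, 5]
```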

Combine

Operation                Description
zip(*iterables)          Zip iterables together
chain(*iterables)        Chain iterables sequentially
interleave(*iterables)   Alternate elements round-robin
merge(*async_iterables)  Merge async iterables, emitting items as they arrive
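interleave's round-robin behavior matches the classic itertools "roundrobin" recipe. A stdlib sketch of the semantics (not streamish's code):

```python
def interleave_sketch(*iterables):
    # Round-robin across sources, dropping each one as it runs out.
    iterators = [iter(it) for it in iterables]
    while iterators:
        alive = []
        for it in iterators:
            try:
                yield next(it)
                alive.append(it)
            except StopIteration:
                pass
        iterators = alive

mixed = list(interleave_sketch("abc", [1, 2], "xyz"))
# mixed == ["a", 1, "x", "b", 2, "y", "c", "z"]
```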

Examples

Processing a file line by line

import streamish as st

with open("data.txt") as f:
    result = list(
        st.stream(f)
        .map(str.strip)
        .filter(bool)  # skip empty lines
        .distinct()
        .take(100)
    )

Batching API requests

import streamish as st
from collections.abc import AsyncIterable

async def send_batch(items: list[Item]) -> None:
    ...

async def process(items: AsyncIterable[Item]) -> None:
    async for batch in st.stream(items).batch(100, timeout=5.0):
        await send_batch(batch)

Concurrent HTTP requests

import httpx
import streamish as st

async def fetch(client: httpx.AsyncClient, url: str) -> httpx.Response:
    return await client.get(url)

async def main():
    urls = ["https://example.com/1", "https://example.com/2", ...]

    async with httpx.AsyncClient() as client:
        async for response in st.map_async(
            lambda url: fetch(client, url),
            urls,
            concurrency=10,
        ):
            print(response.status_code)

Windowed statistics

import streamish as st

# Moving average over last 5 values
values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

averages = list(
    st.stream(values)
    .window(5)
    .map(lambda w: sum(w) / len(w))
)
# [3.0, 4.0, 5.0, 6.0, 7.0, 8.0]

Merging async streams

import asyncio

import streamish as st

async def stream_a():
    for i in range(3):
        await asyncio.sleep(0.1)
        yield f"a{i}"

async def stream_b():
    for i in range(3):
        await asyncio.sleep(0.15)
        yield f"b{i}"

# Items emitted as they arrive
async for item in st.merge(stream_a(), stream_b()):
    print(item)
# a0, b0, a1, a2, b1, b2 (order depends on timing)

License

MIT
