streamish
Iterator and async iterator utilities for Python 3.12+.
Features
- Hybrid API: Fluent chains or standalone functions
- Unified sync/async: Same functions work with both iterators and async iterators
- Type safe: Full pyright strict mode support
- Zero dependencies: stdlib only
Installation
pip install streamish
Quick Start
import streamish as st

# Fluent API
result = list(
    st.stream([1, 2, 3, 4, 5])
    .map(lambda x: x * 2)
    .filter(lambda x: x > 4)
    .take(2)
)
# [6, 8]

# Standalone functions
result = list(st.take(2, st.filter(lambda x: x > 4, st.map(lambda x: x * 2, [1, 2, 3, 4, 5]))))
# [6, 8]
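To illustrate how a fluent chain and standalone functions can share one implementation, here is a minimal stdlib-only sketch. The `Stream` class and the `map_`, `filter_`, and `take` helpers below are hypothetical stand-ins written for this example; they are not streamish's actual code.

```python
from itertools import islice


# Standalone functions: plain lazy transformations over any iterable.
def map_(fn, it):
    return (fn(x) for x in it)


def filter_(pred, it):
    return (x for x in it if pred(x))


def take(n, it):
    return islice(it, n)


class Stream:
    """Hypothetical fluent wrapper: each method delegates to a standalone function."""

    def __init__(self, it):
        self._it = it

    def __iter__(self):
        return iter(self._it)

    def map(self, fn):
        return Stream(map_(fn, self._it))

    def filter(self, pred):
        return Stream(filter_(pred, self._it))

    def take(self, n):
        # `take` here resolves to the module-level function, not this method.
        return Stream(take(n, self._it))


data = [1, 2, 3, 4, 5]
fluent = list(Stream(data).map(lambda x: x * 2).filter(lambda x: x > 4).take(2))
standalone = list(take(2, filter_(lambda x: x > 4, map_(lambda x: x * 2, data))))
print(fluent, standalone)  # [6, 8] [6, 8]
```

Because every step is lazy, only as many elements as `take` needs are pulled from the source in either style.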
Async Support
Functions automatically detect async iterables and async functions:
import streamish as st

# `Response` is a placeholder for whatever your HTTP client returns
async def fetch(url: str) -> Response:
    ...

# Async source
async def urls():
    yield "https://example.com/1"
    yield "https://example.com/2"

async def main():
    # Automatically async
    async for response in st.stream(urls()).map(fetch):
        print(response)

    # Concurrent execution with map_async
    async for response in st.map_async(fetch, urls(), concurrency=10):
        print(response)
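One way such automatic detection can work is to check whether the source is an async iterable or the function is a coroutine function, and pick a sync or async code path accordingly. The sketch below shows that idea with the standard library only; `smart_map` and `_amap` are hypothetical names, and streamish may detect and dispatch differently.

```python
import asyncio
import inspect
from collections.abc import AsyncIterable


def smart_map(fn, it):
    """Return an async iterator if `it` is async or `fn` is a coroutine function."""
    if isinstance(it, AsyncIterable) or inspect.iscoroutinefunction(fn):
        return _amap(fn, it)
    return (fn(x) for x in it)  # pure sync path


async def _amap(fn, it):
    if isinstance(it, AsyncIterable):
        async for x in it:
            y = fn(x)
            yield (await y) if inspect.isawaitable(y) else y
    else:
        for x in it:
            y = fn(x)
            yield (await y) if inspect.isawaitable(y) else y


async def numbers():
    for i in (1, 2, 3):
        yield i


async def double(x):
    await asyncio.sleep(0)
    return x * 2


async def collect(ait):
    return [x async for x in ait]


sync_result = list(smart_map(lambda x: x + 1, [1, 2, 3]))
async_source = asyncio.run(collect(smart_map(lambda x: x + 1, numbers())))
async_fn = asyncio.run(collect(smart_map(double, [1, 2, 3])))
print(sync_result, async_source, async_fn)  # [2, 3, 4] [2, 3, 4] [2, 4, 6]
```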
Operations
Transform
| Operation | Description |
|---|---|
| `map(fn, it)` | Apply function to each element |
| `filter(pred, it)` | Keep elements satisfying predicate |
| `flatten(it)` | Flatten one level of nesting |
| `flat_map(fn, it)` | Map then flatten |
| `enumerate(it, start=0)` | Add index to elements |
| `scan(fn, it, initial=x)` | Cumulative reduce, yielding intermediate values |
| `map_async(fn, it, concurrency=1)` | Concurrent async map, preserving order |
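For readers who know `itertools`, the sync behavior of `flatten`, `flat_map`, and `scan` can be approximated as follows. This is a sketch of the semantics described in the table, not the library's code. In particular, whether `scan` emits the initial value itself is an assumption here: this version does not.

```python
from itertools import accumulate, chain, islice


def flatten_sketch(it):
    # One level only: [[1, [2]], [3]] would yield 1, [2], 3.
    return chain.from_iterable(it)


def flat_map_sketch(fn, it):
    return chain.from_iterable(map(fn, it))


def scan_sketch(fn, it, initial):
    # accumulate() emits `initial` first; this sketch skips it (assumption).
    return islice(accumulate(it, fn, initial=initial), 1, None)


flat = list(flatten_sketch([[1, 2], [3], []]))
mapped = list(flat_map_sketch(lambda x: [x, x * 10], [1, 2]))
running = list(scan_sketch(lambda acc, x: acc + x, [1, 2, 3, 4], initial=0))
print(flat, mapped, running)  # [1, 2, 3] [1, 10, 2, 20] [1, 3, 6, 10]
```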
Filter
| Operation | Description |
|---|---|
| `take(n, it)` | Take first n elements |
| `skip(n, it)` | Skip first n elements |
| `take_while(pred, it)` | Take while predicate is true |
| `skip_while(pred, it)` | Skip while predicate is true |
| `distinct(it, window=N, timeout=T)` | Remove duplicates (with optional LRU window or expiry) |
| `distinct_by(key_fn, it, window=N, timeout=T)` | Remove duplicates by key (with optional LRU window or expiry) |
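The LRU window bounds memory on long streams: only the most recently seen keys are remembered, so an item can reappear once its key has been evicted. A minimal sketch of that idea, assuming the window counts distinct keys and that a repeated key refreshes its recency (both assumptions; the `timeout` expiry is left out):

```python
from collections import OrderedDict


def distinct_by_sketch(key_fn, it, window=None):
    seen = OrderedDict()  # keys ordered from least to most recently seen
    for item in it:
        key = key_fn(item)
        if key in seen:
            seen.move_to_end(key)  # refresh recency, drop the duplicate
            continue
        seen[key] = None
        if window is not None and len(seen) > window:
            seen.popitem(last=False)  # evict the least recently seen key
        yield item


def distinct_sketch(it, window=None):
    return distinct_by_sketch(lambda x: x, it, window)


unbounded = list(distinct_sketch([1, 2, 1, 3, 1, 2]))
windowed = list(distinct_sketch([1, 2, 1, 3, 1, 2], window=2))
by_key = list(distinct_by_sketch(str.lower, ["a", "A", "b"]))
print(unbounded, windowed, by_key)  # [1, 2, 3] [1, 2, 3, 2] ['a', 'b']
```

In the windowed run, `2` is emitted a second time because it was evicted when `3` arrived.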
Group
| Operation | Description |
|---|---|
| `batch(size, it, timeout=None)` | Group into batches by size or timeout |
| `window(size, it, step=1)` | Sliding window |
| `partition(pred, it)` | Split into (matches, non_matches) |
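The following sketch shows one plausible reading of `window` and `partition` for sync iterables. It assumes that windows are emitted only once full, that `step` is the offset between consecutive window starts, and that `partition` returns two lazy iterators; the library's actual return types may differ.

```python
from collections import deque
from itertools import tee


def window_sketch(size, it, step=1):
    buf = deque(maxlen=size)  # keeps only the newest `size` items
    until_next = 0            # full-buffer arrivals to skip before the next emit
    for item in it:
        buf.append(item)
        if len(buf) < size:
            continue
        if until_next == 0:
            yield tuple(buf)
            until_next = step
        until_next -= 1


def partition_sketch(pred, it):
    a, b = tee(it)  # two independent passes over the same source
    return (x for x in a if pred(x)), (x for x in b if not pred(x))


sliding = list(window_sketch(3, [1, 2, 3, 4, 5]))
stepped = list(window_sketch(3, range(1, 8), step=2))
evens, odds = (list(p) for p in partition_sketch(lambda x: x % 2 == 0, range(6)))
print(sliding)      # [(1, 2, 3), (2, 3, 4), (3, 4, 5)]
print(stepped)      # [(1, 2, 3), (3, 4, 5), (5, 6, 7)]
print(evens, odds)  # [0, 2, 4] [1, 3, 5]
```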
Combine
| Operation | Description |
|---|---|
| `zip(*iterables)` | Zip iterables together |
| `chain(*iterables)` | Chain iterables sequentially |
| `interleave(*iterables)` | Alternate elements round-robin |
| `merge(*async_iterables)` | Merge async iterables, emit as they arrive |
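Round-robin interleaving differs from `chain` in that it takes one element from each source per pass. The sketch below assumes that exhausted sources are dropped and the remaining ones keep going; the library might instead stop at the shortest input.

```python
def interleave_sketch(*iterables):
    iterators = [iter(it) for it in iterables]
    while iterators:
        alive = []
        for it in iterators:
            try:
                item = next(it)
            except StopIteration:
                continue  # this source is exhausted, drop it
            alive.append(it)
            yield item
        iterators = alive


mixed = list(interleave_sketch([1, 2, 3], "ab", [10]))
print(mixed)  # [1, 'a', 10, 2, 'b', 3]
```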
Examples
Processing a file line by line
import streamish as st

with open("data.txt") as f:
    result = list(
        st.stream(f)
        .map(str.strip)
        .filter(bool)  # skip empty lines
        .distinct()
        .take(100)
    )
Batching API requests
from collections.abc import AsyncIterable

import streamish as st

# `Item` is a placeholder for your own payload type
async def send_batch(items: list[Item]) -> None:
    ...

async def process(items: AsyncIterable[Item]) -> None:
    async for batch in st.stream(items).batch(100, timeout=5.0):
        await send_batch(batch)
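Batching by size or timeout means a batch is flushed when it is full or when it has waited long enough, whichever comes first. Below is a stdlib-only sketch of one way to get that behavior with a background task and an `asyncio.Queue`. The name `batch_sketch` is hypothetical, and starting the timer when the first item of a batch arrives is an assumption about the semantics, not something the README specifies.

```python
import asyncio


async def batch_sketch(size, source, timeout=None):
    queue = asyncio.Queue(maxsize=size)  # bounded, so the producer feels backpressure

    async def pump():
        # Forward items as (done, value); the final message carries any source error.
        try:
            async for item in source:
                await queue.put((False, item))
            await queue.put((True, None))
        except Exception as exc:
            await queue.put((True, exc))

    task = asyncio.create_task(pump())
    loop = asyncio.get_running_loop()
    buf, deadline = [], None
    try:
        while True:
            # Only run a timer while a partial batch is waiting.
            wait = None
            if buf and timeout is not None:
                wait = max(0.0, deadline - loop.time())
            try:
                done, value = await asyncio.wait_for(queue.get(), wait)
            except asyncio.TimeoutError:
                yield buf  # timed out: flush the partial batch
                buf = []
                continue
            if done:
                if buf:
                    yield buf
                if value is not None:
                    raise value
                return
            if not buf and timeout is not None:
                deadline = loop.time() + timeout  # timer starts with the first item
            buf.append(value)
            if len(buf) == size:
                yield buf
                buf = []
    finally:
        task.cancel()


async def count_to(n):
    for i in range(n):
        yield i


async def bursty():
    for i in range(3):
        yield i
    await asyncio.sleep(0.2)  # pause longer than the batch timeout
    yield 3


async def demo():
    by_size = [b async for b in batch_sketch(2, count_to(5))]
    by_time = [b async for b in batch_sketch(10, bursty(), timeout=0.05)]
    return by_size, by_time


by_size, by_time = asyncio.run(demo())
print(by_size)  # [[0, 1], [2, 3], [4]]
print(by_time)  # [[0, 1, 2], [3]]
```

Reading from a queue rather than timing out the source's `__anext__` directly avoids cancelling the source generator in the middle of an `await`.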
Concurrent HTTP requests
import httpx
import streamish as st

async def fetch(client: httpx.AsyncClient, url: str) -> httpx.Response:
    return await client.get(url)

async def main():
    urls = ["https://example.com/1", "https://example.com/2", ...]
    async with httpx.AsyncClient() as client:
        async for response in st.map_async(
            lambda url: fetch(client, url),
            urls,
            concurrency=10,
        ):
            print(response.status_code)
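An order-preserving concurrent map can be pictured as a sliding window of in-flight tasks whose results are awaited oldest first. The sketch below is a simplified illustration for sync inputs only; `ordered_map_sketch` is a hypothetical name and this is not streamish's implementation.

```python
import asyncio
from collections import deque


async def ordered_map_sketch(fn, items, concurrency=1):
    pending = deque()  # tasks in submission order
    try:
        for item in items:
            pending.append(asyncio.create_task(fn(item)))
            if len(pending) >= concurrency:
                # Awaiting the oldest task keeps output order equal to input order.
                yield await pending.popleft()
        while pending:
            yield await pending.popleft()
    finally:
        for task in pending:
            task.cancel()  # consumer stopped early or a task failed


async def slow_square(x):
    await asyncio.sleep(0.03 * (4 - x))  # later items finish sooner
    return x * x


async def main():
    return [y async for y in ordered_map_sketch(slow_square, [1, 2, 3], concurrency=3)]


results = asyncio.run(main())
print(results)  # [1, 4, 9]
```

The trade-off of preserving order is head-of-line blocking: a slow early item delays results that are already finished behind it.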
Windowed statistics
import streamish as st

# Moving average over last 5 values
values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
averages = list(
    st.stream(values)
    .window(5)
    .map(lambda w: sum(w) / len(w))
)
# [3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
Merging async streams
import asyncio

import streamish as st

async def stream_a():
    for i in range(3):
        await asyncio.sleep(0.1)
        yield f"a{i}"

async def stream_b():
    for i in range(3):
        await asyncio.sleep(0.15)
        yield f"b{i}"

async def main():
    # Items emitted as they arrive
    async for item in st.merge(stream_a(), stream_b()):
        print(item)
    # a0, b0, a1, a2, b1, b2 (order depends on timing)

asyncio.run(main())
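Merging by arrival time can be sketched with one producer task per source feeding a shared queue. The helper below, `merge_sketch`, is a hypothetical stdlib-only illustration of that pattern rather than the library's code.

```python
import asyncio

_DONE = object()  # sentinel marking the end of one source


async def merge_sketch(*sources):
    queue = asyncio.Queue()  # unbounded, so put_nowait below cannot fail

    async def pump(source):
        try:
            async for item in source:
                await queue.put(item)
        finally:
            queue.put_nowait(_DONE)

    tasks = [asyncio.create_task(pump(s)) for s in sources]
    try:
        remaining = len(tasks)
        while remaining:
            item = await queue.get()
            if item is _DONE:
                remaining -= 1
            else:
                yield item
        await asyncio.gather(*tasks)  # re-raise an error from any source
    finally:
        for task in tasks:
            task.cancel()


async def ticker(name, delay):
    for i in range(3):
        await asyncio.sleep(delay)
        yield f"{name}{i}"


async def main():
    return [x async for x in merge_sketch(ticker("a", 0.02), ticker("b", 0.03))]


merged = asyncio.run(main())
print(merged)  # all six items; the interleaving depends on timing
```

Each source keeps its own internal order, but the relative order across sources is decided only by when items arrive.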
License
MIT