流淌 - A pure-Python streaming data framework with native concurrency, watermark, windowing, and stateful processing
LiuTang (流淌)
A pure-Python streaming data framework with native concurrency, watermark, windowing, and stateful processing. Zero external dependencies.
流淌 (liútǎng) means "to flow" — data flows like water.
Design Principles
- Zero dependencies: pip install liutang needs only the Python stdlib; no version hell
- Native concurrency: the local engine uses threading/multiprocessing/concurrent.futures
- Full streaming: watermarks, event-time windows, keyed state, timers, and checkpointing, all in pure Python
- Unified API: the same Flow/Stream API for batch and streaming modes
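As a rough illustration of the "native concurrency" principle, the sketch below parallelizes a map stage with concurrent.futures from the standard library. It is not LiuTang's engine code; the helper name parallel_map and its parallelism parameter are hypothetical and chosen only for this example.

```python
from concurrent.futures import ThreadPoolExecutor


def parallel_map(func, items, parallelism=4):
    """Apply func to every item on a thread pool, preserving input order.

    Hypothetical helper for illustration; not part of the liutang API.
    """
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        # Executor.map yields results in the same order as the inputs.
        return list(pool.map(func, items))


lengths = parallel_map(len, ["hello world", "hello liutang", "data flows"])
print(lengths)  # [11, 13, 10]
```

In CPython, threads mostly help I/O-bound stages; CPU-bound stages usually need a process pool instead.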
Quick Start
import liutang
# Batch mode — zero dependencies
flow = liutang.Flow(name="word-count", mode=liutang.RuntimeMode.BATCH)
flow.set_parallelism(4)
stream = flow.from_collection(["hello world", "hello liutang", "data flows"])
result = (
    stream
    .flat_map(lambda line: line.split())   # split words
    .map(lambda w: (w.lower(), 1))         # word -> (word, 1)
    .key_by(lambda pair: pair[0])          # group by word
)
sink = result.collect()                    # collect results
flow.execute()
for item in sink.results:
    print(item)
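The pipeline above stops at key_by, and what the engine emits at that point is not shown here. To make the intended word-count logic concrete, this is a plain-Python sketch of the same steps plus a final per-key sum. It uses only the standard library and does not call liutang.

```python
from collections import Counter

lines = ["hello world", "hello liutang", "data flows"]

# flat_map: split each line into words
words = [w for line in lines for w in line.split()]

# map: word -> (lowercased word, 1)
pairs = [(w.lower(), 1) for w in words]

# key_by + sum: add up the 1s for each word
counts = Counter()
for word, one in pairs:
    counts[word] += one

print(dict(counts))
# {'hello': 2, 'world': 1, 'liutang': 1, 'data': 1, 'flows': 1}
```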
Core Features
Windowing
# Tumbling window
stream.window(liutang.WindowType.tumbling(size=10.0, time_field="ts"))
# Sliding window
stream.window(liutang.WindowType.sliding(size=10.0, slide=5.0, time_field="ts"))
# Session window
stream.window(liutang.WindowType.session(gap=30.0, time_field="ts"))
# Window aggregations
windowed = stream.window(liutang.WindowType.tumbling(size=60.0, time_field="ts"))
windowed.sum(field="amount")
windowed.count()
windowed.min(field="price")
windowed.max(field="price")
windowed.aggregate(my_func)
windowed.apply(lambda window_data: process(window_data))
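The three window types differ only in how a timestamp is mapped to window boundaries. The sketch below shows one common way to compute that mapping in plain Python. The function names are hypothetical, windows are treated as half-open intervals [start, end), and a session is assumed to close once the gap between consecutive events reaches the configured value; LiuTang's own boundary conventions may differ.

```python
import math


def tumbling_window(ts, size):
    """Return the single [start, end) window that contains ts."""
    start = math.floor(ts / size) * size
    return (start, start + size)


def sliding_windows(ts, size, slide):
    """Return every [start, end) window that contains ts."""
    start = math.floor(ts / slide) * slide  # latest window starting at or before ts
    windows = []
    while start > ts - size:                # window still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def session_windows(timestamps, gap):
    """Group timestamps into sessions; a pause of `gap` or more starts a new one."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions


print(tumbling_window(23.0, size=10.0))              # (20.0, 30.0)
print(sliding_windows(12.0, size=10.0, slide=5.0))   # [(5.0, 15.0), (10.0, 20.0)]
print(session_windows([1.0, 5.0, 50.0, 60.0], gap=30.0))  # [[1.0, 5.0], [50.0, 60.0]]
```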
Watermarks
# Monotonous watermarks
stream.assign_timestamps(
extractor=lambda row: row["ts"],
watermark_strategy=liutang.WatermarkStrategy.monotonous(time_field="ts")
)
# Bounded out-of-orderness watermarks
stream.assign_timestamps(
extractor=lambda row: row["ts"],
watermark_strategy=liutang.WatermarkStrategy.bounded_out_of_orderness(2.0, time_field="ts")
)
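A bounded out-of-orderness watermark is conventionally the largest timestamp seen so far minus the allowed delay; a monotonous watermark is the same idea with a delay of zero. The class below is a minimal sketch of that rule, not LiuTang's WatermarkTracker. The class and method names are hypothetical, and treating only strictly older events as late is an assumption.

```python
class BoundedOutOfOrdernessWatermark:
    """Tracks watermark = max timestamp seen - max_delay (illustrative only)."""

    def __init__(self, max_delay):
        self.max_delay = max_delay
        self.max_ts = float("-inf")

    def observe(self, ts):
        """Record an event timestamp and return the updated watermark."""
        self.max_ts = max(self.max_ts, ts)
        return self.current()

    def current(self):
        return self.max_ts - self.max_delay

    def is_late(self, ts):
        """Assumption: an event is late if it is strictly older than the watermark."""
        return ts < self.current()


wm = BoundedOutOfOrdernessWatermark(max_delay=2.0)
first = wm.observe(10.0)   # watermark moves to 8.0
second = wm.observe(9.0)   # out-of-order event; watermark stays at 8.0
print(first, second, wm.is_late(7.5), wm.is_late(8.5))  # 8.0 8.0 True False
```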
Stateful Processing
# ValueState with TTL
state = ctx.get_state("my_state")
state.value = "hello"
# KeyedProcessFunction
class CountFunc(liutang.KeyedProcessFunction):
    def process_element(self, value, ctx):
        count = ctx.get_state("count")
        count.value = (count.value or 0) + 1
        return (ctx.current_key(), count.value)
# ListState / MapState / ReducingState / AggregatingState
ctx.get_list_state("events").add("event1")
ctx.get_map_state("counts").put("word", 5)
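Keyed state means each key sees its own independent copy of a named value. The following sketch mimics the counting logic of CountFunc with a plain dictionary per key. KeyedValueStore and count_per_key are hypothetical names for this example and are not liutang classes.

```python
from collections import defaultdict


class KeyedValueStore:
    """One independent dict of named values per key (illustrative only)."""

    def __init__(self):
        self._data = defaultdict(dict)

    def get(self, key, name, default=None):
        return self._data[key].get(name, default)

    def put(self, key, name, value):
        self._data[key][name] = value


def count_per_key(events, key_func):
    """Emit (key, running count) for every event, like CountFunc above."""
    store = KeyedValueStore()
    out = []
    for event in events:
        key = key_func(event)
        count = (store.get(key, "count") or 0) + 1
        store.put(key, "count", count)
        out.append((key, count))
    return out


running = count_per_key(["a", "b", "a", "a"], key_func=lambda e: e)
print(running)  # [('a', 1), ('b', 1), ('a', 2), ('a', 3)]
```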
Timers
class TimerFunc(liutang.KeyedProcessFunction):
    def process_element(self, value, ctx):
        ctx.timer_service.register_event_time_timer(value["ts"] + 10.0)
        return value

    def on_timer(self, timestamp, ctx):
        return {"alert": f"Timer fired at {timestamp}"}
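Event-time timers typically fire when the watermark passes their timestamp, not when wall-clock time does. The sketch below shows one way to implement that with a heap from the standard library. EventTimeTimers is a hypothetical class and says nothing about how LiuTang's timer_service works internally; firing at "timestamp <= watermark" is an assumption.

```python
import heapq


class EventTimeTimers:
    """Min-heap of (timestamp, key) timers, fired as the watermark advances."""

    def __init__(self):
        self._heap = []
        self._registered = set()

    def register(self, key, timestamp):
        """Register a timer; duplicates for the same key and time are ignored."""
        entry = (timestamp, key)
        if entry not in self._registered:
            self._registered.add(entry)
            heapq.heappush(self._heap, entry)

    def advance(self, watermark):
        """Pop and return every timer whose timestamp is <= watermark."""
        fired = []
        while self._heap and self._heap[0][0] <= watermark:
            entry = heapq.heappop(self._heap)
            self._registered.discard(entry)
            fired.append(entry)
        return fired


timers = EventTimeTimers()
timers.register("user-1", 15.0)
timers.register("user-2", 12.0)
timers.register("user-1", 15.0)      # duplicate, ignored
fired_early = timers.advance(13.0)   # only the 12.0 timer is due
fired_later = timers.advance(20.0)   # now the 15.0 timer fires once
print(fired_early, fired_later)      # [(12.0, 'user-2')] [(15.0, 'user-1')]
```

Keys must be mutually comparable here, because the heap falls back to comparing keys when two timestamps are equal.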
Checkpointing
backend = liutang.MemoryStateBackend()
backend.set_value("key", "value")
snapshot = backend.checkpoint() # save state
backend2 = liutang.MemoryStateBackend()
backend2.restore(snapshot) # restore state
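A checkpoint is only useful if later mutations cannot leak into it. The stand-in backend below illustrates that property with copy.deepcopy. The method names set_value, checkpoint, and restore mirror the snippet above, but DictStateBackend and get_value are hypothetical, and the real MemoryStateBackend may take snapshots differently.

```python
import copy


class DictStateBackend:
    """Stand-in state backend for illustration; not liutang.MemoryStateBackend."""

    def __init__(self):
        self._values = {}

    def set_value(self, key, value):
        self._values[key] = value

    def get_value(self, key, default=None):  # hypothetical accessor
        return self._values.get(key, default)

    def checkpoint(self):
        # Deep copy so that later mutations do not alter the snapshot.
        return copy.deepcopy(self._values)

    def restore(self, snapshot):
        # Deep copy again so the backend does not share objects with the snapshot.
        self._values = copy.deepcopy(snapshot)


backend = DictStateBackend()
backend.set_value("seen", ["a"])
snapshot = backend.checkpoint()
backend.get_value("seen").append("b")    # mutate state after the checkpoint

restored = DictStateBackend()
restored.restore(snapshot)
print(restored.get_value("seen"))        # ['a']
```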
Sources
flow.from_collection([1, 2, 3, 4, 5]) # In-memory
flow.from_generator(gen, max_items=100) # Generator
flow.from_file("/data/input.csv", fmt="csv") # File (txt/csv/json)
flow.from_kafka(topic="events", bootstrap_servers="...") # Kafka (optional dep)
flow.from_source(liutang.DatagenSource(rows_per_second=100)) # Synthetic
flow.from_source(liutang.SocketSource(port=9999)) # Socket
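For the generator source, max_items presumably bounds how many records are read from an otherwise endless generator. The sketch below shows that idea with itertools.islice. Whether from_generator expects a generator function or a generator object is not stated here, so the take helper and the event shape are assumptions for illustration.

```python
import itertools


def counter_events():
    """Endless generator of synthetic events (illustrative data only)."""
    n = 0
    while True:
        yield {"id": n, "amount": n * 100}
        n += 1


def take(gen_func, max_items):
    """Read at most max_items records from a generator function."""
    return list(itertools.islice(gen_func(), max_items))


batch = take(counter_events, max_items=3)
print(batch)
# [{'id': 0, 'amount': 0}, {'id': 1, 'amount': 100}, {'id': 2, 'amount': 200}]
```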
Sinks
stream.print() # Print
sink = stream.collect() # Collect to list
stream.sink_to(liutang.CallbackSink(func=handle)) # Callback
stream.sink_to(liutang.FileSink(path="out.jsonl")) # File
stream.sink_to(liutang.KafkaSink(topic="results")) # Kafka (optional dep)
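The .jsonl extension in the FileSink example suggests JSON Lines output, that is, one JSON object per line. Assuming that format, the sketch below writes and reads such a file with the standard library only. write_jsonl and read_jsonl are hypothetical helpers, not liutang functions.

```python
import json
import os
import tempfile


def write_jsonl(records, path):
    """Write one JSON object per line."""
    with open(path, "w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record, ensure_ascii=False) + "\n")


def read_jsonl(path):
    """Read a JSON Lines file back into a list, skipping blank lines."""
    with open(path, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]


records = [{"word": "hello", "count": 2}, {"word": "流淌", "count": 1}]
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "out.jsonl")
    write_jsonl(records, path)
    round_trip = read_jsonl(path)

print(round_trip == records)  # True
```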
Streaming Mode
flow = liutang.Flow(name="realtime", mode=liutang.RuntimeMode.STREAMING)
stream = flow.from_kafka(topic="events")
result = stream.filter(lambda x: x["amount"] > 5000)
result.sink_to(liutang.CallbackSink(func=handle_alert))
handles = flow.execute() # returns immediately, runs in background
# ... streaming ...
handles["stop_events"]["source_0"].set() # graceful stop
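The stop_events entry in the handles dictionary looks like a threading.Event, since it is stopped with .set(). Under that assumption, the pattern behind "returns immediately, runs in background, graceful stop" can be sketched as follows. source_loop and its parameters are hypothetical and do not reflect LiuTang's runner.

```python
import itertools
import threading
import time


def source_loop(emit, stop_event, interval=0.01):
    """Emit increasing integers until stop_event is set."""
    for n in itertools.count():
        if stop_event.is_set():
            break
        emit(n)
        stop_event.wait(interval)  # sleeps, but wakes early when stopped


received = []
stop_event = threading.Event()
worker = threading.Thread(
    target=source_loop, args=(received.append, stop_event), daemon=True
)
worker.start()            # returns immediately; the loop runs in the background
time.sleep(0.05)          # ... streaming ...
stop_event.set()          # graceful stop
worker.join(timeout=1.0)  # wait for the loop to finish its current iteration
print(worker.is_alive(), len(received) > 0)  # False True
```

Using stop_event.wait(interval) instead of time.sleep(interval) lets the loop react to a stop request without waiting out the full interval.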
Explain
flow = liutang.Flow(mode=liutang.RuntimeMode.BATCH)
stream = flow.from_collection([1, 2, 3]).map(lambda x: x * 2)
result = stream.filter(lambda x: x > 2)
result.print()
print(flow.explain())
# Output:
# Engine: liutang (pure Python)
# Mode: batch
# ...
Installation
# Base install — zero dependencies
pip install liutang
# With Kafka connector (optional)
pip install liutang[kafka]
# Development
pip install liutang[dev]
Project Structure
liutang/
├── src/liutang/
│   ├── __init__.py          # Public API + version
│   ├── core/
│   │   ├── flow.py          # Flow: pipeline definition & execution
│   │   ├── stream.py        # Stream / KeyedStream / WindowedStream / TableStream
│   │   ├── schema.py        # Schema / Field / FieldType
│   │   ├── window.py        # WindowType (tumbling/sliding/session/over/global)
│   │   ├── connector.py     # Source/Sink connectors (pure Python)
│   │   ├── state.py         # Full state management (Value/List/Map/Reducing/Aggregating)
│   │   ├── errors.py        # Exception hierarchy
│   │   └── (no external dependencies!)
│   └── engine/
│       ├── executor.py      # Main executor (batch + streaming)
│       ├── runner.py        # StreamRunner: parallel pipeline execution
│       └── watermark.py     # WatermarkTracker
├── examples/                # Example pipelines
├── tests/                   # 60 tests
├── upload_pypi.sh           # Unix publish script
├── upload_pypi.bat          # Windows publish script
└── pyproject.toml           # Zero required dependencies
Version
Version is controlled exclusively in src/liutang/__init__.py. The publish scripts auto-bump the patch version.
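To spell out what a patch bump means, the sketch below increments the last component of a MAJOR.MINOR.PATCH version string. The actual publish scripts are shell and batch files whose logic is not shown here, so bump_patch is a hypothetical helper for illustration only.

```python
import re


def bump_patch(version):
    """Return version with its patch component incremented, e.g. 0.1.0 -> 0.1.1."""
    match = re.fullmatch(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"not a MAJOR.MINOR.PATCH version: {version!r}")
    major, minor, patch = (int(part) for part in match.groups())
    return f"{major}.{minor}.{patch + 1}"


print(bump_patch("0.1.0"))  # 0.1.1
```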
License
GPL-3.0-or-later
Full Comparison
See COMPARISON.md — comprehensive comparison with PyFlink / PySpark / Beam / Bytewax / Faust / Streamz.