流淌 - A pure-Python streaming data framework with native concurrency, watermark, windowing, and stateful processing

LiuTang (流淌)

A pure-Python streaming data framework with native concurrency, watermark, windowing, and stateful processing. Zero external dependencies.

流淌 (liútǎng) means "to flow" — data flows like water.

Design Principles

  • Zero dependencies: pip install liutang needs only the Python stdlib; no version hell
  • Native concurrency — local engine uses threading / multiprocessing / concurrent.futures
  • Full streaming — Watermark, event-time windows, keyed state, timers, checkpointing — all pure Python
  • Unified API — Same Flow / Stream API for batch and streaming modes
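
To make the native-concurrency principle concrete, here is a minimal stdlib-only sketch of key-partitioned parallel aggregation. It is not LiuTang's engine code: `partition_for`, `sum_partition`, and the sample records are hypothetical names chosen for this example. The idea is that records sharing a key are routed to the same worker, so each worker can aggregate its own keys independently.

```python
import zlib
from collections import Counter
from concurrent.futures import ThreadPoolExecutor


def partition_for(key, parallelism):
    """Map a string key to a stable partition index (hypothetical helper)."""
    return zlib.crc32(key.encode("utf-8")) % parallelism


def sum_partition(records):
    """Sum the amounts per key inside a single partition."""
    totals = Counter()
    for key, amount in records:
        totals[key] += amount
    return totals


records = [("alice", 30), ("bob", 5), ("alice", 12), ("carol", 7), ("bob", 1)]
parallelism = 4

# Route every record to the partition that owns its key.
partitions = [[] for _ in range(parallelism)]
for record in records:
    partitions[partition_for(record[0], parallelism)].append(record)

# Aggregate each partition on its own worker thread.
with ThreadPoolExecutor(max_workers=parallelism) as pool:
    partials = list(pool.map(sum_partition, partitions))

# Keys never span partitions, so merging is a plain union of the partial results.
totals = Counter()
for partial in partials:
    totals.update(partial)
```

`zlib.crc32` is used instead of the built-in `hash()` because string hashes are randomized per process, which would make the routing differ between runs.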

Quick Start

import liutang

# Batch mode — zero dependencies
flow = liutang.Flow(name="word-count", mode=liutang.RuntimeMode.BATCH)
flow.set_parallelism(4)

stream = flow.from_collection(["hello world", "hello liutang", "data flows"])

result = (
    stream
    .flat_map(lambda line: line.split())    # split words
    .map(lambda w: (w.lower(), 1))          # word -> (word, 1)
    .key_by(lambda pair: pair[0])           # group by word
)

sink = result.collect()  # collect results
flow.execute()

for item in sink.results:
    print(item)
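
The Quick Start pipeline ends at `key_by`, so as written it produces keyed `(word, 1)` pairs; the per-key reduction step is not shown. As a plain-Python reference for what those pairs look like and what a per-key sum over them would yield (this does not use the liutang API, and the helper names are made up for illustration):

```python
from collections import Counter

lines = ["hello world", "hello liutang", "data flows"]


def word_pairs(lines):
    """Mirror the flat_map + map steps: one (word, 1) pair per word."""
    for line in lines:
        for word in line.split():
            yield (word.lower(), 1)


def sum_by_key(pairs):
    """Reduce the pairs to one total per word."""
    counts = Counter()
    for word, n in pairs:
        counts[word] += n
    return dict(counts)


pairs = list(word_pairs(lines))
counts = sum_by_key(pairs)

for word, total in counts.items():
    print(word, total)
```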

Core Features

Windowing

# Tumbling window
stream.window(liutang.WindowType.tumbling(size=10.0, time_field="ts"))

# Sliding window
stream.window(liutang.WindowType.sliding(size=10.0, slide=5.0, time_field="ts"))

# Session window
stream.window(liutang.WindowType.session(gap=30.0, time_field="ts"))

# Window aggregations
windowed = stream.window(liutang.WindowType.tumbling(size=60.0, time_field="ts"))
windowed.sum(field="amount")
windowed.count()
windowed.min(field="price")
windowed.max(field="price")
windowed.aggregate(my_func)
windowed.apply(lambda window_data: process(window_data))
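
The three window types differ in how a timestamp is mapped to windows. The sketch below shows one common convention in plain Python; it is an assumption for illustration, not LiuTang's internal code. Windows are half-open `[start, end)`, aligned to timestamp 0, and a session closes once `gap` seconds pass without an event. All function names are hypothetical.

```python
import math


def tumbling_window(ts, size):
    """Return the single fixed-size window that contains ts."""
    start = math.floor(ts / size) * size
    return (start, start + size)


def sliding_windows(ts, size, slide):
    """Return every overlapping window that contains ts, earliest first."""
    windows = []
    start = math.floor(ts / slide) * slide  # latest window start at or before ts
    while start > ts - size:                # window still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def session_windows(timestamps, gap):
    """Group timestamps into sessions separated by at least `gap` of inactivity."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)         # continues the current session
        else:
            sessions.append([ts])           # inactivity gap reached: new session
    return sessions


print(tumbling_window(12.5, size=10.0))
print(sliding_windows(12.5, size=10.0, slide=5.0))
print(session_windows([0, 10, 25, 100, 110], gap=30))
```

With `size=10.0` and `slide=5.0`, every event belongs to two sliding windows, which is why sliding aggregations cost roughly `size / slide` times as much as tumbling ones.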

Watermarks

# Monotonous watermarks
stream.assign_timestamps(
    extractor=lambda row: row["ts"],
    watermark_strategy=liutang.WatermarkStrategy.monotonous(time_field="ts")
)

# Bounded out-of-orderness watermarks
stream.assign_timestamps(
    extractor=lambda row: row["ts"],
    watermark_strategy=liutang.WatermarkStrategy.bounded_out_of_orderness(2.0, time_field="ts")
)
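
For readers new to watermarks, the following stdlib-only sketch shows the usual definition of a bounded out-of-orderness watermark: the largest timestamp seen so far minus the allowed lateness. A monotonous strategy is the special case with a bound of 0. The class and method names are hypothetical, and treating an event as late when its timestamp is strictly below the watermark is an assumption of this sketch, not a documented LiuTang rule.

```python
class BoundedOutOfOrdernessWatermark:
    """Track a watermark that trails the maximum observed timestamp."""

    def __init__(self, max_out_of_orderness):
        self.max_out_of_orderness = max_out_of_orderness
        self.max_timestamp = float("-inf")

    @property
    def current(self):
        """Current watermark: no earlier event is expected after this point."""
        return self.max_timestamp - self.max_out_of_orderness

    def observe(self, timestamp):
        """Return True if the event is late, then advance the watermark."""
        late = timestamp < self.current
        self.max_timestamp = max(self.max_timestamp, timestamp)
        return late


bounded = BoundedOutOfOrdernessWatermark(2.0)
bounded_flags = [bounded.observe(ts) for ts in [1.0, 5.0, 3.0, 2.0]]

monotonous = BoundedOutOfOrdernessWatermark(0.0)
monotonous_flags = [monotonous.observe(ts) for ts in [1.0, 2.0, 1.5]]
```

In the bounded case the event at `3.0` arrives out of order but is still accepted, because the watermark is `5.0 - 2.0 = 3.0`; only the event at `2.0` falls behind it.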

Stateful Processing

# ValueState with TTL
state = ctx.get_state("my_state")
state.value = "hello"

# KeyedProcessFunction
class CountFunc(liutang.KeyedProcessFunction):
    def process_element(self, value, ctx):
        count = ctx.get_state("count")
        count.value = (count.value or 0) + 1
        return (ctx.current_key(), count.value)

# ListState / MapState / ReducingState / AggregatingState
ctx.get_list_state("events").add("event1")
ctx.get_map_state("counts").put("word", 5)
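
The snippet above mentions TTL but does not show how expiry is configured, and this page does not document that part of the API. Purely to illustrate the concept, here is a small stdlib-only sketch of a value holder that expires after a time-to-live. `TtlValueState`, its constructor arguments, and the injected clock are all hypothetical and should not be read as LiuTang's interface.

```python
import time


class TtlValueState:
    """Hold one value that reads as None once its TTL has elapsed."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock          # injectable so expiry can be tested
        self._value = None
        self._written_at = None

    @property
    def value(self):
        if self._written_at is None:
            return None
        if self._clock() - self._written_at >= self._ttl:
            # Expired: drop the value lazily on read.
            self._value = None
            self._written_at = None
        return self._value

    @value.setter
    def value(self, new_value):
        self._value = new_value
        self._written_at = self._clock()   # each write restarts the TTL


# Drive the state with a fake clock instead of sleeping.
now = [0.0]
state = TtlValueState(ttl_seconds=5.0, clock=lambda: now[0])
state.value = "hello"

now[0] = 4.9
before_expiry = state.value

now[0] = 5.0
after_expiry = state.value
```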

Timers

class TimerFunc(liutang.KeyedProcessFunction):
    def process_element(self, value, ctx):
        ctx.timer_service.register_event_time_timer(value["ts"] + 10.0)
        return value

    def on_timer(self, timestamp, ctx):
        return {"alert": f"Timer fired at {timestamp}"}
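
Event-time timers are normally driven by the watermark rather than the wall clock: a timer registered for time `t` fires once the watermark reaches `t`. The sketch below models that with a min-heap. It is an illustration under that assumption, not LiuTang's timer service; `EventTimeTimers` and its methods are hypothetical names.

```python
import heapq


class EventTimeTimers:
    """Fire registered timestamps in order as the watermark advances."""

    def __init__(self):
        self._heap = []
        self._registered = set()

    def register(self, timestamp):
        """Register a timer; duplicate timestamps are coalesced."""
        if timestamp not in self._registered:
            self._registered.add(timestamp)
            heapq.heappush(self._heap, timestamp)

    def advance_watermark(self, watermark):
        """Return all timers due at or before the watermark, earliest first."""
        fired = []
        while self._heap and self._heap[0] <= watermark:
            timestamp = heapq.heappop(self._heap)
            self._registered.discard(timestamp)
            fired.append(timestamp)
        return fired


timers = EventTimeTimers()
for ts in [15.0, 12.0, 15.0, 30.0]:
    timers.register(ts)

fired_at_10 = timers.advance_watermark(10.0)
fired_at_20 = timers.advance_watermark(20.0)
fired_at_30 = timers.advance_watermark(30.0)
```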

Checkpointing

backend = liutang.MemoryStateBackend()
backend.set_value("key", "value")
snapshot = backend.checkpoint()  # save state
backend2 = liutang.MemoryStateBackend()
backend2.restore(snapshot)       # restore state
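
One property a checkpoint needs is isolation: state changes made after the snapshot must not leak into it. The sketch below demonstrates this with a deep copy in a toy backend. `DictStateBackend` is a hypothetical stand-in written for this example; how `MemoryStateBackend` actually stores or copies its data is not described on this page.

```python
import copy


class DictStateBackend:
    """Toy in-memory state backend with snapshot and restore."""

    def __init__(self):
        self._store = {}

    def set_value(self, key, value):
        self._store[key] = value

    def get_value(self, key, default=None):
        return self._store.get(key, default)

    def checkpoint(self):
        # Deep copy so later mutations of nested values cannot alter the snapshot.
        return copy.deepcopy(self._store)

    def restore(self, snapshot):
        # Copy again so the snapshot can be restored more than once.
        self._store = copy.deepcopy(snapshot)


backend = DictStateBackend()
backend.set_value("counts", {"hello": 2})
snapshot = backend.checkpoint()

# Keep processing after the checkpoint: live state moves on.
backend.get_value("counts")["hello"] = 99

# Simulate recovery in a fresh backend.
recovered = DictStateBackend()
recovered.restore(snapshot)
```

A shallow copy would not be enough here, because the nested `counts` dictionary would still be shared between the live state and the snapshot.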

Sources

flow.from_collection([1, 2, 3, 4, 5])                # In-memory
flow.from_generator(gen, max_items=100)                # Generator
flow.from_file("/data/input.csv", fmt="csv")            # File (txt/csv/json)
flow.from_kafka(topic="events", bootstrap_servers="...")  # Kafka (optional dep)
flow.from_source(liutang.DatagenSource(rows_per_second=100))  # Synthetic
flow.from_source(liutang.SocketSource(port=9999))       # Socket

Sinks

stream.print()                                           # Print
sink = stream.collect()                                  # Collect to list
stream.sink_to(liutang.CallbackSink(func=handle))        # Callback
stream.sink_to(liutang.FileSink(path="out.jsonl"))       # File
stream.sink_to(liutang.KafkaSink(topic="results"))       # Kafka (optional dep)

Streaming Mode

flow = liutang.Flow(name="realtime", mode=liutang.RuntimeMode.STREAMING)
stream = flow.from_kafka(topic="events")
result = stream.filter(lambda x: x["amount"] > 5000)
result.sink_to(liutang.CallbackSink(func=handle_alert))

handles = flow.execute()  # returns immediately, runs in background
# ... streaming ...
handles["stop_events"]["source_0"].set()  # graceful stop
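
The `.set()` call above suggests that each stop handle behaves like a `threading.Event`. Assuming that, the following stdlib-only sketch shows the general pattern of a background source loop that exits cleanly when its event is set. `run_source` and the queue wiring are hypothetical and only illustrate the pattern, not LiuTang's runner.

```python
import queue
import threading


def run_source(out, stop_event):
    """Emit increasing integers until asked to stop."""
    n = 0
    while not stop_event.is_set():
        out.put(n)
        n += 1
        # Wait instead of sleep so a stop request wakes the loop immediately.
        stop_event.wait(0.001)


out = queue.Queue()
stop_event = threading.Event()
worker = threading.Thread(target=run_source, args=(out, stop_event), daemon=True)
worker.start()                     # returns immediately; the loop runs in the background

first = out.get(timeout=5)         # block until the source has produced something
stop_event.set()                   # request a graceful stop
worker.join(timeout=5)             # wait for the loop to finish its current iteration
```

Checking the event once per iteration means the worker never stops mid-record, which is the main difference between a graceful stop and killing the thread.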

Explain

flow = liutang.Flow(mode=liutang.RuntimeMode.BATCH)
stream = flow.from_collection([1, 2, 3]).map(lambda x: x * 2)
result = stream.filter(lambda x: x > 2)
result.print()
print(flow.explain())
# Output:
# Engine: liutang (pure Python)
# Mode: batch
# ...

Installation

# Base install — zero dependencies
pip install liutang

# With Kafka connector (optional)
pip install liutang[kafka]

# Development
pip install liutang[dev]

Project Structure

liutang/
├── src/liutang/
│   ├── __init__.py              # Public API + version
│   ├── core/
│   │   ├── flow.py              # Flow — pipeline definition & execution
│   │   ├── stream.py            # Stream / KeyedStream / WindowedStream / TableStream
│   │   ├── schema.py            # Schema / Field / FieldType
│   │   ├── window.py            # WindowType (tumbling/sliding/session/over/global)
│   │   ├── connector.py         # Source/Sink connectors (pure Python)
│   │   ├── state.py             # Full state management (Value/List/Map/Reducing/Aggregating)
│   │   ├── errors.py            # Exception hierarchy
│   │   └── (no external dependencies!)
│   └── engine/
│       ├── executor.py           # Main executor (batch + streaming)
│       ├── runner.py            # StreamRunner — parallel pipeline execution
│       └── watermark.py         # WatermarkTracker
├── examples/                    # Example pipelines
├── tests/                       # 60 tests
├── upload_pypi.sh               # Unix publish script
├── upload_pypi.bat              # Windows publish script
└── pyproject.toml               # Zero required dependencies

Version

Version is controlled exclusively in src/liutang/__init__.py. The publish scripts auto-bump the patch version.

License

GPL-3.0-or-later

Full Comparison

See COMPARISON.md — comprehensive comparison with PyFlink / PySpark / Beam / Bytewax / Faust / Streamz.


