Project description

JAF - Just Another Flow

JAF (Just Another Flow) is a powerful streaming data processing system for JSON/JSONL data with a focus on lazy evaluation, composability, and a fluent API.

Features

  • 🚀 Streaming Architecture - Process large datasets without loading everything into memory
  • 🔗 Lazy Evaluation - Build complex pipelines that only execute when needed
  • 🎯 Fluent API - Intuitive method chaining for readable code
  • 🧩 Composable - Combine operations freely, integrate with other tools
  • 📦 Multiple Sources - Files, directories, stdin, memory, compressed files, infinite streams
  • 🛠️ Unix Philosophy - Works great with pipes and other command-line tools

Installation

pip install jaf

Quick Start

Command Line

# Filter JSON data using S-expressions (lazy by default)
jaf filter users.jsonl '(gt? @age 25)'

# Or use JSON array syntax
jaf filter users.jsonl '["gt?", "@age", 25]'

# Or use infix DSL (note: paths need @ prefix)
jaf filter users.jsonl '@age > 25'

# Evaluate immediately with --eval
jaf filter users.jsonl '(gt? @age 25)' --eval

# Chain operations
jaf filter users.jsonl '(eq? @status "active")' | \
jaf map - "@email" | \
jaf eval -

# Complex queries with nested logic
jaf filter logs.jsonl '(and (eq? @level "ERROR") (gt? @timestamp "2024-01-01"))' --eval

# Combine with other tools
jaf filter logs.jsonl '(eq? @level "ERROR")' --eval | \
ja groupby service

Python API

from jaf import stream

# Build a pipeline
pipeline = stream("users.jsonl") \
    .filter(["gt?", "@age", 25]) \
    .map(["dict", "name", "@name", "email", "@email"]) \
    .take(10)

# Execute when ready
for user in pipeline.evaluate():
    print(user)

Core Concepts

Lazy Evaluation

Operations don't execute until you call .evaluate() or use --eval:

# This doesn't read any data yet
pipeline = stream("huge_file.jsonl") \
    .filter(["contains?", "@tags", "important"]) \
    .map("@message")

# Now it processes data
for message in pipeline.evaluate():
    process(message)
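The deferred execution above works much like Python generators: nothing is read or parsed until iteration begins. A minimal plain-Python sketch of the idea (illustrative only, not JAF's internals):

```python
import json

def stream_lines(lines):
    # Generator: parses one record at a time, only when asked.
    for line in lines:
        yield json.loads(line)

data = ['{"age": 30}', '{"age": 20}', '{"age": 40}']
# Building the pipeline parses nothing yet...
ages = (rec["age"] for rec in stream_lines(data) if rec["age"] > 25)
# ...parsing happens here, one record at a time.
result = list(ages)  # [30, 40]
```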

Query Language

JAF supports multiple query syntaxes for flexibility:

1. S-Expression Syntax (Lisp-like)

# Simple comparisons
(eq? @status "active")              # status == "active"
(gt? @age 25)                       # age > 25
(contains? @tags "python")          # "python" in tags

# Boolean logic
(and 
    (gte? @age 18)
    (eq? @verified true))

# Nested expressions
(or (eq? @role "admin") 
    (and (eq? @role "user") 
         (gt? @score 100)))

2. JSON Array Syntax

# Same queries in JSON array format
["eq?", "@status", "active"]
["gt?", "@age", 25]
["contains?", "@tags", "python"]

["and", 
    ["gte?", "@age", 18],
    ["eq?", "@verified", true]
]

3. Infix DSL Syntax

# Natural infix notation (paths need @ prefix)
@status == "active"
@age > 25 and @verified == true
@role == "admin" or (@role == "user" and @score > 100)

All three syntaxes compile to the same internal representation. Use whichever feels most natural for your use case!
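To see why the syntaxes can share one internal representation, here is a toy evaluator for the JSON-array form. This is a hypothetical sketch for intuition only; JAF's actual compiler and operator table may differ:

```python
import operator

# Hypothetical operator table: query names -> Python predicates.
OPS = {"eq?": operator.eq, "gt?": operator.gt, "gte?": operator.ge,
       "contains?": lambda coll, item: item in coll}

def evaluate(expr, record):
    """Evaluate a JSON-array query against one record."""
    if isinstance(expr, list):
        op, *args = expr
        vals = [evaluate(a, record) for a in args]
        if op == "and":
            return all(vals)
        if op == "or":
            return any(vals)
        return OPS[op](*vals)
    if isinstance(expr, str) and expr.startswith("@"):
        return record[expr[1:]]   # "@age" -> record["age"]
    return expr                   # literal value

record = {"age": 30, "verified": True}
ok = evaluate(["and", ["gte?", "@age", 18],
                      ["eq?", "@verified", True]], record)  # True
```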

Streaming Operations

  • filter - Keep items matching a predicate
  • map - Transform each item
  • take/skip - Limit or paginate results
  • batch - Group items into chunks
  • Boolean ops - AND, OR, NOT on filtered streams
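These operations compose because each one consumes a stream and produces a stream. A rough plain-Python analogue of a filter → map → take → batch chain (an itertools-based sketch, not JAF code):

```python
from itertools import islice

def batch(items, size):
    """Group an iterable into lists of at most `size` items."""
    items = iter(items)
    while chunk := list(islice(items, size)):
        yield chunk

records = ({"n": i} for i in range(10))
kept = (r for r in records if r["n"] % 2 == 0)   # filter
mapped = (r["n"] * 10 for r in kept)             # map
first3 = islice(mapped, 3)                       # take
chunks = list(batch(first3, 2))                  # [[0, 20], [40]]
```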

Examples

Log Analysis

# Find errors in specific services
errors = stream("app.log.jsonl") \
    .filter(["and",
        ["eq?", "@level", "ERROR"],
        ["in?", "@service", ["api", "auth"]]
    ]) \
    .map(["dict", 
        "time", "@timestamp",
        "service", "@service",
        "message", "@message"
    ]) \
    .evaluate()

Data Validation

# Find invalid records
invalid = stream("users.jsonl") \
    .filter(["or",
        ["not", ["exists?", "@email"]],
        ["not", ["regex-match?", "@email", "^[^@]+@[^@]+\\.[^@]+$"]]
    ]) \
    .evaluate()

ETL Pipeline

# Transform and filter data
pipeline = stream("raw_sales.jsonl") \
    .filter(["eq?", "@status", "completed"]) \
    .map(["dict",
        "date", ["date", "@timestamp"],
        "amount", "@amount",
        "category", ["if", ["gt?", "@amount", 1000], "high", "low"]
    ]) \
    .batch(1000)

# Process in chunks
for batch in pipeline.evaluate():
    bulk_insert(batch)

Integration

JAF works seamlessly with other tools:

# With jsonl-algebra
jaf filter orders.jsonl '["gt?", "@amount", 100]' --eval | \
ja groupby customer_id --aggregate 'total:amount:sum'

# With jq
jaf filter data.jsonl '["exists?", "@metadata"]' --eval | \
jq '.metadata'

# With standard Unix tools
jaf map users.jsonl "@email" --eval | sort | uniq -c

Performance

JAF is designed for streaming large datasets:

  • Processes one item at a time
  • Minimal memory footprint
  • Early termination (e.g., with take)
  • Efficient pipeline composition
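Early termination is what keeps `take` cheap even on unbounded sources: downstream consumers pull only as many items as they need. A plain-Python illustration with an infinite generator (the event shape here is made up):

```python
from itertools import count, islice

def events():
    # Infinite stream of synthetic events; never materialized in full.
    for i in count():
        yield {"id": i, "level": "ERROR" if i % 3 == 0 else "INFO"}

errors = (e for e in events() if e["level"] == "ERROR")
first = [e["id"] for e in islice(errors, 4)]  # [0, 3, 6, 9]
```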

Windowed Operations

JAF supports windowed operations for memory-efficient processing of large datasets:

  • distinct, groupby, join, intersect, and except all accept a window_size parameter
  • Use window_size=float('inf') for exact results (default)
  • Finite windows trade accuracy for memory efficiency
  • Warning: For intersect/except, window size must be large enough to capture overlapping items

# Exact distinct (uses more memory)
stream("data.jsonl").distinct(window_size=float('inf'))

# Windowed distinct (bounded memory)
stream("data.jsonl").distinct(window_size=1000)

# Tumbling window groupby
stream("logs.jsonl").groupby(key="@level", window_size=100)
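The trade-off behind finite windows can be sketched with an ordered buffer of bounded size: duplicates inside the window are suppressed, but a key that arrives again after being evicted is emitted a second time. This is an illustrative implementation, not JAF's:

```python
from collections import OrderedDict

def windowed_distinct(items, window_size):
    """Suppress duplicates seen within the last `window_size` distinct keys.
    Finite windows bound memory but may re-emit duplicates that fall
    outside the window -- the accuracy/memory trade-off described above."""
    seen = OrderedDict()
    for item in items:
        if item in seen:
            seen.move_to_end(item)  # refresh recency, emit nothing
            continue
        yield item
        seen[item] = None
        if len(seen) > window_size:
            seen.popitem(last=False)  # evict the oldest key

exact = list(windowed_distinct([1, 2, 1, 3, 1], window_size=2))  # [1, 2, 3]
```

With window_size=1, the buffer forgets each key as soon as the next arrives, so every repeated 1 slips through again.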

Future Work

Probabilistic Data Structures

  • Bloom Filters for memory-efficient approximate set operations (intersect, except, distinct)
  • Count-Min Sketch for frequency estimation and heavy hitters detection
  • HyperLogLog for cardinality estimation
  • These would provide controllable accuracy/memory tradeoffs with theoretical guarantees

Additional Features

  • Top-K operations - Find most frequent items in streams
  • Sampling strategies - Reservoir sampling, stratified sampling
  • Time-based windows - Process data in time intervals
  • Exactly-once semantics - Checkpointing and recovery
  • Parallel processing - Multi-threaded stream processing

Integrations

  • FastAPI - REST API for stream processing
  • Model Context Protocol (MCP) - LLM integration
  • Apache Kafka - Stream from/to Kafka topics
  • Cloud Storage - S3, GCS, Azure Blob support

Contributing

Contributions are welcome! Please read our Contributing Guide for details.

License

JAF is licensed under the MIT License. See LICENSE for details.

Related Projects

  • jsonl-algebra - Relational operations on JSONL
  • jq - Command-line JSON processor
  • dotsuite - Pedagogical ecosystem demonstrating the concepts behind JAF through simple, composable tools. Great for understanding the theory and building blocks that JAF productionizes.


Download files

Download the file for your platform.

Source Distribution

jaf-0.7.0.tar.gz (133.3 kB)

Uploaded Source

Built Distribution

jaf-0.7.0-py3-none-any.whl (69.7 kB)

Uploaded Python 3

File details

Details for the file jaf-0.7.0.tar.gz.

File metadata

  • Download URL: jaf-0.7.0.tar.gz
  • Upload date:
  • Size: 133.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.3

File hashes

Hashes for jaf-0.7.0.tar.gz
  • SHA256: fc5ad46a092850c12316c378947998ad5d6af17c6d1e18e4c30283744ae8c2b9
  • MD5: 8dd60363e81649af9bc80347ce58f3c3
  • BLAKE2b-256: 77a95a1f5dd5d7e6011d08f8dd436636a4fbef0ac6b3f9bbe4c955a088bd0e5b

File details

Details for the file jaf-0.7.0-py3-none-any.whl.

File metadata

  • Download URL: jaf-0.7.0-py3-none-any.whl
  • Upload date:
  • Size: 69.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.3

File hashes

Hashes for jaf-0.7.0-py3-none-any.whl
  • SHA256: 3110f28bde4f2a3ff0ea351586a8035be5449b31c55eb820247525515cb4660c
  • MD5: 59a0bb1a3be9065374b55da6cdea8159
  • BLAKE2b-256: c75af67b4eac2177409bcca199d98cd91639c7f9f0133502c1c8a158ae6a7249
