JAF (Just Another Flow) - A streaming data processing system for JSON with lazy evaluation, composable operations, and a fluent API
JAF - Just Another Flow
JAF (Just Another Flow) is a streaming data processing system for JSON and JSONL data, built around lazy evaluation, composable operations, and a fluent API.
Features
- 🚀 Streaming Architecture - Process large datasets without loading everything into memory
- 🔗 Lazy Evaluation - Build complex pipelines that only execute when needed
- 🎯 Fluent API - Intuitive method chaining for readable code
- 🧩 Composable - Combine operations freely, integrate with other tools
- 📦 Multiple Sources - Files, directories, stdin, memory, compressed files, infinite streams
- 🛠️ Unix Philosophy - Works great with pipes and other command-line tools
Installation
pip install jaf
Quick Start
Command Line
# Filter JSON data (lazy by default)
jaf filter users.jsonl '["gt?", "@age", 25]'
# Evaluate immediately
jaf filter users.jsonl '["gt?", "@age", 25]' --eval
# Chain operations
jaf filter users.jsonl '["eq?", "@status", "active"]' | \
jaf map - "@email" | \
jaf eval -
# Combine with other tools (ja is the jsonl-algebra CLI)
jaf filter logs.jsonl '["eq?", "@level", "ERROR"]' --eval | \
ja groupby service
Python API
from jaf import stream
# Build a pipeline
pipeline = stream("users.jsonl") \
    .filter(["gt?", "@age", 25]) \
    .map(["dict", "name", "@name", "email", "@email"]) \
    .take(10)
# Execute when ready
for user in pipeline.evaluate():
    print(user)
Core Concepts
Lazy Evaluation
Operations don't execute until you call .evaluate() or use --eval:
# This doesn't read any data yet
pipeline = stream("huge_file.jsonl") \
    .filter(["contains?", "@tags", "important"]) \
    .map("@message")
# Now it processes data
for message in pipeline.evaluate():
    process(message)
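The deferral described above can be illustrated in plain Python. The sketch below is not JAF's implementation: LazyPipeline, source and reads are hypothetical names, and the class only mirrors the shape of the fluent API. Building the pipeline records steps; the source is read only when the iterator returned by evaluate() is consumed.

```python
from typing import Any, Callable, Iterable, Iterator, List, Optional

Op = Callable[[Iterator[Any]], Iterator[Any]]


class LazyPipeline:
    """Hypothetical lazy stream: methods record steps instead of running them."""

    def __init__(self, source: Callable[[], Iterable[Any]],
                 ops: Optional[List[Op]] = None):
        # source is a zero-argument callable, so constructing a pipeline reads nothing
        self._source = source
        self._ops = list(ops or [])

    def _then(self, op: Op) -> "LazyPipeline":
        # Return a new pipeline so earlier ones stay reusable
        return LazyPipeline(self._source, self._ops + [op])

    def filter(self, pred: Callable[[Any], bool]) -> "LazyPipeline":
        return self._then(lambda items: (x for x in items if pred(x)))

    def map(self, fn: Callable[[Any], Any]) -> "LazyPipeline":
        return self._then(lambda items: (fn(x) for x in items))

    def evaluate(self) -> Iterator[Any]:
        # The source is opened here; each recorded step wraps the previous iterator
        items: Iterator[Any] = iter(self._source())
        for op in self._ops:
            items = op(items)
        return items


reads: List[dict] = []  # every record the source actually yields


def source() -> Iterator[dict]:
    for rec in [{"tags": ["important"], "message": "a"},
                {"tags": [], "message": "b"}]:
        reads.append(rec)
        yield rec


pipeline = (LazyPipeline(source)
            .filter(lambda r: "important" in r["tags"])
            .map(lambda r: r["message"]))
assert reads == []                    # building the pipeline read nothing
messages = list(pipeline.evaluate())  # ['a']; both records are read here
```

Returning a new pipeline from each method, rather than mutating it, lets a partially built pipeline serve as the base for several variants.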
Query Language
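JAF queries are S-expressions written as JSON arrays: the first element is the operator and the rest are its arguments. Strings starting with @ are paths into the current item: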
# Simple comparisons
["eq?", "@status", "active"] # status == "active"
["gt?", "@age", 25] # age > 25
["contains?", "@tags", "python"] # "python" in tags
# Boolean logic
["and",
    ["gte?", "@age", 18],
    ["eq?", "@verified", true]
]
# Path navigation with @
["eq?", "@user.profile.name", "Alice"] # Nested access
["any", "@items.*.inStock"] # Wildcard
["exists?", "@**.error"] # Recursive search
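The operator semantics can be made concrete with a tiny evaluator. This is a hypothetical sketch, not JAF's query engine; evaluate, value, get_all and MISSING are illustrative names, and only a subset of operators is covered (plus in?, which appears in the Log Analysis example). It assumes that "@a.b" walks nested keys, that "*" fans out over list elements or dict values and yields a list, that a path which does not resolve makes comparisons false, and it does not support the recursive "**" form. Queries are written as Python lists, so JSON true becomes True.

```python
from typing import Any, Iterator, List

MISSING = object()  # sentinel: the path did not resolve
OPERATORS = {"eq?", "gt?", "gte?", "contains?", "in?",
             "exists?", "and", "or", "not", "any"}


def get_all(obj: Any, parts: List[str]) -> Iterator[Any]:
    """Yield every value reachable from obj along parts; '*' fans out."""
    if not parts:
        yield obj
        return
    head, rest = parts[0], parts[1:]
    if head == "*":
        if isinstance(obj, dict):
            children = list(obj.values())
        elif isinstance(obj, list):
            children = obj
        else:
            children = []
        for child in children:
            yield from get_all(child, rest)
    elif isinstance(obj, dict) and head in obj:
        yield from get_all(obj[head], rest)


def value(expr: Any, rec: dict) -> Any:
    """Resolve an argument: an '@path' string, a nested query, or a literal."""
    if isinstance(expr, str) and expr.startswith("@"):
        parts = expr[1:].split(".")
        found = list(get_all(rec, parts))
        if "*" in parts:
            return found                      # wildcard paths give a list
        return found[0] if found else MISSING
    if (isinstance(expr, list) and expr and isinstance(expr[0], str)
            and expr[0] in OPERATORS):
        return evaluate(expr, rec)
    return expr                               # literal, e.g. 25 or ["api", "auth"]


def evaluate(query: list, rec: dict) -> Any:
    op, *args = query
    if op == "and":
        return all(value(a, rec) for a in args)  # short-circuits
    if op == "or":
        return any(value(a, rec) for a in args)
    if op == "not":
        return not value(args[0], rec)
    if op == "exists?":
        # True if the path reaches at least one value
        return any(True for _ in get_all(rec, args[0][1:].split(".")))
    vals = [value(a, rec) for a in args]
    if any(v is MISSING for v in vals):
        return False                          # assumption: missing fields fail
    if op == "eq?":
        return vals[0] == vals[1]
    if op == "gt?":
        return vals[0] > vals[1]
    if op == "gte?":
        return vals[0] >= vals[1]
    if op == "contains?":
        return vals[1] in vals[0]
    if op == "in?":
        return vals[0] in vals[1]
    if op == "any":
        return any(vals[0])
    raise ValueError(f"unsupported operator: {op}")


record = {"age": 30, "verified": True,
          "user": {"profile": {"name": "Alice"}},
          "items": [{"inStock": False}, {"inStock": True}]}
print(evaluate(["and", ["gte?", "@age", 18], ["eq?", "@verified", True]], record))  # True
print(evaluate(["any", "@items.*.inStock"], record))                               # True
```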
Streaming Operations
- filter - Keep items matching a predicate
- map - Transform each item
- take/skip - Limit or paginate results
- batch - Group items into chunks
- Boolean ops - AND, OR, NOT on filtered streams
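Each of these operations maps naturally onto a Python generator, which is one common way to get item-at-a-time streaming. The functions below are a hypothetical sketch built on itertools, not JAF's API; skip(n) is assumed to drop the first n items and take(n) to keep at most n.

```python
from itertools import islice
from typing import Any, Callable, Iterable, Iterator, List


def filter_items(items: Iterable[Any], pred: Callable[[Any], bool]) -> Iterator[Any]:
    # Keep only items for which pred is truthy
    return (x for x in items if pred(x))


def map_items(items: Iterable[Any], fn: Callable[[Any], Any]) -> Iterator[Any]:
    # Transform each item as it passes through
    return (fn(x) for x in items)


def skip(items: Iterable[Any], n: int) -> Iterator[Any]:
    # Drop the first n items
    return islice(items, n, None)


def take(items: Iterable[Any], n: int) -> Iterator[Any]:
    # Yield at most n items, then stop pulling from upstream
    return islice(items, n)


def batch(items: Iterable[Any], size: int) -> Iterator[List[Any]]:
    # Group items into lists of `size`; the last chunk may be shorter
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk


page_two = list(take(skip(range(10), 3), 3))           # [3, 4, 5]
chunks = list(batch(range(7), 3))                      # [[0, 1, 2], [3, 4, 5], [6]]
even_squares = list(map_items(filter_items(range(6), lambda n: n % 2 == 0),
                              lambda n: n * n))        # [0, 4, 16]
```

Python 3.12 also ships itertools.batched, which yields tuples rather than lists.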
Documentation
- Getting Started - Installation and first steps
- API Guide - Complete Python API reference
- Query Language - Query syntax and operators
- CLI Reference - Command-line usage
- Cookbook - Practical examples
Examples
Log Analysis
# Find errors in specific services
errors = stream("app.log.jsonl") \
    .filter(["and",
        ["eq?", "@level", "ERROR"],
        ["in?", "@service", ["api", "auth"]]
    ]) \
    .map(["dict",
        "time", "@timestamp",
        "service", "@service",
        "message", "@message"
    ]) \
    .evaluate()
Data Validation
# Find invalid records
invalid = stream("users.jsonl") \
    .filter(["or",
        ["not", ["exists?", "@email"]],
        ["not", ["regex-match?", "@email", "^[^@]+@[^@]+\\.[^@]+$"]]
    ]) \
    .evaluate()
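The regular expression in this query is a loose shape check rather than full address validation: a run of non-@ characters, an @, another run, a dot, and a final run. In the string, \\. is the escaped form of the regex \. (a literal dot). The plain-Python mirror below shows which records the query is meant to flag; is_invalid and the sample records are illustrative, and it assumes regex-match? tests the whole string, which the ^ and $ anchors enforce anyway.

```python
import re

# Same pattern as the query; "\\." in the string is the regex \. (a literal dot)
EMAIL_SHAPE = re.compile("^[^@]+@[^@]+\\.[^@]+$")


def is_invalid(record: dict) -> bool:
    """Plain-Python mirror of the or/not/exists?/regex-match? query above."""
    if "email" not in record:                       # ["not", ["exists?", "@email"]]
        return True
    email = record["email"]
    # ["not", ["regex-match?", ...]]; non-strings are treated as non-matching here
    return not (isinstance(email, str) and EMAIL_SHAPE.search(email))


records = [
    {"email": "alice@example.com"},  # matches the pattern
    {"email": "bob@localhost"},      # no dot after the @
    {"email": "a@b@c.com"},          # more than one @
    {"name": "carol"},               # no email field at all
]
invalid = [r for r in records if is_invalid(r)]  # the last three records
```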
ETL Pipeline
# Transform and filter data
pipeline = stream("raw_sales.jsonl") \
    .filter(["eq?", "@status", "completed"]) \
    .map(["dict",
        "date", ["date", "@timestamp"],
        "amount", "@amount",
        "category", ["if", ["gt?", "@amount", 1000], "high", "low"]
    ]) \
    .batch(1000)
# Process in chunks
for batch in pipeline.evaluate():
    bulk_insert(batch)
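In the map expressions used above, "dict" takes alternating key and value arguments, and "if" takes a condition, a then-value and an else-value. The sketch below evaluates just those forms, plus gt? and top-level "@field" lookups, in plain Python and feeds the results to a sink in chunks. It is an illustration under assumptions, not JAF's evaluator: ["date", ...] is left out because its behaviour is not described here, and ev, batches and bulk_insert are hypothetical helpers (bulk_insert only collects the batches).

```python
from itertools import islice
from typing import Any, Iterable, Iterator, List


def ev(expr: Any, rec: dict) -> Any:
    """Evaluate the small subset of map expressions used in the ETL example."""
    if isinstance(expr, str) and expr.startswith("@"):
        return rec.get(expr[1:])                 # top-level field lookup only
    if isinstance(expr, list) and expr:
        op, *args = expr
        if op == "dict":
            # ["dict", k1, v1, k2, v2, ...] -> {k1: v1, k2: v2, ...}
            return {args[i]: ev(args[i + 1], rec) for i in range(0, len(args), 2)}
        if op == "if":
            cond, then, otherwise = args
            return ev(then, rec) if ev(cond, rec) else ev(otherwise, rec)
        if op == "gt?":
            return ev(args[0], rec) > ev(args[1], rec)
    return expr                                  # literal value


def batches(items: Iterable[Any], size: int) -> Iterator[List[Any]]:
    # Lists of `size` items; the last one may be shorter
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk


shape = ["dict",
         "amount", "@amount",
         "category", ["if", ["gt?", "@amount", 1000], "high", "low"]]

raw_sales = [{"status": "completed", "amount": 1500},
             {"status": "pending", "amount": 50},
             {"status": "completed", "amount": 200}]

inserted: List[List[dict]] = []


def bulk_insert(rows: List[dict]) -> None:       # hypothetical sink
    inserted.append(rows)


rows = (ev(shape, r) for r in raw_sales if r["status"] == "completed")
for chunk in batches(rows, 1000):
    bulk_insert(chunk)
```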
Integration
JAF's evaluated output (--eval) can be piped into other command-line tools:
# With jsonl-algebra
jaf filter orders.jsonl '["gt?", "@amount", 100]' --eval | \
ja groupby customer_id --aggregate 'total:amount:sum'
# With jq
jaf filter data.jsonl '["exists?", "@metadata"]' --eval | \
jq '.metadata'
# With standard Unix tools
jaf map users.jsonl "@email" --eval | sort | uniq -c
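Piped output can be consumed from Python as well. The sketch below assumes that jaf with --eval writes one JSON value per line, as the sort | uniq -c example implies, and counts repeated values the way uniq -c does. count_values and main are hypothetical helpers, not part of JAF.

```python
import json
import sys
from collections import Counter
from typing import Iterable, TextIO


def count_values(lines: Iterable[str]) -> Counter:
    """Count identical JSON values, one value per line; blank lines are skipped."""
    counts: Counter = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        # Re-serialize with sorted keys so equal objects are counted together
        counts[json.dumps(json.loads(line), sort_keys=True)] += 1
    return counts


def main(source: TextIO = sys.stdin) -> None:
    # Most frequent first, formatted like `uniq -c`
    for value, n in count_values(source).most_common():
        print(f"{n:7d} {value}")
```

Saved as a script that ends with a call to main(), it could sit at the end of a pipe such as jaf map users.jsonl "@email" --eval | python count_values.py (the script name is hypothetical).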
Performance
JAF is designed for streaming large datasets:
- Processes one item at a time
- Low memory footprint: large inputs are processed without loading everything into memory
- Early termination (e.g., with take)
- Efficient pipeline composition
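Early termination falls out of pull-based iteration: once a take-style stage has enough items it stops asking for more, so nothing upstream reads further. The sketch below shows the general technique with a generator-based JSONL reader and itertools.islice, counting how many lines get parsed; it is not a benchmark of JAF, and read_jsonl and stats are hypothetical names.

```python
import io
import json
from itertools import islice
from typing import Dict, Iterator, TextIO


def read_jsonl(fh: TextIO, stats: Dict[str, int]) -> Iterator[dict]:
    # Parse lazily, one line per pull, and count how many lines were parsed
    for line in fh:
        stats["parsed"] += 1
        yield json.loads(line)


# 1,000 in-memory records stand in for a large file; odd ids are errors
data = io.StringIO("".join(
    json.dumps({"id": i, "level": "ERROR" if i % 2 else "INFO"}) + "\n"
    for i in range(1000)))

stats = {"parsed": 0}
errors = (r for r in read_jsonl(data, stats) if r["level"] == "ERROR")
first_three = list(islice(errors, 3))  # ids 1, 3 and 5
# stats["parsed"] is now 6: the remaining 994 lines were never parsed
```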
Contributing
Contributions are welcome! Please read our Contributing Guide for details.
License
JAF is licensed under the MIT License. See LICENSE for details.
Related Projects
- jsonl-algebra - Relational operations on JSONL
- jq - Command-line JSON processor
Download files
Source Distribution: jaf-0.6.1.tar.gz
Built Distribution: jaf-0.6.1-py3-none-any.whl
File details
Details for the file jaf-0.6.1.tar.gz.
File metadata
- Download URL: jaf-0.6.1.tar.gz
- Upload date:
- Size: 96.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 4141ce5e7f18b705943727212549f0d51f143d2d4be3385d20443e68600662ff |
| MD5 | 3464911bd01b5c4f83a1f58c5252bfba |
| BLAKE2b-256 | c5fa4da92108a656def5653904006294a3361cfd45f99bd15ee68ee762f391ab |
File details
Details for the file jaf-0.6.1-py3-none-any.whl.
File metadata
- Download URL: jaf-0.6.1-py3-none-any.whl
- Upload date:
- Size: 65.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | f47ea68f466eeddb6fb4435124514c3cc1d2d2c18b457fabcf852ae5108b437b |
| MD5 | 1377841d3a15767cf4389eb8e63c9991 |
| BLAKE2b-256 | 99ce05483f56ded2b6bd5365f649edb414dd537cbba91d4a130a03997d26f34f |