Real-time cryptocurrency and financial data ingestion system
StreamForge
Real-time cryptocurrency and financial data ingestion made simple.
StreamForge is a unified, async-first framework for ingesting real-time market data from cryptocurrency exchanges. Built with Python's asyncio, it offers high-performance data streaming, normalization, and multiple output formats.
Features
- Real-time WebSocket Streaming - Live market data from multiple exchanges
- Multi-Exchange Support - Binance, Kraken, OKX with unified API
- Multiple Output Formats - CSV, PostgreSQL, Kafka, or custom emitters
- Timeframe Aggregation - Automatic aggregation to higher timeframes
- Historical Backfilling - Load months of historical data effortlessly
- Data Transformation - Built-in transformers for custom data processing
- Stream Merging - Combine multiple exchanges into unified streams
- Type-Safe - Full type hints and Pydantic validation
Installation
pip install streamforge
Requirements: Python 3.8+
Quick Start
Stream Bitcoin price data with a short script:
import asyncio
import streamforge as sf

async def main():
    # Configure what to stream
    stream = sf.DataInput(
        type="kline",
        symbols=["BTCUSDT"],
        timeframe="1m"
    )

    # Create runner and add logger
    runner = sf.BinanceRunner(stream_input=stream)
    runner.register_emitter(sf.Logger(prefix="Binance"))

    # Start streaming!
    await runner.run()

asyncio.run(main())
Output:
[Binance] BTCUSDT 1m | Open: 43,250.00 | High: 43,275.00 | Low: 43,240.00 | Close: 43,260.00
📖 Read the full documentation: https://paulobueno90.github.io/streamforge/
Supported Exchanges
| Exchange | Symbol Format | Type Name | Backfilling |
|---|---|---|---|
| Binance | BTCUSDT | kline | ✓ |
| Kraken | BTC/USD | ohlc | Limited |
| OKX | BTC-USDT | candle | ✓ |
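Each exchange writes the same trading pair differently, so the symbol has to match the exchange you connect to. The sketch below only illustrates the three separator styles from the table; `format_symbol` and `SEPARATORS` are hypothetical helpers written for this example, not part of StreamForge.

```python
# Hypothetical helper for illustration; not a StreamForge function.
SEPARATORS = {"binance": "", "kraken": "/", "okx": "-"}


def format_symbol(exchange: str, base: str, quote: str) -> str:
    """Join a base/quote pair using the separator style shown in the table."""
    try:
        separator = SEPARATORS[exchange.lower()]
    except KeyError:
        raise ValueError(f"unsupported exchange: {exchange!r}") from None
    return f"{base.upper()}{separator}{quote.upper()}"


print(format_symbol("binance", "btc", "usdt"))  # BTCUSDT
print(format_symbol("kraken", "btc", "usd"))    # BTC/USD
print(format_symbol("okx", "btc", "usdt"))      # BTC-USDT
```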
Usage Examples
Save to CSV
import asyncio
import streamforge as sf

async def main():
    runner = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT"],
            timeframe="1m"
        )
    )

    csv_emitter = sf.CSVEmitter(
        source="Binance",
        symbol="BTCUSDT",
        timeframe="1m",
        file_path="btc_data.csv"
    )
    runner.register_emitter(csv_emitter)

    await runner.run()

asyncio.run(main())
Save to PostgreSQL
import asyncio
import streamforge as sf
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, String, Float, BigInteger

Base = declarative_base()

class KlineTable(Base):
    __tablename__ = 'klines'

    source = Column(String, primary_key=True)
    symbol = Column(String, primary_key=True)
    timeframe = Column(String, primary_key=True)
    open_ts = Column(BigInteger, primary_key=True)
    end_ts = Column(BigInteger)
    open = Column(Float)
    high = Column(Float)
    low = Column(Float)
    close = Column(Float)
    volume = Column(Float)

async def main():
    postgres = (
        sf.PostgresEmitter(
            host="localhost",
            dbname="crypto",
            user="postgres",
            password="password"
        )
        .set_model(KlineTable)
        .on_conflict(["source", "symbol", "timeframe", "open_ts"])
    )

    runner = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT", "ETHUSDT"],
            timeframe="1m"
        )
    )
    runner.register_emitter(postgres)

    await runner.run()

asyncio.run(main())
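The columns passed to `on_conflict` match the table's composite primary key, which suggests upsert behavior: a candle that arrives again for the same key updates the stored row instead of raising a duplicate-key error. The sketch below illustrates that general idea with the standard library's `sqlite3` and a trimmed-down table. It is an assumption about the intent, not the SQL that StreamForge issues against PostgreSQL.

```python
import sqlite3

# In-memory database with a reduced version of the klines table.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE klines (
        source TEXT, symbol TEXT, timeframe TEXT, open_ts INTEGER,
        close REAL,
        PRIMARY KEY (source, symbol, timeframe, open_ts)
    )
    """
)

# Insert a row, or update it when the composite key already exists.
UPSERT = """
    INSERT INTO klines (source, symbol, timeframe, open_ts, close)
    VALUES (?, ?, ?, ?, ?)
    ON CONFLICT (source, symbol, timeframe, open_ts)
    DO UPDATE SET close = excluded.close
"""

# The same candle arrives twice; the second delivery carries a newer close.
conn.execute(UPSERT, ("Binance", "BTCUSDT", "1m", 1704067200000, 43250.0))
conn.execute(UPSERT, ("Binance", "BTCUSDT", "1m", 1704067200000, 43260.0))

rows = conn.execute("SELECT open_ts, close FROM klines").fetchall()
print(rows)  # a single row holding the most recent close
```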
Multi-Timeframe Aggregation
Stream 1-minute data and automatically create 5m, 15m, and 1h candles:
import asyncio
import streamforge as sf

async def main():
    runner = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT"],
            timeframe="1m",
            aggregate_list=["5m", "15m", "1h"]  # Auto-aggregate!
        ),
        active_warmup=True  # Required for aggregation
    )
    runner.register_emitter(sf.Logger(prefix="Multi-TF"))

    await runner.run()

asyncio.run(main())
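Aggregating to a higher timeframe follows the usual OHLCV rule: the open comes from the first candle, the close from the last, the high and low are the extremes, and volume is summed. The sketch below shows that rule on five 1-minute candles. The `Candle` dataclass and `aggregate` function are hypothetical names for this example, not StreamForge's internal types.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Candle:  # hypothetical shape, for illustration only
    open_ts: int  # candle open time, in milliseconds
    open: float
    high: float
    low: float
    close: float
    volume: float


def aggregate(candles: List[Candle]) -> Candle:
    """Roll consecutive, time-ordered candles up into one larger candle."""
    if not candles:
        raise ValueError("need at least one candle")
    return Candle(
        open_ts=candles[0].open_ts,
        open=candles[0].open,
        high=max(c.high for c in candles),
        low=min(c.low for c in candles),
        close=candles[-1].close,
        volume=sum(c.volume for c in candles),
    )


one_minute = [
    Candle(0, 100.0, 101.0, 99.5, 100.5, 2.0),
    Candle(60_000, 100.5, 102.0, 100.0, 101.5, 1.5),
    Candle(120_000, 101.5, 103.0, 101.0, 102.0, 1.0),
    Candle(180_000, 102.0, 102.5, 99.0, 99.5, 0.5),
    Candle(240_000, 99.5, 100.5, 99.25, 100.0, 3.0),
]
five_minute = aggregate(one_minute)
print(five_minute)
```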
Historical Backfilling
Load historical data:
import streamforge as sf

# postgres_emitter is assumed to be a configured sf.PostgresEmitter,
# set up as in the "Save to PostgreSQL" example
backfiller = sf.BinanceBackfilling(
    symbol="BTCUSDT",
    timeframe="1h",
    from_date="2024-01-01",
    to_date="2024-12-31"
)
backfiller.register_emitter(postgres_emitter)
backfiller.run()  # Downloads and saves a year of data
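A backfill over a long range is typically fetched in pages, because exchange REST APIs usually cap the number of candles returned per request. The sketch below shows one way to split a date range into request windows. `request_windows` and the `max_candles` value of 1000 are illustrative assumptions, not StreamForge's implementation or a documented exchange limit.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, Tuple


def request_windows(
    start: datetime, end: datetime, candle: timedelta, max_candles: int
) -> Iterator[Tuple[datetime, datetime]]:
    """Yield half-open [window_start, window_end) ranges of at most max_candles candles."""
    step = candle * max_candles
    cursor = start
    while cursor < end:
        window_end = min(cursor + step, end)
        yield cursor, window_end
        cursor = window_end


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = datetime(2025, 1, 1, tzinfo=timezone.utc)  # exclusive end, covering all of 2024

windows = list(request_windows(start, end, timedelta(hours=1), max_candles=1000))
total_candles = (end - start) // timedelta(hours=1)
print(total_candles, len(windows))  # 8784 9
```

At one-hour candles, a full leap year is 8784 candles, so a page size of 1000 needs nine requests.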
Multi-Exchange Streaming
Merge data from multiple exchanges:
import asyncio
import streamforge as sf
from streamforge.merge_stream import merge_streams

async def main():
    binance = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT"],
            timeframe="1m"
        )
    )
    okx = sf.OKXRunner(
        stream_input=sf.DataInput(
            type="candle",
            symbols=["BTC-USDT"],
            timeframe="1m"
        )
    )

    async for data in merge_streams(binance, okx):
        print(f"{data.source} | {data.symbol} | ${data.close:,.2f}")

asyncio.run(main())
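To see what merging streams means in asyncio terms, the sketch below interleaves two async generators through a shared `asyncio.Queue`. It is a generic pattern with made-up `ticker` and `merge` functions and does not describe how `merge_streams` is implemented.

```python
import asyncio
from typing import AsyncIterator, List


async def ticker(name: str, prices: List[float], delay: float) -> AsyncIterator[tuple]:
    """Stand-in for an exchange stream: yields (source, price) pairs."""
    for price in prices:
        await asyncio.sleep(delay)
        yield name, price


async def merge(*streams: AsyncIterator) -> AsyncIterator:
    """Yield items from all streams as soon as each one produces them."""
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel marking the end of one stream

    async def pump(stream: AsyncIterator) -> None:
        try:
            async for item in stream:
                await queue.put(item)
        finally:
            queue.put_nowait(done)

    tasks = [asyncio.create_task(pump(s)) for s in streams]
    remaining = len(tasks)
    try:
        while remaining:
            item = await queue.get()
            if item is done:
                remaining -= 1
            else:
                yield item
    finally:
        for task in tasks:
            task.cancel()  # no-op for tasks that already finished


async def main() -> list:
    merged = []
    async for source, price in merge(
        ticker("binance", [43250.0, 43260.0], 0.01),
        ticker("okx", [43251.0, 43259.0], 0.015),
    ):
        merged.append((source, price))
    return merged


merged = asyncio.run(main())
print(merged)
```

Items arrive in whatever order the sources produce them, so consumers should rely on each record's own timestamp rather than on arrival order.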
Documentation
Full documentation: https://paulobueno90.github.io/streamforge/
Key Concepts
Runners
Connect to exchanges and manage data flow:
runner = sf.BinanceRunner(stream_input=stream) # Binance
runner = sf.KrakenRunner(stream_input=stream) # Kraken
runner = sf.OKXRunner(stream_input=stream) # OKX
Emitters
Define where data goes:
sf.Logger() # Print to console
sf.CSVEmitter() # Save to CSV
sf.PostgresEmitter() # Save to PostgreSQL
sf.KafkaEmitter() # Stream to Kafka
DataInput
Configure what to stream:
stream = sf.DataInput(
    type="kline",                        # Data type
    symbols=["BTCUSDT", "ETHUSDT"],      # Trading pairs
    timeframe="1m",                      # Candle interval
    aggregate_list=["5m", "15m", "1h"]   # Optional aggregation
)
Development
Install from Source
git clone https://github.com/paulobueno90/streamforge.git
cd streamforge
pip install -e ".[dev]"
Run Tests
pytest
Code Formatting
black streamforge/
isort streamforge/
flake8 streamforge/
Requirements
Core dependencies (installed automatically):
- aiohttp - Async HTTP client
- websockets - WebSocket client
- sqlalchemy - SQL ORM
- pandas - Data manipulation
- pydantic - Data validation
- aiokafka - Kafka client
- asyncpg - PostgreSQL driver
- aiolimiter - Rate limiting
Examples
Stream Multiple Symbols
import asyncio
import streamforge as sf

async def main():
    runner = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT", "ETHUSDT", "SOLUSDT"],
            timeframe="1m"
        )
    )
    runner.register_emitter(sf.Logger(prefix="Crypto"))

    await runner.run()

asyncio.run(main())
Multiple Output Destinations
import asyncio
import streamforge as sf

async def main():
    runner = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT"],
            timeframe="1m"
        )
    )

    # Register multiple emitters - data goes to ALL
    # (csv_emitter, postgres_emitter, and kafka_emitter are assumed to be configured beforehand)
    runner.register_emitter(sf.Logger(prefix="Monitor"))
    runner.register_emitter(csv_emitter)
    runner.register_emitter(postgres_emitter)
    runner.register_emitter(kafka_emitter)

    await runner.run()

asyncio.run(main())
Architecture
Exchange WebSocket → Runner → Normalizer → Processor → Aggregator → Transformer → Emitter(s)
- Runner - Manages WebSocket connections
- Normalizer - Standardizes data across exchanges
- Processor - Buffers and processes data
- Aggregator - Creates higher timeframe candles (optional)
- Transformer - Applies custom transformations (optional)
- Emitter - Outputs to your destination(s)
Learn more about architecture →
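The stages listed above form a chain in which each step consumes the previous step's output. The sketch below models three of them (Normalizer, Transformer, Emitter) as plain Python generators. The raw message shape and the `normalize`, `transform`, and `emit` functions are invented for illustration and are not StreamForge's classes or any exchange's payload format.

```python
from typing import Callable, Dict, Iterable, Iterator, List


def normalize(raw_messages: Iterable[Dict]) -> Iterator[Dict]:
    """Map a made-up raw payload onto a common candle shape."""
    for raw in raw_messages:
        yield {
            "symbol": raw["s"],
            "open": float(raw["o"]),
            "close": float(raw["c"]),
        }


def transform(candles: Iterable[Dict]) -> Iterator[Dict]:
    """Example custom transformation: add the candle's price change."""
    for candle in candles:
        yield {**candle, "change": candle["close"] - candle["open"]}


def emit(candles: Iterable[Dict], sinks: List[Callable[[Dict], None]]) -> None:
    """Deliver every candle to every registered sink."""
    for candle in candles:
        for sink in sinks:
            sink(candle)


received: List[Dict] = []
raw_feed = [{"s": "BTCUSDT", "o": "43250.00", "c": "43260.00"}]
emit(transform(normalize(raw_feed)), sinks=[received.append])
print(received)
```

Because each stage only depends on the shape of the records it receives, optional stages such as aggregation or transformation can be added or left out without touching the others.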
Use Cases
- Trading Bots - Real-time market data for algorithmic trading
- Data Analysis - Collect data for backtesting and research
- Price Monitoring - Track cryptocurrency prices across exchanges
- Arbitrage Detection - Find price differences between exchanges
- Market Research - Analyze market trends and patterns
- Portfolio Tracking - Monitor your cryptocurrency holdings
Contributing
Contributions are welcome! Please see our Contributing Guide.
Development Setup
- Fork the repository
- Clone your fork: git clone https://github.com/YOUR_USERNAME/streamforge.git
- Install dev dependencies: pip install -e ".[dev]"
- Create a branch: git checkout -b feature/my-feature
- Make changes and add tests
- Run tests: pytest
- Submit a pull request
Links
- Documentation: https://paulobueno90.github.io/streamforge/
- PyPI: https://pypi.org/project/streamforge/
- GitHub: https://github.com/paulobueno90/streamforge
- Issues: https://github.com/paulobueno90/streamforge/issues
- Changelog: CHANGELOG.md
License
MIT License - see LICENSE file for details.
Author
Paulo Bueno
Email: paulohmbueno@gmail.com
GitHub: @paulobueno90
Acknowledgments
Built with:
- aiohttp - Async HTTP
- websockets - WebSocket support
- Pydantic - Data validation
- SQLAlchemy - Database ORM
- Pandas - Data manipulation
Happy Streaming! 🚀