Real-time cryptocurrency and financial data ingestion system

StreamForge

Real-time cryptocurrency and financial data ingestion made simple.

StreamForge is a unified, async-first framework for ingesting real-time market data from cryptocurrency exchanges. Built with Python's asyncio, it offers high-performance data streaming, normalization, and multiple output formats.


Features

  • Real-time WebSocket Streaming - Live market data from multiple exchanges
  • Multi-Exchange Support - Binance, Kraken, OKX with unified API
  • Multiple Output Formats - CSV, PostgreSQL, Kafka, or custom emitters
  • Timeframe Aggregation - Automatic aggregation to higher timeframes
  • Historical Backfilling - Load months of historical data effortlessly
  • Data Transformation - Built-in transformers for custom data processing
  • Stream Merging - Combine multiple exchanges into unified streams
  • Type-Safe - Full type hints and Pydantic validation

Installation

pip install streamforge

Requirements: Python 3.8+


Quick Start

Stream Bitcoin price data in a few lines:

import asyncio
import streamforge as sf

async def main():
    # Configure what to stream
    stream = sf.DataInput(
        type="kline",
        symbols=["BTCUSDT"],
        timeframe="1m"
    )
    
    # Create runner and add logger
    runner = sf.BinanceRunner(stream_input=stream)
    runner.register_emitter(sf.Logger(prefix="Binance"))
    
    # Start streaming!
    await runner.run()

asyncio.run(main())

Output:

[Binance] BTCUSDT 1m | Open: 43,250.00 | High: 43,275.00 | Low: 43,240.00 | Close: 43,260.00

📖 Read the full documentation: https://paulobueno90.github.io/streamforge/


Supported Exchanges

Exchange   Symbol Format   Type Name   Backfilling
Binance    BTCUSDT         kline
Kraken     BTC/USD         ohlc        Limited
OKX        BTC-USDT        candle
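
Each exchange spells the same trading pair differently, as the table above shows. A minimal sketch of building the right symbol string from a base/quote pair; `to_exchange_symbol` and `SEPARATORS` are hypothetical names for illustration and are not part of StreamForge:

```python
# Hypothetical helper, not part of StreamForge: joins a base/quote pair
# with the separator each exchange uses in the table above.
SEPARATORS = {"binance": "", "kraken": "/", "okx": "-"}


def to_exchange_symbol(exchange: str, base: str, quote: str) -> str:
    """Return the pair formatted the way the given exchange expects."""
    try:
        separator = SEPARATORS[exchange.lower()]
    except KeyError:
        raise ValueError(f"unsupported exchange: {exchange!r}") from None
    return f"{base.upper()}{separator}{quote.upper()}"


print(to_exchange_symbol("binance", "btc", "usdt"))  # BTCUSDT
print(to_exchange_symbol("kraken", "btc", "usd"))    # BTC/USD
print(to_exchange_symbol("okx", "btc", "usdt"))      # BTC-USDT
```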

Usage Examples

Save to CSV

import asyncio
import streamforge as sf

async def main():
    runner = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT"],
            timeframe="1m"
        )
    )
    
    csv_emitter = sf.CSVEmitter(
        source="Binance",
        symbol="BTCUSDT",
        timeframe="1m",
        file_path="btc_data.csv"
    )
    
    runner.register_emitter(csv_emitter)
    await runner.run()

asyncio.run(main())

Save to PostgreSQL

import asyncio
import streamforge as sf
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, String, Float, BigInteger

Base = declarative_base()

class KlineTable(Base):
    __tablename__ = 'klines'
    source = Column(String, primary_key=True)
    symbol = Column(String, primary_key=True)
    timeframe = Column(String, primary_key=True)
    open_ts = Column(BigInteger, primary_key=True)
    end_ts = Column(BigInteger)
    open = Column(Float)
    high = Column(Float)
    low = Column(Float)
    close = Column(Float)
    volume = Column(Float)

async def main():
    postgres = (
        sf.PostgresEmitter(
            host="localhost",
            dbname="crypto",
            user="postgres",
            password="password",
        )
        .set_model(KlineTable)
        .on_conflict(["source", "symbol", "timeframe", "open_ts"])
    )
    
    runner = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT", "ETHUSDT"],
            timeframe="1m"
        )
    )
    
    runner.register_emitter(postgres)
    await runner.run()

asyncio.run(main())
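
The on_conflict call above names the columns that identify a candle. A sketch of the upsert behaviour this suggests, assuming a repeated key should overwrite the stored row. It uses the standard-library sqlite3 module instead of PostgreSQL so that it runs without a server, and it is an illustration rather than StreamForge's implementation:

```python
import sqlite3

# Illustration only: an in-memory SQLite database stands in for PostgreSQL.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE klines (
        source TEXT, symbol TEXT, timeframe TEXT, open_ts INTEGER,
        close REAL,
        PRIMARY KEY (source, symbol, timeframe, open_ts)
    )
    """
)

# Insert a candle, or update it when the same key already exists.
UPSERT = """
INSERT INTO klines (source, symbol, timeframe, open_ts, close)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (source, symbol, timeframe, open_ts)
DO UPDATE SET close = excluded.close
"""

# The same candle arrives twice, for example after a reconnect.
conn.execute(UPSERT, ("Binance", "BTCUSDT", "1m", 1704067200000, 43250.0))
conn.execute(UPSERT, ("Binance", "BTCUSDT", "1m", 1704067200000, 43260.0))

rows = conn.execute("SELECT open_ts, close FROM klines").fetchall()
print(rows)  # one row, holding the latest close
```

Without the conflict clause, the second insert would fail on the primary key instead of refreshing the candle.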

Multi-Timeframe Aggregation

Stream 1-minute data and automatically create 5m, 15m, and 1h candles:

import asyncio
import streamforge as sf

async def main():
    runner = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT"],
            timeframe="1m",
            aggregate_list=["5m", "15m", "1h"]  # Auto-aggregate!
        ),
        active_warmup=True  # Required for aggregation
    )
    
    runner.register_emitter(sf.Logger(prefix="Multi-TF"))
    await runner.run()

asyncio.run(main())
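
Rolling 1-minute candles up into a higher timeframe follows the usual OHLCV rules: first open, highest high, lowest low, last close, summed volume. A minimal sketch of that rule, not StreamForge's aggregator; the `Candle` class and `aggregate` function are hypothetical, with field names borrowed from the KlineTable example:

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass
class Candle:
    open_ts: int  # candle open time, milliseconds since the epoch
    open: float
    high: float
    low: float
    close: float
    volume: float


def aggregate(candles: Iterable[Candle], bucket_ms: int) -> List[Candle]:
    """Roll candles up into buckets that are bucket_ms milliseconds wide."""
    out: List[Candle] = []
    for c in sorted(candles, key=lambda c: c.open_ts):
        bucket_start = c.open_ts - (c.open_ts % bucket_ms)
        if out and out[-1].open_ts == bucket_start:
            agg = out[-1]
            agg.high = max(agg.high, c.high)
            agg.low = min(agg.low, c.low)
            agg.close = c.close       # last close in the bucket wins
            agg.volume += c.volume
        else:
            # First candle of a new bucket supplies the open price.
            out.append(Candle(bucket_start, c.open, c.high, c.low, c.close, c.volume))
    return out


MINUTE = 60_000
one_minute = [
    Candle(i * MINUTE, 100 + i, 101 + i, 99 + i, 100.5 + i, 1.0)
    for i in range(10)
]
five_minute = aggregate(one_minute, 5 * MINUTE)
print(len(five_minute), five_minute[0].high, five_minute[0].close)
```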

Historical Backfilling

Load historical data:

import streamforge as sf

backfiller = sf.BinanceBackfilling(
    symbol="BTCUSDT",
    timeframe="1h",
    from_date="2024-01-01",
    to_date="2024-12-31"
)

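# postgres_emitter is assumed to be a configured sf.PostgresEmitter,
# built as in the "Save to PostgreSQL" example above.
backfiller.register_emitter(postgres_emitter)
backfiller.run()  # Downloads and saves a year of data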
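
Exchanges typically cap how many candles a single request returns, so a long range has to be fetched in pages. A sketch of that splitting for the date range above; `chunk_range` is a hypothetical helper, the 1000-candle cap is an illustrative value, and treating to_date as an exclusive midnight bound is an assumption:

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, Tuple


def chunk_range(
    start: datetime, end: datetime, step: timedelta, max_candles: int
) -> Iterator[Tuple[datetime, datetime]]:
    """Yield (chunk_start, chunk_end) windows of at most max_candles candles."""
    span = step * max_candles
    cursor = start
    while cursor < end:
        chunk_end = min(cursor + span, end)
        yield cursor, chunk_end
        cursor = chunk_end


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = datetime(2024, 12, 31, tzinfo=timezone.utc)  # assumed exclusive
hour = timedelta(hours=1)

chunks = list(chunk_range(start, end, hour, max_candles=1000))
total_candles = (end - start) // hour
print(total_candles, len(chunks))  # 8760 9
```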

Multi-Exchange Streaming

Merge data from multiple exchanges:

import asyncio
import streamforge as sf
from streamforge.merge_stream import merge_streams

async def main():
    binance = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT"],
            timeframe="1m"
        )
    )
    
    okx = sf.OKXRunner(
        stream_input=sf.DataInput(
            type="candle",
            symbols=["BTC-USDT"],
            timeframe="1m"
        )
    )
    
    async for data in merge_streams(binance, okx):
        print(f"{data.source} | {data.symbol} | ${data.close:,.2f}")

asyncio.run(main())
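
One common way to merge several async streams is to pump each into a shared asyncio.Queue and yield items as they arrive. The sketch below shows that pattern with stand-in feeds; `fake_feed` and `merge` are hypothetical and do not describe how merge_streams is implemented:

```python
import asyncio
from typing import AsyncIterator, Sequence, Tuple


async def fake_feed(
    source: str, prices: Sequence[float], delay: float
) -> AsyncIterator[Tuple[str, float]]:
    """Stand-in for an exchange stream: yields (source, price) pairs."""
    for price in prices:
        await asyncio.sleep(delay)
        yield source, price


async def merge(*streams):
    """Yield items from all streams as they arrive, until every stream ends."""
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel marking the end of one stream

    async def pump(stream):
        try:
            async for item in stream:
                queue.put_nowait(item)
        finally:
            queue.put_nowait(done)

    tasks = [asyncio.create_task(pump(s)) for s in streams]
    remaining = len(tasks)
    try:
        while remaining:
            item = await queue.get()
            if item is done:
                remaining -= 1
            else:
                yield item
    finally:
        for task in tasks:
            task.cancel()


async def collect():
    feeds = (
        fake_feed("Binance", [43250.0, 43260.0], 0.01),
        fake_feed("OKX", [43251.5, 43259.0], 0.015),
    )
    return [item async for item in merge(*feeds)]


merged = asyncio.run(collect())
print(merged)
```

Items from one source keep their order, while the interleaving between sources depends on arrival time.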

Documentation

Full documentation: https://paulobueno90.github.io/streamforge/


Key Concepts

Runners

Connect to exchanges and manage data flow:

# Build `stream` with each exchange's own type name and symbol format
# (see Supported Exchanges).
runner = sf.BinanceRunner(stream_input=stream)  # Binance
runner = sf.KrakenRunner(stream_input=stream)   # Kraken
runner = sf.OKXRunner(stream_input=stream)      # OKX

Emitters

Define where data goes:

sf.Logger()              # Print to console
sf.CSVEmitter()          # Save to CSV
sf.PostgresEmitter()     # Save to PostgreSQL
sf.KafkaEmitter()        # Stream to Kafka

DataInput

Configure what to stream:

stream = sf.DataInput(
    type="kline",                           # Data type
    symbols=["BTCUSDT", "ETHUSDT"],        # Trading pairs
    timeframe="1m",                         # Candle interval
    aggregate_list=["5m", "15m", "1h"]     # Optional aggregation
)
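
Timeframe strings such as "1m" or "1h" map to fixed durations, and an aggregated timeframe only makes sense when it is a whole multiple of the base one. A sketch of checking that before starting a stream; `timeframe_seconds` and `check_aggregates` are hypothetical helpers, and whether StreamForge validates its input this way is not stated here:

```python
import re
from typing import Iterable

UNIT_SECONDS = {"m": 60, "h": 3600, "d": 86400}


def timeframe_seconds(timeframe: str) -> int:
    """Convert a string such as '1m', '15m' or '1h' into seconds."""
    match = re.fullmatch(r"(\d+)([mhd])", timeframe)
    if match is None:
        raise ValueError(f"unrecognised timeframe: {timeframe!r}")
    count, unit = match.groups()
    return int(count) * UNIT_SECONDS[unit]


def check_aggregates(base: str, targets: Iterable[str]) -> None:
    """Raise if a target is not a larger whole multiple of the base timeframe."""
    base_s = timeframe_seconds(base)
    for target in targets:
        target_s = timeframe_seconds(target)
        if target_s <= base_s or target_s % base_s:
            raise ValueError(f"{target} cannot be built from {base} candles")


check_aggregates("1m", ["5m", "15m", "1h"])  # passes silently
print(timeframe_seconds("1h") // timeframe_seconds("1m"))  # 60
```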

Development

Install from Source

git clone https://github.com/paulobueno90/streamforge.git
cd streamforge
pip install -e ".[dev]"

Run Tests

pytest

Code Formatting

black streamforge/
isort streamforge/
flake8 streamforge/

Requirements

Core dependencies (installed automatically):

  • aiohttp - Async HTTP client
  • websockets - WebSocket client
  • sqlalchemy - SQL ORM
  • pandas - Data manipulation
  • pydantic - Data validation
  • aiokafka - Kafka client
  • asyncpg - PostgreSQL driver
  • aiolimiter - Rate limiting

Examples

Stream Multiple Symbols

import asyncio
import streamforge as sf

async def main():
    runner = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT", "ETHUSDT", "SOLUSDT"],
            timeframe="1m"
        )
    )
    
    runner.register_emitter(sf.Logger(prefix="Crypto"))
    await runner.run()

asyncio.run(main())

Multiple Output Destinations

import asyncio
import streamforge as sf

async def main():
    runner = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT"],
            timeframe="1m"
        )
    )
    
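    # Register multiple emitters - data goes to ALL.
    # csv_emitter and postgres_emitter are built as in the usage examples
    # above; kafka_emitter is assumed to be a configured sf.KafkaEmitter.
    runner.register_emitter(sf.Logger(prefix="Monitor"))
    runner.register_emitter(csv_emitter)
    runner.register_emitter(postgres_emitter)
    runner.register_emitter(kafka_emitter)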
    
    await runner.run()

asyncio.run(main())
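
Sending each record to every registered destination is a fan-out. A minimal sketch of the idea using asyncio.gather; the `Emitter` protocol, its `emit` method, and `ListEmitter` are hypothetical names and not StreamForge's emitter interface:

```python
import asyncio
from typing import List, Protocol


class Emitter(Protocol):
    """Hypothetical interface: anything with an async emit(record) method."""

    async def emit(self, record: dict) -> None: ...


class ListEmitter:
    """Toy destination that keeps records in memory."""

    def __init__(self) -> None:
        self.records: List[dict] = []

    async def emit(self, record: dict) -> None:
        self.records.append(record)


async def fan_out(record: dict, emitters: List[Emitter]) -> None:
    """Send one record to every emitter concurrently."""
    await asyncio.gather(*(e.emit(record) for e in emitters))


async def demo():
    a, b = ListEmitter(), ListEmitter()
    await fan_out({"symbol": "BTCUSDT", "close": 43260.0}, [a, b])
    return a.records, b.records


first, second = asyncio.run(demo())
print(first == second)  # True
```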


Architecture

Exchange WebSocket → Runner → Normalizer → Processor → Aggregator → Transformer → Emitter(s)
  1. Runner - Manages WebSocket connections
  2. Normalizer - Standardizes data across exchanges
  3. Processor - Buffers and processes data
  4. Aggregator - Creates higher timeframe candles (optional)
  5. Transformer - Applies custom transformations (optional)
  6. Emitter - Outputs to your destination(s)
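
The stages above can be pictured as functions chained over a stream of records. The sketch below covers only the Normalizer, Transformer, and Emitter steps, using generators and made-up message shapes; every name and payload key in it is hypothetical, and real exchange payloads differ:

```python
from typing import Callable, Dict, Iterable, Iterator, List

# Made-up raw messages; real exchange payloads use different keys.
RAW = [
    {"exchange": "binance", "pair": "BTCUSDT", "price": "43260.00"},
    {"exchange": "okx", "instrument": "BTC-USDT", "last_price": "43259.50"},
]


def normalize(messages: Iterable[dict]) -> Iterator[Dict]:
    """Map each exchange-specific message onto one common record shape."""
    for msg in messages:
        if msg["exchange"] == "binance":
            yield {"source": "Binance", "symbol": msg["pair"],
                   "close": float(msg["price"])}
        elif msg["exchange"] == "okx":
            yield {"source": "OKX", "symbol": msg["instrument"],
                   "close": float(msg["last_price"])}


def transform(records: Iterable[Dict], fn: Callable[[Dict], Dict]) -> Iterator[Dict]:
    """Apply a user-supplied function to every record."""
    for record in records:
        yield fn(record)


def emit(records: Iterable[Dict], sinks: List[Callable[[Dict], None]]) -> None:
    """Hand every record to every sink."""
    for record in records:
        for sink in sinks:
            sink(record)


def unify_symbol(record: Dict) -> Dict:
    """Example transformation: drop the dash so symbols match across sources."""
    return {**record, "symbol": record["symbol"].replace("-", "")}


collected: List[Dict] = []
emit(transform(normalize(RAW), unify_symbol), [collected.append, print])
```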


Use Cases

  • Trading Bots - Real-time market data for algorithmic trading
  • Data Analysis - Collect data for backtesting and research
  • Price Monitoring - Track cryptocurrency prices across exchanges
  • Arbitrage Detection - Find price differences between exchanges
  • Market Research - Analyze market trends and patterns
  • Portfolio Tracking - Monitor your cryptocurrency holdings

Contributing

Contributions are welcome! Please see our Contributing Guide.

Development Setup

  1. Fork the repository
  2. Clone your fork: git clone https://github.com/YOUR_USERNAME/streamforge.git
  3. Install dev dependencies: pip install -e ".[dev]"
  4. Create a branch: git checkout -b feature/my-feature
  5. Make changes and add tests
  6. Run tests: pytest
  7. Submit a pull request

License

MIT License - see LICENSE file for details.


Author

Paulo Bueno
Email: paulohmbueno@gmail.com
GitHub: @paulobueno90


Happy Streaming! 🚀
