Real-time cryptocurrency and financial data ingestion system

StreamForge

Real-time cryptocurrency and financial data ingestion made simple.

StreamForge is a unified, async-first framework for ingesting real-time market data from cryptocurrency exchanges. Built with Python's asyncio, it offers high-performance data streaming, normalization, and multiple output formats.


Features

  • Real-time WebSocket Streaming - Live market data from multiple exchanges
  • Multi-Exchange Support - Binance, Kraken, OKX, Bybit with unified API
  • Multiple Output Formats - CSV, PostgreSQL, Kafka, or custom emitters
  • Timeframe Aggregation - Automatic aggregation to higher timeframes
  • Historical Backfilling - Load months of historical data effortlessly
  • Data Transformation - Built-in transformers for custom data processing
  • Stream Merging - Combine multiple exchanges into unified streams
  • Type-Safe - Full type hints and Pydantic validation

Installation

pip install streamforge

Requirements: Python 3.8+


Quick Start

Stream Bitcoin price data with a few lines of code:

import asyncio
import streamforge as sf

async def main():
    # Configure what to stream
    stream = sf.DataInput(
        type="kline",
        symbols=["BTCUSDT"],
        timeframe="1m"
    )
    
    # Create runner
    runner = sf.BinanceRunner(stream_input=stream)
    # Internal logging handled by sf.config.logger
    
    # Start streaming!
    await runner.run()

asyncio.run(main())

Output:

[Binance] BTCUSDT 1m | Open: 43,250.00 | High: 43,275.00 | Low: 43,240.00 | Close: 43,260.00
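
The sample line above can be reproduced with plain string formatting. The sketch below is only an illustration of that layout; `format_kline` and its parameters are hypothetical names, not part of StreamForge's API.

```python
def format_kline(source, symbol, timeframe, open_, high, low, close):
    """Render one candle in the same layout as the sample output line."""
    return (
        f"[{source}] {symbol} {timeframe} | "
        f"Open: {open_:,.2f} | High: {high:,.2f} | "
        f"Low: {low:,.2f} | Close: {close:,.2f}"
    )


# Values copied from the sample output above.
line = format_kline("Binance", "BTCUSDT", "1m", 43250.0, 43275.0, 43240.0, 43260.0)
print(line)
```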


Supported Exchanges

Exchange   Symbol Format   Type Name   Backfilling
Binance    BTCUSDT         kline
Kraken     BTC/USD         ohlc        Limited
OKX        BTC-USDT        candle
Bybit      BTCUSDT         kline
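
Because each exchange spells the same pair differently, it can help to build symbols from a base and quote asset. The helper below is a minimal sketch based only on the Symbol Format column above; `format_symbol` and `SEPARATORS` are hypothetical names, and StreamForge may handle this differently internally.

```python
# Separator each exchange places between base and quote asset,
# inferred from the "Symbol Format" column (assumption, not library code).
SEPARATORS = {"binance": "", "kraken": "/", "okx": "-", "bybit": ""}


def format_symbol(base: str, quote: str, exchange: str) -> str:
    """Join base and quote the way the given exchange writes its symbols."""
    try:
        separator = SEPARATORS[exchange.lower()]
    except KeyError:
        raise ValueError(f"unsupported exchange: {exchange}") from None
    return f"{base.upper()}{separator}{quote.upper()}"


print(format_symbol("btc", "usdt", "OKX"))
print(format_symbol("btc", "usd", "Kraken"))
```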

Usage Examples

Save to CSV

import asyncio
import streamforge as sf

async def main():
    runner = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT"],
            timeframe="1m"
        )
    )
    
    csv_emitter = sf.CSVEmitter(
        source="Binance",
        symbol="BTCUSDT",
        timeframe="1m",
        file_path="btc_data.csv"
    )
    
    runner.register_emitter(csv_emitter)
    await runner.run()

asyncio.run(main())
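
To make the idea of a CSV destination concrete, here is a standard-library sketch of appending candles to a file, writing the header only once. It is not CSVEmitter's implementation: `append_candle` and the column list are assumptions, with the field names borrowed from the PostgreSQL model later in this README.

```python
import csv
import os

# Assumed column layout (hypothetical; the real emitter may differ).
FIELDS = ["source", "symbol", "timeframe", "open_ts",
          "open", "high", "low", "close", "volume"]


def append_candle(file_path: str, candle: dict) -> None:
    """Append one candle, writing the header only if the file is new or empty."""
    needs_header = not os.path.exists(file_path) or os.path.getsize(file_path) == 0
    with open(file_path, "a", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=FIELDS)
        if needs_header:
            writer.writeheader()
        writer.writerow(candle)
```

Calling `append_candle("btc_data.csv", candle)` repeatedly grows the file by one row per call, so the file stays readable even if the process restarts.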

Save to PostgreSQL

import asyncio
import streamforge as sf
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, String, Float, BigInteger

Base = declarative_base()

class KlineTable(Base):
    __tablename__ = 'klines'
    source = Column(String, primary_key=True)
    symbol = Column(String, primary_key=True)
    timeframe = Column(String, primary_key=True)
    open_ts = Column(BigInteger, primary_key=True)
    end_ts = Column(BigInteger)
    open = Column(Float)
    high = Column(Float)
    low = Column(Float)
    close = Column(Float)
    volume = Column(Float)

async def main():
    postgres = (
        sf.PostgresEmitter(
            host="localhost",
            dbname="crypto",
            user="postgres",
            password="password",
        )
        .set_model(KlineTable)
        .on_conflict(["source", "symbol", "timeframe", "open_ts"])
    )
    
    runner = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT", "ETHUSDT"],
            timeframe="1m"
        )
    )
    
    runner.register_emitter(postgres)
    await runner.run()

asyncio.run(main())
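
The `on_conflict` call names the primary-key columns, which suggests upsert behavior when the same candle arrives twice. The sketch below shows one plausible reading of that, an insert that updates on key conflict. It uses the standard-library sqlite3 module only so that it runs without a database server; PostgreSQL's `ON CONFLICT ... DO UPDATE` clause has the same shape. Whether StreamForge updates or skips conflicting rows is not stated here, so treat this as an assumption.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE klines (
        source TEXT, symbol TEXT, timeframe TEXT, open_ts INTEGER,
        close REAL,
        PRIMARY KEY (source, symbol, timeframe, open_ts)
    )
""")

# Insert a row, or overwrite `close` if the same key already exists.
UPSERT = """
    INSERT INTO klines (source, symbol, timeframe, open_ts, close)
    VALUES (?, ?, ?, ?, ?)
    ON CONFLICT (source, symbol, timeframe, open_ts)
    DO UPDATE SET close = excluded.close
"""


def upsert(row: tuple) -> None:
    """Write one candle; a repeated key replaces the stored close price."""
    conn.execute(UPSERT, row)


# The same candle delivered twice, the second time with a newer close.
upsert(("Binance", "BTCUSDT", "1m", 1704067200000, 43250.0))
upsert(("Binance", "BTCUSDT", "1m", 1704067200000, 43260.0))
rows = conn.execute("SELECT open_ts, close FROM klines").fetchall()
print(rows)
```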

Multi-Timeframe Aggregation

Stream 1-minute data and automatically create 5m, 15m, and 1h candles:

import asyncio
import streamforge as sf

async def main():
    runner = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT"],
            timeframe="1m",
            aggregate_list=["5m", "15m", "1h"]  # Auto-aggregate!
        ),
        active_warmup=True  # Required for aggregation
    )
    
    await runner.run()

asyncio.run(main())
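
Conceptually, building a higher-timeframe candle means folding several consecutive lower-timeframe candles into one: first open, highest high, lowest low, last close, summed volume. The function below is a minimal sketch of that rule using plain dictionaries; `aggregate` and the field names are illustrative, not StreamForge's internal aggregator.

```python
from typing import Dict, List


def aggregate(candles: List[Dict[str, float]]) -> Dict[str, float]:
    """Fold consecutive lower-timeframe candles into one higher-timeframe candle."""
    if not candles:
        raise ValueError("need at least one candle")
    return {
        "open_ts": candles[0]["open_ts"],          # bucket starts with the first candle
        "open": candles[0]["open"],                # first open
        "high": max(c["high"] for c in candles),   # highest high
        "low": min(c["low"] for c in candles),     # lowest low
        "close": candles[-1]["close"],             # last close
        "volume": sum(c["volume"] for c in candles),
    }


# Five made-up 1m candles (timestamps in seconds) folded into one 5m candle.
one_minute = [
    {"open_ts": 0,   "open": 100.0, "high": 101.0, "low": 99.5,  "close": 100.5, "volume": 2.0},
    {"open_ts": 60,  "open": 100.5, "high": 102.0, "low": 100.0, "close": 101.5, "volume": 1.0},
    {"open_ts": 120, "open": 101.5, "high": 101.5, "low": 99.0,  "close": 99.5,  "volume": 3.0},
    {"open_ts": 180, "open": 99.5,  "high": 100.5, "low": 99.0,  "close": 100.0, "volume": 1.5},
    {"open_ts": 240, "open": 100.0, "high": 103.0, "low": 100.0, "close": 102.5, "volume": 0.5},
]
five_minute = aggregate(one_minute)
print(five_minute)
```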

Historical Backfilling

Load historical data:

import streamforge as sf

backfiller = sf.BinanceBackfilling(
    symbol="BTCUSDT",
    timeframe="1h",
    from_date="2024-01-01",
    to_date="2024-12-31"
)

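# postgres_emitter is assumed to be a PostgresEmitter configured as in "Save to PostgreSQL" above
backfiller.register_emitter(postgres_emitter)
backfiller.run()  # Downloads and saves a year of data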
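
Exchange REST endpoints typically cap how many candles one request may return, so loading a long range usually means splitting it into smaller windows. The sketch below shows that splitting step only. `request_windows` is a hypothetical helper and the cap of 1000 candles is an illustrative assumption; how StreamForge batches its requests is not described here.

```python
from datetime import datetime, timedelta, timezone


def request_windows(start, end, step: timedelta, max_candles: int):
    """Yield (window_start, window_end) pairs that cover [start, end) in order."""
    span = step * max_candles  # time covered by one full request
    cursor = start
    while cursor < end:
        window_end = min(cursor + span, end)
        yield cursor, window_end
        cursor = window_end


# Same range and timeframe as the backfilling example above.
start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = datetime(2024, 12, 31, tzinfo=timezone.utc)
windows = list(request_windows(start, end, timedelta(hours=1), max_candles=1000))
print(len(windows), "requests")
```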

Multi-Exchange Streaming

Merge data from multiple exchanges:

import asyncio
import streamforge as sf
from streamforge.merge_stream import merge_streams

async def main():
    binance = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT"],
            timeframe="1m"
        )
    )
    
    okx = sf.OKXRunner(
        stream_input=sf.DataInput(
            type="candle",
            symbols=["BTC-USDT"],
            timeframe="1m"
        )
    )
    
    async for data in merge_streams(binance, okx):
        print(f"{data.source} | {data.symbol} | ${data.close:,.2f}")

asyncio.run(main())
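
Merging streams can be pictured as several producers feeding one queue that a single consumer drains. The sketch below implements that pattern with asyncio only. It is not the code behind `merge_streams`; `merge`, `fake_feed`, and `collect` are hypothetical names, and the feeds are stand-ins that need no network.

```python
import asyncio


async def merge(*streams):
    """Yield items from several async iterators as each one produces them."""
    queue = asyncio.Queue()
    done = object()  # sentinel: one source stream has finished

    async def pump(stream):
        try:
            async for item in stream:
                await queue.put(item)
        finally:
            await queue.put(done)

    tasks = [asyncio.create_task(pump(s)) for s in streams]
    remaining = len(tasks)
    try:
        while remaining:
            item = await queue.get()
            if item is done:
                remaining -= 1
            else:
                yield item
    finally:
        for task in tasks:
            task.cancel()  # stop producers if the consumer exits early


async def fake_feed(source, prices, delay):
    """Stand-in for an exchange stream: yields (source, price) pairs."""
    for price in prices:
        await asyncio.sleep(delay)
        yield (source, price)


async def collect():
    feeds = (fake_feed("Binance", [1, 2], 0.01), fake_feed("OKX", [3, 4], 0.015))
    return [item async for item in merge(*feeds)]
```

Running `asyncio.run(collect())` returns all four items; their interleaving depends on timing, which is why the order is not fixed.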

Documentation

Full documentation: https://paulobueno90.github.io/streamforge/


Key Concepts

Runners

Connect to exchanges and manage data flow:

runner = sf.BinanceRunner(stream_input=stream)  # Binance
runner = sf.KrakenRunner(stream_input=stream)   # Kraken  
runner = sf.OKXRunner(stream_input=stream)      # OKX
runner = sf.BybitRunner(stream_input=stream)    # Bybit

Emitters

Define where data goes:

sf.CSVEmitter()          # Save to CSV
sf.PostgresEmitter()     # Save to PostgreSQL
sf.KafkaEmitter()        # Stream to Kafka

DataInput

Configure what to stream:

stream = sf.DataInput(
    type="kline",                           # Data type
    symbols=["BTCUSDT", "ETHUSDT"],        # Trading pairs
    timeframe="1m",                         # Candle interval
    aggregate_list=["5m", "15m", "1h"]     # Optional aggregation
)
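
An aggregation target only makes sense if it is a whole multiple of the base timeframe. The sketch below checks that for strings such as "1m" and "1h". `timeframe_seconds` and `check_aggregates` are hypothetical helpers, and the supported units are an assumption; StreamForge's own validation may accept other formats.

```python
# Seconds per unit suffix (assumed set of suffixes).
UNIT_SECONDS = {"m": 60, "h": 3600, "d": 86400}


def timeframe_seconds(timeframe: str) -> int:
    """Convert a string such as '5m' or '1h' into seconds."""
    amount, unit = timeframe[:-1], timeframe[-1:]
    if not amount.isdigit() or int(amount) == 0 or unit not in UNIT_SECONDS:
        raise ValueError(f"unrecognized timeframe: {timeframe!r}")
    return int(amount) * UNIT_SECONDS[unit]


def check_aggregates(base: str, targets: list) -> None:
    """Raise ValueError unless every target is a larger whole multiple of base."""
    base_seconds = timeframe_seconds(base)
    for target in targets:
        target_seconds = timeframe_seconds(target)
        if target_seconds <= base_seconds or target_seconds % base_seconds:
            raise ValueError(f"{target!r} cannot be built from {base!r} candles")


# The combination used in the DataInput example above passes the check.
check_aggregates("1m", ["5m", "15m", "1h"])
```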

Development

Install from Source

git clone https://github.com/paulobueno90/streamforge.git
cd streamforge
pip install -e ".[dev]"

Run Tests

pytest

Code Formatting

black streamforge/
isort streamforge/
flake8 streamforge/

Requirements

Core dependencies (installed automatically):

  • aiohttp - Async HTTP client
  • websockets - WebSocket client
  • sqlalchemy - SQL ORM
  • pandas - Data manipulation
  • pydantic - Data validation
  • aiokafka - Kafka client
  • asyncpg - PostgreSQL driver
  • aiolimiter - Rate limiting

Examples

Stream Multiple Symbols

import asyncio
import streamforge as sf

async def main():
    runner = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT", "ETHUSDT", "SOLUSDT"],
            timeframe="1m"
        )
    )
    
    await runner.run()

asyncio.run(main())

Multiple Output Destinations

import asyncio
import streamforge as sf

async def main():
    runner = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT"],
            timeframe="1m"
        )
    )
    
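    # Register multiple emitters - data goes to ALL.
    # csv_emitter, postgres_emitter, and kafka_emitter are assumed to be
    # constructed beforehand, as in the earlier examples.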
    runner.register_emitter(csv_emitter)
    runner.register_emitter(postgres_emitter)
    runner.register_emitter(kafka_emitter)
    
    await runner.run()

asyncio.run(main())
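
Sending each record to every registered destination is a fan-out. A minimal sketch of that idea, assuming each destination exposes an async `emit` method: the `ListEmitter` class, the `emit` name, and `fan_out` are all hypothetical and are not taken from StreamForge's emitter interface.

```python
import asyncio


class ListEmitter:
    """Hypothetical stand-in for an emitter: keeps records in memory."""

    def __init__(self, name):
        self.name = name
        self.records = []

    async def emit(self, record):
        self.records.append(record)


async def fan_out(record, emitters):
    """Deliver one record to every emitter concurrently."""
    await asyncio.gather(*(emitter.emit(record) for emitter in emitters))


emitters = [ListEmitter("csv"), ListEmitter("postgres"), ListEmitter("kafka")]
asyncio.run(fan_out({"symbol": "BTCUSDT", "close": 43260.0}, emitters))
print([len(e.records) for e in emitters])
```

Using `asyncio.gather` means a slow destination does not delay the start of writes to the others, although the call still waits for all of them to finish.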


Architecture

Exchange WebSocket → Runner → Normalizer → Processor → Aggregator → Transformer → Emitter(s)
  1. Runner - Manages WebSocket connections
  2. Normalizer - Standardizes data across exchanges
  3. Processor - Buffers and processes data
  4. Aggregator - Creates higher timeframe candles (optional)
  5. Transformer - Applies custom transformations (optional)
  6. Emitter - Outputs to your destination(s)
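
The stages listed above can be pictured as a chain of functions, each taking a record and passing it on. The sketch below is a toy version of that flow with the optional aggregator left out. Every function, the raw message shape, and the "drop unfinished candles" rule are made up for illustration and do not reflect StreamForge's classes.

```python
def normalize(raw):
    """Normalizer: map an exchange-specific message to common field names."""
    return {"source": "Binance", "symbol": raw["s"],
            "close": float(raw["c"]), "closed": raw["x"]}


def process(candle):
    """Processor: here it simply drops candles that are not finished yet."""
    return candle if candle["closed"] else None


def transform(candle):
    """Transformer: add a derived field."""
    return {**candle, "close_rounded": round(candle["close"])}


def run_pipeline(raw, stages, sink):
    """Pass one raw message through each stage; a None result stops the chain."""
    record = raw
    for stage in stages:
        record = stage(record)
        if record is None:
            return
    sink.append(record)  # Emitter: a plain list stands in for a destination


sink = []
stages = [normalize, process, transform]
run_pipeline({"s": "BTCUSDT", "c": "43260.40", "x": False}, stages, sink)  # dropped
run_pipeline({"s": "BTCUSDT", "c": "43260.40", "x": True}, stages, sink)   # emitted
print(sink)
```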


Use Cases

  • Trading Bots - Real-time market data for algorithmic trading
  • Data Analysis - Collect data for backtesting and research
  • Price Monitoring - Track cryptocurrency prices across exchanges
  • Arbitrage Detection - Find price differences between exchanges
  • Market Research - Analyze market trends and patterns
  • Portfolio Tracking - Monitor your cryptocurrency holdings

Contributing

Contributions are welcome! Please see our Contributing Guide.

Development Setup

  1. Fork the repository
  2. Clone your fork: git clone https://github.com/YOUR_USERNAME/streamforge.git
  3. Install dev dependencies: pip install -e ".[dev]"
  4. Create a branch: git checkout -b feature/my-feature
  5. Make changes and add tests
  6. Run tests: pytest
  7. Submit a pull request

License

MIT License - see LICENSE file for details.


Author

Paulo Bueno
Email: paulohmbueno@gmail.com
GitHub: @paulobueno90


Happy Streaming! 🚀
