
Extract and load your data reliably from API clients, with native fault tolerance and checkpointing.


bizon ⚡️

Extract and load your largest data streams with a framework you can trust with billions of records.

Features

  • Natively fault-tolerant: Bizon uses a checkpointing mechanism to track progress and recover from the last checkpoint.

  • High throughput: Bizon is designed to handle high throughput and can process billions of records.

  • Queue system agnostic: Bizon is agnostic of the queuing system; you can use Python Queue, RabbitMQ, Kafka or Redpanda. Thanks to the bizon.engine.queue.Queue interface, adapters can be written for any queuing system.

  • Pipeline metrics: Bizon provides exhaustive pipeline metrics and implements Datadog & OpenTelemetry tracing. You can monitor:

    • ETAs for completion
    • Number of records processed
    • Completion percentage
    • Source-to-destination latency
  • Lightweight & lean: Bizon is lightweight, with a minimal codebase and only a few dependencies:

    • requests for HTTP requests
    • pyyaml for configuration
    • sqlalchemy for database / warehouse connections
    • polars for memory efficient data buffering and vectorized processing
    • pyarrow for Parquet file format

Installation

For Users

pip install bizon

# With optional dependencies
pip install bizon[postgres]    # PostgreSQL backend
pip install bizon[kafka]       # Kafka queue
pip install bizon[bigquery]    # BigQuery backend/destination
pip install bizon[rabbitmq]    # RabbitMQ queue

For Development

# Install uv (if not already installed)
pip install uv

# Clone and install
git clone https://github.com/bizon-data/bizon-core.git
cd bizon-core
uv sync --all-extras --all-groups

# Run tests
uv run pytest tests/

# Format code
uv run ruff format .
uv run ruff check --fix .

Usage

List available sources and streams

bizon source list
bizon stream list <source_name>

Create a pipeline

Create a file named config.yml in your working directory with the following content:

name: demo-creatures-pipeline

source:
  name: dummy
  stream: creatures
  authentication:
    type: api_key
    params:
      token: dummy_key

destination:
  name: logger
  config:
    dummy: dummy

Run the pipeline with the following command:

bizon run config.yml

Backend configuration

Backend is the interface used by Bizon to store its state. It can be configured in the backend section of the configuration file. The following backends are supported:

  • sqlite: In-memory SQLite database, useful for testing and development.
  • bigquery: Google BigQuery backend, perfect for light setup & production.
  • postgres: PostgreSQL backend, for production use and frequent cursor updates.
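
For example, the backend is configured under engine.backend. The field names below follow the BigQuery example later in this README; treat them as the pattern rather than a complete reference:

```yaml
engine:
  backend:
    type: bigquery            # or: sqlite, postgres
    database: my-gcp-project  # project / database holding Bizon's state
    schema: bizon_backend     # schema for Bizon's state tables
```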

Queue configuration

Queue is the interface used by Bizon to exchange data between Source and Destination. It can be configured in the queue section of the configuration file. The following queues are supported:

  • python_queue: Python Queue, useful for testing and development.
  • rabbitmq: RabbitMQ, for production use and high throughput.
  • kafka: Apache Kafka, for production use and high throughput and strong persistence.

Runner configuration

Runner is the interface used by Bizon to run the pipeline. It can be configured in the runner section of the configuration file. The following runners are supported:

  • thread (asynchronous)
  • process (asynchronous)
  • stream (synchronous)
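
As an illustrative sketch (the exact field names are an assumption, mirroring the queue and backend examples elsewhere in this README), a runner could be selected under engine:

```yaml
engine:
  runner:
    type: thread   # or: process, stream
```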

Sync Modes

Bizon supports three sync modes:

  • full_refresh: Re-syncs all data from scratch on each run
  • incremental: Syncs only new/updated data since the last successful run
  • stream: Continuous streaming mode for real-time data (e.g., Kafka)

Incremental Sync

Incremental sync fetches only new or updated records since the last successful run, using an append-only strategy.

Configuration

source:
  name: your_source
  stream: your_stream
  sync_mode: incremental
  cursor_field: updated_at  # The timestamp field to filter records by

How It Works

┌─────────────────────────────────────────────────────────────────────┐
│                        INCREMENTAL SYNC FLOW                        │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  1. Producer checks for last successful job                         │
│     └─> Backend.get_last_successful_stream_job()                    │
│                                                                     │
│  2. If found, creates SourceIncrementalState:                       │
│     └─> last_run = previous_job.created_at                          │
│     └─> cursor_field = config.cursor_field (e.g., "updated_at")     │
│                                                                     │
│  3. Calls source.get_records_after(source_state, pagination)        │
│     └─> Source filters: WHERE cursor_field > last_run               │
│                                                                     │
│  4. Records written to temp table: {table}_incremental              │
│                                                                     │
│  5. finalize() appends temp table to main table                     │
│     └─> INSERT INTO main_table SELECT * FROM temp_table             │
│     └─> Deletes temp table                                          │
│                                                                     │
│  FIRST RUN: No previous job → falls back to get() (full refresh)    │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
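
The flow above can be sketched in plain Python. This is an illustrative simulation, not Bizon's actual code: only get_records_after, cursor_field and the temp-table append come from the diagram; all other names are assumptions.

```python
from datetime import datetime, timezone

def get_records_after(records, cursor_field, last_run):
    # Step 3: keep only records updated after the last successful run.
    return [r for r in records if r[cursor_field] > last_run]

source_records = [
    {"id": 1, "updated_at": datetime(2024, 1, 1, tzinfo=timezone.utc)},
    {"id": 2, "updated_at": datetime(2024, 3, 1, tzinfo=timezone.utc)},
]

# Step 2: last_run comes from the previous job's created_at timestamp.
last_run = datetime(2024, 2, 1, tzinfo=timezone.utc)

# Row loaded by a previous run; record 2 is new since last_run.
main_table = [{"id": 1, "updated_at": datetime(2024, 1, 1, tzinfo=timezone.utc)}]
temp_table = get_records_after(source_records, "updated_at", last_run)  # step 4
main_table.extend(temp_table)  # step 5: finalize() appends, then drops temp
```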

Configuration Options

  • sync_mode (required): set to incremental
  • cursor_field (required): the timestamp field to filter by, e.g. updated_at, last_edited_time, modified_at

Supported Sources

Sources must implement get_records_after() to support incremental sync:

  • notion: cursor field last_edited_time; supports the pages, databases, blocks and blocks_markdown streams
  • other sources: cursor field varies; check the source docs or implement get_records_after()
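
A custom source could support incremental sync along these lines. Only the method names get() and get_records_after() appear in this README; the SourceIncrementalState fields and everything else here are hedged assumptions for illustration.

```python
from dataclasses import dataclass
from datetime import datetime

@dataclass
class SourceIncrementalState:
    last_run: datetime   # created_at of the previous successful job
    cursor_field: str    # e.g. "updated_at"

class MyCustomSource:
    def __init__(self, rows):
        self.rows = rows

    def get(self, pagination=None):
        # Full refresh: used on the first run, returns everything.
        return list(self.rows)

    def get_records_after(self, state, pagination=None):
        # Incremental: only rows whose cursor field is after last_run.
        return [r for r in self.rows if r[state.cursor_field] > state.last_run]
```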

Supported Destinations

Destinations must implement finalize() with incremental logic:

  • bigquery: append-only via a temp table
  • bigquery_streaming_v2: append-only via a temp table
  • file: appends to the existing file
  • logger: logs completion

Example: Notion Incremental Sync

name: notion_incremental_sync

source:
  name: notion
  stream: blocks_markdown
  sync_mode: incremental
  cursor_field: last_edited_time
  authentication:
    type: api_key
    params:
      token: secret_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

  database_ids:
    - "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

  # Optional: filter which pages to sync
  database_filters:
    "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx":
      property: "Status"
      select:
        equals: "Published"

destination:
  name: bigquery
  config:
    project_id: my-gcp-project
    dataset_id: notion_data
    dataset_location: US

engine:
  backend:
    type: bigquery
    database: my-gcp-project
    schema: bizon_backend
    syncCursorInDBEvery: 2

First Run Behavior

On the first incremental run (no previous successful job):

  • Falls back to get() method (full refresh behavior)
  • All data is fetched and loaded
  • Job is marked as successful
  • Subsequent runs use get_records_after() with last_run timestamp

Start syncing your data 🚀

Quick setup without any dependencies ✌️

Set the queue configuration to python_queue and the backend configuration to sqlite. This lets you test a pipeline without any external dependencies.
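
A minimal engine section for this dependency-free setup could look like the following (the type values come from the sections above; the absence of further fields is an assumption):

```yaml
engine:
  backend:
    type: sqlite
  queue:
    type: python_queue
```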

Local Kafka setup

To test the pipeline with Kafka or Redpanda, you can use Docker Compose to set them up locally.

Kafka / Redpanda

docker compose --file ./scripts/kafka-compose.yml up # Kafka
docker compose --file ./scripts/redpanda-compose.yml up # Redpanda

In your YAML configuration, set the queue configuration to Kafka under engine:

engine:
  queue:
    type: kafka
    config:
      queue:
        bootstrap_server: localhost:9092 # Kafka:9092 & Redpanda: 19092

RabbitMQ

docker compose --file ./scripts/rabbitmq-compose.yml up

In your YAML configuration, set the queue configuration to RabbitMQ under engine:

engine:
  queue:
    type: rabbitmq
    config:
      queue:
        host: localhost
        queue_name: bizon
