bizon ⚡️
Extract and load your largest data streams with a framework you can trust for billion records.
Features
- Natively fault-tolerant: Bizon uses a checkpointing mechanism to track progress and recover from the last checkpoint.
- High throughput: Bizon is designed for high throughput and can process billions of records.
- Queue system agnostic: Bizon is agnostic of the queuing system; you can use Python Queue, RabbitMQ, Kafka, or Redpanda. Thanks to the `bizon.engine.queue.Queue` interface, adapters can be written for any queuing system.
- Pipeline metrics: Bizon provides exhaustive pipeline metrics and implements Datadog & OpenTelemetry for tracing. You can monitor:
  - ETA for completion
  - Number of records processed
  - Completion percentage
  - Source <> Destination latency
- Lightweight & lean: Bizon has a minimal codebase and only a few dependencies:
  - `requests` for HTTP requests
  - `pyyaml` for configuration
  - `sqlalchemy` for database / warehouse connections
  - `polars` for memory-efficient data buffering and vectorized processing
  - `pyarrow` for the Parquet file format
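The queue-agnostic design can be sketched as a small adapter pattern. The base class and method names below are illustrative assumptions, not Bizon's actual API; the real contract lives in the `bizon.engine.queue.Queue` interface:

```python
import queue
from abc import ABC, abstractmethod


class BaseQueueAdapter(ABC):
    """Hypothetical adapter contract, standing in for bizon.engine.queue.Queue."""

    @abstractmethod
    def put(self, record: dict) -> None: ...

    @abstractmethod
    def get(self) -> dict: ...


class PythonQueueAdapter(BaseQueueAdapter):
    """In-memory adapter backed by the stdlib queue.Queue."""

    def __init__(self, maxsize: int = 0):
        self._queue = queue.Queue(maxsize=maxsize)

    def put(self, record: dict) -> None:
        self._queue.put(record)

    def get(self) -> dict:
        return self._queue.get()


adapter = PythonQueueAdapter()
adapter.put({"id": 1, "name": "dragon"})
print(adapter.get())  # {'id': 1, 'name': 'dragon'}
```

A Kafka or RabbitMQ adapter would implement the same two entry points against its client library, which is what keeps sources and destinations unaware of the transport.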
Installation
For Users
pip install bizon
# With optional dependencies
pip install bizon[postgres] # PostgreSQL backend
pip install bizon[kafka] # Kafka queue
pip install bizon[bigquery] # BigQuery backend/destination
pip install bizon[rabbitmq] # RabbitMQ queue
For Development
# Install uv (if not already installed)
pip install uv
# Clone and install
git clone https://github.com/bizon-data/bizon-core.git
cd bizon-core
uv sync --all-extras --all-groups
# Run tests
uv run pytest tests/
# Format code
uv run ruff format .
uv run ruff check --fix .
Usage
List available sources and streams
bizon source list
bizon stream list <source_name>
Create a pipeline
Create a file named config.yml in your working directory with the following content:
name: demo-creatures-pipeline

source:
  name: dummy
  stream: creatures
  authentication:
    type: api_key
    params:
      token: dummy_key

destination:
  name: logger
  config:
    dummy: dummy
Run the pipeline with the following command:
bizon run config.yml
Backend configuration
Backend is the interface used by Bizon to store its state. It can be configured in the backend section of the configuration file. The following backends are supported:
- `sqlite`: In-memory SQLite database, useful for testing and development.
- `bigquery`: Google BigQuery backend, perfect for a light setup & production.
- `postgres`: PostgreSQL backend, for production use and frequent cursor updates.
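For example, the lightweight default can be declared under `engine` like this (a sketch; the key layout follows the `engine.backend` block used in the BigQuery example later on this page — check the docs for your version):

```yaml
engine:
  backend:
    type: sqlite
```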
Queue configuration
Queue is the interface used by Bizon to exchange data between Source and Destination. It can be configured in the queue section of the configuration file. The following queues are supported:
- `python_queue`: Python Queue, useful for testing and development.
- `rabbitmq`: RabbitMQ, for production use and high throughput.
- `kafka`: Apache Kafka, for production use, high throughput, and strong persistence.
Runner configuration
Runner is the interface used by Bizon to run the pipeline. It can be configured in the runner section of the configuration file. The following runners are supported:
- `thread` (asynchronous)
- `process` (asynchronous)
- `stream` (synchronous)
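A runner would presumably be selected under `engine` as well; the snippet below is an assumption by analogy with the queue and backend examples on this page, not a verified key layout:

```yaml
engine:
  runner:
    type: thread
```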
Sync Modes
Bizon supports three sync modes:
- `full_refresh`: Re-syncs all data from scratch on each run
- `incremental`: Syncs only new/updated data since the last successful run
- `stream`: Continuous streaming mode for real-time data (e.g., Kafka)
Incremental Sync
Incremental sync fetches only new or updated records since the last successful run, using an append-only strategy.
Configuration
source:
  name: your_source
  stream: your_stream
  sync_mode: incremental
  cursor_field: updated_at  # The timestamp field to filter records by
How It Works
┌─────────────────────────────────────────────────────────────────────┐
│ INCREMENTAL SYNC FLOW │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 1. Producer checks for last successful job │
│ └─> Backend.get_last_successful_stream_job() │
│ │
│ 2. If found, creates SourceIncrementalState: │
│ └─> last_run = previous_job.created_at │
│ └─> cursor_field = config.cursor_field (e.g., "updated_at") │
│ │
│ 3. Calls source.get_records_after(source_state, pagination) │
│ └─> Source filters: WHERE cursor_field > last_run │
│ │
│ 4. Records written to temp table: {table}_incremental │
│ │
│ 5. finalize() appends temp table to main table │
│ └─> INSERT INTO main_table SELECT * FROM temp_table │
│ └─> Deletes temp table │
│ │
│ FIRST RUN: No previous job → falls back to get() (full refresh) │
│ │
└─────────────────────────────────────────────────────────────────────┘
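The flow above can be sketched in plain Python. The dataclasses and function names mirror the diagram but are illustrative, not Bizon's actual types:

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class Job:
    created_at: datetime


@dataclass
class SourceIncrementalState:
    last_run: datetime
    cursor_field: str


def build_incremental_state(
    last_job: Optional[Job], cursor_field: str
) -> Optional[SourceIncrementalState]:
    """Steps 1-2: no previous successful job means full refresh (None state)."""
    if last_job is None:
        return None
    return SourceIncrementalState(last_run=last_job.created_at, cursor_field=cursor_field)


def fetch(records, state: Optional[SourceIncrementalState]):
    """Step 3: keep records where cursor_field > last_run; else full refresh."""
    if state is None:
        return list(records)  # first run: get() semantics
    return [r for r in records if r[state.cursor_field] > state.last_run]


records = [
    {"id": 1, "updated_at": datetime(2024, 5, 1, tzinfo=timezone.utc)},
    {"id": 2, "updated_at": datetime(2024, 6, 2, tzinfo=timezone.utc)},
]
last_job = Job(created_at=datetime(2024, 6, 1, tzinfo=timezone.utc))
state = build_incremental_state(last_job, "updated_at")
print([r["id"] for r in fetch(records, state)])  # [2]
```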
Configuration Options
| Option | Required | Description | Example |
|---|---|---|---|
| `sync_mode` | Yes | Set to `incremental` | `incremental` |
| `cursor_field` | Yes | Timestamp field to filter by | `updated_at`, `last_edited_time`, `modified_at` |
Supported Sources
Sources must implement get_records_after() to support incremental sync:
| Source | Cursor Field | Notes |
|---|---|---|
| `notion` | `last_edited_time` | Supports `pages`, `databases`, `blocks`, `blocks_markdown` streams |
| (others) | Varies | Check source docs or implement `get_records_after()` |
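A source that supports incremental sync only needs the filtering entry point alongside its full-refresh read. The class below is a hypothetical illustration of that contract, not Bizon's base class or its actual signatures:

```python
from datetime import datetime, timezone


class ChangelogSource:
    """Hypothetical source exposing both full-refresh and incremental reads."""

    def __init__(self, rows):
        self.rows = rows

    def get(self):
        # Full refresh: return every record.
        return list(self.rows)

    def get_records_after(self, cursor_field: str, last_run: datetime):
        # Incremental: only records strictly newer than the last successful run.
        return [r for r in self.rows if r[cursor_field] > last_run]


src = ChangelogSource([
    {"id": 1, "last_edited_time": datetime(2024, 5, 1, tzinfo=timezone.utc)},
    {"id": 2, "last_edited_time": datetime(2024, 6, 2, tzinfo=timezone.utc)},
])
cutoff = datetime(2024, 6, 1, tzinfo=timezone.utc)
print([r["id"] for r in src.get_records_after("last_edited_time", cutoff)])  # [2]
```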
Supported Destinations
Destinations must implement finalize() with incremental logic:
| Destination | Support | Notes |
|---|---|---|
| `bigquery` | ✅ | Append-only via temp table |
| `bigquery_streaming_v2` | ✅ | Append-only via temp table |
| `file` | ✅ | Appends to existing file |
| `logger` | ✅ | Logs completion |
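The append-only `finalize()` step can be illustrated with SQLite standing in for the warehouse. The table names and `{table}_incremental` convention follow the flow diagram above; the code is a sketch, not Bizon's implementation:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE main_table (id INTEGER, updated_at TEXT)")
conn.execute("CREATE TABLE main_table_incremental (id INTEGER, updated_at TEXT)")
conn.execute("INSERT INTO main_table VALUES (1, '2024-05-01')")
conn.execute("INSERT INTO main_table_incremental VALUES (2, '2024-06-02')")


def finalize(conn: sqlite3.Connection, table: str) -> None:
    """Append the {table}_incremental temp table to the main table, then drop it."""
    conn.execute(f"INSERT INTO {table} SELECT * FROM {table}_incremental")
    conn.execute(f"DROP TABLE {table}_incremental")
    conn.commit()


finalize(conn, "main_table")
print(conn.execute("SELECT COUNT(*) FROM main_table").fetchone()[0])  # 2
```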
Example: Notion Incremental Sync
name: notion_incremental_sync

source:
  name: notion
  stream: blocks_markdown
  sync_mode: incremental
  cursor_field: last_edited_time
  authentication:
    type: api_key
    params:
      token: secret_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
  database_ids:
    - "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
  # Optional: filter which pages to sync
  database_filters:
    "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx":
      property: "Status"
      select:
        equals: "Published"

destination:
  name: bigquery
  config:
    project_id: my-gcp-project
    dataset_id: notion_data
    dataset_location: US

engine:
  backend:
    type: bigquery
    database: my-gcp-project
    schema: bizon_backend
    syncCursorInDBEvery: 2
First Run Behavior
On the first incremental run (no previous successful job):
- Falls back to the `get()` method (full refresh behavior)
- All data is fetched and loaded
- Job is marked as successful
- Subsequent runs use `get_records_after()` with the `last_run` timestamp
Start syncing your data 🚀
Quick setup without any dependencies ✌️
Queue configuration can be set to python_queue and backend configuration to sqlite.
This will allow you to test the pipeline without any external dependencies.
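Such a dependency-free `engine` section might look like this (a sketch assembled from the backend and queue layouts used elsewhere on this page):

```yaml
engine:
  backend:
    type: sqlite
  queue:
    type: python_queue
```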
Local Kafka setup
To test the pipeline with Kafka, you can use docker compose to setup Kafka or Redpanda locally.
Kafka
docker compose --file ./scripts/kafka-compose.yml up # Kafka
docker compose --file ./scripts/redpanda-compose.yml up # Redpanda
In your YAML configuration, set the queue configuration to Kafka under engine:
engine:
queue:
type: kafka
config:
queue:
bootstrap_server: localhost:9092 # Kafka:9092 & Redpanda: 19092
RabbitMQ
docker compose --file ./scripts/rabbitmq-compose.yml up
In your YAML configuration, set the queue configuration to RabbitMQ under engine:
engine:
queue:
type: rabbitmq
config:
queue:
host: localhost
queue_name: bizon