Python SDK for Tinybird Forward
Project description
tinybird-sdk (Python)
Note: This package is experimental. APIs may change between versions.
A Python SDK for defining Tinybird resources with a TypeScript-SDK-like workflow. Define your datasources, pipes, and queries in Python and sync them directly to Tinybird.
Installation
pip install tinybird-sdk
Requirements
- Python
>=3.11 - Server-side usage only (do not expose Tinybird credentials in browser code)
Quick Start
1. Initialize your project
tinybird init
This creates:
tinybird.config.json- Configuration filelib/datasources.py- Define your datasourceslib/pipes.py- Define your pipes/endpointslib/client.py- Your Tinybird client module
2. Configure your token
Create a .env.local file:
TINYBIRD_TOKEN=p.your_token_here
3. Define your datasources
# lib/datasources.py
from tinybird_sdk import define_datasource, t, engine
page_views = define_datasource(
"page_views",
{
"description": "Page view tracking data",
"schema": {
"timestamp": t.date_time(),
"pathname": t.string(),
"session_id": t.string(),
"country": t.string().low_cardinality().nullable(),
},
"engine": engine.merge_tree(
{
"sorting_key": ["pathname", "timestamp"],
}
),
},
)
4. Define your endpoints
# lib/pipes.py
from tinybird_sdk import define_endpoint, node, p, t
top_pages = define_endpoint(
"top_pages",
{
"description": "Get the most visited pages",
"params": {
"start_date": p.date_time(),
"end_date": p.date_time(),
"limit": p.int32().optional(10),
},
"nodes": [
node(
{
"name": "aggregated",
"sql": """
SELECT pathname, count() AS views
FROM page_views
WHERE timestamp >= {{DateTime(start_date)}}
AND timestamp <= {{DateTime(end_date)}}
GROUP BY pathname
ORDER BY views DESC
LIMIT {{Int32(limit, 10)}}
""",
}
)
],
"output": {
"pathname": t.string(),
"views": t.uint64(),
},
},
)
5. Create your client
# lib/client.py
from tinybird_sdk import Tinybird
from .datasources import page_views
from .pipes import top_pages
tinybird = Tinybird(
{
"datasources": {"page_views": page_views},
"pipes": {"top_pages": top_pages},
}
)
__all__ = ["tinybird", "page_views", "top_pages"]
6. Optional: use a stable local import path
In larger applications, keep a single module (for example lib/client.py) and import from there:
from lib.client import tinybird
7. Start development
tinybird dev
This watches your schema files and syncs changes to Tinybird.
8. Use the client
from lib.client import tinybird
# Ingest one row
tinybird.page_views.ingest(
{
"timestamp": "2024-01-15 10:30:00",
"pathname": "/home",
"session_id": "abc123",
"country": "US",
}
)
# Query endpoint
result = tinybird.top_pages.query(
{
"start_date": "2024-01-01 00:00:00",
"end_date": "2024-01-31 23:59:59",
"limit": 5,
}
)
9. Manage datasource rows
from lib.client import tinybird
# Datasource accessors support: ingest, append, replace, delete, truncate
tinybird.page_views.ingest(
{
"timestamp": "2024-01-15 10:30:00",
"pathname": "/pricing",
"session_id": "session_123",
"country": "US",
}
)
tinybird.page_views.append(
{
"url": "https://example.com/page_views.csv",
}
)
tinybird.page_views.replace(
{
"url": "https://example.com/page_views_full_snapshot.csv",
}
)
tinybird.page_views.delete(
{
"delete_condition": "country = 'XX'",
}
)
tinybird.page_views.delete(
{
"delete_condition": "country = 'XX'",
"dry_run": True,
}
)
tinybird.page_views.truncate()
Public Tinybird API (Optional)
If you want a low-level API wrapper decoupled from the high-level client layer,
use create_tinybird_api() directly with base_url and token:
from tinybird_sdk import create_tinybird_api
api = create_tinybird_api(
{
"base_url": "https://api.tinybird.co",
"token": "p.your_token",
}
)
# Query endpoint pipe
top_pages = api.query(
"top_pages",
{
"start_date": "2024-01-01",
"end_date": "2024-01-31",
"limit": 5,
},
)
# Ingest one row
api.ingest(
"events",
{
"timestamp": "2024-01-15 10:30:00",
"event_name": "page_view",
"pathname": "/home",
},
)
# Ingest retry behavior (disabled by default):
# - 429 retries use Retry-After / X-RateLimit-Reset headers.
# - 503 retries use SDK default exponential backoff.
api.ingest(
"events",
{
"timestamp": "2024-01-15 10:31:00",
"event_name": "button_click",
"pathname": "/pricing",
},
{
"max_retries": 3,
},
)
# Import rows from URL/file
api.append_datasource(
"events",
{
"url": "https://example.com/events.csv",
},
)
# Delete rows matching a SQL condition
api.delete_datasource(
"events",
{
"delete_condition": "event_name = 'test'",
},
)
# Delete dry run
api.delete_datasource(
"events",
{
"delete_condition": "event_name = 'test'",
"dry_run": True,
},
)
# Truncate datasource
api.truncate_datasource("events")
# Execute raw SQL
sql_result = api.sql("SELECT count() AS total FROM events")
# Optional per-request token override
workspace_response = api.request_json(
"/v1/workspace",
token="p.branch_or_jwt_token",
)
This Tinybird API is standalone and can be used without create_client() or Tinybird(...).
JWT Token Creation
Create short-lived JWT tokens for secure scoped access to Tinybird resources.
from datetime import datetime, timedelta, timezone
from tinybird_sdk import create_client
client = create_client(
{
"base_url": "https://api.tinybird.co",
"token": "p.your_admin_token",
}
)
result = client.tokens.create_jwt(
{
"name": "user_123_session",
"expires_at": datetime.now(tz=timezone.utc) + timedelta(hours=1),
"scopes": [
{
"type": "PIPES:READ",
"resource": "user_dashboard",
"fixed_params": {"user_id": 123},
}
],
"limits": {"rps": 10},
}
)
jwt_token = result["token"]
Scope Types
| Scope | Description |
|---|---|
PIPES:READ |
Read access to a specific pipe endpoint |
DATASOURCES:READ |
Read access to a datasource (with optional filter) |
DATASOURCES:APPEND |
Append access to a datasource |
Scope Options
fixed_params: For pipes, embed parameters that cannot be overridden by the caller.filter: For datasources, append a SQL WHERE clause (for example,"org_id = 'acme'").
CLI Commands
This package installs tinybird as a runtime dependency.
tinybird generate is handled by this SDK; other commands are delegated to the Tinybird CLI.
tinybird init
Initialize a new Tinybird project:
tinybird init
tinybird init --force
tinybird init --skip-login
tinybird migrate
Migrate local Tinybird datafiles (.datasource, .pipe, .connection) into a Python definitions file.
tinybird migrate "tinybird/**/*.datasource" "tinybird/**/*.pipe" "tinybird/**/*.connection"
tinybird migrate tinybird/legacy --out ./tinybird.migration.py
tinybird migrate tinybird --dry-run
tinybird dev
tinybird dev
tinybird dev --local
tinybird dev --branch
tinybird build
tinybird build
tinybird build --dry-run
tinybird build --local
tinybird build --branch
tinybird deploy
tinybird deploy
tinybird deploy --check
tinybird deploy --allow-destructive-operations
tinybird pull
tinybird pull
tinybird pull --output-dir ./tinybird-datafiles
tinybird pull --force
tinybird login
tinybird login
tinybird branch
tinybird branch list
tinybird branch status
tinybird branch delete <name>
tinybird info
tinybird info
tinybird info --json
Configuration
Create a tinybird.config.json (or tinybird.config.py / tinybird_config.py for dynamic logic) in your project root:
{
"include": [
"lib/*.py",
"tinybird/**/*.datasource",
"tinybird/**/*.pipe",
"tinybird/**/*.connection"
],
"token": "${TINYBIRD_TOKEN}",
"base_url": "https://api.tinybird.co",
"dev_mode": "branch"
}
You can mix Python files with raw .datasource, .pipe, and .connection files for incremental migration.
include supports glob patterns.
Config File Formats
Supported config files (search order):
| File | Description |
|---|---|
tinybird.config.py |
Python config with dynamic logic |
tinybird_config.py |
Python config alias |
tinybird.config.json |
JSON config (default for new projects) |
tinybird.json |
Legacy JSON config |
For Python configs, export one of:
configdictCONFIGdictdefaultdictget_config()returning a dict
Example:
# tinybird.config.py
config = {
"include": ["lib/*.py"],
"token": "${TINYBIRD_TOKEN}",
"base_url": "https://api.tinybird.co",
"dev_mode": "branch",
}
Configuration Options
| Option | Type | Default | Description |
|---|---|---|---|
include |
list[str] |
required | File paths or glob patterns for Python and raw datafiles |
token |
str |
required | API token; supports ${ENV_VAR} interpolation. If missing, SDK falls back to TINYBIRD_TOKEN, then .tinyb |
base_url |
str |
"https://api.tinybird.co" |
Tinybird API URL |
dev_mode |
"branch" | "local" |
"branch" |
Development mode |
If base_url is omitted, SDK resolves it from TINYBIRD_URL, then TINYBIRD_HOST, then .tinyb (host), and finally defaults to https://api.tinybird.co.
Local Development Mode
Use a local Tinybird container for development without affecting cloud workspaces:
-
Start the local container:
docker run -d -p 7181:7181 --name tinybird-local tinybirdco/tinybird-local:latest
-
Configure your project:
{ "dev_mode": "local" }
Or use CLI flag:
tinybird dev --local
Defining Resources
Connections
from tinybird_sdk import define_gcs_connection, define_kafka_connection, define_s3_connection, secret
events_kafka = define_kafka_connection(
"events_kafka",
{
"bootstrap_servers": "kafka.example.com:9092",
"security_protocol": "SASL_SSL",
"sasl_mechanism": "PLAIN",
"key": secret("KAFKA_KEY"),
"secret": secret("KAFKA_SECRET"),
},
)
landing_s3 = define_s3_connection(
"landing_s3",
{
"region": "us-east-1",
"arn": "arn:aws:iam::123456789012:role/tinybird-s3-access",
},
)
landing_gcs = define_gcs_connection(
"landing_gcs",
{
"service_account_credentials_json": secret("GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON"),
},
)
Datasources
from tinybird_sdk import define_datasource, engine, t
events = define_datasource(
"events",
{
"description": "Event tracking data",
"schema": {
"timestamp": t.date_time(),
"event_name": t.string().low_cardinality(),
"user_id": t.string().nullable(),
"properties": t.string(),
},
"engine": engine.merge_tree(
{
"sorting_key": ["event_name", "timestamp"],
"partition_key": "toYYYYMM(timestamp)",
"ttl": "timestamp + INTERVAL 90 DAY",
}
),
},
)
Endpoints (API pipes)
from tinybird_sdk import define_endpoint, node, p, t
top_events = define_endpoint(
"top_events",
{
"description": "Get the most frequent events",
"params": {
"start_date": p.date_time(),
"end_date": p.date_time(),
"limit": p.int32().optional(10),
},
"nodes": [
node(
{
"name": "aggregated",
"sql": """
SELECT event_name, count() AS event_count
FROM events
WHERE timestamp >= {{DateTime(start_date)}}
AND timestamp <= {{DateTime(end_date)}}
GROUP BY event_name
ORDER BY event_count DESC
LIMIT {{Int32(limit, 10)}}
""",
}
)
],
"output": {
"event_name": t.string(),
"event_count": t.uint64(),
},
},
)
Internal Pipes (not exposed as API)
from tinybird_sdk import define_pipe, node, p
filtered_events = define_pipe(
"filtered_events",
{
"description": "Filter events by date range",
"params": {
"start_date": p.date_time(),
"end_date": p.date_time(),
},
"nodes": [
node(
{
"name": "filtered",
"sql": """
SELECT * FROM events
WHERE timestamp >= {{DateTime(start_date)}}
AND timestamp <= {{DateTime(end_date)}}
""",
}
)
],
},
)
Materialized Views
from tinybird_sdk import define_datasource, define_materialized_view, engine, node, t
daily_stats = define_datasource(
"daily_stats",
{
"schema": {
"date": t.date(),
"pathname": t.string(),
"views": t.simple_aggregate_function("sum", t.uint64()),
"unique_sessions": t.aggregate_function("uniq", t.string()),
},
"engine": engine.aggregating_merge_tree({"sorting_key": ["date", "pathname"]}),
},
)
daily_stats_mv = define_materialized_view(
"daily_stats_mv",
{
"datasource": daily_stats,
"nodes": [
node(
{
"name": "aggregate",
"sql": """
SELECT
toDate(timestamp) AS date,
pathname,
count() AS views,
uniqState(session_id) AS unique_sessions
FROM page_views
GROUP BY date, pathname
""",
}
)
],
},
)
Copy Pipes
from tinybird_sdk import define_copy_pipe, node
# Scheduled copy pipe
daily_snapshot = define_copy_pipe(
"daily_snapshot",
{
"datasource": events,
"copy_schedule": "0 0 * * *",
"copy_mode": "append",
"nodes": [
node(
{
"name": "snapshot",
"sql": """
SELECT today() AS snapshot_date, event_name, count() AS events
FROM events
WHERE toDate(timestamp) = today() - 1
GROUP BY event_name
""",
}
)
],
},
)
# On-demand copy pipe
manual_report = define_copy_pipe(
"manual_report",
{
"datasource": events,
"copy_schedule": "@on-demand",
"copy_mode": "replace",
"nodes": [
node(
{
"name": "report",
"sql": "SELECT * FROM events WHERE timestamp >= now() - interval 7 day",
}
)
],
},
)
Sink Pipes
Use sink pipes to publish query results to external systems. The SDK supports Kafka and S3 sinks.
from tinybird_sdk import define_sink_pipe, node
# Kafka sink
kafka_events_sink = define_sink_pipe(
"kafka_events_sink",
{
"sink": {
"connection": events_kafka,
"topic": "events_export",
"schedule": "@on-demand",
},
"nodes": [
node(
{
"name": "publish",
"sql": "SELECT timestamp, payload FROM kafka_events",
}
)
],
},
)
# S3 sink
s3_events_sink = define_sink_pipe(
"s3_events_sink",
{
"sink": {
"connection": landing_s3,
"bucket_uri": "s3://my-bucket/exports/",
"file_template": "events_{date}",
"format": "csv",
"schedule": "@once",
"strategy": "create_new",
"compression": "gzip",
},
"nodes": [
node(
{
"name": "export",
"sql": "SELECT timestamp, session_id FROM s3_landing",
}
)
],
},
)
Static Tokens
from tinybird_sdk import define_datasource, define_endpoint, define_token, node, t
app_token = define_token("app_read")
ingest_token = define_token("ingest_token")
events = define_datasource(
"events",
{
"schema": {
"timestamp": t.date_time(),
"event_name": t.string(),
},
"tokens": [
{"token": app_token, "scope": "READ"},
{"token": ingest_token, "scope": "APPEND"},
],
},
)
top_events = define_endpoint(
"top_events",
{
"nodes": [node({"name": "endpoint", "sql": "SELECT * FROM events LIMIT 10"})],
"output": {"timestamp": t.date_time(), "event_name": t.string()},
"tokens": [{"token": app_token, "scope": "READ"}],
},
)
Type Validators
Use t.* to define column types:
from tinybird_sdk import t
schema = {
# Strings
"name": t.string(),
"id": t.uuid(),
"code": t.fixed_string(3),
# Numbers
"count": t.int32(),
"amount": t.float64(),
"big_number": t.uint64(),
"price": t.decimal(10, 2),
# Date/Time
"created_at": t.date_time(),
"updated_at": t.date_time64(3),
"birth_date": t.date(),
# Boolean
"is_active": t.bool(),
# Complex types
"tags": t.array(t.string()),
"metadata": t.map(t.string(), t.string()),
# Aggregate functions
"total": t.simple_aggregate_function("sum", t.uint64()),
"unique_users": t.aggregate_function("uniq", t.string()),
# Modifiers
"optional_field": t.string().nullable(),
"category": t.string().low_cardinality(),
"status": t.string().default("pending"),
}
Parameter Validators
Use p.* to define query parameters:
from tinybird_sdk import p
params = {
"start_date": p.date_time(),
"user_id": p.string(),
"limit": p.int32().optional(10),
"offset": p.int32().optional(0),
"status": p.string().optional("active").describe("Filter by status"),
}
Engine Configurations
from tinybird_sdk import engine
engine.merge_tree(
{
"sorting_key": ["user_id", "timestamp"],
"partition_key": "toYYYYMM(timestamp)",
"ttl": "timestamp + INTERVAL 90 DAY",
}
)
engine.replacing_merge_tree(
{
"sorting_key": ["id"],
"ver": "updated_at",
}
)
engine.summing_merge_tree(
{
"sorting_key": ["date", "category"],
"columns": ["count", "total"],
}
)
engine.aggregating_merge_tree(
{
"sorting_key": ["date"],
}
)
Python App Integration
For Python web apps (FastAPI, Django, Flask), keep Tinybird definitions and client in a dedicated module and import that module from your app services.
The CLI automatically loads .env.local and .env files in project root when resolving configuration.
Schema Inference Helpers
The tinybird_sdk.infer module can inspect datasource and pipe definitions:
from tinybird_sdk.infer import infer_output_schema, infer_params_schema, infer_row_schema
row_schema = infer_row_schema(page_views)
params_schema = infer_params_schema(top_pages)
output_schema = infer_output_schema(top_pages)
License
MIT
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file tinybird_sdk-0.1.5.tar.gz.
File metadata
- Download URL: tinybird_sdk-0.1.5.tar.gz
- Upload date:
- Size: 80.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
96f94d6ef71efbb3066e6051be0da535dd22bf9a4ae4692fe1db347ec4833d39
|
|
| MD5 |
82528a815474a6cee4c512744bc86422
|
|
| BLAKE2b-256 |
aa87e39a5535931f67d539dbe757bcb74535f2f3c8a7001d719c77f1b3a84a5e
|
File details
Details for the file tinybird_sdk-0.1.5-py3-none-any.whl.
File metadata
- Download URL: tinybird_sdk-0.1.5-py3-none-any.whl
- Upload date:
- Size: 119.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
27854dd80b94cf25b4426736d4af8a4d95dddaefefcd4f3df55d9e96aa06a495
|
|
| MD5 |
d115d8d7da564f576065ed7e8316a06b
|
|
| BLAKE2b-256 |
66badd1362b81ef047b150f6b5ba98e18c708361a7937e06997b60dece5b77f9
|