Universal Python connector library for databases, files, cloud storage, and APIs with production-grade features
SourceBridgeKit
Universal Python Connector Library for Databases, Files, Cloud Storage, and APIs
Version: 0.2.0
What is SourceBridgeKit?
SourceBridgeKit is a standard, reusable Python connector framework that provides:
- One Common API for all data sources (MySQL, Azure Blob, REST APIs, Excel, etc.)
- Configurable Everything - drivers, timeouts, connection pools, retry logic
- Environment Variable Support - secure credential management via ${VAR:default} syntax
- Pandas & Polars Output - fetch data in your preferred DataFrame format
- Batch Operations - memory-efficient reads/writes for large datasets
- Incremental Loading - configurable strategies for change detection
- Production Ready - retry logic, circuit breakers, connection pooling, SSL verification
SourceBridgeKit focuses on source access only - no preprocessing, no transformations, just clean data movement.
Quick Start
Installation
# Core library only
pip install sourcebridgekit
# With MySQL support
pip install sourcebridgekit[mysql]
# With Azure Blob Storage
pip install sourcebridgekit[azure]
# Everything
pip install sourcebridgekit[all]
Basic Usage
from sourcebridgekit import connect

# Connect with explicit config
with connect('mysql', config={
    'host': 'localhost',
    'database': 'analytics',
    'username': 'app_user',
    'password': '${MYSQL_PASSWORD}',  # From environment
}) as conn:
    result = conn.read('SELECT * FROM orders LIMIT 1000', output='pandas')
    df = result.data

# Or use environment prefix
with connect('mysql', env_prefix='MYSQL_') as conn:
    result = conn.read('SELECT * FROM orders', output='polars')
    df_pl = result.data
Features
Supported Connectors (V1)
| Category | Connectors |
|---|---|
| Databases | MySQL, PostgreSQL, MSSQL, ClickHouse, MongoDB, Elasticsearch |
| Files | CSV, JSON/JSONL, Excel, Parquet |
| Cloud | Azure Blob Storage, Azure Data Lake Gen2 |
| APIs | REST API (with pagination and curl parsing) |
Output Formats
- pandas - pandas DataFrame
- polars - Polars DataFrame
- arrow - PyArrow Table
- records - List of dictionaries
- raw - Driver-native format
Core Capabilities
✅ Connection management (connect, disconnect, test)
✅ Data operations (read, write, batch read/write)
✅ Metadata discovery (list databases, tables, describe schema)
✅ Incremental loading (high watermark, timestamp, file modified time)
✅ Checkpoint management (memory, JSON file, SQLite)
✅ Retry logic with exponential backoff
✅ Circuit breaker pattern
✅ Connection pooling
✅ SSL/TLS verification
✅ Secret redaction in logs
Usage Examples
MySQL Connector
from sourcebridgekit import connect
from sourcebridgekit.connectors.sql import MySQLConfig

config = MySQLConfig(
    host='${MYSQL_HOST:localhost}',
    port=3306,
    database='analytics',
    username='${MYSQL_USER}',
    password='${MYSQL_PASSWORD}',
    driver='pymysql',  # or 'mysql-connector'
    pool={'enabled': True, 'pool_size': 10},
    retry={'enabled': True, 'max_attempts': 3}
)

with connect('mysql', config=config) as conn:
    # Simple read (single-quoted SQL literal also works under ANSI_QUOTES mode)
    result = conn.read("SELECT * FROM orders WHERE status = 'active'", output='pandas')
    df = result.data

    # Batch read for large tables
    for batch in conn.read_batch('SELECT * FROM large_table', batch_size=10000):
        process(batch.data)  # process() stands in for your own handler

    # Write data
    conn.write(df, target='staging.new_orders', mode='append')

    # Metadata
    print(conn.list_tables(database='analytics'))
    schema = conn.describe_table('orders')
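The batch read above keeps memory bounded by pulling a fixed number of rows at a time. How SourceBridgeKit does this internally is not shown here; the following is a minimal sketch of the general technique using the standard-library `sqlite3` driver in place of MySQL. The helper name `read_batches` is hypothetical and not part of the library.

```python
import sqlite3


def read_batches(conn, query, batch_size=10000):
    """Yield lists of at most batch_size rows (hypothetical helper, not the library API)."""
    cursor = conn.execute(query)
    while True:
        # fetchmany pulls only the next chunk from the cursor, not the whole result set.
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        yield rows


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE large_table (id INTEGER)")
    conn.executemany("INSERT INTO large_table VALUES (?)", [(i,) for i in range(25)])
    for batch in read_batches(conn, "SELECT id FROM large_table ORDER BY id", batch_size=10):
        print(len(batch))
```

Because `read_batches` is a generator, only one batch is held in memory at a time as long as the caller does not accumulate the results.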
Azure Blob Storage
from sourcebridgekit import connect

config = {
    'account_name': '${AZURE_STORAGE_ACCOUNT}',
    'container_name': 'data',
    'connection_string': '${AZURE_STORAGE_CONNECTION_STRING}',
}

with connect('azure_blob', config=config) as conn:
    # Read file
    result = conn.read('data/sales/2026/sales.csv', output='pandas')
    df = result.data

    # Write file
    conn.write(df, target='data/output/processed.parquet', format='parquet')

    # List files
    files = conn.list_files(prefix='data/sales/', pattern='*.csv')
REST API with Pagination
from sourcebridgekit import connect

config = {
    'base_url': 'https://api.example.com',
    'auth_type': 'bearer',
    'auth_token': '${API_TOKEN}',
    'pagination': {
        'enabled': True,
        'type': 'page',
        'page_size': 100,
        'max_pages': 50
    }
}

with connect('rest_api', config=config) as conn:
    result = conn.read('/v1/users', params={'status': 'active'}, output='pandas')
    df = result.data
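To make the `page_size` and `max_pages` settings concrete, here is a minimal sketch of page-numbered pagination. It assumes the endpoint signals the end of data with a short or empty page, which may not match every API. The function `read_all_pages` and the `fetch_page` callback are hypothetical names for illustration, not SourceBridgeKit's implementation.

```python
def read_all_pages(fetch_page, page_size=100, max_pages=50):
    """Collect records from a page-numbered endpoint (hypothetical helper).

    fetch_page(page, page_size) is supplied by the caller and returns a list of records.
    """
    records = []
    for page in range(1, max_pages + 1):
        batch = fetch_page(page, page_size)
        records.extend(batch)
        # Assumption: a page shorter than page_size means there is no more data.
        if len(batch) < page_size:
            break
    return records
```

`max_pages` acts as a safety cap: even if the server never returns a short page, the loop stops after that many requests.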
Incremental Loading
from sourcebridgekit import connect

incremental_config = {
    'enabled': True,
    'strategy': 'high_watermark',
    'cursor_column': 'updated_at',
    'checkpoint_key': 'tenant_a.orders',
    'lookback_seconds': 300,
    'checkpoint_store': {'type': 'sqlite', 'path': './checkpoints.db'}
}

# mysql_config: a MySQL config dict or MySQLConfig, as in the MySQL Connector example
with connect('mysql', config=mysql_config) as conn:
    result = conn.read_incremental(
        table='orders',
        incremental=incremental_config,
        output='polars'
    )
    # Library automatically tracks checkpoint
    print(f"Fetched {result.row_count} new rows")
    print(f"New checkpoint: {result.checkpoint}")
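The idea behind `high_watermark` with `lookback_seconds` can be sketched in a few lines. The example below is an illustration under stated assumptions, not the library's code: it uses `sqlite3`, assumes the cursor column holds `YYYY-MM-DD HH:MM:SS` text timestamps, and the function name `read_incremental_sketch` is hypothetical.

```python
import sqlite3
from datetime import datetime, timedelta


def read_incremental_sketch(conn, table, cursor_column, checkpoint, lookback_seconds=0):
    """Return (rows, new_checkpoint) for rows newer than checkpoint minus the lookback.

    Assumes conn.row_factory = sqlite3.Row and that table/cursor_column are trusted
    identifiers (they are interpolated into the SQL text; only the value is bound).
    """
    if checkpoint is None:
        # First run: no checkpoint yet, so read everything.
        rows = conn.execute(f"SELECT * FROM {table} ORDER BY {cursor_column}").fetchall()
    else:
        # Re-read a small window before the checkpoint to catch late-arriving rows.
        since = datetime.fromisoformat(checkpoint) - timedelta(seconds=lookback_seconds)
        rows = conn.execute(
            f"SELECT * FROM {table} WHERE {cursor_column} > ? ORDER BY {cursor_column}",
            (since.isoformat(sep=" "),),
        ).fetchall()
    # The new checkpoint is the largest cursor value seen; keep the old one if no rows came back.
    new_checkpoint = max((row[cursor_column] for row in rows), default=checkpoint)
    return rows, new_checkpoint
```

Note that a lookback window deliberately re-reads rows already fetched in the previous run, so the consumer needs to deduplicate or upsert on a key.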
Curl to REST API
from sourcebridgekit import connect
from sourcebridgekit.connectors.api import RestConfig

# Parse curl command into structured config
config = RestConfig.from_curl('''
curl -X POST https://api.example.com/orders \
-H "Authorization: Bearer ${API_TOKEN}" \
-H "Content-Type: application/json" \
-d '{"status":"active"}'
''')

with connect('rest_api', config=config) as conn:
    result = conn.read(output='records')
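Parsing a curl command does not require running it. As a rough illustration of what a parser like `RestConfig.from_curl` has to do, the sketch below tokenizes the command with the standard-library `shlex` module and extracts the method, URL, headers, and body. It handles only `-X`, `-H`, and `-d`; the function name `parse_curl` and the returned dict layout are hypothetical and say nothing about the library's actual parser.

```python
import shlex


def parse_curl(command):
    """Parse a small subset of curl syntax into a dict (hypothetical helper)."""
    # Join backslash-continued lines, then tokenize with shell quoting rules.
    # Nothing is executed; the command is treated purely as text.
    tokens = shlex.split(command.replace("\\\n", " "))
    if not tokens or tokens[0] != "curl":
        raise ValueError("expected a command starting with 'curl'")
    parsed = {"method": None, "url": None, "headers": {}, "body": None}
    i = 1
    while i < len(tokens):
        tok = tokens[i]
        if tok in ("-X", "--request"):
            parsed["method"] = tokens[i + 1].upper()
            i += 2
        elif tok in ("-H", "--header"):
            name, _, value = tokens[i + 1].partition(":")
            parsed["headers"][name.strip()] = value.strip()
            i += 2
        elif tok in ("-d", "--data", "--data-raw"):
            parsed["body"] = tokens[i + 1]
            i += 2
        elif tok.startswith("-"):
            raise ValueError(f"unsupported curl option: {tok}")
        else:
            parsed["url"] = tok
            i += 1
    if parsed["method"] is None:
        # curl sends POST when a body is given, otherwise GET.
        parsed["method"] = "POST" if parsed["body"] is not None else "GET"
    return parsed
```

Placeholders such as `${API_TOKEN}` pass through untouched, so they can still be resolved from the environment afterwards.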
Configuration
Environment Variables
All configs support ${VAR_NAME} or ${VAR_NAME:default} syntax:
config = {
    'host': '${DB_HOST:localhost}',  # Fallback to 'localhost'
    'port': '${DB_PORT:5432}',
    'password': '${DB_PASSWORD}',  # Required, no default
}
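How the library expands these placeholders is not documented here. The following is a minimal standard-library sketch of one way `${VAR_NAME}` and `${VAR_NAME:default}` could be resolved; `resolve_env`, `resolve_config`, and the choice to raise `KeyError` for a missing required variable are assumptions made for illustration.

```python
import os
import re

# Matches ${NAME} or ${NAME:default}; the default may be empty.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::([^}]*))?\}")


def resolve_env(value, environ=None):
    """Expand ${VAR} / ${VAR:default} placeholders in a string (hypothetical helper)."""
    env = os.environ if environ is None else environ

    def _substitute(match):
        name, default = match.group(1), match.group(2)
        if name in env:
            return env[name]
        if default is not None:
            return default
        raise KeyError(f"Environment variable {name!r} is required but not set")

    return _PLACEHOLDER.sub(_substitute, value)


def resolve_config(config, environ=None):
    """Resolve placeholders in every string value of a flat config dict."""
    return {
        key: resolve_env(val, environ) if isinstance(val, str) else val
        for key, val in config.items()
    }
```

In this sketch resolved values stay strings (for example `'5432'` for the port), so any type coercion would have to happen in a later validation step.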
Secrets Management
Sensitive fields use SecretStr and are redacted from logs:
from pydantic import SecretStr
from sourcebridgekit.connectors.sql import MySQLConfig

config = MySQLConfig(
    password=SecretStr('secret123')  # Redacted in logs
)
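pydantic's `SecretStr` masks its value in `str()` and `repr()` and only reveals it through `get_secret_value()`. For plain dict configs, the same effect can be approximated before logging. The sketch below is a standard-library illustration only: the `redact` helper and the `SENSITIVE_KEYS` set are hypothetical, with key names taken from the examples in this README.

```python
# Assumed set of sensitive field names, taken from the config examples in this README.
SENSITIVE_KEYS = {"password", "auth_token", "connection_string"}


def redact(config, mask="**********"):
    """Return a copy of a config dict that is safe to log (hypothetical helper)."""
    safe = {}
    for key, value in config.items():
        if isinstance(value, dict):
            # Recurse into nested sections such as 'pool' or 'pagination'.
            safe[key] = redact(value, mask)
        elif key.lower() in SENSITIVE_KEYS and value is not None:
            safe[key] = mask
        else:
            safe[key] = value
    return safe
```

The original dict is left unchanged, so the real credentials are still available to the connector while only the redacted copy reaches the log.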
Connection Pooling
config = MySQLConfig(
    pool={
        'enabled': True,
        'pool_size': 10,
        'max_overflow': 20,
        'pool_timeout': 30,
        'pool_recycle': 3600
    }
)
Retry & Circuit Breaker
config = MySQLConfig(
    retry={
        'enabled': True,
        'max_attempts': 3,
        'backoff_factor': 2.0,
        'timeout_seconds': 30
    },
    circuit_breaker={
        'enabled': True,
        'failure_threshold': 5,
        'recovery_timeout': 60
    }
)
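To show what these settings typically control, here is a minimal sketch of retry with exponential backoff and a simple circuit breaker. It illustrates the general patterns, not SourceBridgeKit's implementation. `retry_call`, `CircuitBreaker`, and the `base_delay` parameter are hypothetical names, and `timeout_seconds` is not modeled.

```python
import time


def retry_call(func, max_attempts=3, backoff_factor=2.0, base_delay=1.0, sleep=time.sleep):
    """Call func, retrying on any exception with exponentially growing delays."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == max_attempts:
                raise
            # Delay is base_delay * backoff_factor ** (attempt - 1).
            sleep(base_delay * backoff_factor ** (attempt - 1))


class CircuitBreaker:
    """Reject calls after repeated failures until recovery_timeout seconds have passed."""

    def __init__(self, failure_threshold=5, recovery_timeout=60, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at = None  # None means the circuit is closed

    def call(self, func):
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self.recovery_timeout:
                raise RuntimeError("circuit open: call rejected")
            # Recovery window elapsed: let one trial call through (half-open).
        try:
            result = func()
        except Exception:
            self._failures += 1
            if self._failures >= self.failure_threshold:
                self._opened_at = self._clock()
            raise
        # Success closes the circuit and resets the failure count.
        self._failures = 0
        self._opened_at = None
        return result
```

The two patterns complement each other: retries smooth over short glitches, while the breaker stops a failing source from being hammered by every retrying caller.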
FetchResult Standard
All read operations return a FetchResult object:
result = conn.read('SELECT * FROM orders', output='pandas')
result.data               # pandas DataFrame
result.output_format      # 'pandas'
result.row_count          # Number of rows
result.columns            # List of column names
result.schema             # Column types
result.execution_time_ms  # Query execution time
result.checkpoint         # Incremental checkpoint (if applicable)
result.metadata           # Additional metadata
result.warnings           # Any warnings
Incremental Strategies
| Strategy | Description | Best For |
|---|---|---|
| high_watermark | Track max value of cursor column | SQL databases, APIs |
| incrementing_id | Track max ID value | Append-only tables |
| timestamp_with_lookback | Timestamp + safety window | Distributed systems |
| file_modified_time | Track file modification time | Local files, object storage |
| checksum_or_etag | Detect changes by hash | Files, object storage |
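For the file-oriented strategies, change detection by hash can be sketched as follows. This is a standard-library illustration of the `checksum_or_etag` idea for local files, assuming SHA-256 as the hash; the helper names `file_checksum` and `changed_files` are hypothetical and not part of the library.

```python
import hashlib


def file_checksum(path, chunk_size=1 << 20):
    """Return the SHA-256 hex digest of a file, read in chunks to bound memory use."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def changed_files(paths, known_checksums):
    """Return (changed paths, updated checksum map) by comparing against stored hashes."""
    updated = dict(known_checksums)
    changed = []
    for path in paths:
        checksum = file_checksum(path)
        # A file is new or changed when its hash differs from the stored one.
        if known_checksums.get(str(path)) != checksum:
            changed.append(path)
            updated[str(path)] = checksum
    return changed, updated
```

Unlike modification times, a content hash is not fooled by files that are rewritten with identical content, at the cost of reading every file on each run.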
Checkpoint Stores
| Store | Use Case |
|---|---|
| memory | Testing only (state lost on restart) |
| json_file | Simple local jobs |
| sqlite | Default persistent checkpoint store |
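A persistent checkpoint store only needs to map a key to the last saved value. The sketch below shows one way a SQLite-backed store could look, using the standard-library `sqlite3` module. The class name, table layout, and method names are assumptions for illustration and do not describe the library's actual schema.

```python
import sqlite3


class SqliteCheckpointStore:
    """Persist one checkpoint value per key in a SQLite file (hypothetical sketch)."""

    def __init__(self, path):
        self._conn = sqlite3.connect(path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS checkpoints ("
            "checkpoint_key TEXT PRIMARY KEY, checkpoint_value TEXT NOT NULL)"
        )
        self._conn.commit()

    def get(self, key, default=None):
        row = self._conn.execute(
            "SELECT checkpoint_value FROM checkpoints WHERE checkpoint_key = ?", (key,)
        ).fetchone()
        return row[0] if row else default

    def set(self, key, value):
        # INSERT OR REPLACE overwrites the previous checkpoint for this key.
        self._conn.execute(
            "INSERT OR REPLACE INTO checkpoints (checkpoint_key, checkpoint_value) "
            "VALUES (?, ?)",
            (key, str(value)),
        )
        self._conn.commit()

    def close(self):
        self._conn.close()
```

Because the value is committed to disk on every `set`, a job that restarts can call `get('tenant_a.orders')` and resume from the last saved position.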
Security
✅ SSL/TLS verification enabled by default
✅ Secrets redacted from logs and exceptions
✅ No raw shell command execution
✅ Parameterized SQL queries
✅ Configurable timeouts
✅ SecretStr for sensitive fields
Roadmap
- V1 (Current): Core connectors, batch operations, incremental loading
- V2 (Planned): Async support, Redis/PostgreSQL checkpoint stores, OAuth2, OpenTelemetry
- V3 (Future): CDC (binlog, logical replication), Kafka/RabbitMQ, distributed execution
Development
# Clone and install in dev mode
git clone https://github.com/yourorg/sourcebridgekit
cd sourcebridgekit
pip install -e ".[dev]"
# Run tests
pytest
# Run tests with coverage
pytest --cov=sourcebridgekit --cov-report=html
# Format code
black sourcebridgekit/
ruff check sourcebridgekit/
License
MIT License