
Universal Python connector library for databases, files, cloud storage, and APIs with production-grade features


SourceBridgeKit

Universal Python Connector Library for Databases, Files, Cloud Storage, and APIs

Version: 0.2.0


What is SourceBridgeKit?

SourceBridgeKit is a standard, reusable Python connector framework that provides:

  • One Common API for all data sources (MySQL, Azure Blob, REST APIs, Excel, etc.)
  • Configurable Everything - drivers, timeouts, connection pools, retry logic
  • Environment Variable Support - secure credential management via ${VAR:default} syntax
  • Pandas & Polars Output - fetch data in your preferred DataFrame format
  • Batch Operations - memory-efficient reads/writes for large datasets
  • Incremental Loading - configurable strategies for change detection
  • Production Ready - retry logic, circuit breakers, connection pooling, SSL verification

SourceBridgeKit focuses on source access only - no preprocessing, no transformations, just clean data movement.


Quick Start

Installation

# Core library only
pip install sourcebridgekit

# With MySQL support (quotes keep shells such as zsh from expanding the brackets)
pip install "sourcebridgekit[mysql]"

# With Azure Blob Storage
pip install "sourcebridgekit[azure]"

# Everything
pip install "sourcebridgekit[all]"

Basic Usage

from sourcebridgekit import connect

# Connect with explicit config
with connect('mysql', config={
    'host': 'localhost',
    'database': 'analytics',
    'username': 'app_user',
    'password': '${MYSQL_PASSWORD}',  # From environment
}) as conn:
    result = conn.read('SELECT * FROM orders LIMIT 1000', output='pandas')
    df = result.data

# Or use environment prefix
with connect('mysql', env_prefix='MYSQL_') as conn:
    result = conn.read('SELECT * FROM orders', output='polars')
    df_pl = result.data

Features

Supported Connectors (V1)

Category | Connectors
Databases | MySQL, PostgreSQL, MSSQL, ClickHouse, MongoDB, Elasticsearch
Files | CSV, JSON/JSONL, Excel, Parquet
Cloud | Azure Blob Storage, Azure Data Lake Gen2
APIs | REST API (with pagination and curl parsing)

Output Formats

  • pandas - pandas DataFrame
  • polars - Polars DataFrame
  • arrow - PyArrow Table
  • records - List of dictionaries
  • raw - Driver-native format

Core Capabilities

✅ Connection management (connect, disconnect, test)
✅ Data operations (read, write, batch read/write)
✅ Metadata discovery (list databases, tables, describe schema)
✅ Incremental loading (high watermark, timestamp, file modified time)
✅ Checkpoint management (memory, JSON file, SQLite)
✅ Retry logic with exponential backoff
✅ Circuit breaker pattern
✅ Connection pooling
✅ SSL/TLS verification
✅ Secret redaction in logs


Usage Examples

MySQL Connector

from sourcebridgekit import connect
from sourcebridgekit.connectors.sql import MySQLConfig

config = MySQLConfig(
    host='${MYSQL_HOST:localhost}',
    port=3306,
    database='analytics',
    username='${MYSQL_USER}',
    password='${MYSQL_PASSWORD}',
    driver='pymysql',  # or 'mysql-connector'
    pool={'enabled': True, 'pool_size': 10},
    retry={'enabled': True, 'max_attempts': 3}
)

with connect('mysql', config=config) as conn:
    # Simple read
    result = conn.read("SELECT * FROM orders WHERE status = 'active'", output='pandas')
    df = result.data

    # Batch read for large tables
    for batch in conn.read_batch('SELECT * FROM large_table', batch_size=10000):
        process(batch.data)  # process() stands for your own batch handler

    # Write data
    conn.write(df, target='staging.new_orders', mode='append')
    
    # Metadata
    print(conn.list_tables(database='analytics'))
    schema = conn.describe_table('orders')
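To illustrate why batch reads keep memory use flat, here is a minimal sketch that uses the standard-library sqlite3 module rather than SourceBridgeKit itself. The helper read_in_batches is hypothetical and only mimics the idea behind read_batch; the library's actual implementation may differ.

```python
import sqlite3


def read_in_batches(connection, query, batch_size):
    """Yield lists of rows, at most `batch_size` each, without loading the full result."""
    cursor = connection.execute(query)
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        yield rows


# In-memory demo table with 25 rows (hypothetical data).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE large_table (id INTEGER PRIMARY KEY, amount REAL)")
conn.executemany(
    "INSERT INTO large_table (id, amount) VALUES (?, ?)",
    [(i, i * 1.5) for i in range(1, 26)],
)

# Only one batch of rows is held in memory at a time.
batch_sizes = [len(batch) for batch in read_in_batches(conn, "SELECT * FROM large_table", 10)]
print(batch_sizes)
conn.close()
```

Because the helper is a generator, each batch can be processed and discarded before the next one is fetched.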

Azure Blob Storage

from sourcebridgekit import connect

config = {
    'account_name': '${AZURE_STORAGE_ACCOUNT}',
    'container_name': 'data',
    'connection_string': '${AZURE_STORAGE_CONNECTION_STRING}',
}

with connect('azure_blob', config=config) as conn:
    # Read file
    result = conn.read('data/sales/2026/sales.csv', output='pandas')
    df = result.data

    # Write file
    conn.write(df, target='data/output/processed.parquet', format='parquet')
    
    # List files
    files = conn.list_files(prefix='data/sales/', pattern='*.csv')
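The prefix and pattern arguments can be thought of as two filters applied in sequence. The sketch below is an assumption about the semantics, not the library's code: filter_blobs is a hypothetical helper that matches the glob against the final path segment only, and SourceBridgeKit may match against the full path instead.

```python
from fnmatch import fnmatchcase


def filter_blobs(names, prefix="", pattern="*"):
    """Keep names under `prefix` whose final path segment matches the glob `pattern`."""
    return [
        name
        for name in names
        if name.startswith(prefix) and fnmatchcase(name.rsplit("/", 1)[-1], pattern)
    ]


# Hypothetical blob names for the demo.
blob_names = [
    "data/sales/2026/sales.csv",
    "data/sales/readme.txt",
    "data/output/processed.parquet",
    "data/sales/2025/sales.csv",
]

matches = filter_blobs(blob_names, prefix="data/sales/", pattern="*.csv")
print(matches)
```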

REST API with Pagination

from sourcebridgekit import connect

config = {
    'base_url': 'https://api.example.com',
    'auth_type': 'bearer',
    'auth_token': '${API_TOKEN}',
    'pagination': {
        'enabled': True,
        'type': 'page',
        'page_size': 100,
        'max_pages': 50
    }
}

with connect('rest_api', config=config) as conn:
    result = conn.read('/v1/users', params={'status': 'active'}, output='pandas')
    df = result.data
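For readers unfamiliar with page-based pagination, the following sketch shows one plausible reading of the page_size and max_pages settings. It makes no HTTP calls: fetch_all_pages and fake_fetch_page are hypothetical stand-ins, and the stop condition (a short page ends the loop) is an assumption rather than documented library behavior.

```python
def fetch_all_pages(fetch_page, page_size, max_pages):
    """Collect records page by page until a short page or `max_pages` is reached."""
    records = []
    for page in range(1, max_pages + 1):
        items = fetch_page(page, page_size)
        records.extend(items)
        if len(items) < page_size:  # a short (or empty) page means no more data
            break
    return records


# Stand-in for an HTTP call: serves 250 fake users in slices.
FAKE_USERS = [{"id": i, "status": "active"} for i in range(250)]


def fake_fetch_page(page, page_size):
    start = (page - 1) * page_size
    return FAKE_USERS[start:start + page_size]


users = fetch_all_pages(fake_fetch_page, page_size=100, max_pages=50)
capped = fetch_all_pages(fake_fetch_page, page_size=100, max_pages=2)
print(len(users), len(capped))
```

The max_pages cap acts as a safety limit: the second call stops after two pages even though more data is available.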

Incremental Loading

from sourcebridgekit import connect

incremental_config = {
    'enabled': True,
    'strategy': 'high_watermark',
    'cursor_column': 'updated_at',
    'checkpoint_key': 'tenant_a.orders',
    'lookback_seconds': 300,
    'checkpoint_store': {'type': 'sqlite', 'path': './checkpoints.db'}
}

# mysql_config: a config dict or MySQLConfig, built as in the MySQL Connector example
with connect('mysql', config=mysql_config) as conn:
    result = conn.read_incremental(
        table='orders',
        incremental=incremental_config,
        output='polars'
    )
    
    # Library automatically tracks checkpoint
    print(f"Fetched {result.row_count} new rows")
    print(f"New checkpoint: {result.checkpoint}")
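The high_watermark strategy with lookback_seconds can be sketched as follows, using sqlite3 in place of MySQL. This is an illustration under assumptions, not the library's implementation: the function read_since and the table layout are hypothetical, and timestamps are stored as ISO-style strings so that string comparison matches chronological order.

```python
import sqlite3
from datetime import datetime, timedelta

ISO = "%Y-%m-%d %H:%M:%S"


def read_since(conn, checkpoint, lookback_seconds):
    """Return rows newer than (checkpoint - lookback) and the advanced checkpoint."""
    if checkpoint is None:
        rows = conn.execute(
            "SELECT id, updated_at FROM orders ORDER BY updated_at"
        ).fetchall()
    else:
        since = datetime.strptime(checkpoint, ISO) - timedelta(seconds=lookback_seconds)
        rows = conn.execute(
            "SELECT id, updated_at FROM orders WHERE updated_at > ? ORDER BY updated_at",
            (since.strftime(ISO),),
        ).fetchall()
    latest = max((row[1] for row in rows), default=None)
    if latest is not None and (checkpoint is None or latest > checkpoint):
        checkpoint = latest  # the watermark only ever moves forward
    return rows, checkpoint


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, updated_at TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, "2026-01-01 09:50:00"), (2, "2026-01-01 10:04:00")],
)

first_rows, checkpoint = read_since(conn, None, lookback_seconds=300)

conn.execute("INSERT INTO orders VALUES (3, '2026-01-01 10:10:00')")
second_rows, checkpoint2 = read_since(conn, checkpoint, lookback_seconds=300)
print([row[0] for row in second_rows], checkpoint2)
conn.close()
```

Note that the second read returns row 2 again because it falls inside the 300-second lookback window. A lookback trades duplicate rows for protection against late-arriving updates, so downstream code should deduplicate on a key.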

Curl to REST API

from sourcebridgekit import connect
from sourcebridgekit.connectors.api import RestConfig

# Parse curl command into structured config
config = RestConfig.from_curl('''
curl -X POST https://api.example.com/orders \
  -H "Authorization: Bearer ${API_TOKEN}" \
  -H "Content-Type: application/json" \
  -d '{"status":"active"}'
''')

with connect('rest_api', config=config) as conn:
    result = conn.read(output='records')
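To give a feel for what curl parsing involves, here is a minimal standalone sketch built on the standard-library shlex module. parse_curl is a hypothetical helper that handles only -X, -H, and -d; it is not RestConfig.from_curl and makes no claim about which options the library supports.

```python
import shlex


def parse_curl(command):
    """Extract method, URL, headers and body from a simple curl command string."""
    # Join backslash-continued lines before tokenizing.
    tokens = shlex.split(command.replace("\\\n", " "))
    if not tokens or tokens[0] != "curl":
        raise ValueError("expected a command starting with 'curl'")
    parsed = {"method": None, "url": None, "headers": {}, "data": None}
    i = 1
    while i < len(tokens):
        token = tokens[i]
        if token in ("-X", "--request"):
            parsed["method"] = tokens[i + 1]
            i += 2
        elif token in ("-H", "--header"):
            name, _, value = tokens[i + 1].partition(":")
            parsed["headers"][name.strip()] = value.strip()
            i += 2
        elif token in ("-d", "--data"):
            parsed["data"] = tokens[i + 1]
            i += 2
        elif token.startswith("-"):
            raise ValueError(f"unsupported curl option: {token}")
        else:
            parsed["url"] = token
            i += 1
    if parsed["method"] is None:
        # curl defaults to POST when a body is given, otherwise GET.
        parsed["method"] = "POST" if parsed["data"] is not None else "GET"
    return parsed


request = parse_curl(r'''
curl -X POST https://api.example.com/orders \
  -H "Authorization: Bearer ${API_TOKEN}" \
  -H "Content-Type: application/json" \
  -d '{"status":"active"}'
''')
print(request)
```

The sketch leaves the ${API_TOKEN} placeholder untouched; resolving it from the environment would be a separate step.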

Configuration

Environment Variables

All configs support ${VAR_NAME} or ${VAR_NAME:default} syntax:

config = {
    'host': '${DB_HOST:localhost}',  # Fallback to 'localhost'
    'port': '${DB_PORT:5432}',
    'password': '${DB_PASSWORD}',    # Required, no default
}
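The placeholder syntax can be understood through a small standalone resolver. The sketch below is an assumption about the semantics (a set variable wins, then the default, otherwise an error); resolve_placeholders and resolve_config are hypothetical names and not the library's API.

```python
import os
import re

# Matches ${NAME} or ${NAME:default}; the exact pattern is an assumption.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::([^}]*))?\}")


def resolve_placeholders(value, env=None):
    """Replace ${VAR} and ${VAR:default} in a string using `env` (default: os.environ)."""
    env = os.environ if env is None else env

    def substitute(match):
        name, default = match.group(1), match.group(2)
        if name in env:
            return env[name]
        if default is not None:
            return default
        raise KeyError(f"Environment variable {name!r} is required but not set")

    return _PLACEHOLDER.sub(substitute, value)


def resolve_config(config, env=None):
    """Resolve placeholders in every string value of a flat config dict."""
    return {
        key: resolve_placeholders(val, env) if isinstance(val, str) else val
        for key, val in config.items()
    }


resolved = resolve_config(
    {"host": "${DB_HOST:localhost}", "port": "${DB_PORT:5432}", "password": "${DB_PASSWORD}"},
    env={"DB_PASSWORD": "s3cret", "DB_PORT": "6543"},
)
print(resolved)
```

In this sketch resolved values remain strings, so a port such as '6543' would still need converting to an integer before use.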

Secrets Management

Sensitive fields use SecretStr and are redacted from logs:

from pydantic import SecretStr
from sourcebridgekit.connectors.sql import MySQLConfig

config = MySQLConfig(
    password=SecretStr('secret123')  # Redacted in logs
)
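Redaction can also be applied to plain config dicts before they are logged. The sketch below shows the general idea using only the standard library; redact and the SENSITIVE_KEYS list are hypothetical and do not describe how SourceBridgeKit decides which fields are sensitive.

```python
# Key names treated as sensitive in this sketch (an illustrative list).
SENSITIVE_KEYS = {"password", "auth_token", "connection_string"}


def redact(config):
    """Return a copy of `config` that is safe to log: sensitive values are masked."""
    safe = {}
    for key, value in config.items():
        if isinstance(value, dict):
            safe[key] = redact(value)  # recurse into nested sections such as 'pool'
        elif key.lower() in SENSITIVE_KEYS and value is not None:
            safe[key] = "**********"
        else:
            safe[key] = value
    return safe


config = {
    "host": "localhost",
    "username": "app_user",
    "password": "secret123",
    "pool": {"enabled": True, "pool_size": 10},
}
safe_config = redact(config)
print(safe_config)
```

The original dict is left unchanged, so the real password is still available to the driver while only the masked copy reaches the logs.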

Connection Pooling

config = MySQLConfig(
    pool={
        'enabled': True,
        'pool_size': 10,
        'max_overflow': 20,
        'pool_timeout': 30,
        'pool_recycle': 3600
    }
)
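What pool_size and pool_timeout control can be shown with a toy pool built on queue.Queue. This is a deliberately simplified sketch: SimplePool is hypothetical, it ignores max_overflow and pool_recycle, and it says nothing about how the library or its drivers actually pool connections.

```python
import queue
import sqlite3


class SimplePool:
    """Fixed-size pool: connections are created up front and reused."""

    def __init__(self, factory, pool_size, pool_timeout):
        self.pool_timeout = pool_timeout
        self._idle = queue.Queue(maxsize=pool_size)
        for _ in range(pool_size):
            self._idle.put(factory())

    def acquire(self):
        # Blocks up to pool_timeout seconds; raises queue.Empty if none becomes free.
        return self._idle.get(timeout=self.pool_timeout)

    def release(self, connection):
        self._idle.put(connection)


pool = SimplePool(lambda: sqlite3.connect(":memory:"), pool_size=2, pool_timeout=0.05)
first = pool.acquire()
second = pool.acquire()

try:
    pool.acquire()  # both connections are checked out
    exhausted = False
except queue.Empty:
    exhausted = True

pool.release(first)
reused = pool.acquire()  # the released connection is handed out again
print(exhausted, reused is first)
```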

Retry & Circuit Breaker

config = MySQLConfig(
    retry={
        'enabled': True,
        'max_attempts': 3,
        'backoff_factor': 2.0,
        'timeout_seconds': 30
    },
    circuit_breaker={
        'enabled': True,
        'failure_threshold': 5,
        'recovery_timeout': 60
    }
)
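The two mechanisms are easier to reason about side by side. The sketch below is one common interpretation of these settings, not SourceBridgeKit's code: retry_call, CircuitBreaker, and the base_delay parameter are hypothetical, and the delay formula (base_delay * backoff_factor ** attempt) is an assumption.

```python
import time


def retry_call(func, max_attempts=3, backoff_factor=2.0, base_delay=0.01, sleep=time.sleep):
    """Call `func`, retrying on any exception with exponentially growing delays."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == max_attempts:
                raise
            sleep(base_delay * backoff_factor ** (attempt - 1))


class CircuitBreaker:
    """Rejects calls after consecutive failures until the recovery timeout passes."""

    def __init__(self, failure_threshold=5, recovery_timeout=60, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = None

    def call(self, func):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.recovery_timeout:
                raise RuntimeError("circuit open: call rejected")
            self.opened_at = None  # half-open: let one trial call through
        try:
            result = func()
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        self.failures = 0
        return result


# Retry demo: fails twice, then succeeds; delays are recorded instead of slept.
attempts, delays = [], []


def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("transient failure")
    return "ok"


outcome = retry_call(flaky, max_attempts=3, backoff_factor=2.0, base_delay=0.5, sleep=delays.append)

# Circuit breaker demo with a fake clock.
now = [0.0]
breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=60, clock=lambda: now[0])


def down():
    raise ConnectionError("service down")


for _ in range(2):
    try:
        breaker.call(down)
    except ConnectionError:
        pass  # two consecutive failures open the circuit

try:
    breaker.call(lambda: "ok")
    state = "closed"
except RuntimeError:
    state = "open"

now[0] = 61.0  # pretend the recovery timeout has elapsed
recovered = breaker.call(lambda: "ok")
print(outcome, delays, state, recovered)
```

Retries absorb short-lived failures, while the breaker stops a persistently failing source from being hammered with further calls.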

FetchResult Standard

All read operations return a FetchResult object:

result = conn.read('SELECT * FROM orders', output='pandas')

result.data              # pandas DataFrame
result.output_format     # 'pandas'
result.row_count         # Number of rows
result.columns           # List of column names
result.schema            # Column types
result.execution_time_ms # Query execution time
result.checkpoint        # Incremental checkpoint (if applicable)
result.metadata          # Additional metadata
result.warnings          # Any warnings

Incremental Strategies

Strategy | Description | Best For
high_watermark | Track max value of cursor column | SQL databases, APIs
incrementing_id | Track max ID value | Append-only tables
timestamp_with_lookback | Timestamp + safety window | Distributed systems
file_modified_time | Track file modification time | Local files, object storage
checksum_or_etag | Detect changes by hash | Files, object storage
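As an example of the hash-based approach, the sketch below detects changed files by comparing SHA-256 digests with those stored from a previous run. changed_files is a hypothetical helper working on in-memory bytes; how the library computes checksums or reads ETags is not specified here.

```python
import hashlib


def content_digest(data):
    """SHA-256 hex digest of a bytes payload."""
    return hashlib.sha256(data).hexdigest()


def changed_files(files, known_digests):
    """Return names whose content hash differs from the stored one, plus updated digests."""
    updated = dict(known_digests)
    changed = []
    for name, data in files.items():
        digest = content_digest(data)
        if known_digests.get(name) != digest:
            changed.append(name)
            updated[name] = digest
    return changed, updated


# Hypothetical file contents for the demo.
files = {"a.csv": b"1,2\n", "b.csv": b"3,4\n"}
first_changed, digests = changed_files(files, {})

files["b.csv"] = b"3,5\n"  # only b.csv is modified between runs
second_changed, digests = changed_files(files, digests)
print(first_changed, second_changed)
```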

Checkpoint Stores

Store | Use Case
memory | Testing only (state lost on restart)
json_file | Simple local jobs
sqlite | Default persistent checkpoint store
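A file-backed store of the json_file kind might look roughly like the sketch below. JsonFileCheckpointStore and its get/set methods are hypothetical and written only to show why a file survives restarts while the memory store does not; the library's real store interface may differ.

```python
import json
import os
import tempfile


class JsonFileCheckpointStore:
    """Keeps checkpoints as a {key: value} JSON object in a single file."""

    def __init__(self, path):
        self.path = path

    def _load(self):
        if not os.path.exists(self.path):
            return {}
        with open(self.path, "r", encoding="utf-8") as handle:
            return json.load(handle)

    def get(self, key, default=None):
        return self._load().get(key, default)

    def set(self, key, value):
        state = self._load()
        state[key] = value
        # Write to a temp file, then rename, so a crash cannot leave a half-written file.
        tmp_path = self.path + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as handle:
            json.dump(state, handle)
        os.replace(tmp_path, self.path)


workdir = tempfile.mkdtemp()
store = JsonFileCheckpointStore(os.path.join(workdir, "checkpoints.json"))
store.set("tenant_a.orders", "2026-01-01 10:10:00")

# A new instance pointing at the same file sees the saved state, as after a restart.
reloaded = JsonFileCheckpointStore(store.path)
print(reloaded.get("tenant_a.orders"))
```

A single JSON file has no locking, which is why it suits simple local jobs rather than concurrent writers.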

Security

✅ SSL/TLS verification enabled by default
✅ Secrets redacted from logs and exceptions
✅ No raw shell command execution
✅ Parameterized SQL queries
✅ Configurable timeouts
✅ SecretStr for sensitive fields


Roadmap

  • V1 (Current): Core connectors, batch operations, incremental loading
  • V2 (Planned): Async support, Redis/PostgreSQL checkpoint stores, OAuth2, OpenTelemetry
  • V3 (Future): CDC (binlog, logical replication), Kafka/RabbitMQ, distributed execution

Development

# Clone and install in dev mode
git clone https://github.com/yourorg/sourcebridgekit
cd sourcebridgekit
pip install -e ".[dev]"

# Run tests
pytest

# Run tests with coverage
pytest --cov=sourcebridgekit --cov-report=html

# Format code
black sourcebridgekit/
ruff check sourcebridgekit/

License

MIT License



