Skip to main content

Core interfaces and abstractions for DataPulse connectors - a high-performance, async-first data connectivity framework

Project description

DataPulse Core

PyPI version Python versions License: MIT Code style: black Imports: isort Type checked with mypy

High-performance, async-first data connectivity framework for modern Python applications.

DataPulse Core provides the foundational interfaces and abstractions that all DataPulse connectors implement. It's designed for building robust, scalable data pipelines with enterprise-grade reliability.

✨ Features

  • 🔌 Universal Interface: Consistent API across all database types
  • ⚡ Async-First: Built for high-performance, non-blocking operations
  • 🛡️ Type Safe: Full type hints and runtime validation
  • 🔧 Extensible: Easy to implement custom connectors
  • 📊 Connection Pooling: Efficient resource management
  • 🔄 Transaction Support: ACID compliance and rollback capabilities
  • 📈 Performance Monitoring: Built-in metrics and observability

🚀 Quick Start

Installation

pip install metronome-pulse-core

Basic Usage

from metronome_pulse_core import Pulse, Readable, Writable
from typing import Any

class MyCustomConnector(Pulse, Readable, Writable):
    """Example custom connector implementation."""
    
    async def connect(self) -> None:
        """Establish connection to data source."""
        pass
    
    async def disconnect(self) -> None:
        """Close connection to data source."""
        pass
    
    async def is_connected(self) -> bool:
        """Check if connection is active."""
        return True
    
    async def query(self, query: str, params: dict[str, Any] | None = None) -> list[dict[str, Any]]:
        """Execute read operation."""
        return []
    
    async def write(self, data: list[dict[str, Any]], config: dict[str, Any] | None = None) -> int:
        """Execute write operation."""
        return len(data)

📚 Core Interfaces

Pulse - Base Interface

The foundation interface that all connectors must implement:

class Pulse(ABC):
    """Base interface for all DataPulse connectors."""
    
    @abstractmethod
    async def connect(self) -> None:
        """Establish connection to the data source."""
        pass
    
    @abstractmethod
    async def disconnect(self) -> None:
        """Close connection to the data source."""
        pass
    
    @abstractmethod
    async def is_connected(self) -> bool:
        """Check if the connection is currently active."""
        pass

Readable - Read Operations

Interface for data retrieval operations:

class Readable(ABC):
    """Interface for read operations."""
    
    @abstractmethod
    async def query(self, query: str, params: dict[str, Any] | None = None) -> list[dict[str, Any]]:
        """Execute a query and return results."""
        pass

Writable - Write Operations

Interface for data modification operations:

class Writable(ABC):
    """Interface for write operations."""
    
    @abstractmethod
    async def write(self, data: list[dict[str, Any]], config: dict[str, Any] | None = None) -> int:
        """Write data using the specified configuration."""
        pass

🔧 Advanced Features

Configuration-Driven Operations

All write operations support flexible configuration:

# Simple insert
await connector.write(data, {"operation": "insert"})

# High-performance replace
await connector.write(data, {
    "operation": "replace",
    "batch_size": 1000,
    "use_transaction": True
})

# Custom SQL operations
await connector.write(data, {
    "operation": "custom",
    "sql_template": "INSERT INTO {table} ({columns}) VALUES {values}",
    "on_conflict": "DO NOTHING"
})

Connection Pooling

Efficient resource management for high-throughput applications:

from metronome_pulse_core import ConnectionPool

pool = ConnectionPool(
    connector_class=PostgresConnector,
    min_connections=5,
    max_connections=20,
    connection_timeout=30
)

async with pool.get_connection() as conn:
    result = await conn.query("SELECT * FROM users LIMIT 10")

🧪 Testing

Run Tests

# Install development dependencies
pip install -e ".[dev]"

# Run all tests
pytest

# Run with coverage
pytest --cov=metronome_pulse_core

# Run specific test categories
pytest -m "unit"
pytest -m "integration"
pytest -m "slow"

Code Quality

# Format code
black metronome_pulse_core tests

# Sort imports
isort metronome_pulse_core tests

# Type checking
mypy metronome_pulse_core

# Linting
ruff check metronome_pulse_core tests

📦 Available Connectors

  • PostgreSQL: metronome-pulse-postgres - High-performance async PostgreSQL
  • PostgreSQL (psycopg3): metronome-pulse-postgres-psycopg3 - Modern psycopg3 driver
  • PostgreSQL (SQLAlchemy): metronome-pulse-postgres-sqlalchemy - SQLAlchemy integration
  • SQLite: metronome-pulse-sqlite - Lightweight SQLite support
  • MongoDB: metronome-pulse-mongodb - Document database support
  • Redis: metronome-pulse-redis - In-memory data store

🤝 Contributing

We welcome contributions! Please see our Contributing Guide for details.

Development Setup

# Clone the repository
git clone https://github.com/datametronome/metronome-pulse-core.git
cd metronome-pulse-core

# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install in development mode
pip install -e ".[dev]"

# Install pre-commit hooks
pre-commit install

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🔗 Links

🙏 Acknowledgments

  • Built with ❤️ by the DataMetronome team
  • Inspired by modern async Python patterns
  • Designed for enterprise data engineering workflows

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

metronome_pulse_core-0.1.0.tar.gz (18.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

metronome_pulse_core-0.1.0-py3-none-any.whl (7.1 kB view details)

Uploaded Python 3

File details

Details for the file metronome_pulse_core-0.1.0.tar.gz.

File metadata

  • Download URL: metronome_pulse_core-0.1.0.tar.gz
  • Upload date:
  • Size: 18.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.0

File hashes

Hashes for metronome_pulse_core-0.1.0.tar.gz
Algorithm Hash digest
SHA256 ed11a526e2875fd3e199c18f5b9f0c4627b0eab11d91e7553f8acd76fcf0dc76
MD5 7e61dfd9eba17744656b76fc1c437c5c
BLAKE2b-256 cf6a12ea809a08d830c9895c31567e8af807ba9caa8f98053615f2ed153c774a

See more details on using hashes here.

File details

Details for the file metronome_pulse_core-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for metronome_pulse_core-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 da89d2b7d0cfdee555744ddcfb4a620424a3ce237d474b26f0cc7c9da8ac5a4f
MD5 aa13e5f9bae9130039a92cc028272bf5
BLAKE2b-256 d97eaaf68e9ef6c17ca80616676fc763882ae8e729181272135927900e8e4273

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page