Unified data sources library for industrial protocols

DataSources Library

A unified Python library for connecting to industrial data sources with built-in connection pooling and async support.

Features

  • Unified Interface: Single API for multiple industrial protocols
  • Async-First: Built with asyncio for high performance
  • Connection Pooling: Efficient connection reuse and management
  • Type Safety: Full type hints and Pydantic models
  • Production Ready: Retry logic, error handling, and health monitoring
  • Extensible: Easy to add new protocol implementations

Supported Protocols

  • OPC-UA - Fully implemented
  • 🚧 Modbus - Planned
  • 🚧 MQTT - Planned
  • 🚧 OPC-DA - Planned

Installation

pip install datasources-lib

Quick Start

Basic Usage

import asyncio
from datasources_lib import OpcUaConfig, ConnectionPool

async def main():
    # Configuration
    config = OpcUaConfig(
        name="my_plc",
        type="opcua",
        url="opc.tcp://localhost:4840",
        username="admin",
        password="password"
    )
    
    # Connection pool
    pool = ConnectionPool()
    await pool.start()
    
    try:
        # Get connection
        async with pool.get_connection(config) as client:
            # Read a value
            value = await client.read_node("ns=2;s=Temperature")
            print(f"Temperature: {value.value}")
            
            # Write a value
            success = await client.write_node("ns=2;s=Setpoint", 25.0)
            print(f"Write successful: {success}")
            
    finally:
        await pool.stop()

asyncio.run(main())

Configuration from TOML

# config.toml
[datasources]
default_source = "main_plc"

[datasources.sources.main_plc]
type = "opcua"
url = "opc.tcp://192.168.1.100:4840"
username = "admin"
password = "password"
max_retries = 3
retry_delay = 5.0
connection_timeout = 30.0

[datasources.sources.backup_plc]
type = "opcua"
url = "opc.tcp://192.168.1.101:4840"
username = "admin"
password = "password"

Load the file in Python:

import asyncio
import tomllib  # standard library since Python 3.11
from datasources_lib import load_config_from_toml, ConnectionPool

async def main():
    # Load configuration
    with open("config.toml", "rb") as f:
        toml_data = tomllib.load(f)

    config = load_config_from_toml(toml_data)
    main_plc_config = config.get_source_config("main_plc")

    # Use with connection pool
    pool = ConnectionPool()
    await pool.start()
    try:
        async with pool.get_connection(main_plc_config) as client:
            # Your code here
            pass
    finally:
        await pool.stop()

asyncio.run(main())
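
The `max_retries` and `retry_delay` settings in the TOML example suggest a fixed-delay retry policy. The library's actual retry behavior is not documented here, so the following is only a minimal sketch of how such settings could be applied. The helper name `with_retries` and the choice to retry only on `OSError` are assumptions made for illustration.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_retries(
    operation: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    retry_delay: float = 5.0,
) -> T:
    """Run `operation`, retrying up to `max_retries` times after the first failure.

    Hypothetical helper: the library's real retry policy may differ.
    """
    attempt = 0
    while True:
        try:
            return await operation()
        except OSError:
            # Assumption: only OSError is treated as transient.
            # Once the retries are used up, re-raise the last error.
            if attempt >= max_retries:
                raise
            attempt += 1
            await asyncio.sleep(retry_delay)
```

With `max_retries = 3`, the operation runs at most four times in total: the initial attempt plus three retries, with `retry_delay` seconds between attempts.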

Bulk Operations

# Read multiple nodes at once
node_ids = ["ns=2;s=Temperature", "ns=2;s=Pressure", "ns=2;s=Flow"]
values = await client.read_nodes(node_ids)

for node_id, value in values.items():
    if value:
        print(f"{node_id}: {value.value}")
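
The `if value:` check above suggests that a node which could not be read may come back as an empty entry rather than raising. How `read_nodes` works internally is not stated. As a rough illustration of that result shape, the sketch below fans out single reads with `asyncio.gather` and maps failures to `None`. The function `read_many` and its `read_one` parameter are hypothetical, and a real OPC-UA client would more likely batch the nodes into one read request.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional


async def read_many(
    read_one: Callable[[str], Awaitable[Any]],
    node_ids: List[str],
) -> Dict[str, Optional[Any]]:
    """Read all nodes concurrently; a failed read maps to None instead of raising."""
    results = await asyncio.gather(
        *(read_one(node_id) for node_id in node_ids),
        return_exceptions=True,  # collect errors instead of aborting the batch
    )
    return {
        node_id: None if isinstance(result, BaseException) else result
        for node_id, result in zip(node_ids, results)
    }
```

Keeping one dictionary entry per requested node lets the caller tell which reads failed without losing the values that succeeded.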

Browsing Nodes

# Browse available nodes
nodes = await client.browse(start_node="ns=2;s=MyDevice", max_depth=2)

for node in nodes:
    print(f"{node.node_id}: {node.name} ({node.node_class})")
    if node.is_folder:
        print(f"  Children: {len(node.children)}")
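
If the browse result is nested, a small recursive helper can flatten it for printing or searching. The sketch below uses a stand-in `Node` dataclass with the attributes shown above (`node_id`, `name`, `is_folder`, `children`). It assumes `children` holds nested node objects, which the example above does not confirm. `Node` and `walk` are illustrative names, not part of the library.

```python
from dataclasses import dataclass, field
from typing import Iterator, List, Tuple


@dataclass
class Node:
    """Stand-in for a browse result entry (hypothetical, for illustration)."""

    node_id: str
    name: str
    is_folder: bool = False
    children: List["Node"] = field(default_factory=list)


def walk(nodes: List[Node], depth: int = 0) -> Iterator[Tuple[int, Node]]:
    """Yield (depth, node) pairs in depth-first order, parents before children."""
    for node in nodes:
        yield depth, node
        yield from walk(node.children, depth + 1)


tree = [
    Node("ns=2;s=MyDevice", "MyDevice", is_folder=True, children=[
        Node("ns=2;s=Temperature", "Temperature"),
        Node("ns=2;s=Pressure", "Pressure"),
    ])
]

for depth, node in walk(tree):
    # Indent each node by its depth in the tree.
    print("  " * depth + node.name)
```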

Architecture

Core Components

BaseDataSource (Abstract Interface)
    ├── OpcUaClient (Implemented)
    ├── ModbusClient (Future)
    └── MqttClient (Future)

ConnectionPool
    ├── Manages multiple connections
    ├── Auto-cleanup of idle connections
    └── Health monitoring
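
To make the pool's responsibilities concrete, here is a minimal sketch of connection reuse and idle cleanup. It is not the library's `ConnectionPool`. `FakeClient`, `SimplePool`, `idle_timeout`, and `cleanup_idle` are hypothetical names, and the sketch keys connections by URL and ignores locking and health checks for brevity.

```python
import asyncio
import time
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict, Tuple


class FakeClient:
    """Stand-in for a protocol client; a real one would open a network session."""

    def __init__(self, url: str) -> None:
        self.url = url
        self.connected = False

    async def connect(self) -> None:
        self.connected = True

    async def disconnect(self) -> None:
        self.connected = False


class SimplePool:
    """Keeps one client per URL and closes clients idle longer than `idle_timeout`."""

    def __init__(self, idle_timeout: float = 60.0) -> None:
        self.idle_timeout = idle_timeout
        # url -> (client, monotonic time of last use)
        self._clients: Dict[str, Tuple[FakeClient, float]] = {}

    @asynccontextmanager
    async def get_connection(self, url: str) -> AsyncIterator[FakeClient]:
        entry = self._clients.get(url)
        if entry is None:
            client = FakeClient(url)
            await client.connect()
        else:
            client = entry[0]  # reuse the existing connection
        try:
            yield client
        finally:
            # Record the last-use time so idle cleanup can find stale clients.
            self._clients[url] = (client, time.monotonic())

    async def cleanup_idle(self) -> int:
        """Disconnect and drop clients unused for longer than `idle_timeout`."""
        now = time.monotonic()
        stale = [
            url
            for url, (_, last_used) in self._clients.items()
            if now - last_used > self.idle_timeout
        ]
        for url in stale:
            client, _ = self._clients.pop(url)
            await client.disconnect()
        return len(stale)
```

A production pool would typically run the cleanup from a background task and guard the client table with a lock.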

Key Classes

  • BaseDataSource: Abstract base class for all data sources
  • ConnectionPool: Manages connection lifecycle and pooling
  • NodeValue: Data class for node values with metadata
  • BrowseResult: Data class for node browsing results
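
The relationship between these classes can be sketched as an abstract base class plus a small data class. The shapes below are assumptions made for illustration. Only the `value` attribute and the `read_node` / `write_node` method names appear in the examples above. The `node_id` and `timestamp` fields and the `InMemorySource` class are hypothetical.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict


@dataclass
class NodeValue:
    """A value plus metadata. `node_id` and `timestamp` are assumed fields."""

    node_id: str
    value: Any
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


class BaseDataSource(ABC):
    """Assumed shape of the abstract interface shared by protocol clients."""

    @abstractmethod
    async def read_node(self, node_id: str) -> NodeValue: ...

    @abstractmethod
    async def write_node(self, node_id: str, value: Any) -> bool: ...


class InMemorySource(BaseDataSource):
    """Hypothetical implementation backed by a dict, handy for tests."""

    def __init__(self) -> None:
        self._store: Dict[str, Any] = {}

    async def read_node(self, node_id: str) -> NodeValue:
        return NodeValue(node_id=node_id, value=self._store[node_id])

    async def write_node(self, node_id: str, value: Any) -> bool:
        self._store[node_id] = value
        return True
```

Under this design, a new protocol would be added by subclassing the base class and implementing the abstract methods. Calling code then depends only on the shared interface.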

Error Handling

The library raises custom exceptions that carry the error message and, where relevant, the node ID and further details:

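# Note: ConnectionError and TimeoutError share names with Python built-ins;
# importing them like this shadows the built-ins in this module.
from datasources_lib import (
    DataSourceError,
    ConnectionError,
    ReadError,
    WriteError,
    TimeoutError
)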

try:
    value = await client.read_node("ns=2;s=Temperature")
except ReadError as e:
    print(f"Read failed: {e.message}")
    print(f"Node: {e.node_id}")
    print(f"Details: {e.details}")
except ConnectionError as e:
    print(f"Connection failed: {e.message}")
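
The attributes used above (`message`, `node_id`, `details`) fit a small exception hierarchy rooted at `DataSourceError`. The sketch below shows one way such a hierarchy could be laid out. Whether the library's `ReadError` actually subclasses `DataSourceError`, and the exact constructor signatures, are assumptions. `describe` is a hypothetical helper.

```python
from typing import Any, Dict, Optional


class DataSourceError(Exception):
    """Assumed common base: carries a message and optional details."""

    def __init__(self, message: str, details: Optional[Dict[str, Any]] = None) -> None:
        super().__init__(message)
        self.message = message
        self.details = details or {}


class ReadError(DataSourceError):
    """Assumed read failure: additionally records the affected node."""

    def __init__(
        self,
        message: str,
        node_id: str,
        details: Optional[Dict[str, Any]] = None,
    ) -> None:
        super().__init__(message, details)
        self.node_id = node_id


def describe(error: DataSourceError) -> str:
    """Format any error in the hierarchy, adding the node ID when present."""
    node = getattr(error, "node_id", None)
    suffix = f" (node {node})" if node else ""
    return f"{type(error).__name__}: {error.message}{suffix}"
```

With a shared base class, a single `except DataSourceError` clause can act as a catch-all after the more specific handlers.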

Development

Setup Development Environment

git clone <repository>
cd datasources-lib
pip install -e ".[dev]"

Running Tests

pytest

Code Quality

# Format code
black src/

# Lint code
ruff check src/

# Type checking
mypy src/

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests
  5. Run the test suite
  6. Submit a pull request

License

MIT License - see LICENSE file for details.

Roadmap

  • Modbus TCP/RTU support
  • MQTT support
  • OPC-DA support
  • Database connectors (PostgreSQL, InfluxDB)
  • REST API connectors
  • Subscription support for OPC-UA
  • Data validation and transformation
  • Metrics and monitoring
  • Web UI for configuration management
