Client library for the Petrosa Data Manager API.
# Petrosa Data Manager

Data integrity, intelligence, and distribution hub for the Petrosa trading ecosystem.
The Data Manager ensures all trading-related datasets remain accurate, consistent, complete, and analyzable. It acts as both a guardian (maintaining data quality) and a gateway (serving structured data and analytics).
## Overview
The Petrosa Data Manager is responsible for:
- Data Integrity: Continuous validation, gap detection, and consistency checking
- Data Auditing: Automated health scoring and quality monitoring
- Data Recovery: Intelligent backfilling of missing data
- Analytics Computation: Market metrics (volatility, volume, spread, trends, correlations)
- Data Serving: Schema-rich APIs for downstream consumption
- Catalog Management: Dataset registry, schemas, and lineage tracking
## Architecture

### Core Components
| Component | Purpose |
|---|---|
| NATS Consumer | Subscribe to binance.futures.websocket.data for real-time market data |
| Auditor | Validate data integrity, detect gaps, duplicates, and anomalies |
| Backfiller | Fetch and restore missing data ranges from Binance API |
| Catalog | Maintain dataset metadata, schemas, and lineage registry |
| Analytics Engine | Compute volatility, volume, spread, deviation, trend, seasonality metrics |
| API Server | RESTful endpoints for data access, metrics, health, and catalog |
### Data Flow

```text
NATS: binance.futures.websocket.data
        │ (subscribe)
        ▼
Data Manager Consumer
        ▼
Data Validation & Storage (PostgreSQL/MongoDB)
        ▼
Auditor (continuous) → Backfiller (on gaps) → Analytics (scheduled)
        ▼
API Layer (FastAPI) → Downstream consumers (dashboards, strategies, tradeengine)
```
## Quick Start

### Prerequisites
- Python 3.11+
- Docker
- kubectl (for Kubernetes deployment)
- Access to remote MicroK8s cluster
### Installation

```bash
# Complete setup
make setup

# Or manually
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements-dev.txt
```
### Local Development

```bash
# Run locally
make run

# Or directly
python -m data_manager.main
```
### Docker

```bash
# Build image
make build

# Run in Docker
make run-docker
```
### Kubernetes Deployment

```bash
# Deploy to cluster
make deploy

# Check status
make k8s-status

# View logs
make k8s-logs

# Clean up
make k8s-clean
```
## API Endpoints

### Health & Status

- `GET /health/liveness` - Kubernetes liveness probe
- `GET /health/readiness` - Kubernetes readiness probe
- `GET /health/summary` - Overall system health
- `GET /health/databases` - Detailed database connection status
- `GET /health/connections` - Connection pool statistics
- `GET /health?pair={pair}&period={period}` - Data quality metrics
### Data Access (Domain-Specific)

- `GET /data/candles?pair={pair}&period={period}` - OHLCV candle data
- `GET /data/trades?pair={pair}` - Individual trade data
- `GET /data/depth?pair={pair}` - Order book depth
- `GET /data/funding?pair={pair}` - Funding rate data
### Generic CRUD API

- `GET /api/v1/{database}/{collection}` - Query records with filtering, sorting, pagination
- `POST /api/v1/{database}/{collection}` - Insert single or multiple records
- `PUT /api/v1/{database}/{collection}` - Update records with filtering
- `DELETE /api/v1/{database}/{collection}` - Delete records with filtering
- `POST /api/v1/{database}/{collection}/batch` - Batch operations (insert/update/delete)
### Raw Query API

- `POST /api/v1/raw/mysql` - Execute raw SQL queries (with safety validation)
- `POST /api/v1/raw/mongodb` - Execute raw MongoDB queries/aggregations
### Schema Registry API

- `POST /schemas/{database}/{name}` - Register new schema
- `GET /schemas/{database}/{name}` - Get latest schema
- `GET /schemas/{database}/{name}/versions` - List schema versions
- `GET /schemas/{database}/{name}/versions/{version}` - Get specific version
- `PUT /schemas/{database}/{name}/versions/{version}` - Update schema
- `DELETE /schemas/{database}/{name}/versions/{version}` - Deprecate schema
- `GET /schemas` - List all schemas (both databases)
- `GET /schemas?database={db}` - List schemas for specific database
- `POST /schemas/validate` - Validate data against schema
- `POST /schemas/compatibility` - Check schema compatibility
- `GET /schemas/search?query={query}` - Search schemas by name/description
- `POST /schemas/bootstrap` - Bootstrap common schemas
- `GET /schemas/cache/stats` - Get schema cache statistics
- `POST /schemas/cache/clear` - Clear schema cache
### Analytics

- `GET /analysis/volatility?pair={pair}&period={period}&method={method}` - Volatility metrics
- `GET /analysis/volume?pair={pair}&period={period}` - Volume metrics
- `GET /analysis/spread?pair={pair}` - Spread and liquidity
- `GET /analysis/trend?pair={pair}&period={period}` - Trend indicators
- `GET /analysis/correlation?pairs={pairs}&period={period}` - Correlation matrix
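For orientation, a sketch of one common way a volatility metric like the one this endpoint serves can be derived from candle closes — close-to-close log-return volatility, annualized. This is illustrative only; the service's `method` parameter suggests it supports several calculations, and its actual implementation is not shown here.

```python
import math

def realized_volatility(closes: list[float], periods_per_year: int = 365 * 24) -> float:
    """Annualized close-to-close volatility from a series of candle closes.

    Uses log returns and the sample standard deviation; periods_per_year
    should match the candle interval (365 * 24 suits 1h candles).
    """
    returns = [math.log(b / a) for a, b in zip(closes, closes[1:])]
    mean = sum(returns) / len(returns)
    variance = sum((r - mean) ** 2 for r in returns) / (len(returns) - 1)
    return math.sqrt(variance) * math.sqrt(periods_per_year)

vol = realized_volatility([50000.0, 50500.0, 50200.0, 50800.0, 50600.0])
```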
### Catalog

- `GET /catalog/datasets` - List all datasets
- `GET /catalog/datasets/{dataset_id}` - Dataset metadata
- `GET /catalog/schemas/{dataset_id}` - Schema definition
- `GET /catalog/lineage/{dataset_id}` - Data lineage
### Backfill

- `POST /backfill/start` - Trigger manual backfill
- `GET /backfill/jobs` - List backfill jobs
- `GET /backfill/jobs/{job_id}` - Job status
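Backfill jobs are driven by the auditor's gap detection. A minimal sketch of the underlying idea — finding missing candle ranges in a sorted timestamp series — assuming epoch-second timestamps and a fixed candle interval (the service's actual detection logic is not shown here):

```python
def find_gaps(timestamps: list[int], interval: int) -> list[tuple[int, int]]:
    """Return (start, end) ranges of missing candle slots, given timestamps
    sorted ascending and the expected candle interval in seconds."""
    gaps = []
    for prev, curr in zip(timestamps, timestamps[1:]):
        if curr - prev > interval:
            # Missing candles span from the first absent slot to the last one.
            gaps.append((prev + interval, curr - interval))
    return gaps

# 1m candles: the slots at 180 and 240 are missing between 120 and 300.
gaps = find_gaps([0, 60, 120, 300, 360], interval=60)
```

Each detected range would then map to one backfill job (compare `MIN_AUTO_BACKFILL_GAP` in the configuration table below, which gates which gaps trigger automatic backfill).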
## API Usage Examples

### Generic CRUD Operations

#### Query Records

```bash
# Get all records from a collection
curl "http://localhost:8000/api/v1/mongodb/candles_BTCUSDT_1m"

# Filter and sort records
curl "http://localhost:8000/api/v1/mongodb/candles_BTCUSDT_1m?filter={\"symbol\":\"BTCUSDT\"}&sort={\"timestamp\":-1}&limit=100"

# Paginate results
curl "http://localhost:8000/api/v1/mongodb/trades_BTCUSDT?limit=50&offset=100"
```
#### Insert Records

```bash
# Insert single record
curl -X POST "http://localhost:8000/api/v1/mongodb/candles_BTCUSDT_1m" \
  -H "Content-Type: application/json" \
  -d '{"data": {"symbol": "BTCUSDT", "open": 50000, "high": 51000, "low": 49000, "close": 50500, "volume": 1000}}'

# Insert multiple records
curl -X POST "http://localhost:8000/api/v1/mongodb/trades_BTCUSDT" \
  -H "Content-Type: application/json" \
  -d '{"data": [{"symbol": "BTCUSDT", "price": 50000, "quantity": 0.1}, {"symbol": "BTCUSDT", "price": 50100, "quantity": 0.2}]}'
```
#### Batch Operations

```bash
# Batch insert/update/delete
curl -X POST "http://localhost:8000/api/v1/mongodb/candles_BTCUSDT_1m/batch" \
  -H "Content-Type: application/json" \
  -d '{
    "operations": [
      {"type": "insert", "data": {"symbol": "BTCUSDT", "open": 50000}},
      {"type": "update", "filter": {"symbol": "BTCUSDT"}, "data": {"updated": true}},
      {"type": "delete", "filter": {"symbol": "ETHUSDT"}}
    ]
  }'
```
### Schema Registry Operations

#### Register Schema

```bash
# Register MongoDB schema for candles
curl -X POST "http://localhost:8000/schemas/mongodb/candle_v1" \
  -H "Content-Type: application/json" \
  -d '{
    "version": 1,
    "schema": {
      "type": "object",
      "required": ["symbol", "timestamp", "open", "high", "low", "close", "volume"],
      "properties": {
        "symbol": {"type": "string", "pattern": "^[A-Z]+$"},
        "timestamp": {"type": "string", "format": "date-time"},
        "open": {"type": "number", "minimum": 0},
        "high": {"type": "number", "minimum": 0},
        "low": {"type": "number", "minimum": 0},
        "close": {"type": "number", "minimum": 0},
        "volume": {"type": "number", "minimum": 0}
      }
    },
    "description": "OHLCV candle data schema"
  }'

# Register MySQL schema for orders
curl -X POST "http://localhost:8000/schemas/mysql/order_v1" \
  -H "Content-Type: application/json" \
  -d '{
    "version": 1,
    "schema": {
      "type": "object",
      "required": ["order_id", "symbol", "side", "type", "status"],
      "properties": {
        "order_id": {"type": "string"},
        "symbol": {"type": "string", "pattern": "^[A-Z]+$"},
        "side": {"type": "string", "enum": ["BUY", "SELL"]},
        "type": {"type": "string", "enum": ["LIMIT", "MARKET"]},
        "status": {"type": "string", "enum": ["NEW", "FILLED", "CANCELED"]},
        "quantity": {"type": "number", "minimum": 0},
        "price": {"type": "number", "minimum": 0}
      }
    },
    "description": "Order management schema"
  }'
```
#### Get Schema Information

```bash
# Get latest schema
curl "http://localhost:8000/schemas/mongodb/candle_v1"

# Get specific version
curl "http://localhost:8000/schemas/mongodb/candle_v1/versions/1"

# List all versions
curl "http://localhost:8000/schemas/mongodb/candle_v1/versions"

# List all schemas
curl "http://localhost:8000/schemas"

# List schemas by database
curl "http://localhost:8000/schemas?database=mongodb"
```
#### Validate Data Against Schema

```bash
# Validate single record
curl -X POST "http://localhost:8000/schemas/validate" \
  -H "Content-Type: application/json" \
  -d '{
    "database": "mongodb",
    "schema_name": "candle_v1",
    "data": {
      "symbol": "BTCUSDT",
      "timestamp": "2025-01-01T00:00:00Z",
      "open": 50000,
      "high": 51000,
      "low": 49000,
      "close": 50500,
      "volume": 100.5
    }
  }'

# Validate batch data
curl -X POST "http://localhost:8000/schemas/validate" \
  -H "Content-Type: application/json" \
  -d '{
    "database": "mongodb",
    "schema_name": "candle_v1",
    "data": [
      {"symbol": "BTCUSDT", "timestamp": "2025-01-01T00:00:00Z", "open": 50000, "high": 51000, "low": 49000, "close": 50500, "volume": 100.5},
      {"symbol": "ETHUSDT", "timestamp": "2025-01-01T00:00:00Z", "open": 3000, "high": 3100, "low": 2900, "close": 3050, "volume": 500.2}
    ]
  }'
```
#### Schema Compatibility Checking

```bash
# Check compatibility between schema versions
curl -X POST "http://localhost:8000/schemas/compatibility" \
  -H "Content-Type: application/json" \
  -d '{
    "database": "mongodb",
    "schema_name": "candle_v1",
    "old_version": 1,
    "new_version": 2
  }'
```
#### Bootstrap Common Schemas

```bash
# Bootstrap all common schemas for MongoDB
curl -X POST "http://localhost:8000/schemas/bootstrap" \
  -H "Content-Type: application/json" \
  -d '{
    "database": "mongodb",
    "schemas": [
      {
        "version": 1,
        "schema": {
          "type": "object",
          "required": ["symbol", "timestamp", "open", "high", "low", "close", "volume"],
          "properties": {
            "symbol": {"type": "string", "pattern": "^[A-Z]+$"},
            "timestamp": {"type": "string", "format": "date-time"},
            "open": {"type": "number", "minimum": 0},
            "high": {"type": "number", "minimum": 0},
            "low": {"type": "number", "minimum": 0},
            "close": {"type": "number", "minimum": 0},
            "volume": {"type": "number", "minimum": 0}
          }
        },
        "description": "OHLCV candle data schema"
      }
    ],
    "overwrite_existing": false
  }'
```
### CRUD Operations with Schema Validation

#### Insert with Validation

```bash
# Insert with automatic schema validation
curl -X POST "http://localhost:8000/api/v1/mongodb/candles_BTCUSDT_1m?schema=candle_v1&validate=true" \
  -H "Content-Type: application/json" \
  -d '{
    "data": {
      "symbol": "BTCUSDT",
      "timestamp": "2025-01-01T00:00:00Z",
      "open": 50000,
      "high": 51000,
      "low": 49000,
      "close": 50500,
      "volume": 100.5
    }
  }'

# Batch insert with validation
curl -X POST "http://localhost:8000/api/v1/mongodb/candles_BTCUSDT_1m?schema=candle_v1&validate=true" \
  -H "Content-Type: application/json" \
  -d '{
    "data": [
      {"symbol": "BTCUSDT", "timestamp": "2025-01-01T00:00:00Z", "open": 50000, "high": 51000, "low": 49000, "close": 50500, "volume": 100.5},
      {"symbol": "BTCUSDT", "timestamp": "2025-01-01T01:00:00Z", "open": 50500, "high": 51500, "low": 49500, "close": 51000, "volume": 150.2}
    ]
  }'
```
#### Update with Validation

```bash
# Update with schema validation
curl -X PUT "http://localhost:8000/api/v1/mysql/orders?schema=order_v1&validate=true" \
  -H "Content-Type: application/json" \
  -d '{
    "filter": {"order_id": "12345"},
    "data": {
      "status": "FILLED",
      "updated_at": "2025-01-01T12:00:00Z"
    }
  }'
```
### Raw Query Operations

#### MySQL Raw Queries

```bash
# Execute SQL query
curl -X POST "http://localhost:8000/api/v1/raw/mysql" \
  -H "Content-Type: application/json" \
  -d '{"query": "SELECT COUNT(*) as total FROM audit_logs WHERE timestamp > NOW() - INTERVAL 1 HOUR"}'
```
#### MongoDB Raw Queries

```bash
# Find query
curl -X POST "http://localhost:8000/api/v1/raw/mongodb" \
  -H "Content-Type: application/json" \
  -d '{"query": "candles_BTCUSDT_1m: {\"find\": {\"symbol\": \"BTCUSDT\"}, \"limit\": 10}"}'

# Aggregation pipeline
curl -X POST "http://localhost:8000/api/v1/raw/mongodb" \
  -H "Content-Type: application/json" \
  -d '{"query": "{\"collection\": \"trades_BTCUSDT\", \"aggregate\": [{\"$group\": {\"_id\": \"$symbol\", \"count\": {\"$sum\": 1}}}]}"}'
```
### Health Monitoring

#### Check Database Health

```bash
# Overall health
curl "http://localhost:8000/health/readiness"

# Detailed database status
curl "http://localhost:8000/health/databases"

# Connection pool statistics
curl "http://localhost:8000/health/connections"
```
### Metrics and Monitoring

#### Prometheus Metrics

```bash
# View all metrics
curl "http://localhost:8000/metrics"

# Key metrics to monitor:
# - data_manager_requests_total (request count by endpoint/status)
# - data_manager_request_duration_seconds (request latency)
# - data_manager_database_operations_total (DB operation count)
# - data_manager_active_connections (connection pool status)
```
## Configuration

### Environment Variables
| Variable | Default | Description |
|---|---|---|
| `NATS_URL` | `nats://localhost:4222` | NATS server URL |
| `NATS_CONSUMER_SUBJECT` | `binance.futures.websocket.data` | NATS subject to subscribe to |
| `POSTGRES_URL` | - | PostgreSQL connection string |
| `MONGODB_URL` | - | MongoDB connection string |
| `ENABLE_AUDITOR` | `true` | Enable data auditor |
| `ENABLE_BACKFILLER` | `true` | Enable backfiller |
| `ENABLE_ANALYTICS` | `true` | Enable analytics engine |
| `ENABLE_API` | `true` | Enable API server |
| `API_PORT` | `8000` | API server port |
| `AUDIT_INTERVAL` | `300` | Audit interval in seconds |
| `ANALYTICS_INTERVAL` | `900` | Analytics interval in seconds |
| `ENABLE_LEADER_ELECTION` | `true` | Enable MongoDB-based leader election |
| `LEADER_ELECTION_HEARTBEAT_INTERVAL` | `10` | Leader heartbeat interval (seconds) |
| `LEADER_ELECTION_TIMEOUT` | `30` | Leader election timeout (seconds) |
| `ENABLE_AUTO_BACKFILL` | `false` | Enable automatic backfill for detected gaps |
| `MIN_AUTO_BACKFILL_GAP` | `3600` | Minimum gap size to trigger backfill (seconds) |
| `MAX_AUTO_BACKFILL_JOBS` | `5` | Maximum concurrent backfill jobs |
| `ENABLE_DUPLICATE_REMOVAL` | `false` | Enable automatic duplicate removal |
| `DUPLICATE_RESOLUTION_STRATEGY` | `keep_newest` | Duplicate resolution strategy |
### Kubernetes Configuration

The service uses existing shared secrets and configmaps:

- Secret: `petrosa-sensitive-credentials` (database credentials)
- ConfigMap: `petrosa-common-config` (shared settings)
- ConfigMap: `petrosa-data-manager-config` (service-specific)
## Development

### Code Quality

```bash
# Run linters
make lint

# Format code
make format

# Run tests
make test

# Security scan
make security
```
### Complete Pipeline

```bash
# Run all checks
make pipeline
```
## Schema Registry
The Schema Registry provides centralized schema management for all Petrosa services, ensuring data consistency and validation across the platform.
### Key Features
- Database-Specific Storage: Schemas stored in their respective databases (MySQL for structured data, MongoDB for time-series)
- Version Management: Full schema versioning with compatibility checking
- Automatic Validation: CRUD operations can validate data against registered schemas
- Schema Discovery: Easy schema exploration and documentation
- Compatibility Checking: Validate schema evolution and migration paths
- Bootstrap Support: Predefined schemas for common data types
### Schema Storage Strategy

```text
┌─────────────────────────────────────────────────────────────┐
│              petrosa-data-manager (API Gateway)             │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Schema Registry REST API                                   │
│  ├─ /schemas?database={db}            (list schemas)        │
│  ├─ /schemas/{db}/{name}              (get/register)        │
│  ├─ /schemas/{db}/{name}/versions     (versions)            │
│  └─ /schemas/validate?database={db}   (validate)            │
│                                                             │
└──────────────────┬─────────────────────┬────────────────────┘
                   │                     │
        ┌──────────▼──────────┐   ┌──────▼─────────────┐
        │        MySQL        │   │      MongoDB       │
        ├─────────────────────┤   ├────────────────────┤
        │ schemas table:      │   │ schemas collection:│
        │  - schema_name      │   │ {                  │
        │  - version          │   │   name: "...",     │
        │  - schema_json      │   │   version: 1,      │
        │  - created_at       │   │   schema: {...},   │
        │  - status           │   │   created_at: ...  │
        │                     │   │ }                  │
        └─────────────────────┘   └────────────────────┘
```
### Common Schemas

#### MongoDB Schemas (Time-Series Data)

- `candle_v1`: OHLCV candle data with volume metrics
- `trade_v1`: Trade execution data with order information
- `depth_v1`: Order book depth with bid/ask arrays
- `funding_v1`: Funding rate data with mark/index prices

#### MySQL Schemas (Structured Data)

- `order_v1`: Order management with status tracking
- `health_metrics_v1`: Data quality metrics and monitoring
- `audit_log_v1`: Data integrity audit logs
- `strategy_signal_v1`: Trading strategy signals and decisions
### Schema Validation Integration

All CRUD operations support optional schema validation:

```text
# Insert with validation
POST /api/v1/mongodb/candles_BTCUSDT_1m?schema=candle_v1&validate=true

# Update with validation
PUT /api/v1/mysql/orders?schema=order_v1&validate=true

# Batch operations with validation
POST /api/v1/mongodb/candles_BTCUSDT_1m?schema=candle_v1&validate=true
```
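Conceptually, `validate=true` means each record is checked against the registered JSON Schema before the write. A deliberately minimal, hand-rolled sketch of that check — covering only the `required` and `type` keywords, whereas a real deployment would use a full JSON Schema validator:

```python
# Map JSON Schema type names to Python types (subset, for illustration).
TYPE_MAP = {"string": str, "number": (int, float), "object": dict,
            "array": list, "boolean": bool}

def validate_record(record: dict, schema: dict) -> list[str]:
    """Return a list of validation errors; an empty list means the record passes."""
    errors = []
    for field in schema.get("required", []):
        if field not in record:
            errors.append(f"missing required field: {field}")
    for field, spec in schema.get("properties", {}).items():
        if field in record and not isinstance(record[field], TYPE_MAP[spec["type"]]):
            errors.append(f"wrong type for {field}: expected {spec['type']}")
    return errors

candle_schema = {
    "required": ["symbol", "open"],
    "properties": {"symbol": {"type": "string"}, "open": {"type": "number"}},
}
errors = validate_record({"symbol": "BTCUSDT", "open": 50000}, candle_schema)
```

On the API side, a non-empty error list would presumably turn into a 4xx response rather than a write.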
### Configuration

Environment variables for the schema registry:

```bash
# Schema validation settings
SCHEMA_VALIDATION_ENABLED=true
SCHEMA_STRICT_MODE=false
SCHEMA_CACHE_TTL=300
SCHEMA_AUTO_REGISTER=false
SCHEMA_MAX_VERSIONS=10
SCHEMA_COMPATIBILITY_MODE=BACKWARD
```
## Metrics

Prometheus metrics are exposed on port 9090:

- `data_manager_messages_received_total` - Total messages received from NATS
- `data_manager_messages_processed_total` - Successfully processed messages
- `data_manager_messages_failed_total` - Failed messages
- `data_manager_message_processing_seconds` - Message processing time
- `data_manager_nats_connection_status` - NATS connection status
- `data_manager_requests_total` - Request count by endpoint/status
- `data_manager_request_duration_seconds` - Request latency histogram
- `data_manager_database_operations_total` - Database operation counts
- `data_manager_active_connections` - Connection pool status
- `data_manager_errors_total` - Error rates by endpoint
## Project Structure

```text
petrosa-data-manager/
├── data_manager/
│   ├── models/          # Pydantic data models
│   ├── consumer/        # NATS consumer and message handling
│   ├── auditor/         # Data integrity validation
│   ├── backfiller/      # Gap recovery and backfilling
│   ├── catalog/         # Dataset registry and metadata
│   ├── analytics/       # Metrics computation
│   ├── api/             # FastAPI endpoints
│   │   └── routes/      # API route modules
│   └── main.py          # Application entry point
├── k8s/                 # Kubernetes manifests
├── tests/               # Test suite
├── constants.py         # Configuration constants
├── otel_init.py         # OpenTelemetry initialization
├── Dockerfile           # Container image
├── Makefile             # Development commands
└── README.md            # This file
```
## Integration

### Event Bus (NATS)

The Data Manager subscribes to:

- `binance.futures.websocket.data` - Real-time market data from socket-client

Supported event types:

- `trade` - Individual trades
- `ticker` - 24h ticker statistics
- `depth` - Order book depth updates
- `markPrice` - Mark price updates
- `fundingRate` - Funding rate updates
- `kline` - Candle/kline data
### Databases

- **PostgreSQL**: Metadata, catalog, audit logs, health metrics
- **MongoDB**: Time-series data (candles, trades, depth), computed metrics
## Leader Election
The Data Manager uses MongoDB-based leader election to ensure only one pod runs background schedulers (auditor, analytics) in multi-replica deployments.
### How It Works

- **Election**: On startup, each pod attempts to become leader via an atomic MongoDB write
- **Heartbeat**: The leader sends a heartbeat every 10 seconds to prove it is alive
- **Failover**: If the leader fails (no heartbeat for 30s), followers elect a new leader
- **Safety**: Prevents duplicate work and database contention
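The lease logic above can be sketched independently of MongoDB. In the real service the acquire step must be a single atomic write (a `find_one_and_update`-style operation with a filter on the lease expiry gives that guarantee); here a plain dict stands in for the leader document, so everything below is illustrative:

```python
LEASE = {}  # stands in for the MongoDB leader document

def try_acquire(pod: str, now: float, timeout: float = 30.0) -> bool:
    """Become leader if there is no leader, the lease expired, or we hold it.

    NOTE: check-then-update on a dict is NOT atomic; production code needs
    a single conditional write against the database instead.
    """
    holder = LEASE.get("holder")
    expired = now - LEASE.get("heartbeat", 0) > timeout
    if holder is None or expired or holder == pod:
        LEASE.update(holder=pod, heartbeat=now)  # acquiring also renews the lease
        return True
    return False

a = try_acquire("pod-a", now=0)    # no leader yet -> pod-a wins
b = try_acquire("pod-b", now=10)   # lease still fresh -> pod-b denied
c = try_acquire("pod-b", now=45)   # >30s since last heartbeat -> failover
```

With the defaults above (10s heartbeat, 30s timeout), a crashed leader is replaced within roughly three missed heartbeats, matching the configuration below.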
### Configuration

```yaml
# Enable leader election (recommended for production)
ENABLE_LEADER_ELECTION: "true"

# Heartbeat frequency
LEADER_ELECTION_HEARTBEAT_INTERVAL: "10"  # seconds

# Leader timeout
LEADER_ELECTION_TIMEOUT: "30"  # seconds
```
### Monitoring

```bash
# Check which pod is the leader
kubectl exec -it data-manager-xxx -- curl localhost:8000/health/leader

# Check audit scheduler status
kubectl exec -it data-manager-xxx -- curl localhost:8000/health/audit-status
```
See `docs/AUDITOR.md` for complete details.
## Roadmap

- ✅ NATS consumer for market data events
- ✅ FastAPI serving layer with schema-rich endpoints
- ✅ Kubernetes manifests and deployment
- ✅ Auditor implementation with leader election
- ✅ Gap detection with auto-backfill integration
- ✅ Duplicate detection and removal
- ✅ Health scoring with enhanced metrics
- ✅ MongoDB-based leader election for multi-replica safety
- ✅ Leader election for background schedulers
- 🚧 Database integration (PostgreSQL + MongoDB)
- 🚧 Backfiller implementation (Binance API integration)
- 🚧 Analytics engine (all metric calculators)
- 🚧 Catalog management (dataset registry)
- 🚧 Comprehensive test suite
- 🚧 CI/CD pipeline
## Documentation

- **API Documentation**: Available at `/docs` when running (Swagger UI)
- **Metrics**: Available at `/metrics` (Prometheus format)
- **Health**: Available at `/health/*` endpoints
## Troubleshooting

### NATS Connection Issues

```bash
# Check NATS connectivity
kubectl --kubeconfig=k8s/kubeconfig.yaml -n nats get pods

# View logs
make k8s-logs
```
### Database Connection Issues

```bash
# Verify secrets are configured
kubectl --kubeconfig=k8s/kubeconfig.yaml -n petrosa-apps get secret petrosa-sensitive-credentials
```
### API Not Responding

```bash
# Check pod status
make k8s-status

# Check readiness
curl http://petrosa-data-manager.petrosa-apps/health/readiness
```
## License

MIT License - Petrosa Systems

## Authors

Petrosa Systems - Trading Infrastructure Team