SchemaFusion Engine
A middleware implementation of the EasyBDI 2.0 architecture, focusing on Data Virtualization and Federation. SchemaFusion provides a headless interface for orchestrating distributed queries across heterogeneous data sources using Trino as the query engine.
Architecture
SchemaFusion acts as a middleware layer that:
- Federates queries across multiple data sources (PostgreSQL, MongoDB, CSV files, etc.)
- Virtualizes data access through Trino's distributed query engine
- Orchestrates query planning and execution via Python-based FastAPI middleware
- Manages configuration through a Typer-based CLI
See the Architecture Documentation (docs/architecture.md) for detailed architecture diagrams and component descriptions.
Components
Infrastructure (Docker)
The project uses Docker Compose to orchestrate:
- Trino (port 8080): Distributed query engine
- PostgreSQL (port 5432): Relational data source
- MongoDB (port 27017): NoSQL data source
- Kafka (port 9092): Streaming data source
- CSV Files: CSV files imported into PostgreSQL (example files in docker/trino/csv-data/)
- Google Sheets: Query Google Sheets as tables (requires OAuth2 credentials)
- Redis (port 6379): Caching layer
- Gateway (port 80): Single-entrypoint reverse proxy that serves the UI and forwards /api requests to FastAPI
Middleware (Python)
- FastAPI Application (src/main.py): Headless REST API interface
- Typer CLI (src/cli.py): Configuration management interface
- Verification Script (scripts/check_fusion.py): Validates Trino connectivity and catalog visibility
Quick Start
Prerequisites
- Docker and Docker Compose
- Python 3.11+
- pip
Setup
- Start the infrastructure:
docker compose up -d
Once the stack is healthy, the gateway exposes the UI at http://localhost/ and proxies all /api/* calls to FastAPI, so you no longer need to remember separate ports in the browser. (The UI container remains reachable at http://localhost:3000/ if you prefer to bypass the proxy while developing.)
- Install Python dependencies:
pip install -r requirements.txt
- Verify connectivity:
python scripts/check_fusion.py
This script connects to Trino and runs SHOW CATALOGS to verify that all configured connectors (PostgreSQL, MongoDB, Kafka, etc.) are visible.
- Start the FastAPI server:
uvicorn src.main:app --reload
The API will be available at http://localhost:8000, with interactive API docs at http://localhost:8000/docs and the OpenAPI schema at http://localhost:8000/openapi.json.
- Use the CLI:
python -m src.cli check
python -m src.cli version
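The connectivity check boils down to one SHOW CATALOGS round trip. The sketch below approximates it with the `trino` Python client (`pip install trino`), reading the TRINO_HOST, TRINO_PORT, and TRINO_USER variables documented under Configuration. It is not the project's check_fusion.py, and the expected catalog names (`postgres`, `mongo`) are assumptions taken from the API examples rather than from the script.

```python
"""Connectivity check sketch (not the project's scripts/check_fusion.py).

Assumes the `trino` client package is installed: pip install trino
"""
import os
from collections.abc import Iterable

# Assumed catalog names, taken from the API examples in this README.
EXPECTED_CATALOGS = frozenset({"postgres", "mongo"})


def missing_catalogs(found: Iterable[str], expected: Iterable[str] = EXPECTED_CATALOGS) -> set[str]:
    """Return the expected catalogs that SHOW CATALOGS did not report."""
    return set(expected) - set(found)


def fetch_catalogs() -> list[str]:
    """Run SHOW CATALOGS against the coordinator described by the TRINO_* variables."""
    # Imported lazily so missing_catalogs() is usable without the client installed.
    from trino.dbapi import connect

    conn = connect(
        host=os.getenv("TRINO_HOST", "localhost"),
        port=int(os.getenv("TRINO_PORT", "8080")),
        user=os.getenv("TRINO_USER", "schemafusion"),
    )
    try:
        cur = conn.cursor()
        cur.execute("SHOW CATALOGS")
        # Each result row is a one-element sequence holding the catalog name.
        return [row[0] for row in cur.fetchall()]
    finally:
        conn.close()
```

With the stack running, `missing_catalogs(fetch_catalogs())` should return an empty set; any names it returns point at a connector whose properties file Trino did not load.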
Project Structure
schema-fusion-engine/
├── docker/
├── docker-compose.yml # Infrastructure orchestration
│ └── trino/
│ ├── catalog/
│ │ ├── postgres.properties # Trino PostgreSQL connector config
│ │ ├── mongo.properties # Trino MongoDB connector config
│ │ └── csv.properties # Trino CSV connector config
│ └── csv-data/ # CSV files directory
├── src/
│ ├── main.py # FastAPI headless API
│ ├── cli.py # Typer CLI configuration manager
│ ├── api/ # API modules
│ ├── cli/ # CLI modules
│ └── core/ # Core business logic
├── docs/ # Documentation
│ └── architecture.md # Architecture diagrams and design
├── scripts/
│ └── check_fusion.py # Trino connectivity verification
├── tests/ # Test suite
├── requirements.txt # Python dependencies
└── README.md # This file
API Reference
Query Execution
Execute SQL queries across federated data sources:
curl -X POST "http://localhost:8000/fusion/query" \
-H "Content-Type: application/json" \
-d '{
"query": "SELECT * FROM postgres.public.users LIMIT 10",
"catalog": "postgres",
"schema": "public",
"max_rows": 100
}'
Response:
{
"query": "SELECT * FROM postgres.public.users LIMIT 10",
"columns": ["id", "name", "email"],
"rows": [[1, "Alice", "alice@example.com"], [2, "Bob", "bob@example.com"]],
"row_count": 2,
"execution_time_ms": 45.23,
"error": null
}
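The same request can be issued from Python with only the standard library. This is a minimal sketch, assuming the server runs on the uvicorn default port from Quick Start and that `error` is non-null only when a query fails; the helper names are hypothetical and not part of SchemaFusion.

```python
import json
import urllib.request
from typing import Any

BASE_URL = "http://localhost:8000"  # uvicorn's default port, as in Quick Start


def build_query_request(sql: str, catalog: str, schema: str, max_rows: int = 100,
                        base_url: str = BASE_URL) -> urllib.request.Request:
    """Build the same POST /fusion/query call as the curl example."""
    payload = {"query": sql, "catalog": catalog, "schema": schema, "max_rows": max_rows}
    return urllib.request.Request(
        f"{base_url}/fusion/query",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def rows_as_dicts(response: dict[str, Any]) -> list[dict[str, Any]]:
    """Pair each row with the column names; raise if the response reports an error."""
    if response.get("error"):
        raise RuntimeError(response["error"])
    return [dict(zip(response["columns"], row)) for row in response["rows"]]


def run_query(sql: str, catalog: str, schema: str, max_rows: int = 100) -> list[dict[str, Any]]:
    """Send the query and return one dict per result row (needs a running server)."""
    req = build_query_request(sql, catalog, schema, max_rows)
    with urllib.request.urlopen(req, timeout=30) as resp:
        return rows_as_dicts(json.load(resp))
```

Keeping request construction and response parsing separate from the network call makes both easy to test offline.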
Schema Discovery
Discover available catalogs, schemas, and tables:
# List catalogs
curl "http://localhost:8000/fusion/catalogs"
# List schemas
curl "http://localhost:8000/fusion/catalogs/postgres/schemas"
# List tables
curl "http://localhost:8000/fusion/catalogs/postgres/schemas/public/tables"
# Get table info
curl "http://localhost:8000/fusion/catalogs/postgres/schemas/public/tables/users"
Schema Matching
Automatically match columns between tables:
curl -X POST "http://localhost:8000/fusion/match" \
-H "Content-Type: application/json" \
-d '{
"source_catalog": "postgres",
"source_schema": "public",
"source_table": "users",
"target_catalog": "mongo",
"target_schema": "testdb",
"target_table": "customers",
"threshold": 0.8
}'
Response:
{
"source": {
"catalog": "postgres",
"schema": "public",
"table": "users",
"columns": ["id", "name", "email"]
},
"target": {
"catalog": "mongo",
"schema": "testdb",
"table": "customers",
"columns": ["_id", "name", "email"]
},
"matches": [
{"source_col": "name", "target_col": "name", "confidence": 1.0},
{"source_col": "email", "target_col": "email", "confidence": 1.0}
],
"match_count": 2,
"threshold": 0.8
}
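This README does not document the matching algorithm itself. As an illustration only, the sketch below scores column-name pairs with Python's `difflib.SequenceMatcher` and greedily keeps the best one-to-one pairs at or above the threshold. The engine may use a different similarity measure, and `match_columns` is a hypothetical name.

```python
from difflib import SequenceMatcher


def match_columns(source: list[str], target: list[str], threshold: float = 0.8) -> list[dict]:
    """Greedy one-to-one column matching by name similarity (illustrative only)."""
    # Score every (source, target) pair case-insensitively, best scores first.
    scored = sorted(
        ((SequenceMatcher(None, s.lower(), t.lower()).ratio(), s, t)
         for s in source for t in target),
        reverse=True,
    )
    used_source, used_target, matches = set(), set(), []
    for score, s, t in scored:
        # Skip weak pairs and columns that were already claimed by a better pair.
        if score < threshold or s in used_source or t in used_target:
            continue
        used_source.add(s)
        used_target.add(t)
        matches.append({"source_col": s, "target_col": t, "confidence": round(score, 2)})
    return matches
```

This naive scorer rates `id` against `_id` at exactly 0.8, so with an inclusive threshold it reports a third match that the example response above does not contain. For borderline names like these, whether the threshold is inclusive changes the result.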
Fusion Views
Create federated views that combine data from multiple sources:
curl -X POST "http://localhost:8000/fusion/create-view" \
-H "Content-Type: application/json" \
-d '{
"view_name": "global_customers",
"source_a": {"catalog": "postgres", "schema": "public", "table": "users"},
"source_b": {"catalog": "mongo", "schema": "testdb", "table": "customers"},
"matches": [
{"source_col": "name", "target_col": "name", "confidence": 1.0},
{"source_col": "email", "target_col": "email", "confidence": 1.0}
],
"join_key_a": "id",
"join_key_b": "_id"
}'
Response:
{
"status": "success",
"message": "View created successfully",
"view_name": "global_customers",
"sql": "CREATE OR REPLACE VIEW memory.default.global_customers AS ..."
}
List views:
curl "http://localhost:8000/fusion/views?catalog=memory&schema=default"
Delete view:
curl -X DELETE "http://localhost:8000/fusion/views/global_customers?catalog=memory&schema=default"
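The response above elides the generated SQL. To make the idea concrete, here is a hypothetical rendering of a two-source fusion view. Each matched pair becomes one output column, a FULL OUTER JOIN on the join keys keeps rows found in only one source, and COALESCE prefers source A's value. The join type and the COALESCE strategy are assumptions, not necessarily what SchemaFusion emits.

```python
import re

_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def ident(name: str) -> str:
    """Accept only plain SQL identifiers so request data cannot inject SQL."""
    if not _IDENT.fullmatch(name):
        raise ValueError(f"unsupported identifier: {name!r}")
    return name


def qualified(src: dict[str, str]) -> str:
    """Render catalog.schema.table from a source object like those in the request."""
    return ".".join(ident(src[k]) for k in ("catalog", "schema", "table"))


def fusion_view_sql(view_name: str, source_a: dict[str, str], source_b: dict[str, str],
                    matches: list[dict], join_key_a: str, join_key_b: str,
                    catalog: str = "memory", schema: str = "default") -> str:
    """Render a hypothetical two-source fusion view (join type and COALESCE are assumptions)."""
    columns = ",\n  ".join(
        f"COALESCE(a.{ident(m['source_col'])}, b.{ident(m['target_col'])}) AS {ident(m['source_col'])}"
        for m in matches
    )
    return (
        f"CREATE OR REPLACE VIEW {ident(catalog)}.{ident(schema)}.{ident(view_name)} AS\n"
        f"SELECT\n  {columns}\n"
        f"FROM {qualified(source_a)} a\n"
        f"FULL OUTER JOIN {qualified(source_b)} b\n"
        f"  ON a.{ident(join_key_a)} = b.{ident(join_key_b)}"
    )
```

The sketch checks identifiers against a plain-name pattern instead of interpolating them blindly, because the view name and column names arrive in an HTTP request body.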
Multi-Source Fusion (3+ Sources)
Create fusion views that combine data from 3 or more sources:
curl -X POST "http://localhost:8000/fusion/create-multi-view" \
-H "X-API-Key: your-api-key" \
-H "Content-Type: application/json" \
-d '{
"view_name": "unified_customers",
"sources": [
{"catalog": "postgres", "schema": "public", "table": "users", "alias": "p"},
{"catalog": "mongo", "schema": "testdb", "table": "customers", "alias": "m"},
{"catalog": "postgres", "schema": "public", "table": "clients", "alias": "c"}
],
"matches": [
{
"global": "customer_id",
"mappings": [
{"source": "p", "column": "id"},
{"source": "m", "column": "_id"},
{"source": "c", "column": "client_id"}
]
},
{
"global": "name",
"mappings": [
{"source": "p", "column": "name"},
{"source": "m", "column": "full_name"},
{"source": "c", "column": "name"}
]
}
],
"fusion_type": "join",
"join_keys": [
{"source": "p", "column": "id"},
{"source": "m", "column": "_id"},
{"source": "c", "column": "client_id"}
]
}'
Response:
{
"status": "success",
"message": "View created successfully",
"view_name": "unified_customers",
"sql": "CREATE OR REPLACE VIEW memory.default.unified_customers AS ..."
}
To stack rows from several sources instead of joining them, use UNION ALL (horizontal partitioning):
curl -X POST "http://localhost:8000/fusion/create-multi-view" \
-H "X-API-Key: your-api-key" \
-H "Content-Type: application/json" \
-d '{
"view_name": "all_customers",
"sources": [
{"catalog": "postgres", "schema": "public", "table": "customers", "alias": "p"},
{"catalog": "mongo", "schema": "testdb", "table": "clients", "alias": "m"}
],
"matches": [
{
"global": "id",
"mappings": [
{"source": "p", "column": "id"},
{"source": "m", "column": "_id"}
]
}
],
"fusion_type": "union",
"enable_type_coercion": true
}'
See Multi-Source Fusion Guide for detailed documentation.
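A UNION ALL view stacks rows, so every source must project the same global columns in the same order. The sketch below shows one way to render that. It assumes `enable_type_coercion` means casting each projected column to VARCHAR and that a source with no mapping for a global column contributes NULL. The engine's actual coercion rules are described in the Multi-Source Fusion Guide and may differ.

```python
import re

_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def ident(name: str) -> str:
    """Accept only plain SQL identifiers so request data cannot inject SQL."""
    if not _IDENT.fullmatch(name):
        raise ValueError(f"unsupported identifier: {name!r}")
    return name


def union_view_sql(view_name: str, sources: list[dict[str, str]], matches: list[dict],
                   enable_type_coercion: bool = False,
                   catalog: str = "memory", schema: str = "default") -> str:
    """Render a hypothetical UNION ALL fusion view from the request's sources/matches."""
    branches = []
    for src in sources:
        alias = ident(src["alias"])
        projections = []
        for m in matches:  # same global columns, same order, in every branch
            column = next((mp["column"] for mp in m["mappings"] if mp["source"] == alias), None)
            expr = f"{alias}.{ident(column)}" if column is not None else "NULL"
            if enable_type_coercion:
                expr = f"CAST({expr} AS VARCHAR)"  # assumed coercion target
            projections.append(f"{expr} AS {ident(m['global'])}")
        table = ".".join(ident(src[k]) for k in ("catalog", "schema", "table"))
        branches.append(f"SELECT {', '.join(projections)} FROM {table} {alias}")
    header = f"CREATE OR REPLACE VIEW {ident(catalog)}.{ident(schema)}.{ident(view_name)} AS\n"
    return header + "\nUNION ALL\n".join(branches)
```

Applied to the `all_customers` request, this yields one SELECT per source, joined by UNION ALL, each projecting `id` cast to VARCHAR.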
Health Check
curl "http://localhost:8000/health"
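When scripting against a freshly started stack, it helps to poll the health endpoint before sending queries. Below is a minimal standard-library sketch, assuming `/health` answers HTTP 200 once the API is ready (its response body is not documented here). The probe and sleep functions are injectable so the retry loop can be tested without a server; all names are hypothetical.

```python
import time
import urllib.error
import urllib.request
from collections.abc import Callable


def http_status(url: str, timeout: float = 5.0) -> int:
    """Return the HTTP status of a GET, or 0 if the server is unreachable."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status
    except urllib.error.HTTPError as exc:  # server answered with 4xx/5xx
        return exc.code
    except OSError:  # connection refused, DNS failure, timeout, ...
        return 0


def wait_until_healthy(url: str = "http://localhost:8000/health", attempts: int = 30,
                       delay: float = 2.0, probe: Callable[[str], int] = http_status,
                       sleep: Callable[[float], None] = time.sleep) -> bool:
    """Poll until the endpoint returns 200; give up after `attempts` tries."""
    for attempt in range(attempts):
        if probe(url) == 200:
            return True
        if attempt < attempts - 1:  # no pointless sleep after the last try
            sleep(delay)
    return False
```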
Configuration
Trino Connectors
Trino connectors are configured in docker/trino/catalog/:
- PostgreSQL: Connects to postgres:5432 using JDBC
- MongoDB: Connects to mongo:27017 using the MongoDB native protocol
- Kafka: Connects to kafka:29092 for streaming data (see Kafka Setup)
- Google Sheets: Query Google Sheets as tables (requires OAuth2, see Google Sheets Setup)
- CSV: Import CSV files into PostgreSQL (see CSV Support)
Environment Variables
Configuration is managed via environment variables or a .env file. See Configuration Guide for all available options.
Quick reference:
- TRINO_HOST: Trino coordinator host (default: localhost)
- TRINO_PORT: Trino coordinator port (default: 8080)
- TRINO_USER: Trino user (default: schemafusion)
- LOG_LEVEL: Logging level (default: INFO)
- LOG_FORMAT: Log format, text or json (default: text)
- VIEW_CATALOG: Catalog for fusion views (default: memory)
- VIEW_SCHEMA: Schema for fusion views (default: default)
Development
Running Tests
pytest tests/
pytest tests/ --cov=src --cov-report=html
Code Quality
# Linting
ruff check src/
ruff check src/ --fix
# Formatting
black src/ tests/
# Type checking
mypy src/
# Pre-commit hooks (runs automatically on commit)
pre-commit run --all-files
Documentation
- Architecture - System architecture and design
- Deployment Guide - Production deployment instructions
- Configuration - Environment variables and settings
- Multi-Source Fusion - Guide for 3+ source fusion views
- Kafka Setup - Kafka connector configuration
- Google Sheets Setup - Google Sheets connector setup
- CSV Support - CSV file import options
- Troubleshooting - Common issues and solutions
- Security Guide - Security considerations and best practices
- Release Guide - How to create releases
- Monitoring Setup - Prometheus, Grafana, and AlertManager setup
References