
Middleware implementation of EasyBDI 2.0 architecture for data virtualization and federation


SchemaFusion Engine

A middleware implementation of the EasyBDI 2.0 architecture, focusing on Data Virtualization and Federation. SchemaFusion provides a headless interface for orchestrating distributed queries across heterogeneous data sources using Trino as the query engine.

Architecture

SchemaFusion acts as a middleware layer that:

  • Federates queries across multiple data sources (PostgreSQL, MongoDB, CSV files, etc.)
  • Virtualizes data access through Trino's distributed query engine
  • Orchestrates query planning and execution via Python-based FastAPI middleware
  • Manages configuration through a Typer-based CLI

See the Architecture Documentation (docs/architecture.md) for detailed architecture diagrams and component descriptions.

Components

Infrastructure (Docker)

The project uses Docker Compose to orchestrate:

  • Trino (port 8080): Distributed query engine
  • PostgreSQL (port 5432): Relational data source
  • MongoDB (port 27017): NoSQL data source
  • Kafka (port 9092): Streaming data source
  • CSV Files: Loaded into PostgreSQL (example files in docker/trino/csv-data/)
  • Google Sheets: Query Google Sheets as tables (requires OAuth2 credentials)
  • Redis (port 6379): Caching layer
  • Gateway (port 80): Single entrypoint reverse proxy that serves the UI and forwards /api requests to FastAPI

Middleware (Python)

  • FastAPI Application (src/main.py): Headless REST API interface
  • Typer CLI (src/cli.py): Configuration management interface
  • Verification Script (scripts/check_fusion.py): Validates Trino connectivity and catalog visibility

Quick Start

Prerequisites

  • Docker and Docker Compose
  • Python 3.11+
  • pip

Setup

  1. Start the infrastructure:

    docker compose up -d
    

Once the stack is healthy, the gateway serves the UI at http://localhost/ and proxies all /api/* calls to FastAPI, so the browser only needs a single address. (The UI container is still reachable at http://localhost:3000/ if you prefer to bypass the proxy during development.)

  2. Install Python dependencies:

    pip install -r requirements.txt

  3. Verify connectivity:

    python scripts/check_fusion.py


    This script connects to Trino and runs SHOW CATALOGS to verify that all configured connectors (PostgreSQL, MongoDB, Kafka, etc.) are visible.

  4. Start the FastAPI server:

    uvicorn src.main:app --reload


    The API will be available at http://localhost:8000.

    • Interactive API docs: http://localhost:8000/docs
    • OpenAPI schema: http://localhost:8000/openapi.json
  5. Use the CLI:

    python -m src.cli check
    python -m src.cli version
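The connectivity check in step 3 boils down to comparing the catalogs Trino reports for SHOW CATALOGS against the catalogs you expect. The sketch below is not the contents of scripts/check_fusion.py: it assumes the official trino Python client (trino.dbapi.connect) is installed, and the expected catalog names postgres, mongo and kafka are assumptions based on the connectors listed in this README.

```python
import os

# Hypothetical set of catalogs the Docker stack is expected to expose.
EXPECTED_CATALOGS = {"postgres", "mongo", "kafka"}


def missing_catalogs(visible, expected=EXPECTED_CATALOGS):
    """Return the expected catalogs that SHOW CATALOGS did not report, sorted."""
    return sorted(set(expected) - set(visible))


def fetch_catalogs():
    """Run SHOW CATALOGS against Trino and return the catalog names."""
    # Imported lazily so missing_catalogs() works without the client installed.
    from trino.dbapi import connect

    conn = connect(
        host=os.environ.get("TRINO_HOST", "localhost"),
        port=int(os.environ.get("TRINO_PORT", "8080")),
        user=os.environ.get("TRINO_USER", "schemafusion"),
    )
    try:
        cur = conn.cursor()
        cur.execute("SHOW CATALOGS")
        # Each row holds a single value: the catalog name.
        return [row[0] for row in cur.fetchall()]
    finally:
        conn.close()
```

Against a healthy stack, `missing_catalogs(fetch_catalogs())` should return an empty list; any names it does return point at a connector whose catalog Trino did not load.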
    

Project Structure

schema-fusion-engine/
├── docker/
│   └── trino/
│       ├── catalog/
│       │   ├── postgres.properties # Trino PostgreSQL connector config
│       │   ├── mongo.properties    # Trino MongoDB connector config
│       │   └── csv.properties      # Trino CSV connector config
│       └── csv-data/               # CSV files directory
├── docker-compose.yml          # Infrastructure orchestration
├── src/
│   ├── main.py                 # FastAPI headless API
│   ├── cli.py                  # Typer CLI configuration manager
│   ├── api/                    # API modules
│   ├── cli/                    # CLI modules
│   └── core/                   # Core business logic
├── docs/                       # Documentation
│   └── architecture.md         # Architecture diagrams and design
├── scripts/
│   └── check_fusion.py         # Trino connectivity verification
├── tests/                      # Test suite
├── requirements.txt            # Python dependencies
└── README.md                   # This file

API Reference

Query Execution

Execute SQL queries across federated data sources:

curl -X POST "http://localhost:8000/fusion/query" \
  -H "Content-Type: application/json" \
  -d '{
    "query": "SELECT * FROM postgres.public.users LIMIT 10",
    "catalog": "postgres",
    "schema": "public",
    "max_rows": 100
  }'

Response:

{
  "query": "SELECT * FROM postgres.public.users LIMIT 10",
  "columns": ["id", "name", "email"],
  "rows": [[1, "Alice", "alice@example.com"], [2, "Bob", "bob@example.com"]],
  "row_count": 2,
  "execution_time_ms": 45.23,
  "error": null
}
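Because the response is column-oriented (a columns list plus a rows list of lists), client code usually zips the two together. A minimal standard-library sketch follows; the helper names run_query and to_records are hypothetical, and treating catalog and schema as optional request fields is an assumption.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # FastAPI address from the Quick Start


def run_query(sql, catalog=None, schema=None, max_rows=100, base_url=BASE_URL):
    """POST a query to /fusion/query and return the decoded JSON response."""
    payload = {"query": sql, "max_rows": max_rows}
    # Assumption: catalog and schema may be omitted for fully qualified queries.
    if catalog:
        payload["catalog"] = catalog
    if schema:
        payload["schema"] = schema
    req = urllib.request.Request(
        f"{base_url}/fusion/query",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


def to_records(result):
    """Turn the column-oriented response into a list of dicts, one per row."""
    if result.get("error"):
        raise RuntimeError(result["error"])
    columns = result["columns"]
    return [dict(zip(columns, row)) for row in result["rows"]]
```

For example, `to_records(run_query("SELECT * FROM postgres.public.users LIMIT 10"))` would yield one dict per user, keyed by id, name and email.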

Schema Discovery

Discover available catalogs, schemas, and tables:

# List catalogs
curl "http://localhost:8000/fusion/catalogs"

# List schemas
curl "http://localhost:8000/fusion/catalogs/postgres/schemas"

# List tables
curl "http://localhost:8000/fusion/catalogs/postgres/schemas/public/tables"

# Get table info
curl "http://localhost:8000/fusion/catalogs/postgres/schemas/public/tables/users"
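The discovery endpoints form a hierarchy: catalogs, then schemas within a catalog, then tables within a schema, then a single table. A small helper, hypothetical and not part of SchemaFusion, that builds those URLs with percent-encoded path segments:

```python
from urllib.parse import quote

BASE_URL = "http://localhost:8000"


def discovery_url(catalog=None, schema=None, table=None, base_url=BASE_URL):
    """Build the schema-discovery URL for the requested level of the hierarchy."""
    if table and not (catalog and schema):
        raise ValueError("table requires catalog and schema")
    if schema and not catalog:
        raise ValueError("schema requires catalog")
    url = f"{base_url}/fusion/catalogs"
    if catalog:
        url += f"/{quote(catalog, safe='')}/schemas"
    if schema:
        url += f"/{quote(schema, safe='')}/tables"
    if table:
        url += f"/{quote(table, safe='')}"
    return url
```

Calling it with zero to three arguments reproduces the four curl URLs above, in order.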

Schema Matching

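Automatically propose column matches between a source table and a target table; threshold sets the minimum confidence a match needs to be returned: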

curl -X POST "http://localhost:8000/fusion/match" \
  -H "Content-Type: application/json" \
  -d '{
    "source_catalog": "postgres",
    "source_schema": "public",
    "source_table": "users",
    "target_catalog": "mongo",
    "target_schema": "testdb",
    "target_table": "customers",
    "threshold": 0.8
  }'

Response:

{
  "source": {
    "catalog": "postgres",
    "schema": "public",
    "table": "users",
    "columns": ["id", "name", "email"]
  },
  "target": {
    "catalog": "mongo",
    "schema": "testdb",
    "table": "customers",
    "columns": ["_id", "name", "email"]
  },
  "matches": [
    {"source_col": "name", "target_col": "name", "confidence": 1.0},
    {"source_col": "email", "target_col": "email", "confidence": 1.0}
  ],
  "match_count": 2,
  "threshold": 0.8
}
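This README does not describe how confidence is computed. Purely to illustrate how a threshold filters candidate pairs, the sketch below swaps in difflib.SequenceMatcher string similarity with greedy one-to-one assignment; it is not SchemaFusion's matcher.

```python
from difflib import SequenceMatcher


def match_columns(source_cols, target_cols, threshold=0.8):
    """Greedy one-to-one column matching by name similarity (illustrative only)."""
    # Score every (source, target) pair on lower-cased column names.
    candidates = [
        (s, t, SequenceMatcher(None, s.lower(), t.lower()).ratio())
        for s in source_cols
        for t in target_cols
    ]
    # Highest scores first; the sort is stable, so ties keep source order.
    candidates.sort(key=lambda c: c[2], reverse=True)

    used_source, used_target, matches = set(), set(), []
    for s, t, score in candidates:
        if score < threshold:
            break  # every remaining candidate scores even lower
        if s in used_source or t in used_target:
            continue  # each column may appear in at most one match
        used_source.add(s)
        used_target.add(t)
        matches.append({"source_col": s, "target_col": t, "confidence": round(score, 2)})
    return matches
```

Unlike the sample response, this naive scorer also pairs id with _id (ratio 0.8), so on these columns it returns three matches at threshold 0.8 and only the name and email pairs at 0.9.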

Fusion Views

Create federated views that combine data from multiple sources:

curl -X POST "http://localhost:8000/fusion/create-view" \
  -H "Content-Type: application/json" \
  -d '{
    "view_name": "global_customers",
    "source_a": {"catalog": "postgres", "schema": "public", "table": "users"},
    "source_b": {"catalog": "mongo", "schema": "testdb", "table": "customers"},
    "matches": [
      {"source_col": "name", "target_col": "name", "confidence": 1.0},
      {"source_col": "email", "target_col": "email", "confidence": 1.0}
    ],
    "join_key_a": "id",
    "join_key_b": "_id"
  }'

Response:

{
  "status": "success",
  "message": "View created successfully",
  "view_name": "global_customers",
  "sql": "CREATE OR REPLACE VIEW memory.default.global_customers AS ..."
}

List views:

curl "http://localhost:8000/fusion/views?catalog=memory&schema=default"

Delete a view:

curl -X DELETE "http://localhost:8000/fusion/views/global_customers?catalog=memory&schema=default"
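The generated SQL is elided in the create-view response, so the following is only a guess at the shape of a two-source fusion view: a FULL OUTER JOIN on the join keys, one COALESCE per matched column, written into memory.default. The join type, the COALESCE strategy and the function names are assumptions. In practice the two join keys must also have comparable types in Trino, which may not hold for a MongoDB _id.

```python
def quote_ident(name):
    """Double-quote a SQL identifier, escaping embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def qualified(ref):
    """Render {"catalog", "schema", "table"} as a quoted three-part name."""
    return ".".join(quote_ident(ref[k]) for k in ("catalog", "schema", "table"))


def build_join_view_sql(view_name, source_a, source_b, matches,
                        join_key_a, join_key_b,
                        view_catalog="memory", view_schema="default"):
    """Hypothetical two-source fusion view: FULL OUTER JOIN + COALESCE per match."""
    select_list = ",\n  ".join(
        f"COALESCE(a.{quote_ident(m['source_col'])}, b.{quote_ident(m['target_col'])})"
        f" AS {quote_ident(m['source_col'])}"
        for m in matches
    )
    target = ".".join(quote_ident(p) for p in (view_catalog, view_schema, view_name))
    return (
        f"CREATE OR REPLACE VIEW {target} AS\n"
        f"SELECT\n  {select_list}\n"
        f"FROM {qualified(source_a)} AS a\n"
        f"FULL OUTER JOIN {qualified(source_b)} AS b\n"
        f"  ON a.{quote_ident(join_key_a)} = b.{quote_ident(join_key_b)}"
    )
```

Fed the global_customers request body, this emits one COALESCE column each for name and email and joins on a."id" = b."_id".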

Multi-Source Fusion (3+ Sources)

Create fusion views that combine data from 3 or more sources:

curl -X POST "http://localhost:8000/fusion/create-multi-view" \
  -H "X-API-Key: your-api-key" \
  -H "Content-Type: application/json" \
  -d '{
    "view_name": "unified_customers",
    "sources": [
      {"catalog": "postgres", "schema": "public", "table": "users", "alias": "p"},
      {"catalog": "mongo", "schema": "testdb", "table": "customers", "alias": "m"},
      {"catalog": "postgres", "schema": "public", "table": "clients", "alias": "c"}
    ],
    "matches": [
      {
        "global": "customer_id",
        "mappings": [
          {"source": "p", "column": "id"},
          {"source": "m", "column": "_id"},
          {"source": "c", "column": "client_id"}
        ]
      },
      {
        "global": "name",
        "mappings": [
          {"source": "p", "column": "name"},
          {"source": "m", "column": "full_name"},
          {"source": "c", "column": "name"}
        ]
      }
    ],
    "fusion_type": "join",
    "join_keys": [
      {"source": "p", "column": "id"},
      {"source": "m", "column": "_id"},
      {"source": "c", "column": "client_id"}
    ]
  }'

Response:

{
  "status": "success",
  "message": "View created successfully",
  "view_name": "unified_customers",
  "sql": "CREATE OR REPLACE VIEW memory.default.unified_customers AS ..."
}

For a UNION ALL fusion (horizontal partitioning), set fusion_type to "union":

curl -X POST "http://localhost:8000/fusion/create-multi-view" \
  -H "X-API-Key: your-api-key" \
  -H "Content-Type: application/json" \
  -d '{
    "view_name": "all_customers",
    "sources": [
      {"catalog": "postgres", "schema": "public", "table": "customers", "alias": "p"},
      {"catalog": "mongo", "schema": "testdb", "table": "clients", "alias": "m"}
    ],
    "matches": [
      {
        "global": "id",
        "mappings": [
          {"source": "p", "column": "id"},
          {"source": "m", "column": "_id"}
        ]
      }
    ],
    "fusion_type": "union",
    "enable_type_coercion": true
  }'

See Multi-Source Fusion Guide for detailed documentation.
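For the union case, each source can contribute one SELECT branch whose columns follow the order of the global matches, with NULL where a source has no mapping. This is a hypothetical sketch: the README does not say what enable_type_coercion does, and casting every column to VARCHAR is only one plausible interpretation.

```python
def quote_ident(name):
    """Double-quote a SQL identifier, escaping embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_union_view_sql(view_name, sources, matches, enable_type_coercion=False,
                         view_catalog="memory", view_schema="default"):
    """Hypothetical UNION ALL fusion view over any number of sources."""
    branches = []
    for src in sources:
        cols = []
        for m in matches:
            # Find this source's column for the global column, if it has one.
            local = next(
                (mp["column"] for mp in m["mappings"] if mp["source"] == src["alias"]),
                None,
            )
            expr = quote_ident(local) if local else "NULL"
            if enable_type_coercion:
                # Assumption: coercion means casting every branch to VARCHAR.
                expr = f"CAST({expr} AS VARCHAR)"
            cols.append(f"{expr} AS {quote_ident(m['global'])}")
        table = ".".join(quote_ident(src[k]) for k in ("catalog", "schema", "table"))
        branches.append(f"SELECT {', '.join(cols)} FROM {table}")
    target = ".".join(quote_ident(p) for p in (view_catalog, view_schema, view_name))
    return f"CREATE OR REPLACE VIEW {target} AS\n" + "\nUNION ALL\n".join(branches)
```

Keeping every branch in the same column order matters because UNION ALL aligns columns by position, not by name.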

Health Check

curl "http://localhost:8000/health"

Configuration

Trino Connectors

Trino connectors are configured in docker/trino/catalog/:

  • PostgreSQL: Connects to postgres:5432 using JDBC
  • MongoDB: Connects to mongo:27017 using MongoDB native protocol
  • Kafka: Connects to kafka:29092 for streaming data (see Kafka Setup)
  • Google Sheets: Query Google Sheets as tables (requires OAuth2, see Google Sheets Setup)
  • CSV: Import CSV files into PostgreSQL (see CSV Support)

Environment Variables

Configuration is managed via environment variables or a .env file. See Configuration Guide for all available options.

Quick reference:

  • TRINO_HOST - Trino coordinator host (default: localhost)
  • TRINO_PORT - Trino coordinator port (default: 8080)
  • TRINO_USER - Trino user (default: schemafusion)
  • LOG_LEVEL - Logging level (default: INFO)
  • LOG_FORMAT - Log format: text or json (default: text)
  • VIEW_CATALOG - Catalog for fusion views (default: memory)
  • VIEW_SCHEMA - Schema for fusion views (default: default)
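A sketch of how these variables and their defaults might be loaded into a typed settings object. It reads only the process environment (no .env parsing), and the Settings and load_settings names, as well as the LOG_FORMAT validation, are assumptions rather than the project's configuration code.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    # Defaults mirror the quick-reference list above.
    trino_host: str = "localhost"
    trino_port: int = 8080
    trino_user: str = "schemafusion"
    log_level: str = "INFO"
    log_format: str = "text"
    view_catalog: str = "memory"
    view_schema: str = "default"


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read the documented environment variables, falling back to the defaults."""
    log_format = env.get("LOG_FORMAT", "text").lower()
    if log_format not in ("text", "json"):
        raise ValueError(f"LOG_FORMAT must be 'text' or 'json', got {log_format!r}")
    return Settings(
        trino_host=env.get("TRINO_HOST", "localhost"),
        trino_port=int(env.get("TRINO_PORT", "8080")),
        trino_user=env.get("TRINO_USER", "schemafusion"),
        log_level=env.get("LOG_LEVEL", "INFO").upper(),
        log_format=log_format,
        view_catalog=env.get("VIEW_CATALOG", "memory"),
        view_schema=env.get("VIEW_SCHEMA", "default"),
    )
```

Passing a plain dict instead of os.environ makes the loader easy to exercise in tests.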

Development

Running Tests

pytest tests/
pytest tests/ --cov=src --cov-report=html

Code Quality

# Linting
ruff check src/
ruff check src/ --fix

# Formatting
black src/ tests/

# Type checking
mypy src/

# Pre-commit hooks (runs automatically on commit)
pre-commit run --all-files



Download files

Source Distribution

schema_fusion_engine-0.16.0.tar.gz (23.8 kB)

Built Distribution

schema_fusion_engine-0.16.0-py3-none-any.whl (7.7 kB)

File details

Details for the file schema_fusion_engine-0.16.0.tar.gz.

File metadata

  • Download URL: schema_fusion_engine-0.16.0.tar.gz
  • Size: 23.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for schema_fusion_engine-0.16.0.tar.gz
Algorithm     Hash digest
SHA256        ee79ef6217fe43a24396e5ec9efe573a08b3b1a5a5c4131b307529d209e26c25
MD5           b858cd28f9d76b2fca75a86f562fb6a1
BLAKE2b-256   459951642363638f05ac7e6c40f21de56731914ea00ee54fb012d78fe56f60a2

File details

Details for the file schema_fusion_engine-0.16.0-py3-none-any.whl.

File hashes

Hashes for schema_fusion_engine-0.16.0-py3-none-any.whl
Algorithm     Hash digest
SHA256        761bfbeac9a96a172c1702c3f9e9f338a80486c5011966ae7618d7d9d999b0e7
MD5           1d78f6e57c65c50212f98eaaebaaaa1e
BLAKE2b-256   a1df7d51b4398014a628b85f2f6e65f9008f615942aedfef135b9090c14fd9c7
