
Middleware implementation of EasyBDI 2.0 architecture for data virtualization and federation


SchemaFusion Engine

A middleware implementation of the EasyBDI 2.0 architecture, focusing on Data Virtualization and Federation. SchemaFusion provides a headless interface for orchestrating distributed queries across heterogeneous data sources using Trino as the query engine.

Architecture

SchemaFusion acts as a middleware layer that:

  • Federates queries across multiple data sources (PostgreSQL, MongoDB, CSV files, etc.)
  • Virtualizes data access through Trino's distributed query engine
  • Orchestrates query planning and execution via Python-based FastAPI middleware
  • Manages configuration through a Typer-based CLI

See Architecture Documentation for detailed architecture diagrams and component descriptions.

Components

Infrastructure (Docker)

The project uses Docker Compose to orchestrate:

  • Trino (port 8080): Distributed query engine
  • PostgreSQL (port 5432): Relational data source
  • MongoDB (port 27017): NoSQL data source
  • Kafka (port 9092): Streaming data source
  • CSV Files: CSV files imported into PostgreSQL (example files in docker/trino/csv-data/)
  • Google Sheets: Query Google Sheets as tables (requires OAuth2 credentials)
  • Redis (port 6379): Caching layer

Middleware (Python)

  • FastAPI Application (src/main.py): Headless REST API interface
  • Typer CLI (src/cli.py): Configuration management interface
  • Verification Script (scripts/check_fusion.py): Validates Trino connectivity and catalog visibility

Quick Start

Prerequisites

  • Docker and Docker Compose
  • Python 3.11+
  • pip

Setup

  1. Start the infrastructure:

    docker compose up -d
    
  2. Install Python dependencies:

    pip install -r requirements.txt
    
  3. Verify connectivity:

    python scripts/check_fusion.py
    

    This script connects to Trino and runs SHOW CATALOGS to verify that all configured connectors (PostgreSQL, MongoDB, Kafka, etc.) are visible.

  4. Start the FastAPI server:

    uvicorn src.main:app --reload
    

    The API will be available at http://localhost:8000

    • Interactive API docs: http://localhost:8000/docs
    • OpenAPI schema: http://localhost:8000/openapi.json

  5. Use the CLI:

    python -m src.cli check
    python -m src.cli version
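
The connectivity check in step 3 can be sketched roughly as follows. This is not the contents of scripts/check_fusion.py; it is a minimal illustration that assumes the `trino` Python client is installed and that the catalogs are named postgres and mongo (after the .properties files under docker/trino/catalog/). The helper names `missing_catalogs` and `fetch_catalog_rows` are hypothetical.

```python
from typing import Iterable, List, Sequence


def missing_catalogs(rows: Iterable[Sequence[str]], expected: Iterable[str]) -> List[str]:
    """Return the expected catalog names that SHOW CATALOGS did not list."""
    visible = {row[0] for row in rows}  # each result row has a single column
    return sorted(name for name in expected if name not in visible)


def fetch_catalog_rows(host: str = "localhost", port: int = 8080,
                       user: str = "schemafusion") -> list:
    """Run SHOW CATALOGS against Trino; needs a running coordinator."""
    # Imported lazily so the pure helper above works without the client.
    from trino.dbapi import connect

    conn = connect(host=host, port=port, user=user)
    cur = conn.cursor()
    cur.execute("SHOW CATALOGS")
    return cur.fetchall()


if __name__ == "__main__":
    # Assumed catalog names; adjust to the connectors you configured.
    gaps = missing_catalogs(fetch_catalog_rows(), ["postgres", "mongo"])
    print("all catalogs visible" if not gaps else f"missing: {gaps}")
```

Keeping the comparison in a pure function makes the check easy to unit-test without starting Docker.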
    

Project Structure

schema-fusion-engine/
├── docker-compose.yml      # Infrastructure orchestration
├── docker/
│   └── trino/
│       ├── catalog/
│       │   ├── postgres.properties # Trino PostgreSQL connector config
│       │   ├── mongo.properties    # Trino MongoDB connector config
│       │   └── csv.properties      # Trino CSV connector config
│       └── csv-data/              # CSV files directory
├── src/
│   ├── main.py                 # FastAPI headless API
│   ├── cli.py                  # Typer CLI configuration manager
│   ├── api/                    # API modules
│   ├── cli/                    # CLI modules
│   └── core/                   # Core business logic
├── docs/                       # Documentation
│   └── architecture.md         # Architecture diagrams and design
├── scripts/
│   └── check_fusion.py         # Trino connectivity verification
├── tests/                      # Test suite
├── requirements.txt            # Python dependencies
└── README.md                   # This file

API Reference

Query Execution

Execute SQL queries across federated data sources:

curl -X POST "http://localhost:8000/fusion/query" \
  -H "Content-Type: application/json" \
  -d '{
    "query": "SELECT * FROM postgres.public.users LIMIT 10",
    "catalog": "postgres",
    "schema": "public",
    "max_rows": 100
  }'

Response:

{
  "query": "SELECT * FROM postgres.public.users LIMIT 10",
  "columns": ["id", "name", "email"],
  "rows": [[1, "Alice", "alice@example.com"], [2, "Bob", "bob@example.com"]],
  "row_count": 2,
  "execution_time_ms": 45.23,
  "error": null
}
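
The same call can be made from Python with only the standard library. The sketch below assumes the request and response shapes shown above and the default address http://localhost:8000; the function names are illustrative and not part of the package.

```python
import json
import urllib.request
from typing import Any, Dict, List

BASE_URL = "http://localhost:8000"  # default address from the Quick Start


def build_query_request(sql: str, catalog: str, schema: str,
                        max_rows: int = 100) -> urllib.request.Request:
    """Build the POST /fusion/query request without sending it."""
    body = json.dumps({
        "query": sql,
        "catalog": catalog,
        "schema": schema,
        "max_rows": max_rows,
    }).encode("utf-8")
    return urllib.request.Request(
        f"{BASE_URL}/fusion/query",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def rows_as_dicts(response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Pair each row with the column names; raise if the API reported an error."""
    if response.get("error"):
        raise RuntimeError(response["error"])
    return [dict(zip(response["columns"], row)) for row in response["rows"]]


def run_query(sql: str, catalog: str, schema: str) -> List[Dict[str, Any]]:
    """Send the query and return rows as dictionaries (needs a running server)."""
    with urllib.request.urlopen(build_query_request(sql, catalog, schema)) as resp:
        return rows_as_dicts(json.load(resp))
```

Converting rows to dictionaries is a convenience for callers; the API itself returns columns and rows as separate lists.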

Schema Discovery

Discover available catalogs, schemas, and tables:

# List catalogs
curl "http://localhost:8000/fusion/catalogs"

# List schemas
curl "http://localhost:8000/fusion/catalogs/postgres/schemas"

# List tables
curl "http://localhost:8000/fusion/catalogs/postgres/schemas/public/tables"

# Get table info
curl "http://localhost:8000/fusion/catalogs/postgres/schemas/public/tables/users"
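
The four discovery endpoints nest in a regular pattern, so a small client-side helper can build them. This is a sketch under the assumption that the paths are exactly those shown in the curl examples; the helper names are hypothetical.

```python
import json
import urllib.request
from urllib.parse import quote

BASE_URL = "http://localhost:8000"  # default address from the Quick Start


def _path(*segments: str) -> str:
    """Join path segments under BASE_URL, percent-encoding each one."""
    return BASE_URL + "/" + "/".join(quote(s, safe="") for s in segments)


def catalogs_url() -> str:
    return _path("fusion", "catalogs")


def schemas_url(catalog: str) -> str:
    return _path("fusion", "catalogs", catalog, "schemas")


def tables_url(catalog: str, schema: str) -> str:
    return _path("fusion", "catalogs", catalog, "schemas", schema, "tables")


def table_url(catalog: str, schema: str, table: str) -> str:
    return _path("fusion", "catalogs", catalog, "schemas", schema, "tables", table)


def get_json(url: str):
    """Fetch a discovery endpoint and decode the JSON body (needs a running server)."""
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)
```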

Schema Matching

Automatically match columns between tables:

curl -X POST "http://localhost:8000/fusion/match" \
  -H "Content-Type: application/json" \
  -d '{
    "source_catalog": "postgres",
    "source_schema": "public",
    "source_table": "users",
    "target_catalog": "mongo",
    "target_schema": "testdb",
    "target_table": "customers",
    "threshold": 0.8
  }'

Response:

{
  "source": {
    "catalog": "postgres",
    "schema": "public",
    "table": "users",
    "columns": ["id", "name", "email"]
  },
  "target": {
    "catalog": "mongo",
    "schema": "testdb",
    "table": "customers",
    "columns": ["_id", "name", "email"]
  },
  "matches": [
    {"source_col": "name", "target_col": "name", "confidence": 1.0},
    {"source_col": "email", "target_col": "email", "confidence": 1.0}
  ],
  "match_count": 2,
  "threshold": 0.8
}
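
A match response like the one above can be post-processed on the client. The sketch below assumes only the response shape shown here; it does not reproduce how the service computes confidence, and the function names are illustrative.

```python
from typing import Any, Dict, List, Optional, Tuple


def confident_matches(match_response: Dict[str, Any],
                      min_confidence: Optional[float] = None) -> Dict[str, str]:
    """Map source column -> target column for matches at or above a cutoff.

    The cutoff defaults to the threshold echoed back in the response.
    """
    cutoff = match_response["threshold"] if min_confidence is None else min_confidence
    return {
        m["source_col"]: m["target_col"]
        for m in match_response["matches"]
        if m["confidence"] >= cutoff
    }


def unmatched_columns(match_response: Dict[str, Any]) -> Tuple[List[str], List[str]]:
    """Return (source columns, target columns) that appear in no match."""
    matched_src = {m["source_col"] for m in match_response["matches"]}
    matched_tgt = {m["target_col"] for m in match_response["matches"]}
    source_left = [c for c in match_response["source"]["columns"] if c not in matched_src]
    target_left = [c for c in match_response["target"]["columns"] if c not in matched_tgt]
    return source_left, target_left
```

For the sample response, the unmatched columns are id on the source side and _id on the target side, which is why the view request in the next section names the join keys explicitly.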

Fusion Views

Create federated views that combine data from multiple sources:

curl -X POST "http://localhost:8000/fusion/create-view" \
  -H "Content-Type: application/json" \
  -d '{
    "view_name": "global_customers",
    "source_a": {"catalog": "postgres", "schema": "public", "table": "users"},
    "source_b": {"catalog": "mongo", "schema": "testdb", "table": "customers"},
    "matches": [
      {"source_col": "name", "target_col": "name", "confidence": 1.0},
      {"source_col": "email", "target_col": "email", "confidence": 1.0}
    ],
    "join_key_a": "id",
    "join_key_b": "_id"
  }'

Response:

{
  "status": "success",
  "message": "View created successfully",
  "view_name": "global_customers",
  "sql": "CREATE OR REPLACE VIEW memory.default.global_customers AS ..."
}
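
The create-view request body can be assembled directly from a schema-matching response. The following is a client-side sketch, assuming the response and request shapes shown in this README; `build_create_view_payload` is a hypothetical helper, not a function shipped with the package.

```python
from typing import Any, Dict


def build_create_view_payload(view_name: str, match_response: Dict[str, Any],
                              join_key_a: str, join_key_b: str) -> Dict[str, Any]:
    """Turn a /fusion/match response into a /fusion/create-view request body."""
    src, tgt = match_response["source"], match_response["target"]
    # Fail early if a join key is not a column of its table.
    if join_key_a not in src["columns"]:
        raise ValueError(f"{join_key_a!r} is not a column of the source table")
    if join_key_b not in tgt["columns"]:
        raise ValueError(f"{join_key_b!r} is not a column of the target table")

    def table_ref(side: Dict[str, Any]) -> Dict[str, str]:
        return {key: side[key] for key in ("catalog", "schema", "table")}

    return {
        "view_name": view_name,
        "source_a": table_ref(src),
        "source_b": table_ref(tgt),
        "matches": list(match_response["matches"]),
        "join_key_a": join_key_a,
        "join_key_b": join_key_b,
    }
```

Validating the join keys on the client is a design choice of this sketch; the service may perform its own checks.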

List views:

curl "http://localhost:8000/fusion/views?catalog=memory&schema=default"

Delete view:

curl -X DELETE "http://localhost:8000/fusion/views/global_customers?catalog=memory&schema=default"

Multi-Source Fusion (3+ Sources)

Create fusion views that combine data from 3 or more sources:

curl -X POST "http://localhost:8000/fusion/create-multi-view" \
  -H "X-API-Key: your-api-key" \
  -H "Content-Type: application/json" \
  -d '{
    "view_name": "unified_customers",
    "sources": [
      {"catalog": "postgres", "schema": "public", "table": "users", "alias": "p"},
      {"catalog": "mongo", "schema": "testdb", "table": "customers", "alias": "m"},
      {"catalog": "postgres", "schema": "public", "table": "clients", "alias": "c"}
    ],
    "matches": [
      {
        "global": "customer_id",
        "mappings": [
          {"source": "p", "column": "id"},
          {"source": "m", "column": "_id"},
          {"source": "c", "column": "client_id"}
        ]
      },
      {
        "global": "name",
        "mappings": [
          {"source": "p", "column": "name"},
          {"source": "m", "column": "full_name"},
          {"source": "c", "column": "name"}
        ]
      }
    ],
    "fusion_type": "join",
    "join_keys": [
      {"source": "p", "column": "id"},
      {"source": "m", "column": "_id"},
      {"source": "c", "column": "client_id"}
    ]
  }'

Response:

{
  "status": "success",
  "message": "View created successfully",
  "view_name": "unified_customers",
  "sql": "CREATE OR REPLACE VIEW memory.default.unified_customers AS ..."
}
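
Because the multi-source payload refers to sources by alias in several places, it is easy to mistype one. The sketch below is a hypothetical client-side check, not the service's validation logic. It assumes that aliases must be unique and that a join fusion needs one join key per source, as in the example above.

```python
from typing import Any, Dict, List


def validate_multi_view(payload: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems; an empty list means none found."""
    problems: List[str] = []
    aliases = [source["alias"] for source in payload["sources"]]
    if len(set(aliases)) != len(aliases):
        problems.append("source aliases are not unique")
    known = set(aliases)

    # Every mapping must point at a declared source alias.
    for match in payload["matches"]:
        for mapping in match["mappings"]:
            if mapping["source"] not in known:
                problems.append(
                    f"match '{match['global']}' refers to unknown source '{mapping['source']}'"
                )

    # Assumption: in join mode each source contributes exactly one join key.
    if payload.get("fusion_type") == "join":
        keyed = {key["source"] for key in payload.get("join_keys", [])}
        for alias in aliases:
            if alias not in keyed:
                problems.append(f"no join key for source '{alias}'")
        for alias in sorted(keyed - known):
            problems.append(f"join key refers to unknown source '{alias}'")
    return problems
```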

To stack rows from the sources with UNION ALL (horizontal partitioning), set fusion_type to union:

curl -X POST "http://localhost:8000/fusion/create-multi-view" \
  -H "X-API-Key: your-api-key" \
  -H "Content-Type: application/json" \
  -d '{
    "view_name": "all_customers",
    "sources": [
      {"catalog": "postgres", "schema": "public", "table": "customers", "alias": "p"},
      {"catalog": "mongo", "schema": "testdb", "table": "clients", "alias": "m"}
    ],
    "matches": [
      {
        "global": "id",
        "mappings": [
          {"source": "p", "column": "id"},
          {"source": "m", "column": "_id"}
        ]
      }
    ],
    "fusion_type": "union",
    "enable_type_coercion": true
  }'
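
To make "horizontal partitioning" concrete, the sketch below shows one way a union payload could be turned into a UNION ALL query, with each source's column renamed to the shared global name. It only illustrates the idea; it is not the SQL the engine generates, it ignores enable_type_coercion, and `union_all_sql` is a hypothetical name.

```python
from typing import Any, Dict, List


def union_all_sql(sources: List[Dict[str, str]], matches: List[Dict[str, Any]]) -> str:
    """Build one SELECT per source, aligned on the global column names."""
    selects = []
    for source in sources:
        alias = source["alias"]
        columns = []
        for match in matches:
            # Find this source's column for the global name, if it has one.
            column = next(
                (m["column"] for m in match["mappings"] if m["source"] == alias),
                None,
            )
            expr = f'{alias}."{column}"' if column is not None else "NULL"
            columns.append(f'{expr} AS "{match["global"]}"')
        table = f'{source["catalog"]}.{source["schema"]}.{source["table"]}'
        selects.append(f'SELECT {", ".join(columns)} FROM {table} AS {alias}')
    return "\nUNION ALL\n".join(selects)
```

Sources that lack a mapping for a global column contribute NULL in this sketch so that every SELECT has the same column list, which UNION ALL requires.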

See Multi-Source Fusion Guide for detailed documentation.

Health Check

curl "http://localhost:8000/health"

Configuration

Trino Connectors

Trino connectors are configured in docker/trino/catalog/:

  • PostgreSQL: Connects to postgres:5432 using JDBC
  • MongoDB: Connects to mongo:27017 using MongoDB native protocol
  • Kafka: Connects to kafka:29092 for streaming data (see Kafka Setup)
  • Google Sheets: Query Google Sheets as tables (requires OAuth2, see Google Sheets Setup)
  • CSV: Import CSV files into PostgreSQL (see CSV Support)

Environment Variables

Configuration is managed via environment variables or a .env file. See Configuration Guide for all available options.

Quick reference:

  • TRINO_HOST - Trino coordinator host (default: localhost)
  • TRINO_PORT - Trino coordinator port (default: 8080)
  • TRINO_USER - Trino user (default: schemafusion)
  • LOG_LEVEL - Logging level (default: INFO)
  • LOG_FORMAT - Log format: text or json (default: text)
  • VIEW_CATALOG - Catalog for fusion views (default: memory)
  • VIEW_SCHEMA - Schema for fusion views (default: default)
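
As an illustration of how these variables and their defaults fit together, the sketch below reads them into a small settings object. It is an assumption-laden example, not the project's configuration code: the `Settings` class and `load_settings` function are hypothetical, and .env file loading is left out.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    """Defaults mirror the quick reference above."""
    trino_host: str = "localhost"
    trino_port: int = 8080
    trino_user: str = "schemafusion"
    log_level: str = "INFO"
    log_format: str = "text"
    view_catalog: str = "memory"
    view_schema: str = "default"


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from environment variables, falling back to the defaults."""
    log_format = env.get("LOG_FORMAT", "text")
    if log_format not in ("text", "json"):
        raise ValueError(f"LOG_FORMAT must be 'text' or 'json', got {log_format!r}")
    return Settings(
        trino_host=env.get("TRINO_HOST", "localhost"),
        trino_port=int(env.get("TRINO_PORT", "8080")),  # ports arrive as strings
        trino_user=env.get("TRINO_USER", "schemafusion"),
        log_level=env.get("LOG_LEVEL", "INFO"),
        log_format=log_format,
        view_catalog=env.get("VIEW_CATALOG", "memory"),
        view_schema=env.get("VIEW_SCHEMA", "default"),
    )
```

Passing the environment as a mapping keeps the function testable: a plain dictionary can stand in for os.environ.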

Development

Running Tests

pytest tests/
pytest tests/ --cov=src --cov-report=html

Code Quality

# Linting
ruff check src/
ruff check src/ --fix

# Formatting
black src/ tests/

# Type checking
mypy src/

# Pre-commit hooks (run automatically on commit)
pre-commit run --all-files
