Skip to main content

Middleware implementation of EasyBDI 2.0 architecture for data virtualization and federation

Project description

SchemaFusion Engine

A middleware implementation of the EasyBDI 2.0 architecture, focusing on Data Virtualization and Federation. SchemaFusion provides a headless interface for orchestrating distributed queries across heterogeneous data sources using Trino as the query engine.

Architecture

SchemaFusion acts as a middleware layer that:

  • Federates queries across multiple data sources (PostgreSQL, MongoDB, CSV files, etc.)
  • Virtualizes data access through Trino's distributed query engine
  • Orchestrates query planning and execution via Python-based FastAPI middleware
  • Manages configuration through a Typer-based CLI

See Architecture Documentation for detailed architecture diagrams and component descriptions.

Components

Infrastructure (Docker)

The project uses Docker Compose to orchestrate:

  • Trino (port 8080): Distributed query engine
  • PostgreSQL (port 5432): Relational data source
  • MongoDB (port 27017): NoSQL data source
  • Kafka (port 9092): Streaming data source
  • CSV Files: CSV files imported into PostgreSQL (example files in docker/trino/csv-data/)
  • Google Sheets: Query Google Sheets as tables (requires OAuth2 credentials)
  • Redis (port 6379): Caching layer

Middleware (Python)

  • FastAPI Application (src/main.py): Headless REST API interface
  • Typer CLI (src/cli.py): Configuration management interface
  • Verification Script (scripts/check_fusion.py): Validates Trino connectivity and catalog visibility

Quick Start

Prerequisites

  • Docker and Docker Compose
  • Python 3.11+
  • pip

Setup

  1. Start the infrastructure:

    docker compose up -d
    
  2. Install Python dependencies:

    pip install -r requirements.txt
    
  3. Verify connectivity:

    python scripts/check_fusion.py
    

    This script connects to Trino and runs SHOW CATALOGS to verify that all configured connectors (PostgreSQL, MongoDB, Kafka, etc.) are visible.

  4. Start the FastAPI server:

    uvicorn src.main:app --reload
    

    The API will be available at http://localhost:8000

    • Interactive API docs: http://localhost:8000/docs
    • OpenAPI schema: http://localhost:8000/openapi.json
  5. Use the CLI:

    python -m src.cli check
    python -m src.cli version
    

Project Structure

schema-fusion-engine/
├── docker/
├── docker-compose.yml      # Infrastructure orchestration
│   └── trino/
│       ├── catalog/
│       │   ├── postgres.properties # Trino PostgreSQL connector config
│       │   ├── mongo.properties    # Trino MongoDB connector config
│       │   └── csv.properties      # Trino CSV connector config
│       └── csv-data/              # CSV files directory
├── src/
│   ├── main.py                 # FastAPI headless API
│   ├── cli.py                  # Typer CLI configuration manager
│   ├── api/                    # API modules
│   ├── cli/                    # CLI modules
│   └── core/                   # Core business logic
├── docs/                       # Documentation
│   └── architecture.md         # Architecture diagrams and design
├── scripts/
│   └── check_fusion.py         # Trino connectivity verification
├── tests/                      # Test suite
├── requirements.txt            # Python dependencies
└── README.md                   # This file

API Reference

Query Execution

Execute SQL queries across federated data sources:

curl -X POST "http://localhost:8000/fusion/query" \
  -H "Content-Type: application/json" \
  -d '{
    "query": "SELECT * FROM postgres.public.users LIMIT 10",
    "catalog": "postgres",
    "schema": "public",
    "max_rows": 100
  }'

Response:

{
  "query": "SELECT * FROM postgres.public.users LIMIT 10",
  "columns": ["id", "name", "email"],
  "rows": [[1, "Alice", "alice@example.com"], [2, "Bob", "bob@example.com"]],
  "row_count": 2,
  "execution_time_ms": 45.23,
  "error": null
}

Schema Discovery

Discover available catalogs, schemas, and tables:

# List catalogs
curl "http://localhost:8000/fusion/catalogs"

# List schemas
curl "http://localhost:8000/fusion/catalogs/postgres/schemas"

# List tables
curl "http://localhost:8000/fusion/catalogs/postgres/schemas/public/tables"

# Get table info
curl "http://localhost:8000/fusion/catalogs/postgres/schemas/public/tables/users"

Schema Matching

Automatically match columns between tables:

curl -X POST "http://localhost:8000/fusion/match" \
  -H "Content-Type: application/json" \
  -d '{
    "source_catalog": "postgres",
    "source_schema": "public",
    "source_table": "users",
    "target_catalog": "mongo",
    "target_schema": "testdb",
    "target_table": "customers",
    "threshold": 0.8
  }'

Response:

{
  "source": {
    "catalog": "postgres",
    "schema": "public",
    "table": "users",
    "columns": ["id", "name", "email"]
  },
  "target": {
    "catalog": "mongo",
    "schema": "testdb",
    "table": "customers",
    "columns": ["_id", "name", "email"]
  },
  "matches": [
    {"source_col": "name", "target_col": "name", "confidence": 1.0},
    {"source_col": "email", "target_col": "email", "confidence": 1.0}
  ],
  "match_count": 2,
  "threshold": 0.8
}

Fusion Views

Create federated views that combine data from multiple sources:

curl -X POST "http://localhost:8000/fusion/create-view" \
  -H "Content-Type: application/json" \
  -d '{
    "view_name": "global_customers",
    "source_a": {"catalog": "postgres", "schema": "public", "table": "users"},
    "source_b": {"catalog": "mongo", "schema": "testdb", "table": "customers"},
    "matches": [
      {"source_col": "name", "target_col": "name", "confidence": 1.0},
      {"source_col": "email", "target_col": "email", "confidence": 1.0}
    ],
    "join_key_a": "id",
    "join_key_b": "_id"
  }'

Response:

{
  "status": "success",
  "message": "View created successfully",
  "view_name": "global_customers",
  "sql": "CREATE OR REPLACE VIEW memory.default.global_customers AS ..."
}

List views:

curl "http://localhost:8000/fusion/views?catalog=memory&schema=default"

Delete view:

curl -X DELETE "http://localhost:8000/fusion/views/global_customers?catalog=memory&schema=default"

Multi-Source Fusion (3+ Sources)

Create fusion views that combine data from 3 or more sources:

curl -X POST "http://localhost:8000/fusion/create-multi-view" \
  -H "X-API-Key: your-api-key" \
  -H "Content-Type: application/json" \
  -d '{
    "view_name": "unified_customers",
    "sources": [
      {"catalog": "postgres", "schema": "public", "table": "users", "alias": "p"},
      {"catalog": "mongo", "schema": "testdb", "table": "customers", "alias": "m"},
      {"catalog": "postgres", "schema": "public", "table": "clients", "alias": "c"}
    ],
    "matches": [
      {
        "global": "customer_id",
        "mappings": [
          {"source": "p", "column": "id"},
          {"source": "m", "column": "_id"},
          {"source": "c", "column": "client_id"}
        ]
      },
      {
        "global": "name",
        "mappings": [
          {"source": "p", "column": "name"},
          {"source": "m", "column": "full_name"},
          {"source": "c", "column": "name"}
        ]
      }
    ],
    "fusion_type": "join",
    "join_keys": [
      {"source": "p", "column": "id"},
      {"source": "m", "column": "_id"},
      {"source": "c", "column": "client_id"}
    ]
  }'

Response:

{
  "status": "success",
  "message": "View created successfully",
  "view_name": "unified_customers",
  "sql": "CREATE OR REPLACE VIEW memory.default.unified_customers AS ..."
}

For UNION ALL (horizontal partitioning):

curl -X POST "http://localhost:8000/fusion/create-multi-view" \
  -H "X-API-Key: your-api-key" \
  -H "Content-Type: application/json" \
  -d '{
    "view_name": "all_customers",
    "sources": [
      {"catalog": "postgres", "schema": "public", "table": "customers", "alias": "p"},
      {"catalog": "mongo", "schema": "testdb", "table": "clients", "alias": "m"}
    ],
    "matches": [
      {
        "global": "id",
        "mappings": [
          {"source": "p", "column": "id"},
          {"source": "m", "column": "_id"}
        ]
      }
    ],
    "fusion_type": "union",
    "enable_type_coercion": true
  }'

See Multi-Source Fusion Guide for detailed documentation.

Health Check

curl "http://localhost:8000/health"

Configuration

Trino Connectors

Trino connectors are configured in docker/trino/catalog/:

  • PostgreSQL: Connects to postgres:5432 using JDBC
  • MongoDB: Connects to mongo:27017 using MongoDB native protocol
  • Kafka: Connects to kafka:29092 for streaming data (see Kafka Setup)
  • Google Sheets: Query Google Sheets as tables (requires OAuth2, see Google Sheets Setup)
  • CSV: Import CSV files into PostgreSQL (see CSV Support)

Environment Variables

Configuration is managed via environment variables or a .env file. See Configuration Guide for all available options.

Quick reference:

  • TRINO_HOST - Trino coordinator host (default: localhost)
  • TRINO_PORT - Trino coordinator port (default: 8080)
  • TRINO_USER - Trino user (default: schemafusion)
  • LOG_LEVEL - Logging level (default: INFO)
  • LOG_FORMAT - Log format: text or json (default: text)
  • VIEW_CATALOG - Catalog for fusion views (default: memory)
  • VIEW_SCHEMA - Schema for fusion views (default: default)

Development

Running Tests

pytest tests/
pytest tests/ --cov=src --cov-report=html

Code Quality

# Linting
ruff check src/
ruff check src/ --fix

# Formatting
black src/ tests/

# Type checking
mypy src/

# Pre-commit hooks (runs automatically on commit)
pre-commit run --all-files

Documentation

References

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

schema_fusion_engine-0.15.0.tar.gz (20.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

schema_fusion_engine-0.15.0-py3-none-any.whl (7.3 kB view details)

Uploaded Python 3

File details

Details for the file schema_fusion_engine-0.15.0.tar.gz.

File metadata

  • Download URL: schema_fusion_engine-0.15.0.tar.gz
  • Upload date:
  • Size: 20.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for schema_fusion_engine-0.15.0.tar.gz
Algorithm Hash digest
SHA256 f10bb6b4b0dff7694c5ae1ec2b8040626452ca8d97af20dceb6d09f1c6eb904a
MD5 11aaee808486f60a779344b3b52160bb
BLAKE2b-256 415788db2f244a95cbea59e02111f51a50c1e46425c96b50c6ceb4c1fa6d068b

See more details on using hashes here.

File details

Details for the file schema_fusion_engine-0.15.0-py3-none-any.whl.

File metadata

File hashes

Hashes for schema_fusion_engine-0.15.0-py3-none-any.whl
Algorithm Hash digest
SHA256 288e6973ef3cdee7746474b2e213777423ab703cdd43c9da416e4ba1f47fadbd
MD5 cd2727be594107046aa996b9de9a247f
BLAKE2b-256 430f209eb87c329bdce2cdfda873f514c5023ad44d1e3bba0749026219e55b2f

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page