Skip to main content

A framework for transforming tabular (CSV, SQL) and hierarchical data (JSON, XML) into property graphs and ingesting them into graph databases (ArangoDB, Neo4j, TigerGraph, FalkorDB, Memgraph, NebulaGraph). Features automatic PostgreSQL schema inference.

Project description

GraFlo graflo logo

A Graph Schema & Transformation Language (GSTL) for Labeled Property Graphs (LPG).

GraFlo provides a declarative, database-agnostic specification for mapping heterogeneous data sources — tabular (CSV, SQL), hierarchical (JSON, XML), and RDF/SPARQL — to a unified LPG representation and ingesting it into ArangoDB, Neo4j, TigerGraph, FalkorDB, Memgraph, or NebulaGraph.

Package Renamed: This package was formerly known as graphcast.

Python PyPI version PyPI Downloads License: BSL pre-commit DOI

Overview

GraFlo separates what the graph looks like from where data comes from and which database stores it.

%%{ init: { 
  "theme": "base",
  "themeVariables": {
    "primaryColor": "#90CAF9",
    "primaryTextColor": "#111111",
    "primaryBorderColor": "#1E88E5",
    "lineColor": "#546E7A",
    "secondaryColor": "#A5D6A7",
    "tertiaryColor": "#CE93D8"
  }
} }%%

flowchart LR
    SI["<b>Source Instance</b><br/>File · SQL · SPARQL · API"]
    R["<b>Resource</b><br/>Actor Pipeline"]
    GS["<b>Logical Graph Schema</b><br/>Vertex/Edge Definitions<br/>Identities · DB Profile"]
    DBA["<b>DB-aware Projection</b><br/>DatabaseProfile<br/>VertexConfigDBAware · EdgeConfigDBAware"]
    GC["<b>GraphContainer</b><br/>Covariant Graph Representation"]
    DB["<b>Graph DB (LPG)</b><br/>ArangoDB · Neo4j · TigerGraph · Others"]

    SI --> R --> GS --> GC --> DBA --> DB

Source InstanceResourceLogical Graph SchemaCovariant Graph RepresentationDB-aware ProjectionGraph DB

Stage Role Code
Source Instance A concrete data artifact — a CSV file, a PostgreSQL table, a SPARQL endpoint, a .ttl file. AbstractDataSource subclasses (FileDataSource, SQLDataSource, SparqlEndpointDataSource, …) with a DataSourceType.
Resource A reusable transformation pipeline — actor steps (descend, transform, vertex, edge, vertex_router, edge_router) that map raw records to graph elements. Data sources bind to Resources by name via the DataSourceRegistry. Resource (part of IngestionModel).
Graph Schema Declarative logical vertex/edge definitions, identities, typed fields, and DB profile — defined in YAML or Python. Schema, VertexConfig, EdgeConfig.
Covariant Graph Representation A database-independent collection of vertices and edges. GraphContainer.
DB-aware Projection Resolves DB-specific naming/default/index behavior from logical schema + DatabaseProfile. Schema.resolve_db_aware(), VertexConfigDBAware, EdgeConfigDBAware.
Graph DB The target LPG store — same API for all supported databases. ConnectionManager, DBWriter, DB connectors.

Supported source types (DataSourceType)

DataSourceType Connector DataSource Schema inference
FILE — CSV / JSON / JSONL / Parquet FileConnector FileDataSource manual
SQL — PostgreSQL tables TableConnector SQLDataSource automatic (3NF with PK/FK)
SPARQL — RDF files (.ttl, .rdf, .n3) SparqlConnector RdfFileDataSource automatic (OWL/RDFS ontology)
SPARQL — SPARQL endpoints (Fuseki, …) SparqlConnector SparqlEndpointDataSource automatic (OWL/RDFS ontology)
API — REST APIs APIDataSource manual
IN_MEMORY — list / DataFrame InMemoryDataSource manual

Supported targets

ArangoDB, Neo4j, TigerGraph, FalkorDB, Memgraph, NebulaGraph — same API for all.

Features

  • Declarative LPG schema — Define vertices, edges, vertex identity, secondary DB indexes, weights, and transforms in YAML or Python. The Schema is the single source of truth, independent of source or target.
  • Database abstraction — One logical schema, one API. Target ArangoDB, Neo4j, TigerGraph, FalkorDB, Memgraph, or NebulaGraph without rewriting pipelines. DB idiosyncrasies are handled in DB-aware projection (Schema.resolve_db_aware(...)) and connector/writer stages.
  • Resource abstraction — Each Resource defines a reusable actor pipeline (descend, transform, vertex, edge, plus VertexRouter and EdgeRouter for dynamic type-based routing) that maps raw records to graph elements. Data sources bind to Resources by name via the DataSourceRegistry, decoupling transformation logic from data retrieval.
  • SPARQL & RDF support — Query SPARQL endpoints (e.g. Apache Fuseki), read .ttl/.rdf/.n3 files, and auto-infer schemas from OWL/RDFS ontologies (rdflib and SPARQLWrapper ship with the default package).
  • Schema inference — Generate graph schemas from PostgreSQL 3NF databases (PK/FK heuristics) or from OWL/RDFS ontologies (owl:Class → vertices, owl:ObjectProperty → edges, owl:DatatypeProperty → vertex fields).
  • Typed fields — Vertex fields and edge weights carry types (INT, FLOAT, STRING, DATETIME, BOOL) for validation and database-specific optimisation.
  • Parallel batch processing — Configurable batch sizes and multi-core execution.
  • Credential-free source contractsBindings.connector_connection maps each TableConnector / SparqlConnector (by name, hash, or resource alias) to a conn_proxy label. Manifests stay free of secrets; a runtime ConnectionProvider resolves each proxy to concrete GeneralizedConnConfig (for example PostgreSQL or SPARQL endpoint settings).

Documentation

Full documentation is available at: growgraph.github.io/graflo

Installation

pip install graflo

Optional extras (see pyproject.toml[project.optional-dependencies]):

  • dev — pytest, ty, pre-commit
  • docs — MkDocs stack for building the documentation site
  • plotpygraphviz for the plot_manifest CLI (install system Graphviz first)
pip install "graflo[dev]"
pip install "graflo[dev,docs,plot]"

Usage Examples

Simple ingest

from suthing import FileHandle

from graflo import Bindings, GraphManifest
from graflo.db.connection.onto import ArangoConfig

manifest = GraphManifest.from_config(FileHandle.load("schema.yaml"))
manifest.finish_init()
schema = manifest.require_schema()
ingestion_model = manifest.require_ingestion_model()

# Option 1: Load config from docker/arango/.env (recommended)
conn_conf = ArangoConfig.from_docker_env()

# Option 2: Load from environment variables
# Set: ARANGO_URI, ARANGO_USERNAME, ARANGO_PASSWORD, ARANGO_DATABASE
conn_conf = ArangoConfig.from_env()

# Option 3: Load with custom prefix (for multiple configs)
# Set: USER_ARANGO_URI, USER_ARANGO_USERNAME, USER_ARANGO_PASSWORD, USER_ARANGO_DATABASE
user_conn_conf = ArangoConfig.from_env(prefix="USER")

# Option 4: Create config directly
# conn_conf = ArangoConfig(
#     uri="http://localhost:8535",
#     username="root",
#     password="123",
#     database="mygraph",  # For ArangoDB, 'database' maps to schema/graph
# )
# Note: If 'database' (or 'schema_name' for TigerGraph) is not set,
# Caster will automatically use Schema.metadata.name as fallback

from graflo.architecture.contract.bindings import FileConnector
import pathlib

# Create Bindings with file connectors
bindings = Bindings()
work_connector = FileConnector(regex="\Sjson$", sub_path=pathlib.Path("./data"))
bindings.add_connector(
    work_connector,
)
bindings.bind_resource("work", work_connector)

# Or initialize via connectors + resource_connector
# bindings = Bindings(
#     connectors=[
#         FileConnector(
#             name="work_files",
#             regex="^work\\.json$",
#             sub_path=pathlib.Path("./data"),
#         )
#     ],
#     resource_connector=[{"resource": "work", "connector": "work_files"}],
#     # Optional: for SQL/SPARQL connectors, name a proxy; register secrets via ConnectionProvider.
#     # connector_connection=[{"connector": "work_files", "conn_proxy": "files_readonly"}],
# )

from graflo.hq.caster import IngestionParams
from graflo.hq import GraphEngine

# Option 1: Use GraphEngine for schema definition and ingestion (recommended)
engine = GraphEngine()
ingestion_params = IngestionParams(
    clear_data=False,
    # max_items=1000,  # Optional: limit number of items to process
    # batch_size=10000,  # Optional: customize batch size
)

ingest_manifest = manifest.model_copy(update={"bindings": bindings})
ingest_manifest.finish_init()

engine.define_and_ingest(
    manifest=ingest_manifest,
    target_db_config=conn_conf,  # Target database config
    ingestion_params=ingestion_params,
    recreate_schema=False,  # Set to True to drop and redefine schema (script halts if schema exists)
)

# Option 2: Use Caster directly (schema must be defined separately)
# from graflo.hq import GraphEngine
# engine = GraphEngine()
# engine.define_schema(manifest=manifest, target_db_config=conn_conf, recreate_schema=False)
# 
# caster = Caster(schema=schema, ingestion_model=ingestion_model)
# caster.ingest(
#     target_db_config=conn_conf,
#     bindings=bindings,
#     ingestion_params=ingestion_params,
# )

PostgreSQL Schema Inference

from graflo.hq import GraphEngine
from graflo.db.connection.onto import PostgresConfig, ArangoConfig
from graflo import Caster
from graflo.onto import DBType

# Connect to PostgreSQL
postgres_config = PostgresConfig.from_docker_env()  # or PostgresConfig.from_env()

# Create GraphEngine and infer schema from PostgreSQL 3NF database
# Connection is automatically managed inside infer_schema()
engine = GraphEngine(target_db_flavor=DBType.ARANGO)
manifest = engine.infer_manifest(
    postgres_config,
    schema_name="public",  # PostgreSQL schema name
)
schema = manifest.require_schema()
ingestion_model = manifest.require_ingestion_model()

# Define schema in target database (optional, can also use define_and_ingest)
target_config = ArangoConfig.from_docker_env()
engine.define_schema(
    manifest=manifest,
    target_db_config=target_config,
    recreate_schema=False,
)

# Use the inferred schema with Caster for ingestion
caster = Caster(schema=schema, ingestion_model=ingestion_model)
# ... continue with ingestion

RDF / SPARQL Ingestion

from pathlib import Path
from graflo.hq import GraphEngine
from graflo.db.connection.onto import ArangoConfig
from graflo.architecture.manifest import GraphManifest

engine = GraphEngine()

# Infer schema from an OWL/RDFS ontology file
ontology = Path("ontology.ttl")
schema, ingestion_model = engine.infer_schema_from_rdf(source=ontology)

# Create source bindings (reads a local .ttl file per rdf:Class)
bindings = engine.create_bindings_from_rdf(source=ontology)

# Or point at a SPARQL endpoint instead:
# from graflo.db.connection.onto import SparqlEndpointConfig
# sparql_cfg = SparqlEndpointConfig(uri="http://localhost:3030", dataset="mydata")
# bindings = engine.create_bindings_from_rdf(
#     source=ontology,
#     endpoint_url=sparql_cfg.query_endpoint,
# )

target = ArangoConfig.from_docker_env()
engine.define_and_ingest(
    manifest=GraphManifest(
        graph_schema=schema,
        ingestion_model=ingestion_model,
        bindings=bindings,
    ),
    target_db_config=target,
)

Development

To install requirements

git clone git@github.com:growgraph/graflo.git && cd graflo
uv sync --extra dev

Tests

Test databases

Quick Start: To start all test databases at once, use the convenience scripts from the docker folder:

cd docker
./start-all.sh    # Start all services
./stop-all.sh      # Stop all services
./cleanup-all.sh   # Remove containers and volumes

Individual Services: To start individual databases, navigate to each database folder and run:

Spin up Arango from arango docker folder by

docker-compose --env-file .env up arango

Neo4j from neo4j docker folder by

docker-compose --env-file .env up neo4j

TigerGraph from tigergraph docker folder by

docker-compose --env-file .env up tigergraph

FalkorDB from falkordb docker folder by

docker-compose --env-file .env up falkordb

Memgraph from memgraph docker folder by

docker-compose --env-file .env up memgraph

NebulaGraph from nebula docker folder by

docker-compose --env-file .env up

and Apache Fuseki from fuseki docker folder by

docker-compose --env-file .env up fuseki

To run unit tests

uv run pytest test

Note: Tests require external database containers (ArangoDB, Neo4j, TigerGraph, FalkorDB, Memgraph, NebulaGraph, Fuseki) to be running. CI builds intentionally skip test execution. Tests must be run locally with the required database images started (see Test databases section above). NebulaGraph tests are gated behind pytest --run-nebula.

Requirements

  • Python 3.11+ (Python 3.11 and 3.12 are officially supported)
  • python-arango
  • nebula3-python>=3.8.3 (NebulaGraph v3.x support)
  • nebula5-python>=5.2.1 (NebulaGraph v5.x support)
  • sqlalchemy>=2.0.0 (for PostgreSQL and SQL data sources)
  • rdflib>=7.0.0 + SPARQLWrapper>=2.0.0 (included in the default install)

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

graflo-1.7.6.tar.gz (272.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

graflo-1.7.6-py3-none-any.whl (337.1 kB view details)

Uploaded Python 3

File details

Details for the file graflo-1.7.6.tar.gz.

File metadata

  • Download URL: graflo-1.7.6.tar.gz
  • Upload date:
  • Size: 272.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.28 {"installer":{"name":"uv","version":"0.9.28","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for graflo-1.7.6.tar.gz
Algorithm Hash digest
SHA256 cddba35312ac3f49633d9f53fadaa0c92101011bc389dcbf75e66672025e18c7
MD5 5c7ca5bd6ac43b1ded6a1b70ac9f9e0f
BLAKE2b-256 d88d561b67d1fc05bb8166ed636d2fabd89043fec5fc146c5a4bdc157ae734cd

See more details on using hashes here.

File details

Details for the file graflo-1.7.6-py3-none-any.whl.

File metadata

  • Download URL: graflo-1.7.6-py3-none-any.whl
  • Upload date:
  • Size: 337.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.28 {"installer":{"name":"uv","version":"0.9.28","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for graflo-1.7.6-py3-none-any.whl
Algorithm Hash digest
SHA256 6ea9d43819feebe0213ed7f3322b6e8489ceec01faacdba98b145c55186d8272
MD5 b0b023f5ea3985cc10d845ddc2651555
BLAKE2b-256 a7c6338210505b5ce9bd4e86210fb7b481366ac0018a6122cde632e3acc90f50

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page