Skip to main content

A framework for transforming tabular (CSV, SQL) and hierarchical data (JSON, XML) into property graphs and ingesting them into graph databases (ArangoDB, Neo4j, TigerGraph, FalkorDB, Memgraph, NebulaGraph). Features automatic PostgreSQL schema inference.

Project description

GraFlo graflo logo

A Graph Schema & Transformation Language (GSTL) for Labeled Property Graphs (LPG).

GraFlo provides a declarative, database-agnostic specification for mapping heterogeneous data sources — tabular (CSV, SQL), hierarchical (JSON, XML), and RDF/SPARQL — to a unified LPG representation and ingesting it into ArangoDB, Neo4j, TigerGraph, FalkorDB, Memgraph, or NebulaGraph.

Package Renamed: This package was formerly known as graphcast.

Python PyPI version PyPI Downloads License: BSL pre-commit DOI

Overview

GraFlo separates what the graph looks like from where data comes from and which database stores it.

%%{ init: { 
  "theme": "base",
  "themeVariables": {
    "primaryColor": "#90CAF9",
    "primaryTextColor": "#111111",
    "primaryBorderColor": "#1E88E5",
    "lineColor": "#546E7A",
    "secondaryColor": "#A5D6A7",
    "tertiaryColor": "#CE93D8"
  }
} }%%

flowchart LR
    SI["<b>Source Instance</b><br/>File · SQL · SPARQL · API"]
    R["<b>Resource</b><br/>Actor Pipeline"]
    GS["<b>Logical Graph Schema</b><br/>Vertex/Edge Definitions<br/>Identities · DB Profile"]
    DBA["<b>DB-aware Projection</b><br/>DatabaseProfile<br/>VertexConfigDBAware · EdgeConfigDBAware"]
    GC["<b>GraphContainer</b><br/>Covariant Graph Representation"]
    DB["<b>Graph DB (LPG)</b><br/>ArangoDB · Neo4j · TigerGraph · Others"]

    SI --> R --> GS --> GC --> DBA --> DB

Source InstanceResourceLogical Graph SchemaCovariant Graph RepresentationDB-aware ProjectionGraph DB

Stage Role Code
Source Instance A concrete data artifact — a CSV file, a PostgreSQL table, a SPARQL endpoint, a .ttl file. AbstractDataSource subclasses (FileDataSource, SQLDataSource, SparqlEndpointDataSource, …) with a DataSourceType.
Resource A reusable transformation pipeline — actor steps (descend, transform, vertex, edge, vertex_router, edge_router) that map raw records to graph elements. Data sources bind to Resources by name via the DataSourceRegistry. Resource (part of IngestionModel).
Graph Schema Declarative logical vertex/edge definitions, identities, typed fields, and DB profile — defined in YAML or Python. Schema, VertexConfig, EdgeConfig.
Covariant Graph Representation A database-independent collection of vertices and edges. GraphContainer.
DB-aware Projection Resolves DB-specific naming/default/index behavior from logical schema + DatabaseProfile. Schema.resolve_db_aware(), VertexConfigDBAware, EdgeConfigDBAware.
Graph DB The target LPG store — same API for all supported databases. ConnectionManager, DBWriter, DB connectors.

Supported source types (DataSourceType)

DataSourceType Connector DataSource Schema inference
FILE — CSV / JSON / JSONL / Parquet FileConnector FileDataSource manual
SQL — PostgreSQL tables TableConnector SQLDataSource automatic (3NF with PK/FK)
SPARQL — RDF files (.ttl, .rdf, .n3) SparqlConnector RdfFileDataSource automatic (OWL/RDFS ontology)
SPARQL — SPARQL endpoints (Fuseki, …) SparqlConnector SparqlEndpointDataSource automatic (OWL/RDFS ontology)
API — REST APIs APIDataSource manual
IN_MEMORY — list / DataFrame InMemoryDataSource manual

Supported targets

ArangoDB, Neo4j, TigerGraph, FalkorDB, Memgraph, NebulaGraph — same API for all.

Features

  • Declarative LPG schema — Define vertices, edges, vertex identity, secondary DB indexes, weights, and transforms in YAML or Python. The Schema is the single source of truth, independent of source or target.
  • Database abstraction — One logical schema, one API. Target ArangoDB, Neo4j, TigerGraph, FalkorDB, Memgraph, or NebulaGraph without rewriting pipelines. DB idiosyncrasies are handled in DB-aware projection (Schema.resolve_db_aware(...)) and connector/writer stages.
  • Resource abstraction — Each Resource defines a reusable actor pipeline (descend, transform, vertex, edge, plus VertexRouter and EdgeRouter for dynamic type-based routing) that maps raw records to graph elements. Data sources bind to Resources by name via the DataSourceRegistry, decoupling transformation logic from data retrieval.
  • SPARQL & RDF support — Query SPARQL endpoints (e.g. Apache Fuseki), read .ttl/.rdf/.n3 files, and auto-infer schemas from OWL/RDFS ontologies (rdflib and SPARQLWrapper ship with the default package).
  • Schema inference — Generate graph schemas from PostgreSQL 3NF databases (PK/FK heuristics) or from OWL/RDFS ontologies (owl:Class → vertices, owl:ObjectProperty → edges, owl:DatatypeProperty → vertex fields).
  • Typed fields — Vertex fields and edge weights carry types (INT, FLOAT, STRING, DATETIME, BOOL) for validation and database-specific optimisation.
  • Parallel batch processing — Configurable batch sizes and multi-core execution.

Documentation

Full documentation is available at: growgraph.github.io/graflo

Installation

pip install graflo

Optional extras (see pyproject.toml[project.optional-dependencies]):

  • dev — pytest, ty, pre-commit
  • docs — MkDocs stack for building the documentation site
  • plotpygraphviz for the plot_manifest CLI (install system Graphviz first)
pip install "graflo[dev]"
pip install "graflo[dev,docs,plot]"

Usage Examples

Simple ingest

from suthing import FileHandle

from graflo import Bindings, GraphManifest
from graflo.db.connection.onto import ArangoConfig

manifest = GraphManifest.from_config(FileHandle.load("schema.yaml"))
manifest.finish_init()
schema = manifest.require_schema()
ingestion_model = manifest.require_ingestion_model()

# Option 1: Load config from docker/arango/.env (recommended)
conn_conf = ArangoConfig.from_docker_env()

# Option 2: Load from environment variables
# Set: ARANGO_URI, ARANGO_USERNAME, ARANGO_PASSWORD, ARANGO_DATABASE
conn_conf = ArangoConfig.from_env()

# Option 3: Load with custom prefix (for multiple configs)
# Set: USER_ARANGO_URI, USER_ARANGO_USERNAME, USER_ARANGO_PASSWORD, USER_ARANGO_DATABASE
user_conn_conf = ArangoConfig.from_env(prefix="USER")

# Option 4: Create config directly
# conn_conf = ArangoConfig(
#     uri="http://localhost:8535",
#     username="root",
#     password="123",
#     database="mygraph",  # For ArangoDB, 'database' maps to schema/graph
# )
# Note: If 'database' (or 'schema_name' for TigerGraph) is not set,
# Caster will automatically use Schema.metadata.name as fallback

from graflo.architecture.bindings import FileConnector
import pathlib

# Create Bindings with file connectors
bindings = Bindings()
bindings.add_file_connector(
    "work",
    FileConnector(regex="\Sjson$", sub_path=pathlib.Path("./data"), resource_name="work")
)

# Or use resource_mapping for simpler initialization
# bindings = Bindings(
#     _resource_mapping={
#         "work": "./data/work.json",
#     }
# )

from graflo.hq.caster import IngestionParams
from graflo.hq import GraphEngine

# Option 1: Use GraphEngine for schema definition and ingestion (recommended)
engine = GraphEngine()
ingestion_params = IngestionParams(
    clear_data=False,
    # max_items=1000,  # Optional: limit number of items to process
    # batch_size=10000,  # Optional: customize batch size
)

ingest_manifest = manifest.model_copy(update={"bindings": bindings})
ingest_manifest.finish_init()

engine.define_and_ingest(
    manifest=ingest_manifest,
    target_db_config=conn_conf,  # Target database config
    ingestion_params=ingestion_params,
    recreate_schema=False,  # Set to True to drop and redefine schema (script halts if schema exists)
)

# Option 2: Use Caster directly (schema must be defined separately)
# from graflo.hq import GraphEngine
# engine = GraphEngine()
# engine.define_schema(manifest=manifest, target_db_config=conn_conf, recreate_schema=False)
# 
# caster = Caster(schema=schema, ingestion_model=ingestion_model)
# caster.ingest(
#     target_db_config=conn_conf,
#     bindings=bindings,
#     ingestion_params=ingestion_params,
# )

PostgreSQL Schema Inference

from graflo.hq import GraphEngine
from graflo.db.connection.onto import PostgresConfig, ArangoConfig
from graflo import Caster
from graflo.onto import DBType

# Connect to PostgreSQL
postgres_config = PostgresConfig.from_docker_env()  # or PostgresConfig.from_env()

# Create GraphEngine and infer schema from PostgreSQL 3NF database
# Connection is automatically managed inside infer_schema()
engine = GraphEngine(target_db_flavor=DBType.ARANGO)
manifest = engine.infer_manifest(
    postgres_config,
    schema_name="public",  # PostgreSQL schema name
)
schema = manifest.require_schema()
ingestion_model = manifest.require_ingestion_model()

# Define schema in target database (optional, can also use define_and_ingest)
target_config = ArangoConfig.from_docker_env()
engine.define_schema(
    manifest=manifest,
    target_db_config=target_config,
    recreate_schema=False,
)

# Use the inferred schema with Caster for ingestion
caster = Caster(schema=schema, ingestion_model=ingestion_model)
# ... continue with ingestion

RDF / SPARQL Ingestion

from pathlib import Path
from graflo.hq import GraphEngine
from graflo.db.connection.onto import ArangoConfig
from graflo.architecture.manifest import GraphManifest

engine = GraphEngine()

# Infer schema from an OWL/RDFS ontology file
ontology = Path("ontology.ttl")
schema, ingestion_model = engine.infer_schema_from_rdf(source=ontology)

# Create source bindings (reads a local .ttl file per rdf:Class)
bindings = engine.create_bindings_from_rdf(source=ontology)

# Or point at a SPARQL endpoint instead:
# from graflo.db.connection.onto import SparqlEndpointConfig
# sparql_cfg = SparqlEndpointConfig(uri="http://localhost:3030", dataset="mydata")
# bindings = engine.create_bindings_from_rdf(
#     source=ontology,
#     endpoint_url=sparql_cfg.query_endpoint,
# )

target = ArangoConfig.from_docker_env()
engine.define_and_ingest(
    manifest=GraphManifest(
        graph_schema=schema,
        ingestion_model=ingestion_model,
        bindings=bindings,
    ),
    target_db_config=target,
)

Development

To install requirements

git clone git@github.com:growgraph/graflo.git && cd graflo
uv sync --extra dev

Tests

Test databases

Quick Start: To start all test databases at once, use the convenience scripts from the docker folder:

cd docker
./start-all.sh    # Start all services
./stop-all.sh      # Stop all services
./cleanup-all.sh   # Remove containers and volumes

Individual Services: To start individual databases, navigate to each database folder and run:

Spin up Arango from arango docker folder by

docker-compose --env-file .env up arango

Neo4j from neo4j docker folder by

docker-compose --env-file .env up neo4j

TigerGraph from tigergraph docker folder by

docker-compose --env-file .env up tigergraph

FalkorDB from falkordb docker folder by

docker-compose --env-file .env up falkordb

Memgraph from memgraph docker folder by

docker-compose --env-file .env up memgraph

NebulaGraph from nebula docker folder by

docker-compose --env-file .env up

and Apache Fuseki from fuseki docker folder by

docker-compose --env-file .env up fuseki

To run unit tests

uv run pytest test

Note: Tests require external database containers (ArangoDB, Neo4j, TigerGraph, FalkorDB, Memgraph, NebulaGraph, Fuseki) to be running. CI builds intentionally skip test execution. Tests must be run locally with the required database images started (see Test databases section above). NebulaGraph tests are gated behind pytest --run-nebula.

Requirements

  • Python 3.11+ (Python 3.11 and 3.12 are officially supported)
  • python-arango
  • nebula3-python>=3.8.3 (NebulaGraph v3.x support)
  • nebula5-python>=5.2.1 (NebulaGraph v5.x support)
  • sqlalchemy>=2.0.0 (for PostgreSQL and SQL data sources)
  • rdflib>=7.0.0 + SPARQLWrapper>=2.0.0 (included in the default install)

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

graflo-1.7.5.tar.gz (269.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

graflo-1.7.5-py3-none-any.whl (333.1 kB view details)

Uploaded Python 3

File details

Details for the file graflo-1.7.5.tar.gz.

File metadata

  • Download URL: graflo-1.7.5.tar.gz
  • Upload date:
  • Size: 269.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.28 {"installer":{"name":"uv","version":"0.9.28","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for graflo-1.7.5.tar.gz
Algorithm Hash digest
SHA256 0604de5f44c565b9429a221ece0b85a4846321abe8418ebde7e926ecea585672
MD5 2a8bf083e1e6d593f643b4c588485e92
BLAKE2b-256 2e4ad78b19d76d92758738a2051128200facbc8a3e8288ab2fd905475d0ac28e

See more details on using hashes here.

File details

Details for the file graflo-1.7.5-py3-none-any.whl.

File metadata

  • Download URL: graflo-1.7.5-py3-none-any.whl
  • Upload date:
  • Size: 333.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.28 {"installer":{"name":"uv","version":"0.9.28","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for graflo-1.7.5-py3-none-any.whl
Algorithm Hash digest
SHA256 8ee327cc678e77fa126fcbd7794ea87f49cc05f06f88389fe69ea5b6459af8e0
MD5 9824e1c40e2c388418c13e04c0b22f68
BLAKE2b-256 2a7d148c9c9275443bc8b38321dabf75a7acda762379bf16a3a95d8618dd0864

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page