Skip to main content

Versioned schema & data migrations for neomodel Neo4j graphs

Project description

Crochet

Versioned schema & data migrations for neomodel Neo4j graphs.

Crochet is a Git-backed, migration-driven framework that makes neomodel-defined Neo4j graphs evolvable, auditable, and rollback-safe without relying on database introspection.

Problem It Solves

  • neomodel has no native schema diff or migration system
  • Neo4j is schemaless, so schema drift is silent
  • Data loading and schema evolution are often intertwined but unmanaged
  • Rollbacks are usually impossible or unsafe
  • Git history and database state frequently diverge

Crochet enforces alignment between neomodel code, data ingests, and the live graph.

Installation

pip install crochet-migration

With optional data loading support (file parsing, validation):

pip install "crochet-migration[data]"

With remote fetching from S3 or GCS:

pip install "crochet-migration[s3]"
pip install "crochet-migration[gcs]"

Install everything:

pip install "crochet-migration[all]"

For development:

pip install -e ".[dev]"

Quick Start

1. Initialize a project

crochet new-project --name my-graph

This creates:

my-graph/
  crochet.toml          # project config
  models/               # neomodel definitions
  migrations/           # migration files
  .crochet/ledger.db    # SQLite ledger

2. Create node and relationship models

crochet create-node Person
crochet create-relationship Friendship --rel-type FRIENDS_WITH

Each model gets an immutable __kgid__ identifier. Models can be renamed or moved across files without losing identity, because the __kgid__ is what Crochet tracks — not class names or file paths.

# models/person.py
from neomodel import StructuredNode, StringProperty, IntegerProperty

class Person(StructuredNode):
    __kgid__ = "person_v1"
    name = StringProperty(required=True, unique_index=True)
    age = IntegerProperty(index=True)

3. Create a migration

crochet create-migration "add person node"

Crochet snapshots the current schema IR, diffs it against the previous snapshot, and scaffolds a migration file with detected changes as comments:

# migrations/0001_add_person_node.py

revision_id = "0001_add_person_node"
parent_id = None
schema_hash = "a1b2c3..."
rollback_safe = True

def upgrade(ctx):
    ctx.add_unique_constraint("Person", "name")
    ctx.add_index("Person", "age")

def downgrade(ctx):
    ctx.drop_index("Person", "age")
    ctx.drop_unique_constraint("Person", "name")

4. Apply migrations

crochet upgrade              # apply all pending
crochet upgrade --dry-run    # preview without executing
crochet upgrade --target 0001_add_person_node  # apply up to a specific revision

5. Revert migrations

crochet downgrade            # revert the most recent migration
crochet downgrade --target 0001_add_person_node  # revert down to a target

Rollback-unsafe migrations will refuse to downgrade and raise an error.

6. Check status and verify

crochet status     # show applied/pending migrations, head, batches
crochet verify     # check ledger chain, file presence, schema hash consistency

Core Concepts

Intermediate Representation (IR)

neomodel files are parsed into an intermediate schema representation. IR snapshots can be hashed, serialized, and diffed. No Neo4j connection is required for schema comparison.

Hash-Chained Migrations

Migrations are ordered by a parent chain (Alembic-style). Each migration records the schema hash at the time it was created, so drift between code and migrations is detectable.

SQLite Ledger

A local SQLite database (.crochet/ledger.db) is the authoritative record of:

  • Applied migrations and their order
  • Dataset batches with file checksums and loader versions
  • Schema snapshots for diffing

Deterministic Data Ingest

Data loading is a first-class migration operation. The MigrationContext provides helpers for batch-tracked ingests:

def upgrade(ctx):
    batch_id = ctx.begin_batch()
    ctx.create_nodes("Person", [
        {"name": "Alice", "age": 30},
        {"name": "Bob", "age": 25},
    ])

Every node and relationship created through a batch is tagged with _crochet_batch, enabling delete-by-batch rollback.

Rollback Semantics

Rollbacks are explicitly declared, not assumed:

  • Append-only ingests support delete_nodes_by_batch / delete_relationships_by_batch
  • Destructive transforms must set rollback_safe = False
  • Unsafe downgrades are prevented by policy

Migration Context Operations

The MigrationContext passed to upgrade() and downgrade() provides:

Schema Operations

Operation Description
add_unique_constraint(label, prop) Create a uniqueness constraint
drop_unique_constraint(label, prop) Drop a uniqueness constraint
add_node_property_existence_constraint(label, prop) Create a NOT NULL constraint
drop_node_property_existence_constraint(label, prop) Drop a NOT NULL constraint
add_index(label, prop) Create an index
drop_index(label, prop) Drop an index
rename_label(old, new) Rename a node label
rename_relationship_type(old, new) Rename a relationship type
add_node_property(label, prop, default) Add a property with optional default
remove_node_property(label, prop) Remove a property
rename_node_property(label, old, new) Rename a property
run_cypher(cypher, params) Execute raw Cypher

Data Operations

Operation Description
begin_batch(batch_id) Start a tracked data batch
create_nodes(label, data) Batch-create nodes
create_relationships(src, tgt, type, data) Batch-create relationships
upsert_nodes(label, data, merge_keys) Create or update nodes using MERGE on specified keys
upsert_relationships(src, tgt, type, data) Create or update relationships using MERGE
delete_nodes_by_batch(label, batch_id) Delete nodes by batch
delete_relationships_by_batch(type, batch_id) Delete relationships by batch

Bulk Operations

For large datasets, chunked variants process data in configurable batches (default 5,000 rows):

Operation Description
bulk_create_nodes(label, data, chunk_size) Create nodes in chunked batches
bulk_upsert_nodes(label, data, merge_keys, chunk_size) Upsert nodes in chunked batches
bulk_create_relationships(src, tgt, type, data, chunk_size) Create relationships in chunked batches

Bulk operations support two batching strategies:

  • Client-side chunking (default): Sends multiple smaller transactions
  • Server-side batching: Uses CALL {} IN TRANSACTIONS OF N ROWS (Neo4j 4.4+) when use_call_in_transactions=True
def upgrade(ctx):
    batch_id = ctx.begin_batch()

    # Upsert nodes — existing nodes matched by merge_keys are updated
    ctx.upsert_nodes("Person", [
        {"name": "Alice", "age": 31},
        {"name": "Bob", "age": 26},
    ], merge_keys=["name"])

    # Bulk create for large datasets
    ctx.bulk_create_nodes("Event", large_event_list, chunk_size=10_000)

Configuration

crochet.toml:

[project]
name = "my-graph"
models_path = "models"
migrations_path = "migrations"

[neo4j]
uri = "bolt://localhost:7687"
username = "neo4j"

[ledger]
path = ".crochet/ledger.db"

Neo4j credentials can be overridden with environment variables:

  • CROCHET_NEO4J_URI
  • CROCHET_NEO4J_USERNAME
  • CROCHET_NEO4J_PASSWORD

Data Ingest

Crochet includes a data ingest pipeline for parsing, validating, and loading external data files into migrations. Install the data extra for file parsing support.

File Parsing

Parse CSV, TSV, JSON, JSON Lines, and Parquet files with automatic format and compression detection:

from crochet.ingest.parsers import parse_file, iter_batches

# Parse an entire file into records
result = parse_file("people.csv")
print(result.row_count, result.column_names)

# Memory-efficient batch iteration for large files
for batch in iter_batches("large_dataset.csv.gz", batch_size=10_000):
    ctx.bulk_create_nodes("Person", batch)

Supported formats: CSV, TSV, JSON, JSONL, Parquet

Transparent compression: gzip, bzip2, zstd, lz4, xz, snappy (auto-detected from file extension)

Data Validation

Validate records against declarative schemas before loading:

from crochet.ingest.validate import DataSchema, validate

schema = (
    DataSchema(strict=True, min_rows=1, unique_columns=["email"])
    .column("name", required=True, dtype="str", min_length=1)
    .column("email", required=True, pattern=r".+@.+\..+")
    .column("age", dtype="int", min_value=0, max_value=150)
)

result = validate(records, schema)
if not result.is_valid:
    print(result.summary())
    result.raise_on_errors()

Column rules support: required fields, type checking, numeric ranges, string length constraints, regex patterns, allowed value sets, and custom predicates.

Remote File Fetching

Fetch data files from HTTP, S3, or GCS with checksum verification and local caching:

from crochet.ingest.remote import RemoteSource, fetch_remote

source = RemoteSource(
    uri="https://example.com/data.csv.gz",
    expected_checksum="abc123...",
)
result = fetch_remote(source)
print(result.local_path, result.checksum)

Features:

  • Protocol registry: Built-in HTTP/HTTPS, S3 (s3://), and GCS (gs://) fetchers with support for custom protocols
  • SHA-256 verification: Checksum mismatch raises an error
  • Content-addressable cache: Files stored by checksum in .crochet/cache/
  • Atomic downloads: Write to temp file, then rename to prevent partial files

CLI Reference

Core Commands

Command Description
crochet new-project Initialize a new Crochet project
crochet create-node NAME Scaffold a StructuredNode model
crochet create-relationship NAME Scaffold a StructuredRel model
crochet create-migration DESC Create a new migration file
crochet upgrade Apply pending migrations
crochet downgrade Revert the most recent migration
crochet status Show migration status and data batches
crochet verify Run verification checks

Data Commands

Command Description
crochet load-data PATH Parse and preview a data file (CSV/TSV/JSON/Parquet)
crochet validate-data PATH Validate a data file against column rules
crochet fetch-data URI Fetch a remote file with checksum verification
crochet cache-clear Remove all cached remote data files
crochet cache-verify Verify integrity of all cached files

Design Principles

  • No hidden magic — all changes are explicit migration files
  • Code > database state — neomodel files are the source of truth
  • Determinism over convenience — schema IR is hashed and diffed
  • Rollback is a contract, not a guess — explicitly declared per migration
  • Git history and graph state must agree — ledger + hash chains enforce this

Development

pip install -e ".[dev]"
pytest

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

crochet_migration-0.1.3.tar.gz (54.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

crochet_migration-0.1.3-py3-none-any.whl (47.6 kB view details)

Uploaded Python 3

File details

Details for the file crochet_migration-0.1.3.tar.gz.

File metadata

  • Download URL: crochet_migration-0.1.3.tar.gz
  • Upload date:
  • Size: 54.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for crochet_migration-0.1.3.tar.gz
Algorithm Hash digest
SHA256 2fefc76a7d03290a6388e12a35848673620a388c93f2f403b28951b231b88a40
MD5 deac1e1d677cdc4f36a84651dbcdf02a
BLAKE2b-256 626d9842926563d4926f8ddc48a9368d527a304f9da668de70d0fa398d5e3d4a

See more details on using hashes here.

Provenance

The following attestation bundles were made for crochet_migration-0.1.3.tar.gz:

Publisher: publish.yml on keshavd/crochet

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file crochet_migration-0.1.3-py3-none-any.whl.

File metadata

File hashes

Hashes for crochet_migration-0.1.3-py3-none-any.whl
Algorithm Hash digest
SHA256 e3e62bab33c879cec263f8958f68838f064591e2e60f70a560471f2d976b0877
MD5 80e67c9a9a5e3104107469844f627100
BLAKE2b-256 74e42c171dcb7cb075fe622280696c4f7e6fc57d87d177448063464416068fc9

See more details on using hashes here.

Provenance

The following attestation bundles were made for crochet_migration-0.1.3-py3-none-any.whl:

Publisher: publish.yml on keshavd/crochet

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page