
Licensed Kafka to Neo4j Graph Ingestion Engine for turn-key graph building


KafkaGraph

Licensed Kafka to Neo4j ingestion SDK.

Overview

KafkaGraph is a production-grade SDK that consumes events from Kafka and materializes them as a Neo4j graph. It includes enterprise features such as signed license enforcement, machine binding, throughput and partition limits, feature gating, and pluggable mapping modes, and it is designed to be embedded in customer systems and sold as a commercial SDK.

Problem It Solves

  • Converts high-volume Kafka event streams into navigable Neo4j graph structures without bespoke pipelines.
  • Provides a consistent ingestion engine with batching, error safety, and transactional commits.
  • Enforces enterprise-grade licensing, rate limits, and partition caps to align with commercial agreements.
  • Offers easily extended mapping modes that transform JSON events into nodes and relationships.

Features

  • License enforcement with signed license files and machine fingerprint binding.
  • API key mode for testing with default deterministic keys.
  • Feature gating for the simple, autograph, and sequence mappers.
  • Batching for efficient writes and offset commits.
  • Partition monitoring to enforce per-topic and total caps.
  • Extensible dispatcher for new mapping modes and features.
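The per-topic and total partition caps could be checked against a consumer's partition assignment roughly as follows. This is a minimal sketch, not KafkaGraph's implementation: check_partition_caps, PartitionLimitError, and the input shape (a list of (topic, partition) pairs) are hypothetical names chosen for illustration.

```python
from collections import Counter
from typing import Iterable, Mapping, Tuple


class PartitionLimitError(RuntimeError):
    """Raised when an assignment exceeds a licensed partition cap (hypothetical)."""


def check_partition_caps(
    assigned: Iterable[Tuple[str, int]],
    per_topic_cap: Mapping[str, int],
    total_cap: int,
) -> None:
    """Validate (topic, partition) pairs against per-topic and total caps.

    Topics missing from per_topic_cap are treated as uncapped.
    """
    # Deduplicate so a partition reported twice is only counted once.
    unique = set(assigned)
    if len(unique) > total_cap:
        raise PartitionLimitError(
            f"{len(unique)} partitions assigned; license allows {total_cap}"
        )
    # Count distinct partitions per topic and compare with that topic's cap.
    per_topic = Counter(topic for topic, _ in unique)
    for topic, count in per_topic.items():
        cap = per_topic_cap.get(topic)
        if cap is not None and count > cap:
            raise PartitionLimitError(
                f"topic {topic!r} has {count} partitions assigned; cap is {cap}"
            )
```

Treating unlisted topics as uncapped is a design choice made for the sketch; a stricter policy would reject any topic the license does not name.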

Install

pip install kafkagraph

or, from a source checkout:

pip install .

Usage

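from kafkagraph import KafkaGraph

kg = KafkaGraph(
    license_file="license.json",       # signed license (see License Mode)
    kafka_config={"brokers": ["localhost:9092"], "group_id": "kafkagraph"},
    neo4j_config={"uri": "bolt://localhost:7687", "user": "neo4j", "password": "pass"},
    topics_config_path="topics.yaml",  # per-topic mapping rules (see Topics Configuration)
    batch_size=500                     # events buffered before each Neo4j write
)

kg.start()  # begin consuming the configured topics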

Topics Configuration

Provide a topics.yaml whose top-level keys are topic names, each declaring how that topic's events are mapped:

orders:
  mode: simple
  nodes:
    order:
      label: Order
      id: orderId
    customer:
      label: Customer
      id: customerId
  relationships:
    - type: PLACED_BY
      from: order
      to: customer
      properties: [createdAt, source]

profiles:
  mode: sequence
  base:
    label: User
    id: userId
  sequences:
    - field: devices
      label: Device
      id_field: deviceId
      type: OWNS
      properties: [model, os]
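To make the two mapping modes above concrete, here is a minimal sketch of how an event could be turned into nodes and relationships, with a small dispatcher keyed on the mode value. It is not KafkaGraph's mapper code: Node, Rel, map_simple, map_sequence, MAPPERS, and map_event are hypothetical names, the sequence relationship is assumed to point from the base node to each child (User OWNS Device), and the autograph mode is left out because this README does not describe it.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Tuple


@dataclass(frozen=True)
class Node:
    label: str
    id: Any


@dataclass
class Rel:
    type: str
    start: Node
    end: Node
    props: Dict[str, Any] = field(default_factory=dict)


Graph = Tuple[List[Node], List[Rel]]


def _pick(source: dict, names: List[str]) -> Dict[str, Any]:
    # Copy only the listed properties that are actually present.
    return {n: source[n] for n in names if n in source}


def map_simple(cfg: dict, event: dict) -> Graph:
    # One node per alias under `nodes`, identified by the event field named in `id`.
    nodes = {alias: Node(spec["label"], event[spec["id"]])
             for alias, spec in cfg["nodes"].items()}
    rels = [Rel(r["type"], nodes[r["from"]], nodes[r["to"]],
                _pick(event, r.get("properties", [])))
            for r in cfg.get("relationships", [])]
    return list(nodes.values()), rels


def map_sequence(cfg: dict, event: dict) -> Graph:
    base = Node(cfg["base"]["label"], event[cfg["base"]["id"]])
    nodes, rels = [base], []
    for seq in cfg.get("sequences", []):
        # Each element of the array field becomes a node linked from the base node.
        for item in event.get(seq["field"], []):
            child = Node(seq["label"], item[seq["id_field"]])
            nodes.append(child)
            rels.append(Rel(seq["type"], base, child,
                            _pick(item, seq.get("properties", []))))
    return nodes, rels


# Dispatcher: the `mode` value in topics.yaml selects the mapper function.
MAPPERS: Dict[str, Callable[[dict, dict], Graph]] = {
    "simple": map_simple,
    "sequence": map_sequence,
}


def map_event(topic_cfg: dict, event: dict) -> Graph:
    mapper = MAPPERS.get(topic_cfg.get("mode"))
    if mapper is None:
        raise ValueError(f"unsupported mode: {topic_cfg.get('mode')!r}")
    return mapper(topic_cfg, event)


# The topics.yaml above, as the dict yaml.safe_load would produce.
TOPICS = {
    "orders": {
        "mode": "simple",
        "nodes": {"order": {"label": "Order", "id": "orderId"},
                  "customer": {"label": "Customer", "id": "customerId"}},
        "relationships": [{"type": "PLACED_BY", "from": "order", "to": "customer",
                           "properties": ["createdAt", "source"]}],
    },
    "profiles": {
        "mode": "sequence",
        "base": {"label": "User", "id": "userId"},
        "sequences": [{"field": "devices", "label": "Device", "id_field": "deviceId",
                       "type": "OWNS", "properties": ["model", "os"]}],
    },
}
```

For example, map_event(TOPICS["orders"], event) for an order event yields an Order node, a Customer node, and one PLACED_BY relationship carrying only createdAt and source. Adding a new mode in this sketch means writing one more mapper function and registering it in MAPPERS.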

License Mode (Enterprise)

Provide a signed license JSON file and set PUBLIC_KEY_B64 in kafkagraph/license/signed.py to the base64-encoded public key used to verify it. The engine validates the license signature, machine binding, and expiry, and enforces the licensed limits.

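from kafkagraph import KafkaGraph

kg = KafkaGraph(
    license_file="license.json",
    kafka_config={...},   # same Kafka settings as in Usage
    neo4j_config={...},   # same Neo4j settings as in Usage
    topics_config_path="topics.yaml"
)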
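The three license checks named above (signature, machine binding, expiry) can be sketched with the standard library as follows. Everything here is an assumption rather than KafkaGraph's format: the license is taken to be {"payload": {...}, "signature": "<base64>"} with machine_id, expires (ISO date), and limits fields; the fingerprint is a hypothetical hash of hostname and hardware node id; and signature verification is passed in as a verify callable because the README does not name the signature algorithm.

```python
import base64
import hashlib
import json
import platform
import uuid
from datetime import date
from typing import Callable, Optional


class LicenseError(Exception):
    """Raised when a license fails validation (hypothetical)."""


def machine_fingerprint() -> str:
    # Hypothetical binding scheme: SHA-256 of hostname + hardware node id.
    # uuid.getnode() can fall back to a random value when no MAC address is
    # found, which would make this fingerprint unstable on such hosts.
    raw = f"{platform.node()}|{uuid.getnode():012x}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


def canonical_bytes(payload: dict) -> bytes:
    # Serialize deterministically so the signer and verifier see identical bytes.
    return json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")


def validate_license(
    lic: dict,
    verify: Callable[[bytes, bytes], bool],
    fingerprint: str,
    today: Optional[date] = None,
) -> dict:
    """Check signature, machine binding and expiry; return the licensed limits."""
    payload = lic["payload"]
    signature = base64.b64decode(lic["signature"])
    if not verify(canonical_bytes(payload), signature):
        raise LicenseError("signature verification failed")
    if payload.get("machine_id") != fingerprint:
        raise LicenseError("license is bound to a different machine")
    expires = date.fromisoformat(payload["expires"])
    if (today or date.today()) > expires:
        raise LicenseError(f"license expired on {expires.isoformat()}")
    return dict(payload.get("limits", {}))
```

If the licenses were signed with Ed25519 (again an assumption), verify could wrap the cryptography package: Ed25519PublicKey.from_public_bytes(base64.b64decode(PUBLIC_KEY_B64)).verify(signature, message) raises cryptography.exceptions.InvalidSignature on a bad signature, which the wrapper would translate into False.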

API Key Mode (Test)

Set API keys through an environment variable or a key file. If none are provided, 10 deterministic test keys are available.

Environment:

export KAFKAGRAPH_API_KEYS="key1,key2,key3"
# or from a file with one key per line
export KAFKAGRAPH_API_KEYS_FILE=/path/to/apikeys.txt
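The README does not say how the two variables interact. The sketch below assumes both are read and merged (inline keys first), with blanks and duplicates dropped, and falls back to 10 deterministic keys only when neither yields a key. load_api_keys and fallback_test_keys are hypothetical names; in particular, fallback_test_keys is not the library's default_test_keys, whose derivation is not documented here.

```python
import hashlib
import os
from pathlib import Path
from typing import List, Mapping, Optional


def fallback_test_keys(n: int = 10, seed: str = "kafkagraph-test") -> List[str]:
    # Hypothetical derivation: hashing seed + index gives the same keys every run.
    return [hashlib.sha256(f"{seed}:{i}".encode("utf-8")).hexdigest()[:32]
            for i in range(n)]


def load_api_keys(env: Optional[Mapping[str, str]] = None) -> List[str]:
    env = os.environ if env is None else env
    # KAFKAGRAPH_API_KEYS: comma-separated list; surrounding spaces are ignored.
    keys = [k.strip() for k in env.get("KAFKAGRAPH_API_KEYS", "").split(",")]
    path = env.get("KAFKAGRAPH_API_KEYS_FILE")
    if path:
        # KAFKAGRAPH_API_KEYS_FILE: one key per line.
        keys += [ln.strip() for ln in Path(path).read_text(encoding="utf-8").splitlines()]
    # Drop empties and duplicates while keeping first-seen order.
    keys = list(dict.fromkeys(k for k in keys if k))
    # Fall back to deterministic test keys only when nothing was configured.
    return keys or fallback_test_keys()
```

Passing an explicit mapping instead of reading os.environ directly keeps the loader easy to test.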

To run with an API key, pass api_key instead of license_file:

from kafkagraph import KafkaGraph

kg = KafkaGraph(
    api_key="your_api_key",
    kafka_config={"brokers": ["localhost:9092"], "group_id": "kafkagraph"},
    neo4j_config={"uri": "bolt://localhost:7687", "user": "neo4j", "password": "pass"},
    topics_config_path="topics.yaml",
    batch_size=500
)

kg.start()

Default test keys can be generated programmatically:

from kafkagraph.license.api_keys import default_test_keys
print(default_test_keys())  # 10 keys for local testing

Neo4j Write Semantics

  • Nodes are merged (matched if they exist, created otherwise) by label and id.
  • Relationships are merged by type and endpoints; their properties are then set from the events in each batch.
  • A write is issued once batch_size events have accumulated, and consumer offsets are committed after that write.
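The merge rules above map naturally onto Cypher MERGE statements run once per batch with UNWIND. The sketch below only builds the query strings; it assumes the id is stored in a node property named id and that each batch row carries id, start_id, end_id, and props keys, none of which the README specifies. node_merge_query and rel_merge_query are hypothetical helpers, not KafkaGraph functions.

```python
import re

_IDENT = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def _check(name: str) -> str:
    # Labels and relationship types are interpolated into the query text here
    # rather than passed as $ parameters, so reject anything that is not a
    # plain identifier to avoid Cypher injection.
    if not _IDENT.match(name):
        raise ValueError(f"unsafe identifier: {name!r}")
    return name


def node_merge_query(label: str) -> str:
    # One query per label; $rows holds every node of that label in the batch.
    return (
        "UNWIND $rows AS row "
        f"MERGE (n:{_check(label)} {{id: row.id}})"
    )


def rel_merge_query(start_label: str, rel_type: str, end_label: str) -> str:
    # Merge both endpoints, merge the relationship by type and endpoints,
    # then copy the event properties onto it.
    return (
        "UNWIND $rows AS row "
        f"MERGE (a:{_check(start_label)} {{id: row.start_id}}) "
        f"MERGE (b:{_check(end_label)} {{id: row.end_id}}) "
        f"MERGE (a)-[r:{_check(rel_type)}]->(b) "
        "SET r += row.props"
    )
```

With the official neo4j Python driver, such a query could be run in a managed transaction, for example session.execute_write(lambda tx: tx.run(query, rows=rows)), where rows is the list of dicts for the whole batch.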

Extensibility

  • Add new mapping modes under kafkagraph/mappers/ and extend core/dispatcher.py.
  • Adjust license limits and feature flags for each enterprise tier in the license manager classes.
  • Swap batching strategy (size/time) by modifying core/batcher.py.
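A size-or-time batching strategy, keeping the write-then-commit ordering described under Neo4j Write Semantics, might look like the sketch below. Batcher, tick, and max_wait_s are hypothetical names, not the contents of core/batcher.py; write and commit stand in for a Neo4j batch write and a Kafka offset commit and are passed in as callables so the sketch stays independent of any client library.

```python
import time
from typing import Any, Callable, List, Optional


class Batcher:
    """Buffers events and flushes when the batch is full or old enough."""

    def __init__(
        self,
        write: Callable[[List[Any]], None],
        commit: Callable[[], None],
        batch_size: int = 500,
        max_wait_s: Optional[float] = None,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.write, self.commit = write, commit
        self.batch_size, self.max_wait_s, self.clock = batch_size, max_wait_s, clock
        self.buffer: List[Any] = []
        self.first_at: Optional[float] = None  # arrival time of oldest buffered event

    def add(self, event: Any) -> bool:
        """Buffer one event; return True if this call triggered a flush."""
        if not self.buffer:
            self.first_at = self.clock()
        self.buffer.append(event)
        if self._due():
            self.flush()
            return True
        return False

    def tick(self) -> None:
        # Call periodically (e.g. after each empty poll) so a time limit can
        # fire even when no new events arrive.
        if self.buffer and self._due():
            self.flush()

    def _due(self) -> bool:
        if len(self.buffer) >= self.batch_size:
            return True
        return (self.max_wait_s is not None and self.first_at is not None
                and self.clock() - self.first_at >= self.max_wait_s)

    def flush(self) -> None:
        if not self.buffer:
            return
        # Write first, then commit offsets: if the write raises, nothing is
        # committed and the buffered events stay in place for a retry.
        self.write(self.buffer)
        self.commit()
        self.buffer = []
        self.first_at = None
```

Committing only after a successful write gives at-least-once delivery: a crash between the two steps replays the batch, which MERGE-based writes largely tolerate because re-merging the same nodes and relationships does not create duplicates.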

Security

  • Do not hardcode private keys. PUBLIC_KEY_B64 must be provided securely in deployments.
  • API keys can be loaded via environment or file; rotate keys as needed.
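Rotating keys without downtime usually means accepting both the old and the new key for an overlap window. The sketch below shows one way to do that with the constant-time comparison in the standard library's hmac module; ApiKeyValidator and its methods are hypothetical and not part of KafkaGraph's documented API.

```python
import hmac
from typing import Iterable


class ApiKeyValidator:
    """Accepts any key in the active set and supports rotation (hypothetical)."""

    def __init__(self, keys: Iterable[str]) -> None:
        self._keys = {k for k in keys if k}

    def rotate(self, add: Iterable[str] = (), retire: Iterable[str] = ()) -> None:
        # Add new keys before retiring old ones so clients can migrate
        # during an overlap window.
        self._keys |= {k for k in add if k}
        self._keys -= set(retire)

    def is_valid(self, candidate: str) -> bool:
        # compare_digest avoids timing leaks on partial matches; every active
        # key is compared instead of stopping at the first hit.
        ok = False
        for key in self._keys:
            ok |= hmac.compare_digest(candidate.encode("utf-8"), key.encode("utf-8"))
        return ok
```

A typical rotation would call rotate(add=[new_key]), redeploy clients with the new key, and then call rotate(retire=[old_key]).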

Author

Siddhappa Birajdar
Software Developer @FirstCry.com | Ex-Founder & CEO @Punarvspace Inc | Entrepreneurship | Python, Django, Flask, FastAPI, AI/ML, Generative AI, Quantum Computing, Blockchain | AWS Certified Solution Architect Associate
LinkedIn: https://www.linkedin.com/in/siddhappabirajdar



Download files


Source Distribution

kafkagraph-0.1.2.tar.gz (11.0 kB)

Built Distribution


kafkagraph-0.1.2-py3-none-any.whl (13.0 kB)

File details

Details for the file kafkagraph-0.1.2.tar.gz.

File metadata

  • Download URL: kafkagraph-0.1.2.tar.gz
  • Size: 11.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for kafkagraph-0.1.2.tar.gz
  • SHA256: 6aa55e86b228938c06bd89c056308411945f5b8c14bfa6ab7881c4c1fa6f7e31
  • MD5: 606e321a75c9d1202707a8f54dd2a8b4
  • BLAKE2b-256: 76e896ec2d59bbfed8874e2263cfd1520f886b204250023a34e6a0c329da3714


File details

Details for the file kafkagraph-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: kafkagraph-0.1.2-py3-none-any.whl
  • Size: 13.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for kafkagraph-0.1.2-py3-none-any.whl
  • SHA256: 8dc5ad7d02c1fc3f9002239a490c56b7688c6677d34b87c9aabe0ff9cf33cf64
  • MD5: 0234429d98e1f731dc62fa5349b5955c
  • BLAKE2b-256: f3cadd95b030fbd02d7494c42c864898e6a7ef9df86cfd1937f530a703aa2c53

