Licensed Kafka to Neo4j Graph Ingestion Engine for turn-key graph building
KafkaGraph
Licensed Kafka to Neo4j ingestion SDK.
Overview
KafkaGraph is a production-grade SDK that ingests events from Kafka and materializes them into a Neo4j graph. It includes enterprise features such as signed license enforcement, machine binding, throughput and partition limits, feature gating, and pluggable mapping modes. It is suitable for embedding into customer systems and selling as a commercial SDK.
Problem It Solves
- Converts high-volume Kafka event streams into navigable Neo4j graph structures without bespoke pipelines.
- Provides a consistent ingestion engine with batching, error safety, and transactional commits.
- Enforces enterprise-grade licensing, rate limits, and partition caps to align with commercial agreements.
- Offers simple-to-extend mapping modes to transform JSON events to nodes and relationships rapidly.
Features
- License enforcement with signed license files and machine fingerprint binding.
- API key mode for testing with default deterministic keys.
- Feature gating: simple, autograph, and sequence mappers.
- Batching for efficient writes and offset commits.
- Partition monitoring to enforce per-topic and total caps.
- Extensible dispatcher for new mapping modes and features.
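Feature gating and the extensible dispatcher can be pictured as a registry of mapper functions keyed by mode name, consulted only after the license's enabled features have been checked. This is a minimal sketch under assumptions: register_mode, dispatch, FeatureNotLicensed and the mapper signature are hypothetical names, not the actual API of KafkaGraph's core/dispatcher.py.

```python
from typing import Any, Callable, Dict, Iterable, List

# Hypothetical mapper signature: (event, topic_config) -> list of graph operations.
Mapper = Callable[[Dict[str, Any], Dict[str, Any]], List[Dict[str, Any]]]

# Registry of mapping modes; new modes add themselves via the decorator below.
_MAPPERS: Dict[str, Mapper] = {}


class FeatureNotLicensed(Exception):
    """Raised when a topic uses a mode the license does not enable."""


def register_mode(name: str) -> Callable[[Mapper], Mapper]:
    """Decorator that registers a mapper function under a mode name."""
    def decorator(fn: Mapper) -> Mapper:
        _MAPPERS[name] = fn
        return fn
    return decorator


def dispatch(event: Dict[str, Any], topic_config: Dict[str, Any],
             licensed_features: Iterable[str]) -> List[Dict[str, Any]]:
    mode = topic_config["mode"]
    # Gate first: an unlicensed mode is rejected even if a mapper exists for it.
    if mode not in set(licensed_features):
        raise FeatureNotLicensed(f"mode {mode!r} is not enabled by the license")
    mapper = _MAPPERS.get(mode)
    if mapper is None:
        raise ValueError(f"no mapper registered for mode {mode!r}")
    return mapper(event, topic_config)


@register_mode("simple")
def _simple_placeholder(event: Dict[str, Any], cfg: Dict[str, Any]) -> List[Dict[str, Any]]:
    # Placeholder body: a real mapper would build nodes and relationships from cfg.
    return [{"mode": "simple", "fields": sorted(event)}]
```

In this design, adding a mode amounts to decorating one more function (for example with @register_mode("autograph")) and listing that mode among the license's enabled features.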
Install
pip install kafkagraph
# or, from a source checkout:
pip install .
Usage
from kafkagraph import KafkaGraph
kg = KafkaGraph(
    license_file="license.json",
    kafka_config={"brokers": ["localhost:9092"], "group_id": "kafkagraph"},
    neo4j_config={"uri": "bolt://localhost:7687", "user": "neo4j", "password": "pass"},
    topics_config_path="topics.yaml",
    batch_size=500,
)
kg.start()
Topics Configuration
Provide a topics.yaml that declares how events should be mapped:
orders:
  mode: simple
  nodes:
    order:
      label: Order
      id: orderId
    customer:
      label: Customer
      id: customerId
  relationships:
    - type: PLACED_BY
      from: order
      to: customer
      properties: [createdAt, source]
profiles:
  mode: sequence
  base:
    label: User
    id: userId
  sequences:
    - field: devices
      label: Device
      id_field: deviceId
      type: OWNS
      properties: [model, os]
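To make the two modes concrete, here is a minimal sketch of how an event might be turned into (label, id) node keys and relationship tuples under the configuration above. Assumptions: map_simple and map_sequence are hypothetical names; relationship properties are read from the top-level event in simple mode and from each array item in sequence mode; sequence relationships point from the base node to each child; and listed fields missing from an event are skipped. The YAML is written as an equivalent Python dict so the sketch needs no YAML parser.

```python
from typing import Any, Dict, List, Tuple

NodeKey = Tuple[str, Any]                           # (label, id value)
Rel = Tuple[str, NodeKey, NodeKey, Dict[str, Any]]  # (type, from, to, properties)

# Same structure as topics.yaml above, written as a dict.
TOPICS: Dict[str, Dict[str, Any]] = {
    "orders": {
        "mode": "simple",
        "nodes": {
            "order": {"label": "Order", "id": "orderId"},
            "customer": {"label": "Customer", "id": "customerId"},
        },
        "relationships": [
            {"type": "PLACED_BY", "from": "order", "to": "customer",
             "properties": ["createdAt", "source"]},
        ],
    },
    "profiles": {
        "mode": "sequence",
        "base": {"label": "User", "id": "userId"},
        "sequences": [
            {"field": "devices", "label": "Device", "id_field": "deviceId",
             "type": "OWNS", "properties": ["model", "os"]},
        ],
    },
}


def map_simple(event: Dict[str, Any], cfg: Dict[str, Any]) -> Tuple[List[NodeKey], List[Rel]]:
    # One node per alias under "nodes", keyed by its label and the event's id field.
    keys = {alias: (spec["label"], event[spec["id"]]) for alias, spec in cfg["nodes"].items()}
    rels: List[Rel] = []
    for rel in cfg.get("relationships", []):
        # Copy only the listed properties that are present in the event.
        props = {p: event[p] for p in rel.get("properties", []) if p in event}
        rels.append((rel["type"], keys[rel["from"]], keys[rel["to"]], props))
    return list(keys.values()), rels


def map_sequence(event: Dict[str, Any], cfg: Dict[str, Any]) -> Tuple[List[NodeKey], List[Rel]]:
    base = (cfg["base"]["label"], event[cfg["base"]["id"]])
    nodes: List[NodeKey] = [base]
    rels: List[Rel] = []
    for seq in cfg["sequences"]:
        # Each element of the array field becomes a child node linked from the base.
        for item in event.get(seq["field"], []):
            child = (seq["label"], item[seq["id_field"]])
            props = {p: item[p] for p in seq.get("properties", []) if p in item}
            nodes.append(child)
            rels.append((seq["type"], base, child, props))
    return nodes, rels
```

For example, an orders event carrying orderId, customerId, createdAt and source would yield an Order node, a Customer node and one PLACED_BY relationship holding createdAt and source.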
License Mode (Enterprise)
Provide a signed license JSON file and set PUBLIC_KEY_B64 in kafkagraph/license/signed.py. The engine validates the license signature, machine binding, and expiry, and enforces the licensed limits.
kg = KafkaGraph(
    license_file="license.json",
    kafka_config={...},
    neo4j_config={...},
    topics_config_path="topics.yaml",
)
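The license checks described above can be sketched as follows. This is an illustration under assumptions, not KafkaGraph's implementation: the license field names (machine_id, expires_at, limits, signature), the fingerprint recipe (SHA-256 of the hostname and MAC address), the JSON canonicalization, and the partition-cap helper are all hypothetical. Signature verification is passed in as a callable because the signing algorithm is not documented here; if the keys happened to be Ed25519, one option with the cryptography package would be a wrapper around Ed25519PublicKey.from_public_bytes(base64.b64decode(PUBLIC_KEY_B64)).verify(signature, payload) that returns False when it raises InvalidSignature.

```python
import base64
import hashlib
import json
import platform
import uuid
from datetime import datetime, timezone
from typing import Any, Callable, Mapping, Optional

Verifier = Callable[[bytes, bytes], bool]  # (payload, signature) -> valid?


class LicenseError(Exception):
    pass


def machine_fingerprint() -> str:
    # Hypothetical binding: hash the hostname and the MAC address from uuid.getnode().
    raw = f"{platform.node()}|{uuid.getnode():012x}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


def canonical_payload(lic: Mapping[str, Any]) -> bytes:
    # Verify everything except the signature itself, serialized in a stable key order.
    body = {k: v for k, v in lic.items() if k != "signature"}
    return json.dumps(body, sort_keys=True, separators=(",", ":")).encode("utf-8")


def validate_license(lic: Mapping[str, Any], verify: Verifier, fingerprint: str,
                     now: Optional[datetime] = None) -> Mapping[str, Any]:
    now = now or datetime.now(timezone.utc)
    if not verify(canonical_payload(lic), base64.b64decode(lic["signature"])):
        raise LicenseError("signature check failed")
    if lic.get("machine_id") != fingerprint:
        raise LicenseError("license is bound to a different machine")
    # expires_at is assumed to be ISO 8601 with an explicit UTC offset.
    if now >= datetime.fromisoformat(lic["expires_at"]):
        raise LicenseError("license has expired")
    return lic


def check_partition_caps(partitions_by_topic: Mapping[str, int], lic: Mapping[str, Any]) -> None:
    limits = lic["limits"]
    for topic, count in partitions_by_topic.items():
        if count > limits["max_partitions_per_topic"]:
            raise LicenseError(f"topic {topic!r} has {count} partitions, over the per-topic cap")
    total = sum(partitions_by_topic.values())
    if total > limits["max_partitions_total"]:
        raise LicenseError(f"{total} partitions in total, over the license cap")
```

Injecting the verifier and the fingerprint keeps the expiry, binding and limit logic testable without real keys or a specific machine.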
API Key Mode (Test)
Set API keys through an environment variable or a key file. If none are provided, 10 deterministic test keys are available.
Environment:
export KAFKAGRAPH_API_KEYS="key1,key2,key3"
# or from a file with one key per line
export KAFKAGRAPH_API_KEYS_FILE=/path/to/apikeys.txt
Use with API key:
from kafkagraph import KafkaGraph
kg = KafkaGraph(
    api_key="your_api_key",
    kafka_config={"brokers": ["localhost:9092"], "group_id": "kafkagraph"},
    neo4j_config={"uri": "bolt://localhost:7687", "user": "neo4j", "password": "pass"},
    topics_config_path="topics.yaml",
    batch_size=500,
)
kg.start()
Default test keys can be generated programmatically:
from kafkagraph.license.api_keys import default_test_keys
print(default_test_keys()) # 10 keys for local testing
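The key resolution described in this section can be sketched as below. Assumptions: both sources are merged when both variables are set (the precedence is not documented), blank entries are ignored, and candidate keys are compared in constant time. fallback_test_keys is a hypothetical stand-in; it is not the algorithm behind kafkagraph.license.api_keys.default_test_keys, so its keys will not match the package's.

```python
import hashlib
import hmac
import os
from typing import Iterable, List, Mapping, Optional


def fallback_test_keys(n: int = 10) -> List[str]:
    # Hypothetical deterministic keys: the same n values on every run.
    return [hashlib.sha256(f"kafkagraph-test-{i}".encode()).hexdigest()[:32] for i in range(n)]


def load_api_keys(env: Optional[Mapping[str, str]] = None) -> List[str]:
    env = os.environ if env is None else env
    # KAFKAGRAPH_API_KEYS: comma-separated list of keys.
    keys = [k.strip() for k in env.get("KAFKAGRAPH_API_KEYS", "").split(",") if k.strip()]
    # KAFKAGRAPH_API_KEYS_FILE: path to a file with one key per line.
    path = env.get("KAFKAGRAPH_API_KEYS_FILE")
    if path:
        with open(path, encoding="utf-8") as fh:
            keys.extend(line.strip() for line in fh if line.strip())
    # Fall back to deterministic test keys only when nothing was configured.
    return keys or fallback_test_keys()


def is_valid_key(candidate: str, keys: Iterable[str]) -> bool:
    # compare_digest avoids leaking how many leading characters matched.
    return any(hmac.compare_digest(candidate.encode(), k.encode()) for k in keys)
```

Passing a plain dict as env makes the resolution easy to test without touching the real process environment.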
Neo4j Write Semantics
- Nodes are merged by label and id.
- Relationships are merged by type and endpoints; their properties are then set from each event batch.
- Writes happen when batch_size is reached, followed by consumer offset commits.
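Under these semantics, each batch could be written with one UNWIND statement per node label and per relationship type. This minimal sketch only builds the Cypher text; it assumes nodes store their identifier in a property named id and that endpoint nodes are merged earlier in the same batch, so the MATCH clauses find them. The function names are hypothetical. Labels and relationship types cannot be passed as Cypher parameters, which is why the sketch validates them before interpolating.

```python
import re

# Cypher cannot take labels or relationship types as parameters, so they are
# interpolated into the query text; allow only plain identifiers.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def _ident(name: str) -> str:
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"unsafe label or type: {name!r}")
    return name


def node_merge_query(label: str) -> str:
    # One MERGE per row, keyed on label + id; the remaining fields are (re)set.
    return (
        "UNWIND $rows AS row\n"
        f"MERGE (n:{_ident(label)} {{id: row.id}})\n"
        "SET n += row.props"
    )


def rel_merge_query(rel_type: str, from_label: str, to_label: str) -> str:
    # Endpoints are matched by label + id, the relationship is merged by type
    # between them, then its properties are set from the batch row.
    return (
        "UNWIND $rows AS row\n"
        f"MATCH (a:{_ident(from_label)} {{id: row.from_id}})\n"
        f"MATCH (b:{_ident(to_label)} {{id: row.to_id}})\n"
        f"MERGE (a)-[r:{_ident(rel_type)}]->(b)\n"
        "SET r += row.props"
    )
```

With the official neo4j Python driver, a statement and its rows could be run in a managed transaction, for example session.execute_write(lambda tx: tx.run(node_merge_query("Order"), rows=rows).consume()), where each row is a map such as {"id": "o1", "props": {}}.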
Extensibility
- Add new mapping modes under kafkagraph/mappers/ and extend core/dispatcher.py.
- Adjust license limits and feature flags per enterprise tier in the license manager classes.
- Swap the batching strategy (size- or time-based) by modifying core/batcher.py.
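A size-or-time batching strategy that keeps the write-then-commit ordering described under Neo4j Write Semantics could look like the sketch below. Assumptions: SizeOrTimeBatcher, write and commit are hypothetical names (write would run the Neo4j transaction, commit would commit the consumer offsets), and the clock is injectable so the timing can be tested. Because offsets are committed only after a successful write, a crash in between replays the batch on restart, which MERGE-based writes largely tolerate.

```python
import time
from typing import Any, Callable, List, Optional


class SizeOrTimeBatcher:
    """Buffers records; flushes at max_size records or max_wait seconds after the first one."""

    def __init__(self, max_size: int, max_wait: float,
                 write: Callable[[List[Any]], None], commit: Callable[[], None],
                 clock: Callable[[], float] = time.monotonic) -> None:
        if max_size < 1 or max_wait <= 0:
            raise ValueError("max_size must be >= 1 and max_wait > 0")
        self.max_size = max_size
        self.max_wait = max_wait
        self._write = write      # e.g. run the Neo4j transaction for the batch
        self._commit = commit    # e.g. commit the Kafka consumer offsets
        self._clock = clock
        self._buffer: List[Any] = []
        self._first_at: Optional[float] = None

    def _due(self) -> bool:
        if not self._buffer:
            return False
        if len(self._buffer) >= self.max_size:
            return True
        return self._clock() - self._first_at >= self.max_wait

    def add(self, record: Any) -> bool:
        """Buffer one record; returns True if this call triggered a flush."""
        if not self._buffer:
            self._first_at = self._clock()
        self._buffer.append(record)
        if self._due():
            self.flush()
            return True
        return False

    def poll(self) -> bool:
        """Call from the consume loop so an idle topic still flushes on time."""
        if self._due():
            self.flush()
            return True
        return False

    def flush(self) -> None:
        if not self._buffer:
            return
        # Write first; if it raises, the buffer is kept and no offsets are committed.
        self._write(list(self._buffer))
        # Commit offsets only after the write succeeded.
        self._commit()
        self._buffer.clear()
        self._first_at = None
```

A pure size-based strategy is the special case of a very large max_wait; the time bound matters mainly for low-traffic topics, where a partial batch would otherwise wait indefinitely.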
Security
- Do not hardcode private keys. PUBLIC_KEY_B64 must be provided securely in deployments.
- API keys can be loaded from the environment or a file; rotate keys as needed.
Author
Siddhappa Birajdar
Software Developer @FirstCry.com | Ex-Founder & CEO @Punarvspace Inc | Entrepreneurship | Python, Django, Flask, FastAPI, AI/ML, Generative AI, Quantum Computing, Blockchain | AWS Certified Solution Architect Associate
LinkedIn: https://www.linkedin.com/in/siddhappabirajdar
File details
Details for the file kafkagraph-0.1.2.tar.gz.
File metadata
- Download URL: kafkagraph-0.1.2.tar.gz
- Upload date:
- Size: 11.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 6aa55e86b228938c06bd89c056308411945f5b8c14bfa6ab7881c4c1fa6f7e31 |
| MD5 | 606e321a75c9d1202707a8f54dd2a8b4 |
| BLAKE2b-256 | 76e896ec2d59bbfed8874e2263cfd1520f886b204250023a34e6a0c329da3714 |
File details
Details for the file kafkagraph-0.1.2-py3-none-any.whl.
File metadata
- Download URL: kafkagraph-0.1.2-py3-none-any.whl
- Upload date:
- Size: 13.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8dc5ad7d02c1fc3f9002239a490c56b7688c6677d34b87c9aabe0ff9cf33cf64 |
| MD5 | 0234429d98e1f731dc62fa5349b5955c |
| BLAKE2b-256 | f3cadd95b030fbd02d7494c42c864898e6a7ef9df86cfd1937f530a703aa2c53 |