Canonical audit connector for Python with PostgreSQL and Kafka publishers.
Project description
auditx-connector (Python)
Python implementation of AuditX connector with canonical audit envelope and pluggable publishers.
Supports:
- ASYNC_DB publisher (
PostgresAuditPublisher) - Kafka publisher (
KafkaAuditPublisher) - Java API publisher (
JavaApiAuditPublisher) for remote publish to Spring Boot AuditX endpoint
Install
pip install auditx-connector
Core package does not force DB/Kafka drivers. Consumers install only needed drivers.
PostgreSQL publisher supports both drivers:
psycopg(v3, preferred)psycopg2/psycopg2-binary(fallback)
Optional extras:
pip install "auditx-connector[postgres]"
pip install "auditx-connector[kafka]"
pip install "auditx-connector[all]"
Publisher modes (PublisherType):
ASYNC_DBKAFKAJAVA_API
Quick Start
1. Publish via AuditWriteRequest
from auditx_connector import (
AuditConnectorConfig,
AuditService,
AuditWriteRequest,
AuditSource,
DefaultIdempotencyKeyFactory,
PostgresAuditPublisher,
PostgresConfig,
)
publisher = PostgresAuditPublisher(
postgres_config=PostgresConfig(
dsn="postgresql://user:pass@localhost:5432/auditdb",
table="AUDITX_EVENT",
),
connector_config=AuditConnectorConfig(enabled=True, enforce_idempotency=True),
idempotency_key_factory=DefaultIdempotencyKeyFactory(),
)
service = AuditService(publisher)
service.publish(
AuditWriteRequest(
event_type="DISCONNECT_REQUEST_RECEIVED",
source=AuditSource.CRON,
conversation_id="550e8400-e29b-41d4-a716-446655440000",
group_id="grp-1001",
interaction_id="int-2001",
extra_map={"zapperCustId": "ZP-10091", "plan": "PREMIUM"},
)
)
You can also connect without DSN:
postgres_config=PostgresConfig(
host="localhost",
port=5432,
database="auditdb",
username="audit_user",
password="audit_pass",
table="AUDITX_EVENT",
)
2. Publish via CanonicalAuditEnvelope
from auditx_connector import CanonicalAuditEnvelope, AuditSource, AuditSeverity
envelope = CanonicalAuditEnvelope(
event_type="DISCONNECT_API_TRIGGERED",
source=AuditSource.API,
severity=AuditSeverity.INFO,
service_name="zapper-disconnect-service",
environment="prod",
conversation_id="550e8400-e29b-41d4-a716-446655440000",
group_id="grp-1001",
interaction_id="int-2001",
trace_id="trace-9f8d2",
business_keys={"zapperCustId": "ZP-10091"},
extra_map={"decision": "AUTO_DISCONNECT_ELIGIBLE"},
)
service.publish_envelope(envelope)
3. Publish from Python to Java AuditX API (JAVA_API)
Java endpoint used: POST /auditx/v1/events/publish
from auditx_connector import (
JavaApiAuditPublisher,
JavaApiConfig,
AuditWriteRequest,
CanonicalAuditEnvelope,
)
java_api = JavaApiAuditPublisher(
JavaApiConfig(base_url="http://localhost:8080")
)
# mode 1: stage + metadata map
java_api.publish_stage_map(
stage="DISCONNECT_REQUEST_RECEIVED",
conversation_id="550e8400-e29b-41d4-a716-446655440000",
trace_id="trace-2001",
metadata={"zapperCustId": "ZP-10091", "plan": "PREMIUM"},
)
# mode 2: AuditWriteRequest
java_api.publish_write_request(
AuditWriteRequest(
event_type="BILLING_VALIDATION_FAILED",
conversation_id="550e8400-e29b-41d4-a716-446655440000",
)
)
# mode 3: CanonicalAuditEnvelope
java_api.publish_envelope(
CanonicalAuditEnvelope(
event_type="DISCONNECT_API_TRIGGERED",
conversation_id="550e8400-e29b-41d4-a716-446655440000",
)
)
Stage Enum Utility Pattern
Define stage enum with static stage metadata (stage_name, source, severity).
from enum import Enum
from auditx_connector import AuditSeverity, AuditSource, AuditStage
class DisconnectStage(Enum):
DISCONNECT_REQUEST_RECEIVED = (
"DISCONNECT_REQUEST_RECEIVED",
AuditSource.EMAIL_POSTFIX,
AuditSeverity.INFO,
)
BILLING_VALIDATION_FAILED = (
"BILLING_VALIDATION_FAILED",
AuditSource.API,
AuditSeverity.ERROR,
)
def stage_name(self) -> str:
return self.value[0]
def source(self) -> AuditSource:
return self.value[1]
def severity(self) -> AuditSeverity:
return self.value[2]
Publish with stage + trace + metadata:
metadata = {
"zapperCustId": "ZP-10091",
"address": "ABCD, XX",
"plan": "PREMIUM",
}
service.publish_by_stage(
stage=DisconnectStage.DISCONNECT_REQUEST_RECEIVED,
conversation_id="550e8400-e29b-41d4-a716-446655440000",
trace_id="trace-11",
metadata=metadata,
)
Publish with stage + base AuditWriteRequest:
base_request = AuditWriteRequest(
event_type="PLACEHOLDER",
source=AuditSource.CRON,
conversation_id="550e8400-e29b-41d4-a716-446655440000",
group_id="grp-1001",
interaction_id="int-2001",
extra_map={"existing": "value"},
)
service.publish_by_stage(
stage=DisconnectStage.BILLING_VALIDATION_FAILED,
conversation_id="550e8400-e29b-41d4-a716-446655440000",
trace_id="trace-12",
metadata={"errorCode": "ADDRESS_MISMATCH"},
base_request=base_request,
)
Publish with stage + base CanonicalAuditEnvelope:
base_envelope = CanonicalAuditEnvelope(
event_type="PLACEHOLDER",
source=AuditSource.SYSTEM,
conversation_id="550e8400-e29b-41d4-a716-446655440000",
group_id="grp-1001",
interaction_id="int-2001",
service_name="zapper-disconnect-service",
extra_map={"existing": "value"},
)
service.publish_by_stage(
stage=DisconnectStage.DISCONNECT_REQUEST_RECEIVED,
conversation_id="550e8400-e29b-41d4-a716-446655440000",
trace_id="trace-13",
metadata={"sourceSystem": "CCTEAM_EXCEL"},
base_envelope=base_envelope,
)
Utility Class Example
from __future__ import annotations
from enum import Enum
from typing import Any
from auditx_connector import (
AuditService,
AuditSeverity,
AuditSource,
AuditStage,
AuditWriteRequest,
CanonicalAuditEnvelope,
)
class DisconnectStage(Enum):
DISCONNECT_REQUEST_RECEIVED = (
"DISCONNECT_REQUEST_RECEIVED",
AuditSource.EMAIL_POSTFIX,
AuditSeverity.INFO,
)
INVENTORY_ENRICHMENT_STARTED = (
"INVENTORY_ENRICHMENT_STARTED",
AuditSource.CRON,
AuditSeverity.INFO,
)
BILLING_VALIDATION_FAILED = (
"BILLING_VALIDATION_FAILED",
AuditSource.API,
AuditSeverity.ERROR,
)
def stage_name(self) -> str:
return self.value[0]
def source(self) -> AuditSource:
return self.value[1]
def severity(self) -> AuditSeverity:
return self.value[2]
class AuditEventUtil:
@staticmethod
def base_metadata(zapper_cust_id: str, address: str, plan: str) -> dict[str, Any]:
return {
"zapperCustId": zapper_cust_id,
"address": address,
"plan": plan,
"domain": "ELECTRICITY_DISCONNECT",
}
@staticmethod
def build_write_request(
stage: str,
conversation_id: str,
group_id: str,
interaction_id: str,
metadata: dict[str, Any] | None,
) -> AuditWriteRequest:
return AuditWriteRequest(
event_type=stage,
source=AuditSource.CRON,
severity=AuditSeverity.INFO,
conversation_id=conversation_id,
group_id=group_id,
interaction_id=interaction_id,
extra_map=dict(metadata or {}),
)
@staticmethod
def build_canonical_envelope(
stage: str,
conversation_id: str,
group_id: str,
interaction_id: str,
metadata: dict[str, Any] | None,
) -> CanonicalAuditEnvelope:
return CanonicalAuditEnvelope(
event_type=stage,
source=AuditSource.SYSTEM,
severity=AuditSeverity.INFO,
service_name="zapper-disconnect-service",
environment="prod",
conversation_id=conversation_id,
group_id=group_id,
interaction_id=interaction_id,
business_keys={"entityType": "disconnect-request"},
extra_map=dict(metadata or {}),
)
@staticmethod
def publish_by_stage(
service: AuditService,
stage: DisconnectStage,
conversation_id: str,
trace_id: str,
metadata: dict[str, Any] | None,
) -> None:
service.publish_by_stage(stage, conversation_id, trace_id, metadata)
Validation Rules
conversation_idis required and must be UUID.- If
source == AuditSource.UI,session_idis required.
Idempotency
Default key input:
event_type | source | conversation_id | interaction_id | group_id
SHA-256 hash is used as idempotency key when key is missing.
Local install without publishing
Use this when you want to test on another system without uploading to PyPI.
Build wheel
python -m pip install --upgrade build
python -m build
Copy generated dist/*.whl to your local package folder, for example:
/Users/salilvnair/local-pypi
Install directly from local folder (one-time command)
pip install --no-index --find-links=/Users/salilvnair/local-pypi auditx-connector
Persistent pip config (local folder only, no internet)
~/.pip/pip.conf
[global]
no-index = true
find-links = /Users/salilvnair/local-pypi
Persistent pip config with fallback to indexes
[global]
find-links =
/Users/salilvnair/local-pypi
/Users/salilvnair/team-pypi
index-url = https://your-primary/simple
extra-index-url =
https://repo1/simple
https://repo2/simple
Notes:
- If
no-index = true, pip never queries any online index. - For fallback to indexes, do not set
no-index.
Build and Publish to PyPI
python -m pip install --upgrade build twine
python -m build
python -m twine check dist/*
python -m twine upload dist/*
Notes
- Package name:
auditx-connector - Import path:
auditx_connector
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file auditx_connector-1.0.0.tar.gz.
File metadata
- Download URL: auditx_connector-1.0.0.tar.gz
- Upload date:
- Size: 12.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
650415c14bf948874198b0242dccce84ccc2ea1ab0a7cfa40f312eff3c73d194
|
|
| MD5 |
2faa77a691508e29ac3147eab2404698
|
|
| BLAKE2b-256 |
b4e17b9b0f81273c41bac2d99be04f8f268db0e197c28203f41ced1c66cadc0d
|
File details
Details for the file auditx_connector-1.0.0-py3-none-any.whl.
File metadata
- Download URL: auditx_connector-1.0.0-py3-none-any.whl
- Upload date:
- Size: 13.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
9877e260e88ded3d70ddb13445a57fca2c5e898f06217573014b70e100e54d34
|
|
| MD5 |
67225fb236a721a8f3122d182dcbbf31
|
|
| BLAKE2b-256 |
03d7b86d984b9fe7c16cba6a9975632f5ccc4e30b6f32ad8f97c86b075346f15
|