A metadata-driven data ingestion framework for AtomSQL with fintech-grade security and compliance features.

AtomIngest

A Metadata-Driven Data Ingestion Framework for AtomSQL

AtomIngest is a configurable, zero-boilerplate data ingestion framework designed for fintech environments where data reliability, compliance, and scalability are paramount. Built on the metaprogramming foundations of AtomSQL, it dynamically generates ORM models, validators, and transformation logic entirely from YAML configurations.


🚀 Key Features

AtomIngest transforms raw data (CSV files, JSON documents, API responses) into structured, compliant records in AtomSQL-managed databases, without requiring you to write repetitive model code.

  • 📄 YAML-Driven Configuration: Define table schemas, constraints, and pipelines declaratively.
  • 🔮 Dynamic Model Generation: Uses Python metaprogramming to create AtomSQL models (ModelMeta) at runtime.
  • 🛡️ Fintech-Grade Security:
    • PII Vault: Transparent field-level encryption for sensitive data.
    • Immutable Ledgers: WORM (Write Once, Read Many) support for audit trails.
    • Crypto-Shredding: GDPR compliance via key destruction.
  • ✅ Robust Validation: Inject regex, range checks, and custom business rules directly into Field definitions.
  • 🔄 Reliability: Built-in support for Idempotency (Upserts), Dead Letter Queues (DLQ), and Transaction Management.
  • ⚡ High Performance: Async batching and smart buffering for high-throughput feeds.

📦 Installation

pip install atomingest

Requires Python 3.12+ and AtomSQL.


⚡ Quick Start

1. Define your Pipeline (trades.yaml)

Create a YAML configuration that defines your target table, schema, and validation rules. Each type value in the schema maps directly to a field class in atomsql.orm.fields.

target_table: trades
strategy: upsert
business_keys: [trade_id]

schema:
  trade_id:
    type: StringField
    unique: true
    nullable: false
  symbol:
    type: CharField
    max_length: 10
    validation:
      regex: "^[A-Z]{3,5}$"  # Validates ticker symbols (e.g., AAPL)
  amount:
    type: DecimalField
    validation:
      min: 0.01
  executed_at:
    type: DateTimeField

hooks:
  pre_save: "utils.enrich_metadata"
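
To make the validation keys above concrete, here is a minimal sketch of how the regex rule on symbol and the min rule on amount could be evaluated per row. It is plain Python, not AtomIngest's validator code; RULES and validate_row are hypothetical names introduced only for illustration.

```python
import re
from decimal import Decimal

# Hypothetical, hand-written mirror of the `validation` entries in trades.yaml.
RULES = {
    "symbol": {"regex": r"^[A-Z]{3,5}$"},
    "amount": {"min": Decimal("0.01")},
}


def validate_row(row: dict) -> list[str]:
    """Return a list of violations; an empty list means the row passed."""
    errors = []
    for field, rule in RULES.items():
        value = row.get(field)
        if value is None:
            errors.append(f"{field}: missing")
            continue
        # Regex rule: the whole value must match the pattern.
        if "regex" in rule and not re.fullmatch(rule["regex"], str(value)):
            errors.append(f"{field}: {value!r} does not match {rule['regex']}")
        # Range rule: compare as Decimal to avoid float rounding on money.
        if "min" in rule and Decimal(str(value)) < rule["min"]:
            errors.append(f"{field}: {value} is below minimum {rule['min']}")
    return errors


good = validate_row({"symbol": "AAPL", "amount": "150.25"})
bad = validate_row({"symbol": "aapl", "amount": "0"})
```

Here good is an empty list, while bad carries one message per violated rule, which is the kind of detail a failed-row report would need.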

2. Run the Ingestion

Use the CLI or the Python API to ingest data. The framework builds the Trade model at runtime and, with strategy: upsert, writes each row to your database keyed on trade_id.

Using CLI:

atomingest run trades.yaml --source data/daily_trades.csv

Using Python:

from atomingest.core import Ingester
from atomsql import Database

# Connect to DB
db = Database("sqlite:///finance.db")

# Initialize Ingester
ingest = Ingester(db)

# Run Pipeline
stats = ingest.run(
    config="trades.yaml",
    source="data/daily_trades.csv"
)

print(f"Ingested: {stats.processed}, Failed: {stats.failed}")
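
The processed and failed counters reflect the upsert and dead-letter behaviour described in this README. The sketch below imitates that behaviour with the standard-library sqlite3 module rather than AtomSQL, so the table name trades_dlq, the function ingest_rows, and the stats dictionary are all assumptions made for illustration.

```python
import json
import sqlite3


def ingest_rows(conn: sqlite3.Connection, rows: list[dict]) -> dict:
    """Upsert rows keyed on trade_id; route rows that fail to a dead-letter table."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS trades ("
        "trade_id TEXT PRIMARY KEY, "
        "symbol TEXT NOT NULL, "
        "amount REAL NOT NULL CHECK (amount >= 0.01))"
    )
    conn.execute("CREATE TABLE IF NOT EXISTS trades_dlq (payload TEXT, error TEXT)")
    stats = {"processed": 0, "failed": 0}
    for row in rows:
        try:
            # Re-sending the same trade_id updates the row instead of duplicating it.
            conn.execute(
                "INSERT INTO trades (trade_id, symbol, amount) "
                "VALUES (:trade_id, :symbol, :amount) "
                "ON CONFLICT(trade_id) DO UPDATE SET "
                "symbol = excluded.symbol, amount = excluded.amount",
                row,
            )
            stats["processed"] += 1
        except sqlite3.Error as exc:
            # One bad row must not abort the run: park it with the error text.
            conn.execute(
                "INSERT INTO trades_dlq (payload, error) VALUES (?, ?)",
                (json.dumps(row), str(exc)),
            )
            stats["failed"] += 1
    conn.commit()  # single commit for brevity; a real runner would commit per batch
    return stats


conn = sqlite3.connect(":memory:")
stats = ingest_rows(conn, [
    {"trade_id": "T1", "symbol": "AAPL", "amount": 10.0},
    {"trade_id": "T1", "symbol": "AAPL", "amount": 12.5},  # same key: updates T1
    {"trade_id": "T2", "symbol": "MSFT", "amount": 0.0},   # violates the CHECK
])
```

Running the same file twice leaves one row per trade_id, which is what makes the ingestion idempotent; rejected rows stay available in the dead-letter table for inspection and replay.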

3. Normalization Pipeline

AtomIngest includes a normalization pipeline that cleans and standardizes raw data before ingestion. Each step is declared by name, with its own config and an enabled flag:

from atomingest.normalization import normalize_csv_file, NormalizationStepConfig

# Define normalization steps
steps = [
    NormalizationStepConfig(step="RemoveBOMStep", config={}, enabled=True),
    NormalizationStepConfig(step="TrimWhitespaceStep", config={"trim_quotes": True}, enabled=True),
    NormalizationStepConfig(step="ColumnNameNormalizationStep", config={"case": "lower"}, enabled=True),
]

# Normalize data
rows, report = normalize_csv_file("messy_data.csv", steps=steps)
print(f"Cleaned {report.rows_emitted} rows with {len(report.warnings)} warnings")
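
For intuition about what these three steps do to a file, the following standalone sketch applies comparable clean-ups with the standard library only. It does not use AtomIngest; normalize_csv_text is a hypothetical helper, and the exact semantics of the real steps (for example, which quote characters trim_quotes removes) are assumptions.

```python
import csv
import io


def normalize_csv_text(text: str) -> list[dict]:
    """Apply BOM removal, trimming, and header lowercasing to CSV text."""
    # RemoveBOMStep analogue: drop a leading byte order mark if present.
    text = text.removeprefix("\ufeff")
    reader = csv.reader(io.StringIO(text))
    # ColumnNameNormalizationStep analogue with case="lower".
    header = [name.strip().lower() for name in next(reader)]
    rows = []
    for record in reader:
        # TrimWhitespaceStep analogue with trim_quotes=True:
        # strip outer spaces, then stray quote characters, then spaces again.
        cleaned = [cell.strip().strip("'\"").strip() for cell in record]
        rows.append(dict(zip(header, cleaned)))
    return rows


raw = "\ufeff Name , AGE \n  Alice  , 30\n 'Bob' ,41 \n"
rows = normalize_csv_text(raw)
```

After this pass, rows holds two dictionaries keyed by name and age, with the padding and stray quotes removed.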

YAML Integration:

target_table: customers

# Embedded normalization configuration
normalization:
  strict: false
  steps:
    - step: TrimWhitespaceStep
      config: {trim_quotes: true, collapse_spaces: true}
      enabled: true
    - step: NullTokenNormalizationStep
      config: {null_tokens: ["", "NA", "NULL", "-"]}
      enabled: true
    - step: BasicTypeCleanupStep
      config:
        numeric_columns: ["age", "amount"]
        date_columns: ["created_date"]
        date_formats: ["%Y-%m-%d", "%m/%d/%Y"]
      enabled: true

schema:
  name: {type: CharField, max_length: 100}
  age: {type: IntegerField}
  amount: {type: DecimalField}
  created_date: {type: DateField}

🏗️ Architecture

AtomIngest builds on AtomSQL's runtime model creation and is organized into four components:

  1. Schema Loader: Parses YAML and resolves types to AtomSQL classes (e.g., StringField, IntegerField).

  2. Metaclass Factory: Uses type() to construct a new class inheriting from atomsql.orm.models.Model, automatically registering it with the Database.

  3. Pipeline Runner: Streams data, applies "Validator Injection" decorators, executes hooks, and manages transactions.

  4. Reliability Layer: Catches errors per row, routing failed records to a DLQ table while committing valid rows in batches.
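
The first two components can be sketched without AtomSQL installed. In the example below, Field, StringField, DecimalField, and Model are stand-ins for the real classes, and __tablename__, FIELD_TYPES, and build_model are hypothetical names; only the use of type() to create a Model subclass from a parsed schema comes from the description above.

```python
class Field:
    """Stand-in for an AtomSQL field; the real classes live in atomsql.orm.fields."""

    def __init__(self, **options):
        self.options = options


class StringField(Field):
    pass


class DecimalField(Field):
    pass


class Model:
    """Stand-in base class that records every subclass, mimicking registration."""

    registry: dict[str, type] = {}

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        Model.registry[cls.__name__] = cls


# Schema Loader analogue: resolve YAML type names to field classes.
FIELD_TYPES = {"StringField": StringField, "DecimalField": DecimalField}


def build_model(name: str, table: str, schema: dict) -> type:
    """Create a Model subclass at runtime from a parsed schema mapping."""
    attrs = {"__tablename__": table}  # hypothetical attribute name
    for column, spec in schema.items():
        options = dict(spec)  # copy so the caller's schema is not mutated
        field_cls = FIELD_TYPES[options.pop("type")]
        attrs[column] = field_cls(**options)
    # type(name, bases, namespace) is the dynamic equivalent of a class statement.
    return type(name, (Model,), attrs)


Trade = build_model("Trade", "trades", {
    "trade_id": {"type": "StringField", "unique": True, "nullable": False},
    "amount": {"type": "DecimalField"},
})
```

Because type() goes through the normal class-creation machinery, any registration logic on the base class runs exactly as it would for a hand-written model.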


🗺️ Roadmap

We are currently building out the core phases:

  • Phase 1: Core Configuration - YAML Parser & Dynamic Model Factory.

  • Phase 2: Validation - Regex/Range constraints & Hook system.

  • Phase 3: Reliability - DLQ, Upsert strategies, and Async Batching.

  • Phase 4: Security (In Progress) - PII Encryption & Immutable Mixins.

  • Phase 5: Orchestration - Topological dependency resolution.

See the full Project Backlog for details.
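
Phase 5 is not implemented yet, so the following only illustrates what topological dependency resolution means for pipelines: a table that references others must be loaded after them. The sketch uses the standard-library graphlib module, and the pipeline names and their dependencies are invented for the example.

```python
from graphlib import TopologicalSorter

# Hypothetical pipelines mapped to the pipelines they depend on.
dependencies = {
    "trades": {"accounts", "instruments"},
    "accounts": {"customers"},
    "instruments": set(),
    "customers": set(),
}

# static_order() yields each pipeline only after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
```

Any order produced this way places customers before accounts and both accounts and instruments before trades; a circular dependency raises graphlib.CycleError instead of returning an order.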


🤝 Contributing

We welcome contributions! Please see CONTRIBUTING.md for details on how to set up the dev environment.

  1. Clone the repo.
  2. Install dependencies: uv pip install -r pyproject.toml.
  3. Run tests: pytest tests/.

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.


Built with ❤️ for the Fintech Open Source Community.


