
Fyrnheim

Activities-first data transformation framework.

Built on Pydantic + Ibis. Define typed sources, detect business events from state changes, resolve identities across systems, and project entity models -- all in Python.

Install

pip install "fyrnheim[duckdb]"

Quick Start

1. Create a project:

fyr init myproject && cd myproject

2. Define your pipeline in entities/customers.py:

from fyrnheim import (
    StateSource, ActivityDefinition, RowAppeared, FieldChanged,
    IdentityGraph, IdentitySource, EntityModel, StateField,
)

# Source -- a slowly-changing state table
crm = StateSource(name="crm_contacts", project="p", dataset="raw", table="contacts", id_field="id")

# Activities -- named business events from state changes
signup = ActivityDefinition(name="signup", source="crm_contacts", trigger=RowAppeared())
became_paying = ActivityDefinition(
    name="became_paying", source="crm_contacts",
    trigger=FieldChanged(field="plan", to_values=["pro", "enterprise"]),
)

# Identity -- resolve across sources
identity = IdentityGraph(
    name="customer_identity", canonical_id="customer_id",
    sources=[IdentitySource(source="crm_contacts", id_field="id", match_key_field="email")],
)

# Entity -- derived current state
customers = EntityModel(
    name="customers", identity_graph="customer_identity",
    state_fields=[
        StateField(name="email", source="crm_contacts", field="email", strategy="latest"),
        StateField(name="plan", source="crm_contacts", field="plan", strategy="latest"),
    ],
)

3. Run tests:

pytest tests/

Core Concepts

Sources

StateSource -- a slowly-changing table (CRM contacts, subscription records). The diff engine automatically detects row appearances, disappearances, and field changes between snapshots.

StateSource(name="crm_contacts", project="p", dataset="d", table="contacts", id_field="contact_id")
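The diff behavior can be sketched in plain Python. This is an illustrative toy, not Fyrnheim's actual diff engine: each snapshot is modeled as a dict of rows keyed by the id field, and the function reports appearances, disappearances, and field changes between two snapshots.

```python
# Illustrative sketch only -- not Fyrnheim's implementation.
# Each snapshot maps id -> row dict.
def diff_snapshots(old, new):
    """Return (appeared_ids, disappeared_ids, changed) between two snapshots."""
    appeared = [i for i in new if i not in old]
    disappeared = [i for i in old if i not in new]
    changed = []  # (id, field, old_value, new_value)
    for i in new.keys() & old.keys():
        for field, value in new[i].items():
            if old[i].get(field) != value:
                changed.append((i, field, old[i].get(field), value))
    return appeared, disappeared, changed

old = {1: {"plan": "free"}, 2: {"plan": "pro"}}
new = {1: {"plan": "pro"}, 3: {"plan": "free"}}
print(diff_snapshots(old, new))
# ([3], [2], [(1, 'plan', 'free', 'pro')])
```

Row 3 appeared, row 2 disappeared, and row 1's plan changed from free to pro -- the raw material for RowAppeared, RowDisappeared, and FieldChanged triggers.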

EventSource -- an append-only event stream (page views, transactions).

EventSource(
    name="billing_events", project="p", dataset="d", table="transactions",
    entity_id_field="customer_id", timestamp_field="created_at", event_type_field="event_type",
)

Activity Definitions

Named business events detected from raw data changes. Each activity ties to a source and a trigger:

Trigger                         Detects
RowAppeared()                   New row in a state source
RowDisappeared()                Row removed from a state source
FieldChanged(field, to_values)  Field value changed (optionally to specific values)
EventOccurred(event_types)      Specific event types in an event source

signup = ActivityDefinition(name="signup", source="crm_contacts", trigger=RowAppeared())
became_paying = ActivityDefinition(
    name="became_paying", source="crm_contacts",
    trigger=FieldChanged(field="plan", to_values=["pro", "enterprise"]),
)

Identity Graph

Cross-source identity resolution. Link records from different systems by a shared match key:

IdentityGraph(
    name="customer_identity",
    canonical_id="customer_id",
    sources=[
        IdentitySource(source="crm_contacts", id_field="contact_id", match_key_field="email_hash"),
        IdentitySource(source="billing_events", id_field="customer_id", match_key_field="email_hash"),
    ],
)
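The core idea -- all records sharing a match key resolve to one canonical id -- can be sketched without the library. This is a simplified illustration, not Fyrnheim's resolution algorithm; the `cust_N` id format is invented for the example.

```python
# Illustrative sketch only: group (source, record_id) pairs by match key.
from itertools import count

def resolve_identities(records):
    """records: iterable of (source, record_id, match_key).
    Returns {(source, record_id): canonical_id}."""
    canonical = {}  # match_key -> canonical id
    next_id = count(1)
    resolved = {}
    for source, record_id, key in records:
        if key not in canonical:
            canonical[key] = f"cust_{next(next_id)}"
        resolved[(source, record_id)] = canonical[key]
    return resolved

rows = [
    ("crm_contacts", "c1", "hash_a"),
    ("billing_events", "b9", "hash_a"),  # same email hash as c1
    ("crm_contacts", "c2", "hash_b"),
]
resolved = resolve_identities(rows)
# c1 and b9 share a match key, so they map to the same canonical id
print(resolved)
```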

Entity Model

Derived current-state projection from resolved identities. Each field picks a source, a column, and a merge strategy (latest, first):

EntityModel(
    name="customers",
    identity_graph="customer_identity",
    state_fields=[
        StateField(name="email", source="crm_contacts", field="email", strategy="latest"),
        StateField(name="first_seen", source="crm_contacts", field="created_at", strategy="first"),
    ],
    computed_fields=[ComputedColumn(name="is_paying", expression="plan != 'free'")],
)
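The latest/first strategies amount to picking a field value by timestamp order across all records resolved to one entity. A minimal sketch of that selection (not Fyrnheim's internals):

```python
# Illustrative sketch of the "latest" / "first" merge strategies.
def merge_field(records, field, strategy):
    """records: list of (timestamp, row_dict) for one resolved entity."""
    ordered = sorted(r for r in records if field in r[1])
    if not ordered:
        return None
    _, row = ordered[-1] if strategy == "latest" else ordered[0]
    return row[field]

history = [
    ("2024-01-01", {"email": "a@old.com", "created_at": "2024-01-01"}),
    ("2024-06-01", {"email": "a@new.com"}),
]
print(merge_field(history, "email", "latest"))  # a@new.com
print(merge_field(history, "email", "first"))   # a@old.com
```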

Analytics Model

Time-grain metric aggregation over the activity stream:

StreamAnalyticsModel(
    name="daily_metrics",
    identity_graph="customer_identity",
    date_grain="daily",
    metrics=[
        StreamMetric(name="new_signups", expression="count()", event_filter="signup", metric_type="count"),
        StreamMetric(name="total_customers", expression="count()", metric_type="snapshot"),
    ],
)
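Conceptually, a daily-grain count metric buckets matching activities by calendar day. A plain-Python sketch of that aggregation (the activity tuple shape is invented for illustration; Fyrnheim's stream model differs):

```python
# Illustrative sketch of daily-grain counting over an activity stream.
from collections import Counter

def daily_counts(activities, activity_name):
    """activities: iterable of (iso_timestamp, activity_name, entity_id)."""
    return Counter(ts[:10] for ts, name, _ in activities if name == activity_name)

stream = [
    ("2024-06-01T09:00:00", "signup", "cust_1"),
    ("2024-06-01T17:30:00", "signup", "cust_2"),
    ("2024-06-02T08:00:00", "became_paying", "cust_1"),
    ("2024-06-02T11:00:00", "signup", "cust_3"),
]
print(daily_counts(stream, "signup"))
```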

CLI

fyr init [project_name]           # Scaffold a new project
fyr run                           # Run the pipeline
fyr run --max-parallel-io 8       # Override worker count for I/O fan-out
fyr bench                         # Run the pipeline and print per-phase timings
fyr bench --json                  # Same, but emit PipelineTimings as JSON on stdout
fyr --version                     # Show version
fyr --help                        # Show available commands

fyr bench reports wall-clock time per phase, per source, per identity graph, and per analytics entity / metrics model (split into projection vs. write), making it easy to spot where a pipeline spends its time.

Source loads and entity/metrics writes fan out across a bounded thread pool (default 4 workers). Tune with the max_parallel_io key in fyrnheim.yaml or the --max-parallel-io CLI flag on fyr run / fyr bench. Set to 1 for strictly serial behavior.
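The fan-out pattern described above can be sketched with the standard library (Fyrnheim's internals may differ); `load_source` is a stand-in for an I/O-bound source load:

```python
# Bounded parallel fan-out over independent I/O tasks.
from concurrent.futures import ThreadPoolExecutor

def load_source(name):
    # stand-in for an I/O-bound source load
    return f"{name}: loaded"

sources = ["crm_contacts", "billing_events", "support_tickets"]
max_parallel_io = 4  # mirrors the fyrnheim.yaml key / --max-parallel-io flag

with ThreadPoolExecutor(max_workers=max_parallel_io) as pool:
    results = list(pool.map(load_source, sources))  # order preserved
print(results)
```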

Why Fyrnheim?

                     dbt                            Fyrnheim
Language             SQL + Jinja                    Python
Type safety          Runtime errors                 Pydantic validation at definition time
Local dev            Requires warehouse connection  DuckDB on local parquet files
Backend portability  Dialect-specific SQL           Ibis compiles to 15+ backends
Testing              Custom schema tests            pytest
Identity resolution  Manual SQL joins               Built-in identity graph
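The "validation at definition time" row is plain Pydantic behavior, illustrated here with a stand-in model (not Fyrnheim's actual StateField class): an invalid strategy value fails the moment the object is constructed, not at pipeline runtime.

```python
# Stand-in model to illustrate definition-time validation with Pydantic.
from typing import Literal
from pydantic import BaseModel, ValidationError

class StateFieldSketch(BaseModel):
    name: str
    source: str
    field: str
    strategy: Literal["latest", "first"]

try:
    StateFieldSketch(name="email", source="crm_contacts", field="email",
                     strategy="newest")  # typo caught immediately
except ValidationError as e:
    print(f"caught at definition time: {len(e.errors())} error(s)")
```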

Status

  • Alpha -- API may change before 1.0
  • DuckDB backend -- fully supported
  • BigQuery backend -- supported
  • ClickHouse -- supported as an output sink
  • Postgres backend -- supported
  • Python 3.11+ required

License

MIT
