Skip to main content

Data Ingestion Framework

Project description

Ingestify

Ingest everything – JSON, CSV, tracking ZIPs, even MP4 – keep it version‑safe, sync only what changed, and analyse while you ingest.


Why Ingestify?

Football‐data APIs are often slow, rate‑limited or just down. One parsing bug and you’re forced to pull tens of gigabytes again.
Ingestify fixes that by building your own data lake of untouched provider files and fetching only what’s new:

  • Own your lake – The first time you ask for a match, Ingestify downloads the original files (metadata, line‑ups, events, tracking, video) and stores them untouched in local disk, S3, GCS… every later query hits your lake, not the provider.

  • Never re‑fetch the world – A file‑level checksum / timestamp check moves only changed bundles across the wire.

  • Atomic, complete packages – A Dataset is all‑or‑nothing:

    Dataset type Always contains
    Match Dataset metadata + line‑ups + events
    Tracking Dataset metadata + raw tracking frames

You never analyse events v2 with lineups v1, or yesterday’s first half with today’s second half.

  • Query while ingesting – Datasets stream out of the engine the moment their files land, so notebooks or downstream services can start before the full season is in.

The Ingestify Workflow


What you gain

For football‑analytics practitioners

Pain Ingestify fix
API slowness / downtime One request → lake; retries and parallelism happen behind the scenes.
Full re‑ingest after a bug File‑level deltas mean you fetch only the corrected bundles.
Partial / drifting data Dataset is atomic, versioned, and validated before it becomes visible.
Waiting hours for a season to sync Stream each Dataset as soon as it lands; analyse while you ingest.
Boilerplate joins engine.load_dataset_with_kloppy(dataset) → analysis‑ready object.

For software engineers

Need How Ingestify helps
Domain‑Driven Design Dataset, Revision, Selector plus rich domain events read like the problem space.
Event‑driven integrations Subscribe to RevisionAdded and push to Kafka, AWS Lambda, Airflow…
Pluggable everything Swap Source, FetchPolicy, DatasetStore subclasses to add providers, change delta logic, or move storage back‑ends.
Safety & speed Multiprocessing downloader with temp‑dir commits – no half‑written matches; near‑linear I/O speed‑ups.
Any file type JSON, CSV, MP4, proprietary binaries – stored verbatim so you parse / transcode later under version control.

Quick start

pip install ingestify            # or: pip install git+https://github.com/PySport/ingestify.git

Developing a new Source

When developing a new Source, use the debug_source() helper for rapid iteration:

from ingestify import Source, debug_source

class MyCustomSource(Source):
    provider = "my_provider"

    def __init__(self, name: str, api_key: str):
        super().__init__(name)
        self.api_key = api_key

    def find_datasets(self, dataset_type, data_spec_versions, **kwargs):
        # Your source implementation
        ...

# Quick debug - runs full ingestion with temp storage
if __name__ == "__main__":
    source = MyCustomSource(name="test", api_key="...")

    debug_source(
        source,
        dataset_type="match",
        data_spec_versions={"events": "v1"},
    )

The debug_source() helper:

  • ✅ Creates an ephemeral dev engine with temp storage
  • ✅ Configures logging automatically
  • ✅ Runs the full ingestion cycle
  • ✅ Shows storage location and results

Perfect for testing your source before adding it to production config!

Minimal config.yaml

main:
  metadata_url: sqlite:///database/catalog.db   # where revision metadata lives
  file_url: file://database/files/              # where raw files live
  default_bucket: main

sources:
  statsbomb:
    type: ingestify.statsbomb_github            # open‑data provider

ingestion_plans:
  - source: statsbomb
    dataset_type: match
    # selectors can narrow the scope
    # selectors:
    #   - competition_id: 11
    #     season_id: [90]

First ingest

When you configured event subscribers, all domain events are dispatched to the subscriber. Publishing the events to Kafka, RabbitMQ or any other system becomes trivial.

mkdir -p database
pip install kloppy

ingestify run                                # fills your data lake

Using the data

By default, Ingestify will search in your DatasetStore when you request data. You can pass several filters to only fetch what you need.

from ingestify.main import get_engine

engine = get_engine("config.yaml")

for dataset in engine.iter_datasets(
        dataset_state="complete",
        provider="statsbomb",
        dataset_type="match",
        competition_id=11,
        season_id=90):
    df = (
        engine
        .load_dataset_with_kloppy(dataset)
        .to_df(engine="polars")
    )
    df.write_parquet(f"out/{dataset.identifier['match_id']}.parquet")

Auto Ingestion

When you don't want to use event driven architecture but just want to work with the latest data, ingestify got you covered. With the auto_ingest option, ingestify syncs the data in the background when you ask for the data.

from ingestify.main import get_engine

engine = get_engine("config.yaml")

for dataset in engine.iter_datasets(
        # When set to True it will first do a full sync and then start yielding datasets
        auto_ingest=True, 
  
        # With streaming enabled all Datasets are yielded when they are up-to-date (not changed, or refetched)
        # auto_ingest={"streaming": True}
  
        dataset_state="complete",
        provider="statsbomb",
        dataset_type="match",
        competition_id=11,
        season_id=90):
    df = (
        engine
        .load_dataset_with_kloppy(dataset)
        .to_df(engine="polars")
    )
    df.write_parquet(f"out/{dataset.identifier['match_id']}.parquet")

Open data

Ingestify has build-in support for StatsBomb Open Data (more to come).

mkdir database_open_data
pip install kloppy
import logging, sys

from ingestify.main import get_engine

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    stream=sys.stderr,
)

engine = get_engine(
    metadata_url="sqlite:///database_open_data/catalog.db",
    file_url="file://database_open_data/files/"
)

dataset_iter = engine.iter_datasets(
    # This will tell ingestify to look for an Open Data provider
    auto_ingest={"use_open_data": True, "streaming": True},

    provider="statsbomb",
    dataset_type="match",
    competition_id=43,  # "FIFA World Cup"
    #season_id=281
)

for dataset in dataset_iter:
    kloppy_dataset = engine.load_dataset_with_kloppy(dataset)
    logging.info(f"Loaded {kloppy_dataset}")

Event Log

Ingestify ships a built-in event log that lets a separate service (or cron job) react to dataset lifecycle events — without polling the database or coupling services together.

How it works

Ingestify ingestion run
  └── EventLogSubscriber writes to event_log table (same DB)

Consumer process (your service / cron)
  └── EventLogConsumer reads new rows, calls your callback, advances cursor

The cursor is per-reader — multiple independent consumers can each track their own position.

Enable the subscriber

Add one line to config.yaml:

event_subscribers:
  - type: ingestify.infra.event_log.EventLogSubscriber

That's it. The event_log and reader_state tables are created automatically in the same database as the rest of ingestify.

Write a consumer

# run_consumer.py
from ingestify.infra.event_log import EventLogConsumer

def on_event(event) -> None:
    if event.event_type == "revision_added":
        trigger_downstream(event.dataset.dataset_id)

# Run once (e.g. from a cron job):
EventLogConsumer.from_config("config.yaml", reader_name="my-service").run(on_event)

# Or keep running, polling every 5 seconds:
EventLogConsumer.from_config("config.yaml", reader_name="my-service").run(on_event, poll_interval=5)

on_event receives a domain event with event.event_type and event.dataset. Available event types: dataset_created, revision_added, metadata_updated.

from_config reads metadata_url from your existing config.yaml — no duplicate connection strings.

run() returns 0 on success and 1 if a processing error occurred. On error the cursor is not advanced, so the failing event will be retried on the next run.


Roadmap

  • Workflow orchestration helpers (Airflow, Dagster, Prefect)
  • Built‑in Kafka / Kinesis event emitters
  • Streaming data ingestion
  • Data quality hooks (Great Expectations)

Stop refetching the world. Own your data lake, keep it version‑safe, and analyse football faster with Ingestify.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ingestify-0.13.0.tar.gz (76.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

ingestify-0.13.0-py3-none-any.whl (100.8 kB view details)

Uploaded Python 3

File details

Details for the file ingestify-0.13.0.tar.gz.

File metadata

  • Download URL: ingestify-0.13.0.tar.gz
  • Upload date:
  • Size: 76.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for ingestify-0.13.0.tar.gz
Algorithm Hash digest
SHA256 bc03d51cb9e0f8a173a4a5ea7a7a159b010da8583d994cd59e568d4430b48cf8
MD5 ce0f00c7ebb1754f5113de00ea12a0a8
BLAKE2b-256 9aa3232979ee89e17cb5c9853d74285338cbeeb89c3c13de7f15e05b0fc8d475

See more details on using hashes here.

Provenance

The following attestation bundles were made for ingestify-0.13.0.tar.gz:

Publisher: release.yml on PySport/ingestify

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file ingestify-0.13.0-py3-none-any.whl.

File metadata

  • Download URL: ingestify-0.13.0-py3-none-any.whl
  • Upload date:
  • Size: 100.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for ingestify-0.13.0-py3-none-any.whl
Algorithm Hash digest
SHA256 9e80797d7e539e0a928dddbf6567b34f38824f7088a162e0415def498e66f037
MD5 8b78dfb6e1ba75b8cb8a1ab1f8fcc587
BLAKE2b-256 7ef666e8277b77becefb6ad415652f3069345e4146e3e0075216a963139ffa0e

See more details on using hashes here.

Provenance

The following attestation bundles were made for ingestify-0.13.0-py3-none-any.whl:

Publisher: release.yml on PySport/ingestify

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page