Skip to main content

Data Ingestion Framework

Project description

Ingestify

Ingest everything – JSON, CSV, tracking ZIPs, even MP4 – keep it version‑safe, sync only what changed, and analyse while you ingest.


Why Ingestify?

Football‐data APIs are often slow, rate‑limited or just down. One parsing bug and you’re forced to pull tens of gigabytes again.
Ingestify fixes that by building your own data lake of untouched provider files and fetching only what’s new:

  • Own your lake – The first time you ask for a match, Ingestify downloads the original files (metadata, line‑ups, events, tracking, video) and stores them untouched in local disk, S3, GCS… every later query hits your lake, not the provider.

  • Never re‑fetch the world – A file‑level checksum / timestamp check moves only changed bundles across the wire.

  • Atomic, complete packages – A Dataset is all‑or‑nothing:

    Dataset type Always contains
    Match Dataset metadata + line‑ups + events
    Tracking Dataset metadata + raw tracking frames

You never analyse events v2 with lineups v1, or yesterday’s first half with today’s second half.

  • Query while ingesting – Datasets stream out of the engine the moment their files land, so notebooks or downstream services can start before the full season is in.

The Ingestify Workflow


What you gain

For football‑analytics practitioners

Pain Ingestify fix
API slowness / downtime One request → lake; retries and parallelism happen behind the scenes.
Full re‑ingest after a bug File‑level deltas mean you fetch only the corrected bundles.
Partial / drifting data Dataset is atomic, versioned, and validated before it becomes visible.
Waiting hours for a season to sync Stream each Dataset as soon as it lands; analyse while you ingest.
Boilerplate joins engine.load_dataset_with_kloppy(dataset) → analysis‑ready object.

For software engineers

Need How Ingestify helps
Domain‑Driven Design Dataset, Revision, Selector plus rich domain events read like the problem space.
Event‑driven integrations Subscribe to RevisionAdded and push to Kafka, AWS Lambda, Airflow…
Pluggable everything Swap Source, FetchPolicy, DatasetStore subclasses to add providers, change delta logic, or move storage back‑ends.
Safety & speed Multiprocessing downloader with temp‑dir commits – no half‑written matches; near‑linear I/O speed‑ups.
Any file type JSON, CSV, MP4, proprietary binaries – stored verbatim so you parse / transcode later under version control.

Quick start

pip install ingestify            # or: pip install git+https://github.com/PySport/ingestify.git

Developing a new Source

When developing a new Source, use the debug_source() helper for rapid iteration:

from ingestify import Source, debug_source

class MyCustomSource(Source):
    provider = "my_provider"

    def __init__(self, name: str, api_key: str):
        super().__init__(name)
        self.api_key = api_key

    def find_datasets(self, dataset_type, data_spec_versions, **kwargs):
        # Your source implementation
        ...

# Quick debug - runs full ingestion with temp storage
if __name__ == "__main__":
    source = MyCustomSource(name="test", api_key="...")

    debug_source(
        source,
        dataset_type="match",
        data_spec_versions={"events": "v1"},
    )

The debug_source() helper:

  • ✅ Creates an ephemeral dev engine with temp storage
  • ✅ Configures logging automatically
  • ✅ Runs the full ingestion cycle
  • ✅ Shows storage location and results

Perfect for testing your source before adding it to production config!

Minimal config.yaml

main:
  metadata_url: sqlite:///database/catalog.db   # where revision metadata lives
  file_url: file://database/files/              # where raw files live
  default_bucket: main

sources:
  statsbomb:
    type: ingestify.statsbomb_github            # open‑data provider

ingestion_plans:
  - source: statsbomb
    dataset_type: match
    # selectors can narrow the scope
    # selectors:
    #   - competition_id: 11
    #     season_id: [90]

First ingest

When you configured event subscribers, all domain events are dispatched to the subscriber. Publishing the events to Kafka, RabbitMQ or any other system becomes trivial.

mkdir -p database
pip install kloppy

ingestify run                                # fills your data lake

Using the data

By default, Ingestify will search in your DatasetStore when you request data. You can pass several filters to only fetch what you need.

from ingestify.main import get_engine

engine = get_engine("config.yaml")

for dataset in engine.iter_datasets(
        dataset_state="complete",
        provider="statsbomb",
        dataset_type="match",
        competition_id=11,
        season_id=90):
    df = (
        engine
        .load_dataset_with_kloppy(dataset)
        .to_df(engine="polars")
    )
    df.write_parquet(f"out/{dataset.identifier['match_id']}.parquet")

Auto Ingestion

When you don't want to use event driven architecture but just want to work with the latest data, ingestify got you covered. With the auto_ingest option, ingestify syncs the data in the background when you ask for the data.

from ingestify.main import get_engine

engine = get_engine("config.yaml")

for dataset in engine.iter_datasets(
        # When set to True it will first do a full sync and then start yielding datasets
        auto_ingest=True, 
  
        # With streaming enabled all Datasets are yielded when they are up-to-date (not changed, or refetched)
        # auto_ingest={"streaming": True}
  
        dataset_state="complete",
        provider="statsbomb",
        dataset_type="match",
        competition_id=11,
        season_id=90):
    df = (
        engine
        .load_dataset_with_kloppy(dataset)
        .to_df(engine="polars")
    )
    df.write_parquet(f"out/{dataset.identifier['match_id']}.parquet")

Open data

Ingestify has build-in support for StatsBomb Open Data (more to come).

mkdir database_open_data
pip install kloppy
import logging, sys

from ingestify.main import get_engine

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    stream=sys.stderr,
)

engine = get_engine(
    metadata_url="sqlite:///database_open_data/catalog.db",
    file_url="file://database_open_data/files/"
)

dataset_iter = engine.iter_datasets(
    # This will tell ingestify to look for an Open Data provider
    auto_ingest={"use_open_data": True, "streaming": True},

    provider="statsbomb",
    dataset_type="match",
    competition_id=43,  # "FIFA World Cup"
    #season_id=281
)

for dataset in dataset_iter:
    kloppy_dataset = engine.load_dataset_with_kloppy(dataset)
    logging.info(f"Loaded {kloppy_dataset}")

Event Log

Ingestify ships a built-in event log that lets a separate service (or cron job) react to dataset lifecycle events — without polling the database or coupling services together.

How it works

Ingestify ingestion run
  └── EventLogSubscriber writes to event_log table (same DB)

Consumer process (your service / cron)
  └── EventLogConsumer reads new rows, calls your callback, advances cursor

The cursor is per-reader — multiple independent consumers can each track their own position.

Enable the subscriber

Add one line to config.yaml:

event_subscribers:
  - type: ingestify.infra.event_log.EventLogSubscriber

That's it. The event_log and reader_state tables are created automatically in the same database as the rest of ingestify.

Write a consumer

# run_consumer.py
from ingestify.infra.event_log import EventLogConsumer

def on_event(event) -> None:
    if event.event_type == "revision_added":
        trigger_downstream(event.dataset.dataset_id)

# Run once (e.g. from a cron job):
EventLogConsumer.from_config("config.yaml", reader_name="my-service").run(on_event)

# Or keep running, polling every 5 seconds:
EventLogConsumer.from_config("config.yaml", reader_name="my-service").run(on_event, poll_interval=5)

on_event receives a domain event with event.event_type and event.dataset. Available event types: dataset_created, revision_added, metadata_updated.

from_config reads metadata_url from your existing config.yaml — no duplicate connection strings.

run() returns 0 on success and 1 if a processing error occurred. On error the cursor is not advanced, so the failing event will be retried on the next run.


Roadmap

  • Workflow orchestration helpers (Airflow, Dagster, Prefect)
  • Built‑in Kafka / Kinesis event emitters
  • Streaming data ingestion
  • Data quality hooks (Great Expectations)

Stop refetching the world. Own your data lake, keep it version‑safe, and analyse football faster with Ingestify.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ingestify-0.14.0.tar.gz (81.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

ingestify-0.14.0-py3-none-any.whl (107.1 kB view details)

Uploaded Python 3

File details

Details for the file ingestify-0.14.0.tar.gz.

File metadata

  • Download URL: ingestify-0.14.0.tar.gz
  • Upload date:
  • Size: 81.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for ingestify-0.14.0.tar.gz
Algorithm Hash digest
SHA256 3f81189816ebafc03631e6b24624856d0605b7fd198050102f3dec935bbe0851
MD5 f3a58719da78c70165b48f7eabd4b3db
BLAKE2b-256 d3d8a13b9eea52636d2986d65da80db5254d51ed3d00b4ca757a9bcb33a8f2ee

See more details on using hashes here.

Provenance

The following attestation bundles were made for ingestify-0.14.0.tar.gz:

Publisher: release.yml on PySport/ingestify

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file ingestify-0.14.0-py3-none-any.whl.

File metadata

  • Download URL: ingestify-0.14.0-py3-none-any.whl
  • Upload date:
  • Size: 107.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for ingestify-0.14.0-py3-none-any.whl
Algorithm Hash digest
SHA256 872863de81d4420c9f65ea4832c01d8a561ee96802f71d531965b710e5d6ba93
MD5 a80bde6f73ac005772773840b30b0641
BLAKE2b-256 858f9f3c2f01a93f90bd229aa81c291d96f1d05d44b00e983c923fd6a254937e

See more details on using hashes here.

Provenance

The following attestation bundles were made for ingestify-0.14.0-py3-none-any.whl:

Publisher: release.yml on PySport/ingestify

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page