Skip to main content

No project description provided

Project description

Version GitHub Workflow Status GitHub Release Date codecov PyPi downloads

Framelink is a simple wrapper thats designed to provide context into pandas, polars and other Dataframe engines. See roadmap below for future of the project.

This project is still in prerelease, consider the API unstable. Any usage should be pinned.

pip install framelink

Goals

Framelink should provide a way for collaborating teams to write python or SQL models to see their data flow easily and get the a whole load of stuff for free!

  • Simple to write - writing models should be no harder than a function implementation but provide a dependency tree, schemas & model metadata.
  • Simple to run - writing models should be agnostic of running models, once the models are written execution wrappers with diagnostics, tracing & lineage should be easy to derive for the execution platform any team is running without having any special requirements for running locally.
  • Scheduler agnostic - we are not making a new airflow, dagster etc. Framelink serves to add metadata to a project for free.

Concepts

  • A Pipeline is a DAG of models that can be executed in a particular way.
  • A Model is a definition of sourcing data and, potentially, a transform. It's an ETL in its most basic form.
  • A Frame is a result of a model run.

Features

  • Model links & DAG + diagramming
  • Context logging per model
  • Diagramming and tracking of the model DAG
  • Caches and auto-persistence
  • Dynamic sourcing for models
  • Cli to run a project
  • Transpiler for popular DAG execution environments

Example

from pathlib import Path

import pandas as pd
import polars as pl

from framelink.core import FramelinkPipeline, FramelinkSettings
from framelink.storage.core import PickleStorage, NoStorage

settings = FramelinkSettings(
    default_storage=PickleStorage(Path(__file__).parent / "data")
)

pipeline = FramelinkPipeline(settings=settings)


@pipeline.model()
def src_frame_1(_: FramelinkPipeline) -> pd.DataFrame:
    return pd.DataFrame(data={
        "name": ["amy", "peter"],
        "age": [31, 12],
    })


@pipeline.model(storage=NoStorage())
def src_frame_2(_: FramelinkPipeline) -> pd.DataFrame:
    return pd.DataFrame(data={
        "name": ["amy", "peter", "helen"],
        "fave_food": ["oranges", "chocolate", "water"],
    })


@pipeline.model()
def merge_model(ctx: FramelinkPipeline) -> pl.DataFrame:
    res_1 = ctx.ref(src_frame_1)
    res_2 = ctx.ref(src_frame_2)
    key = "name"
    ctx.log.info(f"Merging both sources on {key}")
    return pl.from_pandas(res_1).join(pl.from_pandas(res_2), on=key)


# build with implicit context
r_1 = pipeline.build(merge_model)
print(r_1)
# shape: (2, 3)
# ┌───────┬─────┬───────────┐
# │ name  ┆ age ┆ fave_food │
# │ ---   ┆ --- ┆ ---       │
# │ str   ┆ i64 ┆ str       │
# ╞═══════╪═════╪═══════════╡
# │ amy   ┆ 31  ┆ oranges   │
# │ peter ┆ 12  ┆ chocolate │
# └───────┴─────┴───────────┘

print(merge_model.upstreams)
# {<src_frame_2 at 0x1477c2c90>, <src_frame_1 at 0x144f0ab50>}

print(src_frame_1.downstreams)
# {<merge_model at 0x1477c2910>}

print(pipeline.model_names)
# ['merge_model', 'src_frame_1', 'src_frame_2']

print(list(pipeline.topological_sorted_nodes()))
# [(<src_frame_1 at 0x144f0ab50>, <src_frame_2 at 0x1477c2c90>), (<merge_model at 0x1477c2910>,)]

# if you have the graphing options engaged.
pipeline.graph_plt()  # will draw you a matplotlib of the DAG
dot = pipeline.graph_dot()  # will provide a DOT language representation of the DAG

Feature Roadmap

This could change...

v0.2.0

  • Model links & DAG implemented
  • Context logger available
  • Diagramming and tracking of the model DAG

v0.3.0

  • Cleaner graph results
  • Merging of multiple framelink pipelines enabling
  • Orchestration passthrough and local execution.
  • Caches and auto-persistence
  • Dynamic sourcing for models
  • model overrides for CLI and python runtimes.
  • Cli to run a project

v0.4.0

  • SQL models & dbt, sqlmesh compatability
  • Open Tracing integration

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

framelink-0.2.2.tar.gz (158.0 kB view hashes)

Uploaded Source

Built Distribution

framelink-0.2.2-py3-none-any.whl (12.2 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page