ETL Tools for ccflow
ccflow-etl
Domain-neutral ETL building blocks for ccflow callable models.
What It Provides
ccflow-etl is a small library of reusable ETL support primitives for projects built on ccflow. It does not ship a generic ETL class or skeletal extract/transform/load base classes. Real pipelines should be concrete CallableModel graphs owned by the application or domain package, with reusable behavior pulled from this package where it helps.
The package currently provides:
- Shared CLI entry points: cc-etl and cc-etl-explain.
- Date expansion: Interval, BaseCalendar, built-in calendars, BackfillContext, and BackfillModel for fixed, business-day, calendar-boundary, and custom-calendar backfills.
- Handoff metadata: ETLArtifact for typed stage artifacts.
- Format-aware writes and cache handoffs: LocalWriteModel, CachePutModel / CacheGetModel, PayloadCodec, and LocalCacheStore for JSON, CSV, text, binary, gzip, and pyarrow-backed parquet payloads over byte-oriented stores.
- Checkpointing: CheckpointRecord, checkpoint statuses, and CheckpointDecisionModel for idempotent skip decisions; connector-backed stores live in connector packages.
- Retry orchestration: RetryPolicy, RetryModel, retry event summaries, timeout categories, and backoff/jitter helpers.
- Run reporting: RunSummary for structured counts by status and artifact stage.
ccflow-etl is domain-neutral. It does not contain application workflows, provider-specific clients, private deployment assumptions, or domain-specific functionality (e.g. financial concepts or locale-specific information). Domain-specific functionality may be available in separate packages.
Installation
pip install ccflow-etl
Connector-backed cache and checkpoint stores are installed from the connector packages that own their I/O:
pip install ccflow-s3
pip install ccflow-db
CLI Basics
cc-etl runs a configured ccflow callable model through Hydra. The packaged default config is intentionally tiny: it writes a local JSON payload using LocalWriteModel(format="json").
cc-etl context.path=./example-output.json context.payload.message='hello from ccflow-etl'
Use cc-etl-explain to inspect the resolved config without running the callable:
cc-etl-explain context.path=./example-output.json
Most projects should provide their own Hydra config directory and still use the shared entry point:
cc-etl --config-path ./config --config-name text_stats context.input_path=./notes.txt context.output_path=./stats.json
A minimal config has three important pieces:
# config/text_stats.yaml
model:
  _target_: text_pipeline.TextStatsModel
cli:
  model:
    _target_: ccflow.FlowOptions
    evaluator:
      _target_: ccflow.evaluators.MultiEvaluator
      evaluators:
        - _target_: ccflow.evaluators.GraphEvaluator
        - _target_: ccflow.evaluators.MemoryCacheEvaluator
        - _target_: ccflow.evaluators.LoggingEvaluator
    cacheable: true
callable: /model
context:
  input_path: ./notes.txt
  output_path: ./stats.json
  min_length: 1
model defines the callable. callable points at the registered model to run. context is validated against the callable model's context_type.
Writing A Simple Pipeline
The recommended pattern is to write a concrete CallableModel for the workflow you actually need. This example reads a text file, computes word counts, writes JSON locally, and returns artifact metadata plus a run summary.
# text_pipeline.py
import datetime as dt
from collections import Counter
from pathlib import Path
from typing import Type

from ccflow import CallableModel, ContextBase, ContextType, Flow, GenericResult, ResultType
from pydantic import Field

from ccflow_etl import CachePutContext, CachePutModel, ETLArtifact, LocalCacheStore, RunSummary


class TextStatsContext(ContextBase):
    input_path: Path
    output_path: Path
    min_length: int = 1
    # Use dt.date here: a bare `date` annotation would be shadowed by the
    # field of the same name while the class body is evaluated.
    date: dt.date | None = None


class TextStatsModel(CallableModel):
    writer: CachePutModel = Field(default_factory=lambda: CachePutModel(store=LocalCacheStore(), format="json"))

    @property
    def context_type(self) -> Type[ContextType]:
        return TextStatsContext

    @property
    def result_type(self) -> Type[ResultType]:
        return GenericResult

    @Flow.call
    def __call__(self, context: TextStatsContext) -> GenericResult:
        # Extract and transform: read the file and count words of sufficient length.
        text = context.input_path.read_text()
        words = [word.lower() for word in text.split() if len(word) >= context.min_length]
        counts = Counter(words)

        # When a date is supplied (e.g. by a backfill step), write one file per date.
        output_path = context.output_path
        if context.date is not None:
            output_path = output_path.with_name(f"{output_path.stem}-{context.date.isoformat()}{output_path.suffix}")

        payload = {
            "input_path": str(context.input_path),
            "date": context.date.isoformat() if context.date else None,
            "word_count": len(words),
            "unique_words": len(counts),
            "top_words": counts.most_common(10),
        }

        # Load: persist the payload and collect artifact metadata.
        write_result = self.writer(
            CachePutContext(
                path=output_path,
                payload=payload,
                dataset="text_stats",
                stage="load",
                overwrite=True,
            )
        )
        artifacts = [write_result.artifact]
        summary = RunSummary.from_items([{"status": write_result.status}], artifacts=artifacts)
        return GenericResult(
            value={
                "payload": payload,
                "artifacts": [artifact.model_dump(mode="json") for artifact in artifacts],
                "run_summary": summary.model_dump(mode="json"),
            }
        )
This is deliberately not a subclass of a generic ETL shell. The model name, context, output shape, and dependencies should describe the domain workflow directly. ccflow-etl supplies reusable parts: the writer, artifact contract, and summary model.
Run it through the shared CLI with your config:
echo 'small tools make larger workflows easier to trust' > notes.txt
cc-etl --config-path ./config --config-name text_stats context.input_path=./notes.txt context.output_path=./stats.json
Backfills
Use BackfillModel when the same callable should run once per date or datetime step. BackfillContext gets its steps from a calendar. The default is daily, interval strings such as 2M are accepted through IntervalCalendar, and downstream packages can provide their own BaseCalendar subclasses.
A literal backfill_text_stats config would wrap the same /model in a BackfillModel and pass the underlying text-stats context as the third item in the compact backfill context list:
# config/backfill_text_stats.yaml
model:
  _target_: text_pipeline.TextStatsModel
cli:
  model:
    _target_: ccflow.FlowOptions
    evaluator:
      _target_: ccflow.evaluators.MultiEvaluator
      evaluators:
        - _target_: ccflow.evaluators.GraphEvaluator
        - _target_: ccflow.evaluators.MemoryCacheEvaluator
        - _target_: ccflow.evaluators.LoggingEvaluator
    cacheable: true
backfill:
  _target_: ccflow_etl.BackfillModel
  model: /model
callable: /backfill
context:
  - 2026-05-01
  - 2026-05-03
  - input_path: ./notes.txt
    output_path: ./stats.json
    min_length: 1
  - forward
  - daily
Run that explicit config with:
cc-etl --config-path ./config --config-name backfill_text_stats
That shape is useful to understand the pieces, but most projects should not create a separate backfill config for every callable. ccflow-etl ships config groups for the common execution shapes:
- callable/callable: run /model directly.
- backfill/default: wrap /model in BackfillModel.
- backfill/daily: the same wrapper for daily-style backfills, so CLI users can select backfill=daily.
The context key remains the ccflow runtime context. For a backfill run, the root context is a BackfillContext; the wrapped callable's context is nested under context.context, or passed as the third item in the compact list form. The packaged backfill groups do not create a separate backfill.context shadow namespace.
The backfill groups also make built-in calendars available in the root registry under /calendars: daily, hourly, weekly, weekdays, business_daily, and monday_friday. Set context.calendar to one of those paths when you want the calendar object to choose steps instead of the interval shorthand:
cc-etl --config-path ./config --config-name text_stats_runner backfill=daily +context.start_datetime=2026-05-01 +context.end_datetime=2026-05-15 +context.calendar=/calendars/weekdays +context.context.input_path=./notes.txt +context.context.output_path=./stats.json +context.context.min_length=1
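The nesting in that command can be easier to see as data. The following is a minimal sketch, not Hydra's parser: apply_overrides is a hypothetical helper that only handles plain key.path=value strings (Hydra itself also handles typing, quoting, deletion, and group selection such as backfill=daily). It shows that context.* keys land on the root BackfillContext while context.context.* keys land on the wrapped callable's context.

```python
from typing import Any


def apply_overrides(overrides: list[str]) -> dict[str, Any]:
    """Build a nested dict from dotted ``key.path=value`` strings (illustration only)."""
    root: dict[str, Any] = {}
    for item in overrides:
        dotted, _, value = item.partition("=")
        # A leading "+" asks Hydra to add a key; this sketch simply drops it.
        keys = dotted.lstrip("+").split(".")
        node = root
        for key in keys[:-1]:
            node = node.setdefault(key, {})
        node[keys[-1]] = value  # values stay as strings in this sketch
    return root


config = apply_overrides(
    [
        "+context.start_datetime=2026-05-01",
        "+context.end_datetime=2026-05-15",
        "+context.calendar=/calendars/weekdays",
        "+context.context.input_path=./notes.txt",
        "+context.context.min_length=1",
    ]
)
```

Here config["context"] holds the backfill fields (start_datetime, end_datetime, calendar) and config["context"]["context"] holds the text-stats fields.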
For static runner configs that set context.* values in the file, compose the packaged group before _self_ so local values override group defaults:
defaults:
  - /backfill: daily
  - _self_
The wrapper uses the same /cli/model FlowOptions as direct execution, so a local GraphEvaluator can evaluate each step from the callable dependency graph. With the packaged groups available in your config search path, the same model can be run directly or as a daily backfill from the CLI:
# config/text_stats_runner.yaml
defaults:
  - _self_
  - /callable: callable
  - optional /backfill: null
hydra:
  searchpath:
    - pkg://ccflow_etl.config
model:
  _target_: text_pipeline.TextStatsModel
cli:
  model:
    _target_: ccflow.FlowOptions
    evaluator:
      _target_: ccflow.evaluators.MultiEvaluator
      evaluators:
        - _target_: ccflow.evaluators.GraphEvaluator
        - _target_: ccflow.evaluators.MemoryCacheEvaluator
        - _target_: ccflow.evaluators.LoggingEvaluator
    cacheable: true
cc-etl --config-path ./config --config-name text_stats_runner +context.input_path=./notes.txt +context.output_path=./stats.json +context.min_length=1
cc-etl --config-path ./config --config-name text_stats_runner backfill=daily +context.start_datetime=2026-05-01 +context.end_datetime=2026-05-03 +context.interval=daily +context.context.input_path=./notes.txt +context.context.output_path=./stats.json +context.context.min_length=1
Backfill contexts can use aliases such as daily, fixed intervals such as 1D, 6h, or 30min, month intervals such as 2M, business days such as 1B, and calendar boundaries such as MS, ME, BMS, BME, QS, QE, YS, and YE. calendar takes precedence over interval when both are provided.
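As a rough illustration of what step expansion means for the daily and weekdays cases, here is a standard-library sketch. It is not how BackfillContext or the built-in calendars are implemented: expand_dates is a hypothetical helper, and treating the end date as inclusive is an assumption made for this example.

```python
from datetime import date, timedelta


def expand_dates(start: date, end: date, weekdays_only: bool = False) -> list[date]:
    """Return each calendar day from start to end (end inclusive is an assumption)."""
    steps = []
    current = start
    while current <= end:
        # date.weekday() is 0 for Monday through 6 for Sunday.
        if not weekdays_only or current.weekday() < 5:
            steps.append(current)
        current += timedelta(days=1)
    return steps


# Daily steps over a three-day window.
daily = expand_dates(date(2026, 5, 1), date(2026, 5, 3))

# Monday-to-Friday steps over a two-week window.
weekdays = expand_dates(date(2026, 5, 1), date(2026, 5, 15), weekdays_only=True)
```

Each resulting date would correspond to one invocation of the wrapped callable.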
Config Scopes
ccflow-etl keeps its packaged config scopes small and domain-neutral:
- callable/*: choose the callable path to run, or keep compatibility aliases.
- backfill/*: wrap /model with BackfillModel; runtime dates still live in root context because that is what ccflow executes.
- calendars/*: register reusable calendar objects under /calendars/*.
Connection and credential scopes should live with the packages that own those concepts. Use connector packages for reusable connections/rest, connections/s3, and connections/db groups, and keep credentials/* configs as references to environment variables, secret names, or runtime-injected values rather than literal secrets. Domain packages can compose those groups into provider-specific configs without making ccflow-etl depend on HTTP, S3, database, or finance libraries.
Checkpoints And Skip Decisions
CheckpointDecisionModel combines checkpoint stores and destination existence checks into planned or skipped units. Use it before calling expensive or non-idempotent work. Stores only need to provide should_skip(key); connector-backed stores such as ccflow_db.SQLiteCheckpointStore provide the durable implementation.
Typical unit statuses are:
- planned: should run now.
- checkpoint: skipped because a checkpoint says the unit already succeeded.
- exists: skipped because the destination already exists.
- database: skipped because a database destination already has the expected row.
- written or upserted: completed successfully.
- failed, retried, or cancelled: execution did not finish cleanly.
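The decision logic described in this section can be sketched without the library. This is an assumption-laden illustration, not CheckpointDecisionModel itself: InMemoryCheckpointStore, mark_done, and decide are hypothetical names, and checking the checkpoint before the destination is an ordering chosen for the example. Only should_skip(key) comes from the store contract stated above.

```python
from typing import Callable, Protocol


class CheckpointStore(Protocol):
    """The one method the text says a checkpoint store must provide."""

    def should_skip(self, key: str) -> bool: ...


class InMemoryCheckpointStore:
    """Hypothetical store that remembers completed keys in a set."""

    def __init__(self) -> None:
        self._done: set[str] = set()

    def mark_done(self, key: str) -> None:
        self._done.add(key)

    def should_skip(self, key: str) -> bool:
        return key in self._done


def decide(key: str, store: CheckpointStore, destination_exists: Callable[[str], bool]) -> str:
    """Return a unit status: 'checkpoint', 'exists', or 'planned'."""
    if store.should_skip(key):
        return "checkpoint"
    if destination_exists(key):
        return "exists"
    return "planned"


store = InMemoryCheckpointStore()
store.mark_done("text_stats/2026-05-01")
already_written = {"text_stats/2026-05-02"}

keys = ["text_stats/2026-05-01", "text_stats/2026-05-02", "text_stats/2026-05-03"]
statuses = {key: decide(key, store, already_written.__contains__) for key in keys}
```

Only units that come back as planned would then be handed to the expensive or non-idempotent callable.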
Retries
Use RetryPolicy and RetryModel around callables that may fail transiently. Policies classify retryable status codes, timeout exceptions, and other exception types, then produce event summaries with attempt counts and backoff decisions.
from ccflow_etl import RetryModel, RetryPolicy

retrying_model = RetryModel(
    model=my_callable,
    policy=RetryPolicy(max_attempts=3, initial_delay_seconds=0.5, backoff_multiplier=2.0, jitter_ratio=0.1),
)
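To make the policy parameters concrete, here is a sketch of exponential backoff with symmetric jitter using the same parameter names. How RetryPolicy actually combines these values is not documented here, so the formula below is an assumption, and backoff_delays is a hypothetical helper rather than part of ccflow-etl.

```python
import random
from typing import Optional


def backoff_delays(
    max_attempts: int,
    initial_delay_seconds: float,
    backoff_multiplier: float,
    jitter_ratio: float,
    rng: Optional[random.Random] = None,
) -> list[float]:
    """Return the wait before each retry; there is one fewer wait than attempts."""
    if rng is None:
        rng = random.Random()
    delays = []
    for retry_index in range(max_attempts - 1):
        # Exponential growth: initial, initial * m, initial * m**2, ...
        base = initial_delay_seconds * backoff_multiplier**retry_index
        # Assumed jitter model: shift by up to +/- jitter_ratio of the base delay.
        jitter = base * jitter_ratio * rng.uniform(-1.0, 1.0)
        delays.append(base + jitter)
    return delays


# Without jitter the schedule is deterministic.
plain = backoff_delays(3, 0.5, 2.0, 0.0)

# With jitter each delay stays within 10% of its base value.
jittered = backoff_delays(3, 0.5, 2.0, 0.1, rng=random.Random(0))
```

Under these assumptions, three attempts produce two waits with base values of 0.5 and 1.0 seconds.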
Cache Handoffs And Formats
Use cache put/get models when persisted payloads need ETL metadata: they return ETLArtifact records with stable keys, dataset names, stages, URIs, media types, and statuses. ccflow-etl owns the format conversion; stores only need byte-oriented exists(key), put_bytes(key, payload, content_type=...), get_bytes(key), and uri(key) methods.
from ccflow_etl import CacheGetContext, CacheGetModel, LocalCacheStore

result = CacheGetModel(store=LocalCacheStore(), format="json")(CacheGetContext(path="./stats.json", dataset="text_stats", stage="load"))
if result.status == "hit":
    print(result.payload)
CachePutModel / CacheGetModel accept format="json", format="csv", format="text", format="binary", format="parquet", or a compressed form such as format=["json", "gzip"]. The selected PayloadCodec determines suffixes and media types, so adding a format does not require new cache model classes.
from ccflow_s3 import S3CacheStore, S3Client
from ccflow_etl import CacheGetContext, CacheGetModel

store = S3CacheStore(client=S3Client(), bucket="bucket", prefix="cache")
result = CacheGetModel(store=store, format="json")(CacheGetContext(key="text_stats/2026-05-01"))

from ccflow_db import SQLiteCacheStore, SQLiteConfig
from ccflow_etl import CacheGetContext, CacheGetModel

store = SQLiteCacheStore(config=SQLiteConfig(path="./cache.sqlite"), table="cache_entries")
result = CacheGetModel(store=store, format="json")(CacheGetContext(key="text_stats/2026-05-01"))
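The byte-oriented store contract described in this section (exists, put_bytes, get_bytes, uri) is small enough to sketch with the standard library. The class and helper names below are hypothetical, the memory:// URI scheme and the .json.gz key suffix are invented for illustration, and the encode/decode functions only approximate what a JSON-plus-gzip codec might do; none of this is the PayloadCodec implementation.

```python
import gzip
import json
from typing import Any, Optional


class InMemoryByteStore:
    """Hypothetical store exposing the four methods named in the text."""

    def __init__(self) -> None:
        self._blobs: dict[str, bytes] = {}
        self._content_types: dict[str, Optional[str]] = {}

    def exists(self, key: str) -> bool:
        return key in self._blobs

    def put_bytes(self, key: str, payload: bytes, content_type: Optional[str] = None) -> None:
        self._blobs[key] = payload
        self._content_types[key] = content_type

    def get_bytes(self, key: str) -> bytes:
        return self._blobs[key]

    def uri(self, key: str) -> str:
        return f"memory://{key}"  # invented scheme, for illustration only


def encode_json_gzip(payload: Any) -> bytes:
    """Serialize to JSON text, then gzip-compress the UTF-8 bytes."""
    return gzip.compress(json.dumps(payload, sort_keys=True).encode("utf-8"))


def decode_json_gzip(raw: bytes) -> Any:
    """Reverse of encode_json_gzip."""
    return json.loads(gzip.decompress(raw).decode("utf-8"))


store = InMemoryByteStore()
key = "text_stats/2026-05-01.json.gz"
store.put_bytes(key, encode_json_gzip({"word_count": 8}), content_type="application/gzip")
restored = decode_json_gzip(store.get_bytes(key))
```

Because the store only ever sees bytes, the same class would work unchanged for CSV, text, or parquet payloads; only the encode and decode steps differ.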
Run Summaries
RunSummary turns item statuses and artifact stages into consistent reporting fields:
from ccflow_etl import ETLArtifact, RunSummary

summary = RunSummary.from_items(
    [{"status": "planned"}, {"status": "exists"}, {"status": "written"}],
    artifacts=[ETLArtifact(key="stats", stage="load", status="written")],
)
print(summary.model_dump(mode="json"))
Use summary.legacy_counts() when a caller needs only planned, skipped, succeeded, failed, retried, and cancelled counts.
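One way to picture that reduction is a mapping from unit statuses to the six legacy buckets. The mapping below is inferred from the status list in the checkpoint section and is an assumption; the real RunSummary.legacy_counts() may bucket statuses differently. LEGACY_BUCKETS and this standalone legacy_counts function are hypothetical.

```python
# Assumed status-to-bucket mapping, inferred from the documented unit statuses.
LEGACY_BUCKETS = {
    "planned": "planned",
    "checkpoint": "skipped",
    "exists": "skipped",
    "database": "skipped",
    "written": "succeeded",
    "upserted": "succeeded",
    "failed": "failed",
    "retried": "retried",
    "cancelled": "cancelled",
}

BUCKET_ORDER = ["planned", "skipped", "succeeded", "failed", "retried", "cancelled"]


def legacy_counts(items: list[dict]) -> dict[str, int]:
    """Count items per legacy bucket; unknown statuses are ignored in this sketch."""
    counts = {bucket: 0 for bucket in BUCKET_ORDER}
    for item in items:
        bucket = LEGACY_BUCKETS.get(item.get("status"))
        if bucket is not None:
            counts[bucket] += 1
    return counts


counts = legacy_counts([{"status": "planned"}, {"status": "exists"}, {"status": "written"}])
```

With the three items from the example above, this sketch yields one planned, one skipped, and one succeeded unit, with the remaining buckets at zero.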
Development
Run the package tests with:
python -m pytest ccflow_etl/tests -q
Run Ruff before release:
python -m ruff check ccflow_etl
Default tests should use synthetic local fixtures and must not require live HTTP, S3, database, Celery, or provider credentials. Integration tests that need external services should be opt-in and skipped by default.
ccflow-etl provides public, domain-neutral ETL building blocks for ccflow callable models. It should own reusable concepts such as extract/transform/load composition, backfill planning, checkpointing, caching, retry policy models, idempotency metadata, and CLI workflows.
ccflow-etl should not contain finance-specific calendars, market-data provider behavior, connector-specific client code, or application-specific workflows. Connector packages and domain packages should depend on these ETL contracts where useful.
Current Status
- Implemented: ETL, ExtractModel, TransformModel, LoadModel, BackfillContext, BackfillModel, interval parsing, business-day context expansion, SQLiteCheckpointStore, transport-neutral RetryPolicy, and the cc-etl Hydra CLI entry points.
- Partial: current ETL stage models establish ordering and status shells, backfill can generate concrete contexts, SQLite checkpoints can mark/read completed units, and connector packages can consume retry classification, but durable data handoff, cache stores, retry execution orchestration, structured summaries, and generic planner/executor resume semantics still need implementation.
- Missing: local/S3/database cache adapters, broader checkpoint adapters, structured run summaries, generalized skip policies, dry-run planning, backoff/jitter scheduling, and cross-package integration examples.
Dependency Contract
- Depends on ccflow for model, context, result, evaluator, and Hydra integration primitives.
- May define generic interfaces that connector packages implement.
- Must not depend on finance packages or application-specific packages.
Note: This library was generated using copier from the Base Python Project Template repository.