Introduction
The mad_prefect library introduces a powerful pattern for managing data within Prefect workflows using data assets. A data asset represents a unit of data that can be materialized, cached, and queried independently. By leveraging data assets, you can build modular, reusable, and maintainable data pipelines that are efficient and easy to reason about.
At the core of this pattern is the @asset decorator, which transforms a regular Python function into a data asset. This decorator handles the intricacies of data persistence, caching, artifact management, and provides querying capabilities using DuckDB.
Example:

from datetime import timedelta

from mad_prefect.data_assets import asset


@asset(
    path="/data/results.json",
    name="my_data_asset",
    artifact_filetype="json",
    cache_expiration=timedelta(hours=1),
)
def generate_data():
    # Data generation logic
    data = [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}]
    return data


# Executing the data asset
result_artifact = await generate_data()

# Querying the data asset
query_result = await generate_data.query("WHERE id > 1")

# Reusing the cached artifact for ad-hoc analysis
cached_view = await generate_data.cache_first().query("SELECT * FROM data")
In this example, generate_data is defined as a data asset using the @asset decorator. When executed, it automatically handles data persistence to the specified path, caching based on the cache_expiration, and allows querying the data without loading it entirely into memory.
Power of the Data Asset Pattern:
- Modularity and Reusability: Encapsulate data logic in self-contained units that can be reused across different workflows.
- Automatic Caching: Avoid redundant computations by caching results, leading to performance improvements.
- Efficient Data Handling: Process large datasets efficiently by handling data in batches and using disk-based storage.
- Seamless Querying: Utilize DuckDB's powerful SQL capabilities to query data assets directly.
- Historical Data Management: Use features like snapshot_artifacts to maintain historical versions of data for auditing and rollback.
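The caching behavior described above can be modeled in a few lines of plain Python. The sketch below is an illustrative stand-in for the cache_expiration check, not the library's actual implementation:

```python
from datetime import datetime, timedelta

def cache_is_valid(materialized_at: datetime, expiration: timedelta, now: datetime) -> bool:
    """Toy model of the cache_expiration check: a cached result is
    reused if it was materialized within the expiration window."""
    return now - materialized_at < expiration

now = datetime(2024, 1, 1, 12, 0)
# Materialized 30 minutes ago with a 1-hour TTL -> cache hit
print(cache_is_valid(now - timedelta(minutes=30), timedelta(hours=1), now))  # True
# Materialized 2 hours ago -> cache miss, the asset function re-runs
print(cache_is_valid(now - timedelta(hours=2), timedelta(hours=1), now))  # False
```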
Use Cases:
- Data Transformation Pipelines: Build ETL processes where each step is a data asset that can be independently executed and queried.
- Machine Learning Workflows: Prepare datasets, cache intermediate results, and efficiently query data for model training and evaluation.
- Data Analysis and Reporting: Enable data analysts to access preprocessed data assets for reporting without needing to understand the underlying data retrieval mechanisms.
- Incremental Data Processing: Handle data that arrives incrementally by processing new data and updating the data assets accordingly.
Modules
Asset Decorator
asset
def asset(
    path: str,
    artifacts_dir: str = "",
    name: str | None = None,
    snapshot_artifacts: bool = False,
    artifact_filetype: ARTIFACT_FILE_TYPES = "json",
    read_json_options: ReadJsonOptions | None = None,
    read_csv_options: ReadCSVOptions | None = None,
    cache_expiration: datetime.timedelta | None = None,
):
    ...
The asset decorator is used to define a data asset within a Prefect flow. It wraps a function and returns a DataAsset instance that manages the data asset lifecycle, including caching, persistence, and querying.
Parameters:
- path (str): The path where the final result artifact will be stored. Supports multiple file types using a |-delimited syntax, e.g. "path/to/file.parquet|csv" will produce two result artifacts at "path/to/file.parquet" and "path/to/file.csv".
- artifacts_dir (str, optional): The directory where intermediate artifacts will be stored.
- name (str, optional): The name of the data asset. If not provided, defaults to the function name.
- snapshot_artifacts (bool, optional): Whether to snapshot artifacts over time.
- artifact_filetype (Literal["parquet", "json", "csv"], optional): The file type for intermediate artifacts.
- read_json_options (ReadJsonOptions, optional): Options for reading JSON data.
- read_csv_options (ReadCSVOptions, optional): Options for reading comma-separated values data.
- cache_expiration (datetime.timedelta, optional): The cache expiration time. If data has been materialized within this period, it will be reused.
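As a rough sketch of how the |-delimited path syntax could be expanded into concrete paths (a hypothetical helper written for illustration, not the library's actual parser):

```python
def expand_asset_paths(path: str) -> list[str]:
    # "path/to/file.parquet|csv" declares one primary path plus extra
    # extensions; each extra extension replaces the primary one.
    primary, *extra_exts = path.split("|")
    stem, _, _ext = primary.rpartition(".")
    return [primary] + [f"{stem}.{ext}" for ext in extra_exts]

print(expand_asset_paths("path/to/file.parquet|csv"))
# ['path/to/file.parquet', 'path/to/file.csv']
```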
Usage:
@asset(path="/data/results.json", name="my_asset")
def generate_data():
    # Generate data logic
    return data
DataAsset Class
DataAsset
class DataAsset:
    def __init__(
        self,
        fn: Callable,
        path: str,
        artifacts_dir: str = "",
        name: str | None = None,
        snapshot_artifacts: bool = False,
        artifact_filetype: ARTIFACT_FILE_TYPES = "json",
        read_json_options: ReadJsonOptions | None = None,
        read_csv_options: ReadCSVOptions | None = None,
        cache_expiration: timedelta | None = None,
    ):
        ...
The DataAsset class represents a data asset in the system. It manages the execution of the associated function, caching, artifact management, and querying capabilities.
Key Methods:
- with_arguments(*args, **kwargs): Returns a new DataAsset instance with the provided arguments bound.
- with_options(...): Returns a new DataAsset instance with overridden options.
- __call__(self, *args, **kwargs): Executes the data asset, handling caching and persistence.
- cache_first(self, expiration: timedelta | None = None): Returns a new asset configured to reuse cached artifacts, optionally overriding the default long-lived TTL.
- query(self, query_str: str | None = None): Queries the data asset using DuckDB.
Properties:
- name: The name of the data asset.
- path: The path where the final result artifact is stored.
- artifact_filetype: The file type for artifacts (e.g., "json", "parquet", "csv").
DataArtifact Class
DataArtifact
class DataArtifact:
    def __init__(
        self,
        path: str,
        data: object | None = None,
        read_json_options: ReadJsonOptions | None = None,
        read_csv_options: ReadCSVOptions | None = None,
    ):
        ...
The DataArtifact class represents an individual data artifact, which can be a fragment of data or the final result. It handles persistence and querying of the data.
Key Methods:
- persist(self): Persists the data artifact to the filesystem.
- query(self, query_str: str | None = None): Queries the artifact data using DuckDB.
- exists(self): Checks if the artifact exists on the filesystem.
Usage:
Data artifacts are usually managed internally by DataAsset and DataArtifactCollector, but can be interacted with directly if needed.
DataArtifactCollector Class
DataArtifactCollector
class DataArtifactCollector:
    def __init__(
        self,
        collector: object,
        dir: str,
        filetype: ARTIFACT_FILE_TYPES = "json",
        artifacts: list[DataArtifact] | None = None,
        read_json_options: ReadJsonOptions | None = None,
        read_csv_options: ReadCSVOptions | None = None,
    ):
        ...
The DataArtifactCollector class is responsible for collecting data artifacts from a data generation function, persisting them, and tracking their locations.
Key Methods:
- collect(self): Asynchronously collects data artifacts by persisting each batch of data.
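Conceptually, the collector drains whatever the asset function produces and persists each batch as its own fragment file. An illustrative stdlib sketch of that loop (the fragment naming and JSON output here are invented for illustration, not taken from the library):

```python
import asyncio
import json
import os
import tempfile

async def collect_batches(batches, out_dir: str) -> list[str]:
    """Persist each batch from an async generator as a numbered fragment."""
    paths = []
    i = 0
    async for batch in batches:
        path = os.path.join(out_dir, f"fragment_{i}.json")
        with open(path, "w") as f:
            json.dump(batch, f)
        paths.append(path)
        i += 1
    return paths

async def produce():
    # Stand-in for an asset function that yields data in batches.
    yield [{"id": 1}]
    yield [{"id": 2}, {"id": 3}]

with tempfile.TemporaryDirectory() as tmp:
    written = asyncio.run(collect_batches(produce(), tmp))
    print([os.path.basename(p) for p in written])
    # ['fragment_0.json', 'fragment_1.json']
```

Persisting batch by batch is what lets large datasets flow to disk without ever being held in memory all at once.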
DataArtifactQuery Class
DataArtifactQuery
class DataArtifactQuery:
    def __init__(
        self,
        artifacts: list[DataArtifact] | None = None,
        read_json_options: ReadJsonOptions | None = None,
        read_csv_options: ReadCSVOptions | None = None,
    ):
        ...
The DataArtifactQuery class provides functionality to query multiple data artifacts using DuckDB.
Key Methods:
- query(self, query_str: str | None = None): Executes a query against the combined data of the provided artifacts.
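Querying multiple artifacts amounts to reading every artifact and filtering their union. DuckDB does this with SQL over files; the idea can be shown with a pure-Python stand-in (a conceptual sketch, not the library's code):

```python
def query_artifacts(artifacts: list[list[dict]], predicate) -> list[dict]:
    # Union the rows of every artifact, then filter -- the same shape
    # as running one SQL query across all artifact files.
    return [row for rows in artifacts for row in rows if predicate(row)]

a1 = [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}]
a2 = [{"id": 3, "value": "c"}]
print(query_artifacts([a1, a2], lambda r: r["id"] > 1))
# [{'id': 2, 'value': 'b'}, {'id': 3, 'value': 'c'}]
```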
DataAssetRun Class
DataAssetRun
class DataAssetRun(BaseModel):
    id: str | None = None
    runtime: datetime.datetime | None = None
    materialized: datetime.datetime | None = None
    duration_miliseconds: int | None = None
    asset_id: str | None = None
    asset_name: str | None = None
    asset_path: str | None = None
    parameters: str | None = None

    async def persist(self):
        ...
The DataAssetRun class represents a single execution (run) of a data asset. It tracks metadata such as runtime, duration, and parameters used.
Utilities
yield_data_batches Function
async def yield_data_batches(data: object):
    ...
The yield_data_batches function is a utility that yields data batches from various types of data sources, such as coroutines, generators, and async generators.
Usage:
Used internally by DataArtifact and DataArtifactCollector to handle different types of data sources uniformly.
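Its job can be approximated in a few lines: inspect what the asset function returned and normalize it into a single async stream of batches. An illustrative re-implementation of the idea (the real function may differ in detail):

```python
import asyncio
import inspect

async def yield_batches(data: object):
    """Normalize coroutines, async generators, sync generators, and
    plain values into one async stream of batches."""
    if inspect.iscoroutine(data):
        data = await data
    if inspect.isasyncgen(data):
        async for batch in data:
            yield batch
    elif inspect.isgenerator(data):
        for batch in data:
            yield batch
    else:
        # A plain return value is treated as a single batch.
        yield data

async def demo():
    async def agen():
        yield [1]
        yield [2]

    out = []
    async for b in yield_batches(agen()):
        out.append(b)
    async for b in yield_batches((x for x in ([3], [4]))):
        out.append(b)
    return out

print(asyncio.run(demo()))  # [[1], [2], [3], [4]]
```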
register_mad_protocol Function
async def register_mad_protocol(connection: duckdb.DuckDBPyConnection | None = None):
    ...
Registers the custom "mad" filesystem protocol with DuckDB, allowing DuckDB to read data from the custom filesystem used by mad_prefect.
Usage:
Called before executing queries that involve the "mad://" protocol.
get_fs Function
async def get_fs():
    ...
Returns an instance of FsspecFileSystem configured with the appropriate filesystem URL and options.
Usage:
Used internally whenever filesystem access is required.
Filesystems
FsspecFileSystem Class
class FsspecFileSystem(
    prefect.filesystems.WritableFileSystem,
    prefect.filesystems.WritableDeploymentStorage,
):
    ...
FsspecFileSystem is a custom filesystem class that extends Prefect's WritableFileSystem and WritableDeploymentStorage. It uses fsspec to interact with various filesystems.
Key Methods:
- write_path(self, path: str, content: bytes): Writes data to the specified path.
- read_path(self, path: str): Reads data from the specified path.
- exists(self, path: str): Checks if the path exists.
- delete_path(self, path: str, recursive: bool = False): Deletes the specified path.
Configuration:
- basepath: The base path for the filesystem.
- storage_options: Options for configuring the underlying filesystem (e.g., authentication credentials).
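The basepath acts as a prefix that every read and write is resolved against. A stdlib sketch of the write_path/read_path shape (a toy local-disk stand-in, not the fsspec-backed class itself):

```python
import os
import tempfile

class ToyFileSystem:
    """Local-disk stand-in for a writable filesystem with a basepath."""

    def __init__(self, basepath: str):
        self.basepath = basepath

    def _resolve(self, path: str) -> str:
        # Every path is interpreted relative to the basepath.
        return os.path.join(self.basepath, path.lstrip("/"))

    def write_path(self, path: str, content: bytes) -> None:
        full = self._resolve(path)
        os.makedirs(os.path.dirname(full), exist_ok=True)
        with open(full, "wb") as f:
            f.write(content)

    def read_path(self, path: str) -> bytes:
        with open(self._resolve(path), "rb") as f:
            return f.read()

    def exists(self, path: str) -> bool:
        return os.path.exists(self._resolve(path))

with tempfile.TemporaryDirectory() as tmp:
    fs = ToyFileSystem(basepath=tmp)
    fs.write_path("data/results.json", b'[{"id": 1}]')
    print(fs.read_path("data/results.json"))  # b'[{"id": 1}]'
```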
MadFileSystem Class
class MadFileSystem(DirFileSystem):
    ...
MadFileSystem is a custom filesystem class that extends fsspec's DirFileSystem. It is used to integrate with DuckDB by providing a filesystem interface that DuckDB can use.
Usage Examples
Defining a Data Asset
from mad_prefect.data_assets import asset


@asset(path="/data/results.json", name="my_data_asset")
def generate_data():
    # Your data generation logic here
    data = [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}]
    return data
Executing a Data Asset
# Execute the data asset
result_artifact = await generate_data()
# The result_artifact is a DataArtifact instance
Querying a Data Asset
# Query the data asset
query_result = await generate_data.query("WHERE id > 1")
Using with_arguments
@asset(path="/data/{dataset_name}.json", name="dataset_{dataset_name}")
def generate_dataset(dataset_name: str):
    # Generate data based on dataset_name
    data = fetch_data(dataset_name)
    return data


# Execute with specific arguments
dataset_asset = generate_dataset.with_arguments(dataset_name="users")
result_artifact = await dataset_asset()
Configuring Filesystem
Set environment variables to configure the filesystem:
export FILESYSTEM_URL="s3://my-bucket"
export FILESYSTEM_BLOCK_NAME="my_s3_block"
Notes
- Caching: Data assets support caching based on the cache_expiration parameter. If data has been materialized within the expiration period, the cached result will be used.
- Artifacts: Intermediate artifacts are stored in the artifacts_dir. If snapshot_artifacts is enabled, artifacts are stored with timestamps to allow historical data inspection.
- File Types: Supports "json", "parquet", and "csv" file types for artifacts. Ensure consistency when querying multiple artifacts.
- Filesystem Integration: Uses fsspec for filesystem abstraction, allowing interaction with various storage systems (local, S3, etc.).
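As a hypothetical illustration of the timestamped layout that snapshotting implies (the directory scheme below is invented for this sketch; the library's actual layout may differ):

```python
from datetime import datetime

def snapshot_path(artifacts_dir: str, filename: str, runtime: datetime) -> str:
    # Place each run's artifacts under a timestamped subdirectory so
    # historical versions remain inspectable side by side.
    stamp = runtime.strftime("%Y-%m-%dT%H-%M-%S")
    return f"{artifacts_dir}/{stamp}/{filename}"

print(snapshot_path("artifacts/my_asset", "fragment_0.json",
                    datetime(2024, 5, 1, 9, 30, 0)))
# artifacts/my_asset/2024-05-01T09-30-00/fragment_0.json
```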
Best Practices
- Consistent File Types: When collecting artifacts, ensure they are all of the same file type to avoid querying issues.
- Error Handling: Be mindful of the data types returned by your data generation functions to ensure they are compatible with the persistence mechanisms.
- Filesystem Configuration: Properly configure your filesystem via environment variables or block storage to ensure data is read from and written to the correct locations.
- Parameterization: Use the with_arguments method to create parameterized data assets for different datasets or configurations.
Troubleshooting
- Data Not Persisted: Check if the data returned is empty or falsy, which could prevent the artifact from being persisted.
- DuckDB Errors: Ensure the "mad" protocol is registered before executing queries involving mad:// URIs.
- Filesystem Access Issues: Verify that the filesystem is correctly configured and that necessary credentials are provided.
- Schema Mismatches: When querying artifacts, ensure that all artifacts have compatible schemas, especially when using different data sources.
Extending Functionality
Developers can extend the functionality of mad_prefect by:
- Adding Support for Additional File Types: Extend DataArtifact and DataArtifactQuery to handle more file formats (e.g., Avro).
- Custom Persistence Strategies: Implement new methods in DataArtifact for persisting data using different storage mechanisms.
- Enhanced Filesystem Features: Extend FsspecFileSystem with additional methods or support for more complex storage options.
- Integrate with Other Databases: Adapt the querying capabilities to work with databases other than DuckDB if needed.
Contributing
Contributions to mad_prefect are welcome. Please follow the project's contribution guidelines and ensure that new features are accompanied by tests and documentation.
- Code Style: Adhere to PEP 8 guidelines and use type hints where appropriate.
- Testing: Write unit tests for new features or bug fixes.
- Documentation: Update the documentation to reflect changes and additions.
- Issue Reporting: Use the issue tracker to report bugs or suggest enhancements.