SDK for creating DataForge extensions
dataforge-sdk
Postgres Utilities
The dataforge.pg module provides helper functions to execute SQL operations against the DataForge Postgres metastore:
from dataforge.pg import select, update, pull
# Execute a SELECT query and return a Spark DataFrame
df = select("SELECT * FROM my_table")
# Execute an UPDATE/INSERT/DELETE statement
update("UPDATE my_table SET col = 'value'")
# Trigger a new data pull for source_id 123
pull(123)
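When several sources need a new pull at once, the `pull` call above can be wrapped in a small loop. The sketch below is only an illustration: `pull_many` is a hypothetical helper, not part of the SDK, and it takes the pull function as an argument so it can be tried without a DataForge connection. It assumes `pull` raises an exception on failure, which the description above does not confirm.

```python
from typing import Callable, Dict, Iterable, List, Tuple


def pull_many(
    pull_fn: Callable[[int], object],
    source_ids: Iterable[int],
) -> Tuple[List[int], Dict[int, str]]:
    """Call pull_fn once per source id and report which calls failed.

    Hypothetical helper, not part of dataforge-sdk. Assumes pull_fn raises
    an exception when a pull cannot be triggered.
    """
    succeeded: List[int] = []
    failed: Dict[int, str] = {}
    for source_id in source_ids:
        try:
            pull_fn(source_id)
            succeeded.append(source_id)
        except Exception as exc:
            # Keep going so that one bad source does not block the rest.
            failed[source_id] = str(exc)
    return succeeded, failed
```

In a DataForge environment this might be called as `pull_many(pull, [123, 124])`, with `pull` imported from `dataforge.pg` and the source ids chosen for illustration.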
IngestionSession
The IngestionSession class manages the lifecycle of a custom data ingestion process.
from dataforge import IngestionSession
# Initialize a session (production use)
session = IngestionSession()
# Initialize a session (optional source_name/project_name for testing)
session = IngestionSession(source_name="my_source", project_name="my_project")
# Ingest data: pass a function that returns a DataFrame
# (recommended, because it integrates logging with DataForge)
session.ingest(lambda: spark.read.csv("s3://bucket/path/input.csv"))
# Alternatively, pass a DataFrame directly (suitable for testing, not recommended for production deployment)
df = spark.read.csv("s3://bucket/path/input.csv")
session.ingest(df)
# Call ingest() with no argument to create a 0-record input
session.ingest()
# Fail the process with an error message
session.fail("Error message")
# Retrieve latest tracking fields
tracking = session.latest_tracking_fields()
# Retrieve connection parameters for the current source
connection_parameters = session.connection_parameters()
# Retrieve custom parameters for the current source
custom_parameters = session.custom_parameters()
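The calls above can be combined into a small guard that checks custom parameters before ingesting. This is a minimal sketch under stated assumptions: `ingest_with_required_params` is a hypothetical helper, and it assumes `custom_parameters()` returns a dict-like object (or None), which is not stated here. The session is passed in as an argument, so any object with the same three methods works.

```python
from typing import Any, Callable, Dict, Iterable


def ingest_with_required_params(
    session: Any,
    read_fn: Callable[[Dict[str, Any]], Any],
    required_keys: Iterable[str],
) -> bool:
    """Validate custom parameters, then ingest through a deferred read.

    Hypothetical helper, not part of dataforge-sdk. Assumes
    session.custom_parameters() returns a dict-like object or None.
    """
    params = session.custom_parameters() or {}
    missing = [key for key in required_keys if key not in params]
    if missing:
        # Fail the process through the session instead of ingesting.
        session.fail("Missing custom parameters: " + ", ".join(missing))
        return False
    # Pass a function, not a DataFrame, so the read runs inside ingest().
    session.ingest(lambda: read_fn(params))
    return True
```

A call might look like `ingest_with_required_params(session, lambda p: spark.read.csv(p["path"]), ["path"])`, where the `"path"` key is made up for the example.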
ParsingSession
The ParsingSession class manages the lifecycle of a custom parsing process.
from dataforge import ParsingSession
# Initialize a session (production use)
session = ParsingSession()
# Initialize a session (optional input_id for testing)
session = ParsingSession(input_id=123)
# Retrieve custom parameters
params = session.custom_parameters()
# Get the path of the file to be parsed
path = session.file_path
# Run parsing: pass a DataFrame, a function returning a DataFrame, or None (for a 0-record file)
session.run(lambda: spark.read.json(session.file_path))
# Fail the process with an error message
session.fail("Error message")
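The `run` call above accepts three argument forms: a DataFrame, a function returning a DataFrame, or None. The sketch below shows one way such an argument could be normalized to a single value. It is an illustration of the convention only, not the SDK's internal code, and `resolve_input` is a hypothetical name.

```python
from typing import Any, Optional


def resolve_input(data: Any) -> Optional[Any]:
    """Normalize the three argument forms into a single value.

    Illustration only; this is not how dataforge-sdk is known to work.
    - None      -> None (stands for a 0-record file)
    - callable  -> the result of calling it (a deferred read)
    - otherwise -> the object itself (an already-built DataFrame)
    """
    if data is None:
        return None
    if callable(data):
        # Deferring the read lets the caller wrap it in its own error handling.
        return data()
    return data
```

Passing a function defers the read until the wrapper calls it, which is what makes it possible for a wrapper to catch and report read errors in one place.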
PostOutputSession
The PostOutputSession class manages the lifecycle of a custom post-output process.
from dataforge import PostOutputSession
# Initialize a session (production use)
session = PostOutputSession()
# Initialize a session (optional names for testing)
session = PostOutputSession(output_name="report", output_source_name="my_source", project_name="my_project")
# Get the path of the file generated by the preceding output process
path = session.file_path()
# Retrieve connection parameters for the current output
connection_parameters = session.connection_parameters()
# Retrieve custom parameters for the current output
custom_parameters = session.custom_parameters()
# Run post-output logic: pass a function encapsulating custom code
session.run(lambda: print(f"Uploading file from {path}"))
# Fail the process with an error message
session.fail("Error message")
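For post-output logic longer than a single expression, a named function is easier to read and test than a lambda. The following sketch builds a zero-argument task that copies a file into a target directory. `make_copy_task` is a hypothetical helper, and the sketch assumes the output file path is a local filesystem path, which may not hold in every deployment.

```python
import shutil
from pathlib import Path
from typing import Callable


def make_copy_task(source_path: str, target_dir: str) -> Callable[[], str]:
    """Return a zero-argument function that copies source_path into target_dir.

    Hypothetical helper, not part of dataforge-sdk. Assumes source_path is a
    local filesystem path.
    """

    def task() -> str:
        target = Path(target_dir)
        # Create the target directory if it does not exist yet.
        target.mkdir(parents=True, exist_ok=True)
        copied = shutil.copy2(source_path, target / Path(source_path).name)
        return str(copied)

    return task
```

The returned task could then be handed to the session, for example `session.run(make_copy_task(path, "/tmp/outbound"))`, where the target directory is made up for the example.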
File details
Details for the file dataforge_sdk-9.2.0rc3.tar.gz.
File metadata
- Download URL: dataforge_sdk-9.2.0rc3.tar.gz
- Upload date:
- Size: 10.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | cc0213ff71ce67ae0ef3034065272df29d779f4a6734758f6c83c834031ba8d9 |
| MD5 | 6202c9d0741530a526c35e505fe832ca |
| BLAKE2b-256 | 5de89b5962d03ee69bb0af459633b5451cbb48f5a4a3d69de1579d07019a7b3b |
File details
Details for the file dataforge_sdk-9.2.0rc3-py3-none-any.whl.
File metadata
- Download URL: dataforge_sdk-9.2.0rc3-py3-none-any.whl
- Upload date:
- Size: 13.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 6c86b6b4daa2f51ba860bc69214c8cff84c547d8e64c99b76a9d5ab21d939363 |
| MD5 | 48bdbade95bcf91c4bfa8e4c227873f3 |
| BLAKE2b-256 | 7b869a6e0cbb7ea655120b416e9da2d568a37ab7623e0aa6254e46a40d29c2f9 |