Skip to main content

SDK for creating DataForge extensions

Project description

dataforge-sdk

SDK for creating DataForge extensions.

Postgres Utilities

The dataforge.pg module provides helper functions to execute SQL operations against the DataForge Postgres metastore:

from dataforge.pg import select, update, pull

# Execute a SELECT query and return a Spark DataFrame
df = select("SELECT * FROM my_table")

# Execute an UPDATE/INSERT/DELETE query
update("UPDATE my_table SET col = 'value'")

# Trigger a new data pull for source_id 123
pull(123)

IngestionSession

The IngestionSession class manages a custom data ingestion process lifecycle.

from dataforge import IngestionSession

# Initialize a session (production use)
session = IngestionSession()

# Initialize a session (optional source_name/project_name for testing)
session = IngestionSession(source_name="my_source", project_name="my_project")

# Ingest data 
# pass a function returning a DataFrame (recommended to integrate logging with DataForge)
session.ingest(lambda: spark.read.csv("s3://bucket/path/input.csv"))

# pass a DataFrame (can be used for testing, not recommended for production deployment)
df = spark.read.csv("s3://bucket/path/input.csv")
session.ingest(df)

# ingest empty dataframe to create 0-record input
session.ingest()


# Fail the process with error message
session.fail("Error message")

# Retrieve latest tracking fields
tracking = session.latest_tracking_fields()

# Retrieve connection parameters for the current source
connection_parameters = session.connection_parameters()

# Retrieve custom parameters for the current source
custom_parameters = session.custom_parameters()

ParsingSession

The ParsingSession class manages a custom parse process lifecycle.

from dataforge import ParsingSession

# Initialize a session (production use)
session = ParsingSession()

# Initialize a session (optional input_id for testing)
session = ParsingSession(input_id=123)

# Retrieve custom parameters
params = session.custom_parameters()

# Get the path of file to be parsed
path = session.file_path

# Run parsing: pass a DataFrame, a function returning a DataFrame or None (0-record file)
session.run(lambda: spark.read.json(session.file_path))

# Fail the process with error message
session.fail("Error message")

PostOutputSession

The PostOutputSession class manages a custom post-output process lifecycle.

from dataforge import PostOutputSession

# Initialize a session (production use)
session = PostOutputSession()

# Initialize a session (optional names for testing)
session = PostOutputSession(output_name="report", output_source_name="my_source", project_name="my_project")


# Get the path of file generated by preceding output process
path = session.file_path()

# Retrieve connection parameters for the current output
connection_parameters = session.connection_parameters()

# Retrieve custom parameters for the current source
custom_parameters = session.custom_parameters()

# Run post-output logic: pass a function encapsulating custom code
session.run(lambda: print(f"Uploading file from {path}"))

# Fail the process with error message
session.fail("Error message")

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dataforge_sdk-9.2.6.tar.gz (10.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dataforge_sdk-9.2.6-py3-none-any.whl (13.8 kB view details)

Uploaded Python 3

File details

Details for the file dataforge_sdk-9.2.6.tar.gz.

File metadata

  • Download URL: dataforge_sdk-9.2.6.tar.gz
  • Upload date:
  • Size: 10.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.14

File hashes

Hashes for dataforge_sdk-9.2.6.tar.gz
Algorithm Hash digest
SHA256 5c91254a1fba462f4856cb55ee41f00b3eb9895505c1d3dea418d1bb071149e4
MD5 0051406881f95f96d6e429d6e0a5982a
BLAKE2b-256 7a51b12465b675f14715b8aa9662d27a233a3b2dca32d25d66ce39f6be00c4e3

See more details on using hashes here.

File details

Details for the file dataforge_sdk-9.2.6-py3-none-any.whl.

File metadata

  • Download URL: dataforge_sdk-9.2.6-py3-none-any.whl
  • Upload date:
  • Size: 13.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.14

File hashes

Hashes for dataforge_sdk-9.2.6-py3-none-any.whl
Algorithm Hash digest
SHA256 6c6a80afdb3ab0d57baf0c010609ea1a3b6806425a806a54bc9ebeafe791452b
MD5 23eb5c069e05fede4924e1855beb5dd7
BLAKE2b-256 deb0e17be9ab238ae86ea321d9c088bb10a49014d3ff8ce7770b079c55abdd40

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page