pyspark-pipeline-framework

Configuration-driven PySpark pipeline framework with HOCON configuration, resilience patterns, lifecycle hooks, and streaming support.

Scala/JVM users may also be interested in spark-pipeline-framework, the Scala implementation of this framework using PureConfig and Typesafe Config. You can find it on GitHub and Maven Central.

Installation

pip install pyspark-pipeline-framework

For development:

git clone https://github.com/dwsmith1983/pyspark-pipeline-framework.git
cd pyspark-pipeline-framework
pip install -e ".[dev]"

Quick Start

from pyspark_pipeline_framework.runner import SimplePipelineRunner

# Load pipeline from HOCON config and run
runner = SimplePipelineRunner.from_file("pipeline.conf")
result = runner.run()

print(result.status)            # PipelineResultStatus.SUCCESS
print(result.total_duration_ms) # 1234

Batch Pipelines

Creating Components

Extend DataFlow and implement name and run():

from pyspark_pipeline_framework.runtime.dataflow.base import DataFlow


class MyTransform(DataFlow):
    def __init__(self, output_view: str) -> None:
        super().__init__()
        self._output_view = output_view

    @property
    def name(self) -> str:
        return "MyTransform"

    @classmethod
    def from_config(cls, config: dict) -> "MyTransform":
        return cls(**config)

    def run(self) -> None:
        df = self.spark.sql("SELECT id, UPPER(name) AS name FROM raw")
        df.createOrReplaceTempView(self._output_view)

The framework injects a SparkSession via set_spark_session() before calling run(). Access it through self.spark.
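The injection contract described above can be pictured with a small stand-in. This is only a sketch: `SketchDataFlow` and `UppercaseNames` are hypothetical names, and raising an error when no session has been injected is an assumption, not documented framework behavior.

```python
from typing import Any, Optional


class SketchDataFlow:
    """Hypothetical stand-in for the injection contract; not the framework's DataFlow."""

    def __init__(self) -> None:
        self._spark: Optional[Any] = None

    def set_spark_session(self, spark: Any) -> None:
        # The runner is expected to call this before run().
        self._spark = spark

    @property
    def spark(self) -> Any:
        # Assumed behavior: fail loudly if run() is reached without a session.
        if self._spark is None:
            raise RuntimeError("SparkSession has not been injected")
        return self._spark


class UppercaseNames(SketchDataFlow):
    def run(self) -> None:
        df = self.spark.sql("SELECT id, UPPER(name) AS name FROM raw")
        df.createOrReplaceTempView("cleaned")
```

Because the session arrives through a setter rather than the constructor, a component can be built from plain configuration values and handed any session-like object later, which is what makes mock-based testing possible.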

Built-in Example Components

Three reference components are included:

  • ReadTable -- reads a table and registers a temp view
  • SqlTransform -- executes SQL and registers the result
  • WriteTable -- writes a temp view to a table

from pyspark_pipeline_framework.examples.batch import (
    ReadTable, ReadTableConfig,
    SqlTransform, SqlTransformConfig,
    WriteTable, WriteTableConfig,
)

HOCON Configuration

{
  name: "customer-etl"
  version: "1.0.0"

  spark {
    app_name: "Customer ETL"
    master: "local[*]"
  }

  components: [
    {
      name: "read_raw"
      component_type: source
      class_path: "pyspark_pipeline_framework.examples.batch.ReadTable"
      config {
        table_name: "raw.customers"
        output_view: "raw_customers"
      }
    },
    {
      name: "transform"
      component_type: transformation
      class_path: "pyspark_pipeline_framework.examples.batch.SqlTransform"
      depends_on: ["read_raw"]
      config {
        sql: "SELECT id, UPPER(name) AS name FROM raw_customers"
        output_view: "cleaned"
      }
    },
    {
      name: "write"
      component_type: sink
      class_path: "pyspark_pipeline_framework.examples.batch.WriteTable"
      depends_on: ["transform"]
      config {
        input_view: "cleaned"
        output_table: "curated.customers"
      }
    }
  ]
}
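The depends_on entries above describe a dependency graph. The source does not say how the runner orders components, so the following is only a sketch of one plausible approach using the standard library's `graphlib`; `execution_order` is a hypothetical helper, not a framework function.

```python
from graphlib import TopologicalSorter

# Component names and dependencies copied from the HOCON example,
# deliberately listed out of order.
components = [
    {"name": "write", "depends_on": ["transform"]},
    {"name": "read_raw"},
    {"name": "transform", "depends_on": ["read_raw"]},
]


def execution_order(components):
    """Return component names so that every prerequisite comes first."""
    # TopologicalSorter expects a mapping of node -> set of predecessors.
    graph = {c["name"]: set(c.get("depends_on", [])) for c in components}
    return list(TopologicalSorter(graph).static_order())


order = execution_order(components)
print(order)  # ['read_raw', 'transform', 'write']
```

`static_order()` raises `graphlib.CycleError` when the dependencies form a cycle, which gives a natural place to reject an invalid configuration before any Spark work starts.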

Streaming Pipelines

Creating a Streaming Pipeline

Extend StreamingPipeline and provide a source, sink, and optional transform:

from pyspark_pipeline_framework.runtime.streaming.base import (
    StreamingPipeline, StreamingSource, StreamingSink,
    TriggerConfig, TriggerType,
)
from pyspark_pipeline_framework.runtime.streaming.sources import (
    KafkaStreamingSource,
)
from pyspark_pipeline_framework.runtime.streaming.sinks import (
    DeltaStreamingSink,
)


class EventIngestion(StreamingPipeline):
    def __init__(self) -> None:
        super().__init__()
        self._source = KafkaStreamingSource(
            bootstrap_servers="broker:9092", topics="events",
        )
        self._sink = DeltaStreamingSink(
            path="/data/delta/events",
            checkpoint_location="/checkpoints/events",
        )

    @property
    def name(self) -> str:
        return "EventIngestion"

    @property
    def source(self) -> StreamingSource:
        return self._source

    @property
    def sink(self) -> StreamingSink:
        return self._sink

    @property
    def trigger(self) -> TriggerConfig:
        return TriggerConfig(TriggerType.PROCESSING_TIME, "30 seconds")

    def transform(self, df):
        # Cast the Kafka value bytes to a string column (JSON parsing would be a later step)
        return df.selectExpr("CAST(value AS STRING) AS raw_json")

Built-in Streaming Components

Sources: KafkaStreamingSource, FileStreamingSource, DeltaStreamingSource, IcebergStreamingSource, RateStreamingSource

Sinks: KafkaStreamingSink, DeltaStreamingSink, ConsoleStreamingSink, IcebergStreamingSink, FileStreamingSink

Example Pipelines

from pyspark_pipeline_framework.examples.streaming import (
    FileToConsolePipeline,
    KafkaToDeltaPipeline,
)

Running a Stream

pipeline.set_spark_session(spark)

# Blocking -- runs until terminated
pipeline.run()

# Non-blocking -- returns a StreamingQuery handle
query = pipeline.start_stream()
query.awaitTermination(timeout=60)  # wait up to 60 seconds
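With a timeout, PySpark's `StreamingQuery.awaitTermination` returns a boolean saying whether the query ended within that time. A bounded run can therefore be sketched as below; `run_for` is a hypothetical helper, and it assumes `start_stream()` returns a standard `StreamingQuery`, as the comment above indicates.

```python
def run_for(query, seconds: float) -> bool:
    """Wait up to `seconds`, then stop the query if it is still active.

    Returns True if the query terminated on its own within the timeout.
    """
    finished = query.awaitTermination(seconds)
    if not finished:
        # Still running after the timeout: stop it explicitly.
        query.stop()
    return finished
```

This pattern is useful in integration tests and smoke runs, where a stream should process a few micro-batches and then exit rather than block forever.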

Resilience

Retry Policy

Configure per-component retries with exponential backoff:

components: [
  {
    name: "flaky_source"
    component_type: source
    class_path: "my.module.FlakySource"
    retry {
      max_attempts: 3
      initial_delay_seconds: 1.0
      max_delay_seconds: 30.0
      backoff_multiplier: 2.0
    }
  }
]
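To make the four retry fields concrete, the sketch below computes the waits a capped exponential backoff would produce. It assumes a delay is applied only between attempts and that no jitter is added; the framework's exact schedule is not stated in this document, and `backoff_delays` is a hypothetical helper.

```python
def backoff_delays(max_attempts, initial_delay_seconds,
                   max_delay_seconds, backoff_multiplier):
    """Return the wait before each retry (one fewer than max_attempts)."""
    delays = []
    delay = initial_delay_seconds
    for _ in range(max_attempts - 1):
        # Cap each wait at max_delay_seconds.
        delays.append(min(delay, max_delay_seconds))
        delay *= backoff_multiplier
    return delays


# Values from the configuration above: three attempts, so two waits.
print(backoff_delays(3, 1.0, 30.0, 2.0))  # [1.0, 2.0]

# With more attempts the cap becomes visible.
print(backoff_delays(7, 1.0, 30.0, 2.0))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```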

Circuit Breaker

Prevent repeated calls to failing components:

components: [
  {
    name: "external_api"
    component_type: source
    class_path: "my.module.ApiSource"
    circuit_breaker {
      failure_threshold: 5
      timeout_seconds: 60.0
    }
  }
]
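The two circuit breaker fields map onto the usual open/half-open state machine. The sketch below is a generic illustration of that pattern, not the framework's implementation. It assumes that `failure_threshold` counts consecutive failures and that `timeout_seconds` is how long the breaker stays open before allowing a trial call; `SketchCircuitBreaker` is a hypothetical name.

```python
import time


class SketchCircuitBreaker:
    """Generic circuit breaker sketch (hypothetical, not the framework class)."""

    def __init__(self, failure_threshold, timeout_seconds, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.timeout_seconds = timeout_seconds
        self._clock = clock
        self._failures = 0
        self._opened_at = None  # None means the breaker is closed

    def allow(self) -> bool:
        """Return True if a call may proceed."""
        if self._opened_at is None:
            return True
        # Open: allow a trial call only once the timeout has elapsed.
        return self._clock() - self._opened_at >= self.timeout_seconds

    def record_success(self) -> None:
        # Any success closes the breaker and resets the count.
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.failure_threshold:
            # Open (or re-open) the breaker and restart the timeout.
            self._opened_at = self._clock()
```

A caller would check `allow()` before invoking the component, then report the outcome with `record_success()` or `record_failure()`. Passing the clock in as a parameter keeps the timing logic testable without sleeping.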

Lifecycle Hooks

Hooks receive callbacks at pipeline and component lifecycle events. Combine multiple hooks with CompositeHooks:

from pyspark_pipeline_framework.runner import (
    CompositeHooks, LoggingHooks, MetricsHooks,
    SimplePipelineRunner,
)

hooks = CompositeHooks(LoggingHooks(), MetricsHooks())
runner = SimplePipelineRunner(config, hooks=hooks)
result = runner.run()
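Combining hooks is an instance of the composite pattern: one object forwards each callback to every child. The sketch below illustrates the idea only. The callback names `on_component_start` and `on_component_end`, and the classes `ListHooks` and `FanOutHooks`, are hypothetical and may not match the framework's hook interface.

```python
class ListHooks:
    """Hypothetical hook that records events in a shared list."""

    def __init__(self, label: str, events: list) -> None:
        self.label = label
        self.events = events

    def on_component_start(self, name: str) -> None:
        self.events.append(f"{self.label}:start:{name}")

    def on_component_end(self, name: str) -> None:
        self.events.append(f"{self.label}:end:{name}")


class FanOutHooks:
    """Hypothetical composite: forwards each callback to every child in order."""

    def __init__(self, *hooks) -> None:
        self._hooks = hooks

    def on_component_start(self, name: str) -> None:
        for hook in self._hooks:
            hook.on_component_start(name)

    def on_component_end(self, name: str) -> None:
        for hook in self._hooks:
            hook.on_component_end(name)


events = []
hooks = FanOutHooks(ListHooks("log", events), ListHooks("metrics", events))
hooks.on_component_start("read_raw")
hooks.on_component_end("read_raw")
print(events)
# ['log:start:read_raw', 'metrics:start:read_raw', 'log:end:read_raw', 'metrics:end:read_raw']
```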

Available Hooks

  • LoggingHooks -- logs lifecycle events
  • MetricsHooks -- collects timing and retry counts
  • DataQualityHooks -- runs data quality checks at lifecycle points
  • AuditHooks -- emits audit events for compliance
  • CheckpointHooks -- saves checkpoint state for resume

Data Quality Checks

from pyspark_pipeline_framework.core.quality import row_count_check, null_check
from pyspark_pipeline_framework.runner import (
    CompositeHooks, DataQualityHooks, LoggingHooks, SimplePipelineRunner,
)

# spark_wrapper and config are assumed to have been created earlier.
dq = DataQualityHooks(spark_wrapper)
dq.register(row_count_check("curated.customers", min_rows=100))
dq.register(null_check("curated.customers", "email", max_null_pct=5.0))

hooks = CompositeHooks(LoggingHooks(), dq)
runner = SimplePipelineRunner(config, hooks=hooks)
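The max_null_pct threshold can be read as a percentage of rows in which the column is null. The sketch below shows that arithmetic on plain Python dictionaries rather than a Spark table. `null_pct` and `passes_null_check` are hypothetical helpers, and treating the threshold as inclusive is an assumption.

```python
def null_pct(rows, column):
    """Percentage of rows where `column` is missing or None."""
    if not rows:
        return 0.0
    nulls = sum(1 for row in rows if row.get(column) is None)
    return 100.0 * nulls / len(rows)


def passes_null_check(rows, column, max_null_pct):
    # Assumption: a value exactly at the threshold still passes.
    return null_pct(rows, column) <= max_null_pct


# 20 rows, one of them without an email: 5% nulls.
rows = [{"id": i, "email": f"user{i}@example.com"} for i in range(19)]
rows.append({"id": 19, "email": None})

print(null_pct(rows, "email"))                # 5.0
print(passes_null_check(rows, "email", 5.0))  # True
```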

Audit Trail

from pyspark_pipeline_framework.core.audit import (
    LoggingAuditSink, FileAuditSink, CompositeAuditSink,
)
from pyspark_pipeline_framework.runner import AuditHooks, CompositeHooks, LoggingHooks

audit_sink = CompositeAuditSink(
    LoggingAuditSink(),
    FileAuditSink("/var/log/pipeline-audit.jsonl"),
)
hooks = CompositeHooks(LoggingHooks(), AuditHooks(audit_sink))

Secrets Management

from pyspark_pipeline_framework.core.secrets import (
    EnvSecretsProvider, SecretsResolver, SecretsCache, SecretsReference,
)

resolver = SecretsResolver()
resolver.register(EnvSecretsProvider())

cache = SecretsCache(resolver, ttl_seconds=300)

result = cache.resolve(SecretsReference(provider="env", key="DB_PASSWORD"))
if result.value:
    print("Secret resolved successfully")

Providers: EnvSecretsProvider, AwsSecretsProvider, VaultSecretsProvider
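The ttl_seconds argument suggests that resolved secrets are held for a limited time before being fetched again. The sketch below shows a minimal time-to-live cache over an arbitrary lookup function. `SketchTtlCache` is a hypothetical name, and the expiry rule used here is an assumption rather than documented `SecretsCache` behavior.

```python
import os
import time


class SketchTtlCache:
    """Hypothetical TTL cache: re-runs the lookup once an entry is too old."""

    def __init__(self, lookup, ttl_seconds, clock=time.monotonic):
        self._lookup = lookup
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries = {}  # key -> (value, stored_at)

    def get(self, key):
        now = self._clock()
        hit = self._entries.get(key)
        if hit is not None and now - hit[1] < self._ttl:
            return hit[0]  # still fresh
        value = self._lookup(key)
        self._entries[key] = (value, now)
        return value


# Environment variables as the backing store, mirroring the "env" provider.
cache = SketchTtlCache(os.environ.get, ttl_seconds=300)
value = cache.get("DB_PASSWORD")
print("resolved" if value else "missing")
```

A short TTL limits how long a rotated or revoked secret remains in use, at the cost of more frequent lookups against the provider.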

Checkpoint and Resume

Resume pipelines from the last successful component:

from pathlib import Path

from pyspark_pipeline_framework.runner import (
    LocalCheckpointStore, CheckpointHooks, CompositeHooks, LoggingHooks,
    SimplePipelineRunner,
    compute_pipeline_fingerprint, load_checkpoint_for_resume,
)

store = LocalCheckpointStore(Path("/tmp/checkpoints"))
fingerprint = compute_pipeline_fingerprint(config)
checkpoint_hooks = CheckpointHooks(store, run_id="run-001", pipeline_fingerprint=fingerprint)

hooks = CompositeHooks(LoggingHooks(), checkpoint_hooks)
runner = SimplePipelineRunner(config, hooks=hooks)

# First run
result = runner.run()

# Resume after failure
completed = load_checkpoint_for_resume(store, "run-001", config)
result = runner.run(completed_components=completed)
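A fingerprint lets a resume detect that the pipeline definition changed between runs. How `compute_pipeline_fingerprint` works is not described in this document. The sketch below shows one common approach, hashing a canonical JSON form of the configuration, and how a set of completed names could be used to skip work. `sketch_fingerprint` and `remaining` are hypothetical helpers.

```python
import hashlib
import json


def sketch_fingerprint(config: dict) -> str:
    """Hash a canonical JSON rendering so that key order does not matter."""
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def remaining(component_names, completed):
    """Components still to run, in their original order."""
    return [name for name in component_names if name not in completed]


config_a = {"name": "customer-etl", "version": "1.0.0"}
config_b = {"version": "1.0.0", "name": "customer-etl"}  # same content, different key order
config_c = {"name": "customer-etl", "version": "1.0.1"}

print(sketch_fingerprint(config_a) == sketch_fingerprint(config_b))  # True
print(sketch_fingerprint(config_a) == sketch_fingerprint(config_c))  # False
print(remaining(["read_raw", "transform", "write"], {"read_raw"}))   # ['transform', 'write']
```

Comparing the stored fingerprint with the current one before resuming guards against skipping components whose definition has changed since the checkpoint was written.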

Testing Components

Components can be unit-tested without a Spark cluster by injecting a MagicMock in place of the SparkSession:

from unittest.mock import MagicMock
from my_project.components import MyTransform


def test_my_transform():
    spark = MagicMock()
    df = MagicMock()
    spark.sql.return_value = df

    comp = MyTransform(output_view="result")
    comp.set_spark_session(spark)
    comp.run()

    spark.sql.assert_called_once()
    df.createOrReplaceTempView.assert_called_once_with("result")

Configuration Reference

Field                          Type    Required  Description
name                           string  yes       Pipeline name
version                        string  yes       Pipeline version
spark.app_name                 string  yes       Spark application name
spark.master                   string  no        Spark master URL
components[].name              string  yes       Unique component name
components[].component_type    enum    yes       source, transformation, sink
components[].class_path        string  yes       Python class to instantiate
components[].config            object  no        Component-specific configuration
components[].depends_on        list    no        Names of prerequisite components
components[].enabled           bool    no        Enable/disable (default: true)
components[].retry             object  no        Retry policy configuration
components[].circuit_breaker   object  no        Circuit breaker configuration
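Two of the fields above lend themselves to a short illustration: class_path names a class by its dotted import path, and enabled defaults to true when omitted. The sketch below shows how such values are commonly handled in Python. `load_class` and `active_components` are hypothetical helpers, not framework functions, and a standard-library class stands in for a real component.

```python
import importlib


def load_class(class_path: str):
    """Resolve 'package.module.ClassName' to the class object."""
    module_name, _, class_name = class_path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


def active_components(components):
    """Keep components whose `enabled` flag is true or absent."""
    return [c for c in components if c.get("enabled", True)]


# A standard-library class is used here so the example runs anywhere.
cls = load_class("collections.OrderedDict")
print(cls.__name__)  # OrderedDict

components = [
    {"name": "read_raw"},
    {"name": "debug_dump", "enabled": False},
    {"name": "write", "enabled": True},
]
print([c["name"] for c in active_components(components)])  # ['read_raw', 'write']
```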

License

Apache License 2.0
