pyspark-streaming-base

This project provides a set of base classes that simplify building bullet-proof Spark Structured Streaming applications.

Getting Started

Installation

pip install pyspark-streaming-base

Or with uv:

uv add pyspark-streaming-base
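
To verify the install, import the package (the module name matches the imports used in the example below):

python -c "import pyspark_streaming_base"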

Quick Example: Delta-to-Delta Streaming

Here's a complete example of a streaming application that reads from one Delta table and writes to another:

from pathlib import Path
from pyspark_streaming_base.app import StreamingApp
from pyspark_streaming_base.sources import DeltaStreamingSource
from pyspark_streaming_base.sinks import DeltaStreamingSink

# Define paths
source_table_name = 'source_table'
sink_table_name = 'sink_table'
checkpoints_path = '/data/checkpoints'
source_table_path = f'/data/delta_tables/{source_table_name}'
sink_table_path = f'/data/delta_tables/{sink_table_name}'

# Configuration dictionary
spark_config = {
    # Application configuration
    'spark.app.name': 'delta-to-delta-streaming',
    'spark.app.version': '1.0.0',
    'spark.app.checkpoints.path': checkpoints_path,
    'spark.app.checkpoint.version': '1.0.0',

    # Source: Delta table configuration
    'spark.app.source.delta.options.path': source_table_path,
    'spark.app.source.delta.options.startingVersion': '0',
    'spark.app.source.delta.options.maxFilesPerTrigger': '10',
    'spark.app.source.delta.options.ignoreChanges': 'true',
    'spark.app.source.delta.options.withEventTimeOrder': 'true',
    'spark.app.source.delta.table.tableName': source_table_name,

    # Sink: Delta table configuration
    'spark.app.sink.delta.options.path': sink_table_path,
    'spark.app.sink.delta.options.outputMode': 'append',
    'spark.app.sink.delta.options.mergeSchema': 'true',
    'spark.app.sink.delta.options.maxRecordsPerFile': '100000',
    'spark.app.sink.delta.options.queryName': 'delta-to-delta-query',
    'spark.app.sink.delta.table.name': sink_table_name,
}

# Create and configure the application
app = StreamingApp()

# Apply all configurations to Spark RuntimeConf
for key, value in spark_config.items():
    app.spark.conf.set(key, value)

# Initialize the application
app.initialize()

# Set up source and sink (they read from RuntimeConf automatically)
delta_source = DeltaStreamingSource(config_prefix='spark.app.source')
delta_sink = DeltaStreamingSink(config_prefix='spark.app.sink')
app.with_source(delta_source).with_sink(delta_sink)

# Run the streaming query
df = delta_source.generate().load()
sink_options = delta_sink.options()

query = (
    delta_sink.fromDF(df)
    .queryName(sink_options['queryName'])
    .trigger(availableNow=True)  # Process all available data
    .outputMode(sink_options['outputMode'])
    .start()
)

query.awaitTermination()
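
The availableNow trigger drains everything available in the source at query start and then stops, which suits scheduled, backfill-style jobs. For a continuously running pipeline you could swap in a processing-time trigger; the trigger API is standard Structured Streaming, and the rest of the chain is unchanged from the example above:

query = (
    delta_sink.fromDF(df)
    .queryName(sink_options['queryName'])
    .trigger(processingTime='30 seconds')  # poll for new commits every 30s
    .outputMode(sink_options['outputMode'])
    .start()
)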

Key Features

  • Flexible Configuration: Use RuntimeConf or builder pattern approaches
  • Three-Tier Config System: Defaults → SparkSession config → Direct config
  • Built-in Sources: Kafka and Delta Lake streaming sources
  • Built-in Sinks: Delta Lake streaming sink
  • Checkpoint Management: Automatic checkpoint location handling with version isolation
  • Testing Friendly: Easy to configure for local testing with embedded Spark (see the sketch after this list)
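
As a sketch of the local-testing setup, the pytest fixture below builds a StreamingApp against an embedded local Spark session and points the same RuntimeConf keys from the quick example at a per-test temporary directory. The StreamingApp wiring mirrors the example above; treat the exact key set as illustrative rather than exhaustive:

import pytest
from pyspark_streaming_base.app import StreamingApp

@pytest.fixture
def streaming_app(tmp_path):
    # Point checkpoints, source, and sink at a per-test temp directory,
    # using the same RuntimeConf keys as the quick example above.
    app = StreamingApp()
    app.spark.conf.set('spark.app.name', 'local-test')
    app.spark.conf.set('spark.app.checkpoints.path', str(tmp_path / 'checkpoints'))
    app.spark.conf.set('spark.app.source.delta.options.path', str(tmp_path / 'source'))
    app.spark.conf.set('spark.app.sink.delta.options.path', str(tmp_path / 'sink'))
    app.initialize()
    return app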

Documentation

For comprehensive documentation, including the configuration reference, testing patterns, and best practices, see the project documentation.

Local Developer Environment

Prerequisites

  • Python 3.13+ - Managed via uv
  • Java 17 or 21 - Required for PySpark 4.0.1 (Spark uses Scala 2.13)
  • PySpark 4.0.1 - Specified in pyproject.toml

Java Setup (Mac)

# Install Java 21
brew install openjdk@21

# Set JAVA_HOME (add to ~/.zshrc or ~/.bashrc)
export JAVA_HOME=$(/usr/libexec/java_home -v 21)

# Verify installation
/usr/libexec/java_home -V

Setup and Development

# Install Python 3.13
uv python install 3.13

# Sync dependencies
uv sync

# Build the package
uv build

# Run tests with coverage
uv run pytest --cov=pyspark_streaming_base --cov-report term

# Or use the Makefile
make build  # Runs sync, ruff check, pytest, and build
make test   # Runs pytest only

Download files

Download the file for your platform.

Source Distribution

pyspark_streaming_base-0.1.1.tar.gz (15.1 kB)

Built Distribution

pyspark_streaming_base-0.1.1-py3-none-any.whl (23.5 kB)

File details

Details for the file pyspark_streaming_base-0.1.1.tar.gz.

File hashes

Hashes for pyspark_streaming_base-0.1.1.tar.gz

SHA256: 65682a88bceaef8af6858c301c8490bcc0dd6bbd6e0de2406804056ac141d669
MD5: 573952b72928c986582d80034df5636c
BLAKE2b-256: f3e6377a03d09c8b39f2208d89bfc6d48bee4f5dc3a37f60c16c742b8925d0e9

File details

Details for the file pyspark_streaming_base-0.1.1-py3-none-any.whl.

File hashes

Hashes for pyspark_streaming_base-0.1.1-py3-none-any.whl

SHA256: e5a7070c17925c1071f889ee17fbfb393b05c7b8882a9804a5e560d936f54bae
MD5: 52fc7387d0fd5dd13960295d75ee0a47
BLAKE2b-256: bebcb217c7fb8fec6604e0ab646df75d51262f534ea07a524d2c032e20999750
