
Project description

pyspark-streaming-base

This project provides a set of base classes that simplify the art of crafting bullet-proof Spark Structured Streaming applications.

Getting Started

Installation

pip install pyspark-streaming-base

Or with uv:

uv add pyspark-streaming-base

Quick Example: Delta-to-Delta Streaming

Here's a complete example of a streaming application that reads from one Delta table and writes to another:

from pathlib import Path
from pyspark_streaming_base.app import StreamingApp
from pyspark_streaming_base.sources import DeltaStreamingSource
from pyspark_streaming_base.sinks import DeltaStreamingSink

# Define paths
source_table_name = 'source_table'
sink_table_name = 'sink_table'
checkpoints_path = '/data/checkpoints'
source_table_path = f'/data/delta_tables/{source_table_name}'
sink_table_path = f'/data/delta_tables/{sink_table_name}'

# Configuration dictionary
spark_config = {
    # Application configuration
    'spark.app.name': 'delta-to-delta-streaming',
    'spark.app.version': '1.0.0',
    'spark.app.checkpoints.path': checkpoints_path,
    'spark.app.checkpoint.version': '1.0.0',

    # Source: Delta table configuration
    'spark.app.source.delta.options.path': source_table_path,
    'spark.app.source.delta.options.startingVersion': '0',
    'spark.app.source.delta.options.maxFilesPerTrigger': '10',
    'spark.app.source.delta.options.ignoreChanges': 'true',
    'spark.app.source.delta.options.withEventTimeOrder': 'true',
    'spark.app.source.delta.table.tableName': source_table_name,

    # Sink: Delta table configuration
    'spark.app.sink.delta.options.path': sink_table_path,
    'spark.app.sink.delta.options.outputMode': 'append',
    'spark.app.sink.delta.options.mergeSchema': 'true',
    'spark.app.sink.delta.options.maxRecordsPerFile': '100000',
    'spark.app.sink.delta.options.queryName': 'delta-to-delta-query',
    'spark.app.sink.delta.table.name': sink_table_name,
}

# Create and configure the application
app = StreamingApp()

# Apply all configurations to Spark RuntimeConf
for key, value in spark_config.items():
    app.spark.conf.set(key, value)

# Initialize the application
app.initialize()

# Set up source and sink (they read from RuntimeConf automatically)
delta_source = DeltaStreamingSource(config_prefix='spark.app.source')
delta_sink = DeltaStreamingSink(config_prefix='spark.app.sink')
app.with_source(delta_source).with_sink(delta_sink)

# Run the streaming query
df = delta_source.generate().load()
sink_options = delta_sink.options()

query = (
    delta_sink.fromDF(df)
    .queryName(sink_options['queryName'])
    .trigger(availableNow=True)  # Process all available data
    .outputMode(sink_options['outputMode'])
    .start()
)

query.awaitTermination()
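The `config_prefix` arguments above suggest that each source and sink discovers its settings by scanning the configuration for keys under its prefix. The helper below is only a hypothetical sketch of that mapping (the library's actual class internals may differ; the key names are taken from the example above): it strips a prefix such as `spark.app.source.delta.options` and returns the remainder as reader options.

```python
# Hypothetical sketch: how a config prefix such as
# 'spark.app.source.delta.options' could be expanded into Delta reader
# options. Illustrative only; not the library's actual implementation.

def options_for(config: dict[str, str], prefix: str) -> dict[str, str]:
    """Return every entry under `{prefix}.` with the prefix stripped."""
    head = prefix + '.'
    return {k[len(head):]: v for k, v in config.items() if k.startswith(head)}

spark_config = {
    'spark.app.source.delta.options.path': '/data/delta_tables/source_table',
    'spark.app.source.delta.options.startingVersion': '0',
    'spark.app.sink.delta.options.outputMode': 'append',
}

source_options = options_for(spark_config, 'spark.app.source.delta.options')
# {'path': '/data/delta_tables/source_table', 'startingVersion': '0'}
```

With the options in hand, a source could pass them straight through, e.g. `spark.readStream.format('delta').options(**source_options)`.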

Key Features

  • Flexible Configuration: Use RuntimeConf or builder pattern approaches
  • Three-Tier Config System: Defaults → SparkSession config → Direct config
  • Built-in Sources: Kafka and Delta Lake streaming sources
  • Built-in Sinks: Delta Lake streaming sink
  • Checkpoint Management: Automatic checkpoint location handling with version isolation
  • Testing Friendly: Easy to configure for local testing with embedded Spark
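The three-tier lookup can be pictured as layered dictionaries, with later tiers overriding earlier ones. The snippet below is only an illustration of the precedence order named above (Defaults → SparkSession config → Direct config) using the standard library's `ChainMap`; it is not the library's actual implementation.

```python
from collections import ChainMap

# Illustrative only: three config tiers with the precedence
# Defaults -> SparkSession config -> Direct config.
defaults = {
    'spark.app.sink.delta.options.outputMode': 'append',
    'spark.app.source.delta.options.startingVersion': 'latest',
}
session_conf = {'spark.app.source.delta.options.startingVersion': '0'}
direct = {'spark.app.source.delta.options.maxFilesPerTrigger': '10'}

# ChainMap searches maps left to right, so the highest-priority tier goes first.
resolved = ChainMap(direct, session_conf, defaults)

# SparkSession config overrides the default; untouched defaults shine through.
assert resolved['spark.app.source.delta.options.startingVersion'] == '0'
assert resolved['spark.app.sink.delta.options.outputMode'] == 'append'
```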

Documentation

For comprehensive documentation, including the configuration reference, testing patterns, and best practices, see the project's repository documentation.

Local Developer Environment

Prerequisites

  • Python 3.12.3+ - Managed via uv
  • Java 17 or 21 - Required for PySpark 4.0.1 (Spark uses Scala 2.13)
  • PySpark 4.0.1 - Specified in pyproject.toml

Java Setup (Mac)

# Install Java 21
brew install openjdk@21

# Set JAVA_HOME (add to ~/.zshrc or ~/.bashrc)
export JAVA_HOME=$(/usr/libexec/java_home -v 21)

# Verify installation
/usr/libexec/java_home -V

Setup and Development

# Install Python 3.12.3
uv python install 3.12.3

# Sync dependencies
uv sync

# Build the package
uv build

# Run tests with coverage
uv run pytest --cov=pyspark_streaming_base --cov-report term

# Or use the Makefile
make build  # Runs sync, ruff check, pytest, and build
make test   # Runs pytest only


Download files

Download the file for your platform.

Source Distribution

pyspark_streaming_base-0.2.0.tar.gz (15.2 kB)

Uploaded Source

Built Distribution


pyspark_streaming_base-0.2.0-py3-none-any.whl (23.5 kB)

Uploaded Python 3

File details

Details for the file pyspark_streaming_base-0.2.0.tar.gz.

File metadata

  • Download URL: pyspark_streaming_base-0.2.0.tar.gz
  • Upload date:
  • Size: 15.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.10.9 {"installer":{"name":"uv","version":"0.10.9","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for pyspark_streaming_base-0.2.0.tar.gz

  • SHA256: e91173124a32161fd1278eec1818c194bace8904554409c94cc68eb2fec4405f
  • MD5: 1c30d8d7b7613c532f183d106ce7f77a
  • BLAKE2b-256: 416766d09e341f3124b69703e5c534c8ce42bd892483311e9f162dca49ebcacc


File details

Details for the file pyspark_streaming_base-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: pyspark_streaming_base-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 23.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.10.9 {"installer":{"name":"uv","version":"0.10.9","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for pyspark_streaming_base-0.2.0-py3-none-any.whl

  • SHA256: a3a690d7ea51f52ed95c2f1152e41fabf2fced2cbc28c1ccab57f7f5b6ab1095
  • MD5: 86c85e79ea33bfd6729f0bb171a4e75a
  • BLAKE2b-256: e12d847e275d3f789185a9e44cf40bd08e709fbf30dc13012289eaef790622c8

