This project provides a set of base classes that simplify the art of crafting bullet-proof Spark Structured Streaming applications.

pyspark-streaming-base

Getting Started

Installation

pip install pyspark-streaming-base

Or with uv:

uv add pyspark-streaming-base

Quick Example: Delta-to-Delta Streaming

Here's a complete example of a streaming application that reads from one Delta table and writes to another:

from pyspark_streaming_base.app import StreamingApp
from pyspark_streaming_base.sources import DeltaStreamingSource
from pyspark_streaming_base.sinks import DeltaStreamingSink

# Define paths
source_table_name = 'source_table'
sink_table_name = 'sink_table'
checkpoints_path = '/data/checkpoints'
source_table_path = f'/data/delta_tables/{source_table_name}'
sink_table_path = f'/data/delta_tables/{sink_table_name}'

# Configuration dictionary
spark_config = {
    # Application configuration
    'spark.app.name': 'delta-to-delta-streaming',
    'spark.app.version': '1.0.0',
    'spark.app.checkpoints.path': checkpoints_path,
    'spark.app.checkpoint.version': '1.0.0',

    # Source: Delta table configuration
    'spark.app.source.delta.options.path': source_table_path,
    'spark.app.source.delta.options.startingVersion': '0',
    'spark.app.source.delta.options.maxFilesPerTrigger': '10',
    'spark.app.source.delta.options.ignoreChanges': 'true',
    'spark.app.source.delta.options.withEventTimeOrder': 'true',
    'spark.app.source.delta.table.tableName': source_table_name,

    # Sink: Delta table configuration
    'spark.app.sink.delta.options.path': sink_table_path,
    'spark.app.sink.delta.options.outputMode': 'append',
    'spark.app.sink.delta.options.mergeSchema': 'true',
    'spark.app.sink.delta.options.maxRecordsPerFile': '100000',
    'spark.app.sink.delta.options.queryName': 'delta-to-delta-query',
    'spark.app.sink.delta.table.name': sink_table_name,
}

# Create and configure the application
app = StreamingApp()

# Apply all configurations to Spark RuntimeConf
for key, value in spark_config.items():
    app.spark.conf.set(key, value)

# Initialize the application
app.initialize()

# Set up source and sink (they read from RuntimeConf automatically)
delta_source = DeltaStreamingSource(config_prefix='spark.app.source')
delta_sink = DeltaStreamingSink(config_prefix='spark.app.sink')
app.with_source(delta_source).with_sink(delta_sink)

# Run the streaming query
df = app.delta_source().generate().load()
sink_options = app.delta_sink().options()

query = (
    delta_sink.fromDF(df)
    .queryName(sink_options['queryName'])
    .trigger(availableNow=True)  # Process all available data
    .outputMode(sink_options['outputMode'])
    .start()
)

query.awaitTermination()
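
The example relies on the source and sink picking up their options from prefixed configuration keys. As a rough illustration of that idea only, the pure-Python sketch below collects every key under a prefix and strips the prefix. The helper name `options_for_prefix` is hypothetical and is not part of pyspark-streaming-base; the library's actual lookup logic may differ.

```python
def options_for_prefix(conf: dict[str, str], prefix: str) -> dict[str, str]:
    """Return the entries under `prefix`, with the prefix stripped from each key.

    Hypothetical helper for illustration; not the library's implementation.
    """
    dotted = prefix if prefix.endswith(".") else prefix + "."
    return {
        key[len(dotted):]: value
        for key, value in conf.items()
        if key.startswith(dotted)
    }


# A subset of the configuration keys used in the example above.
conf = {
    "spark.app.name": "delta-to-delta-streaming",
    "spark.app.source.delta.options.path": "/data/delta_tables/source_table",
    "spark.app.source.delta.options.startingVersion": "0",
    "spark.app.source.delta.options.maxFilesPerTrigger": "10",
    "spark.app.sink.delta.options.path": "/data/delta_tables/sink_table",
    "spark.app.sink.delta.options.outputMode": "append",
}

source_options = options_for_prefix(conf, "spark.app.source.delta.options")
sink_options = options_for_prefix(conf, "spark.app.sink.delta.options")

print(source_options)
print(sink_options)
```

Grouping keys by prefix keeps source and sink settings separate even though they share a single flat configuration namespace.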

Key Features

  • Flexible Configuration: Use RuntimeConf or builder pattern approaches
  • Three-Tier Config System: Defaults → SparkSession config → Direct config
  • Built-in Sources: Kafka and Delta Lake streaming sources
  • Built-in Sinks: Delta Lake streaming sink
  • Checkpoint Management: Automatic checkpoint location handling with version isolation
  • Testing Friendly: Easy to configure for local testing with embedded Spark
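
To make the three-tier precedence and the version-isolated checkpoints more concrete, here is a minimal sketch in plain Python. It assumes that later tiers override earlier ones and that the checkpoint directory is composed from the base path, the application name, and the checkpoint version. Both function names, the default values, and the directory layout are assumptions for illustration, not the library's confirmed behavior.

```python
from collections import ChainMap
from pathlib import PurePosixPath

# Hypothetical defaults; the library's real defaults may differ.
DEFAULTS = {"outputMode": "append", "mergeSchema": "false"}


def resolve_config(defaults, session_conf, direct_conf):
    """Merge three tiers so that direct config beats session config beats defaults."""
    # ChainMap searches its maps left to right, so the highest-priority tier goes first.
    return dict(ChainMap(direct_conf, session_conf, defaults))


def checkpoint_location(base_path, app_name, checkpoint_version):
    """Build a checkpoint directory that changes whenever the version changes.

    The <base>/<app name>/<version> layout is an assumed convention.
    """
    return str(PurePosixPath(base_path) / app_name / checkpoint_version)


resolved = resolve_config(
    DEFAULTS,
    {"mergeSchema": "true", "queryName": "from-session"},
    {"queryName": "delta-to-delta-query"},
)
location = checkpoint_location(
    "/data/checkpoints", "delta-to-delta-streaming", "1.0.0"
)

print(resolved)
print(location)
```

Under this layout, bumping the checkpoint version points the query at a fresh directory, so state from an incompatible earlier run is left untouched rather than reused.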

Documentation

For comprehensive documentation, including the configuration reference, testing patterns, and best practices, see the project documentation.

Local Developer Environment

Prerequisites

  • Python 3.12.3+ - Managed via uv
  • Java 17 or 21 - Required for PySpark 4.0.1 (Spark uses Scala 2.13)
  • PySpark 4.0.1 - Specified in pyproject.toml

Java Setup (Mac)

# Install Java 21
brew install openjdk@21

# Set JAVA_HOME (add to ~/.zshrc or ~/.bashrc)
export JAVA_HOME=$(/usr/libexec/java_home -v 21)

# Verify installation
/usr/libexec/java_home -V
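
As an optional preflight check, the sketch below parses the text printed by `java -version` and reports the major version, so a script can confirm it is 17 or 21 before starting Spark. The function name `java_major_version` and the sample string are illustrative assumptions and are not part of this project.

```python
import re

# Java versions listed in the prerequisites above.
SUPPORTED = {17, 21}


def java_major_version(version_output: str) -> int:
    """Extract the major Java version from `java -version` style output."""
    match = re.search(r'version "(\d+)(?:\.(\d+))?', version_output)
    if match is None:
        raise ValueError("could not find a Java version string")
    major, minor = match.group(1), match.group(2)
    # Java 8 and earlier report themselves as 1.x (for example "1.8.0_292").
    if major == "1" and minor is not None:
        return int(minor)
    return int(major)


# Illustrative sample of the first line printed by `java -version`.
sample = 'openjdk version "21.0.2" 2024-01-16'
major = java_major_version(sample)
print(major, major in SUPPORTED)
```

To check a real installation, pass in the standard error output of `java -version`, since that is where the JVM writes its version banner.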

Setup and Development

# Install Python 3.12.3
uv python install 3.12.3

# Sync dependencies
uv sync

# Build the package
uv build

# Run tests with coverage
uv run pytest --cov=pyspark_streaming_base --cov-report term

# Or use the Makefile
make build  # Runs sync, ruff check, pytest, and build
make test   # Runs pytest only

Download files

Source Distribution

pyspark_streaming_base-0.2.1.tar.gz (15.3 kB)

Built Distribution

pyspark_streaming_base-0.2.1-py3-none-any.whl (23.6 kB)

File details

Details for the file pyspark_streaming_base-0.2.1.tar.gz.

File metadata

  • Download URL: pyspark_streaming_base-0.2.1.tar.gz
  • Size: 15.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.1

File hashes

Hashes for pyspark_streaming_base-0.2.1.tar.gz
Algorithm    Hash digest
SHA256       7a287031188e455807e4042434f6e6c27166fb551f214be839f0fcb0f5745ef4
MD5          cadcd502cfd01e57bd40070d3da8f213
BLAKE2b-256  fa173f9969cc9c4f62d155358cb9ad5d6061db57a5cdb0cc65a4dea417235b09

File details

Details for the file pyspark_streaming_base-0.2.1-py3-none-any.whl.

File metadata

  • Download URL: pyspark_streaming_base-0.2.1-py3-none-any.whl
  • Size: 23.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.1

File hashes

Hashes for pyspark_streaming_base-0.2.1-py3-none-any.whl
Algorithm    Hash digest
SHA256       2bf29c279d36a468accbbfc08e6912ee51b3ebab0d1be8b6ef80c4e22eb10773
MD5          aae42e9bf9f6dd5c02889ce695534bd8
BLAKE2b-256  f132f21e28e05a35e67d73fb18e8469fc60f21c2f44edbf475dab36510f91f18
