pyspark-streaming-base
This project provides a set of base classes that simplify the art of crafting bullet-proof Spark Structured Streaming applications.
Getting Started
Installation
pip install pyspark-streaming-base
Or with uv:
uv add pyspark-streaming-base
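To sanity-check the install, importing the top-level package should succeed (the module name matches the imports used in the example below):

```python
# Smoke test: a successful import confirms the package is on the Python path.
import pyspark_streaming_base  # noqa: F401
```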
Quick Example: Delta-to-Delta Streaming
Here's a complete example of a streaming application that reads from one Delta table and writes to another:
from pyspark_streaming_base.app import StreamingApp
from pyspark_streaming_base.sources import DeltaStreamingSource
from pyspark_streaming_base.sinks import DeltaStreamingSink
# Define paths
source_table_name = 'source_table'
sink_table_name = 'sink_table'
checkpoints_path = '/data/checkpoints'
source_table_path = f'/data/delta_tables/{source_table_name}'
sink_table_path = f'/data/delta_tables/{sink_table_name}'
# Configuration dictionary
spark_config = {
    # Application configuration
    'spark.app.name': 'delta-to-delta-streaming',
    'spark.app.version': '1.0.0',
    'spark.app.checkpoints.path': checkpoints_path,
    'spark.app.checkpoint.version': '1.0.0',
    # Source: Delta table configuration
    'spark.app.source.delta.options.path': source_table_path,
    'spark.app.source.delta.options.startingVersion': '0',
    'spark.app.source.delta.options.maxFilesPerTrigger': '10',
    'spark.app.source.delta.options.ignoreChanges': 'true',
    'spark.app.source.delta.options.withEventTimeOrder': 'true',
    'spark.app.source.delta.table.tableName': source_table_name,
    # Sink: Delta table configuration
    'spark.app.sink.delta.options.path': sink_table_path,
    'spark.app.sink.delta.options.outputMode': 'append',
    'spark.app.sink.delta.options.mergeSchema': 'true',
    'spark.app.sink.delta.options.maxRecordsPerFile': '100000',
    'spark.app.sink.delta.options.queryName': 'delta-to-delta-query',
    'spark.app.sink.delta.table.name': sink_table_name,
}
# Create and configure the application
app = StreamingApp()
# Apply all configurations to Spark RuntimeConf
for key, value in spark_config.items():
    app.spark.conf.set(key, value)
# Initialize the application
app.initialize()
# Set up source and sink (they read from RuntimeConf automatically)
delta_source = DeltaStreamingSource(config_prefix='spark.app.source')
delta_sink = DeltaStreamingSink(config_prefix='spark.app.sink')
app.with_source(delta_source).with_sink(delta_sink)
# Run the streaming query
df = app.delta_source().generate().load()
sink_options = app.delta_sink().options()
query = (
    delta_sink.fromDF(df)
    .queryName(sink_options['queryName'])
    .trigger(availableNow=True)  # Process all available data
    .outputMode(sink_options['outputMode'])
    .start()
)
query.awaitTermination()
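The example assumes a SparkSession with the Delta Lake extensions available. If you run it locally and StreamingApp does not already configure Delta for you, a Delta-enabled session can be built with the delta-spark helper; this is the standard Delta-on-Spark setup, not an API of this library:

```python
# A sketch of a Delta-enabled local SparkSession (assumes the delta-spark
# package is installed). configure_spark_with_delta_pip attaches the
# matching Delta jars to the session being built.
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = (
    SparkSession.builder
    .appName('delta-to-delta-streaming')
    .master('local[2]')
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
    .config(
        'spark.sql.catalog.spark_catalog',
        'org.apache.spark.sql.delta.catalog.DeltaSparkSessionCatalog',
    )
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
```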
Key Features
- Flexible Configuration: Use RuntimeConf or builder pattern approaches
- Three-Tier Config System: Defaults → SparkSession config → Direct config (see the sketch after this list)
- Built-in Sources: Kafka and Delta Lake streaming sources
- Built-in Sinks: Delta Lake streaming sink
- Checkpoint Management: Automatic checkpoint location handling with version isolation (also sketched below)
- Testing Friendly: Easy to configure for local testing with embedded Spark
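To make the three-tier lookup and checkpoint versioning concrete, here is a minimal sketch of the precedence a component could apply when resolving an option. The function names and the checkpoint layout are illustrative only, not this library's actual internals:

```python
from pyspark.sql import SparkSession

DEFAULTS = {'spark.app.sink.delta.options.outputMode': 'append'}

def resolve_option(spark: SparkSession, key: str, direct: dict) -> str | None:
    # Hypothetical three-tier lookup: direct config beats SparkSession
    # RuntimeConf, which beats the built-in defaults.
    if key in direct:
        return direct[key]
    value = spark.conf.get(key, None)
    return value if value is not None else DEFAULTS.get(key)

def checkpoint_location(base: str, query_name: str, version: str) -> str:
    # Hypothetical version-isolated layout: bumping the checkpoint version
    # gives the query a fresh directory instead of reusing stale state.
    return f'{base}/{query_name}/{version}'
```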
Documentation
For comprehensive documentation, including the configuration reference, testing patterns, and best practices, see the full documentation in the project repository.
Local Developer Environment
Prerequisites
- Python 3.12.3+ - Managed via uv
- Java 17 or 21 - Required for PySpark 4.0.1 (Spark uses Scala 2.13)
- PySpark 4.0.1 - Specified in pyproject.toml
Java Setup (Mac)
# Install Java 21
brew install openjdk@21
# Homebrew's JDK is keg-only; symlink it so java_home can find it
# (Apple Silicon path shown; on Intel Macs use /usr/local/opt/...)
sudo ln -sfn /opt/homebrew/opt/openjdk@21/libexec/openjdk.jdk \
  /Library/Java/JavaVirtualMachines/openjdk-21.jdk
# Set JAVA_HOME (add to ~/.zshrc or ~/.bashrc)
export JAVA_HOME=$(/usr/libexec/java_home -v 21)
# Verify installation
/usr/libexec/java_home -V
Setup and Development
# Install Python 3.12.3
uv python install 3.12.3
# Sync dependencies
uv sync
# Build the package
uv build
# Run tests with coverage
uv run pytest --cov=pyspark_streaming_base --cov-report term
# Or use the Makefile
make build # Runs sync, ruff check, pytest, and build
make test # Runs pytest only
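The test suite runs against an embedded local Spark session. If you write your own tests on top of these base classes, a session-scoped pytest fixture is a common pattern (a sketch; the fixture and test names are illustrative, not part of this library):

```python
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope='session')
def spark():
    # Embedded local session; 'local[1]' keeps test runs lightweight.
    session = (
        SparkSession.builder
        .master('local[1]')
        .appName('pyspark-streaming-base-tests')
        .getOrCreate()
    )
    yield session
    session.stop()

def test_runtime_conf_roundtrip(spark):
    # Components read their settings from RuntimeConf, so tests can
    # configure them exactly as the Quick Example does.
    spark.conf.set('spark.app.name', 'test-app')
    assert spark.conf.get('spark.app.name') == 'test-app'
```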
Download files
File details
Details for the file pyspark_streaming_base-0.2.1.tar.gz.
File metadata
- Download URL: pyspark_streaming_base-0.2.1.tar.gz
- Upload date:
- Size: 15.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.1
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 7a287031188e455807e4042434f6e6c27166fb551f214be839f0fcb0f5745ef4 |
| MD5 | cadcd502cfd01e57bd40070d3da8f213 |
| BLAKE2b-256 | fa173f9969cc9c4f62d155358cb9ad5d6061db57a5cdb0cc65a4dea417235b09 |
File details
Details for the file pyspark_streaming_base-0.2.1-py3-none-any.whl.
File metadata
- Download URL: pyspark_streaming_base-0.2.1-py3-none-any.whl
- Upload date:
- Size: 23.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.1
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 2bf29c279d36a468accbbfc08e6912ee51b3ebab0d1be8b6ef80c4e22eb10773 |
| MD5 | aae42e9bf9f6dd5c02889ce695534bd8 |
| BLAKE2b-256 | f132f21e28e05a35e67d73fb18e8469fc60f21c2f44edbf475dab36510f91f18 |