Custom Spark data sources for reading and writing data in Apache Spark, using the Python Data Source API

PySpark Data Sources

Custom Apache Spark data sources using the Python Data Source API (Spark 4.0+). Learn by example and build your own data sources.

Quick Start

Installation

pip install pyspark-data-sources

# Install with specific extras (quoted so shells such as zsh do not expand the brackets)
pip install "pyspark-data-sources[faker]"      # For FakeDataSource
pip install "pyspark-data-sources[all]"        # All optional dependencies

Basic Usage

from pyspark.sql import SparkSession
from pyspark_datasources import FakeDataSource

# Create Spark session
spark = SparkSession.builder.appName("datasource-demo").getOrCreate()

# Register the data source
spark.dataSource.register(FakeDataSource)

# Read batch data
df = spark.read.format("fake").option("numRows", 5).load()
df.show()
# +--------------+----------+-------+------------+
# |          name|      date|zipcode|       state|
# +--------------+----------+-------+------------+
# |  Pam Mitchell|1988-10-20|  23788|   Tennessee|
# |Melissa Turner|1996-06-14|  30851|      Nevada|
# |  Brian Ramsey|2021-08-21|  55277|  Washington|
# |  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
# | Douglas James|2007-01-18|  46226|     Alabama|
# +--------------+----------+-------+------------+

# Stream data
stream = spark.readStream.format("fake").load()
query = stream.writeStream.format("console").start()
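
The streaming query started above keeps running until it is stopped. A minimal sketch of one way to bound its lifetime, assuming `query` is the `StreamingQuery` returned by `start()`; the helper name `run_for` is hypothetical and not part of this package:

```python
def run_for(query, seconds):
    """Let a streaming query run for roughly `seconds`, then stop it.

    `query` is expected to behave like a PySpark StreamingQuery.
    """
    # With a timeout, awaitTermination returns True only if the query
    # terminated on its own before the timeout elapsed.
    finished = query.awaitTermination(seconds)
    if not finished:
        # Still running after the timeout, so stop it explicitly.
        query.stop()
    return finished
```

For the example above, `run_for(query, 10)` would let the console sink print batches for about ten seconds before shutting the query down.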

Available Data Sources

Sources (Read)

Data Source      Type          Description                                         Dependency
fake             Batch/Stream  Generate synthetic test data using Faker            [faker]
github           Batch         Read GitHub pull requests                           None
googlesheets     Batch         Read public Google Sheets                           None
huggingface      Batch         Load Hugging Face datasets                          [huggingface]
stock            Batch         Fetch stock market data (Alpha Vantage)             None
opensky          Batch/Stream  Live flight tracking data                           None
kaggle           Batch         Load Kaggle datasets                                [kaggle]
arrow            Batch         Read Apache Arrow files                             [arrow]
robinhood        Batch         Read cryptocurrency market data from Robinhood API  [robinhood]
jsonplaceholder  Batch         Read JSON data for testing                          None
weather          Batch         Read current weather data (OpenWeatherMap)          None

Sinks (Write)

Data Source  Type                Description                    Dependency
lance        Batch Write         Write Lance vector format      [lance]
salesforce   Stream Write        Write to Salesforce objects    [salesforce]
meta_capi    Batch/Stream Write  Write to Meta Conversions API  None
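
Sinks like the ones listed above are built on the writer half of the Python Data Source API. The following is a rough sketch of a batch sink, not the implementation of any sink in this package: `CountingSink`, `CountingWriter`, and `RowCount` are hypothetical names, and the fallback classes in the `except` branch exist only so the logic can be tried without Spark 4.0 installed.

```python
from dataclasses import dataclass

try:
    from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage
except ImportError:
    # Stand-ins for illustration only, used when PySpark 4.0+ is unavailable.
    class DataSource:
        def __init__(self, options):
            self.options = options

    class DataSourceWriter:
        pass

    class WriterCommitMessage:
        pass


@dataclass
class RowCount(WriterCommitMessage):
    """Commit message returned by each write task (hypothetical)."""
    count: int


class CountingSink(DataSource):  # hypothetical example sink
    @classmethod
    def name(cls):
        return "counting"

    def writer(self, schema, overwrite):
        return CountingWriter()


class CountingWriter(DataSourceWriter):
    def write(self, iterator):
        # Called once per partition; `iterator` yields the rows to write.
        count = sum(1 for _ in iterator)
        return RowCount(count)

    def commit(self, messages):
        # Called on the driver once all write tasks have succeeded.
        total = sum(m.count for m in messages if m is not None)
        print(f"committed {total} rows")

    def abort(self, messages):
        # Called on the driver if a write task failed.
        print("write aborted")
```

With an active SparkSession, such a sink would be registered with `spark.dataSource.register(CountingSink)` and used as `df.write.format("counting").mode("append").save()`.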

Example: Generate Fake Data

from pyspark_datasources import FakeDataSource

spark.dataSource.register(FakeDataSource)

# Generate synthetic data with custom schema
df = spark.read.format("fake") \
    .schema("name string, email string, company string") \
    .option("numRows", 5) \
    .load()

df.show(truncate=False)
# +------------------+-------------------------+-----------------+
# |name              |email                    |company          |
# +------------------+-------------------------+-----------------+
# |Christine Sampson |johnsonjeremy@example.com|Hernandez-Nguyen |
# |Yolanda Brown     |williamlowe@example.net  |Miller-Hernandez |
# +------------------+-------------------------+-----------------+

Building Your Own Data Source

Here's a minimal example to get started:

from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

class MyCustomDataSource(DataSource):
    @classmethod
    def name(cls):
        # register() calls name() on the class itself, so it must be a classmethod
        return "mycustom"

    def schema(self):
        return StructType([
            StructField("id", IntegerType()),
            StructField("name", StringType())
        ])

    def reader(self, schema):
        return MyCustomReader(self.options, schema)

class MyCustomReader(DataSourceReader):
    def __init__(self, options, schema):
        self.options = options
        self.schema = schema

    def read(self, partition):
        # Your data reading logic here
        for i in range(10):
            yield (i, f"name_{i}")

# Register and use (assumes an active SparkSession named `spark`, as in Basic Usage)
spark.dataSource.register(MyCustomDataSource)
df = spark.read.format("mycustom").load()
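
The reader above produces all rows in a single task. As a hedged sketch of how the same idea could be split across several tasks, the variant below overrides `partitions()` so that each `InputPartition` carries a row range. The names `RangeDataSource` and `RangeReader` and the `numRows` / `numPartitions` options are illustrative assumptions, and the fallback classes exist only so the logic can be tried without Spark 4.0 installed.

```python
try:
    from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
except ImportError:
    # Stand-ins for illustration only, used when PySpark 4.0+ is unavailable.
    class DataSource:
        def __init__(self, options):
            self.options = options

    class DataSourceReader:
        pass

    class InputPartition:
        def __init__(self, value):
            self.value = value


class RangeDataSource(DataSource):  # hypothetical example source
    @classmethod
    def name(cls):
        return "myrange"

    def schema(self):
        # A DDL string is accepted in place of a StructType.
        return "id int, name string"

    def reader(self, schema):
        return RangeReader(self.options)


class RangeReader(DataSourceReader):
    def __init__(self, options):
        # Option values arrive as strings, so convert them explicitly.
        self.num_rows = int(options.get("numRows", 10))
        self.num_partitions = max(1, int(options.get("numPartitions", 2)))

    def partitions(self):
        # Split [0, num_rows) into contiguous (start, end) chunks.
        step = max(1, -(-self.num_rows // self.num_partitions))  # ceiling division
        return [
            InputPartition((start, min(start + step, self.num_rows)))
            for start in range(0, self.num_rows, step)
        ]

    def read(self, partition):
        # Each task receives one partition and yields only its own rows.
        start, end = partition.value
        for i in range(start, end):
            yield (i, f"name_{i}")
```

After `spark.dataSource.register(RangeDataSource)`, a read such as `spark.read.format("myrange").option("numRows", 100).option("numPartitions", 4).load()` would be planned as four tasks under these assumptions.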

Requirements

  • Apache Spark 4.0+ or Databricks Runtime 15.4 LTS+
  • Python 3.9-3.12
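
The version bounds above can be checked from Python before installing. This is a small sketch using only the standard library; the helper names are hypothetical, and it inspects only the open-source `pyspark` package, so it says nothing about a Databricks Runtime version.

```python
import sys
from importlib import metadata


def major_minor(version):
    """Return (major, minor) from a version string such as '4.0.0'."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def python_supported(version_info=sys.version_info):
    # Supported range stated in the requirements: Python 3.9 through 3.12.
    return (3, 9) <= tuple(version_info[:2]) <= (3, 12)


def spark_supported(version):
    # The Python Data Source API requires Apache Spark 4.0 or later.
    return major_minor(version) >= (4, 0)


if __name__ == "__main__":
    print("Python OK:", python_supported())
    try:
        print("PySpark OK:", spark_supported(metadata.version("pyspark")))
    except metadata.PackageNotFoundError:
        print("PySpark is not installed")
```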

Contributing

We welcome contributions! See our Development Guide for details.
