alpaca-pyspark
Import market data from the Alpaca API with PySpark.
A high-performance PySpark connector for importing market data from the Alpaca Markets API in a distributed fashion.
⚠️ Important: This library requires Apache Spark 4.0+ and Python 3.11+
Overview
alpaca-pyspark provides custom PySpark DataSource implementations that enable efficient, parallel retrieval of market data from Alpaca Markets. The library leverages PySpark's distributed computing capabilities to fetch data across multiple stock symbols concurrently, with built-in retry logic, error handling, and PyArrow batch processing for optimal performance.
Key Features
- Distributed Data Fetching: Automatically parallelizes API requests across stock symbols and time ranges
- Intelligent Partitioning: Dynamically sizes partitions based on data volume for optimal load balancing
- PyArrow Batch Processing: Uses Apache Arrow for high-performance data transfer (up to 10x faster than row-by-row processing)
- Resilient: Built-in retry logic with exponential backoff for network failures
- Type-Safe: Strict schema definitions ensure data consistency
- Easy Integration: Works seamlessly with PySpark DataFrames and Spark SQL
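The library's internal retry logic isn't reproduced here, but the "exponential backoff" mentioned above generally follows this pattern. The sketch below is a generic wrapper for illustration, not the library's actual code:

```python
import random
import time


def with_retries(fn, max_attempts=5, base_delay=1.0):
    """Call fn(), retrying on failure with exponential backoff plus jitter."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: propagate the last error
            # Wait base_delay * 2^attempt, plus a small random jitter
            # so that concurrent workers don't retry in lockstep
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, base_delay))
```

Each failed attempt doubles the wait, which gives a flaky network endpoint progressively more time to recover before the next request.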
Quick Start
Installation
This project uses Poetry for dependency management.
Prerequisites:
- Python 3.11 or higher
- Apache Spark 4.0+ / PySpark 4.0+
- Poetry installed (installation guide)
Setup:
```shell
git clone https://github.com/tnixon/alpaca-pyspark.git
cd alpaca-pyspark
poetry install
poetry shell
```
Basic Example
```python
import datetime as dt
from zoneinfo import ZoneInfo

from pyspark.sql import SparkSession

from alpaca_pyspark.stocks import HistoricalBarsDataSource

# Initialize Spark session (requires Spark 4.0+)
spark = SparkSession.builder.appName("AlpacaExample").getOrCreate()

# Register the data source
spark.dataSource.register(HistoricalBarsDataSource)

# Configure the data source options
tz = ZoneInfo("America/New_York")
options = {
    "symbols": ["AAPL", "MSFT", "GOOG"],
    "APCA-API-KEY-ID": "your-api-key-id",
    "APCA-API-SECRET-KEY": "your-api-secret-key",
    "timeframe": "1Day",
    "start": dt.datetime(2021, 1, 1, tzinfo=tz).isoformat(),
    "end": dt.datetime(2022, 1, 1, tzinfo=tz).isoformat(),
}

# Load the data as a DataFrame
df = (spark.read
      .format("Alpaca_Stocks_Bars")
      .options(**options)
      .load())

# Use the DataFrame
df.show()
df.createOrReplaceTempView("bars")
spark.sql("SELECT symbol, time, close FROM bars WHERE symbol = 'AAPL'").show()
```
Available Data Sources
| DataSource Name | Python Class | Description |
|---|---|---|
| Alpaca_Stocks_Bars | HistoricalBarsDataSource | Historical OHLCV bars/candles for stocks |
| Alpaca_Stocks_Trades | HistoricalTradesDataSource | Historical tick-by-tick trades for stocks |
| Alpaca_Options_Bars | HistoricalOptionBarsDataSource | Historical OHLCV bars/candles for options |
| Alpaca_Corporate_Actions | CorporateActionsDataSource | Corporate actions (splits, dividends, etc.) |
Example: Stock Trades
```python
from alpaca_pyspark.stocks import HistoricalTradesDataSource

spark.dataSource.register(HistoricalTradesDataSource)

df = (spark.read
      .format("Alpaca_Stocks_Trades")
      .options(**options)  # no timeframe needed for trades
      .load())
```
Example: Options Data
```python
from alpaca_pyspark.options import HistoricalOptionBarsDataSource

spark.dataSource.register(HistoricalOptionBarsDataSource)

options = {
    # Option symbols use the OCC format:
    # ROOT + YYMMDD + C/P + 8-digit strike (price x 1000)
    "symbols": ["AAPL241220C00150000"],
    "APCA-API-KEY-ID": "your-api-key-id",
    "APCA-API-SECRET-KEY": "your-api-secret-key",
    "timeframe": "1Hour",
    "start": dt.datetime(2024, 12, 1, tzinfo=tz).isoformat(),
    "end": dt.datetime(2024, 12, 20, tzinfo=tz).isoformat(),
}

df = (spark.read
      .format("Alpaca_Options_Bars")
      .options(**options)
      .load())
```
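An OCC-style option symbol packs the underlying, expiration, call/put flag, and strike into one string. A small helper (hypothetical, not part of the library) shows how a symbol like the one above is composed:

```python
import datetime as dt


def occ_symbol(root: str, expiry: dt.date, right: str, strike: float) -> str:
    """Build an OCC-style option symbol: ROOT + YYMMDD + C/P + strike*1000 padded to 8 digits."""
    assert right in ("C", "P"), "right must be 'C' (call) or 'P' (put)"
    return f"{root}{expiry:%y%m%d}{right}{int(round(strike * 1000)):08d}"


# AAPL call expiring 2024-12-20 at a $150 strike
occ_symbol("AAPL", dt.date(2024, 12, 20), "C", 150.0)  # "AAPL241220C00150000"
```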
Documentation
For detailed information about using and contributing to alpaca-pyspark:
- Usage Guide: Comprehensive configuration options, data schemas, and advanced usage patterns
- Contributing Guide: Development environment setup, testing procedures, and contribution workflow
Security
⚠️ Important: Never commit API credentials to version control. Use secure methods like:
- Environment variables
- Spark secrets management (e.g., Databricks secrets)
- Cloud secret managers (AWS Secrets Manager, Azure Key Vault, etc.)
```python
import os

options = {
    "APCA-API-KEY-ID": os.getenv("APCA_API_KEY_ID"),
    "APCA-API-SECRET-KEY": os.getenv("APCA_API_SECRET_KEY"),
    # ... other options
}
```
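Since `os.getenv` silently returns `None` for unset variables, it can help to fail fast before submitting a Spark job. This hypothetical helper (not part of the library, which simply consumes the option values) sketches that pattern:

```python
import os


def alpaca_credentials() -> dict:
    """Read Alpaca API credentials from the environment, failing fast if unset."""
    key_id = os.getenv("APCA_API_KEY_ID")
    secret = os.getenv("APCA_API_SECRET_KEY")
    if not key_id or not secret:
        raise RuntimeError("APCA_API_KEY_ID and APCA_API_SECRET_KEY must be set")
    return {"APCA-API-KEY-ID": key_id, "APCA-API-SECRET-KEY": secret}
```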
External Documentation
- Alpaca Market Data API: API specification and endpoints
- PySpark DataSource API: PySpark custom data source implementation
- PyArrow Documentation: Arrow batch processing
- Apache Spark SQL Guide: Spark SQL programming
License
See LICENSE file for details.
Contributing
Contributions are welcome! Please see CONTRIBUTING.md for detailed guidelines on:
- Development environment setup
- Code quality standards
- Testing procedures
- Contribution workflow
File details
Details for the file alpaca_pyspark-0.1.0.tar.gz.

File metadata
- Download URL: alpaca_pyspark-0.1.0.tar.gz
- Size: 15.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 4f2110228a3b17e4182affdeaba2aa0b51c989e28c4c4c9b6026083c964b29a5 |
| MD5 | abde2af61d348da1b497eec7acd92149 |
| BLAKE2b-256 | 79499782763e849e730d5f3132633dc1fec797746d0cf4f2775d19c20228c8c3 |
Provenance
The following attestation bundles were made for alpaca_pyspark-0.1.0.tar.gz:

Publisher: publish.yml on tnixon/alpaca-pyspark
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: alpaca_pyspark-0.1.0.tar.gz
- Subject digest: 4f2110228a3b17e4182affdeaba2aa0b51c989e28c4c4c9b6026083c964b29a5
- Sigstore transparency entry: 850111176
- Permalink: tnixon/alpaca-pyspark@cbdf923d0ed213a2b557d284f4dac502976d36ce
- Branch / Tag: refs/heads/main
- Owner: https://github.com/tnixon
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@cbdf923d0ed213a2b557d284f4dac502976d36ce
- Trigger Event: workflow_dispatch
File details
Details for the file alpaca_pyspark-0.1.0-py3-none-any.whl.

File metadata
- Download URL: alpaca_pyspark-0.1.0-py3-none-any.whl
- Size: 20.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | d4cca68b2b3b30b7ede345fdbc30f63caa62979287f8b86297b51e83c6222187 |
| MD5 | a48504e3da9bbdfb58720495172e03e8 |
| BLAKE2b-256 | 623a586617717119791cea9842e4657e327a7c9903564f453faf5c2ae4eb6791 |
Provenance
The following attestation bundles were made for alpaca_pyspark-0.1.0-py3-none-any.whl:

Publisher: publish.yml on tnixon/alpaca-pyspark
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: alpaca_pyspark-0.1.0-py3-none-any.whl
- Subject digest: d4cca68b2b3b30b7ede345fdbc30f63caa62979287f8b86297b51e83c6222187
- Sigstore transparency entry: 850111182
- Permalink: tnixon/alpaca-pyspark@cbdf923d0ed213a2b557d284f4dac502976d36ce
- Branch / Tag: refs/heads/main
- Owner: https://github.com/tnixon
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@cbdf923d0ed213a2b557d284f4dac502976d36ce
- Trigger Event: workflow_dispatch