GTFS-RT Fetcher and Aggregator to Parquet format
GTFS-RT Aggregator
This project provides a pipeline for fetching, storing, and aggregating GTFS-RT (General Transit Feed Specification - Realtime) data from multiple providers into Parquet format.
Features
- Fetch GTFS-RT data from multiple providers and APIs
- Store individual data files in Parquet format with multiple storage backends (filesystem, Google Cloud Storage, MinIO)
- Aggregate data files based on configurable time intervals
- Run fetcher and aggregator services in parallel
- Configurable via a single TOML configuration file
Requirements
- Python 3.11+
- Required Python packages (see requirements.txt):
  - requests
  - gtfs-realtime-bindings
  - pandas
  - pyarrow
  - schedule
  - pydantic
  - google-cloud-storage (optional, for GCS storage)
  - minio (optional, for MinIO storage)
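Because the GCS and MinIO clients are optional, it can help to confirm that the matching package is importable before selecting a backend. The following is a minimal sketch and not part of the project: `OPTIONAL_MODULES`, `module_available`, and `backend_available` are hypothetical names, and the module paths are the usual import names of `google-cloud-storage` and `minio`.

```python
import importlib.util

# Hypothetical mapping from storage type to the import name of its optional client.
OPTIONAL_MODULES = {
    "filesystem": None,  # no extra package needed
    "gcs": "google.cloud.storage",
    "minio": "minio",
}


def module_available(module_name: str) -> bool:
    """Return True if `module_name` can be imported in this environment."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except ModuleNotFoundError:
        # Raised when a parent package (for example `google`) is itself missing.
        return False


def backend_available(storage_type: str) -> bool:
    """Return True if the optional dependency for `storage_type` is installed."""
    if storage_type not in OPTIONAL_MODULES:
        raise ValueError(f"Unknown storage type: {storage_type!r}")
    module_name = OPTIONAL_MODULES[storage_type]
    return module_name is None or module_available(module_name)


for name in OPTIONAL_MODULES:
    status = "ok" if backend_available(name) else "missing dependency"
    print(f"{name}: {status}")
```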
Installation
From PyPI (recommended)
pip install gtfs-rt-aggregator
From Source
- Clone this repository:
  git clone https://github.com/GaspardMerten/gtfs-rt-aggregator
  cd gtfs-rt-aggregator
- Install in development mode:
  pip install -e .
Configuration
The pipeline is configured using a TOML configuration file. Here's an example:
# GTFS-RT Configuration File
[storage]
type = "filesystem" # Options: "filesystem", "gcs", or "minio"
[storage.params]
base_directory = "data" # Base directory for filesystem storage
# Provider configurations
[[providers]]
name = "ovapi"
timezone = "Europe/Amsterdam"
[[providers.apis]]
url = "https://gtfs.ovapi.nl/nl/vehiclePositions.pb"
services = ["VehiclePosition"]
refresh_seconds = 20 # Fetch every 20 seconds
frequency_minutes = 60 # Group files in 60-minute intervals
check_interval_seconds = 300 # Check for new files every 5 minutes
[[providers.apis]]
url = "https://gtfs.ovapi.nl/nl/tripUpdates.pb"
services = ["TripUpdate"]
refresh_seconds = 20 # Fetch every 20 seconds
Storage Backend Examples
Google Cloud Storage
[storage]
type = "gcs"
[storage.params]
bucket_name = "my-gtfs-bucket"
base_path = "gtfs-data" # Optional: subfolder within the bucket
# Authentication is handled via the GOOGLE_APPLICATION_CREDENTIALS environment variable
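Since authentication relies on an environment variable, a quick pre-flight check can surface a missing or mistyped path before the pipeline starts. This is a minimal sketch under the assumption that the variable points to a service-account key file; `check_gcs_credentials` is a hypothetical helper and not part of the package.

```python
import os
from pathlib import Path


def check_gcs_credentials(env=os.environ) -> Path:
    """Return the credentials path, or raise if the variable is unset or the file is missing."""
    value = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not value:
        raise RuntimeError("GOOGLE_APPLICATION_CREDENTIALS is not set")
    path = Path(value)
    if not path.is_file():
        raise RuntimeError(f"Credentials file not found: {path}")
    return path


try:
    print(f"Using credentials at {check_gcs_credentials()}")
except RuntimeError as exc:
    print(f"GCS credentials problem: {exc}")
```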
MinIO Storage
[storage]
type = "minio"
[storage.params]
endpoint = "minio.example.com:9000"
access_key = "YOUR_ACCESS_KEY"
secret_key = "YOUR_SECRET_KEY"
bucket_name = "gtfs-data"
secure = true # Use HTTPS
base_path = "gtfs-feeds" # Optional: subfolder within the bucket
Configuration Options
- storage: Global storage configuration
  - type: Storage backend type ("filesystem", "gcs", or "minio")
  - params: Backend-specific parameters
- providers: List of GTFS-RT data providers
  - name: Name of the provider (used for directory structure)
  - timezone: Timezone for the provider's data
  - apis: List of API endpoints for this provider
    - url: URL of the GTFS-RT feed
    - services: List of service types to extract from the feed (VehiclePosition, TripUpdate, Alert)
    - refresh_seconds: How often to fetch data from this API
    - frequency_minutes: The time interval (in minutes) for grouping files
    - check_interval_seconds: How often to check for new files to aggregate
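To make `frequency_minutes` concrete, the sketch below floors timestamps to the start of their interval and groups them accordingly. It only illustrates the idea of interval grouping; the aggregator's real logic, file naming, and timezone handling may differ, and `interval_start` and `group_by_interval` are hypothetical helpers.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone


def interval_start(ts: datetime, frequency_minutes: int) -> datetime:
    """Floor `ts` to the start of its `frequency_minutes`-wide interval within the day."""
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    minutes_since_midnight = ts.hour * 60 + ts.minute
    floored = (minutes_since_midnight // frequency_minutes) * frequency_minutes
    return midnight + timedelta(minutes=floored)


def group_by_interval(timestamps, frequency_minutes: int) -> dict:
    """Map each interval start to the timestamps that fall inside that interval."""
    groups = defaultdict(list)
    for ts in timestamps:
        groups[interval_start(ts, frequency_minutes)].append(ts)
    return dict(groups)


# Made-up fetch times for illustration only.
fetch_times = [
    datetime(2025, 1, 1, 8, 0, 20, tzinfo=timezone.utc),
    datetime(2025, 1, 1, 8, 59, 40, tzinfo=timezone.utc),
    datetime(2025, 1, 1, 9, 0, 0, tzinfo=timezone.utc),
]
groups = group_by_interval(fetch_times, frequency_minutes=60)
for start, members in sorted(groups.items()):
    print(start.isoformat(), len(members))
```

With a 60-minute interval, the first two timestamps land in the 08:00 group and the third in the 09:00 group.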
Usage
Command Line
Run the pipeline with a configuration file:
gtfs-rt-pipeline configuration.toml
You can adjust the logging level with the --log-level parameter:
gtfs-rt-pipeline configuration.toml --log-level DEBUG
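For readers wiring a similar entry point into their own tooling, a command line of this shape can be expressed with `argparse`. This is an illustrative sketch only: the project's actual `cli.py` may be implemented differently, and `build_parser`, the `INFO` default, and the list of accepted levels are assumptions.

```python
import argparse
import logging


def build_parser() -> argparse.ArgumentParser:
    """Build a parser with one positional config path and a --log-level option."""
    parser = argparse.ArgumentParser(prog="gtfs-rt-pipeline")
    parser.add_argument("config", help="Path to the TOML configuration file")
    parser.add_argument(
        "--log-level",
        default="INFO",  # assumed default, not confirmed by the project
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        help="Logging verbosity",
    )
    return parser


def main(argv=None) -> argparse.Namespace:
    """Parse arguments and configure the root logger at the requested level."""
    args = build_parser().parse_args(argv)
    logging.basicConfig(level=getattr(logging, args.log_level))
    logging.getLogger(__name__).debug("Using configuration %s", args.config)
    return args


# Equivalent to: gtfs-rt-pipeline configuration.toml --log-level DEBUG
args = main(["configuration.toml", "--log-level", "DEBUG"])
print(args.config, args.log_level)
```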
Programmatic Usage
from gtfs_rt_aggregator import run_pipeline_from_toml
# Run pipeline from a TOML file
run_pipeline_from_toml("configuration.toml")
Or with a configuration object:
from gtfs_rt_aggregator.config.loader import load_config_from_toml
from gtfs_rt_aggregator import run_pipeline
# Load configuration
config = load_config_from_toml("configuration.toml")
# Run pipeline
run_pipeline(config)
Project Structure
src/gtfs_rt_aggregator/
├── __init__.py # Package initialization
├── pipeline.py # Main pipeline implementation
├── aggregator/ # Aggregation functionality
├── config/ # Configuration loading and validation
├── fetcher/ # GTFS-RT data fetching functionality
├── storage/ # Storage backend implementations
└── utils/ # Utility functions and helpers
    ├── cli.py # Command-line interface
    └── ...
License