
GTFS-RT Aggregator

This project provides a pipeline for fetching, storing, and aggregating GTFS-RT (General Transit Feed Specification Realtime) data from multiple providers into Parquet format.

Features

  • Fetch GTFS-RT data from multiple providers and APIs
  • Store individual data files in Parquet format with multiple storage backends (filesystem, Google Cloud Storage, MinIO)
  • Aggregate data files based on configurable time intervals
  • Run fetcher and aggregator services in parallel
  • Configurable via a single TOML configuration file

Requirements

  • Python 3.11+
  • Required Python packages (see requirements.txt):
    • requests
    • gtfs-realtime-bindings
    • pandas
    • pyarrow
    • schedule
    • pydantic
    • google-cloud-storage (optional, for GCS storage)
    • minio (optional, for MinIO storage)

Installation

From PyPI (recommended)

pip install gtfs-rt-aggregator

From Source

  1. Clone this repository:

    git clone https://github.com/GaspardMerten/gtfs-rt-aggregator
    cd gtfs-rt-aggregator
    
  2. Install in development mode:

    pip install -e .
    

Configuration

The pipeline is configured using a TOML configuration file. Here's an example:

# GTFS-RT Configuration File
[storage]
type = "filesystem"  # Options: "filesystem", "gcs", or "minio"
[storage.params]
base_directory = "data"  # Base directory for filesystem storage

# Provider configurations
[[providers]]
name = "ovapi"
timezone = "Europe/Amsterdam"

  [[providers.apis]]
  url = "https://gtfs.ovapi.nl/nl/vehiclePositions.pb"
  services = ["VehiclePosition"]
  refresh_seconds = 20  # Fetch every 20 seconds
  frequency_minutes = 60  # Group files in 60-minute intervals
  check_interval_seconds = 300  # Check for new files every 5 minutes

  [[providers.apis]]
  url = "https://gtfs.ovapi.nl/nl/tripUpdates.pb"
  services = ["TripUpdate"]
  refresh_seconds = 20  # Fetch every 20 seconds

Storage Backend Examples

Google Cloud Storage

[storage]
type = "gcs"
[storage.params]
bucket_name = "my-gtfs-bucket"
base_path = "gtfs-data"  # Optional: subfolder within the bucket
# Authentication is handled via the GOOGLE_APPLICATION_CREDENTIALS environment variable

MinIO Storage

[storage]
type = "minio"
[storage.params]
endpoint = "minio.example.com:9000"
access_key = "YOUR_ACCESS_KEY"
secret_key = "YOUR_SECRET_KEY"
bucket_name = "gtfs-data"
secure = true  # Use HTTPS
base_path = "gtfs-feeds"  # Optional: subfolder within the bucket
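
As a rough mental model, every backend exposes the same small save/load surface and only differs in where the bytes land (local folder, GCS bucket, MinIO bucket). The sketch below is illustrative, not the actual interface of `gtfs_rt_aggregator.storage`; class and method names are hypothetical:

```python
import tempfile
from abc import ABC, abstractmethod
from pathlib import Path


class StorageBackend(ABC):
    """Hypothetical minimal surface shared by all storage backends."""

    @abstractmethod
    def save(self, relative_path: str, data: bytes) -> None: ...

    @abstractmethod
    def load(self, relative_path: str) -> bytes: ...


class FilesystemStorage(StorageBackend):
    """Maps [storage.params] base_directory to a local folder."""

    def __init__(self, base_directory: str) -> None:
        self.base = Path(base_directory)

    def save(self, relative_path: str, data: bytes) -> None:
        target = self.base / relative_path
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(data)

    def load(self, relative_path: str) -> bytes:
        return (self.base / relative_path).read_bytes()


# Quick round-trip against a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    backend = FilesystemStorage(tmp)
    backend.save("ovapi/VehiclePosition/example.parquet", b"demo-bytes")
    restored = backend.load("ovapi/VehiclePosition/example.parquet")

print(restored)  # b'demo-bytes'
```

A GCS or MinIO backend would implement the same two methods against a bucket (prefixed with the optional `base_path`) instead of a local directory.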

Configuration Options

  • storage: Global storage configuration

    • type: Storage backend type ("filesystem", "gcs", or "minio")
    • params: Backend-specific parameters
  • providers: List of GTFS-RT data providers

    • name: Name of the provider (used for directory structure)
    • timezone: Timezone for the provider's data
    • apis: List of API endpoints for this provider
      • url: URL of the GTFS-RT feed
      • services: List of service types to extract from the feed (VehiclePosition, TripUpdate, Alert)
      • refresh_seconds: How often to fetch data from this API
      • frequency_minutes: The time interval (in minutes) for grouping files
      • check_interval_seconds: How often to check for new files to aggregate
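
To make `frequency_minutes` concrete: each fetched file falls into a fixed time window, and all files in the same window are aggregated together. The helper below is a hypothetical illustration of that bucketing (the package's actual grouping logic may differ in detail):

```python
from datetime import datetime, timedelta


def interval_start(ts: datetime, frequency_minutes: int) -> datetime:
    """Floor a timestamp to the start of its aggregation interval.

    Illustrative only: with frequency_minutes = 60, every fetch from
    the same clock hour lands in one window.
    """
    minutes_into_day = ts.hour * 60 + ts.minute
    start_minutes = (minutes_into_day // frequency_minutes) * frequency_minutes
    day_start = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    return day_start + timedelta(minutes=start_minutes)


print(interval_start(datetime(2024, 5, 1, 10, 37, 12), 60))  # 2024-05-01 10:00:00
print(interval_start(datetime(2024, 5, 1, 10, 37, 12), 15))  # 2024-05-01 10:30:00
```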

Usage

Command Line

Run the pipeline with a configuration file:

gtfs-rt-pipeline configuration.toml

You can adjust the logging level with the --log-level parameter:

gtfs-rt-pipeline configuration.toml --log-level DEBUG

Programmatic Usage

from gtfs_rt_aggregator import run_pipeline_from_toml

# Run pipeline from a TOML file
run_pipeline_from_toml("configuration.toml")

Or with a configuration object:

from gtfs_rt_aggregator.config.loader import load_config_from_toml
from gtfs_rt_aggregator import run_pipeline

# Load configuration
config = load_config_from_toml("configuration.toml")

# Run pipeline
run_pipeline(config)

Project Structure

src/gtfs_rt_aggregator/
  ├── __init__.py                # Package initialization
  ├── pipeline.py                # Main pipeline implementation
  ├── aggregator/                # Aggregation functionality
  ├── config/                    # Configuration loading and validation
  ├── fetcher/                   # GTFS-RT data fetching functionality
  ├── storage/                   # Storage backend implementations
  └── utils/                     # Utility functions and helpers
      ├── cli.py                 # Command-line interface
      └── ...

License

MIT License

Download files


Source Distribution

gtfs_rt_aggregator-0.1.5.tar.gz (25.9 kB)

Built Distribution


gtfs_rt_aggregator-0.1.5-py3-none-any.whl (33.9 kB)

File details

Details for the file gtfs_rt_aggregator-0.1.5.tar.gz.

File metadata

  • Download URL: gtfs_rt_aggregator-0.1.5.tar.gz
  • Upload date:
  • Size: 25.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for gtfs_rt_aggregator-0.1.5.tar.gz:

  • SHA256: bf802845aa01dd9dbe7e1505becda4c5b67b488f927c87c66943dc9d54b2fed4
  • MD5: 6541549d92e73884afc22ba504a4c478
  • BLAKE2b-256: 74f3535d8ae22bf3c4796e430575bd7d5b7e4bc1d453ac092b27e0bf64dcb765


Provenance

The following attestation bundles were made for gtfs_rt_aggregator-0.1.5.tar.gz:

Publisher: python-publish.yml on GaspardMerten/gtfs-rt-aggregator


File details

Details for the file gtfs_rt_aggregator-0.1.5-py3-none-any.whl.

File metadata

File hashes

Hashes for gtfs_rt_aggregator-0.1.5-py3-none-any.whl:

  • SHA256: 7b7f1459634c78b6cc97e6b5d15c7af0791a5b840f8f053d424b825cef17577b
  • MD5: b151f920f9f9b2302b63c1802cc8ec25
  • BLAKE2b-256: b5c854c349c45a4ad18f4dbc41c6b8e48529fe251016e6307bfa07caa50839c9


Provenance

The following attestation bundles were made for gtfs_rt_aggregator-0.1.5-py3-none-any.whl:

Publisher: python-publish.yml on GaspardMerten/gtfs-rt-aggregator

