Skip to main content

Railway Oriented Programming framework for Python

Project description

Railway Framework for Python

型安全なパイプラインで、運用自動化をシンプルに。

# IDE補完が効く型安全なパイプライン
from railway import Contract, node, typed_pipeline

class UsersFetchResult(Contract):
    users: list[dict]
    total: int

class ReportResult(Contract):
    content: str

@node(output=UsersFetchResult)
def fetch_users() -> UsersFetchResult:
    return UsersFetchResult(users=[{"id": 1, "name": "Alice"}], total=1)

@node(inputs={"data": UsersFetchResult}, output=ReportResult)
def generate_report(data: UsersFetchResult) -> ReportResult:
    return ReportResult(content=f"{data.total} users found")
    #                            ^^^^
    #                            Ctrl+Space でフィールド補完!

result = typed_pipeline(fetch_users, generate_report)
print(result.content)  # IDE補完が効く!

特徴:

  • IDE補完で開発効率アップ
  • 型チェックでバグを早期発見
  • テストはモック不要、引数を渡すだけ

Python Version Test Coverage License: MIT Tests


Why Railway?

従来のパイプラインの問題

# ❌ 従来: 何が渡されるか分からない
def process(data):
    users = data["users"]  # KeyError? typo? IDE補完なし
    return {"processed": users[0]["name"]}  # ネストが深い...

result = pipeline(fetch, process, save)
# result["???"] 何が入ってる?

Railway の解決策

# ✅ Railway: 型契約で明確に
@node(inputs={"data": FetchResult}, output=ProcessResult)
def process(data: FetchResult) -> ProcessResult:
    users = data.users  # IDE補完 ✓ 型チェック ✓
    return ProcessResult(name=users[0].name)
    #                         ^^^^
    #                         Ctrl+Space で候補表示

result = typed_pipeline(fetch, process, save)
print(result.saved_count)  # 補完が効く!
観点 従来 Railway
データ構造 dict["key"]["nested"] model.field
IDE補完
型チェック ✅ (mypy対応)
テスト モック必須 引数渡しのみ
リファクタ grep検索 IDE一括変更 (F2)

Quick Start (5分)

1. インストール

# uvをインストール(未インストールの場合)
curl -LsSf https://astral.sh/uv/install.sh | sh

# railway コマンドをインストール
uv tool install railway-framework

2. プロジェクト作成

railway init my_automation
cd my_automation
uv sync
cp .env.example .env

3. 型契約(Contract)を定義

railway new contract UsersFetchResult
# src/contracts/users_fetch_result.py
from railway import Contract

class User(Contract):
    id: int
    name: str

class UsersFetchResult(Contract):
    users: list[User]
    total: int

4. 型付きノードを作成

railway new node fetch_users --output UsersFetchResult
# src/nodes/fetch_users.py
from railway import node
from contracts.users_fetch_result import UsersFetchResult, User

@node(output=UsersFetchResult)
def fetch_users() -> UsersFetchResult:
    # APIからユーザー取得
    return UsersFetchResult(
        users=[User(id=1, name="Alice")],
        total=1,
    )

5. テストを書く(TDD)

# tests/nodes/test_fetch_users.py
from nodes.fetch_users import fetch_users
from contracts.users_fetch_result import UsersFetchResult

def test_fetch_users():
    result = fetch_users()  # モック不要!

    assert isinstance(result, UsersFetchResult)
    assert result.total == len(result.users)

6. 実行

uv run railway run main

🎉 完成! 型安全な自動化ツールができました。


アーキテクチャ

Contract(型契約)

ノード間で交換されるデータの「契約」を定義します。

from railway import Contract

class OrderResult(Contract):
    """注文処理の結果"""
    order_id: int
    status: str
    total: float

Contractの特徴:

  • Pydantic BaseModel がベース(自動バリデーション)
  • イミュータブル で安全(frozen=True)
  • IDE補完 が効く

Node(処理単位)

@node(
    inputs={"order": OrderResult},  # 必要な入力を宣言
    output=ShippingResult,          # 出力の型を宣言
)
def create_shipping(order: OrderResult) -> ShippingResult:
    # 純粋関数として実装
    return ShippingResult(
        order_id=order.order_id,
        tracking_number=generate_tracking(),
    )

Pipeline(実行)

from railway import typed_pipeline

result = typed_pipeline(
    create_order,      # OrderResult を出力
    process_payment,   # PaymentResult を出力
    create_shipping,   # OrderResult を入力、ShippingResult を出力
)
# result は ShippingResult 型

依存関係はフレームワークが自動解決:

create_order ─────────────────┐
  output: OrderResult              │
                                   ├──> create_shipping
process_payment ──────────────┘       output: ShippingResult
  output: PaymentResult

CLI Commands

プロジェクト管理

railway init <name>              # プロジェクト作成
railway new entry <name>         # エントリポイント作成
railway docs                     # ドキュメント表示

Contract(型契約)

railway new contract <Name>          # Contract作成
railway new contract <Name> --entity # エンティティContract(id付き)
railway new contract <Name> --params # パラメータ用Contract
railway list contracts               # Contract一覧

Node(処理単位)

railway new node <name>                      # 基本node作成
railway new node <name> --output ResultType  # 出力型指定
railway new node <name> --input data:InputType --output ResultType
railway show node <name>                     # 依存関係表示

実行

railway run <entry>              # 実行
railway list                     # エントリポイント/ノード一覧

バージョン管理

railway update                   # プロジェクトを最新バージョンに更新
railway update --dry-run         # 変更をプレビュー(実行しない)
railway update --init            # バージョン情報のないプロジェクトを初期化
railway backup list              # バックアップ一覧
railway backup restore           # バックアップから復元
railway backup clean --keep 3    # 古いバックアップを削除

特徴

  • 5分で開始: railway init でプロジェクト作成、すぐに実装開始
  • 🛤️ 型安全パイプライン: Contract による型契約でIDE補完が効く
  • 🔒 型チェック: mypyによる静的型チェック + ランタイム検証
  • 🐍 Pythonらしいエラーハンドリング: 例外機構を活かした3層設計
  • 非同期対応: async/await 完全サポート
  • 🎯 シンプルなAPI: デコレータベースで直感的
  • 📝 自動生成: テンプレートから即座にコード生成
  • 🧪 テスト容易: モック不要、引数を渡すだけ
  • ⚙️ 環境別設定: development/production を簡単に切り替え
  • 🔄 自動リトライ: 一時的なエラーに自動で対処
  • 📊 構造化ロギング: loguru による美しいログ出力
  • 🆙 バージョン管理: プロジェクトバージョン追跡、自動マイグレーション

コア概念

1. ノード (@node)

ノード = 再利用可能な処理単位

from railway import node
from loguru import logger

@node(retry=True)  # リトライ有効化
def fetch_data(url: str) -> dict:
    """データ取得ノード"""
    logger.info(f"Fetching from {url}")
    response = requests.get(url)
    return response.json()

型付きノード(推奨):

@node(output=UsersFetchResult)
def fetch_users() -> UsersFetchResult:
    return UsersFetchResult(users=[...], total=10)

@node(inputs={"users": UsersFetchResult}, output=ReportResult)
def generate_report(users: UsersFetchResult) -> ReportResult:
    return ReportResult(content=f"{users.total} users")

2. エントリーポイント (@entry_point)

エントリーポイント = 実行の起点

from railway import entry_point, typed_pipeline

@entry_point
def main(date: str = None, dry_run: bool = False):
    """日次レポート生成"""
    result = typed_pipeline(
        fetch_data,
        process_data,
        generate_report,
    )
    return result

3. パイプライン

レガシーパイプライン(dict渡し):

result = pipeline(
    step1(),
    step2,
    step3,
)

型付きパイプライン(推奨):

result = typed_pipeline(
    fetch_users,      # UsersFetchResult を出力
    process_users,    # UsersFetchResult を入力
    generate_report,  # ReportResult を出力
)
# result は ReportResult 型

inputs 自動推論:

Contract 型の引数は自動的に依存関係として解決されます。 明示的な inputs= 指定は不要です。

# 型ヒントから自動的に inputs が推論される
@node(output=ReportResult)
def generate_report(users: UsersFetchResult) -> ReportResult:
    # UsersFetchResult は自動的に前のステップから解決
    return ReportResult(content=f"{users.total} users")

pipeline vs typed_pipeline

特徴 typed_pipeline pipeline
最初の引数 関数(@node) 評価済みの値
型安全性 Contract ベース 限定的
IDE補完 フル対応 限定的
依存解決 自動 なし
エラーハンドリング on_error, on_step 例外伝播のみ
推奨用途 通常の開発 動的構成、既存値から開始

typed_pipeline(推奨):

result = typed_pipeline(fetch, process, save)  # 関数を渡す

pipeline(レガシー):

result = pipeline(initial_value, step1, step2)  # 最初に値を渡す

エラーハンドリング

Railway Framework は Python標準の例外機構を最大限活用 します。 新しい概念を導入せず、Pythonエンジニアが慣れ親しんだパターンで運用できます。

設計思想: 3層のエラーハンドリング

┌─────────────────────────────────────────────────────────────┐
│ レベル1: Node内部                                            │
│   シンプル: 必要な箇所でtry/exceptを書く                      │
│   リトライ: retry_on で一時的エラーを自動リトライ             │
├─────────────────────────────────────────────────────────────┤
│ レベル2: Pipeline(デフォルト)                              │
│   何もしない: 例外はそのまま伝播                              │
│   スタックトレース保持、デバッグ容易                          │
├─────────────────────────────────────────────────────────────┤
│ レベル3: Pipeline(必要な時だけ)                            │
│   on_error: 例外をマッチしてフォールバック/ログ/再送出        │
└─────────────────────────────────────────────────────────────┘

なぜこの設計か?

設計判断 理由
Result型を採用しない Pythonエコシステム(requests等)は例外ベース。ラップは冗長。
デフォルトは例外伝播 スタックトレースが保持され、デバッグしやすい。
on_errorは任意 高度な制御が必要な時だけ使う。シンプルなケースを複雑にしない。

使用例

シンプルなケース(レベル2で十分)

# 例外はそのまま伝播。これで十分なケースが多い。
result = typed_pipeline(fetch_users, process, save)

Node内で処理(レベル1)

@node
def fetch_users():
    try:
        return api.get_users()
    except NotFoundError:
        return []  # このNodeで完結

一時的エラーのリトライ(レベル1)

@node(retries=3, retry_on=(ConnectionError, TimeoutError))
def fetch_data():
    return requests.get(API_URL).json()

Pipeline単位の高度な制御(レベル3)

def handle_error(error: Exception, step_name: str) -> Any:
    match error:
        case ConnectionError():
            return load_from_cache()  # フォールバック
        case _:
            raise  # 再送出

result = typed_pipeline(fetch, process, save, on_error=handle_error)

デバッグと監査

on_step コールバック

パイプラインの各ステップ完了後にコールバックを受け取れます。 デバッグ、監査ログ、メトリクス収集に便利です。

steps = []

def capture_step(step_name: str, output: Any) -> None:
    steps.append({"step": step_name, "output": output})

result = typed_pipeline(
    fetch_users, process_users, generate_report,
    on_step=capture_step
)

# 各ステップの結果を確認
for step in steps:
    print(f"[{step['step']}] -> {step['output']}")

on_error と併用可能:

result = typed_pipeline(
    fetch, process, save,
    on_step=capture_step,    # 各ステップをログ
    on_error=handle_error    # エラー時のフォールバック
)

設定管理

統合設定ファイル: config/development.yaml

# アプリケーション設定
app:
  name: my_automation

# API設定
api:
  base_url: "https://api.example.com"
  timeout: 30

# ログ設定
logging:
  level: DEBUG
  handlers:
    - type: console
      level: DEBUG
    - type: file
      path: logs/app.log

# リトライ設定
retry:
  default:
    max_attempts: 3
    min_wait: 2
    max_wait: 10

コードから設定にアクセス

from src.settings import settings

url = settings.api.base_url
retry_config = settings.get_retry_settings("fetch_data")

バージョン管理

Railway Framework はプロジェクトのバージョン情報を追跡し、安全なアップグレードを支援します。

なぜバージョン管理が必要か?

問題 影響 Railway の解決策
バージョン不明 チームで不整合発生 .railway/project.yaml で明示
テンプレート変更 railway new で不整合 互換性チェック + 警告
手動マイグレーション 面倒、ミスしやすい railway update で自動化

プロジェクトメタデータ

railway init 実行時に自動生成:

# .railway/project.yaml
railway:
  version: "0.10.0"              # 生成時のrailway-frameworkバージョン
  created_at: "2026-01-23T10:30:00+09:00"
  updated_at: "2026-01-23T10:30:00+09:00"

project:
  name: "my_automation"

compatibility:
  min_version: "0.10.0"          # 必要な最小バージョン

設計判断:

判断 理由
YAML形式 人間が読みやすく、手動編集も可能
.railway/ ディレクトリ フレームワーク関連ファイルを集約
Git管理対象 チーム全員でバージョン情報を共有

バージョン互換性チェック

railway new 系コマンド実行時に自動チェック:

$ railway new node fetch_data

⚠️  バージョン不一致を検出
    プロジェクト: 0.9.0
    現在:         0.10.0

    マイナーバージョンが異なります。
    テンプレートが更新されている可能性があります。

    [c] 続行 / [u] 'railway update' を実行 / [a] 中止

互換性ルール:

条件 動作
同一バージョン そのまま実行
マイナー差異 警告 + 確認
メジャー差異 エラー + 拒否
バージョン不明 警告 + 確認

railway update コマンド

プロジェクトを最新バージョンにマイグレーション:

$ railway update

🔍 プロジェクトを分析中...

   プロジェクト名:      my_automation
   現在のバージョン:    0.9.0
   ターゲットバージョン: 0.10.0

📋 適用される変更:

   [ファイル追加]
   + .railway/project.yaml

   [設定更新]
   ~ config/development.yaml
     - 新規キー: railway.version_check

続行しますか? [y/N]: y

✅ 更新完了
   バックアップ: .railway/backups/0.9.0_20260123_103000/
   新バージョン: 0.10.0

オプション:

railway update --dry-run     # 変更をプレビュー(実行しない)
railway update --init        # バージョン情報のないプロジェクトを初期化
railway update --force       # 確認なしで実行
railway update --no-backup   # バックアップをスキップ

設計判断:

判断 理由
自動バックアップ 失敗時にロールバック可能、安全第一
確認プロンプト 意図しない変更を防止
段階的マイグレーション 0.9→0.10→0.11 と順番に適用、大きなジャンプも安全
ユーザーコード不変更 src/nodes/*, tests/* は変更しない

バックアップ・ロールバック

# バックアップ一覧
$ railway backup list
  0.9.0_20260123_103000  2時間前   15KB
  0.8.0_20260120_090000  3日前     12KB

# 復元
$ railway backup restore
どのバックアップに戻しますか? [1]: 1 ロールバック完了

# 古いバックアップを削除
$ railway backup clean --keep 3
🗑️  2件のバックアップを削除しました

バックアップ対象:

対象 理由
.railway/project.yaml バージョン情報の復元
TUTORIAL.md 自動生成ファイル
pyproject.toml 依存関係
config/*.yaml 設定ファイル

除外対象(ユーザーコード):

  • src/nodes/* - 更新対象外
  • tests/* - 更新対象外
  • .venv/ - 仮想環境

Dry-run モード

変更を実行せずにプレビュー:

$ railway update --dry-run

📋 適用される変更:

   [ファイル追加]
   + src/py.typed (0 bytes)

   [ファイル更新]
   ~ TUTORIAL.md (+150/-0 lines)
     - セクション「Step 8」を追加

   [設定更新]
   ~ config/development.yaml
     - 新規キー: railway.version_check
     - キー名変更: log_level  logging.level

   [コード変更ガイダンス(手動対応推奨)]
   ! src/nodes/fetch_data.py:15
     @node(log_input=True)    @node(log_inputs=True)

[dry-run] 実際の変更は行われませんでした。
実行するには: railway update

テストの書き方

型付きノードはテストが簡単:

# tests/nodes/test_process_users.py
from contracts.users import UsersFetchResult, User
from contracts.report import ReportResult
from nodes.process_users import process_users

def test_process_users():
    # Arrange - 引数を渡すだけ(モック不要)
    users = UsersFetchResult(
        users=[User(id=1, name="Alice")],
        total=1,
    )

    # Act
    result = process_users(users)

    # Assert
    assert isinstance(result, ReportResult)
    assert "Alice" in result.content
# テスト実行
pytest -v
pytest --cov=src --cov-report=html

実例: 日次レポート生成

ステップ1: Contractを定義

railway new contract SalesData
railway new contract ReportResult

ステップ2: ノードを作成

railway new node fetch_sales --output SalesData
railway new node generate_report --input data:SalesData --output ReportResult

ステップ3: エントリーポイント

# src/daily_report.py
from railway import entry_point, typed_pipeline
from nodes.fetch_sales import fetch_sales
from nodes.generate_report import generate_report

@entry_point
def main(date: str = None):
    result = typed_pipeline(
        fetch_sales,
        generate_report,
    )
    print(result.content)
    return result

ステップ4: 実行

uv run railway run daily_report

非同期サポート

from railway import node
from railway.core.resolver import typed_async_pipeline

@node(output=UsersFetchResult)
async def fetch_users_async() -> UsersFetchResult:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.json()
            return UsersFetchResult(users=data["users"], total=len(data["users"]))

@entry_point
async def main():
    result = await typed_async_pipeline(
        fetch_users_async,
        process_users,
    )
    return result

採用技術スタック

ライブラリ 用途
pydantic Contract(データバリデーション)
tenacity リトライ処理
typer CLIインターフェース
loguru 構造化ロギング

ロードマップ

Phase 1 ✅ 完了

  • @node, @entry_point デコレータ
  • pipeline(), async_pipeline() 関数
  • ✅ 設定管理、ロギング、リトライ
  • ✅ CLIツール (init, new, list, run)

Phase 1.5 ✅ 完了(Output Model Pattern)

  • Contract ベースクラス
  • Params パラメータクラス
  • typed_pipeline(), typed_async_pipeline()
  • DependencyResolver 自動依存解決
  • ✅ CLI拡張 (new contract, list contracts, show node)

Phase 1.6 ✅ 完了(3層エラーハンドリング)

  • on_error コールバック(パイプラインレベルのエラー制御)
  • on_step コールバック(中間結果へのアクセス)
  • RetryPolicy / retries / retry_on(柔軟なリトライ設定)
  • ✅ inputs 自動推論(型ヒントからの依存関係解決)
  • ✅ ログメッセージ日本語統一

Phase 2 ✅ 完了(バージョン管理)

  • ✅ プロジェクトバージョン記録(.railway/project.yaml
  • ✅ バージョン互換性チェック(railway new 実行時)
  • railway update コマンド(プロジェクトマイグレーション)
  • railway backup コマンド(バックアップ・ロールバック)
  • ✅ Dry-run モード(--dry-run で変更プレビュー)

Phase 3 📋 計画中

  • 🔜 並列パイプライン実行
  • 🔜 グラフベースワークフロー
  • 🔜 WebUI
  • 🔜 メトリクス収集

ライセンス

MIT License


Railway Framework で型安全な運用自動化を始めましょう!

railway init my_automation
cd my_automation
railway new contract UserResult
railway new node fetch_users --output UserResult
uv run railway run main

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

railway_framework-0.10.0.tar.gz (496.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

railway_framework-0.10.0-py3-none-any.whl (81.5 kB view details)

Uploaded Python 3

File details

Details for the file railway_framework-0.10.0.tar.gz.

File metadata

  • Download URL: railway_framework-0.10.0.tar.gz
  • Upload date:
  • Size: 496.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.22 {"installer":{"name":"uv","version":"0.9.22","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for railway_framework-0.10.0.tar.gz
Algorithm Hash digest
SHA256 1c25e1c121bdc05d2e781ae268ece9ba5f857f2021986e170ecd7ab7cbd9c694
MD5 6262c45fa28f4f967c3a8cd8d0ab5403
BLAKE2b-256 61ec76eb8af1382e0cc5582c94063024495ecc9bf127d63712cf71e83c56fcf1

See more details on using hashes here.

File details

Details for the file railway_framework-0.10.0-py3-none-any.whl.

File metadata

  • Download URL: railway_framework-0.10.0-py3-none-any.whl
  • Upload date:
  • Size: 81.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.22 {"installer":{"name":"uv","version":"0.9.22","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for railway_framework-0.10.0-py3-none-any.whl
Algorithm Hash digest
SHA256 4a6be50210bc011f2eb15a03a4fdae93cbb98900fe49b3c574d268373dc55a97
MD5 ee3ba1b9e5d85fc07b768d2135aded2a
BLAKE2b-256 97f05a31e6f5626ca2e0a78d739449a51ba1fba2d7b0ed86a6bf7bf388224d38

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page