Railway Oriented Programming framework for Python
Project description
Railway Framework for Python
型安全なワークフローで、運用自動化をシンプルに。
条件分岐を含む複雑なワークフローをYAMLで宣言的に定義できます。
# DAGワークフロー: 条件分岐対応
from railway import Contract, ExitContract, node, entry_point
from railway.core.dag import dag_runner, Outcome
class AlertContext(Contract):
severity: str
handled: bool = False
@node
def check_severity(ctx: AlertContext) -> tuple[AlertContext, Outcome]:
if ctx.severity == "critical":
return ctx, Outcome.success("critical")
return ctx, Outcome.success("normal")
@node
def escalate(ctx: AlertContext) -> tuple[AlertContext, Outcome]:
return ctx.model_copy(update={"handled": True}), Outcome.success("done")
@node
def log_only(ctx: AlertContext) -> tuple[AlertContext, Outcome]:
return ctx.model_copy(update={"handled": True}), Outcome.success("done")
# 終端ノード: ExitContract を返す(v0.12.3+)
class AlertResult(ExitContract):
exit_state: str = "success.done"
handled: bool
def exit_success_done(ctx: AlertContext) -> AlertResult:
return AlertResult(handled=ctx.handled)
exit_success_done._node_name = "exit.success.done"
TRANSITIONS = {
"check_severity::success::critical": escalate,
"check_severity::success::normal": log_only,
"escalate::success::done": exit_success_done,
"log_only::success::done": exit_success_done,
}
@entry_point
def main():
result = dag_runner(
start=lambda: (AlertContext(severity="critical"), Outcome.success("start")),
transitions=TRANSITIONS,
)
# result は ExitContract: exit_code, exit_state, is_success 等を持つ
return result
特徴:
- DAGワークフロー: 条件分岐を含むワークフローをYAMLで定義
- 型安全: Contract + Outcome による静的型チェック
- 純粋関数: ノードは副作用のない純粋関数
- コード生成: YAMLから遷移コードを自動生成
- バージョン管理: プロジェクトバージョン追跡、自動マイグレーション
設計思想を理解したい方へ: アーキテクチャガイド で 3 つのコンポーネントと 5 つの設計思想を解説しています。
クイックスタート
1. インストール
# uvをインストール(未インストールの場合)
curl -LsSf https://astral.sh/uv/install.sh | sh
# railway コマンドをインストール
uv tool install railway-framework
2. プロジェクト作成
railway init my_workflow
cd my_workflow
uv sync
3. エントリーポイント作成
railway new entry my_workflow
これにより以下が生成されます:
src/my_workflow.py- エントリーポイント(dag_runner使用)src/nodes/my_workflow/start.py- 開始ノードtransition_graphs/my_workflow_*.yml- 遷移グラフ
4. 遷移グラフを編集
transition_graphs/my_workflow_*.yml:
version: "1.0"
entrypoint: my_workflow
nodes:
start:
module: nodes.my_workflow.start
function: start
description: "開始ノード"
process:
module: nodes.my_workflow.process
function: process
description: "処理ノード"
exits:
success:
code: 0
error:
code: 1
start: start
transitions:
start:
success::done: process
failure::error: exit::error
process:
success::complete: exit::success
failure::error: exit::error
5. コード生成
railway sync transition --entry my_workflow
6. 実行
railway run my_workflow
ノードの実装
ノードは Contract と Outcome を返す純粋関数です:
from railway import Contract, node
from railway.core.dag import Outcome
class MyContext(Contract):
value: str
@node
def process(ctx: MyContext) -> tuple[MyContext, Outcome]:
if ctx.value:
return ctx, Outcome.success("done")
else:
return ctx, Outcome.failure("empty")
Outcomeの種類:
| メソッド | 用途 | 例 |
|---|---|---|
Outcome.success(detail) |
正常完了 | Outcome.success("done") |
Outcome.failure(detail) |
エラー | Outcome.failure("not_found") |
遷移キーの形式:
node_name::status::detail
例: check_severity::success::critical → escalate ノードへ遷移
ノードの作成
railway new node コマンドは、型安全なノードをすぐに開発開始できる状態で生成します。
なぜこのコマンドを使うのか?
| 手動作成 | railway new node |
|---|---|
| ノード、Contract、テストを個別に作成 | 3ファイル同時生成 |
| import文を自分で書く | 正しいimport済み |
| テスト構造を考える | TDDテンプレート付き |
| Outcomeの使い方を調べる | 動作するサンプル付き |
# dag 形式(デフォルト): 条件分岐ワークフロー向け
railway new node check_status
# → src/nodes/check_status.py ← ノード本体(動作するサンプル付き)
# → src/contracts/check_status_context.py ← Contract(型安全)
# → tests/nodes/test_check_status.py ← TDDテンプレート
# linear 形式: 線形パイプライン向け
railway new node transform --mode linear
# → Input/Output の2つのContractが生成される
dag 形式(デフォルト) - 条件分岐が可能:
from railway import node
from railway.core.dag.outcome import Outcome
from contracts.check_status_context import CheckStatusContext
@node
def check_status(ctx: CheckStatusContext) -> tuple[CheckStatusContext, Outcome]:
"""ステータスをチェックする。"""
if ctx.is_valid:
return ctx, Outcome.success("valid") # → valid 遷移
return ctx, Outcome.failure("invalid") # → invalid 遷移
linear 形式 - シンプルなデータ変換向け:
from typing import Optional
from railway import node
from contracts.transform_input import TransformInput
from contracts.transform_output import TransformOutput
@node
def transform(input_data: Optional[TransformInput] = None) -> TransformOutput:
"""データを変換する。"""
return TransformOutput(result="transformed")
どちらを使う?
| 用途 | 形式 | 理由 |
|---|---|---|
| 運用自動化、条件分岐あり | dag(デフォルト) | Outcomeで遷移を制御 |
| ETL、データ変換 | linear | シンプルな入出力 |
| 迷ったら | dag | より汎用的 |
使い分けガイド
| 場面 | 推奨方法 |
|---|---|
| 既存ワークフローにノード追加 | railway new node |
| 単体の処理を作成 | railway new node |
| 新規ワークフロー作成 | railway new entry(ノードも同時生成) |
実行モデル
Railway Framework は2つの実行モデルを提供します:
| モデル | 用途 | コマンド |
|---|---|---|
| dag_runner | 条件分岐ワークフロー(推奨) | railway new entry <name> |
| typed_pipeline | 線形パイプライン | railway new entry <name> --mode linear |
どちらを使うべきか?
dag_runner を使う:
- 条件分岐がある(if-else, switch)
- エラーパスが複数ある
- 運用自動化、複雑なワークフロー
typed_pipeline を使う:
- 処理が必ず順番に実行される(A→B→C→D)
- 条件分岐がない
- ETL、データ変換パイプライン
dag_runner(推奨)
条件分岐がある複雑なワークフローに適しています:
from railway import ExitContract
from railway.core.dag import dag_runner, Outcome
# 終端ノードを定義(v0.12.3+: ExitContract を返す)
class WorkflowResult(ExitContract):
exit_state: str = "success.done"
def exit_success_done(ctx) -> WorkflowResult:
return WorkflowResult()
exit_success_done._node_name = "exit.success.done"
TRANSITIONS = {
"check::success::critical": escalate,
"check::success::normal": log_only,
"escalate::success::done": exit_success_done,
"log_only::success::done": exit_success_done,
}
result = dag_runner(
start=check_severity,
transitions=TRANSITIONS,
)
# result は ExitContract: is_success, exit_code, exit_state を持つ
if result.is_success:
print("Workflow completed successfully")
dag_runner の特徴:
- 条件分岐: Outcome に応じて遷移先を決定
- YAML定義: 遷移グラフをYAMLで宣言的に定義
- コード生成:
railway sync transitionで遷移コードを自動生成 - ステップコールバック:
on_stepで各ステップを監視
typed_pipeline(線形パイプライン)
条件分岐がない線形処理に適しています:
from railway import typed_pipeline
result = typed_pipeline(
fetch_data, # 1. データ取得
transform_data, # 2. 変換
save_result, # 3. 保存
)
線形パイプラインの詳細は readme_linear.md を参照してください。
詳細な設計判断は ADR-002: 実行モデルの共存 を参照。
CLI Commands
プロジェクト管理
railway init <name> # プロジェクト作成
railway new entry <name> # エントリポイント作成(dag_runnerモード)
railway new entry <name> --mode linear # 線形パイプラインモード
railway docs # ドキュメント表示
遷移グラフ管理
railway sync transition --entry <name> # 遷移コード生成
railway sync transition --all # 全遷移コード生成
Contract(型契約)
railway new contract <Name> # Contract作成
railway new contract <Name> --entity # エンティティContract(id付き)
railway new contract <Name> --params # パラメータ用Contract
railway list contracts # Contract一覧
Node(処理単位)
railway new node <name> # 基本node作成
railway new node <name> --output ResultType # 出力型指定
railway new node <name> --input data:InputType --output ResultType
railway show node <name> # 依存関係表示
実行
railway run <entry> # 実行
railway list # エントリポイント/ノード一覧
バージョン管理
railway update # プロジェクトを最新バージョンに更新
railway update --dry-run # 変更をプレビュー(実行しない)
railway update --init # バージョン情報のないプロジェクトを初期化
railway backup list # バックアップ一覧
railway backup restore # バックアップから復元
railway backup clean --keep 3 # 古いバックアップを削除
特徴
- ✨ 5分で開始:
railway initでプロジェクト作成、すぐに実装開始 - 🛤️ DAGワークフロー: 条件分岐を含むワークフローをYAMLで宣言的に定義
- 🔒 型安全: Contract + Outcome による静的型チェック
- 🎯 純粋関数: ノードは副作用のない純粋関数
- ⚡ コード生成: YAMLから遷移コードを自動生成
- 🔄 2つの実行モデル: dag_runner(条件分岐)と typed_pipeline(線形)
- 🔗 フィールドベース依存関係: ノードコードで依存を宣言、sync時に自動検証
- 🧪 テスト容易: モック不要、引数を渡すだけ
- ⚙️ 環境別設定: development/production を簡単に切り替え
- 📊 構造化ロギング: loguru による美しいログ出力
- 🆙 バージョン管理: プロジェクトバージョン追跡、自動マイグレーション
アーキテクチャ
Contract(型契約)
ノード間で交換されるデータの「契約」を定義します。
from railway import Contract
class AlertContext(Contract):
"""アラート処理のコンテキスト"""
incident_id: str
severity: str
hostname: str | None = None
Contractの特徴:
- Pydantic BaseModel がベース(自動バリデーション)
- イミュータブル で安全(frozen=True)
- IDE補完 が効く
Node(処理単位)
DAGワークフローのノードは tuple[Contract, Outcome] を返します:
@node
def check_host(ctx: AlertContext) -> tuple[AlertContext, Outcome]:
"""ホスト情報を取得するノード"""
hostname = lookup_hostname(ctx.incident_id)
if hostname:
new_ctx = ctx.model_copy(update={"hostname": hostname})
return new_ctx, Outcome.success("found")
return ctx, Outcome.failure("not_found")
コンテキストの引き継ぎ:
DAGワークフローでは直前のノードの Contract のみが次のノードに渡されます。
ワークフロー全体で必要なデータは model_copy() で引き継ぎます:
# model_copy で既存データを保持しつつ新しいフィールドを追加
new_ctx = ctx.model_copy(update={"hostname": hostname})
return new_ctx, Outcome.success("found")
詳細は docs/ARCHITECTURE.md を参照。
フィールドベース依存関係
ノード間のデータ依存を @node デコレータで宣言します:
@node(
requires=["incident_id"], # 必須: なければ実行エラー
optional=["hostname"], # 任意: あれば使用
provides=["escalated"], # 提供: このノードが追加
)
def escalate(ctx: WorkflowContext) -> tuple[WorkflowContext, Outcome]:
if ctx.hostname: # optional なので存在チェック
notify_with_host(ctx.hostname)
return ctx.model_copy(update={"escalated": True}), Outcome.success("done")
YAML には依存情報を書かない:
# ノード名と遷移のみ
nodes:
check_host:
description: "ホスト情報取得"
escalate:
description: "エスカレーション"
transitions:
check_host:
success::found: escalate # フレームワークが依存を自動検証
利点:
- YAML 記述者はノード実装の詳細を知らなくてよい
railway sync transitionで依存エラーを自動検出- YAML のみでフロー変更、ノードコード変更不要
詳細は docs/ARCHITECTURE.md を参照。
ExitContract(実行結果)
dag_runner() は ExitContract を返します。終了状態とメタデータを含みます:
from railway import ExitContract
result = dag_runner(start=..., transitions=...)
# 基本プロパティ
result.is_success # True if exit_code == 0
result.is_failure # True if exit_code != 0
result.exit_code # 0 (success.* ) or 1 (failure.*)
result.exit_state # "success.done", "failure.timeout" など
result.context # 終端ノードが返したコンテキスト
# メタデータ
result.execution_path # ("start", "process", "exit.success.done")
result.iterations # 実行したノード数
| exit_state パターン | exit_code | is_success |
|---|---|---|
success.* |
0 | True |
failure.*, その他 |
1 | False |
終端ノード(Exit Node)
ワークフロー終了時に処理を実行できます。通常のノードと同じ形式で記述できるため、 コールバックの概念を知らなくても実装できます。
YAML定義:
nodes:
finalize:
description: "最終処理"
exit:
success:
done:
description: "正常終了(Slack通知)"
skipped:
description: "スキップして終了"
failure:
timeout:
description: "タイムアウト(PagerDuty通知)"
transitions:
finalize:
success::complete: exit.success.done
success::skipped: exit.success.skipped
failure::timeout: exit.failure.timeout
実装例(v0.12.3+):
# src/nodes/exit/success/done.py - ExitContract サブクラスを返す
from railway import ExitContract, node
class DoneResult(ExitContract):
"""正常終了時の詳細結果。"""
status: str
processed_count: int
exit_state: str = "success.done"
@node(name="exit.success.done")
def done(ctx: WorkflowContext) -> DoneResult:
"""終端ノードは ExitContract を返す(Outcome 不要)。"""
send_slack_notification(f"処理完了: {ctx.count}件")
return DoneResult(
status="completed",
processed_count=ctx.count,
)
特徴:
| 項目 | 説明 |
|---|---|
| 型安全性 | ExitContract で戻り値の型が保証される |
| IDE補完 | カスタムフィールドに補完が効く |
| 一貫性 | 通常のノードと同じ書き方 |
| テスト可能性 | 純粋関数としてテスト可能 |
| 表現力 | 詳細な終了状態を表現(done, skipped, timeout など) |
| 自動解決 | module/function は省略可能 |
詳細は docs/transition_graph_reference.md を参照。
デバッグと監査
on_step コールバック
各ステップ完了後にコールバックを受け取れます:
from railway.core.dag import dag_runner, StepRecorder
recorder = StepRecorder()
result = dag_runner(
start=check_severity,
transitions=TRANSITIONS,
on_step=recorder,
)
# 実行履歴を確認
for step in recorder.get_history():
print(f"[{step.node_name}] -> {step.state}")
AuditLogger
監査ログを出力:
from railway.core.dag import AuditLogger
audit = AuditLogger(workflow_id="incident-123")
result = dag_runner(
start=check_severity,
transitions=TRANSITIONS,
on_step=audit,
)
バージョン管理
Railway Framework はプロジェクトのバージョン情報を追跡し、安全なアップグレードを支援します。
なぜバージョン管理が必要か?
| 問題 | 影響 | Railway の解決策 |
|---|---|---|
| バージョン不明 | チームで不整合発生 | .railway/project.yaml で明示 |
| テンプレート変更 | railway new で不整合 |
互換性チェック + 警告 |
| 手動マイグレーション | 面倒、ミスしやすい | railway update で自動化 |
プロジェクトメタデータ
railway init 実行時に自動生成:
# .railway/project.yaml
railway:
version: "0.10.0" # 生成時のrailway-frameworkバージョン
created_at: "2026-01-23T10:30:00+09:00"
updated_at: "2026-01-23T10:30:00+09:00"
project:
name: "my_automation"
compatibility:
min_version: "0.10.0" # 必要な最小バージョン
設計判断:
| 判断 | 理由 |
|---|---|
| YAML形式 | 人間が読みやすく、手動編集も可能 |
.railway/ ディレクトリ |
フレームワーク関連ファイルを集約 |
| Git管理対象 | チーム全員でバージョン情報を共有 |
バージョン互換性ルール
| 条件 | 動作 |
|---|---|
| 同一バージョン | そのまま実行 |
| マイナー差異 | 警告 + 確認 |
| メジャー差異 | エラー + 拒否 |
| バージョン不明 | 警告 + 確認 |
既存プロジェクトのアップグレード
v0.10.x 以前のプロジェクトを最新形式にアップグレードできます。
なぜアップグレードが必要か?
| 旧形式の問題 | v0.11.3 での解決策 |
|---|---|
| 条件分岐が書きにくい | dag_runner で宣言的に定義 |
| ノードの戻り値が不明確 | Outcome で状態を明示 |
| 遷移ロジックがコードに埋まる | YAML で可視化 |
アップグレード手順
1. プレビュー(変更内容を確認)
railway update --dry-run
出力例:
マイグレーション: 0.10.0 → 0.12.0
ファイル追加:
- transition_graphs/.gitkeep
- _railway/generated/.gitkeep
コードガイダンス:
src/nodes/process.py:5
現在: def process(data: dict) -> dict:
推奨: def process(ctx: ProcessContext) -> tuple[ProcessContext, Outcome]:
2. アップグレード実行
railway update
3. ガイダンスに従ってコードを修正
旧形式のノードを新形式に変更します:
Before:
@node
def process(data: dict) -> dict:
return data
After:
@node
def process(ctx: ProcessContext) -> tuple[ProcessContext, Outcome]:
return ctx, Outcome.success("done")
検出される旧形式パターン
| パターン | 推奨変更 |
|---|---|
def node(data: dict) -> dict: |
def node(ctx: Context) -> tuple[Context, Outcome]: |
from railway import pipeline |
from railway.core.dag import dag_runner |
アップグレードの恩恵
- Outcome で次の遷移先を制御できる
- Contract で型安全にデータを扱える
- YAML で遷移ロジックを可視化できる
テストの書き方
DAGノードはテストが簡単:
from contracts.alert import AlertContext
from nodes.check_severity import check_severity
from railway.core.dag import Outcome
def test_check_severity_critical():
# Arrange
ctx = AlertContext(incident_id="INC-001", severity="critical")
# Act
result_ctx, outcome = check_severity(ctx)
# Assert
assert outcome == Outcome.success("critical")
assert result_ctx.severity == "critical"
# テスト実行
pytest -v
pytest --cov=src --cov-report=html
実例: アラート処理ワークフロー
ステップ1: Contractを定義
# src/contracts/alert.py
from railway import Contract
class AlertContext(Contract):
incident_id: str
severity: str
escalated: bool = False
ステップ2: ノードを作成
# src/nodes/alert/check_severity.py
from railway import node
from railway.core.dag import Outcome
from contracts.alert import AlertContext
@node
def check_severity(ctx: AlertContext) -> tuple[AlertContext, Outcome]:
if ctx.severity == "critical":
return ctx, Outcome.success("critical")
return ctx, Outcome.success("normal")
ステップ3: 遷移グラフを定義
# transition_graphs/alert_workflow.yml
version: "1.0"
entrypoint: alert_workflow
nodes:
check_severity:
module: nodes.alert.check_severity
function: check_severity
escalate:
module: nodes.alert.escalate
function: escalate
log_only:
module: nodes.alert.log_only
function: log_only
start: check_severity
transitions:
check_severity:
success::critical: escalate
success::normal: log_only
escalate:
success::done: exit::success
log_only:
success::done: exit::success
ステップ4: コード生成と実行
railway sync transition --entry alert_workflow
railway run alert_workflow
ドキュメント
- docs/ARCHITECTURE.md - アーキテクチャガイド(全体像)
- TUTORIAL.md - ハンズオンチュートリアル
- readme_linear.md - 線形パイプライン詳細
- docs/adr/ - 設計決定記録
採用技術スタック
| ライブラリ | 用途 |
|---|---|
pydantic |
Contract(データバリデーション) |
tenacity |
リトライ処理 |
typer |
CLIインターフェース |
loguru |
構造化ロギング |
ロードマップ
Phase 1 ✅ 完了
- ✅
@node,@entry_pointデコレータ - ✅
pipeline(),async_pipeline()関数 - ✅ 設定管理、ロギング、リトライ
- ✅ CLIツール (
init,new,list,run)
Phase 1.5 ✅ 完了(Output Model Pattern)
- ✅
Contractベースクラス - ✅
Paramsパラメータクラス - ✅
typed_pipeline(),typed_async_pipeline() - ✅
DependencyResolver自動依存解決 - ✅ CLI拡張 (
new contract,list contracts,show node)
Phase 1.6 ✅ 完了(3層エラーハンドリング)
- ✅
on_errorコールバック(パイプラインレベルのエラー制御) - ✅
on_stepコールバック(中間結果へのアクセス) - ✅
RetryPolicy/retries/retry_on(柔軟なリトライ設定) - ✅ inputs 自動推論(型ヒントからの依存関係解決)
- ✅ ログメッセージ日本語統一
Phase 2 ✅ 完了(バージョン管理 & DAGワークフロー)
- ✅ プロジェクトバージョン記録(
.railway/project.yaml) - ✅ バージョン互換性チェック(
railway new実行時) - ✅
railway updateコマンド(プロジェクトマイグレーション) - ✅
railway backupコマンド(バックアップ・ロールバック) - ✅ DAGワークフロー(
dag_runner、条件分岐対応) - ✅ Outcomeクラス & 遷移グラフ
- ✅
railway sync transitionコマンド
Phase 3 📋 計画中
- 🔜 並列パイプライン実行
- 🔜 WebUI
- 🔜 メトリクス収集
ライセンス
MIT License
Railway Framework で型安全な運用自動化を始めましょう!
railway init my_workflow
cd my_workflow
railway new entry my_workflow
railway sync transition --entry my_workflow
railway run my_workflow
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file railway_framework-0.13.0.tar.gz.
File metadata
- Download URL: railway_framework-0.13.0.tar.gz
- Upload date:
- Size: 801.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.22 {"installer":{"name":"uv","version":"0.9.22","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
3662da3f30763a0ecbd8ebc60f1d351afb02783fb4ad9dd534542b48e2d8a239
|
|
| MD5 |
959ec020882b3a03c1313cd91f8f3175
|
|
| BLAKE2b-256 |
d935df85a2e1fa4f00dde779aee3d641e9307ef01a63826e02d2464125d015a3
|
File details
Details for the file railway_framework-0.13.0-py3-none-any.whl.
File metadata
- Download URL: railway_framework-0.13.0-py3-none-any.whl
- Upload date:
- Size: 133.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.22 {"installer":{"name":"uv","version":"0.9.22","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c19030c42709b0a429a698631155ef742c8a08c7d5529cd4238747badb89cd6a
|
|
| MD5 |
9d40304b13016fb8601260a835ee25b5
|
|
| BLAKE2b-256 |
6655d08ad4491d847445bac7f4c9b91309d547ebc68e2a1077b239573603ddf2
|