# PipeDSL — Declarative HTTP Pipeline Orchestration
PipeDSL is a lightweight framework for defining and executing sequences of HTTP requests using a YAML-based DSL. It supports chaining tasks, passing data between steps, parallel execution, and extracting structured values from JSON responses using JSONPath.
Use PipeDSL to:
- Automate multi-step API workflows
- Build integration or end-to-end tests
- Fetch and transform data across multiple endpoints
- Define complex request pipelines with minimal code
## Quick Start

Install the package:

```bash
pip install pipedsl
```
Run a pipeline from a YAML string:
```python
import asyncio

from pipedsl import YamlTaskReader, TaskScheduler

config = """
tasks:
  - type: http
    id: get_user
    name: Fetch user
    url: https://httpbin.org/get
    method: get
    is_singleton: false

  - type: http
    id: log_action
    name: Log action
    url: https://httpbin.org/post
    method: post
    body: '{"source": "!{{1}}"}'
    is_singleton: false

  - type: pipeline
    id: user_flow
    name: User workflow
    pipeline: "get_user() >> log_action(get_user.url)"
"""

async def main():
    tasks = YamlTaskReader.generate_tasks(config_body=config)
    async for task, result in TaskScheduler.schedule(tasks):
        print(f"Completed {task.id} → {result.payload_type}")

asyncio.run(main())
```
Note: `TaskScheduler.schedule()` is an async generator. Use `async for` to consume results.
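Because the scheduler streams results, you can also gather everything into a dictionary keyed by task id. A minimal sketch reusing the Quick Start API — the helper name `collect_results` is ours, not part of the library:

```python
from pipedsl import YamlTaskReader, TaskScheduler

async def collect_results(config: str) -> dict:
    """Run a YAML pipeline config and gather all results keyed by task id."""
    tasks = YamlTaskReader.generate_tasks(config_body=config)
    results = {}
    async for task, result in TaskScheduler.schedule(tasks):
        results[task.id] = result
    return results
```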
## Writing Pipelines

### Basic HTTP Task
```yaml
tasks:
  - type: http
    id: healthcheck
    name: Health check
    url: https://api.example.com/health
    method: get
```
### Sequential Execution

```yaml
tasks:
  - type: http
    id: login
    url: https://api.example.com/login
    method: post
    body: '{"email": "user@example.com"}'
    json_extractor_props:
      token: 'access_token'
    is_singleton: false

  - type: http
    id: profile
    url: https://api.example.com/profile
    method: get
    headers:
      - ["Authorization", "Bearer !{{1}}"]
    is_singleton: false

  - type: pipeline
    id: auth_flow
    pipeline: "login() >> profile(login.token)"
```
The expression `profile(login.token)` passes the `token` field extracted from the login response as the first argument (`!{{1}}`) to the `profile` request.
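Since placeholders are positional (see the Task Reference below), a called task can accept several arguments. A hedged sketch — the endpoints and extractor paths here are illustrative, not taken from the library's docs:

```yaml
tasks:
  - type: http
    id: whoami
    url: https://api.example.com/whoami
    method: get
    json_extractor_props:
      token: 'access_token'    # extracted values become whoami.token / whoami.user_id
      user_id: 'user.id'
    is_singleton: false

  - type: http
    id: audit
    # !{{1}} receives the first call argument, !{{2}} the second
    url: https://api.example.com/audit/!{{2}}
    method: post
    headers:
      - ["Authorization", "Bearer !{{1}}"]
    is_singleton: false

  - type: pipeline
    id: audit_flow
    pipeline: "whoami() >> audit(whoami.token, whoami.user_id)"
```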
### Parallel Execution with Product Operator

```yaml
tasks:
  - type: http
    id: list_ids
    url: https://api.example.com/items
    method: get
    json_extractor_props:
      ids: '$.results[*].id'
    is_singleton: false

  - type: http
    id: fetch_item
    url: https://api.example.com/items/!{{1}}
    method: get
    is_singleton: false

  - type: pipeline
    id: bulk_fetch
    pipeline: "list_ids() >> [list_ids.ids] * [fetch_item($1)]"
```
The syntax `[A] * [B($1)]` means: for each element in `A`, execute `B`, substituting the element as `$1`. This enables fan-out patterns and bulk operations.
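The fan-out relies on `json_extractor_props` producing a list. PipeDSL's internal JSONPath engine isn't specified here, but you can preview what an expression like `$.results[*].id` selects using the third-party `jsonpath-ng` package — an assumption for illustration, not necessarily the library PipeDSL uses:

```python
from jsonpath_ng import parse  # pip install jsonpath-ng

# Sample payload shaped like the /items response assumed above.
doc = {"results": [{"id": 1}, {"id": 2}, {"id": 3}]}

# $.results[*].id selects every id in the results array.
ids = [match.value for match in parse("$.results[*].id").find(doc)]
print(ids)  # [1, 2, 3] — one fetch_item($1) call per element
```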
## Task Reference

| Field | Required | Description |
|---|---|---|
| `id` | Yes | Unique identifier (used in DSL expressions) |
| `name` | No | Human-readable label |
| `type` | Yes | Either `http` or `pipeline` |
| `is_singleton` | No | If true (default), the task runs standalone. If false, it can be called from a pipeline. |
| `url`, `method`, `headers`, `body` | Yes (for `http`) | Standard HTTP request parameters |
| `json_extractor_props` | No | Maps `{name: JSONPath}` to extract values from JSON responses |
| `pipeline_context` | No (for `pipeline`) | Key-value store available as `pipeline_context.key` in DSL |

Placeholders like `!{{1}}`, `!{{2}}`, etc., are replaced with positional arguments during execution.
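`pipeline_context` gives a pipeline its own constants. A minimal sketch based on the table above — the endpoint and key names are illustrative:

```yaml
tasks:
  - type: http
    id: notify
    url: https://api.example.com/notify
    method: post
    body: '{"env": "!{{1}}"}'
    is_singleton: false

  - type: pipeline
    id: notify_flow
    pipeline_context:
      env: staging    # available as pipeline_context.env in the DSL
    pipeline: "notify(pipeline_context.env)"
```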
## How It Works

- **Parsing:** The DSL string (e.g., `login() >> profile(login.token)`) is tokenized and parsed into an abstract syntax tree (AST) using NLTK and a context-free grammar.
- **Execution:** Tasks are scheduled asynchronously. Results are stored in an execution context under the task's `id`.
- **Data Flow:** Expressions like `task.property` resolve to extracted values from prior responses.
- **Parallelism:** The product operator (`[X] * [Y($1)]`) expands into a Cartesian product and executes sub-pipelines in parallel.
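To make the parsing step concrete, here is a toy NLTK context-free grammar that accepts the sequential form of the DSL. It is our own illustrative grammar, not the one shipped with PipeDSL:

```python
import nltk

# Toy grammar in the spirit of PipeDSL's parser (illustrative only; the real
# grammar lives inside the library and covers the full DSL).
grammar = nltk.CFG.fromstring("""
S -> CALL | S SEQ CALL
SEQ -> '>>'
CALL -> NAME '(' ')' | NAME '(' ARG ')'
ARG -> NAME '.' NAME
NAME -> 'login' | 'profile' | 'token'
""")

tokens = ['login', '(', ')', '>>', 'profile', '(', 'login', '.', 'token', ')']
parser = nltk.ChartParser(grammar)
for tree in parser.parse(tokens):
    tree.pretty_print()  # prints the parse of login() >> profile(login.token)
    break
```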
## Architecture

- `YamlTaskReader`: Reads YAML and builds task definitions with compiled ASTs for pipelines.
- `TaskScheduler`: Orchestrates execution of top-level (`is_singleton: true`) tasks.
- `PipelineExecutor`: Interprets the AST, manages context, and handles sequential/parallel execution.
- `HttpRequestExecutor`: Sends requests via `aiohttp` and processes responses.
All core models are immutable (Pydantic with frozen=True), and the system is fully typed.
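For reference, immutability via Pydantic's `frozen=True` looks like this — a generic Pydantic v2 sketch whose field names are assumptions, not PipeDSL's actual models:

```python
from pydantic import BaseModel, ConfigDict

class HttpTask(BaseModel):
    """Illustrative stand-in for a frozen PipeDSL task model."""
    model_config = ConfigDict(frozen=True)

    id: str
    url: str
    method: str = "get"

task = HttpTask(id="healthcheck", url="https://api.example.com/health")
task.method = "post"  # raises a ValidationError: the instance is frozen
```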
## Development

To set up a development environment:

```bash
git clone https://github.com/yourname/PipeDSL.git
cd PipeDSL
pip install -e .
pytest
```
## License
Apache License 2.0. See LICENSE for details.