PipeDSL — Declarative HTTP Pipeline Orchestration

PipeDSL is a lightweight framework for defining and executing sequences of HTTP requests using a YAML-based DSL. It supports chaining tasks, passing data between steps, parallel execution, and extracting structured values from JSON responses using JSONPath.

Use PipeDSL to:

  • Automate multi-step API workflows
  • Build integration or end-to-end tests
  • Fetch and transform data across multiple endpoints
  • Define complex request pipelines with minimal code

Quick Start

Install the package:

pip install pipedsl

Run a pipeline from a YAML string:

import asyncio
from pipedsl import YamlTaskReader, TaskScheduler

config = """
tasks:
  - type: http
    id: get_user
    name: Fetch user
    url: https://httpbin.org/get
    method: get
    json_extractor_props:
      url: '$.url'
    is_singleton: false

  - type: http
    id: log_action
    name: Log action
    url: https://httpbin.org/post
    method: post
    body: '{"source": "!{{1}}"}'
    is_singleton: false

  - type: pipeline
    id: user_flow
    name: User workflow
    pipeline: "get_user() >> log_action(get_user.url)"
"""

async def main():
    tasks = YamlTaskReader.generate_tasks(config_body=config)
    async for task, result in TaskScheduler.schedule(tasks):
        # Print the task id and the payload type of its result.
        print(f"Completed {task.id}: {result.payload_type}")

asyncio.run(main())

Note: TaskScheduler.schedule() is an async generator. Use async for to consume results.


Writing Pipelines

Basic HTTP Task

tasks:
  - type: http
    id: healthcheck
    name: Health check
    url: https://api.example.com/health
    method: get

Sequential Execution

tasks:
  - type: http
    id: login
    url: https://api.example.com/login
    method: post
    body: '{"email": "user@example.com"}'
    json_extractor_props:
      token: 'access_token'
    is_singleton: false

  - type: http
    id: profile
    url: https://api.example.com/profile
    method: get
    headers:
      - ["Authorization", "Bearer !{{1}}"]
    is_singleton: false

  - type: pipeline
    id: auth_flow
    pipeline: "login() >> profile(login.token)"

The expression profile(login.token) passes the token field extracted from the login response as the first argument (!{{1}}) to the profile request.
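
As a rough illustration of what the extraction behind login.token involves, the sketch below evaluates a very small subset of JSONPath (dotted keys and the [*] wildcard) using only the standard library. The function name extract_values and both sample responses are hypothetical; PipeDSL's own extractor is not shown in this document and may rely on a full JSONPath library with different behavior.

```python
import json
import re

def extract_values(document, path):
    """Evaluate a tiny JSONPath subset: '$', '.key', and '[*]' only."""
    # Accept both the rooted form '$.a.b' and the bare form 'a.b'.
    if path.startswith("$"):
        path = path[1:]
    elif path:
        path = "." + path
    # Break the path into '.key' steps and '[*]' wildcard steps.
    steps = re.findall(r"\.([A-Za-z_][A-Za-z0-9_]*)|(\[\*\])", path)
    current = [document]
    for key, wildcard in steps:
        next_nodes = []
        for node in current:
            if wildcard and isinstance(node, list):
                next_nodes.extend(node)          # fan out over list items
            elif key and isinstance(node, dict) and key in node:
                next_nodes.append(node[key])     # descend into one key
        current = next_nodes
    return current

# Hypothetical response bodies, for illustration only.
login_response = json.loads('{"access_token": "abc123", "expires_in": 3600}')
items_response = json.loads('{"results": [{"id": 1}, {"id": 2}, {"id": 3}]}')

token = extract_values(login_response, "access_token")
ids = extract_values(items_response, "$.results[*].id")
```

The function always returns a list of matches, so a path that matches nothing yields an empty list rather than raising.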

Parallel Execution with the Product Operator

tasks:
  - type: http
    id: list_ids
    url: https://api.example.com/items
    method: get
    json_extractor_props:
      ids: '$.results[*].id'
    is_singleton: false

  - type: http
    id: fetch_item
    url: https://api.example.com/items/!{{1}}
    method: get
    is_singleton: false

  - type: pipeline
    id: bulk_fetch
    pipeline: "list_ids() >> [list_ids.ids] * [fetch_item($1)]"

The syntax [A] * [B($1)] means: for each element in A, execute B, substituting the element as $1. This enables fan-out patterns and bulk operations.
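
One way to picture the fan-out is one coroutine per element, awaited together. The sketch below assumes nothing about PipeDSL's internals: fake_fetch_item is a hypothetical stand-in for the fetch_item HTTP task, and fan_out is an illustrative helper, not a library function.

```python
import asyncio

async def fake_fetch_item(item_id):
    """Hypothetical stand-in for the fetch_item HTTP task."""
    await asyncio.sleep(0)  # yield to the event loop, as a real request would
    return {"url": f"https://api.example.com/items/{item_id}", "id": item_id}

async def fan_out(elements, task):
    """Run the task once per element concurrently; results keep input order."""
    return await asyncio.gather(*(task(element) for element in elements))

# Each element plays the role of $1 for one invocation of the task.
results = asyncio.run(fan_out([1, 2, 3], fake_fetch_item))
```

asyncio.gather returns results in the order of its inputs, which keeps each response aligned with the element that produced it.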


Task Reference

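Field                | Required           | Description
---------------------|--------------------|------------------------------------------------------------
id                   | Yes                | Unique identifier (used in DSL expressions)
name                 | No                 | Human-readable label
type                 | Yes                | Either http or pipeline
is_singleton         | No                 | If true (default), the task runs standalone. If false, it can be called from a pipeline.
url, method          | Yes (for http)     | Target URL and HTTP method of the request
headers, body        | No (for http)      | Request headers (a list of [name, value] pairs) and request body
json_extractor_props | No                 | Maps {name: JSONPath} to extract values from JSON responses
pipeline             | Yes (for pipeline) | DSL expression to execute, e.g. "login() >> profile(login.token)"
pipeline_context     | No (for pipeline)  | Key-value store available as pipeline_context.key in DSL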

Placeholders like !{{1}}, !{{2}}, etc., are replaced with positional arguments during execution.
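
The substitution itself can be sketched with a regular expression. This is a minimal illustration under stated assumptions, not PipeDSL's implementation: the render function is hypothetical, and how the library treats a placeholder with no matching argument is not documented here, so the sketch simply raises.

```python
import re

# Matches !{{1}}, !{{2}}, ... and captures the 1-based argument index.
PLACEHOLDER = re.compile(r"!\{\{(\d+)\}\}")

def render(template, args):
    """Replace each !{{n}} with the n-th positional argument (1-based)."""
    def substitute(match):
        index = int(match.group(1))
        if not 1 <= index <= len(args):
            raise IndexError("no argument for placeholder " + match.group(0))
        return str(args[index - 1])
    return PLACEHOLDER.sub(substitute, template)

header = render("Bearer !{{1}}", ["abc123"])
url = render("https://api.example.com/items/!{{1}}?lang=!{{2}}", [42, "en"])
```

Here header becomes "Bearer abc123" and url becomes "https://api.example.com/items/42?lang=en".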


How It Works

  1. Parsing: The DSL string (e.g., login() >> profile(login.token)) is tokenized and parsed into an abstract syntax tree (AST) using NLTK and a context-free grammar.
  2. Execution: Tasks are scheduled asynchronously. Results are stored in an execution context under the task’s id.
  3. Data Flow: Expressions like task.property resolve to extracted values from prior responses.
  4. Parallelism: The product operator ([X] * [Y($1)]) expands into a Cartesian product and executes sub-pipelines in parallel.
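
The parsing and data-flow steps above can be sketched in a few lines. Note the substitution: PipeDSL is described as using NLTK with a context-free grammar, whereas this sketch uses a plain regular expression and handles only the sequential >> operator, not the product operator. The names parse_sequence and resolve, and the context layout, are hypothetical.

```python
import re

# One call step: task id followed by a parenthesized argument list.
CALL = re.compile(r"^\s*(\w+)\((.*)\)\s*$")

def parse_sequence(expression):
    """Split 'a() >> b(a.x, a.y)' into [(task_id, [arg_refs]), ...]."""
    steps = []
    for part in expression.split(">>"):
        match = CALL.match(part)
        if match is None:
            raise ValueError(f"cannot parse step: {part!r}")
        task_id, raw_args = match.groups()
        args = [a.strip() for a in raw_args.split(",") if a.strip()]
        steps.append((task_id, args))
    return steps

def resolve(reference, context):
    """Look up 'task.property' among the results of earlier tasks."""
    task_id, prop = reference.split(".", 1)
    return context[task_id][prop]

steps = parse_sequence("login() >> profile(login.token)")

# Execution context: extracted values stored under each task's id.
context = {"login": {"token": "abc123"}}
profile_args = [resolve(ref, context) for ref in steps[1][1]]
```

After the login step has stored its extracted values, the reference login.token resolves to "abc123", which then becomes the first positional argument of profile.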

Architecture

  • YamlTaskReader: Reads YAML and builds task definitions with compiled ASTs for pipelines.
  • TaskScheduler: Orchestrates execution of top-level (is_singleton: true) tasks.
  • PipelineExecutor: Interprets the AST, manages context, and handles sequential/parallel execution.
  • HttpRequestExecutor: Sends requests via aiohttp and processes responses.

All core models are immutable (Pydantic with frozen=True), and the system is fully typed.
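
To show what immutability means in practice, the sketch below uses a frozen dataclass from the standard library as a stand-in for a frozen Pydantic model. HttpTaskSketch is a hypothetical class whose fields are borrowed from the Task Reference; it is not one of PipeDSL's actual models.

```python
from dataclasses import FrozenInstanceError, dataclass, replace

@dataclass(frozen=True)
class HttpTaskSketch:
    """Hypothetical, simplified task definition (not PipeDSL's model)."""
    id: str
    url: str
    method: str = "get"
    is_singleton: bool = True

task = HttpTaskSketch(id="healthcheck", url="https://api.example.com/health")

# Assigning to a field of a frozen instance raises instead of mutating it.
try:
    task.url = "https://example.org"
    mutated = True
except FrozenInstanceError:
    mutated = False

# "Changing" a frozen object means building a modified copy.
updated = replace(task, method="post")
```

Because definitions cannot change after they are built, concurrently running sub-pipelines can share them without defensive copying.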


Development

To set up a development environment:

git clone https://github.com/yourname/PipeDSL.git
cd PipeDSL
pip install -e .
pytest

License

Apache License 2.0. See LICENSE for details.
