
PipeDSL — Declarative HTTP Pipeline Orchestration

PipeDSL is a lightweight framework for defining and executing sequences of HTTP requests using a YAML-based DSL. It supports chaining tasks, passing data between steps, parallel execution, and extracting structured values from JSON responses using JSONPath.

Use PipeDSL to:

  • Automate multi-step API workflows
  • Build integration or end-to-end tests
  • Fetch and transform data across multiple endpoints
  • Define complex request pipelines with minimal code

Quick Start

Install the package:

pip install pipedsl

Run a pipeline from a YAML string:

import asyncio
from pipedsl import YamlTaskReader, TaskScheduler

config = """
tasks:
  - type: http
    id: get_user
    name: Fetch user
    url: https://httpbin.org/get
    method: get
    is_singleton: false

  - type: http
    id: log_action
    name: Log action
    url: https://httpbin.org/post
    method: post
    body: '{"source": "!{{1}}"}'
    is_singleton: false

  - type: pipeline
    id: user_flow
    name: User workflow
    pipeline: "get_user() >> log_action(get_user.url)"
"""

async def main():
    tasks = YamlTaskReader.generate_tasks(config_body=config)
    async for task, result in TaskScheduler.schedule(tasks):
        print(f"Completed {task.id}: {result.payload_type}")

asyncio.run(main())

Note: TaskScheduler.schedule() is an async generator. Use async for to consume results.


Writing Pipelines

Basic HTTP Task

tasks:
  - type: http
    id: healthcheck
    name: Health check
    url: https://api.example.com/health
    method: get

Sequential Execution

tasks:
  - type: http
    id: login
    url: https://api.example.com/login
    method: post
    body: '{"email": "user@example.com"}'
    json_extractor_props:
      token: '$.access_token'
    is_singleton: false

  - type: http
    id: profile
    url: https://api.example.com/profile
    method: get
    headers:
      - ["Authorization", "Bearer !{{1}}"]
    is_singleton: false

  - type: pipeline
    id: auth_flow
    pipeline: "login() >> profile(login.token)"

The expression profile(login.token) passes the token field extracted from the login response as the first positional argument (!{{1}}) to the profile request.
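The data flow above can be sketched in plain Python. This is an illustrative stand-in, not the library's internals: the login response is parsed, the extractor pulls out the token, and the value replaces the !{{1}} placeholder in the header template.

```python
import json

# Hypothetical illustration of the auth_flow data flow (not PipeDSL's API).
login_response = '{"access_token": "abc123", "expires_in": 3600}'

# json_extractor_props: {token: '$.access_token'} -> extract the token field
extracted = {"token": json.loads(login_response)["access_token"]}

# profile(login.token) -> the token becomes positional argument 1
header_template = ["Authorization", "Bearer !{{1}}"]
args = [extracted["token"]]
resolved = [part.replace("!{{1}}", args[0]) for part in header_template]

print(resolved)  # ['Authorization', 'Bearer abc123']
```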

Parallel Execution with Product Operator

tasks:
  - type: http
    id: list_ids
    url: https://api.example.com/items
    method: get
    json_extractor_props:
      ids: '$.results[*].id'
    is_singleton: false

  - type: http
    id: fetch_item
    url: https://api.example.com/items/!{{1}}
    method: get
    is_singleton: false

  - type: pipeline
    id: bulk_fetch
    pipeline: "list_ids() >> [list_ids.ids] * [fetch_item($1)]"

The syntax [A] * [B($1)] means: for each element in A, execute B, substituting the element as $1. This enables fan-out patterns and bulk operations.
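The fan-out itself is conceptually a gather over one sub-pipeline per element. The sketch below is an analogy using plain asyncio, not PipeDSL internals; fetch_item here is a stand-in coroutine for the real HTTP task.

```python
import asyncio

async def fetch_item(item_id: int) -> dict:
    # Stand-in for the real HTTP request made by the fetch_item task.
    await asyncio.sleep(0)
    return {"id": item_id, "status": "ok"}

async def bulk_fetch(ids: list[int]) -> list[dict]:
    # [list_ids.ids] * [fetch_item($1)]: one call per element, run concurrently.
    return await asyncio.gather(*(fetch_item(i) for i in ids))

results = asyncio.run(bulk_fetch([1, 2, 3]))
print(results)
```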


Task Reference

  • id (required): Unique identifier, used in DSL expressions
  • name (optional): Human-readable label
  • type (required): Either http or pipeline
  • is_singleton (optional): If true (the default), the task runs standalone; if false, it can be called from a pipeline
  • url, method, headers, body (required for http): Standard HTTP request parameters
  • json_extractor_props (optional): Maps {name: JSONPath} to extract values from JSON responses
  • pipeline_context (optional, pipeline only): Key-value store available as pipeline_context.key in DSL expressions

Placeholders like !{{1}}, !{{2}}, etc., are replaced with positional arguments during execution.
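One plausible way to picture the substitution (the library's actual implementation may differ) is a regex pass that maps !{{N}} to the N-th positional argument, 1-based:

```python
import re

# Hedged sketch of positional placeholder substitution, mirroring the
# documented behaviour: !{{N}} is replaced by the N-th argument (1-based).
def substitute(template: str, args: list[str]) -> str:
    return re.sub(r"!\{\{(\d+)\}\}", lambda m: args[int(m.group(1)) - 1], template)

body = substitute('{"source": "!{{1}}", "target": "!{{2}}"}', ["login", "profile"])
print(body)  # {"source": "login", "target": "profile"}
```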


How It Works

  1. Parsing: The DSL string (e.g., login() >> profile(login.token)) is tokenized and parsed into an abstract syntax tree (AST) using NLTK and a context-free grammar.
  2. Execution: Tasks are scheduled asynchronously. Results are stored in an execution context under the task’s id.
  3. Data Flow: Expressions like task.property resolve to extracted values from prior responses.
  4. Parallelism: The product operator ([X] * [Y($1)]) expands into a Cartesian product and executes sub-pipelines in parallel.

Architecture

  • YamlTaskReader: Reads YAML and builds task definitions with compiled ASTs for pipelines.
  • TaskScheduler: Orchestrates execution of top-level (is_singleton: true) tasks.
  • PipelineExecutor: Interprets the AST, manages context, and handles sequential/parallel execution.
  • HttpRequestExecutor: Sends requests via aiohttp and processes responses.

All core models are immutable (Pydantic with frozen=True), and the system is fully typed.
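PipeDSL's models use Pydantic with frozen=True; the stdlib dataclass below is an analogous sketch of the same guarantee (field names are illustrative, not the library's schema): task definitions cannot be mutated after construction.

```python
from dataclasses import dataclass, FrozenInstanceError

@dataclass(frozen=True)
class HttpTask:
    # Illustrative fields only; PipeDSL's real models are Pydantic.
    id: str
    url: str
    method: str = "get"

task = HttpTask(id="healthcheck", url="https://api.example.com/health")
try:
    task.method = "post"  # rejected: the model is immutable
except FrozenInstanceError:
    print("immutable")
```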


Development

To set up a development environment:

git clone https://github.com/yourname/PipeDSL.git
cd PipeDSL
pip install -e .
pytest

License

Apache License 2.0. See LICENSE for details.
