
PipeDSL — Declarative HTTP Pipeline Orchestration

PipeDSL is a lightweight framework for defining and executing sequences of HTTP requests using a YAML-based DSL. It supports chaining tasks, passing data between steps, parallel execution, and extracting structured values from JSON responses using JSONPath.

Use PipeDSL to:

  • Automate multi-step API workflows
  • Build integration or end-to-end tests
  • Fetch and transform data across multiple endpoints
  • Define complex request pipelines with minimal code

Quick Start

Install the package:

pip install PipeDSL

Run a pipeline from a YAML string:

import asyncio
from PipeDSL import YamlTaskReader, TaskScheduler

config = """
tasks:
  - type: http
    id: get_user
    name: Fetch user
    url: https://httpbin.org/get
    method: get
    is_singleton: false

  - type: http
    id: log_action
    name: Log action
    url: https://httpbin.org/post
    method: post
    body: '{"source": "!{{1}}"}'
    is_singleton: false

  - type: pipeline
    id: user_flow
    name: User workflow
    pipeline: "get_user() >> log_action(get_user.url)"
"""

async def main():
    tasks = YamlTaskReader.generate_tasks(config_body=config)
    async for task, result in TaskScheduler.schedule(tasks):
        print(f"Completed {task.id}: {result.payload_type}")

asyncio.run(main())

Note: TaskScheduler.schedule() is an async generator. Use async for to consume results.


Writing Pipelines

Basic HTTP Task

tasks:
  - type: http
    id: healthcheck
    name: Health check
    url: https://api.example.com/health
    method: get

Sequential Execution

tasks:
  - type: http
    id: login
    url: https://api.example.com/login
    method: post
    body: '{"email": "user@example.com"}'
    json_extractor_props:
      token: '$.access_token'
    is_singleton: false

  - type: http
    id: profile
    url: https://api.example.com/profile
    method: get
    headers:
      - ["Authorization", "Bearer !{{1}}"]
    is_singleton: false

  - type: pipeline
    id: auth_flow
    pipeline: "login() >> profile(login.token)"

The expression profile(login.token) passes the token field extracted from the login response as the first argument (!{{1}}) to the profile request.
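The chaining behavior above can be sketched in plain asyncio. This is a conceptual sketch only: the coroutines below are simulated stand-ins, not PipeDSL's API, and the fake login response is an assumption for illustration.

```python
import asyncio

# Simulated HTTP tasks; in PipeDSL these would be real requests.
async def login():
    # Pretend the /login response body contained {"access_token": "abc123"}
    # and json_extractor_props pulled it out as "token".
    return {"token": "abc123"}

async def profile(token):
    # The extracted token becomes the !{{1}} placeholder in the headers.
    headers = {"Authorization": f"Bearer {token}"}
    return headers

async def auth_flow():
    # Rough equivalent of: login() >> profile(login.token)
    login_result = await login()
    return await profile(login_result["token"])

result = asyncio.run(auth_flow())
print(result)  # {'Authorization': 'Bearer abc123'}
```

The `>>` operator expresses exactly this "await the left side, feed an extracted value to the right side" dependency, without writing the glue code by hand.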

Parallel Execution with Product Operator

tasks:
  - type: http
    id: list_ids
    url: https://api.example.com/items
    method: get
    json_extractor_props:
      ids: '$.results[*].id'
    is_singleton: false

  - type: http
    id: fetch_item
    url: https://api.example.com/items/!{{1}}
    method: get
    is_singleton: false

  - type: pipeline
    id: bulk_fetch
    pipeline: "list_ids() >> [list_ids.ids] * [fetch_item($1)]"

The syntax [A] * [B($1)] means: for each element in A, execute B, substituting the element as $1. This enables fan-out patterns and bulk operations.
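The fan-out that the product operator performs can be sketched with asyncio.gather. Again, this is only an illustration of the pattern; fetch_item here is a stand-in coroutine, not a real HTTP task.

```python
import asyncio

async def fetch_item(item_id):
    # Stand-in for the fetch_item HTTP task; item_id plays the role of $1.
    return {"id": item_id, "status": "fetched"}

async def bulk_fetch(ids):
    # Rough equivalent of: [list_ids.ids] * [fetch_item($1)]
    # one fetch_item sub-pipeline per element, executed concurrently.
    return await asyncio.gather(*(fetch_item(i) for i in ids))

results = asyncio.run(bulk_fetch([1, 2, 3]))
print([r["id"] for r in results])  # [1, 2, 3]
```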


Task Reference

  • id (required): Unique identifier, used in DSL expressions.
  • name (optional): Human-readable label.
  • type (required): Either http or pipeline.
  • is_singleton (optional): If true (default), the task runs standalone. If false, it can be called from a pipeline.
  • url, method (required for http): Request target and HTTP verb.
  • headers, body (optional for http): Request headers and body.
  • json_extractor_props (optional): Maps {name: JSONPath} to extract values from JSON responses.
  • pipeline_context (optional, pipeline only): Key-value store available as pipeline_context.key in DSL expressions.

Placeholders like !{{1}}, !{{2}}, etc., are replaced with positional arguments during execution.
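A minimal sketch of how this kind of positional substitution can work (illustrative only; PipeDSL's internal mechanism may differ):

```python
import re

def substitute(template: str, args: list) -> str:
    # Replace each !{{n}} with the n-th positional argument (1-based).
    return re.sub(
        r"!\{\{(\d+)\}\}",
        lambda m: str(args[int(m.group(1)) - 1]),
        template,
    )

body = substitute('{"source": "!{{1}}"}', ["https://httpbin.org/get"])
print(body)  # {"source": "https://httpbin.org/get"}
```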


How It Works

  1. Parsing: The DSL string (e.g., login() >> profile(login.token)) is tokenized and parsed into an abstract syntax tree (AST) using NLTK and a context-free grammar.
  2. Execution: Tasks are scheduled asynchronously. Results are stored in an execution context under the task’s id.
  3. Data Flow: Expressions like task.property resolve to extracted values from prior responses.
  4. Parallelism: The product operator ([X] * [Y($1)]) expands into a Cartesian product and executes sub-pipelines in parallel.
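The parsing step can be illustrated with a much simpler toy: splitting a sequential expression on >> and pulling out each call's name and arguments. PipeDSL builds a real AST from a context-free grammar via NLTK; this sketch handles only flat chains, with no nesting or product operator.

```python
import re

def parse_chain(expr: str):
    # Toy step extractor: split on '>>', then match name(args) per step.
    steps = []
    for part in expr.split(">>"):
        m = re.match(r"\s*(\w+)\((.*)\)\s*$", part)
        name, args = m.group(1), m.group(2).strip()
        steps.append((name, [a.strip() for a in args.split(",")] if args else []))
    return steps

print(parse_chain("login() >> profile(login.token)"))
# [('login', []), ('profile', ['login.token'])]
```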

Architecture

  • YamlTaskReader: Reads YAML and builds task definitions with compiled ASTs for pipelines.
  • TaskScheduler: Orchestrates execution of top-level (is_singleton: true) tasks.
  • PipelineExecutor: Interprets the AST, manages context, and handles sequential/parallel execution.
  • HttpRequestExecutor: Sends requests via aiohttp and processes responses.

All core models are immutable (Pydantic with frozen=True), and the system is fully typed.
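The same immutability guarantee can be illustrated with a stdlib frozen dataclass (an analogue of a frozen Pydantic model, not PipeDSL's actual class): once constructed, a task definition cannot be mutated, so it stays stable across concurrent executions.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class HttpTask:
    # Illustrative fields only, mirroring the YAML task schema.
    id: str
    url: str
    method: str = "get"

task = HttpTask(id="healthcheck", url="https://api.example.com/health")
try:
    task.url = "https://other.example.com"  # rejected: instance is frozen
except Exception as e:
    print(type(e).__name__)  # FrozenInstanceError
```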


Development

To set up a development environment:

git clone https://github.com/yourname/PipeDSL.git
cd PipeDSL
pip install -e .
pytest

License

Apache License 2.0. See LICENSE for details.
