Async task runtime for queue, polling, schedule, and webhook workloads.

onestep


onestep is a small async task runtime centered on four concepts:

  • OneStepApp: task registry and lifecycle manager
  • Source: fetch data from a queue or polling backend
  • Sink: publish processed data
  • Delivery: a single fetched item with ack/retry/fail

The V1 stable surface includes:

  • MemoryQueue
  • onestep-mysql: MySQLConnector.table_queue(...), incremental sources, table sinks, state stores, and cursor stores
  • onestep-rabbitmq: RabbitMQConnector.queue(...)
  • onestep-redis: RedisConnector.stream(...)
  • onestep-sqs: SQSConnector.queue(...)
  • IntervalSource.every(...)
  • CronSource(...)
  • WebhookSource(...)

Install

Core package:

pip install onestep

Common extras and plugin packages:

  • pip install 'onestep[yaml]'
  • pip install onestep-mysql
  • pip install onestep-rabbitmq
  • pip install onestep-redis
  • pip install onestep-sqs
  • pip install 'onestep[control-plane]'
  • pip install 'onestep[all]'

Connector plugins:

  • Feishu Bitable: install onestep-feishu-bitable
  • MySQL: install onestep-mysql
  • RabbitMQ: install onestep-rabbitmq
  • Redis: install onestep-redis
  • SQS: install onestep-sqs

From a source checkout:

  • pip install -e .
  • pip install -e '.[dev]'
  • pip install -e '.[integration]'

Upgrading from 0.5.x

1.0.0 is a runtime rewrite. Projects built on the legacy step and *Broker APIs should treat the upgrade as a migration, not a drop-in package bump.

See MIGRATION-0.5-to-1.0.0.md for:

  • old-to-new API mapping
  • unsupported legacy features
  • a minimal before/after example
  • rollout guidance for existing projects

CLI

The deployment entrypoint is the onestep CLI.

Recommended module shape:

from onestep import IntervalSource, OneStepApp

app = OneStepApp("billing-sync")


@app.task(source=IntervalSource.every(hours=1, immediate=True, overlap="skip"))
async def sync_billing(ctx, _):
    print("syncing billing data")

Run the app:

onestep run your_package.tasks:app

The short form is also supported:

onestep your_package.tasks:app

Check the target before starting it:

onestep check your_package.tasks:app

You can also point the CLI at a zero-argument factory:

onestep check your_package.tasks:build_app

Use JSON output when you want the check result in CI or deployment scripts:

onestep check --json your_package.tasks:app

You can also load a YAML app definition. Start with the smallest working shape and add fields only when needed. resources is the preferred top-level section for named runtime objects, while handler.ref and hooks.*.ref point to Python callables:

app:
  name: billing-sync
  logging:
    level: DEBUG

resources:
  tick:
    type: interval
    minutes: 5
    immediate: true

tasks:
  - name: sync_billing
    source: tick
    handler:
      ref: your_package.handlers.billing:sync_billing

Check or run the YAML target the same way:

onestep check worker.yaml
onestep run worker.yaml

Scaffold a minimal standalone YAML project when starting from scratch:

onestep init billing-sync

That generates:

  • pyproject.toml
  • worker.yaml
  • src/<package>/tasks/
  • src/<package>/transforms/

The generated project stays intentionally small. worker.yaml only defines runtime wiring; business logic still lives in Python. Add hooks.py later only when you actually wire hooks in YAML.

Use strict checking when you want schema validation for YAML targets, including unknown-field detection and contract checks for reporter, hooks, tasks, and resource specs:

onestep check --strict worker.yaml

app.logging.level is optional and only controls the onestep logger namespace. It does not configure handlers or formatters, but setting it to DEBUG makes framework-level sink success logs visible.

YAML resources can reference other resources by name, for example rabbitmq_queue.connector: rmq or mysql_incremental.state: cursor_store.

Task handlers and hooks can read:

  • ctx.config for app-level config
  • ctx.task_config for task-level config
  • ctx.resources for named runtime objects

App hooks can read app.resources for the same named resource registry.

For a progressive YAML guide from minimal task to fully wired app, including strict-check guidance, see docs/yaml-task-definition.md.

For a standalone project-style example with worker.yaml plus tasks.py / transforms.py / hooks.py, see example/yaml_project/.

Built-in YAML resource types:

  • memory
  • interval
  • cron
  • webhook
  • http_sink

Plugin YAML resource types:

  • onestep-mysql: mysql, mysql_state_store, mysql_cursor_store, mysql_table_queue, mysql_incremental, mysql_table_sink
  • onestep-rabbitmq: rabbitmq, rabbitmq_queue
  • onestep-redis: redis, redis_stream
  • onestep-sqs: sqs, sqs_queue
  • onestep-feishu-bitable: feishu_bitable, feishu_bitable_incremental, feishu_bitable_table_sink

Install the corresponding plugin package in the worker environment before using plugin resource types in YAML.

YAML apps can also bind app-level state explicitly:

app:
  name: billing-sync
  state: app_state

connectors:
  db:
    type: mysql
    dsn: mysql+pymysql://root:root@localhost:3306/app
  app_state:
    type: mysql_state_store
    connector: db
    table: onestep_state

The named state resource must support load/save/delete; mysql_state_store and mysql_cursor_store both work when onestep-mysql is installed.

Legacy connectors, sources, and sinks sections are still supported and merged into the same internal resource registry. For the full YAML schema and a larger task-definition example, see docs/yaml-task-definition.md.

Runnable examples live in:

  • example/cli_app.py
  • example/cli_app.yaml

Run them locally with:

PYTHONPATH=src onestep check example.cli_app:app
onestep check example/cli_app.yaml
SYNC_INTERVAL_SECONDS=5 PYTHONPATH=src onestep run example.cli_app:app

systemd

A complete deployment template lives in:

  • deploy/README.md
  • deploy/systemd/onestep-app.service
  • deploy/env/onestep-app.env.example
  • deploy/bin/onestep-preflight.sh

The example uses:

  • /etc/onestep/onestep-app.env for deployment variables
  • ExecStartPre to run a startup check
  • ExecStart to launch onestep run

Install it with:

sudo mkdir -p /etc/onestep
sudo cp deploy/env/onestep-app.env.example /etc/onestep/onestep-app.env
sudo cp deploy/systemd/onestep-app.service /etc/systemd/system/onestep-app.service
sudo systemctl daemon-reload
sudo systemctl enable --now onestep-app

Check status and logs:

sudo systemctl status onestep-app
sudo journalctl -u onestep-app -f

See deploy/README.md for the expected directory layout and the env vars you need to adjust first. The deploy template prepends APP_CWD to PYTHONPATH so module targets defined inside the repo can be imported by the onestep console script.

Official worker image

onestep also ships an official worker runtime image for YAML-oriented workers.

Mounted workspace usage:

docker run --rm \
  -e ONESTEP_TARGET=/workspace/worker.yaml \
  -v "$PWD:/workspace" \
  ghcr.io/mic1on/onestep-worker:1.2.7

Derived image usage:

FROM ghcr.io/mic1on/onestep-worker:1.2.7

WORKDIR /workspace
COPY . /workspace
ENV ONESTEP_TARGET=/workspace/worker.yaml

ONESTEP_TARGET points to the YAML file path or Python import target the container should start. The runtime automatically adds /workspace and /workspace/src to PYTHONPATH. If /workspace/requirements.txt exists it is installed first; otherwise the runtime falls back to installing /workspace when /workspace/pyproject.toml exists.

See deploy/worker-runtime-image.md for the full usage guide and troubleshooting notes.

Control Plane Reporter

onestep can push runtime telemetry to onestep-control-plane over a single long-lived WebSocket session without adding a new connector or changing task code.

Install the optional control-plane dependency before using the reporter:

pip install 'onestep[control-plane]'

The YAML form is intentionally minimal:

app:
  name: billing-sync

reporter: true

That enables ControlPlaneReporter using env-backed defaults. When you need explicit overrides, keep the field names aligned with ControlPlaneReporterConfig:

reporter:
  base_url: https://control-plane.example.com
  token: ${ONESTEP_CONTROL_PLANE_TOKEN}
  service_name: billing-sync-worker

Attach the reporter explicitly:

from onestep import ControlPlaneReporter, ControlPlaneReporterConfig, IntervalSource, OneStepApp

app = OneStepApp("billing-sync")
reporter = ControlPlaneReporter(ControlPlaneReporterConfig.from_env(app_name=app.name))
reporter.attach(app)


@app.task(source=IntervalSource.every(hours=1, immediate=True, overlap="skip"))
async def sync_users(ctx, _):
    print("syncing users")

Required environment variables:

  • ONESTEP_CONTROL_PLANE_URL or ONESTEP_CONTROL_URL
  • ONESTEP_CONTROL_PLANE_TOKEN or ONESTEP_CONTROL_TOKEN

Common optional environment variables:

  • ONESTEP_ENV
  • ONESTEP_SERVICE_NAME
  • ONESTEP_NODE_NAME
  • ONESTEP_DEPLOYMENT_VERSION
  • ONESTEP_INSTANCE_ID
  • ONESTEP_REPLICA_KEY
  • ONESTEP_STATE_DIR
  • ONESTEP_CONTROL_PLANE_HEARTBEAT_INTERVAL_S
  • ONESTEP_CONTROL_PLANE_METRICS_INTERVAL_S
  • ONESTEP_CONTROL_PLANE_EVENT_FLUSH_INTERVAL_S
  • ONESTEP_CONTROL_PLANE_EVENT_BATCH_SIZE
  • ONESTEP_CONTROL_PLANE_MAX_PENDING_EVENTS
  • ONESTEP_CONTROL_PLANE_MAX_PENDING_METRIC_BATCHES
  • ONESTEP_CONTROL_PLANE_TIMEOUT_S
  • ONESTEP_CONTROL_PLANE_RECONNECT_BASE_DELAY_S
  • ONESTEP_CONTROL_PLANE_RECONNECT_MAX_DELAY_S

The reporter uses the following WebSocket endpoint and message types:

  • GET /api/v1/agents/ws
  • hello / hello_ack
  • telemetry(sync|heartbeat|metrics|events)
  • command / command_ack / command_result

Behavior:

  • startup reuses a stable instance_id from local state unless ONESTEP_INSTANCE_ID or ONESTEP_REPLICA_KEY overrides it
  • startup opens a fresh WS session and negotiates capabilities
  • startup sends a heartbeat and a topology sync built from the current app tasks
  • sync and heartbeat sequences are tracked independently and persist across restarts
  • sync is resent on later heartbeat cycles until the current topology hash converges
  • task execution events are aggregated into task window metrics
  • important runtime events (retried, failed, dead_lettered, cancelled) are batched and pushed
  • remote commands can trigger ping, shutdown, restart, drain, pause_task, resume_task, sync_now, flush_metrics, and flush_events
  • transport send failures reset the current session and reconnect with exponential backoff plus jitter
  • low-priority metrics and events buffers are bounded locally; if the control plane stays down, the oldest buffered telemetry is dropped first
  • reporter failures are logged but do not stop task execution
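
The reconnect and buffering behavior in the list above can be sketched with the standard library alone. This is an illustration, not the reporter's implementation: reconnect_delay, BoundedBuffer, the "full jitter" strategy, and the default values are assumptions chosen for the example.

```python
import random
from collections import deque


def reconnect_delay(attempt, base_delay_s=1.0, max_delay_s=30.0, rng=random):
    """Return a delay in seconds for a 0-based reconnect attempt.

    Assumed strategy: exponential growth capped at max_delay_s, then a
    uniform draw between 0 and that cap ("full jitter").
    """
    exponent = min(attempt, 32)  # keep 2 ** exponent small for huge attempt counts
    capped = min(max_delay_s, base_delay_s * (2 ** exponent))
    return rng.uniform(0.0, capped)


class BoundedBuffer:
    """Keeps at most max_pending items and drops the oldest one first."""

    def __init__(self, max_pending):
        if max_pending < 1:
            raise ValueError("max_pending must be at least 1")
        self._items = deque(maxlen=max_pending)
        self.dropped = 0

    def push(self, item):
        if len(self._items) == self._items.maxlen:
            # deque(maxlen=...) evicts the leftmost (oldest) item on append.
            self.dropped += 1
        self._items.append(item)

    def drain(self):
        """Return all buffered items in arrival order and clear the buffer."""
        items = list(self._items)
        self._items.clear()
        return items
```

With a bound of 3, pushing five events keeps only the three newest, which matches the "oldest buffered telemetry is dropped first" rule.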

Identity resolution order:

  1. ONESTEP_INSTANCE_ID: hard override for tests or explicit pinning.
  2. ONESTEP_REPLICA_KEY: derives a deterministic UUIDv5 from service_name + environment + replica_key.
  3. local identity state: reuses the instance_id stored in the reporter state dir.
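
The resolution order above can be sketched as a small function. This is a hedged illustration: resolve_instance_id, EXAMPLE_NAMESPACE, the "service:environment:replica" name format, and the final uuid4 fallback are assumptions, since the document does not state which namespace or name string the reporter feeds into UUIDv5, or what happens when no stored identity exists.

```python
import uuid

# Hypothetical namespace for this sketch only.
EXAMPLE_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "onestep.example")


def resolve_instance_id(env, service_name, environment, stored_instance_id=None):
    """Pick an instance id using the three-step order described above."""
    # 1. Hard override.
    explicit = env.get("ONESTEP_INSTANCE_ID")
    if explicit:
        return explicit

    # 2. Deterministic UUIDv5 derived from service, environment, and replica key.
    replica_key = env.get("ONESTEP_REPLICA_KEY")
    if replica_key:
        name = f"{service_name}:{environment}:{replica_key}"  # assumed format
        return str(uuid.uuid5(EXAMPLE_NAMESPACE, name))

    # 3. Reuse the id stored in the reporter state dir, if any.
    if stored_instance_id:
        return stored_instance_id

    # Assumption: with nothing stored yet, generate a fresh random id.
    return str(uuid.uuid4())
```

Because UUIDv5 is a pure function of its namespace and name, the same replica key yields the same id on every restart, which is what makes it suitable for StatefulSet-style ordinals.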

By default ControlPlaneReporterConfig.from_env() uses a local state dir under:

  • ~/.onestep/control-plane-state/<environment>/<service_name>
  • ~/.onestep/control-plane-state/<environment>/<service_name>/<replica_key> when ONESTEP_REPLICA_KEY is set

Multi-replica rules:

  • single worker: the default state dir is enough
  • multiple workers on one host: give each worker a unique ONESTEP_REPLICA_KEY or ONESTEP_STATE_DIR
  • StatefulSet-style deployments: use the stable ordinal as ONESTEP_REPLICA_KEY
  • disposable deployment pod names are not a stable replica key unless you inject one yourself
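
The state dir layout and the multi-replica rules above can be illustrated with a small path helper. default_state_dir and its home parameter are hypothetical names for this sketch; only the directory layout comes from the paths listed earlier.

```python
from pathlib import Path


def default_state_dir(environment, service_name, replica_key=None, home=None):
    """Build the default reporter state dir described above."""
    root = Path(home) if home is not None else Path.home()
    path = root / ".onestep" / "control-plane-state" / environment / service_name
    if replica_key:
        # Each replica key gets its own subdirectory, so two workers on
        # one host do not share identity state.
        path = path / replica_key
    return path
```

Two workers started with replica keys "0" and "1" end up in sibling directories under the same service path, so neither overwrites the other's stored instance_id.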

For an operations-focused guide with deployment patterns and troubleshooting, see docs/stable-instance-identity.md.

Quick local demo:

  1. Start onestep-control-plane. If the control plane repo is checked out next to this one:

cd ../onestep-control-plane
./scripts/start-local.sh

  2. Start a long-running OneStep reporter demo:

./scripts/run-control-plane-demo.sh

  3. Or run the end-to-end smoke script, which boots the local control plane, starts the demo agent, dispatches a ping, and waits for a successful command_result:

./scripts/run-control-plane-smoke.sh

  4. Inspect the control plane. The demo cycles through ok, retry_once, fail, and slow jobs so you can see successful runs, retries, terminal failures, timeouts, and dead-letter events without changing any code:

http://127.0.0.1:8080/services?environment=dev
http://127.0.0.1:8080/services/control-plane-demo?environment=dev
http://127.0.0.1:8080/services/control-plane-demo?environment=dev&tab=commands

Runtime

The runtime now has a few explicit control points:

  • OneStepApp(..., shutdown_timeout_s=30.0) controls how long the app waits for inflight tasks before cancelling them during shutdown
  • @app.task(..., timeout_s=30.0) applies an execution timeout to async handlers
  • @app.task(..., dead_letter=...) routes terminal failures into a dead-letter sink
  • @app.on_event receives task execution events
  • InMemoryMetrics() is a built-in metrics hook for event counting
  • @app.on_startup and @app.on_shutdown register lifecycle hooks
  • ctx.config exposes app config
  • ctx.state exposes per-task namespaced state backed by the app state store

Failures are classified as:

  • error
  • timeout
  • cancelled

Custom retry policies receive a FailureInfo object so they can decide differently for timeouts vs business exceptions.
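
As a rough sketch of that decision, the function below treats timeouts and business errors differently. ExampleFailureInfo is a stand-in defined here, not onestep's FailureInfo; its field names, the should_retry signature, and the set of "transient" exception names are all assumptions for illustration. How a policy is registered with a task is not covered by this sketch.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ExampleFailureInfo:
    """Stand-in for FailureInfo; the real class may expose different fields."""

    kind: str  # "error", "timeout", or "cancelled"
    exception_type: str
    message: str


# Hypothetical list of exception names worth retrying.
TRANSIENT_ERRORS = {"ConnectionError", "TimeoutError"}


def should_retry(failure, attempts, max_attempts=3):
    """Return True when the delivery should be retried."""
    if attempts >= max_attempts:
        return False
    if failure.kind == "cancelled":
        # A cancelled run is usually part of shutdown; do not retry it here.
        return False
    if failure.kind == "timeout":
        # Timeouts are often transient, so retry until attempts run out.
        return True
    # Business exceptions: retry only the ones considered transient.
    return failure.exception_type in TRANSIENT_ERRORS
```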

Execution events are emitted for:

  • fetched
  • started
  • succeeded
  • retried
  • failed
  • dead_lettered
  • cancelled

A runtime example that combines these control points:

from onestep import InMemoryMetrics, InMemoryStateStore, MemoryQueue, OneStepApp, StructuredEventLogger

source = MemoryQueue("incoming")
dead = MemoryQueue("dead-letter")
metrics = InMemoryMetrics()
app = OneStepApp(
    "runtime-demo",
    config={"prefix": "demo"},
    state=InMemoryStateStore(),
    shutdown_timeout_s=10.0,
)
app.on_event(metrics)
app.on_event(StructuredEventLogger())


@app.on_startup
async def bootstrap(app):
    await source.publish({"value": 1})


@app.task(source=source, timeout_s=5.0, dead_letter=dead)
async def consume(ctx, item):
    runs = await ctx.state.get("runs", 0)
    await ctx.state.set("runs", runs + 1)
    print(ctx.config["prefix"], item)
    ctx.app.request_shutdown()

If a task ends in a terminal failure and dead_letter is configured, the dead-letter sink receives:

  • body["payload"]: the original payload
  • body["failure"]: {kind, exception_type, message, traceback?}
  • meta["original_meta"]: the original envelope metadata
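
To make that shape concrete, the helper below builds a body and meta pair with the same keys from a caught exception. build_dead_letter is a hypothetical function for this sketch; the runtime constructs the real envelope itself, and its exact traceback formatting is not specified here.

```python
import traceback


def build_dead_letter(payload, original_meta, exc, kind="error", include_traceback=True):
    """Return (body, meta) shaped like the dead-letter record described above."""
    failure = {
        "kind": kind,  # "error", "timeout", or "cancelled"
        "exception_type": type(exc).__name__,
        "message": str(exc),
    }
    if include_traceback:
        # The traceback key is optional, mirroring "traceback?" in the list above.
        failure["traceback"] = "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        )
    body = {"payload": payload, "failure": failure}
    meta = {"original_meta": original_meta}
    return body, meta
```

A consumer of the dead-letter queue can then branch on body["failure"]["kind"] and replay body["payload"] once the underlying problem is fixed.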

You can inspect metrics in-process:

snapshot = metrics.snapshot()
print(snapshot["kinds"])

StructuredEventLogger() bridges TaskEvent into standard Python logging with consistent fields such as:

  • event_kind
  • app_name
  • task_name
  • source_name
  • attempts
  • duration_s
  • failure_kind

Tasks can also opt into webhook-friendly success metadata by returning a reserved notification object. The task return value sent to sinks stays unchanged; only the emitted succeeded event gains meta["notification"].

from onestep import MemoryQueue, OneStepApp, StructuredEventLogger

source = MemoryQueue("incoming")
app = OneStepApp("notification-demo")
app.on_event(StructuredEventLogger())


@app.task(source=source)
async def sync_status(ctx, item):
    updated = item["updated"]
    ctx.app.request_shutdown()
    return {
        "success": True,
        "updated": updated,
        "notification": {
            "summary": f"Status sync complete, updated {updated} devices",
            "metrics": [
                {"label": "Updated", "value": updated},
            ],
        },
    }

The emitted TaskEventKind.SUCCEEDED metadata then includes:

{
  "notification": {
    "summary": "Status sync complete, updated 12 devices",
    "metrics": [
      {"label": "Updated", "value": 12}
    ]
  }
}

notification values must be JSON-safe. Invalid values are dropped from the emitted metadata instead of failing task execution.
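
One way to picture the JSON-safety rule is a check based on json.dumps. This is a sketch under assumptions: extract_notification is a hypothetical helper, and it drops the whole notification when any part is not serializable, whereas the runtime may drop invalid values at a finer granularity.

```python
import json


def extract_notification(result):
    """Return the notification dict if it is JSON-safe, otherwise None."""
    if not isinstance(result, dict):
        return None
    notification = result.get("notification")
    if notification is None:
        return None
    try:
        # allow_nan=False rejects NaN and Infinity, which are not valid JSON.
        json.dumps(notification, allow_nan=False)
    except (TypeError, ValueError):
        # Not serializable: drop it rather than fail the task.
        return None
    return notification
```

Values such as sets, datetime objects, or NaN would fail this check, so convert them to lists, ISO strings, or plain numbers before returning them from a task.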

Memory Queue Example

import asyncio

from onestep import MemoryQueue, OneStepApp

app = OneStepApp("demo")
source = MemoryQueue("incoming")
sink = MemoryQueue("processed")


@app.task(source=source, emit=sink, concurrency=4)
async def double(ctx, item):
    # Stop the app after the first item so the demo exits on its own.
    ctx.app.request_shutdown()
    return {"value": item["value"] * 2}


async def main():
    await source.publish({"value": 21})
    await app.serve()


if __name__ == "__main__":
    asyncio.run(main())

Interval Source

Use a local scheduler when you need to run a task every fixed duration.

from onestep import IntervalSource, OneStepApp

app = OneStepApp("interval-demo")


@app.task(
    source=IntervalSource.every(
        hours=1,
        immediate=True,
        overlap="skip",
        payload={"job": "refresh-cache"},
    )
)
async def refresh_cache(ctx, item):
    print("scheduled at:", ctx.current.meta["scheduled_at"], "payload:", item)

overlap controls what happens when the previous run is still inflight:

  • allow: start another run immediately
  • skip: drop missed ticks while the previous run is still running
  • queue: serialize missed ticks and run them one by one afterwards

queue mode keeps at most max_queued_runs missed ticks, defaulting to 1000, and drops the oldest queued ticks first when the backlog exceeds that bound.
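
The three overlap modes can be compared with a small simulation. This is a model written for this explanation, not IntervalSource's scheduler: simulate, its arguments, and the assumption that every run takes the same fixed duration are all illustrative.

```python
from collections import deque


def simulate(ticks, duration, overlap, max_queued_runs=1000):
    """Return (runs, dropped) for sorted tick times and a fixed run duration.

    runs is a list of (scheduled_at, started_at); dropped lists the
    scheduled_at values of ticks that never ran.
    """
    if overlap not in ("allow", "skip", "queue"):
        raise ValueError(f"unknown overlap mode: {overlap!r}")
    if max_queued_runs < 1:
        raise ValueError("max_queued_runs must be at least 1")

    runs, dropped = [], []
    busy_until = float("-inf")
    pending = deque()

    def drain(now):
        # Start queued ticks one by one as soon as the previous run ends.
        nonlocal busy_until
        while pending and busy_until <= now:
            runs.append((pending.popleft(), busy_until))
            busy_until += duration

    for tick in ticks:
        if overlap == "allow":
            runs.append((tick, tick))  # always start, even if one is inflight
            continue
        drain(tick)
        if busy_until <= tick and not pending:
            runs.append((tick, tick))
            busy_until = tick + duration
        elif overlap == "skip":
            dropped.append(tick)
        else:  # queue
            if len(pending) >= max_queued_runs:
                dropped.append(pending.popleft())  # oldest queued tick goes first
            pending.append(tick)

    drain(float("inf"))
    return runs, dropped
```

For ticks at 0, 10, 20, and 30 with a run duration of 25, skip runs only the ticks at 0 and 30, while queue runs all four back to back at 0, 25, 50, and 75. With max_queued_runs=1 in queue mode, the tick at 10 is dropped when the tick at 20 arrives.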

Cron Source

Use CronSource when you care about wall-clock time rather than elapsed duration.

from onestep import CronSource, OneStepApp

app = OneStepApp("hourly-sync")


@app.task(source=CronSource("0 * * * *", timezone="Asia/Shanghai", overlap="skip"))
async def sync_hourly(ctx, _):
    print("running at:", ctx.current.meta["scheduled_at"])

The built-in parser supports standard 5-field cron expressions and these aliases:

  • @hourly
  • @daily
  • @weekly
  • @monthly
  • @yearly
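
For reference, these aliases conventionally map to the 5-field expressions below. The mapping follows common crontab usage; whether onestep's parser expands them to exactly these strings internally is an assumption, and expand_cron is a hypothetical helper for this sketch.

```python
# Conventional crontab meanings of the aliases listed above.
CRON_ALIASES = {
    "@hourly": "0 * * * *",   # minute 0 of every hour
    "@daily": "0 0 * * *",    # 00:00 every day
    "@weekly": "0 0 * * 0",   # 00:00 every Sunday
    "@monthly": "0 0 1 * *",  # 00:00 on the 1st of every month
    "@yearly": "0 0 1 1 *",   # 00:00 on January 1st
}


def expand_cron(expression):
    """Expand a known alias and check that the result has five fields."""
    expr = expression.strip()
    expanded = CRON_ALIASES.get(expr, expr)
    if len(expanded.split()) != 5:
        raise ValueError(f"expected 5 cron fields or a known alias: {expression!r}")
    return expanded
```

The hourly example above, CronSource("0 * * * *", ...), is therefore the same schedule as the @hourly alias under this mapping.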

Webhook Source

Use WebhookSource when external systems push events into your app.

from onestep import BearerAuth, MemoryQueue, OneStepApp, WebhookSource

app = OneStepApp("webhook-demo")
jobs = MemoryQueue("jobs")


@app.task(
    source=WebhookSource(
        path="/webhooks/github",
        methods=("POST",),
        host="127.0.0.1",
        port=8080,
        auth=BearerAuth("replace-me"),
    ),
    emit=jobs,
)
async def ingest_github(ctx, event):
    return {
        "event": event["headers"].get("x-github-event"),
        "payload": event["body"],
    }

The stdlib implementation supports:

  • shared host:port listeners across multiple webhook routes
  • optional BearerAuth(...)
  • json, form, text, raw, and auto body parsing
  • fixed WebhookResponse(...) responses
  • exact path matching and method filtering

The payload delivered to your task contains:

  • body
  • headers
  • query
  • method
  • path
  • client
  • received_at
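
To exercise a route like the one above from Python, a client can build a POST request with the standard library. This sketch assumes BearerAuth expects the standard "Authorization: Bearer <token>" header, which the document does not spell out; build_webhook_request is a hypothetical helper.

```python
import json
import urllib.request


def build_webhook_request(url, token, payload):
    """Build (but do not send) a JSON POST request with a bearer token."""
    data = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=data,
        headers={
            "Authorization": f"Bearer {token}",  # assumed header format
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Sending it requires the webhook app to be running:
# req = build_webhook_request(
#     "http://127.0.0.1:8080/webhooks/github", "replace-me", {"action": "opened"}
# )
# with urllib.request.urlopen(req, timeout=5) as resp:
#     print(resp.status)
```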

MySQL Table Queue

Use a table as a task queue by claiming rows and marking them as finished.

from onestep import OneStepApp
from onestep_mysql import MySQLConnector

app = OneStepApp("orders")
db = MySQLConnector("mysql+pymysql://root:root@localhost:3306/app")
source = db.table_queue(
    table="orders",
    key="id",
    where="status = 0",
    claim={"status": 9},
    ack={"status": 1},
    nack={"status": 0},
    batch_size=100,
)
sink = db.table_sink(table="processed_orders", mode="upsert", keys=("id",))


@app.task(source=source, emit=sink, concurrency=16)
async def process_order(ctx, row):
    return {"id": row["id"], "payload": row["payload"], "status": "done"}

When you need to write computed fields back to the claimed row itself, call await ctx.update_current_row({...}) inside the task. This is currently supported for MySQLConnector.table_queue(...) deliveries.
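
The claim, ack, and nack status transitions configured above (0 to 9 on claim, 9 to 1 on ack, 9 back to 0 on nack) can be illustrated with an in-memory SQLite table. This is a conceptual sketch only: it is not the SQL that onestep-mysql issues, and the claim and finish helpers are hypothetical.

```python
import sqlite3


def claim(conn, batch_size):
    """Mark up to batch_size pending rows (status 0) as claimed (status 9)."""
    claimed = []
    with conn:  # commit on success, roll back on error
        rows = conn.execute(
            "SELECT id, payload FROM orders WHERE status = 0 ORDER BY id LIMIT ?",
            (batch_size,),
        ).fetchall()
        for row_id, payload in rows:
            # The status guard keeps a row from being claimed twice.
            cur = conn.execute(
                "UPDATE orders SET status = 9 WHERE id = ? AND status = 0",
                (row_id,),
            )
            if cur.rowcount == 1:
                claimed.append({"id": row_id, "payload": payload})
    return claimed


def finish(conn, row_id, ok):
    """Ack (status 1) or nack (status 0) a previously claimed row."""
    with conn:
        conn.execute(
            "UPDATE orders SET status = ? WHERE id = ? AND status = 9",
            (1 if ok else 0, row_id),
        )
```

A nacked row returns to status 0, so it matches the where filter again and is picked up by a later claim.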

MySQL Incremental Sync

Use (updated_at, id) as a lightweight cursor for Logstash-style sync.

from onestep import MemoryQueue, OneStepApp
from onestep_mysql import MySQLConnector

app = OneStepApp("sync-users")
db = MySQLConnector("mysql+pymysql://root:root@localhost:3306/app")
state = db.cursor_store(table="onestep_cursor")
source = db.incremental(
    table="users",
    key="id",
    cursor=("updated_at", "id"),
    where="deleted = 0",
    batch_size=1000,
    state=state,
)
out = MemoryQueue("dw")


@app.task(source=source, emit=out, concurrency=1)
async def sync_user(ctx, row):
    return {"id": row["id"], "name": row["name"], "updated_at": row["updated_at"]}

For production deployments, prefer db.cursor_store(...) or db.state_store(...) over the in-memory stores so cursors and task state survive process restarts.
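
The (updated_at, id) cursor works like keyset pagination: each batch resumes strictly after the last row seen, with id breaking ties between rows that share a timestamp. The SQLite sketch below illustrates the idea; it is not the query onestep-mysql generates, and fetch_batch is a hypothetical helper.

```python
import sqlite3


def fetch_batch(conn, cursor, batch_size):
    """Return (rows, new_cursor); cursor is (updated_at, id) or None."""
    if cursor is None:
        rows = conn.execute(
            "SELECT id, name, updated_at FROM users WHERE deleted = 0 "
            "ORDER BY updated_at, id LIMIT ?",
            (batch_size,),
        ).fetchall()
    else:
        updated_at, last_id = cursor
        rows = conn.execute(
            "SELECT id, name, updated_at FROM users WHERE deleted = 0 "
            "AND (updated_at > ? OR (updated_at = ? AND id > ?)) "
            "ORDER BY updated_at, id LIMIT ?",
            (updated_at, updated_at, last_id, batch_size),
        ).fetchall()
    if rows:
        last_id, _, last_updated_at = rows[-1]
        cursor = (last_updated_at, last_id)  # advance only when rows came back
    return rows, cursor
```

Persisting the returned cursor after each batch is what lets a restarted worker resume where it stopped instead of rescanning the table.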

Feishu Bitable Plugin

Install onestep-feishu-bitable to use Feishu Bitable as an incremental source or upsert sink. In this repository, the plugin source lives under plugins/onestep-feishu-bitable.

from onestep import OneStepApp
from onestep_feishu_bitable import (
    FeishuBitableConnector,
    feishu_bitable_text,
    feishu_bitable_user,
)

app = OneStepApp("feishu-sync")
feishu = FeishuBitableConnector(app_id="cli_xxx", app_secret="secret")
source = feishu.incremental(
    app_token="bascnxxx",
    table_id="tbl_source",
    cursor_field="最后更新时间",
    user_id_type="user_id",
    batch_size=100,
    fallback_scan_page_limit=100,
)
sink = feishu.table_sink(
    app_token="bascnyyy",
    table_id="tbl_target",
    mode="upsert",
    match_fields=["编号"],
    user_id_type="user_id",
)


@app.task(source=source, emit=sink, concurrency=20)
async def sync_order(ctx, payload):
    fields = payload["fields"]
    return {
        "编号": feishu_bitable_text(fields.get("编号")),
        "标题": feishu_bitable_text(fields.get("标题")),
        "负责人": feishu_bitable_user(fields.get("负责人ID")),
    }

For Feishu person fields, pass the matching user_id_type (open_id, union_id, or user_id) and write values as [{"id": "..."}]. The feishu_bitable_text(...) and feishu_bitable_user(...) helpers live in the onestep_feishu_bitable plugin package.

When Feishu cannot serve the incremental search with cursor sorting, the source falls back to scanning pages and sorting locally. fallback_scan_page_limit bounds that fallback to avoid high memory use on large tables.

RabbitMQ Queue

from onestep import OneStepApp
from onestep_rabbitmq import RabbitMQConnector

app = OneStepApp("rabbitmq-demo")
rmq = RabbitMQConnector("amqp://guest:guest@localhost/")
source = rmq.queue(
    "incoming_jobs",
    exchange="jobs.events",
    routing_key="jobs.created",
    prefetch=50,
)
out = rmq.queue(
    "processed_jobs",
    exchange="jobs.events",
    routing_key="jobs.done",
)


@app.task(source=source, emit=out, concurrency=8)
async def process_job(ctx, item):
    return {"job": item["job"], "status": "done"}

Install with pip install onestep-rabbitmq.

Redis Streams

Use Redis Streams for lightweight, reliable message queuing with consumer groups.

from onestep import OneStepApp
from onestep_redis import RedisConnector

app = OneStepApp("redis-demo")
redis = RedisConnector("redis://localhost:6379")
source = redis.stream(
    "jobs",
    group="workers",
    batch_size=100,
    poll_interval_s=0.5,
)
out = redis.stream("processed")


@app.task(source=source, emit=out, concurrency=8)
async def process_job(ctx, item):
    return {"job": item["job"], "status": "done"}

Key features:

  • Consumer groups: Multiple consumers share message processing
  • Message acknowledgment: XACK for reliable processing
  • Pending messages: Unacked messages stay in PEL for retry via XCLAIM
  • Stream trimming: maxlen option to limit stream size

Install with pip install onestep-redis.

SQS Queue

from onestep import OneStepApp
from onestep_sqs import SQSConnector

app = OneStepApp("sqs-demo")
sqs = SQSConnector(region_name="ap-southeast-1")
source = sqs.queue(
    "https://sqs.ap-southeast-1.amazonaws.com/123456789/jobs.fifo",
    message_group_id="workers",
    delete_batch_size=10,
    delete_flush_interval_s=0.5,
    heartbeat_interval_s=15,
    heartbeat_visibility_timeout=60,
)
out = sqs.queue(
    "https://sqs.ap-southeast-1.amazonaws.com/123456789/processed.fifo",
    message_group_id="workers",
)


@app.task(source=source, emit=out, concurrency=16)
async def process_job(ctx, item):
    return {"job": item["job"], "status": "done"}

Install with pip install onestep-sqs.

Examples

Supported examples are indexed in:

  • example/README.md

Common entrypoints:

  • example/cli_app.py
  • example/runtime_showcase.py
  • example/mysql_incremental.py
  • example/redis_stream.py
  • example/webhook_source.py

Integration Tests

Optional live tests are under tests/integration/ and plugin-specific plugins/*/tests/integration/ directories. The local stack now includes Redis, RabbitMQ, LocalStack SQS, and MySQL.

Install the live-test dependencies:

pip install '.[integration]'

Start the local integration stack:

make integration-up

Load the generated environment into your current shell:

eval "$(./scripts/setup-integration-env.sh)"

Run all live tests in one command:

make integration-test

You can also run one test file manually after loading the environment:

  • PYTHONPATH=src python3 -m pytest tests/integration/test_redis_live.py -q
  • PYTHONPATH=src python3 -m pytest tests/integration/test_rabbitmq_live.py -q
  • uv run --all-packages python -m pytest plugins/onestep-sqs/tests/integration/test_sqs_live.py -q
  • uv run --all-packages python -m pytest plugins/onestep-mysql/tests/integration/test_mysql_live.py -q

Set KEEP_INTEGRATION_SERVICES=1 to keep containers running after make integration-test.

Test Layout

The test suite is now intentionally split by responsibility:

  • tests/contract/: runtime contract tests that lock task execution semantics
  • tests/integration/: live infrastructure tests for built-in Redis and RabbitMQ connectors
  • plugins/*/tests/: plugin-specific unit, contract, and live integration tests
  • tests/test_*.py: connector-focused unit tests

End-to-End Demo

For a quick end-to-end demo with webhook ingestion, queueing, dead-letter handling, metrics, and structured logs:

PYTHONPATH=src python3 example/runtime_showcase.py

Then send:

curl -X POST http://127.0.0.1:8090/demo/webhook \
  -H 'Content-Type: application/json' \
  -d '{"action":"ok","value":21}'

curl -X POST http://127.0.0.1:8090/demo/webhook \
  -H 'Content-Type: application/json' \
  -d '{"action":"fail","value":21}'

curl -X POST http://127.0.0.1:8090/demo/webhook \
  -H 'Content-Type: application/json' \
  -d '{"action":"slow","value":21}'

You will see:

  • structured task event logs
  • successful processing for action=ok
  • dead-letter output for action=fail
  • timeout + dead-letter output for action=slow
