
Async task runtime for queue, polling, schedule, and webhook workloads.


onestep


onestep is a small async task runtime centered on four concepts:

  • OneStepApp: task registry and lifecycle manager
  • Source: fetch data from a queue or polling backend
  • Sink: publish processed data
  • Delivery: a single fetched item with ack/retry/fail

The V1 stable surface includes:

  • MemoryQueue
  • MySQLConnector.table_queue(...)
  • MySQLConnector.incremental(...)
  • MySQLConnector.table_sink(...)
  • MySQLConnector.state_store(...)
  • MySQLConnector.cursor_store(...)
  • RabbitMQConnector.queue(...)
  • RedisConnector.stream(...)
  • SQSConnector.queue(...)
  • IntervalSource.every(...)
  • CronSource(...)
  • WebhookSource(...)

Install

Core package:

pip install onestep

Common extras:

  • pip install 'onestep[yaml]'
  • pip install 'onestep[mysql]'
  • pip install 'onestep[rabbitmq]'
  • pip install 'onestep[redis]'
  • pip install 'onestep[sqs]'
  • pip install 'onestep[all]'

From a source checkout:

  • pip install -e .
  • pip install -e '.[dev]'
  • pip install -e '.[integration]'

Upgrading from 0.5.x

1.0.0 is a runtime rewrite. Projects built on the legacy step and *Broker APIs should treat the upgrade as a migration, not a drop-in package bump.

See MIGRATION-0.5-to-1.0.0.md for:

  • old-to-new API mapping
  • unsupported legacy features
  • a minimal before/after example
  • rollout guidance for existing projects

CLI

The deployment entrypoint is the onestep CLI.

Recommended module shape:

from onestep import IntervalSource, OneStepApp

app = OneStepApp("billing-sync")


@app.task(source=IntervalSource.every(hours=1, immediate=True, overlap="skip"))
async def sync_billing(ctx, _):
    print("syncing billing data")

Run the app:

onestep run your_package.tasks:app

The short form is also supported:

onestep your_package.tasks:app

Check the target before starting it:

onestep check your_package.tasks:app

You can also point the CLI at a zero-argument factory:

onestep check your_package.tasks:build_app

Use JSON output when you want the check result in CI or deployment scripts:

onestep check --json your_package.tasks:app
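In CI, the usual pattern is to gate the pipeline on the exit code and keep the machine-readable report as a build artifact. A minimal sketch (only the documented CLI is used; the output filename is arbitrary):

```shell
# Fail the pipeline if the app definition is invalid, and archive
# the machine-readable check report for later inspection.
onestep check --json your_package.tasks:app > onestep-check.json || exit 1
cat onestep-check.json
```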

You can also load a YAML app definition with handler.ref entries that point to Python callables:

app:
  name: billing-sync

connectors:
  tick:
    type: interval
    minutes: 5
    immediate: true
  processed:
    type: memory

tasks:
  - name: sync_billing
    source: tick
    handler:
      ref: your_package.handlers.billing:sync_billing
      params:
        region: cn
    emit: [processed]
    retry:
      type: max_attempts
      max_attempts: 3
      delay_s: 10

Check or run the YAML target the same way:

onestep check worker.yaml
onestep run worker.yaml

YAML resources can reference other resources by name, for example rabbitmq_queue.connector: rmq or mysql_incremental.state: cursor_store.

Currently supported YAML resource types:

  • memory
  • interval
  • cron
  • webhook
  • rabbitmq
  • rabbitmq_queue
  • redis
  • redis_stream
  • sqs
  • sqs_queue
  • mysql
  • mysql_state_store
  • mysql_cursor_store
  • mysql_table_queue
  • mysql_incremental
  • mysql_table_sink

YAML apps can also bind app-level state explicitly:

app:
  name: billing-sync
  state: app_state

connectors:
  app_state:
    type: mysql_state_store
    connector: db
    table: onestep_state

The named state resource must support load/save/delete; mysql_state_store and mysql_cursor_store both work.
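A custom state backend only needs those three operations. The sketch below is an in-memory stand-in; the async load/save/delete signatures are an assumption based on the requirement above, not the documented protocol:

```python
import asyncio


class DictStateStore:
    """Minimal in-memory sketch of a load/save/delete state backend.

    The async method signatures here are assumed from the requirement
    stated above; the real protocol may differ in detail.
    """

    def __init__(self):
        self._data = {}

    async def load(self, key, default=None):
        # Return the stored value, or the default when the key is absent.
        return self._data.get(key, default)

    async def save(self, key, value):
        self._data[key] = value

    async def delete(self, key):
        self._data.pop(key, None)


async def demo():
    store = DictStateStore()
    await store.save("cursor", {"updated_at": "2024-01-01", "id": 42})
    print(await store.load("cursor"))
    await store.delete("cursor")
    print(await store.load("cursor", "gone"))


asyncio.run(demo())
```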

Runnable examples live in:

  • example/cli_app.py
  • example/cli_app.yaml

Run it locally with:

PYTHONPATH=src onestep check example.cli_app:app
onestep check example/cli_app.yaml
SYNC_INTERVAL_SECONDS=5 PYTHONPATH=src onestep run example.cli_app:app

systemd

A complete deployment template lives in:

  • deploy/README.md
  • deploy/systemd/onestep-app.service
  • deploy/env/onestep-app.env.example
  • deploy/bin/onestep-preflight.sh

The example uses:

  • /etc/onestep/onestep-app.env for deployment variables
  • ExecStartPre to run a startup check
  • ExecStart to launch onestep run

Install it with:

sudo mkdir -p /etc/onestep
sudo cp deploy/env/onestep-app.env.example /etc/onestep/onestep-app.env
sudo cp deploy/systemd/onestep-app.service /etc/systemd/system/onestep-app.service
sudo systemctl daemon-reload
sudo systemctl enable --now onestep-app

Check status and logs:

sudo systemctl status onestep-app
sudo journalctl -u onestep-app -f

See deploy/README.md for the expected directory layout and the env vars you need to adjust first. The deploy template prepends APP_CWD to PYTHONPATH so module targets defined inside the repo can be imported by the onestep console script.

Control Plane Reporter

onestep can push runtime telemetry to onestep-control-plane over a single long-lived WebSocket session without adding a new connector or changing task code.

Attach the reporter explicitly:

from onestep import ControlPlaneReporter, ControlPlaneReporterConfig, IntervalSource, OneStepApp

app = OneStepApp("billing-sync")
reporter = ControlPlaneReporter(ControlPlaneReporterConfig.from_env(app_name=app.name))
reporter.attach(app)


@app.task(source=IntervalSource.every(hours=1, immediate=True, overlap="skip"))
async def sync_users(ctx, _):
    print("syncing users")

Required environment variables:

  • ONESTEP_CONTROL_PLANE_URL or ONESTEP_CONTROL_URL
  • ONESTEP_CONTROL_PLANE_TOKEN or ONESTEP_CONTROL_TOKEN

Common optional environment variables:

  • ONESTEP_ENV
  • ONESTEP_SERVICE_NAME
  • ONESTEP_NODE_NAME
  • ONESTEP_DEPLOYMENT_VERSION
  • ONESTEP_INSTANCE_ID
  • ONESTEP_REPLICA_KEY
  • ONESTEP_STATE_DIR
  • ONESTEP_CONTROL_PLANE_HEARTBEAT_INTERVAL_S
  • ONESTEP_CONTROL_PLANE_METRICS_INTERVAL_S
  • ONESTEP_CONTROL_PLANE_EVENT_FLUSH_INTERVAL_S
  • ONESTEP_CONTROL_PLANE_EVENT_BATCH_SIZE
  • ONESTEP_CONTROL_PLANE_MAX_PENDING_EVENTS
  • ONESTEP_CONTROL_PLANE_MAX_PENDING_METRIC_BATCHES
  • ONESTEP_CONTROL_PLANE_TIMEOUT_S
  • ONESTEP_CONTROL_PLANE_RECONNECT_BASE_DELAY_S
  • ONESTEP_CONTROL_PLANE_RECONNECT_MAX_DELAY_S

The reporter now uses:

  • GET /api/v1/agents/ws
  • hello / hello_ack
  • telemetry(sync|heartbeat|metrics|events)
  • command / command_ack / command_result

Behavior:

  • startup reuses a stable instance_id from local state unless ONESTEP_INSTANCE_ID or ONESTEP_REPLICA_KEY overrides it
  • startup opens a fresh WS session and negotiates capabilities
  • startup sends a heartbeat and a topology sync built from the current app tasks
  • sync and heartbeat sequences are tracked independently and persist across restarts
  • sync is resent on later heartbeat cycles until the current topology hash converges
  • task execution events are aggregated into task window metrics
  • important runtime events (retried, failed, dead_lettered, cancelled) are batched and pushed
  • remote commands can trigger ping, shutdown, restart, drain, pause_task, resume_task, sync_now, flush_metrics, and flush_events
  • transport send failures reset the current session and reconnect with exponential backoff plus jitter
  • low-priority metric and event buffers are bounded locally; if the control plane stays down, the oldest buffered telemetry is dropped first
  • reporter failures are logged but do not stop task execution
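The reconnect schedule described above can be sketched as exponential backoff with jitter, capped at a maximum delay. The base and cap correspond to the ONESTEP_CONTROL_PLANE_RECONNECT_BASE_DELAY_S / _MAX_DELAY_S settings, but the exact jitter formula here is an assumption:

```python
import random


def reconnect_delay(attempt: int, base_s: float = 1.0, max_s: float = 60.0) -> float:
    """Exponential backoff with full jitter, capped at max_s.

    attempt is 0-based: attempt 0 -> up to base_s, attempt 1 -> up to
    2 * base_s, and so on, never exceeding max_s.
    """
    capped = min(max_s, base_s * (2 ** attempt))
    # Full jitter: pick uniformly in [0, capped] so many reconnecting
    # workers do not hammer the control plane in lockstep.
    return random.uniform(0, capped)


for attempt in range(6):
    print(f"attempt {attempt}: sleep up to {min(60.0, 2.0 ** attempt):.0f}s")
```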

Identity resolution order:

  1. ONESTEP_INSTANCE_ID: hard override for tests or explicit pinning.
  2. ONESTEP_REPLICA_KEY: derives a deterministic UUIDv5 from service_name + environment + replica_key.
  3. local identity state: reuses the instance_id stored in the reporter state dir.
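The ONESTEP_REPLICA_KEY derivation in step 2 can be sketched with the standard library. The namespace UUID and the name layout below are assumptions for illustration, not the reporter's actual choices:

```python
import uuid

# Hypothetical namespace; the real reporter uses its own fixed namespace.
NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "onestep.example")


def replica_instance_id(service_name: str, environment: str, replica_key: str) -> uuid.UUID:
    # UUIDv5 is a hash of namespace + name, so the same inputs always
    # produce the same instance_id across restarts and hosts.
    return uuid.uuid5(NAMESPACE, f"{service_name}:{environment}:{replica_key}")


a = replica_instance_id("billing-sync", "prod", "worker-0")
b = replica_instance_id("billing-sync", "prod", "worker-0")
print(a == b)  # deterministic: same inputs, same UUID
```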

By default ControlPlaneReporterConfig.from_env() uses a local state dir under:

  • ~/.onestep/control-plane-state/<environment>/<service_name>
  • ~/.onestep/control-plane-state/<environment>/<service_name>/<replica_key> when ONESTEP_REPLICA_KEY is set

Multi-replica rules:

  • single worker: the default state dir is enough
  • multiple workers on one host: give each worker a unique ONESTEP_REPLICA_KEY or ONESTEP_STATE_DIR
  • StatefulSet-style deployments: use the stable ordinal as ONESTEP_REPLICA_KEY
  • disposable deployment pod names are not a stable replica key unless you inject one yourself

For an operations-focused guide with deployment patterns and troubleshooting, see docs/stable-instance-identity.md.

Quick local demo:

  1. Start onestep-control-plane. If the control plane repo is checked out next to this one:

cd ../onestep-control-plane
./scripts/start-local.sh

  2. Start a long-running OneStep reporter demo:

./scripts/run-control-plane-demo.sh

  3. Or run the end-to-end smoke script, which boots the local control plane, starts the demo agent, dispatches a ping, and waits for a successful command_result:

./scripts/run-control-plane-smoke.sh

  4. Inspect the control plane. The demo cycles through ok, retry_once, fail, and slow jobs so you can see successful runs, retries, terminal failures, timeouts, and dead-letter events without changing any code:

http://127.0.0.1:8080/services?environment=dev
http://127.0.0.1:8080/services/control-plane-demo?environment=dev
http://127.0.0.1:8080/services/control-plane-demo?environment=dev&tab=commands

Runtime

The runtime now has a few explicit control points:

  • OneStepApp(..., shutdown_timeout_s=30.0) controls how long the app waits for inflight tasks before cancelling them during shutdown
  • @app.task(..., timeout_s=30.0) applies an execution timeout to async handlers
  • @app.task(..., dead_letter=...) routes terminal failures into a dead-letter sink
  • @app.on_event receives task execution events
  • InMemoryMetrics() is a built-in metrics hook for event counting
  • @app.on_startup and @app.on_shutdown register lifecycle hooks
  • ctx.config exposes app config
  • ctx.state exposes per-task namespaced state backed by the app state store

Failures are classified as:

  • error
  • timeout
  • cancelled

Custom retry policies receive a FailureInfo object so they can decide differently for timeouts vs business exceptions.
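That makes policies like "retry timeouts aggressively, give up fast on business errors" easy to express. A sketch, using a stand-in FailureInfo whose fields (kind, attempts) are assumed from the failure classification above rather than taken from the library:

```python
from dataclasses import dataclass


@dataclass
class FailureInfo:
    """Stand-in for the runtime's FailureInfo; fields are assumed."""
    kind: str       # "error" | "timeout" | "cancelled"
    attempts: int   # attempts made so far


def should_retry(info: FailureInfo) -> bool:
    # Never retry explicit cancellations.
    if info.kind == "cancelled":
        return False
    # Timeouts are often transient: allow more attempts.
    if info.kind == "timeout":
        return info.attempts < 5
    # Business exceptions: fail fast after a couple of tries.
    return info.attempts < 2


print(should_retry(FailureInfo("timeout", 3)))  # True
print(should_retry(FailureInfo("error", 3)))    # False
```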

Execution events are emitted for:

  • fetched
  • started
  • succeeded
  • retried
  • failed
  • dead_lettered
  • cancelled
Putting these pieces together:

from onestep import InMemoryMetrics, InMemoryStateStore, MemoryQueue, OneStepApp, StructuredEventLogger

source = MemoryQueue("incoming")
dead = MemoryQueue("dead-letter")
metrics = InMemoryMetrics()
app = OneStepApp(
    "runtime-demo",
    config={"prefix": "demo"},
    state=InMemoryStateStore(),
    shutdown_timeout_s=10.0,
)
app.on_event(metrics)
app.on_event(StructuredEventLogger())


@app.on_startup
async def bootstrap(app):
    await source.publish({"value": 1})


@app.task(source=source, timeout_s=5.0, dead_letter=dead)
async def consume(ctx, item):
    runs = await ctx.state.get("runs", 0)
    await ctx.state.set("runs", runs + 1)
    print(ctx.config["prefix"], item)
    ctx.app.request_shutdown()

If a task ends in a terminal failure and dead_letter is configured, the dead-letter sink receives:

  • body["payload"]: the original payload
  • body["failure"]: {kind, exception_type, message, traceback?}
  • meta["original_meta"]: the original envelope metadata

You can inspect metrics in-process:

snapshot = metrics.snapshot()
print(snapshot["kinds"])

StructuredEventLogger() bridges TaskEvent into standard Python logging with consistent fields such as:

  • event_kind
  • app_name
  • task_name
  • source_name
  • attempts
  • duration_s
  • failure_kind

Memory Queue Example

from onestep import MemoryQueue, OneStepApp

app = OneStepApp("demo")
source = MemoryQueue("incoming")
sink = MemoryQueue("processed")


@app.task(source=source, emit=sink, concurrency=4)
async def double(ctx, item):
    ctx.app.request_shutdown()
    return {"value": item["value"] * 2}


async def main():
    await source.publish({"value": 21})
    await app.serve()


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

Interval Source

Use a local scheduler when you need to run a task every fixed duration.

from onestep import IntervalSource, OneStepApp

app = OneStepApp("interval-demo")


@app.task(
    source=IntervalSource.every(
        hours=1,
        immediate=True,
        overlap="skip",
        payload={"job": "refresh-cache"},
    )
)
async def refresh_cache(ctx, item):
    print("scheduled at:", ctx.current.meta["scheduled_at"], "payload:", item)

overlap controls what happens when the previous run is still inflight:

  • allow: start another run immediately
  • skip: drop missed ticks while the previous run is still running
  • queue: serialize missed ticks and run them one by one afterwards
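The skip behavior can be illustrated with a small asyncio simulation. This mimics the scheduler's decision at each tick; it is not the runtime's implementation:

```python
import asyncio


async def slow_run(results):
    results.append("started")
    await asyncio.sleep(0.35)  # handler outlasts several ticks


async def scheduler(results, ticks=4, interval=0.1):
    inflight = None
    for _ in range(ticks):
        if inflight is not None and not inflight.done():
            results.append("skipped")  # overlap="skip": drop this tick
        else:
            inflight = asyncio.ensure_future(slow_run(results))
        await asyncio.sleep(interval)
    if inflight is not None:
        await inflight


results = []
asyncio.run(scheduler(results))
print(results)
```

With overlap="queue", the skipped ticks would instead be buffered and run back-to-back once the first run finishes.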

Cron Source

Use CronSource when you care about wall-clock time rather than elapsed duration.

from onestep import CronSource, OneStepApp

app = OneStepApp("hourly-sync")


@app.task(source=CronSource("0 * * * *", timezone="Asia/Shanghai", overlap="skip"))
async def sync_hourly(ctx, _):
    print("running at:", ctx.current.meta["scheduled_at"])

The built-in parser supports standard 5-field cron expressions and these aliases:

  • @hourly
  • @daily
  • @weekly
  • @monthly
  • @yearly
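The aliases expand to ordinary 5-field expressions. The mapping below follows the common cron convention; it illustrates the idea rather than the parser's internals:

```python
# Conventional expansions for the supported aliases.
ALIASES = {
    "@hourly": "0 * * * *",   # minute 0 of every hour
    "@daily": "0 0 * * *",    # midnight every day
    "@weekly": "0 0 * * 0",   # midnight on Sunday
    "@monthly": "0 0 1 * *",  # midnight on the 1st
    "@yearly": "0 0 1 1 *",   # midnight on Jan 1
}


def expand(expr: str) -> str:
    """Return the 5-field form for an alias, or the expression unchanged."""
    return ALIASES.get(expr, expr)


print(expand("@hourly"))  # 0 * * * *
```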

Webhook Source

Use WebhookSource when external systems push events into your app.

from onestep import BearerAuth, MemoryQueue, OneStepApp, WebhookSource

app = OneStepApp("webhook-demo")
jobs = MemoryQueue("jobs")


@app.task(
    source=WebhookSource(
        path="/webhooks/github",
        methods=("POST",),
        host="127.0.0.1",
        port=8080,
        auth=BearerAuth("replace-me"),
    ),
    emit=jobs,
)
async def ingest_github(ctx, event):
    return {
        "event": event["headers"].get("x-github-event"),
        "payload": event["body"],
    }

The stdlib implementation supports:

  • shared host:port listeners across multiple webhook routes
  • optional BearerAuth(...)
  • json, form, text, raw, and auto body parsing
  • fixed WebhookResponse(...) responses
  • exact path matching and method filtering

The payload delivered to your task contains:

  • body
  • headers
  • query
  • method
  • path
  • client
  • received_at
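A caller posting to the webhook above sends a bearer token and a JSON body. A sketch using only the standard library; the URL and token match the example above, while build_webhook_request is a hypothetical helper, not part of onestep:

```python
import json
import urllib.request


def build_webhook_request(url: str, token: str, payload: dict) -> urllib.request.Request:
    # Hypothetical helper: constructs the POST the WebhookSource above expects.
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode(),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
            "X-GitHub-Event": "push",
        },
        method="POST",
    )


req = build_webhook_request(
    "http://127.0.0.1:8080/webhooks/github",
    "replace-me",
    {"ref": "refs/heads/main"},
)
print(req.get_method(), req.full_url)
# To actually send it (requires the app to be running):
# urllib.request.urlopen(req)
```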

MySQL Table Queue

Use a table as a task queue by claiming rows and marking them as finished.

from onestep import MemoryQueue, MySQLConnector, OneStepApp

app = OneStepApp("orders")
db = MySQLConnector("mysql+pymysql://root:root@localhost:3306/app")
source = db.table_queue(
    table="orders",
    key="id",
    where="status = 0",
    claim={"status": 9},
    ack={"status": 1},
    nack={"status": 0},
    batch_size=100,
)
sink = db.table_sink(table="processed_orders", mode="upsert", keys=("id",))


@app.task(source=source, emit=sink, concurrency=16)
async def process_order(ctx, row):
    return {"id": row["id"], "payload": row["payload"], "status": "done"}
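Conceptually, the table queue claims rows by flipping the status column, then releases them on ack or nack. A pure-Python sketch of that row lifecycle, simulating the semantics of the claim/ack/nack mappings above rather than the connector's SQL:

```python
# Each row is claimed by moving it from the `where` state (status=0) to
# the claim state (status=9); ack/nack then move it to a final state.
rows = [
    {"id": 1, "status": 0},
    {"id": 2, "status": 0},
    {"id": 3, "status": 1},  # already processed; never claimed
]


def claim(rows, where=0, claimed=9, limit=100):
    batch = [r for r in rows if r["status"] == where][:limit]
    for r in batch:
        r["status"] = claimed  # hide the row from other workers
    return batch


def ack(row, done=1):
    row["status"] = done


def nack(row, back=0):
    row["status"] = back  # return the row to the queue for another attempt


batch = claim(rows)
ack(batch[0])
nack(batch[1])
print([r["status"] for r in rows])  # [1, 0, 1]
```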

MySQL Incremental Sync

Use (updated_at, id) as a lightweight cursor for Logstash-style sync.

from onestep import MemoryQueue, MySQLConnector, OneStepApp

app = OneStepApp("sync-users")
db = MySQLConnector("mysql+pymysql://root:root@localhost:3306/app")
state = db.cursor_store(table="onestep_cursor")
source = db.incremental(
    table="users",
    key="id",
    cursor=("updated_at", "id"),
    where="deleted = 0",
    batch_size=1000,
    state=state,
)
out = MemoryQueue("dw")


@app.task(source=source, emit=out, concurrency=1)
async def sync_user(ctx, row):
    return {"id": row["id"], "name": row["name"], "updated_at": row["updated_at"]}

For production deployments, prefer db.cursor_store(...) or db.state_store(...) over the in-memory stores so cursors and task state survive process restarts.
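The (updated_at, id) cursor is ordinary keyset pagination: each batch fetches rows strictly greater than the last-seen pair, with ties on updated_at broken by id so no row is skipped or fetched twice. The comparison can be sketched in plain Python; the connector builds equivalent SQL, so this is only an illustration:

```python
rows = [
    {"id": 1, "updated_at": "2024-01-01T10:00"},
    {"id": 2, "updated_at": "2024-01-01T10:00"},
    {"id": 3, "updated_at": "2024-01-01T11:00"},
]


def next_batch(rows, cursor, batch_size=2):
    # Tuple comparison matches (updated_at, id) lexicographic order:
    # rows strictly after the cursor belong to the next batch.
    after = [r for r in rows if (r["updated_at"], r["id"]) > cursor]
    after.sort(key=lambda r: (r["updated_at"], r["id"]))
    return after[:batch_size]


cursor = ("2024-01-01T10:00", 1)  # last row seen in the previous batch
batch = next_batch(rows, cursor)
print([r["id"] for r in batch])  # [2, 3]
```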

RabbitMQ Queue

from onestep import OneStepApp, RabbitMQConnector

app = OneStepApp("rabbitmq-demo")
rmq = RabbitMQConnector("amqp://guest:guest@localhost/")
source = rmq.queue(
    "incoming_jobs",
    exchange="jobs.events",
    routing_key="jobs.created",
    prefetch=50,
)
out = rmq.queue(
    "processed_jobs",
    exchange="jobs.events",
    routing_key="jobs.done",
)


@app.task(source=source, emit=out, concurrency=8)
async def process_job(ctx, item):
    return {"job": item["job"], "status": "done"}

Install with pip install 'onestep[rabbitmq]'.

Redis Streams

Use Redis Streams for lightweight, reliable message queuing with consumer groups.

from onestep import OneStepApp, RedisConnector

app = OneStepApp("redis-demo")
redis = RedisConnector("redis://localhost:6379")
source = redis.stream(
    "jobs",
    group="workers",
    batch_size=100,
    poll_interval_s=0.5,
)
out = redis.stream("processed")


@app.task(source=source, emit=out, concurrency=8)
async def process_job(ctx, item):
    return {"job": item["job"], "status": "done"}

Key features:

  • Consumer groups: Multiple consumers share message processing
  • Message acknowledgment: XACK for reliable processing
  • Pending messages: Unacked messages stay in PEL for retry via XCLAIM
  • Stream trimming: maxlen option to limit stream size

Install with pip install 'onestep[redis]'.

SQS Queue

from onestep import OneStepApp, SQSConnector

app = OneStepApp("sqs-demo")
sqs = SQSConnector(region_name="ap-southeast-1")
source = sqs.queue(
    "https://sqs.ap-southeast-1.amazonaws.com/123456789/jobs.fifo",
    message_group_id="workers",
    delete_batch_size=10,
    delete_flush_interval_s=0.5,
    heartbeat_interval_s=15,
    heartbeat_visibility_timeout=60,
)
out = sqs.queue(
    "https://sqs.ap-southeast-1.amazonaws.com/123456789/processed.fifo",
    message_group_id="workers",
)


@app.task(source=source, emit=out, concurrency=16)
async def process_job(ctx, item):
    return {"job": item["job"], "status": "done"}

Install with pip install 'onestep[sqs]'.

Examples

Supported examples are indexed in:

  • example/README.md

Common entrypoints:

  • example/cli_app.py
  • example/runtime_showcase.py
  • example/mysql_incremental.py
  • example/redis_stream.py
  • example/webhook_source.py

Integration Tests

Optional live tests are under tests/integration/. The local stack now includes Redis, RabbitMQ, LocalStack SQS, and MySQL.

Install the live-test dependencies:

pip install '.[integration]'

Start the local integration stack:

make integration-up

Load the generated environment into your current shell:

eval "$(./scripts/setup-integration-env.sh)"

Run all live tests in one command:

make integration-test

You can also run one test file manually after loading the environment:

  • PYTHONPATH=src python3 -m pytest tests/integration/test_redis_live.py -q
  • PYTHONPATH=src python3 -m pytest tests/integration/test_rabbitmq_live.py -q
  • PYTHONPATH=src python3 -m pytest tests/integration/test_sqs_live.py -q
  • PYTHONPATH=src python3 -m pytest tests/integration/test_mysql_live.py -q

Set KEEP_INTEGRATION_SERVICES=1 to keep containers running after make integration-test.

Test Layout

The test suite is now intentionally split by responsibility:

  • tests/contract/: runtime contract tests that lock task execution semantics
  • tests/integration/: live infrastructure tests for Redis, RabbitMQ, SQS, and MySQL
  • tests/test_*.py: connector-focused unit tests

End-to-End Demo

For a quick end-to-end demo with webhook ingestion, queueing, dead-letter handling, metrics, and structured logs:

PYTHONPATH=src python3 example/runtime_showcase.py

Then send:

curl -X POST http://127.0.0.1:8090/demo/webhook \
  -H 'Content-Type: application/json' \
  -d '{"action":"ok","value":21}'

curl -X POST http://127.0.0.1:8090/demo/webhook \
  -H 'Content-Type: application/json' \
  -d '{"action":"fail","value":21}'

curl -X POST http://127.0.0.1:8090/demo/webhook \
  -H 'Content-Type: application/json' \
  -d '{"action":"slow","value":21}'

You will see:

  • structured task event logs
  • successful processing for action=ok
  • dead-letter output for action=fail
  • timeout + dead-letter output for action=slow
