onestep

Async task runtime for queue, polling, schedule, and webhook workloads.
onestep is a small async task runtime centered on four concepts:
- OneStepApp: task registry and lifecycle manager
- Source: fetch data from a queue or polling backend
- Sink: publish processed data
- Delivery: a single fetched item with ack/retry/fail
The V1 stable surface includes:
- MemoryQueue
- MySQLConnector.table_queue(...)
- MySQLConnector.incremental(...)
- MySQLConnector.table_sink(...)
- MySQLConnector.state_store(...)
- MySQLConnector.cursor_store(...)
- RabbitMQConnector.queue(...)
- RedisConnector.stream(...)
- SQSConnector.queue(...)
- IntervalSource.every(...)
- CronSource(...)
- WebhookSource(...)
Install
Core package:
pip install onestep
Common extras:
pip install 'onestep[yaml]'
pip install 'onestep[mysql]'
pip install 'onestep[rabbitmq]'
pip install 'onestep[redis]'
pip install 'onestep[sqs]'
pip install 'onestep[all]'
From a source checkout:
pip install -e .
pip install -e '.[dev]'
pip install -e '.[integration]'
Upgrading from 0.5.x
1.0.0 is a runtime rewrite. Projects built on the legacy step and
*Broker APIs should treat the upgrade as a migration, not a drop-in package
bump.
See MIGRATION-0.5-to-1.0.0.md for:
- old-to-new API mapping
- unsupported legacy features
- a minimal before/after example
- rollout guidance for existing projects
CLI
The deployment entrypoint is the onestep CLI.
Recommended module shape:
from onestep import IntervalSource, OneStepApp

app = OneStepApp("billing-sync")

@app.task(source=IntervalSource.every(hours=1, immediate=True, overlap="skip"))
async def sync_billing(ctx, _):
    print("syncing billing data")
Run the app:
onestep run your_package.tasks:app
The short form is also supported:
onestep your_package.tasks:app
Check the target before starting it:
onestep check your_package.tasks:app
You can also point the CLI at a zero-argument factory:
onestep check your_package.tasks:build_app
Use JSON output when you want the check result in CI or deployment scripts:
onestep check --json your_package.tasks:app
You can also load a YAML app definition with handler.ref entries that point to Python callables:
app:
  name: billing-sync
  connectors:
    tick:
      type: interval
      minutes: 5
      immediate: true
    processed:
      type: memory
  tasks:
    - name: sync_billing
      source: tick
      handler:
        ref: your_package.handlers.billing:sync_billing
        params:
          region: cn
      emit: [processed]
      retry:
        type: max_attempts
        max_attempts: 3
        delay_s: 10
Check or run the YAML target the same way:
onestep check worker.yaml
onestep run worker.yaml
YAML resources can reference other resources by name, for example rabbitmq_queue.connector: rmq
or mysql_incremental.state: cursor_store.
Currently supported YAML resource types:
- memory
- interval
- cron
- webhook
- rabbitmq
- rabbitmq_queue
- redis
- redis_stream
- sqs
- sqs_queue
- mysql
- mysql_state_store
- mysql_cursor_store
- mysql_table_queue
- mysql_incremental
- mysql_table_sink
YAML apps can also bind app-level state explicitly:
app:
  name: billing-sync
  state: app_state
  connectors:
    app_state:
      type: mysql_state_store
      connector: db
      table: onestep_state
The named state resource must support load/save/delete; mysql_state_store
and mysql_cursor_store both work.
Runnable examples live in:
- example/cli_app.py
- example/cli_app.yaml
Run it locally with:
PYTHONPATH=src onestep check example.cli_app:app
onestep check example/cli_app.yaml
SYNC_INTERVAL_SECONDS=5 PYTHONPATH=src onestep run example.cli_app:app
systemd
A complete deployment template lives in:
- deploy/README.md
- deploy/systemd/onestep-app.service
- deploy/env/onestep-app.env.example
- deploy/bin/onestep-preflight.sh
The example uses:
- /etc/onestep/onestep-app.env for deployment variables
- ExecStartPre to run a startup check
- ExecStart to launch onestep run
Install it with:
sudo mkdir -p /etc/onestep
sudo cp deploy/env/onestep-app.env.example /etc/onestep/onestep-app.env
sudo cp deploy/systemd/onestep-app.service /etc/systemd/system/onestep-app.service
sudo systemctl daemon-reload
sudo systemctl enable --now onestep-app
Check status and logs:
sudo systemctl status onestep-app
sudo journalctl -u onestep-app -f
See deploy/README.md for the expected directory layout and the env vars you need to adjust first.
The deploy template prepends APP_CWD to PYTHONPATH so module targets defined inside the repo can be imported by the onestep console script.
Control Plane Reporter
onestep can push runtime telemetry to onestep-control-plane over a single long-lived
WebSocket session without adding a new connector or changing task code.
Attach the reporter explicitly:
from onestep import ControlPlaneReporter, ControlPlaneReporterConfig, IntervalSource, OneStepApp

app = OneStepApp("billing-sync")
reporter = ControlPlaneReporter(ControlPlaneReporterConfig.from_env(app_name=app.name))
reporter.attach(app)

@app.task(source=IntervalSource.every(hours=1, immediate=True, overlap="skip"))
async def sync_users(ctx, _):
    print("syncing users")
Required environment variables:
- ONESTEP_CONTROL_PLANE_URL or ONESTEP_CONTROL_URL
- ONESTEP_CONTROL_PLANE_TOKEN or ONESTEP_CONTROL_TOKEN
Common optional environment variables:
- ONESTEP_ENV
- ONESTEP_SERVICE_NAME
- ONESTEP_NODE_NAME
- ONESTEP_DEPLOYMENT_VERSION
- ONESTEP_INSTANCE_ID
- ONESTEP_REPLICA_KEY
- ONESTEP_STATE_DIR
- ONESTEP_CONTROL_PLANE_HEARTBEAT_INTERVAL_S
- ONESTEP_CONTROL_PLANE_METRICS_INTERVAL_S
- ONESTEP_CONTROL_PLANE_EVENT_FLUSH_INTERVAL_S
- ONESTEP_CONTROL_PLANE_EVENT_BATCH_SIZE
- ONESTEP_CONTROL_PLANE_MAX_PENDING_EVENTS
- ONESTEP_CONTROL_PLANE_MAX_PENDING_METRIC_BATCHES
- ONESTEP_CONTROL_PLANE_TIMEOUT_S
- ONESTEP_CONTROL_PLANE_RECONNECT_BASE_DELAY_S
- ONESTEP_CONTROL_PLANE_RECONNECT_MAX_DELAY_S
The reporter now uses:
- GET /api/v1/agents/ws
- hello / hello_ack
- telemetry (sync | heartbeat | metrics | events)
- command / command_ack / command_result
Behavior:
- startup reuses a stable instance_id from local state unless ONESTEP_INSTANCE_ID or ONESTEP_REPLICA_KEY overrides it
- startup opens a fresh WS session and negotiates capabilities
- startup sends a heartbeat and a topology sync built from the current app tasks
- sync and heartbeat sequences are tracked independently and persist across restarts
- sync is resent on later heartbeat cycles until the current topology hash converges
- task execution events are aggregated into task window metrics
- important runtime events (retried, failed, dead_lettered, cancelled) are batched and pushed
- remote commands can trigger ping, shutdown, restart, drain, pause_task, resume_task, sync_now, flush_metrics, and flush_events
- transport send failures reset the current session and reconnect with exponential backoff plus jitter
- low-priority metrics and events buffers are bounded locally; if the control plane stays down, the oldest buffered telemetry is dropped first
- reporter failures are logged but do not stop task execution
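The reconnect behavior can be sketched as exponential backoff with full jitter; mapping the base and cap to the ONESTEP_CONTROL_PLANE_RECONNECT_*_DELAY_S variables is an assumption for illustration, not onestep's actual implementation:

```python
import random

def reconnect_delay(attempt: int, base_s: float = 1.0, max_s: float = 60.0) -> float:
    """Exponential backoff with full jitter: the delay ceiling doubles each
    attempt up to max_s, and the actual delay is drawn uniformly below it."""
    ceiling = min(max_s, base_s * (2 ** attempt))
    return random.uniform(0.0, ceiling)

delays = [reconnect_delay(n) for n in range(10)]
```

Full jitter keeps a fleet of disconnected reporters from reconnecting in lockstep when the control plane comes back.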
Identity resolution order:
1. ONESTEP_INSTANCE_ID: hard override for tests or explicit pinning.
2. ONESTEP_REPLICA_KEY: derives a deterministic UUIDv5 from service_name + environment + replica_key.
3. Local identity state: reuses the instance_id stored in the reporter state dir.
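The ONESTEP_REPLICA_KEY derivation can be illustrated with the stdlib uuid module; the namespace and separator below are assumptions for the sketch, not onestep's actual choices:

```python
import uuid

def derive_instance_id(service_name: str, environment: str, replica_key: str) -> str:
    # UUIDv5 is deterministic: the same inputs always yield the same id,
    # which is what makes the replica identity stable across restarts.
    name = f"{service_name}:{environment}:{replica_key}"
    return str(uuid.uuid5(uuid.NAMESPACE_URL, name))

first = derive_instance_id("billing-sync", "prod", "0")
second = derive_instance_id("billing-sync", "prod", "0")
other = derive_instance_id("billing-sync", "prod", "1")
```

Because the id is a pure function of the three inputs, a replica that restarts with the same key reclaims its old identity instead of registering as a new agent.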
By default ControlPlaneReporterConfig.from_env() uses a local state dir under:
- ~/.onestep/control-plane-state/<environment>/<service_name>
- ~/.onestep/control-plane-state/<environment>/<service_name>/<replica_key> when ONESTEP_REPLICA_KEY is set
Multi-replica rules:
- single worker: the default state dir is enough
- multiple workers on one host: give each worker a unique ONESTEP_REPLICA_KEY or ONESTEP_STATE_DIR
- StatefulSet-style deployments: use the stable ordinal as ONESTEP_REPLICA_KEY
- disposable deployment pod names are not a stable replica key unless you inject one yourself
For an operations-focused guide with deployment patterns and troubleshooting, see
docs/stable-instance-identity.md.
Quick local demo:

1. Start onestep-control-plane. If the control plane repo is checked out next to this one:

   cd ../onestep-control-plane
   ./scripts/start-local.sh

2. Start a long-running OneStep reporter demo:

   ./scripts/run-control-plane-demo.sh

3. Or run the end-to-end smoke script, which boots the local control plane, starts the demo agent, dispatches a ping, and waits for a successful command_result:

   ./scripts/run-control-plane-smoke.sh

4. Inspect the control plane. The demo cycles through ok, retry_once, fail, and slow jobs so you can see successful runs, retries, terminal failures, timeouts, and dead-letter events without changing any code:

   http://127.0.0.1:8080/services?environment=dev
   http://127.0.0.1:8080/services/control-plane-demo?environment=dev
   http://127.0.0.1:8080/services/control-plane-demo?environment=dev&tab=commands
Runtime
The runtime now has a few explicit control points:
- OneStepApp(..., shutdown_timeout_s=30.0) controls how long the app waits for inflight tasks before cancelling them during shutdown
- @app.task(..., timeout_s=30.0) applies an execution timeout to async handlers
- @app.task(..., dead_letter=...) routes terminal failures into a dead-letter sink
- @app.on_event receives task execution events
- InMemoryMetrics() is a built-in metrics hook for event counting
- @app.on_startup and @app.on_shutdown register lifecycle hooks
- ctx.config exposes app config
- ctx.state exposes per-task namespaced state backed by the app state store
Failures are classified as:
- error
- timeout
- cancelled
Custom retry policies receive a FailureInfo object so they can decide differently for timeouts vs business exceptions.
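As a sketch of that idea (onestep's actual policy interface and the FailureInfo fields are not shown here, so treat the shape below as an assumption):

```python
# Hypothetical failure-aware retry decision: timeouts are usually transient,
# cancellations should never be retried, and ordinary errors get fewer tries.
def should_retry(failure_kind: str, attempts: int) -> bool:
    if failure_kind == "cancelled":
        return False
    if failure_kind == "timeout":
        return attempts < 5
    return attempts < 3  # "error": ordinary exceptions from the handler
```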
Execution events are emitted for:
fetchedstartedsucceededretriedfaileddead_letteredcancelled
from onestep import InMemoryMetrics, InMemoryStateStore, MemoryQueue, OneStepApp, StructuredEventLogger

source = MemoryQueue("incoming")
dead = MemoryQueue("dead-letter")
metrics = InMemoryMetrics()

app = OneStepApp(
    "runtime-demo",
    config={"prefix": "demo"},
    state=InMemoryStateStore(),
    shutdown_timeout_s=10.0,
)
app.on_event(metrics)
app.on_event(StructuredEventLogger())

@app.on_startup
async def bootstrap(app):
    await source.publish({"value": 1})

@app.task(source=source, timeout_s=5.0, dead_letter=dead)
async def consume(ctx, item):
    runs = await ctx.state.get("runs", 0)
    await ctx.state.set("runs", runs + 1)
    print(ctx.config["prefix"], item)
    ctx.app.request_shutdown()
If a task ends in a terminal failure and dead_letter is configured, the dead-letter sink receives:
- body["payload"]: the original payload
- body["failure"]: {kind, exception_type, message, traceback?}
- meta["original_meta"]: the original envelope metadata
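A minimal sketch of building that envelope shape (how onestep constructs it internally is an assumption; only the documented keys are taken from above):

```python
import traceback

def dead_letter_envelope(payload, exc, kind="error", original_meta=None):
    # Mirrors the documented shape: body carries the payload plus a failure
    # record; meta preserves the original envelope metadata.
    body = {
        "payload": payload,
        "failure": {
            "kind": kind,
            "exception_type": type(exc).__name__,
            "message": str(exc),
            "traceback": "".join(
                traceback.format_exception(type(exc), exc, exc.__traceback__)
            ),
        },
    }
    meta = {"original_meta": dict(original_meta or {})}
    return body, meta
```

Keeping the original payload and metadata intact is what makes dead-letter items replayable later.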
You can inspect metrics in-process:
snapshot = metrics.snapshot()
print(snapshot["kinds"])
StructuredEventLogger() bridges TaskEvent into standard Python logging with consistent fields such as:
- event_kind
- app_name
- task_name
- source_name
- attempts
- duration_s
- failure_kind
Memory Queue Example
import asyncio

from onestep import MemoryQueue, OneStepApp

app = OneStepApp("demo")
source = MemoryQueue("incoming")
sink = MemoryQueue("processed")

@app.task(source=source, emit=sink, concurrency=4)
async def double(ctx, item):
    ctx.app.request_shutdown()
    return {"value": item["value"] * 2}

async def main():
    await source.publish({"value": 21})
    await app.serve()

asyncio.run(main())
Interval Source
Use a local scheduler when you need to run a task every fixed duration.
from onestep import IntervalSource, OneStepApp

app = OneStepApp("interval-demo")

@app.task(
    source=IntervalSource.every(
        hours=1,
        immediate=True,
        overlap="skip",
        payload={"job": "refresh-cache"},
    )
)
async def refresh_cache(ctx, item):
    print("scheduled at:", ctx.current.meta["scheduled_at"], "payload:", item)
overlap controls what happens when the previous run is still inflight:
- allow: start another run immediately
- skip: drop missed ticks while the previous run is still running
- queue: serialize missed ticks and run them one by one afterwards
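The three modes can be illustrated with a small simulation of tick times against a fixed run duration; this is a sketch of the semantics described above, not onestep's scheduler:

```python
def ticks_started(overlap: str, ticks: list, run_s: float) -> list:
    """Return the times at which a run actually starts for each overlap mode."""
    started = []
    busy_until = float("-inf")
    missed = 0
    for t in ticks:
        if t >= busy_until:
            started.append(t)
            busy_until = t + run_s
        elif overlap == "allow":
            started.append(t)          # a concurrent run starts anyway
        elif overlap == "queue":
            missed += 1                # remember the missed tick
        # overlap == "skip": drop the missed tick entirely
    if overlap == "queue":
        # missed ticks run back-to-back once the last run finishes
        started += [busy_until + i * run_s for i in range(missed)]
    return started
```

With ticks at 0, 1, 2, 3 and a 2.5s run, "allow" starts four overlapping runs, "skip" starts only the runs at 0 and 3, and "queue" serializes the missed ticks after the inflight work.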
Cron Source
Use CronSource when you care about wall-clock time rather than elapsed duration.
from onestep import CronSource, OneStepApp

app = OneStepApp("hourly-sync")

@app.task(source=CronSource("0 * * * *", timezone="Asia/Shanghai", overlap="skip"))
async def sync_hourly(ctx, _):
    print("running at:", ctx.current.meta["scheduled_at"])
The built-in parser supports standard 5-field cron expressions and these aliases:
- @hourly
- @daily
- @weekly
- @monthly
- @yearly
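These aliases follow the conventional cron mappings, which can be expanded like so (a sketch of the convention; onestep's parser internals are not shown here):

```python
# Conventional alias expansions for 5-field cron (minute hour day month weekday).
CRON_ALIASES = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}

def expand_cron(expr: str) -> str:
    """Expand a known alias; pass ordinary expressions through unchanged."""
    return CRON_ALIASES.get(expr, expr)
```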
Webhook Source
Use WebhookSource when external systems push events into your app.
from onestep import BearerAuth, MemoryQueue, OneStepApp, WebhookSource

app = OneStepApp("webhook-demo")
jobs = MemoryQueue("jobs")

@app.task(
    source=WebhookSource(
        path="/webhooks/github",
        methods=("POST",),
        host="127.0.0.1",
        port=8080,
        auth=BearerAuth("replace-me"),
    ),
    emit=jobs,
)
async def ingest_github(ctx, event):
    return {
        "event": event["headers"].get("x-github-event"),
        "payload": event["body"],
    }
The stdlib implementation supports:
- shared host:port listeners across multiple webhook routes
- optional BearerAuth(...)
- json, form, text, raw, and auto body parsing
- fixed WebhookResponse(...) responses
- exact path matching and method filtering
The payload delivered to your task contains:
- body
- headers
- query
- method
- path
- client
- received_at
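An illustrative event dict with those fields (the values below are made up; only the key names come from the list above):

```python
# Hypothetical webhook event as a task handler might receive it.
event = {
    "body": {"action": "opened", "number": 7},
    "headers": {"x-github-event": "pull_request"},
    "query": {"dry_run": "1"},
    "method": "POST",
    "path": "/webhooks/github",
    "client": "203.0.113.7",
    "received_at": "2024-01-01T00:00:00+00:00",
}

event_kind = event["headers"].get("x-github-event")
```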
MySQL Table Queue
Use a table as a task queue by claiming rows and marking them as finished.
from onestep import MySQLConnector, OneStepApp

app = OneStepApp("orders")
db = MySQLConnector("mysql+pymysql://root:root@localhost:3306/app")

source = db.table_queue(
    table="orders",
    key="id",
    where="status = 0",
    claim={"status": 9},
    ack={"status": 1},
    nack={"status": 0},
    batch_size=100,
)
sink = db.table_sink(table="processed_orders", mode="upsert", keys=("id",))

@app.task(source=source, emit=sink, concurrency=16)
async def process_order(ctx, row):
    return {"id": row["id"], "payload": row["payload"], "status": "done"}
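The claim step can be pictured as a single UPDATE that atomically flips a batch of matching rows so concurrent workers skip them. The SQL below is an illustration of the idea only; the statement onestep actually generates is an assumption:

```python
def claim_sql(table: str, key: str, where: str, claim: dict, batch_size: int):
    """Build an UPDATE that marks a batch of rows with the claim values."""
    assignments = ", ".join(f"{col} = %s" for col in claim)
    sql = (
        f"UPDATE {table} SET {assignments} "
        f"WHERE {where} ORDER BY {key} LIMIT {batch_size}"
    )
    return sql, tuple(claim.values())

sql, params = claim_sql("orders", "id", "status = 0", {"status": 9}, 100)
```

Because claiming and selecting are one statement, two workers can never both pick up the same row; the ack/nack values then move the row to its terminal or retryable state.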
MySQL Incremental Sync
Use (updated_at, id) as a lightweight cursor for Logstash-style sync.
from onestep import MemoryQueue, MySQLConnector, OneStepApp

app = OneStepApp("sync-users")
db = MySQLConnector("mysql+pymysql://root:root@localhost:3306/app")

state = db.cursor_store(table="onestep_cursor")
source = db.incremental(
    table="users",
    key="id",
    cursor=("updated_at", "id"),
    where="deleted = 0",
    batch_size=1000,
    state=state,
)
out = MemoryQueue("dw")

@app.task(source=source, emit=out, concurrency=1)
async def sync_user(ctx, row):
    return {"id": row["id"], "name": row["name"], "updated_at": row["updated_at"]}
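The (updated_at, id) cursor works as a lexicographic comparison: fetch rows strictly after the last-seen pair, with id breaking ties among rows sharing a timestamp. A sketch of the predicate (illustrative only, not onestep's generated SQL):

```python
def cursor_predicate(cursor_cols, last_seen):
    """Rows after (updated_at, id): a later timestamp, or the same timestamp
    with a larger id, so ties are resumed deterministically."""
    c1, c2 = cursor_cols
    sql = f"({c1} > %s) OR ({c1} = %s AND {c2} > %s)"
    return sql, (last_seen[0], last_seen[0], last_seen[1])

sql, params = cursor_predicate(("updated_at", "id"), ("2024-01-01 00:00:00", 42))
```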
For production deployments, prefer db.cursor_store(...) or db.state_store(...) over the in-memory stores so cursors and task state survive process restarts.
RabbitMQ Queue
from onestep import OneStepApp, RabbitMQConnector

app = OneStepApp("rabbitmq-demo")
rmq = RabbitMQConnector("amqp://guest:guest@localhost/")

source = rmq.queue(
    "incoming_jobs",
    exchange="jobs.events",
    routing_key="jobs.created",
    prefetch=50,
)
out = rmq.queue(
    "processed_jobs",
    exchange="jobs.events",
    routing_key="jobs.done",
)

@app.task(source=source, emit=out, concurrency=8)
async def process_job(ctx, item):
    return {"job": item["job"], "status": "done"}
Install with pip install '.[rabbitmq]'.
Redis Streams
Use Redis Streams for lightweight, reliable message queuing with consumer groups.
from onestep import OneStepApp, RedisConnector

app = OneStepApp("redis-demo")
redis = RedisConnector("redis://localhost:6379")

source = redis.stream(
    "jobs",
    group="workers",
    batch_size=100,
    poll_interval_s=0.5,
)
out = redis.stream("processed")

@app.task(source=source, emit=out, concurrency=8)
async def process_job(ctx, item):
    return {"job": item["job"], "status": "done"}
Key features:
- Consumer groups: multiple consumers share message processing
- Message acknowledgment: XACK for reliable processing
- Pending messages: unacked messages stay in the PEL for retry via XCLAIM
- Stream trimming: maxlen option to limit stream size
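The consumer-group mechanics can be simulated in a few lines: new entries are delivered once per group, unacked entries stay pending, and a pending entry can be claimed by another consumer. This is a toy model for intuition, not the Redis protocol:

```python
class MiniStream:
    """Toy model of one consumer group over one stream."""

    def __init__(self):
        self.entries = {}       # entry id -> payload
        self.next_id = 0
        self.delivered = set()  # ids already handed to some consumer
        self.pel = {}           # pending entries list: id -> owning consumer

    def xadd(self, payload):
        self.entries[self.next_id] = payload
        self.next_id += 1

    def xreadgroup(self, consumer, count=10):
        # Deliver only never-delivered entries; each lands in the PEL.
        new = [i for i in self.entries if i not in self.delivered][:count]
        for i in new:
            self.delivered.add(i)
            self.pel[i] = consumer
        return [(i, self.entries[i]) for i in new]

    def xack(self, entry_id):
        self.pel.pop(entry_id, None)   # acked entries leave the PEL

    def xclaim(self, entry_id, new_consumer):
        if entry_id in self.pel:       # reassign a stuck pending entry
            self.pel[entry_id] = new_consumer
```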
Install with pip install '.[redis]'.
SQS Queue
from onestep import OneStepApp, SQSConnector

app = OneStepApp("sqs-demo")
sqs = SQSConnector(region_name="ap-southeast-1")

source = sqs.queue(
    "https://sqs.ap-southeast-1.amazonaws.com/123456789/jobs.fifo",
    message_group_id="workers",
    delete_batch_size=10,
    delete_flush_interval_s=0.5,
    heartbeat_interval_s=15,
    heartbeat_visibility_timeout=60,
)
out = sqs.queue(
    "https://sqs.ap-southeast-1.amazonaws.com/123456789/processed.fifo",
    message_group_id="workers",
)

@app.task(source=source, emit=out, concurrency=16)
async def process_job(ctx, item):
    return {"job": item["job"], "status": "done"}
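The heartbeat parameters can be read as: every heartbeat_interval_s while the handler is still running, extend the message's visibility timeout to heartbeat_visibility_timeout so SQS does not redeliver it mid-processing. A sketch of when those extensions would fire (illustrative only; the connector's actual scheduling is an assumption):

```python
def heartbeat_times(processing_s: float, interval_s: float = 15.0) -> list:
    """Times (seconds into processing) at which visibility is re-extended."""
    times = []
    t = interval_s
    while t < processing_s:
        times.append(t)
        t += interval_s
    return times
```

A handler that takes 50 seconds with the defaults above would extend visibility at 15, 30, and 45 seconds, keeping the message invisible to other consumers for the whole run.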
Install with pip install '.[sqs]'.
Examples
Supported examples are indexed in:
example/README.md
Common entrypoints:
- example/cli_app.py
- example/runtime_showcase.py
- example/mysql_incremental.py
- example/redis_stream.py
- example/webhook_source.py
Integration Tests
Optional live tests are under tests/integration/.
The local stack now includes Redis, RabbitMQ, LocalStack SQS, and MySQL.
Install the live-test dependencies:
pip install '.[integration]'
Start the local integration stack:
make integration-up
Load the generated environment into your current shell:
eval "$(./scripts/setup-integration-env.sh)"
Run all live tests in one command:
make integration-test
You can also run one test file manually after loading the environment:
PYTHONPATH=src python3 -m pytest tests/integration/test_redis_live.py -q
PYTHONPATH=src python3 -m pytest tests/integration/test_rabbitmq_live.py -q
PYTHONPATH=src python3 -m pytest tests/integration/test_sqs_live.py -q
PYTHONPATH=src python3 -m pytest tests/integration/test_mysql_live.py -q
Set KEEP_INTEGRATION_SERVICES=1 to keep containers running after make integration-test.
Test Layout
The test suite is now intentionally split by responsibility:
- tests/contract/: runtime contract tests that lock task execution semantics
- tests/integration/: live infrastructure tests for Redis, RabbitMQ, SQS, and MySQL
- tests/test_*.py: connector-focused unit tests
End-to-End Demo
For a quick end-to-end demo with webhook ingestion, queueing, dead-letter handling, metrics, and structured logs:
PYTHONPATH=src python3 example/runtime_showcase.py
Then send:
curl -X POST http://127.0.0.1:8090/demo/webhook \
-H 'Content-Type: application/json' \
-d '{"action":"ok","value":21}'
curl -X POST http://127.0.0.1:8090/demo/webhook \
-H 'Content-Type: application/json' \
-d '{"action":"fail","value":21}'
curl -X POST http://127.0.0.1:8090/demo/webhook \
-H 'Content-Type: application/json' \
-d '{"action":"slow","value":21}'
You will see:
- structured task event logs
- successful processing for action=ok
- dead-letter output for action=fail
- timeout plus dead-letter output for action=slow