
Socket.IO server to schedule Celery tasks from clients in real-time.

Project description


Stirfried 🥡

Stirfried is an ASGI HTTP/Socket.IO server that provides both browser-based and regular clients with real-time control over Celery tasks.

Tasks are scheduled by name, so the server does not necessarily need an update when the workers and the tasks they expose change.

Stirfried implements a simple-to-scale, three-layered architecture: clients, servers and workers. Any layer can be scaled out by adding more instances.

Stirfried provides Socket.IO and HTTP APIs with three core functions:

  • Schedule a task
  • Revoke a task
  • Query task info

Want to see Stirfried in action before digging through the README? Try running the example.

Built on Celery, python-socketio and uvicorn.

Workers

Install Stirfried in your Celery workers via pip/pipenv/poetry:

pip install stirfried

Import the StirfriedTask:

from stirfried.celery import StirfriedTask

Configure the base class globally:

app = Celery(..., task_cls=StirfriedTask)

...or per task:

@app.task(base=StirfriedTask)
def add(x, y, room=None):
    return x + y

Servers

The server can be run from the korijn/stirfried Docker image by exposing port 8000, or alternatively by cloning this repo, installing the dependencies with poetry and starting the uvicorn server as demonstrated in the example code.

You can configure both the servers and workers with a settings.py file, via the standard Celery configuration mechanism. In the docker deployment scenario, you can mount the settings file to the path /app/settings.py.
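
For example, a run of the container with a mounted settings file might look like this (the local settings.py path is just an illustration):

# Expose port 8000 and mount your settings at /app/settings.py
docker run -p 8000:8000 -v $(pwd)/settings.py:/app/settings.py korijn/stirfried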

Clients

Clients can connect to the Socket.IO API using standard Socket.IO libraries, and to the HTTP API using plain window.fetch.
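
As a rough sketch, scheduling a task from a Python client with the python-socketio library could look like this (the server URL and task name are illustrative):

import socketio

sio = socketio.Client()
sio.connect("http://localhost:8000")  # illustrative server address

# Schedule a task and receive the {status, data} reply through a callback
sio.emit(
    "send_task",
    {"task_name": "add", "args": [1, 2]},
    callback=lambda reply: print(reply["status"], reply["data"]),
)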

Task object schema

Tasks are scheduled by submitting the following task object to either of the APIs:

{
    "task_name": "",  // (required) task name
    "args": [],       // (optional) task arguments
    "kwargs": {},     // (optional) task keyword arguments
    "room": "",       // (optional) custom room override, only processed if
                      //            `custom_rooms` is enabled
                      //            NOTE: can also be used to disable server events
                      //                  for this task by passing the sentinel room
                      //                  ("NO_EMIT" by default)
    "chain": []       // (optional) array of task objects to chain onto the main task
                      //            task objects use the same schema, except for
                      //            the `chain` property which cannot be nested further
                      //            NOTE: chained tasks are applied in reverse order
}

Socket.IO API

Events are described in the following format: name(args[, optional]) -> callback_args

Clients can emit any of the following events that servers are listening for:

Event Description
send_task({task_name[, args][, kwargs][, room][, chain]}) -> {status, data} Schedule a task. Use a callback to receive the reply in the client. status indicates if scheduling succeeded and data contains the task id or error message in case of failure. The client can use the task id as reference when processing subsequent server-emitted events. Reference the task object schema for more details.
revoke_task(task_id) Revoke a task. Will not fail if the task does not exist, and won't do anything if the task is already running.
task_info(task_id) -> {id, state, result} Query task info. Use a callback to receive the reply in the client. Only works if a Celery result backend is configured.

Clients can listen for the following server-emitted events. These hook directly into the Celery Task class callbacks, except for on_progress, which is a Stirfried addition that tasks may implement to support progress events:

Event Description
on_progress({current, total, info, task_id, task_name}) Emitted on task progress updates. This event will only be emitted if tasks call emit_progress.
on_retry({task_id, task_name[, einfo]}) Emitted automatically on task retries. einfo is only included if stirfried_error_info=True.
on_failure({task_id, task_name[, einfo]}) Emitted automatically on task failure. einfo is only included if stirfried_error_info=True.
on_success({retval, task_id, task_name}) Emitted automatically on task success.
on_return({status, retval, task_id, task_name}) Emitted automatically on task success and failure.
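
Continuing the client sketch above, handlers for these server-emitted events could be registered like this (field access mirrors the payloads described in the table):

@sio.on("on_progress")
def handle_progress(data):
    # `info` may be None unless the task passed extra metadata
    print(f"{data['task_name']} [{data['task_id']}]: {data['current']}/{data['total']}")

@sio.on("on_return")
def handle_return(data):
    print(f"{data['task_name']} finished with status {data['status']}: {data['retval']}")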

HTTP API

Endpoint Description
POST /task Schedule a task. Submit the Task object as JSON the body of the request. Reference the task object schema for more details.
DELETE /task/{id} Revoke a task. Will not fail if the task does not exist, and won't do anything if the task is already running.
GET /task/{id} Query task info. Only works if a Celery result backend is configured.
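
A rough sketch using the requests library (the server URL, task name and response shape are assumptions based on the Socket.IO reply format):

import requests

base = "http://localhost:8000"  # illustrative server address

# Schedule a task
resp = requests.post(f"{base}/task", json={"task_name": "add", "args": [1, 2]})
task_id = resp.json()["data"]  # assumes the reply mirrors the Socket.IO {status, data} shape

# Query task info (requires a Celery result backend)
print(requests.get(f"{base}/task/{task_id}").json())

# Revoke the task
requests.delete(f"{base}/task/{task_id}")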

Settings

You can configure Celery, Socket.IO and Stirfried all from the same settings.py file. Stirfried settings are prefixed with stirfried_, Socket.IO settings are prefixed with socketio_, and Celery settings are used as-is (not prefixed).

Socket.IO server settings are passed on directly (without the prefix) to the AsyncServer constructor of the python-socketio library; see their documentation for the available options. See the Celery documentation for its configuration options.

The following options are additionally available for configuring Stirfried servers and workers:

Key Type Default Description
stirfried_enable_http bool True Set to False to disable the HTTP API.
stirfried_enable_socketio bool True Set to False to disable the Socket.IO API.
stirfried_enable_task_info bool True Set to False to disable the task_info event and GET /task/{id} endpoint.
stirfried_enable_revoke_task bool True Set to False to disable the revoke_task event and DELETE /task/{id} endpoint.
stirfried_redis_url str "" Connection string for the Socket.IO API server-to-server communication over Redis pubsub. Required if you want workers to be able to emit events.
stirfried_available_tasks List[str] [] If non-empty, send_task and POST /task will fail if a task name is not contained in the list.
stirfried_error_info bool False Set to True to include error messages and tracebacks in events, event callbacks and HTTP responses.
stirfried_sentinel_room str "NO_EMIT" A magic string value that can be passed to the room argument to prevent workers from emitting events for a task. This is the default room value for tasks sent to the HTTP API, since there is no Socket.IO client to emit to.
stirfried_custom_rooms bool False Set to True to allow clients to override the default room for server-emitted events.
stirfried_header_task_map Dict[str, Dict[str, str]] {} Map headers to keyword arguments for specific tasks. For example, {"send_email": {"Date": "date"}} would cause the Date header's value to be injected into the date keyword argument whenever a send_email task is scheduled. This can be used in concert with Socket.IO's extraHeaders feature to implement authorization and validation.
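
As an illustration, a combined settings.py might look roughly like this (URLs and values are placeholders):

# Celery settings are used as-is (unprefixed)
broker_url = "redis://redis:6379/0"
result_backend = "redis://redis:6379/0"

# Socket.IO settings: the socketio_ prefix is stripped and the rest is passed to AsyncServer
socketio_cors_allowed_origins = "*"

# Stirfried settings
stirfried_redis_url = "redis://redis:6379/1"
stirfried_available_tasks = ["add"]  # illustrative task name
stirfried_error_info = True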

Rooms

For Socket.IO API scheduled tasks, server-emitted events are sent to the client that scheduled the task by default. For HTTP API scheduled tasks, server-emitted events are not emitted by default. The server accomplishes this by injecting a value into the room keyword argument of Stirfried Celery tasks.

The StirfriedTask base class depends on the presence of this keyword argument.

This means you are required to add the keyword argument room=None to your task definitions in order to receive it.

If custom_rooms is enabled, clients can override the value by sending along a custom room value (not via the task's keyword arguments; see the task object schema).
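
For illustration, two hypothetical send_task payloads (assuming stirfried_custom_rooms is enabled and an add task exists):

// Route this task's server-emitted events to a custom room
{"task_name": "add", "args": [1, 2], "room": "dashboard-42"}

// Pass the sentinel room to suppress server-emitted events for this task
{"task_name": "add", "args": [1, 2], "room": "NO_EMIT"}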

Progress events

You can emit progress events from workers by calling self.emit_progress(current, total, info=None) in a task.

You can use the optional info keyword argument to send along arbitrary metadata, such as a progress message or early results.

Note that you are required to pass bind=True to the celery.task decorator in order to get access to the self instance variable.

@celery.task(bind=True)
def add(self, x, y, room=None):
    s = x
    self.emit_progress(50, 100)  # 50%
    s += y
    return s

Binary/big data

Socket.IO clients, servers and Celery workers support the msgpack transport, allowing you to use binary data directly (without needing to manually convert to and from base64 encoded strings and suffering the according performance penalty).
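
As a sketch, msgpack could be enabled from settings.py along these lines (this assumes the msgpack package is installed for the Celery workers and follows the settings prefix convention described above):

# Celery: serialize task messages with msgpack
task_serializer = "msgpack"
result_serializer = "msgpack"
accept_content = ["msgpack"]

# python-socketio: use the msgpack packet serializer
socketio_serializer = "msgpack"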

You should also be aware of Redis' limits on client output buffers: you cannot emit events larger than the configured limit (32mb by default). You can override this setting in various ways; here's how to do it via the Redis server CLI:

# client-output-buffer-limit <class> <hard limit> <soft limit> <soft seconds>
redis-server --client-output-buffer-limit pubsub 256mb 128mb 30

Testing

When unit testing a Stirfried Celery worker, the recommended approach is to disable the Redis connection by simply leaving stirfried_redis_url out of your settings and to call the task functions directly in your unit tests. The lack of a Redis connection short-circuits any events that would normally be emitted, so you can treat tasks as regular functions and unit test them as usual.

Optionally, you can patch/mock any calls to self.emit_progress using standard Python testing utilities to test those too.
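
A minimal sketch (the myapp.tasks module is a placeholder for your own task module):

from unittest import mock

from myapp.tasks import add  # hypothetical module containing the add task shown above

def test_add_reports_progress():
    # Calling the task object directly runs it synchronously, like a plain function
    with mock.patch.object(add, "emit_progress") as emit_progress:
        assert add(2, 3) == 5
        emit_progress.assert_called_once_with(50, 100)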

Example code

The repo includes an example demonstrating all of the functionality provided by Stirfried.

You can run the example as follows:

  • Clone the repository
  • cd into the example directory
  • Run docker-compose build
  • Then docker-compose up
  • Open your browser and go to http://localhost:8080/
  • You should see the following interface, and you're ready to give Stirfried a try:

Stirfried 🥡 test client

