Skip to main content

Python library to communicate with Centrifugo server HTTP API

Project description

Python SDK to communicate with Centrifugo v5 HTTP API. Python >= 3.9 supported.

To install run:

pip install cent

Centrifugo compatibility

  • Cent v5 and higher works with Centrifugo v6 and v5.
  • If you need to work with Centrifugo v3, v4 => use Cent v4
  • If you need to work with Centrifugo v2 => use Cent v3

Usage

First of all, see the description of Centrifugo server API in the documentation. This library also supports API extensions provided by Centrifugo PRO. In general, refer to api.proto Protobuf schema file as a source of truth about all available Centrifugo server APIs. Don't forget that Centrifugo supports both HTTP and GRPC API – so you can switch to GRPC by using api.proto file to generate stubs for communication.

This library contains Client and AsyncClient to work with Centrifugo HTTP server API. Both clients have the same methods to work with Centrifugo API and raise the same top-level exceptions.

Sync HTTP client

from cent import Client

Required init arguments:

  • api_url (str) - Centrifugo HTTP API URL address, for example, http://localhost:8000/api
  • api_key (str) - Centrifugo HTTP API key for auth

Optional arguments:

  • timeout (float) - base timeout for all requests in seconds, default is 10 seconds.
  • session (requests.Session) - custom requests session to use.

Example:

from cent import Client, PublishRequest

api_url = "http://localhost:8000/api"
api_key = "<CENTRIFUGO_API_KEY>"

client = Client(api_url, api_key)
request = PublishRequest(channel="channel", data={"input": "Hello world!"})
result = client.publish(request)
print(result)

Async HTTP client

from cent import AsyncClient

Required init arguments:

  • api_url (str) - Centrifugo HTTP API URL address, for example, http://localhost:8000
  • api_key (str) - Centrifugo HTTP API key for auth

Optional arguments:

  • timeout (float) - base timeout for all requests in seconds, default is 10 seconds.
  • session (aiohttp.ClientSession) - custom aiohttp session to use.

Example:

import asyncio
from cent import AsyncClient, PublishRequest

api_url = "http://localhost:8000/api"
api_key = "<CENTRIFUGO_API_KEY>"

async def main():
    client = AsyncClient(api_url, api_key)
    request = PublishRequest(channel="channel", data={"input": "Hello world!"})
    result = await client.publish(request)
    print(result)

if __name__ == "__main__":
    asyncio.run(main())

Handling errors

This library raises exceptions if sth goes wrong. All exceptions are subclasses of cent.CentError.

  • CentError - base class for all exceptions
  • CentNetworkError - raised in case of network related errors (connection refused)
  • CentTransportError - raised in case of transport related errors (HTTP status code is not 2xx)
  • CentTimeoutError - raised in case of timeout
  • CentUnauthorizedError - raised in case of unauthorized access (signal of invalid API key)
  • CentDecodeError - raised in case of server response decoding error
  • CentApiResponseError - raised in case of API response error (i.e. error returned by Centrifugo itself, you can inspect code and message returned by Centrifugo in this case)

Note, that BroadcastRequest and BatchRequest are quite special – since they contain multiple commands in one request, handling CentApiResponseError is still required, but not enough – you also need to manually iterate over the results to check for individual errors. For example, one publish command can fail while another one can succeed. For example:

from cent import *

c = Client("http://localhost:8000/api", "api_key")
req = BroadcastRequest(channels=["1", "2"], data={})
c.broadcast(req)
# BroadcastResult(
#   responses=[
#       Response[PublishResult](error=None, result=PublishResult(offset=7, epoch='rqKx')),
#       Response[PublishResult](error=None, result=PublishResult(offset=7, epoch='nUrf'))
#   ]
# )
req = BroadcastRequest(channels=["invalid:1", "2"], data={})
c.broadcast(req)
# BroadcastResult(
#   responses=[
#       Response[PublishResult](error=Error(code=102, message='unknown channel'), result=None),
#       Response[PublishResult](error=None, result=PublishResult(offset=8, epoch='nUrf'))
#   ]
# )

I.e. cent library does not raise exceptions for individual errors in BroadcastRequest or BatchRequest, only for top-level response error, for example, sending empty list of channels in broadcast:

req = BroadcastRequest(channels=[], data={})
c.broadcast(req)
Traceback (most recent call last):
    ...
    raise CentApiResponseError(
cent.exceptions.CentApiResponseError: Server API response error #107: bad request

So this all adds some complexity, but that's the trade-off for the performance and efficiency of these two methods. You can always write some convenient wrappers around cent library to handle errors in a way that suits your application.

Using for async consumers

You can use this library to constructs events for Centrifugo async consumers. For example, to get proper method and payload for async publish:

from cent import PublishRequest

request = PublishRequest(channel="channel", data={"input": "Hello world!"})
method = request.api_method
payload = request.api_payload
# use method and payload to construct async consumer event.

Using Broadcast and Batch

To demonstrate the benefits of using BroadcastRequest and BatchRequest let's compare approaches. Let's say at some point in your app you need to publish the same message into 10k different channels. Let's compare sequential publish, batch publish and broadcast publish. Here is the code to do the comparison:

from cent import *
from time import time


def main():
    publish_requests = []
    channels = []
    for i in range(10000):
        channel = f"test_{i}"
        publish_requests.append(PublishRequest(channel=channel, data={"msg": "hello"}))
        channels.append(channel)
    batch_request = BatchRequest(requests=publish_requests)
    broadcast_request = BroadcastRequest(channels=channels, data={"msg": "hello"})

    client = Client("http://localhost:8000/api", "api_key")

    start = time()
    for request in publish_requests:
        client.publish(request)
    print("sequential", time() - start)

    start = time()
    client.batch(batch_request)
    print("batch", time() - start)

    start = time()
    client.broadcast(broadcast_request)
    print("broadcast", time() - start)


if __name__ == "__main__":
    main()

On local machine, the output may look like this:

sequential 5.731332778930664
batch 0.12313580513000488
broadcast 0.06050515174865723

So BatchRequest is much faster than sequential requests in this case, and BroadcastRequest is the fastest - publication to 10k Centrifugo channels took only 60ms. Because all the work is done in one network round-trip. In reality the difference will be even more significant because of network latency.

For contributors

Tests and benchmarks

Prerequisites – start Centrifugo server locally:

CENTRIFUGO_API_KEY=api_key CENTRIFUGO_HISTORY_TTL=300s CENTRIFUGO_HISTORY_SIZE=100 \
CENTRIFUGO_PRESENCE=true CENTRIFUGO_GRPC_API=true ./centrifugo

And install dependencies:

make dev

Then to run tests, run:

make test

To run benchmarks, run:

make bench

Migrate to Cent v5

Cent v5 contains the following notable changes compared to Cent v4:

  • Client constructor slightly changed, refer to the examples above.
  • To call desired API import and construct a request object (inherited from Pydantic BaseModel) and then call corresponding method of client. This should feel very similar to how GRPC is usually structured.
  • Base exception class is now CentError instead of CentException, exceptions SDK raises were refactored.
  • To send multiple commands in one HTTP request SDK provides batch method.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

cent-5.2.0.tar.gz (17.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

cent-5.2.0-py3-none-any.whl (19.3 kB view details)

Uploaded Python 3

File details

Details for the file cent-5.2.0.tar.gz.

File metadata

  • Download URL: cent-5.2.0.tar.gz
  • Upload date:
  • Size: 17.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.12.12 Linux/6.11.0-1018-azure

File hashes

Hashes for cent-5.2.0.tar.gz
Algorithm Hash digest
SHA256 6de91f1f5bf5b2c053a8725fe96e15f65fa5dabb99627af5e9d359a3f6344983
MD5 4ea02c5a94b6e72284f7622ae02cb901
BLAKE2b-256 c208a1eb55e7d11fc61b603856d8f073302c9377d3185742310bddf78e641c22

See more details on using hashes here.

File details

Details for the file cent-5.2.0-py3-none-any.whl.

File metadata

  • Download URL: cent-5.2.0-py3-none-any.whl
  • Upload date:
  • Size: 19.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.12.12 Linux/6.11.0-1018-azure

File hashes

Hashes for cent-5.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 e3484ea7c37f9fc6d8a0800cfba4c08ad87d8146e32939599b59d96e049830dd
MD5 00107a413233e1a30b9abaa155321da6
BLAKE2b-256 f005ca1e37519e07aeccf52ca35ea5d3cd87e52c07a2507f7f08da4c51ae482c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page