A python3 client for jetblack-messagebus

jetblack-messagebus-python3

Overview

This is a Python 3.7+ client for the jetblack-messagebus.

It follows the publish-subscribe pattern and supports "notification": a client can ask to be told when another client subscribes to a topic. This lets a publisher provide data on demand.

See the server documentation for more detailed information.

Example

The client below subscribes on feed "TEST" to topic "FOO" and prints out the data it receives.

import asyncio

from jetblack_messagebus import CallbackClient

async def on_data(user, host, feed, topic, data_packets, is_image):
    print(f'data: user="{user}",host="{host}",feed="{feed}",topic="{topic}",is_image={is_image}')
    if not data_packets:
        print("no data")
    else:
        for packet in data_packets:
            message = packet.data.decode('utf8') if packet.data else None
            print(f'packet: entitlements={packet.entitlements},message={message}')

async def main():
    client = await CallbackClient.create('localhost', 9001)
    client.data_handlers.append(on_data)
    await client.add_subscription('TEST', 'FOO')
    await client.start()

if __name__ == '__main__':
    asyncio.run(main())

SSL

To create an SSL subscriber, pass an SSL context to CallbackClient.create using the ssl argument.

import ssl
from jetblack_messagebus import CallbackClient

...

async def main():
    """Start the demo"""
    # Purpose.SERVER_AUTH is the purpose for client-side sockets: it verifies the server.
    ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
    client = await CallbackClient.create('myhost.example.com', 9001, ssl=ssl_context)
    await client.add_subscription('TEST', 'FOO')
    await client.start()
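
When the server certificate is signed by a private certificate authority, the context has to be told where to find the CA bundle. The following is a minimal sketch using only the standard library; the helper name make_client_context and the cafile path are hypothetical, and nothing here is specific to jetblack-messagebus.

```python
import ssl
from typing import Optional


def make_client_context(cafile: Optional[str] = None) -> ssl.SSLContext:
    """Build a context for a client-side connection.

    Purpose.SERVER_AUTH means "authenticate the server", which is what a
    client needs. If cafile is None the system trust store is used.
    """
    return ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=cafile)


# Hypothetical usage with a private CA bundle:
# ssl_context = make_client_context('/path/to/ca-bundle.pem')
```

A context built this way requires a valid server certificate and checks the host name, so the host passed to CallbackClient.create should match the name in the certificate.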

Authentication

The message bus currently supports the following authentication methods:

  • Password File
  • LDAP
  • JWT

Password File and LDAP

Both the password file and LDAP methods require a username and password. These are supplied with the basic authenticator.

import ssl
from jetblack_messagebus import CallbackClient
from jetblack_messagebus.authentication import BasicAuthenticator

...

async def main():
    authenticator = BasicAuthenticator("john.doe@example.com", "pa$$word")
    # Purpose.SERVER_AUTH is the purpose for client-side sockets: it verifies the server.
    ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
    client = await CallbackClient.create(
        'myhost.example.com',
        9001,
        ssl=ssl_context,
        authenticator=authenticator
    )
    await client.add_subscription('TEST', 'FOO')
    await client.start()

JWT

JWT authentication requires the client to pass a valid token. The token is supplied with the token authenticator.

import ssl
from jetblack_messagebus import CallbackClient
from jetblack_messagebus.authentication import TokenAuthenticator

...

async def main():
    authenticator = TokenAuthenticator(
        "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJqb2huLmRvZUBleGFtcGxlLmNvbSIsIm5hbWUiOiJKb2huIERvZSIsImlhdCI6MTUxNjIzOTAyMiwiZXhwIjoxNTE3MTgzNTAxfQ.wLSGBcNUT8r1DqQvaBrrGY4NHiiVOpoxrgeoPsSsJkY"
    )
    # Purpose.SERVER_AUTH is the purpose for client-side sockets: it verifies the server.
    ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
    client = await CallbackClient.create(
        'myhost.example.com',
        9001,
        ssl=ssl_context,
        authenticator=authenticator
    )
    await client.add_subscription('TEST', 'FOO')
    await client.start()
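
Because the token must be valid, it can be useful to check its expiry before connecting. The following is a rough sketch, not part of the library: the helper names jwt_claims and is_expired are hypothetical, and the code only reads the payload segment of the token. It does not verify the signature, so it is no substitute for the server's own check.

```python
import base64
import json
import time
from typing import Optional


def jwt_claims(token: str) -> dict:
    """Decode the payload segment of a JWT without verifying its signature."""
    payload = token.split('.')[1]
    # base64url segments omit padding; restore it before decoding.
    payload += '=' * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(payload))


def is_expired(token: str, now: Optional[float] = None) -> bool:
    """Return True if the token has an "exp" claim that is in the past."""
    exp = jwt_claims(token).get('exp')
    if exp is None:
        # No expiry claim: treat the token as not expired.
        return False
    current = time.time() if now is None else now
    return current >= exp
```

The example token shown above carries an "exp" claim of 1517183501, a timestamp in 2018, so is_expired would report it as expired and a fresh token would be needed in practice.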

Message handlers

There are three types of messages that can be received:

  • Data
  • Subscription notifications
  • Authorization requests

Data

A data handler looks like this:

from typing import List, Optional

# DataPacket is the packet type supplied by the library.
# A data handler.
async def on_data(
        user: str,
        host: str,
        feed: str,
        topic: str,
        data_packets: Optional[List[DataPacket]],
        is_image: bool
) -> None:
    """Called when data is received"""
    pass

# Add the handler to the client.
client.data_handlers.append(on_data)
# Remove the handler
client.data_handlers.remove(on_data)

The data packets have two fields: entitlements and data.

The entitlements field is an optional set of ints expressing the entitlements that were required to receive the data.

The data field is an optional bytes value holding the encoded data. What it represents is agreed between the sender and the receiver. For example, it might be a simple string, some JSON text, or a protocol buffer.
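
As an illustration of one such agreement, the sketch below assumes the sender and receiver have settled on UTF-8 encoded JSON. FakePacket is a hypothetical stand-in with the two fields described above, used only so the example runs on its own; decode_json_packet is likewise a hypothetical helper, not a library function.

```python
import json
from dataclasses import dataclass
from typing import Any, Optional, Set


@dataclass
class FakePacket:
    """Hypothetical stand-in for a data packet (entitlements and data)."""
    entitlements: Optional[Set[int]]
    data: Optional[bytes]


def decode_json_packet(packet: Any) -> Any:
    """Return the packet payload parsed as JSON, or None if there is no data."""
    if not packet.data:
        return None
    return json.loads(packet.data.decode('utf8'))
```

Inside a data handler, a helper like this could be applied to each element of data_packets, after checking that data_packets is not None.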

Subscription notifications

A subscription notification handler looks like this:

from uuid import UUID

# A notification handler.
async def on_notification(
        client_id: UUID,
        user: str,
        host: str,
        feed: str,
        topic: str,
        is_add: bool
) -> None:
    """Called for a notification"""
    pass

# Add the handler to the client.
client.notification_handlers.append(on_notification)
# Remove the handler
client.notification_handlers.remove(on_notification)
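
One way to use notifications for on-demand data is to track how many subscriptions exist for each topic and only produce data while there is at least one. The sketch below assumes is_add is True when a subscription is added and False when it is removed; subscriber_counts and has_subscribers are hypothetical names, and how the data is then published is left out.

```python
import asyncio
from collections import Counter
from uuid import UUID, uuid4

# Number of active subscriptions per (feed, topic).
subscriber_counts: Counter = Counter()


async def on_notification(
        client_id: UUID,
        user: str,
        host: str,
        feed: str,
        topic: str,
        is_add: bool
) -> None:
    """Keep a running count of subscriptions for each feed and topic."""
    key = (feed, topic)
    if is_add:
        subscriber_counts[key] += 1
    else:
        subscriber_counts[key] -= 1
        if subscriber_counts[key] <= 0:
            # Nobody is listening any more; forget the topic.
            del subscriber_counts[key]


def has_subscribers(feed: str, topic: str) -> bool:
    """Return True while at least one subscription is active."""
    return subscriber_counts[(feed, topic)] > 0
```

A publisher could consult has_subscribers before doing any expensive work to produce data for a topic.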

Authorization requests

An authorization handler looks like this:

from uuid import UUID

# An authorization handler.
async def on_authorization(
        client_id: UUID,
        host: str,
        user: str,
        feed: str,
        topic: str
) -> None:
    """Called when authorization is requested"""
    pass

# Add the handler to the client.
client._authorization_handlers.append(on_authorization)
# Remove the handler
client._authorization_handlers.remove(on_authorization)

Files for jetblack-messagebus-python3, version 4.1.1

  • jetblack_messagebus_python3-4.1.1-py3-none-any.whl (18.2 kB), wheel, Python version py3
  • jetblack-messagebus-python3-4.1.1.tar.gz (15.3 kB), source