A python3 client for jetblack-messagebus
Project description
jetblack-messagebus-python3
Overview
This is a Python 3.7+ client for the jetblack-messagebus.
It follows the publish-subscribe pattern, and includes support for "notification" of a subscription by another client. This allows it to provide data on-demand.
See the server documentation for more detailed information.
Example
The client below subscribes on feed "TEST" to topic "FOO" and prints out the data it receives.
import asyncio
from jetblack_messagebus import CallbackClient
async def on_data(user, host, feed, topic, data_packets, is_image):
print(f'data: user="{user}",host="{host}",feed="{feed}",topic="{topic}",is_image={is_image}')
if not data_packets:
print("no data")
else:
for packet in data_packets:
message = packet.data.decode('utf8') if packet.data else None
print(f'packet: entitlements={packet.entitlements},message={message}')
async def main():
client = await CallbackClient.create('localhost', 9091)
client.data_handlers.append(on_data)
await client.add_subscription('TEST', 'FOO')
await client.start()
if __name__ == '__main__':
asyncio.run(main())
SSL
To create an SSL subscriber, pass in the ssl context.
import ssl
from jetblack_messagebus import CallbackClient
...
async def main():
"""Start the demo"""
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
client = await CallbackClient.create('myhost.example.com', 9091, ssl=ssl_context)
await client.add_subscription('TEST', 'FOO')
await client.start()
Authentication
The message bus currently supports the following authentication methods:
- Password File
- LDAP
- JWT
Password File and LDAP
Both the password file and LDAP methods require a username and password. This is provided by the basic authenticator.
import ssl
from jetblack_messagebus import CallbackClient
from jetblack_messagebus.authentication import BasicAuthenticator
...
async def main():
authenticator = BasicAuthenticator("john.doe@example.com", "pa$$word")
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
client = await CallbackClient.create(
'myhost.example.com',
9091,
ssl=ssl_context,
authenticator=authenticator
)
await client.add_subscription('TEST', 'FOO')
await client.start()
JWT
JWT authentication requires a valid token to be passed by the client. This is provided by the token authenticator.
import ssl
from jetblack_messagebus import CallbackClient
from jetblack_messagebus.authentication import TokenAuthenticator
...
async def main():
authenticator = TokenAuthenticator(
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJqb2huLmRvZUBleGFtcGxlLmNvbSIsIm5hbWUiOiJKb2huIERvZSIsImlhdCI6MTUxNjIzOTAyMiwiZXhwIjoxNTE3MTgzNTAxfQ.wLSGBcNUT8r1DqQvaBrrGY4NHiiVOpoxrgeoPsSsJkY"
)
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
client = await CallbackClient.create(
'myhost.example.com',
9091,
ssl=ssl_context,
authenticator=authenticator
)
await client.add_subscription('TEST', 'FOO')
await client.start()
Message handlers
There are three types of messages that can be received:
- Data
- Subscription notifications
- Authorization requests.
Data
A data handler looks like this:
# A data handler.
async def on_data(
user: str,
host: str,
feed: str,
topic: str,
data_packets: Optional[List[DataPacket]],
is_image: bool
) -> None:
"""Called when data is received"""
pass
# Add the handler to the client.
client.data_handlers.append(on_data)
# Remove the handler
client.data_handlers.remove(on_data)
The data packets have two fields: entitlements
and data
.
The entitlements
is a optional set of ints which express the entitlements that were
required for the data to have been received.
The data
is an optional bytes
holding the encoded data. What this represents
is agreed by the sender and receiver. For example it might be a simple string, some
JSON text, or a protocol buffer.
Subscription notifications
A subscription notification handler looks like this:
# A notification handler.
async def on_notification(
client_id: UUID,
user: str,
host: str,
feed: str,
topic: str,
is_add: bool
) -> None:
"""Called for a notification"""
pass
# Add the handler to the client.
client.notification_handlers.append(on_notification)
# Remove the handler
client.notification_handlers.remove(on_notification)
Authorization requests
# An authorization handler.
async def on_authorization(
client_id: UUID,
host: str,
user: str,
feed: str,
topic: str
) -> None:
"""Called when authorization is requested"""
pass
# Add the handler to the client.
client._authorization_handlers.append(on_authorization)
# Remove the handler
client._authorization_handlers.remove(on_authorization)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for jetblack-messagebus-python3-3.0.2.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | 1bb9253666eced27ef942d738e1c0da536c935c3a7080d9448aa7242eb3c785f |
|
MD5 | 3d0410fa038b287187ffd0e0edb16f43 |
|
BLAKE2b-256 | 525acf77a6ade578078f5a7752a102763d15ad825ab15562adafffd0c0e70d38 |
Hashes for jetblack_messagebus_python3-3.0.2-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | d06cb3a43a5cf8110569122e8d2f371082cf0b72ad9c19cf03f08acda00923b5 |
|
MD5 | 50f4235d7cbb70ef23201948fa04d7e5 |
|
BLAKE2b-256 | 7b649a51ad95c6a4dd9ac6650bfa3771df322c60a372131c916f925e8ebca2f9 |