Python client for Liftbridge.
Project description
python-liftbridge
This project is under development.
Python client for Liftbridge, a system that provides lightweight, fault-tolerant message streams for NATS.
Liftbridge provides the following high-level features:
- Log-based API for NATS
- Replicated for fault-tolerance
- Horizontally scalable
- Wildcard subscription support
- At-least-once delivery support and message replay
- Message key-value support
- Log compaction by key
Installation
$ pip install python-liftbridge
Basic Usage
from liftclient import Lift, Message, Stream, ErrStreamExists
# Create a Liftbridge client.
client = Lift(ip_address='localhost:9292', timeout=5)
# Create a stream attached to the NATS subject "foo".
try:
client.create_stream(Stream(subject='foo', name='foo-stream'))
except ErrStreamExists:
print('This stream already exists!')
# Publish a message to "foo".
client.publish(Message(value='hello', subject='foo'))
# Subscribe to the stream starting from the beginning.
for message in client.subscribe(
Stream(
subject='foo',
name='foo-stream',
).start_at_earliest_received(),
):
print("Received: '{}'".format(message.value))
Create Stream
Streams are a durable message log attached to a NATS subject. They record messages published to the subject for consumption.
Streams have a few key properties: a subject, which is the corresponding NATS subject, a name, which is a human-readable identifier for the stream, and a replication factor, which is the number of nodes the stream should be replicated to for redundancy. Optionally, there is a group which is the name of a load-balance group for the stream to join. When there are multiple streams in the same group, messages will be balanced among them.
"""
Create a stream attached to the NATS subject "foo.*" that is replicated to
all the brokers in the cluster. ErrStreamExists is returned if a stream with
the given name already exists for the subject.
"""
client.create_stream(Stream(subject='foo.*', name='my-stream', max_replication=True))
Subscription Start/Replay Options
Subscriptions are how Liftbridge streams are consumed. Clients can choose where to start consuming messages from in a stream. This is controlled using options passed to Subscribe.
# Subscribe starting with new messages only.
client.subscribe(
Stream(subject='foo', name='foo-stream')
)
# Subscribe starting with the most recently published value.
client.subscribe(
Stream(subject='foo', name='foo-stream').start_at_earliest_received()
)
# Subscribe starting with the oldest published value.
client.subscribe(
Stream(subject='foo', name='foo-stream').start_at_latest_received()
)
# Subscribe starting at a specific offset.
client.subscribe(
Stream(subject='foo', name='foo-stream').start_at_offset(4)
)
# Subscribe starting at a specific time.
client.subscribe(
Stream(subject='foo', name='foo-stream').start_at_time(datetime.now())
)
# Subscribe starting at a specific amount of time in the past.
client.subscribe(
Stream(subject='foo', name='foo-stream').start_at_time_delta(timedelta(days=1))
)
Publishing
A publish API is provided to make it easy to write messages to streams. This includes a number of options for decorating messages with metadata like a message key.
Keys are used by Liftbridge's log compaction. When enabled, Liftbridge streams will retain only the last message for a given key.
# Publish a message with a key
client.publish(Message(subject='foo', value='Hello', key='key'))
Publishing Directly with NATS
Since Liftbridge is an extension of NATS, a NATS client can also be used to publish messages. This means existing NATS publishers do not need any changes for messages to be consumed in Liftbridge.
How to contribute
- Check for open issues or open a fresh issue to start a discussion around a feature idea or a bug.
- Fork the repository on GitHub to start making your changes to the master branch (or branch off of it).
- Write a test which shows that the bug was fixed or that the feature works as expected.
- Send a pull request and bug me until it gets merged and published.
Some things on the backlog:
- Add documentation (Sphynx)
- Add CI (CircleCI or TravisCI)
- Add tests
- Add code coverage
- Add TLS support for gRPC
- Add message headers support
- Add message ACK support (scaffolding is already done)
- Add method to close connection
- Add async client
- Add gRPC connection pool
- Add logging (and remove all the random prints)
- Add proper docstrings
- Add version file
- Add Contributing.md and explanation of the workflow (pyenv,tox,make,pre-commit...)
- Improve fetch metadata
- Improve error handling
- Add to the makefile run-liftbridge using Docker container
- Better instrumentation/observability (OpenCensus support?)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for liftclient-0.0.6-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | ac33eeda377c7c9423f5e5b945f894834d3375380786a82cabb5c7e4ae90af50 |
|
MD5 | 64e433347f05cacf213a0b558b1ba832 |
|
BLAKE2b-256 | ef799f2dbcee16862e8a3c767896e814f0fd0cc8761f8edd0f44317ce6fa9809 |