A Python SDK for Sequin
sequin-py
A lightweight Python SDK for sending, receiving, and acknowledging messages in Sequin streams. For easy development and testing, it also comes with helpful methods for managing the lifecycle of streams and consumers.
Installing
Install the library:
pip install sequin-py
Initializing
You'll typically initialize a Sequin Client once in your application. Create a new file to initialize the Client in, and import it for use in other parts of your app:
# sequin.py
import os
from sequin_py import Client
base_url = os.environ.get('SEQUIN_URL', 'http://localhost:7376')
sequin = Client(base_url)
By default, the Client is initialized using Sequin's default host and port in local development: http://localhost:7376
Usage
You'll predominantly use sequin-py to send, receive, and acknowledge messages in Sequin streams:
# Import the Sequin client from sequin.py
from sequin import sequin
# Define your stream and consumer
stream = 'your-stream-name'
consumer = 'your-consumer-name'
# Send a message
res, error = sequin.send_message(stream, 'test.1', 'Hello, Sequin!')
if error:
    print(f"Error sending message: {error['summary']}")
    # Handle the error appropriately
else:
    print(f"Message sent successfully: {res}")
# Receive a message
message, error = sequin.receive_message(stream, consumer)
if error:
    print(f"Error receiving message: {error['summary']}")
elif message is None:
    print("No messages available")
else:
    print(f"Received message: {message}")
    # Don't forget to acknowledge the message
    ack_res, ack_error = sequin.ack_message(stream, consumer, message['ack_id'])
    if ack_error:
        print(f"Error acking message: {ack_error['summary']}")
    else:
        print("Message acked")
send_message()
Send a message to a stream:
res, error = sequin.send_message(stream_id_or_name: str, key: str, data: str)
Parameters
send_message() accepts three arguments:
- stream_id_or_name (str): Either the name or id of the stream.
- key (str): The key for the message.
- data (str): The data payload for the message as a string.
Returns
send_message() will return a tuple (res, error):
Success
res = {
    "published": 1
}
error = None
Error
res = None
error = {
    "status": 404,
    "summary": "stream not found"
}
Example
res, error = sequin.send_message('my_stream', 'greeting.1', 'Hello, Sequin!')
if error:
    print(f"Error sending message: {error['summary']}")
    # Handle the error appropriately
else:
    print(f"Message sent successfully: {res['published']}")
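Every method described here returns a (result, error) tuple. If you would rather work with exceptions, a thin wrapper can convert the tuple. The sketch below is not part of sequin-py: `SequinError` and `unwrap` are hypothetical names, and it assumes only the error shape shown above (a dict with "status" and "summary").

```python
class SequinError(Exception):
    """Hypothetical exception (not provided by sequin-py) wrapping an error dict."""

    def __init__(self, error):
        self.status = error.get("status")
        self.summary = error.get("summary", "unknown error")
        super().__init__(f"{self.status}: {self.summary}")


def unwrap(result):
    """Return the result from a (result, error) tuple, raising if error is set."""
    res, error = result
    if error is not None:
        raise SequinError(error)
    return res
```

With the client from sequin.py, this could be used as `res = unwrap(sequin.send_message('my_stream', 'greeting.1', 'Hello, Sequin!'))`.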
send_messages()
Send a batch of messages (max 1,000):
res, error = sequin.send_messages(stream_id_or_name: str, messages: list)
Parameters
send_messages() accepts two arguments:
- stream_id_or_name (str): Either the name or id of the stream.
- messages (list): A list of message dictionaries:
[
    {
        "key": "message_key_1",
        "data": "data_payload_1"
    },
    {
        "key": "message_key_2",
        "data": "data_payload_2"
    },
    # ...
]
Returns
send_messages() will return a tuple (res, error):
[!IMPORTANT]
send_messages() is all or nothing. Either all the messages are successfully published, or none of the messages are published.
Success
res = {
    "published": 42
}
error = None
Error
res = None
error = {
    "status": 404,
    "summary": "Stream not found"
}
Example
messages = [
    {
        "key": "test.1",
        "data": "Hello, Sequin!"
    },
    {
        "key": "test.2",
        "data": "Greetings from Sequin!"
    }
]
res, error = sequin.send_messages('my_stream', messages)
if error:
    print(f"Error sending messages: {error['summary']}")
    # Handle the error appropriately
else:
    print(f"Messages sent successfully: {res['published']}")
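Because a single send_messages() call is capped at 1,000 messages, larger lists have to be split across several calls. The following is a minimal sketch of one way to do that. `chunked` and `send_in_batches` are hypothetical helpers, not part of sequin-py, and `client` is assumed to be any object exposing send_messages() as documented above.

```python
MAX_BATCH = 1000  # per-call limit stated above for send_messages()


def chunked(messages, size=MAX_BATCH):
    """Yield consecutive slices of `messages`, each at most `size` long."""
    for start in range(0, len(messages), size):
        yield messages[start:start + size]


def send_in_batches(client, stream, messages, size=MAX_BATCH):
    """Send `messages` chunk by chunk, stopping at the first failing chunk.

    Returns (published_count, error).
    """
    published = 0
    for batch in chunked(messages, size):
        res, error = client.send_messages(stream, batch)
        if error:
            return published, error
        published += res["published"]
    return published, None
```

Note that the all-or-nothing guarantee applies to each call, not to the whole list: if a later chunk fails, earlier chunks remain published, which is why the helper reports how many messages went through.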
receive_message()
To pull a single message off the stream using a Sequin consumer, you'll use the receive_message() function:
message, error = sequin.receive_message(stream_id_or_name: str, consumer_id_or_name: str)
Parameters
receive_message() accepts two arguments:
- stream_id_or_name (str): Either the name or id of the stream.
- consumer_id_or_name (str): Either the name or id of the consumer.
Returns
receive_message() will return a tuple (message, error):
No messages available
message = None
error = None
Message
message = {
    "message": {
        "key": "test.1",
        "stream_id": "def45b2d-ae3f-46a4-b57b-54cdc1cecc6d",
        "data": "Hello, Sequin!",
        "seq": 1,
        "inserted_at": "2024-07-23T00:31:55.668060Z",
        "updated_at": "2024-07-23T00:31:55.668060Z"
    },
    "ack_id": "07240856-96cb-4305-9b2f-620f4b1528a4"
}
error = None
Error
message = None
error = {
    "status": 404,
    "summary": "Consumer not found."
}
Example
message, error = sequin.receive_message('my_stream', 'my_consumer')
if error:
    print(f"Error receiving message: {error['summary']}")
    # Handle the error appropriately
elif message is None:
    print("No messages available")
else:
    print(f"Message received successfully: {message}")
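Since receive_message() returns (None, None) when nothing is available, a caller that expects a message soon may want to retry a few times. Below is a rough sketch of polling with exponential backoff. `poll_for_message` and its parameters are hypothetical, and `client` is assumed to expose receive_message() as documented above.

```python
import time


def poll_for_message(client, stream, consumer, attempts=5, delay=0.5, backoff=2.0):
    """Call receive_message() until a message arrives, an error occurs,
    or `attempts` calls have been made. Returns (message, error)."""
    for attempt in range(attempts):
        message, error = client.receive_message(stream, consumer)
        if error or message is not None:
            return message, error
        if attempt < attempts - 1:
            time.sleep(delay)  # wait before the next attempt
            delay *= backoff   # grow the wait each time
    return None, None
```

The return value keeps the same (message, error) shape as receive_message(), so existing error handling can be reused unchanged.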
receive_messages()
You can pull a batch of messages for your consumer using receive_messages(), which pulls a batch of 10 messages by default:
messages, error = sequin.receive_messages(stream_id_or_name: str, consumer_id_or_name: str, options: Optional[dict] = None)
Parameters
receive_messages() accepts three arguments:
- stream_id_or_name (str): Either the name or id of the stream.
- consumer_id_or_name (str): Either the name or id of the consumer.
- options (dict, optional): A dictionary that defines optional parameters:
  - batch_size (int): The number of messages to request. Default is 10, max of 1,000.
options = {
    "batch_size": int  # The number of messages to request. Default is 10, max of 1,000.
}
Returns
receive_messages() will return a tuple (messages, error):
No messages available
messages = []
error = None
Messages
messages = [
    {
        "message": {
            "key": "test.1",
            "stream_id": "def45b2d-ae3f-46a4-b57b-54cdc1cecc6d",
            "data": "Hello, Sequin!",
            "seq": 1,
            "inserted_at": "2024-07-23T00:31:55.668060Z",
            "updated_at": "2024-07-23T00:31:55.668060Z"
        },
        "ack_id": "07240856-96cb-4305-9b2f-620f4b1528a4"
    },
    # ... more messages
]
error = None
Error
messages = None
error = {
    "status": 404,
    "summary": "Consumer not found."
}
Example
messages, error = sequin.receive_messages('my_stream', 'my_consumer', options={'batch_size': 100})
if error:
    print(f"Error receiving messages: {error['summary']}")
    # Handle the error appropriately
elif not messages:
    print("No messages available")
else:
    print(f"Messages received successfully: {len(messages)}")
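An empty list signals that nothing is currently available, so it can serve as the stop condition for a loop that empties a consumer. The sketch below shows one possible shape for such a loop. `drain` is a hypothetical helper, and `client` is assumed to expose receive_messages() and ack_messages() with the documented signatures. It acks each batch as soon as it is received, which suits test setup and cleanup better than production processing, where you would normally ack only after handling a message.

```python
def drain(client, stream, consumer, batch_size=100, max_batches=None):
    """Receive and ack batches until the consumer reports no messages.

    Returns (collected_messages, error).
    """
    collected = []
    batches = 0
    while max_batches is None or batches < max_batches:
        messages, error = client.receive_messages(
            stream, consumer, options={"batch_size": batch_size}
        )
        if error:
            return collected, error
        if not messages:
            break  # empty list: nothing left to pull
        ack_ids = [item["ack_id"] for item in messages]
        _, ack_error = client.ack_messages(stream, consumer, ack_ids)
        if ack_error:
            return collected, ack_error
        collected.extend(messages)
        batches += 1
    return collected, None
```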
ack_message()
To acknowledge a message, use the ack_message() function:
res, error = sequin.ack_message(stream_id_or_name: str, consumer_id_or_name: str, ack_id: str)
Parameters
ack_message() accepts three arguments:
- stream_id_or_name (str): Either the name or id of the stream.
- consumer_id_or_name (str): Either the name or id of the consumer.
- ack_id (str): The acknowledgement ID of the message.
Returns
ack_message() will return a tuple (res, error):
Success
res = {
    "success": True
}
error = None
Error
res = None
error = {
    "status": 404,
    "summary": "Consumer not found."
}
Example
res, error = sequin.ack_message('my_stream', 'my_consumer', '07240856-96cb-4305-9b2f-620f4b1528a4')
if error:
    print(f"Error acknowledging message: {error['summary']}")
    # Handle the error appropriately
else:
    print("Message acknowledged successfully")
ack_messages()
To acknowledge multiple messages at once, use the ack_messages() function:
res, error = sequin.ack_messages(stream_id_or_name: str, consumer_id_or_name: str, ack_ids: List[str])
Parameters
ack_messages() accepts three arguments:
- stream_id_or_name (str): Either the name or id of the stream.
- consumer_id_or_name (str): Either the name or id of the consumer.
- ack_ids (List[str]): A list of acknowledgement IDs of the messages to acknowledge.
Returns
ack_messages() will return a tuple (res, error):
Success
res = {
    "success": True
}
error = None
Error
res = None
error = {
    "status": 404,
    "summary": "Consumer not found."
}
Example
ack_ids = ['07240856-96cb-4305-9b2f-620f4b1528a4', '18351967-07dc-5416-0c2e-731f5b2638b5']
res, error = sequin.ack_messages('my_stream', 'my_consumer', ack_ids)
if error:
    print(f"Error acknowledging messages: {error['summary']}")
    # Handle the error appropriately
else:
    print("Messages acknowledged successfully")
nack_message()
To negatively acknowledge a message (indicating it couldn't be processed), use the nack_message() function:
res, error = sequin.nack_message(stream_id_or_name: str, consumer_id_or_name: str, ack_id: str)
Parameters
nack_message() accepts three arguments:
- stream_id_or_name (str): Either the name or id of the stream.
- consumer_id_or_name (str): Either the name or id of the consumer.
- ack_id (str): The acknowledgement ID of the message to negatively acknowledge.
Returns
nack_message() will return a tuple (res, error):
Success
res = {
    "success": True
}
error = None
Error
res = None
error = {
    "status": 404,
    "summary": "Consumer not found."
}
Example
res, error = sequin.nack_message('my_stream', 'my_consumer', '07240856-96cb-4305-9b2f-620f4b1528a4')
if error:
    print(f"Error nacking message: {error['summary']}")
    # Handle the error appropriately
else:
    print("Message nacked successfully")
nack_messages()
To negatively acknowledge multiple messages at once, use the nack_messages() function:
res, error = sequin.nack_messages(stream_id_or_name: str, consumer_id_or_name: str, ack_ids: List[str])
Parameters
nack_messages() accepts three arguments:
- stream_id_or_name (str): Either the name or id of the stream.
- consumer_id_or_name (str): Either the name or id of the consumer.
- ack_ids (List[str]): A list of acknowledgement IDs of the messages to negatively acknowledge.
Returns
nack_messages() will return a tuple (res, error):
Success
res = {
    "success": True
}
error = None
Error
res = None
error = {
    "status": 404,
    "summary": "Consumer not found."
}
Example
ack_ids = ['07240856-96cb-4305-9b2f-620f4b1528a4', '18351967-07dc-5416-0c2e-731f5b2638b5']
res, error = sequin.nack_messages('my_stream', 'my_consumer', ack_ids)
if error:
    print(f"Error nacking messages: {error['summary']}")
    # Handle the error appropriately
else:
    print("Messages nacked successfully")
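The batch receive, ack, and nack calls fit together naturally: pull a batch, run each message through your own handler, then ack the ones that succeeded and nack the ones that failed. The following is one possible sketch of that pattern, not a prescribed workflow. `process_batch` and `handler` are hypothetical names, and `client` is assumed to expose receive_messages(), ack_messages(), and nack_messages() as documented above.

```python
def process_batch(client, stream, consumer, handler, batch_size=10):
    """Receive one batch, run `handler` on each message, then ack successes
    and nack failures. Returns (acked_count, nacked_count, error)."""
    messages, error = client.receive_messages(
        stream, consumer, options={"batch_size": batch_size}
    )
    if error:
        return 0, 0, error

    to_ack, to_nack = [], []
    for item in messages:
        try:
            handler(item["message"])
            to_ack.append(item["ack_id"])
        except Exception:
            # Any handler failure marks the message for negative acknowledgement
            to_nack.append(item["ack_id"])

    if to_ack:
        _, error = client.ack_messages(stream, consumer, to_ack)
        if error:
            return 0, 0, error
    if to_nack:
        _, error = client.nack_messages(stream, consumer, to_nack)
        if error:
            return len(to_ack), 0, error
    return len(to_ack), len(to_nack), None
```

Sending the acks and nacks as two batch calls keeps the number of requests constant regardless of batch size.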
create_stream()
Creating streams can be helpful in automated testing. You can create a new stream using create_stream():
stream, error = sequin.create_stream(stream_name: str, options: Optional[dict] = None)
Parameters
create_stream() accepts two parameters:
- stream_name (str): The name of the stream you want to create.
- options (dict, optional): A dictionary of key-value pairs that define optional parameters:
  - one_message_per_key (bool)
  - process_unmodified (bool)
  - max_storage_gb (int)
  - retain_up_to (int)
  - retain_at_least (int)
options = {
    "one_message_per_key": True,
    "process_unmodified": False,
    "max_storage_gb": 10,
    "retain_up_to": 1000000,
    "retain_at_least": 100000
}
Returns
create_stream() will return a tuple (stream, error):
Success
stream = {
    "id": "197a3ee8-8ddd-4ddd-8456-5d0b78a72784",
    "name": "my_stream",
    "account_id": "8b930c30-2334-4339-b7ba-f250b7be223e",
    "stats": {
        "message_count": 0,
        "consumer_count": 0,
        "storage_size": 163840
    },
    "inserted_at": "2024-07-24T20:02:46Z",
    "updated_at": "2024-07-24T20:02:46Z"
}
error = None
Error
stream = None
error = {
    "status": 422,
    "summary": "Validation failed: duplicate name"
}
Example
stream, error = sequin.create_stream('my_stream')
if error:
    print(f"Error creating stream: {error['summary']}")
    # Handle the error appropriately
else:
    print(f"Stream created successfully: {stream}")
delete_stream()
Deleting streams can be helpful in automated testing. You can delete a stream using delete_stream():
res, error = sequin.delete_stream(stream_id_or_name: str)
Parameters
delete_stream() accepts one parameter:
- stream_id_or_name (str): The id or name of the stream you want to delete.
Returns
delete_stream() will return a tuple (res, error):
Successful delete
res = {
    "id": "197a3ee8-8ddd-4ddd-8456-5d0b78a72784",
    "deleted": True
}
error = None
Error
res = None
error = {
    "status": 404,
    "summary": "Not found: No `stream` found matching the provided ID or name"
}
Example
res, error = sequin.delete_stream('my_stream')
if error:
    print(f"Error deleting stream: {error['summary']}")
    # Handle the error appropriately
else:
    print(f"Stream deleted successfully: {res}")
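In test teardown it is often convenient for a delete to succeed even when the stream is already gone. A small sketch of that idea follows, relying only on the 404 status shown in the error example above. `delete_stream_if_exists` is a hypothetical helper, not part of sequin-py, and `client` is assumed to expose delete_stream() as documented.

```python
def delete_stream_if_exists(client, stream):
    """Delete a stream, treating a 404 error as "already absent".

    Returns (deleted, error): `deleted` is False when the stream did not exist.
    """
    res, error = client.delete_stream(stream)
    if error is None:
        return bool(res.get("deleted")), None
    if error.get("status") == 404:
        return False, None  # nothing to delete; not treated as a failure
    return False, error
```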
create_consumer()
Creating consumers can be helpful in automated testing. You can create a new consumer using create_consumer():
consumer, error = sequin.create_consumer(stream_id_or_name: str, consumer_name: str, consumer_filter: str, options: Optional[dict] = None)
Parameters
create_consumer() accepts four parameters:
- stream_id_or_name (str): The id or name of the stream you want to attach the consumer to.
- consumer_name (str): The name of the consumer you want to create.
- consumer_filter (str): The filter pattern the consumer will use to pull messages off the stream.
- options (dict, optional): A dictionary of key-value pairs that define optional parameters:
  - ack_wait_ms (int): Acknowledgement wait time in milliseconds
  - max_ack_pending (int): Maximum number of pending acknowledgements
  - max_deliver (int): Maximum number of delivery attempts
options = {
    "ack_wait_ms": 60000,
    "max_ack_pending": 5000,
    "max_deliver": 3
}
Returns
create_consumer() will return a tuple (consumer, error):
Success
consumer = {
    "ack_wait_ms": 30000,
    "filter_key_pattern": "test.>",
    "id": "67df6362-ba21-4ddc-8601-55d404bacaeb",
    "inserted_at": "2024-07-24T20:12:20Z",
    "kind": "pull",
    "max_ack_pending": 10000,
    "max_deliver": None,
    "max_waiting": 20,
    "name": "my_consumer",
    "stream_id": "15b1f003-3a47-4371-8331-6437cb48477e",
    "updated_at": "2024-07-24T20:12:20Z",
    "status": "active"
}
error = None
Error
consumer = None
error = {
    "status": 422,
    "summary": "Validation failed: duplicate name"
}
Example
consumer, error = sequin.create_consumer('my_stream', 'my_consumer', 'test.>')
if error:
    print(f"Error creating consumer: {error['summary']}")
    # Handle the error appropriately
else:
    print(f"Consumer created successfully: {consumer}")
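The examples here use the filter 'test.>' without spelling out the matching rules. The sketch below illustrates how such a pattern could behave under an assumption this document does not confirm: NATS-style semantics, where keys are split on '.', '*' matches exactly one token, and a trailing '>' matches one or more remaining tokens. `key_matches` is a hypothetical local helper for reasoning about filters. It is not how Sequin is known to evaluate them, so check the Sequin documentation before relying on it.

```python
def key_matches(pattern, key):
    """Check `key` against `pattern`, ASSUMING NATS-style wildcards:
    '*' matches one token, a trailing '>' matches one or more tokens."""
    pattern_tokens = pattern.split(".")
    key_tokens = key.split(".")
    for i, token in enumerate(pattern_tokens):
        if token == ">":
            # '>' is only valid as the last token and needs at least one key token left
            return i == len(pattern_tokens) - 1 and len(key_tokens) > i
        if i >= len(key_tokens):
            return False  # pattern is longer than the key
        if token != "*" and token != key_tokens[i]:
            return False  # literal token mismatch
    return len(pattern_tokens) == len(key_tokens)
```

Under these assumed rules, 'test.>' would match both 'test.1' and 'test.a.b', while 'test.*' would match only the former.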
delete_consumer()
Deleting consumers can be helpful in automated testing. You can delete a consumer using delete_consumer():
res, error = sequin.delete_consumer(stream_id_or_name: str, consumer_id_or_name: str)
Parameters
delete_consumer() accepts two parameters:
- stream_id_or_name (str): The id or name of the stream associated with the consumer you want to delete.
- consumer_id_or_name (str): The id or name of the consumer you want to delete.
Returns
delete_consumer() will return a tuple (res, error):
Successful delete
res = {
    "id": "197a3ee8-8ddd-4ddd-8456-5d0b78a72784",
    "deleted": True
}
error = None
Error
res = None
error = {
    "status": 404,
    "summary": "Not found: No `consumer` found matching the provided ID or name"
}
Example
res, error = sequin.delete_consumer('my_stream', 'my_consumer')
if error:
    print(f"Error deleting consumer: {error['summary']}")
    # Handle the error appropriately
else:
    print(f"Consumer deleted successfully: {res}")
Testing
To adequately test Sequin, we recommend creating temporary streams and consumers in addition to testing sending and receiving messages. Here's an example using pytest:
import pytest
from sequin import sequin
import time
def test_sequin_stream_and_consumer():
    stream_name = f"test-stream-{int(time.time())}"
    consumer_name = f"test-consumer-{int(time.time())}"
    # Create a new stream
    stream, error = sequin.create_stream(stream_name)
    assert error is None
    assert stream['name'] == stream_name
    # Create a consumer
    consumer, error = sequin.create_consumer(stream_name, consumer_name, 'test.>')
    assert error is None
    assert consumer['name'] == consumer_name
    # Send a message
    res, error = sequin.send_message(stream_name, 'test.1', 'Hello, Sequin!')
    assert error is None
    assert res['published'] == 1
    # Receive and ack a message
    message, error = sequin.receive_message(stream_name, consumer_name)
    assert error is None
    assert message is not None
    res, error = sequin.ack_message(stream_name, consumer_name, message['ack_id'])
    assert error is None
    # Delete the consumer
    res, error = sequin.delete_consumer(stream_name, consumer_name)
    assert error is None
    assert res['deleted'] is True
    # Delete the stream
    res, error = sequin.delete_stream(stream_name)
    assert error is None
    assert res['deleted'] is True
This test creates a temporary stream and consumer, sends a message, receives and acknowledges it, and then cleans up by deleting the consumer and stream. You can expand on this basic test to cover more of your specific use cases and edge cases.
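One limitation of the test above is that a failing assertion in the middle skips the cleanup steps and leaves the stream and consumer behind. A context manager is one way to guarantee teardown. The sketch below is only an illustration: `temp_stream_and_consumer` is a hypothetical helper, and `client` is assumed to expose the create and delete methods documented above.

```python
import time
from contextlib import contextmanager


@contextmanager
def temp_stream_and_consumer(client, filter_pattern="test.>"):
    """Create a uniquely named stream and consumer, yield their names,
    and delete both afterwards even if the body raises."""
    suffix = time.time_ns()  # nanosecond timestamp keeps names unique across quick runs
    stream_name = f"test-stream-{suffix}"
    consumer_name = f"test-consumer-{suffix}"

    _, error = client.create_stream(stream_name)
    assert error is None, error
    try:
        _, error = client.create_consumer(stream_name, consumer_name, filter_pattern)
        assert error is None, error
        try:
            yield stream_name, consumer_name
        finally:
            client.delete_consumer(stream_name, consumer_name)
    finally:
        client.delete_stream(stream_name)
```

A test body could then be written as `with temp_stream_and_consumer(sequin) as (stream_name, consumer_name): ...`, or the helper could be wrapped in a pytest fixture.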