
A Python SDK for Sequin


sequin-py

A lightweight Python SDK for sending, receiving, and acknowledging messages in Sequin streams. For easy development and testing, it also comes with helpful methods for managing the lifecycle of streams and consumers.

Installing

Install the library:

pip install sequin-py

Initializing

You'll typically initialize a Sequin Client once in your application. Create a dedicated module for the Client and import it wherever the rest of your app needs it:

# sequin.py

import os

from sequin_py import Client

# Fall back to Sequin's local development address when SEQUIN_URL is unset
base_url = os.environ.get('SEQUIN_URL', 'http://localhost:7376')

sequin = Client(base_url)

By default, the Client is initialized using Sequin's default host and port in local development: http://localhost:7376

Usage

You'll predominantly use sequin-py to send, receive, and acknowledge messages in Sequin streams:

# Import the Sequin client from sequin.py
from sequin import sequin

# Define your stream and consumer
stream = 'your-stream-name'
consumer = 'your-consumer-name'

# Send a message
res, error = sequin.send_message(stream, 'test.1', 'Hello, Sequin!')
if error:
    print(f"Error sending message: {error['summary']}")
    # Handle the error appropriately
else:
    print(f"Message sent successfully: {res}")

# Receive a message
message, error = sequin.receive_message(stream, consumer)
if error:
    print(f"Error receiving message: {error['summary']}")
elif message is None:
    print("No messages available")
else:
    print(f"Received message: {message}")
    # Don't forget to acknowledge the message
    ack_res, ack_error = sequin.ack_message(stream, consumer, message['ack_id'])
    if ack_error:
        print(f"Error acking message: {ack_error['summary']}")
    else:
        print("Message acked")

send_message()

Send a message to a stream:

res, error = sequin.send_message(stream_id_or_name: str, key: str, data: str)

Parameters

send_message() accepts three arguments:

  • stream_id_or_name (str): Either the name or id of the stream.
  • key (str): The key for the message.
  • data (str): The data payload for the message as a string.

Returns

send_message() will return a tuple (res, error):

Success

res = {
    "published": 1
}
error = None

Error

res = None
error = {
    "status": 404,
    "summary": "stream not found"
}

Example

res, error = sequin.send_message('my_stream', 'greeting.1', 'Hello, Sequin!')
if error:
    print(f"Error sending message: {error['summary']}")
    # Handle the error appropriately
else:
    print(f"Message sent successfully: {res['published']}")
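
Every method in this SDK returns a (value, error) tuple, as shown above. If you would rather work with exceptions, a small wrapper can convert the tuple. The following is a minimal sketch: SequinError and unwrap are hypothetical names introduced here for illustration and are not part of sequin-py.

```python
class SequinError(Exception):
    """Hypothetical exception type; it is not part of sequin-py."""

    def __init__(self, status, summary):
        super().__init__(f"{status}: {summary}")
        self.status = status
        self.summary = summary


def unwrap(result):
    """Return the value from a (value, error) tuple, or raise SequinError."""
    value, error = result
    if error:
        # The error dict carries "status" and "summary", per the README.
        raise SequinError(error.get("status"), error.get("summary"))
    return value
```

With this helper, a call such as unwrap(sequin.send_message('my_stream', 'greeting.1', 'Hello, Sequin!')) would either return the res dictionary or raise.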

send_messages()

Send a batch of messages (max 1,000):

res, error = sequin.send_messages(stream_id_or_name: str, messages: list)

Parameters

send_messages() accepts two arguments:

  • stream_id_or_name (str): Either the name or id of the stream.
  • messages (list): A list of message dictionaries:
[
    {
        "key": "message_key_1",
        "data": "data_payload_1"
    },
    {
        "key": "message_key_2",
        "data": "data_payload_2"
    },
    # ...
]

Returns

send_messages() will return a tuple (res, error):

Important: send_messages() is all or nothing. Either all the messages are successfully published, or none of the messages are published.

Success

res = {
    "published": 42
}
error = None

Error

res = None
error = {
    "status": 404,
    "summary": "Stream not found"
}

Example

messages = [
    {
        "key": "test.1",
        "data": "Hello, Sequin!"
    },
    {
        "key": "test.2",
        "data": "Greetings from Sequin!"
    }
]

res, error = sequin.send_messages('my_stream', messages)
if error:
    print(f"Error sending messages: {error['summary']}")
    # Handle the error appropriately
else:
    print(f"Messages sent successfully: {res['published']}")
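
Because send_messages() accepts at most 1,000 messages per call, a larger list has to be split first. The sketch below shows one way to do that. chunk_messages and send_in_batches are hypothetical helpers, not part of sequin-py, and client stands for an initialized Sequin Client. Note that the all-or-nothing guarantee applies to each call separately, so batches sent before a failure stay published.

```python
from typing import Iterator, List

MAX_BATCH = 1000  # per-call limit stated in this README


def chunk_messages(messages: List[dict], size: int = MAX_BATCH) -> Iterator[List[dict]]:
    """Yield consecutive slices of at most `size` messages."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(messages), size):
        yield messages[start:start + size]


def send_in_batches(client, stream, messages, size=MAX_BATCH):
    """Send messages batch by batch; stop at the first error.

    Returns (published_count, error). Batches sent before an error
    remain published, because each call is atomic on its own.
    """
    published = 0
    for batch in chunk_messages(messages, size):
        res, error = client.send_messages(stream, batch)
        if error:
            return published, error
        published += res["published"]
    return published, None
```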

receive_message()

To pull a single message off the stream using a Sequin consumer, you'll use the receive_message() function:

message, error = sequin.receive_message(stream_id_or_name: str, consumer_id_or_name: str)

Parameters

receive_message() accepts two arguments:

  • stream_id_or_name (str): Either the name or id of the stream.
  • consumer_id_or_name (str): Either the name or id of the consumer.

Returns

receive_message() will return a tuple (message, error):

No messages available

message = None
error = None

Message

message = {
    "message": {
        "key": "test.1",
        "stream_id": "def45b2d-ae3f-46a4-b57b-54cdc1cecc6d",
        "data": "Hello, Sequin!",
        "seq": 1,
        "inserted_at": "2024-07-23T00:31:55.668060Z",
        "updated_at": "2024-07-23T00:31:55.668060Z"
    },
    "ack_id": "07240856-96cb-4305-9b2f-620f4b1528a4"
}
error = None

Error

message = None
error = {
    "status": 404,
    "summary": "Consumer not found."
}

Example

message, error = sequin.receive_message('my_stream', 'my_consumer')
if error:
    print(f"Error receiving message: {error['summary']}")
    # Handle the error appropriately
elif message is None:
    print("No messages available")
else:
    print(f"Message received successfully: {message}")

receive_messages()

You can pull a batch of messages for your consumer using receive_messages(). receive_messages() pulls a batch of 10 messages by default:

messages, error = sequin.receive_messages(stream_id_or_name: str, consumer_id_or_name: str, options: Optional[dict] = None)

Parameters

receive_messages() accepts three arguments:

  • stream_id_or_name (str): Either the name or id of the stream.
  • consumer_id_or_name (str): Either the name or id of the consumer.
  • options (dict, optional): A dictionary that defines optional parameters:
    • batch_size (int): The number of messages to request. Default is 10, max of 1,000.
options = {
    "batch_size": int  # The number of messages to request. Default is 10, max of 1,000.
}

Returns

receive_messages() will return a tuple (messages, error):

No messages available

messages = []
error = None

Messages

messages = [
    {
        "message": {
            "key": "test.1",
            "stream_id": "def45b2d-ae3f-46a4-b57b-54cdc1cecc6d",
            "data": "Hello, Sequin!",
            "seq": 1,
            "inserted_at": "2024-07-23T00:31:55.668060Z",
            "updated_at": "2024-07-23T00:31:55.668060Z"
        },
        "ack_id": "07240856-96cb-4305-9b2f-620f4b1528a4"
    },
    # ... more messages
]
error = None

Error

messages = None
error = {
    "status": 404,
    "summary": "Consumer not found."
}

Example

messages, error = sequin.receive_messages('my_stream', 'my_consumer', options={'batch_size': 100})
if error:
    print(f"Error receiving messages: {error['summary']}")
    # Handle the error appropriately
elif not messages:
    print("No messages available")
else:
    print(f"Messages received successfully: {len(messages)}")
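
Each item returned by receive_messages() nests the payload under "message" and keeps the "ack_id" alongside it. As an illustration of that shape, the sketch below flattens a batch into (key, data, ack_id) tuples. unpack is a hypothetical helper, not part of sequin-py, and the sample data is copied from the response shown above.

```python
def unpack(messages):
    """Flatten receive_messages() results into (key, data, ack_id) tuples."""
    return [
        (item["message"]["key"], item["message"]["data"], item["ack_id"])
        for item in messages
    ]


# Sample batch copied from the response shape documented above.
sample = [
    {
        "message": {
            "key": "test.1",
            "stream_id": "def45b2d-ae3f-46a4-b57b-54cdc1cecc6d",
            "data": "Hello, Sequin!",
            "seq": 1,
            "inserted_at": "2024-07-23T00:31:55.668060Z",
            "updated_at": "2024-07-23T00:31:55.668060Z",
        },
        "ack_id": "07240856-96cb-4305-9b2f-620f4b1528a4",
    }
]

for key, data, ack_id in unpack(sample):
    print(key, data, ack_id)
```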

ack_message()

To acknowledge a message, use the ack_message() function:

res, error = sequin.ack_message(stream_id_or_name: str, consumer_id_or_name: str, ack_id: str)

Parameters

ack_message() accepts three arguments:

  • stream_id_or_name (str): Either the name or id of the stream.
  • consumer_id_or_name (str): Either the name or id of the consumer.
  • ack_id (str): The acknowledgement ID of the message.

Returns

ack_message() will return a tuple (res, error):

Success

res = {
    "success": True
}
error = None

Error

res = None
error = {
    "status": 404,
    "summary": "Consumer not found."
}

Example

res, error = sequin.ack_message('my_stream', 'my_consumer', '07240856-96cb-4305-9b2f-620f4b1528a4')
if error:
    print(f"Error acknowledging message: {error['summary']}")
    # Handle the error appropriately
else:
    print("Message acknowledged successfully")

ack_messages()

To acknowledge multiple messages at once, use the ack_messages() function:

res, error = sequin.ack_messages(stream_id_or_name: str, consumer_id_or_name: str, ack_ids: List[str])

Parameters

ack_messages() accepts three arguments:

  • stream_id_or_name (str): Either the name or id of the stream.
  • consumer_id_or_name (str): Either the name or id of the consumer.
  • ack_ids (List[str]): A list of acknowledgement IDs of the messages to acknowledge.

Returns

ack_messages() will return a tuple (res, error):

Success

res = {
    "success": True
}
error = None

Error

res = None
error = {
    "status": 404,
    "summary": "Consumer not found."
}

Example

ack_ids = ['07240856-96cb-4305-9b2f-620f4b1528a4', '18351967-07dc-5416-0c2e-731f5b2638b5']
res, error = sequin.ack_messages('my_stream', 'my_consumer', ack_ids)
if error:
    print(f"Error acknowledging messages: {error['summary']}")
    # Handle the error appropriately
else:
    print("Messages acknowledged successfully")
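
In practice the ack IDs usually come from a batch you just received rather than from literals. A minimal sketch of that step follows. ack_batch is a hypothetical helper, not part of sequin-py, and client stands for an initialized Sequin Client. Returning (None, None) for an empty batch is a choice made for this sketch, not SDK behavior.

```python
def ack_batch(client, stream, consumer, messages):
    """Acknowledge every message in a received batch with one call."""
    ack_ids = [item["ack_id"] for item in messages]
    if not ack_ids:
        # Nothing to acknowledge, so skip the request entirely.
        return None, None
    return client.ack_messages(stream, consumer, ack_ids)
```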

nack_message()

To negatively acknowledge a message (indicating it couldn't be processed), use the nack_message() function:

res, error = sequin.nack_message(stream_id_or_name: str, consumer_id_or_name: str, ack_id: str)

Parameters

nack_message() accepts three arguments:

  • stream_id_or_name (str): Either the name or id of the stream.
  • consumer_id_or_name (str): Either the name or id of the consumer.
  • ack_id (str): The acknowledgement ID of the message to negatively acknowledge.

Returns

nack_message() will return a tuple (res, error):

Success

res = {
    "success": True
}
error = None

Error

res = None
error = {
    "status": 404,
    "summary": "Consumer not found."
}

Example

res, error = sequin.nack_message('my_stream', 'my_consumer', '07240856-96cb-4305-9b2f-620f4b1528a4')
if error:
    print(f"Error nacking message: {error['summary']}")
    # Handle the error appropriately
else:
    print("Message nacked successfully")

nack_messages()

To negatively acknowledge multiple messages at once, use the nack_messages() function:

res, error = sequin.nack_messages(stream_id_or_name: str, consumer_id_or_name: str, ack_ids: List[str])

Parameters

nack_messages() accepts three arguments:

  • stream_id_or_name (str): Either the name or id of the stream.
  • consumer_id_or_name (str): Either the name or id of the consumer.
  • ack_ids (List[str]): A list of acknowledgement IDs of the messages to negatively acknowledge.

Returns

nack_messages() will return a tuple (res, error):

Success

res = {
    "success": True
}
error = None

Error

res = None
error = {
    "status": 404,
    "summary": "Consumer not found."
}

Example

ack_ids = ['07240856-96cb-4305-9b2f-620f4b1528a4', '18351967-07dc-5416-0c2e-731f5b2638b5']
res, error = sequin.nack_messages('my_stream', 'my_consumer', ack_ids)
if error:
    print(f"Error nacking messages: {error['summary']}")
    # Handle the error appropriately
else:
    print("Messages nacked successfully")
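
Acks and nacks are often decided per message while working through a batch. The sketch below shows one possible pattern: run a handler on each message, then acknowledge the successes and negatively acknowledge the failures in bulk. process_batch and handler are hypothetical names, not part of sequin-py, and client stands for an initialized Sequin Client.

```python
def process_batch(client, stream, consumer, messages, handler):
    """Run handler on each message, then ack successes and nack failures.

    Returns (acked_ids, nacked_ids, errors), where errors collects any
    error dicts returned by the ack/nack calls.
    """
    acked, nacked = [], []
    for item in messages:
        try:
            handler(item["message"])
            acked.append(item["ack_id"])
        except Exception:
            # Any handler failure marks the message as not processed.
            nacked.append(item["ack_id"])

    errors = []
    if acked:
        _, error = client.ack_messages(stream, consumer, acked)
        if error:
            errors.append(error)
    if nacked:
        _, error = client.nack_messages(stream, consumer, nacked)
        if error:
            errors.append(error)
    return acked, nacked, errors
```

Catching Exception broadly keeps the sketch short; in real code you may want to nack only on errors you expect to be retryable.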

create_stream()

Creating streams can be helpful in automated testing. You can create a new stream using create_stream():

stream, error = sequin.create_stream(stream_name: str, options: Optional[dict] = None)

Parameters

create_stream() accepts two parameters:

  • stream_name (str): The name of the stream you want to create.
  • options (dict, optional): A dictionary of key-value pairs that define optional parameters:
    • one_message_per_key (bool)
    • process_unmodified (bool)
    • max_storage_gb (int)
    • retain_up_to (int)
    • retain_at_least (int)
options = {
    "one_message_per_key": True,
    "process_unmodified": False,
    "max_storage_gb": 10,
    "retain_up_to": 1000000,
    "retain_at_least": 100000
}

Returns

create_stream() will return a tuple (stream, error):

Success

stream = {
    "id": "197a3ee8-8ddd-4ddd-8456-5d0b78a72784",
    "name": "my_stream",
    "account_id": "8b930c30-2334-4339-b7ba-f250b7be223e",
    "stats": {
        "message_count": 0,
        "consumer_count": 0,
        "storage_size": 163840
    },
    "inserted_at": "2024-07-24T20:02:46Z",
    "updated_at": "2024-07-24T20:02:46Z"
}
error = None

Error

stream = None
error = {
    "status": 422,
    "summary": "Validation failed: duplicate name"
}

Example

stream, error = sequin.create_stream('my_stream')
if error:
    print(f"Error creating stream: {error['summary']}")
    # Handle the error appropriately
else:
    print(f"Stream created successfully: {stream}")

delete_stream()

Deleting streams can be helpful in automated testing. You can delete a stream using delete_stream():

res, error = sequin.delete_stream(stream_id_or_name: str)

Parameters

delete_stream() accepts one parameter:

  • stream_id_or_name (str): The id or name of the stream you want to delete.

Returns

delete_stream() will return a tuple (res, error):

Successful delete

res = {
    "id": "197a3ee8-8ddd-4ddd-8456-5d0b78a72784",
    "deleted": True
}
error = None

Error

res = None
error = {
    "status": 404,
    "summary": "Not found: No `stream` found matching the provided ID or name"
}

Example

res, error = sequin.delete_stream('my_stream')
if error:
    print(f"Error deleting stream: {error['summary']}")
    # Handle the error appropriately
else:
    print(f"Stream deleted successfully: {res}")

create_consumer()

Creating consumers can be helpful in automated testing. You can create a new consumer using create_consumer():

consumer, error = sequin.create_consumer(stream_id_or_name: str, consumer_name: str, consumer_filter: str, options: Optional[dict] = None)

Parameters

create_consumer() accepts four parameters:

  • stream_id_or_name (str): The id or name of the stream you want to attach the consumer to.
  • consumer_name (str): The name of the consumer you want to create.
  • consumer_filter (str): The filter pattern the consumer will use to pull messages off the stream.
  • options (dict, optional): A dictionary of key-value pairs that define optional parameters:
    • ack_wait_ms (int): Acknowledgement wait time in milliseconds
    • max_ack_pending (int): Maximum number of pending acknowledgements
    • max_deliver (int): Maximum number of delivery attempts
options = {
    "ack_wait_ms": 60000,
    "max_ack_pending": 5000,
    "max_deliver": 3
}

Returns

create_consumer() will return a tuple (consumer, error):

Success

consumer = {
    "ack_wait_ms": 30000,
    "filter_key_pattern": "test.>",
    "id": "67df6362-ba21-4ddc-8601-55d404bacaeb",
    "inserted_at": "2024-07-24T20:12:20Z",
    "kind": "pull",
    "max_ack_pending": 10000,
    "max_deliver": None,
    "max_waiting": 20,
    "name": "my_consumer",
    "stream_id": "15b1f003-3a47-4371-8331-6437cb48477e",
    "updated_at": "2024-07-24T20:12:20Z",
    "status": "active"
}
error = None

Error

consumer = None
error = {
    "status": 422,
    "summary": "Validation failed: duplicate name"
}

Example

consumer, error = sequin.create_consumer('my_stream', 'my_consumer', 'test.>')
if error:
    print(f"Error creating consumer: {error['summary']}")
    # Handle the error appropriately
else:
    print(f"Consumer created successfully: {consumer}")

delete_consumer()

Deleting consumers can be helpful in automated testing. You can delete a consumer using delete_consumer():

res, error = sequin.delete_consumer(stream_id_or_name: str, consumer_id_or_name: str)

Parameters

delete_consumer() accepts two parameters:

  • stream_id_or_name (str): The id or name of the stream associated with the consumer you want to delete.
  • consumer_id_or_name (str): The id or name of the consumer you want to delete.

Returns

delete_consumer() will return a tuple (res, error):

Successful delete

res = {
    "id": "197a3ee8-8ddd-4ddd-8456-5d0b78a72784",
    "deleted": True
}
error = None

Error

res = None
error = {
    "status": 404,
    "summary": "Not found: No `consumer` found matching the provided ID or name"
}

Example

res, error = sequin.delete_consumer('my_stream', 'my_consumer')
if error:
    print(f"Error deleting consumer: {error['summary']}")
    # Handle the error appropriately
else:
    print(f"Consumer deleted successfully: {res}")

Testing

To adequately test Sequin, we recommend creating temporary streams and consumers in addition to testing sending and receiving messages. Here's an example using pytest:

import pytest
from sequin import sequin
import time

def test_sequin_stream_and_consumer():
    stream_name = f"test-stream-{int(time.time())}"
    consumer_name = f"test-consumer-{int(time.time())}"

    # Create a new stream
    stream, error = sequin.create_stream(stream_name)
    assert error is None
    assert stream['name'] == stream_name

    # Create a consumer
    consumer, error = sequin.create_consumer(stream_name, consumer_name, 'test.>')
    assert error is None
    assert consumer['name'] == consumer_name

    # Send a message
    res, error = sequin.send_message(stream_name, 'test.1', 'Hello, Sequin!')
    assert error is None
    assert res['published'] == 1

    # Receive and ack a message
    message, error = sequin.receive_message(stream_name, consumer_name)
    assert error is None
    assert message is not None

    res, error = sequin.ack_message(stream_name, consumer_name, message['ack_id'])
    assert error is None

    # Delete the consumer
    res, error = sequin.delete_consumer(stream_name, consumer_name)
    assert error is None
    assert res['deleted'] is True

    # Delete the stream
    res, error = sequin.delete_stream(stream_name)
    assert error is None
    assert res['deleted'] is True

This test creates a temporary stream and consumer, sends a message, receives and acknowledges it, and then cleans up by deleting the consumer and stream. You can expand on this basic test to cover more of your specific use cases and edge cases.
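
One caveat with the test above is that a failed assertion skips the cleanup steps and leaves the stream behind. A possible refinement, sketched here, wraps the lifecycle in a context manager so the stream is deleted even when the test body raises. temporary_stream is a hypothetical helper, not part of sequin-py, and client stands for an initialized Sequin Client. The sketch uses a random suffix from uuid instead of a timestamp so that tests started in the same second do not collide.

```python
import uuid
from contextlib import contextmanager


@contextmanager
def temporary_stream(client, prefix="test-stream"):
    """Create a uniquely named stream and delete it on exit."""
    name = f"{prefix}-{uuid.uuid4().hex[:8]}"
    stream, error = client.create_stream(name)
    if error:
        raise RuntimeError(f"create_stream failed: {error['summary']}")
    try:
        yield stream
    finally:
        # Runs on success and on failure of the enclosed test body.
        client.delete_stream(name)
```

Inside a test you would write with temporary_stream(sequin) as stream: and use stream['name'] for the calls that follow. The same idea can be wrapped in a pytest fixture that yields the stream.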
