A powerful messaging platform for modern developers
Memphis is an intelligent, frictionless message broker.
It is made to enable developers to build real-time and streaming apps fast.
Memphis.dev is more than a broker. It's a new streaming stack.
It accelerates the development of real-time applications that require
high throughput, low latency, small footprint, and multiple protocols,
with minimum platform operations, and all the observability you can think of.
It is highly resilient, with a distributed, cloud-native architecture, and runs on any Kubernetes,
on any cloud, without ZooKeeper, BookKeeper, or a JVM.
Installation
$ pip3 install memphis-py
Importing
from memphis import Memphis, Headers
from memphis.types import Retention, Storage
import asyncio
Connecting to Memphis
First, create a Memphis object, and then connect to Memphis by calling memphis.connect.
async def main():
    try:
        memphis = Memphis()
        await memphis.connect(
            host="<memphis-host>",
            username="<application-type username>",
            account_id=<account_id>, # You can find it on the profile page in the Memphis UI. This field should be sent only on the cloud version of Memphis, otherwise it will be ignored
            connection_token="<broker-token>", # you will get it on application type user creation
            password="<string>", # depends on how Memphis deployed - default is connection token-based authentication
            port=<port>, # defaults to 6666
            reconnect=True, # defaults to True
            max_reconnect=3, # defaults to 3
            reconnect_interval_ms=1500, # defaults to 1500
            timeout_ms=1500, # defaults to 1500
            # for TLS connection:
            key_file='<key-client.pem>',
            cert_file='<cert-client.pem>',
            ca_file='<rootCA.pem>'
        )
        ...
    except Exception as e:
        print(e)
    finally:
        await memphis.close()

if __name__ == '__main__':
    asyncio.run(main())
Once connected, all the functionality offered by Memphis is available.
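Connection settings are often kept out of the source code. The following is a minimal sketch that collects the keyword arguments for memphis.connect from environment variables. The helper connect_kwargs_from_env and the MEMPHIS_* variable names are assumptions made for illustration and are not part of the SDK; only the keyword names come from the example above, and account_id is assumed to be an integer.

```python
import os


def connect_kwargs_from_env(env=None):
    """Build keyword arguments for memphis.connect() from environment variables.

    The MEMPHIS_* variable names are hypothetical; rename them to suit your deployment.
    """
    env = os.environ if env is None else env
    kwargs = {
        "host": env["MEMPHIS_HOST"],          # required
        "username": env["MEMPHIS_USERNAME"],  # required
    }
    # Prefer a connection token; fall back to a password if no token is set.
    if "MEMPHIS_CONNECTION_TOKEN" in env:
        kwargs["connection_token"] = env["MEMPHIS_CONNECTION_TOKEN"]
    elif "MEMPHIS_PASSWORD" in env:
        kwargs["password"] = env["MEMPHIS_PASSWORD"]
    # Optional numeric settings arrive as strings and are converted to int.
    if "MEMPHIS_PORT" in env:
        kwargs["port"] = int(env["MEMPHIS_PORT"])
    if "MEMPHIS_ACCOUNT_ID" in env:
        kwargs["account_id"] = int(env["MEMPHIS_ACCOUNT_ID"])  # cloud version only
    return kwargs
```

With such a helper, the connection call could be written as await memphis.connect(**connect_kwargs_from_env()).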
Disconnecting from Memphis
To disconnect from Memphis, call close() on the memphis object.
await memphis.close()
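To make sure close() is always called, the connect/close pair can be wrapped in an async context manager. This is a sketch, not an SDK feature: connected is a hypothetical helper, and it relies only on the connect and close methods shown in this document.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def connected(client, **connect_kwargs):
    """Connect `client`, yield it, and close it when the block exits.

    `client` is expected to be a Memphis object; any object with async
    connect() and close() methods works. close() runs only after a
    successful connect(), including when the block raises.
    """
    await client.connect(**connect_kwargs)
    try:
        yield client
    finally:
        await client.close()


# Intended usage (inside a coroutine):
#     async with connected(Memphis(), host="<memphis-host>", ...) as memphis:
#         ...
```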
Creating a Station
Stations that do not exist are created automatically by the SDK, with default values, on the first producer/consumer connection.
If a station already exists, nothing happens: the new configuration is not applied.
station = memphis.station(
    name="<station-name>",
    schema_name="<schema-name>",
    retention_type=Retention.MAX_MESSAGE_AGE_SECONDS, # MAX_MESSAGE_AGE_SECONDS/MESSAGES/BYTES/ACK_BASED(cloud only). Defaults to MAX_MESSAGE_AGE_SECONDS
    retention_value=604800, # defaults to 604800
    storage_type=Storage.DISK, # Storage.DISK/Storage.MEMORY. Defaults to DISK
    replicas=1, # defaults to 1
    idempotency_window_ms=120000, # defaults to 2 minutes
    send_poison_msg_to_dls=True, # defaults to true
    send_schema_failed_msg_to_dls=True, # defaults to true
    tiered_storage_enabled=False, # defaults to false
    partitions_number=1 # defaults to 1
)
Retention types
Memphis currently supports the following types of retention:
memphis.types.Retention.MAX_MESSAGE_AGE_SECONDS
Every message persists for the duration set in the retention value field (in seconds).
memphis.types.Retention.MESSAGES
Once the maximum number of saved messages (set in the retention value) is reached, the oldest messages are deleted.
memphis.types.Retention.BYTES
Once the maximum number of saved bytes (set in the retention value) is reached, the oldest messages are deleted.
memphis.types.Retention.ACK_BASED # for cloud users only
A message is deleted from the station once it has been acknowledged by all interested consumer groups.
Retention Values
Retention values are directly related to the retention types described above: their meaning depends on the retention type chosen.
All retention values are of type int, but they represent different units.
memphis.types.Retention.MAX_MESSAGE_AGE_SECONDS is expressed in seconds, memphis.types.Retention.MESSAGES in a number of messages, and memphis.types.Retention.BYTES in a number of bytes. memphis.types.Retention.ACK_BASED does not use the retention_value parameter at all.
Once a limit is reached, the oldest messages are deleted.
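Because retention values are plain integers, it can help to compute them from readable units. The helpers below (retention_seconds and retention_bytes) are hypothetical conveniences, not part of the SDK; they only perform the unit arithmetic.

```python
from datetime import timedelta


def retention_seconds(**duration):
    """Convert a duration (days=..., hours=..., minutes=...) to whole seconds."""
    return int(timedelta(**duration).total_seconds())


def retention_bytes(mebibytes):
    """Convert a size in MiB to bytes."""
    return mebibytes * 1024 * 1024


print(retention_seconds(days=7))  # 604800, the default retention_value
print(retention_bytes(512))       # 536870912
```

The first result shows that the default retention_value of 604800 corresponds to seven days when used with MAX_MESSAGE_AGE_SECONDS.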
Storage types
Memphis currently supports the following types of message storage:
memphis.types.Storage.DISK
Messages persist on disk.
memphis.types.Storage.MEMORY
Messages persist in main memory.
Station partitions
A Memphis station is created with 1 partition by default. You can change the number of partitions in order to scale your stations.
Destroying a Station
Destroying a station will remove all its resources (producers/consumers)
station.destroy()
Creating a New Schema
await memphis.create_schema("<schema-name>", "<schema-type>", "<schema-file-path>")
Currently available schema types: Protobuf / JSON schema / GraphQL schema / Avro
Enforcing a Schema on an Existing Station
await memphis.enforce_schema("<schema-name>", "<station-name>")
Deprecated - Attaching a Schema, use enforce_schema instead
await memphis.attach_schema("<schema-name>", "<station-name>")
Detaching a Schema from Station
await memphis.detach_schema("<station-name>")
Produce and Consume messages
The most common client operations are produce, to send messages, and consume, to receive messages.
Messages are published to a station and consumed from it by creating a consumer. Consumers are pull-based and consume all the messages in a station unless they belong to a consumer group, in which case messages are spread across all members of the group.
Memphis messages are payload agnostic. Payloads are of type bytearray.
To stop receiving messages, call consumer.destroy(). Destroy terminates the consumer regardless of whether there are messages in flight for the client.
If a station is created with more than one partition, produce and consume are performed in a round-robin fashion.
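As a rough illustration of what round-robin distribution means, the sketch below cycles messages over a fixed number of partitions. It is a toy model only: assign_round_robin is a hypothetical function, numbering partitions from 1 is an assumption, and the way Memphis distributes messages internally may differ.

```python
from itertools import cycle


def assign_round_robin(messages, partitions_number):
    """Pair each message with a partition number, cycling 1..partitions_number."""
    partitions = cycle(range(1, partitions_number + 1))
    return [(next(partitions), message) for message in messages]


print(assign_round_robin(["a", "b", "c", "d", "e"], 3))
# [(1, 'a'), (2, 'b'), (3, 'c'), (1, 'd'), (2, 'e')]
```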
Creating a Producer
producer = await memphis.producer(station_name="<station-name>", producer_name="<producer-name>")
Producing a message
Without creating a producer. When extra performance is needed, the recommended way is to create a producer first and produce messages with its produce function.
await memphis.produce(
    station_name='test_station_py',
    producer_name='prod_py',
    message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) or bytearray/dict (schema validated station - avro schema)
    ack_wait_sec=15, # defaults to 15
    headers=headers, # defaults to {}
    nonblocking=False, # defaults to false
    msg_id="123"
)
With creating a producer
await producer.produce(
    message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) or bytearray/dict (schema validated station - avro schema)
    ack_wait_sec=15) # defaults to 15
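Since payloads are bytearray, structured data has to be serialized before it is produced. The sketch below encodes JSON-compatible records as UTF-8 bytearrays and sends them one by one through an existing producer. The names to_payload, from_payload and produce_records are hypothetical; the only SDK call used is producer.produce with the message and ack_wait_sec arguments shown above.

```python
import asyncio
import json


def to_payload(record):
    """Serialize a JSON-compatible object to a UTF-8 encoded bytearray."""
    return bytearray(json.dumps(record), "utf-8")


def from_payload(payload):
    """Inverse of to_payload(): decode UTF-8 bytes and parse the JSON text."""
    return json.loads(bytes(payload).decode("utf-8"))


async def produce_records(producer, records, ack_wait_sec=15):
    """Produce each record in order and return how many were sent."""
    count = 0
    for record in records:
        await producer.produce(message=to_payload(record), ack_wait_sec=ack_wait_sec)
        count += 1
    return count
```

Each call waits for its acknowledgment before the next record is sent, which keeps ordering simple at the cost of throughput.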
Add headers
headers = Headers()
headers.add("key", "value")
await producer.produce(
    message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) or bytearray/dict (schema validated station - avro schema)
    headers=headers) # defaults to {}
Non-blocking Produce
For better performance, the client won't block requests while waiting for an acknowledgment.
await producer.produce(
    message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
    headers={}, nonblocking=True)
Non-blocking Produce with Task Limits
For better performance, the client won't block requests while waiting for an acknowledgment. If you are producing a large number of messages and see timeout errors, then you may need to limit the number of concurrent tasks like so:
await producer.produce(
    message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
    headers={}, nonblocking=True, limit_concurrent_tasks=500)
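To show what a cap on concurrent tasks does in general, here is a generic asyncio sketch built on a semaphore. It is not the SDK's implementation of limit_concurrent_tasks; run_limited is a hypothetical helper that runs zero-argument coroutine functions with at most a given number in flight.

```python
import asyncio


async def run_limited(jobs, limit):
    """Run coroutine functions concurrently, with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(job):
        # Wait for a free slot, run the job, then release the slot.
        async with semaphore:
            return await job()

    # gather() returns the results in the same order as `jobs`.
    return await asyncio.gather(*(guarded(job) for job in jobs))
```

A lower limit reduces the number of acknowledgments awaited at once, which is the usual remedy when many outstanding requests start to time out.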
Message ID
Stations are idempotent by default for 2 minutes (configurable). Idempotency is achieved by attaching a message ID to each produced message.
await producer.produce(
    message='bytearray/protobuf class/dict', # bytes / protobuf class (schema validated station - protobuf) or bytes/dict (schema validated station - json schema)
    headers={},
    nonblocking=True,
    msg_id="123")
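The effect of an idempotency window can be pictured with a small toy model: a message ID seen within the window is treated as a duplicate, and it is accepted again once the window has passed. IdempotencyWindow is a hypothetical class written for illustration; measuring the window from the first accepted message is an assumption, and the broker's actual bookkeeping may differ.

```python
import time


class IdempotencyWindow:
    """Toy model: remember message IDs for `window_ms` and reject repeats."""

    def __init__(self, window_ms=120000, clock=time.monotonic):
        self.window_s = window_ms / 1000
        self.clock = clock
        self.seen = {}  # msg_id -> time at which it was first accepted

    def accept(self, msg_id):
        """Return True if the message is new, False if it is a duplicate."""
        now = self.clock()
        # Forget IDs whose window has expired.
        self.seen = {k: t for k, t in self.seen.items() if now - t < self.window_s}
        if msg_id in self.seen:
            return False
        self.seen[msg_id] = now
        return True
```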
Destroying a Producer
producer.destroy()
Creating a Consumer
consumer = await memphis.consumer(
    station_name="<station-name>",
    consumer_name="<consumer-name>",
    consumer_group="<group-name>", # defaults to the consumer name
    pull_interval_ms=1000, # defaults to 1000
    batch_size=10, # defaults to 10
    batch_max_time_to_wait_ms=5000, # defaults to 5000
    max_ack_time_ms=30000, # defaults to 30000
    max_msg_deliveries=10, # defaults to 10
    start_consume_from_sequence=1, # start consuming from a specific sequence. defaults to 1
    last_messages=-1 # consume the last N messages, defaults to -1 (all messages in the station)
)
Setting a context for message handler function
context = {"key": "value"}
consumer.set_context(context)
Processing messages
Once all the messages in the station have been consumed, msg_handler receives the error Memphis: TimeoutError.
async def msg_handler(msgs, error, context):
    for msg in msgs:
        print("message: ", msg.get_data())
        await msg.ack()
    if error:
        print(error)

consumer.consume(msg_handler)
Fetch a single batch of messages
msgs = await memphis.fetch_messages(
    station_name="<station-name>",
    consumer_name="<consumer-name>",
    consumer_group="<group-name>", # defaults to the consumer name
    batch_size=10, # defaults to 10
    batch_max_time_to_wait_ms=5000, # defaults to 5000
    max_ack_time_ms=30000, # defaults to 30000
    max_msg_deliveries=10, # defaults to 10
    start_consume_from_sequence=1, # start consuming from a specific sequence. defaults to 1
    last_messages=-1 # consume the last N messages, defaults to -1 (all messages in the station)
)
Fetch a single batch of messages after creating a consumer
msgs = await consumer.fetch(batch_size=10) # defaults to 10
Acknowledge a message
Acknowledging a message tells the Memphis server not to re-send the same message to the same consumer or consumer group.
await message.ack()
Delay the message after a given duration
Delay the message and tell the Memphis server to re-send it to the same consumer group. The message is redelivered only if consumer.max_msg_deliveries has not been reached yet.
await message.delay(delay_in_seconds)
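Acknowledging and delaying can be combined in one handler: acknowledge a message when processing succeeds and delay it when processing fails. The sketch below assumes a hypothetical process callable that raises an exception for messages it cannot handle yet; make_handler and retry_delay_seconds are illustrative names, while get_data, ack and delay are the message methods shown in this document.

```python
import asyncio


def make_handler(process, retry_delay_seconds=5):
    """Build a msg_handler that acks on success and delays on failure.

    `process` is a hypothetical callable that receives the message data and
    raises an exception when the message cannot be handled yet.
    """

    async def msg_handler(msgs, error, context):
        for msg in msgs:
            try:
                process(msg.get_data())
            except Exception as exc:
                # Ask for redelivery later instead of acknowledging.
                print("processing failed, delaying:", exc)
                await msg.delay(retry_delay_seconds)
            else:
                await msg.ack()
        if error:
            print(error)

    return msg_handler
```

Such a handler would be registered with consumer.consume(make_handler(my_process)), where my_process is your own function.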
Get headers
Get headers per message
headers = message.get_headers()
Get message sequence number
Get the sequence number of a message
sequence_number = message.get_sequence_number()
Destroying a Consumer
consumer.destroy()
Check connection status
memphis.is_connected()