A powerful messaging platform for modern developers
Memphis{dev} is an open-source, real-time data processing platform
that provides end-to-end support for in-app streaming use cases using the Memphis distributed message broker.
The Memphis platform requires zero ops, enables rapid development and extreme cost reduction,
eliminates coding barriers, and saves a great amount of dev time for data-oriented developers and data engineers.
Installation
$ pip3 install memphis-py
Importing
from memphis import Memphis, Headers
from memphis import retention_types, storage_types
Connecting to Memphis
First, create a Memphis object, then connect to Memphis by calling memphis.connect.
import asyncio

async def main():
    try:
        memphis = Memphis()
        await memphis.connect(
            host="<memphis-host>",
            username="<application-type username>",
            connection_token="<broker-token>",
            port="<port>",               # defaults to 6666
            reconnect=True,              # defaults to True
            max_reconnect=3,             # defaults to 3
            reconnect_interval_ms=1500,  # defaults to 1500
            timeout_ms=1500              # defaults to 1500
        )
        ...
    except Exception as e:
        print(e)
    finally:
        # Always release the connection, even if an error occurred
        await memphis.close()

if __name__ == '__main__':
    asyncio.run(main())
Once connected, all of the functionality offered by Memphis is available.
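Connection settings are often kept out of source code. The following is a minimal sketch, not part of memphis-py, that builds the keyword arguments listed above for memphis.connect from environment-style settings. The MEMPHIS_* variable names are hypothetical and chosen only for illustration, and passing the port as an integer is an assumption.

```python
import os


def connection_kwargs(env=None):
    """Build keyword arguments for memphis.connect from a mapping of settings.

    The MEMPHIS_* names are hypothetical; Memphis does not define them.
    """
    env = os.environ if env is None else env
    return {
        "host": env["MEMPHIS_HOST"],                  # required
        "username": env["MEMPHIS_USERNAME"],          # required
        "connection_token": env["MEMPHIS_TOKEN"],     # required
        "port": int(env.get("MEMPHIS_PORT", "6666")),  # documented default: 6666
        "reconnect": True,
        "max_reconnect": int(env.get("MEMPHIS_MAX_RECONNECT", "3")),
        "reconnect_interval_ms": 1500,
        "timeout_ms": 1500,
    }
```

Inside main(), the result could then be unpacked as await memphis.connect(**connection_kwargs()).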
Disconnecting from Memphis
To disconnect from Memphis, call close() on the memphis object.
await memphis.close()
Creating a Station
station = await memphis.station(
    name="<station-name>",
    retention_type=retention_types.MAX_MESSAGE_AGE_SECONDS,  # MAX_MESSAGE_AGE_SECONDS/MESSAGES/BYTES. Defaults to MAX_MESSAGE_AGE_SECONDS
    retention_value=604800,           # defaults to 604800
    storage_type=storage_types.DISK,  # storage_types.DISK/storage_types.MEMORY. Defaults to DISK
    replicas=1,                       # defaults to 1
    dedup_enabled=False,              # defaults to False
    dedup_window_ms=0,                # defaults to 0
)
Retention types
Memphis currently supports the following retention types:
memphis.retention_types.MAX_MESSAGE_AGE_SECONDS
Every message persists for the duration set in the retention value (in seconds).
memphis.retention_types.MESSAGES
Once the number of stored messages exceeds the retention value, the oldest messages are deleted.
memphis.retention_types.BYTES
Once the total size of stored messages exceeds the retention value (in bytes), the oldest messages are deleted.
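To make the three policies concrete, here is a small, self-contained model of how each one would trim a list of stored messages. It is an illustrative sketch only, not the broker's implementation: StoredMessage and apply_retention are hypothetical names, and plain strings stand in for the retention_types constants.

```python
import time
from dataclasses import dataclass


@dataclass
class StoredMessage:
    payload: bytes
    stored_at: float  # Unix timestamp in seconds


def apply_retention(messages, retention_type, retention_value, now=None):
    """Return the messages that survive the policy (input ordered oldest first)."""
    now = time.time() if now is None else now
    if retention_type == "MAX_MESSAGE_AGE_SECONDS":
        # Keep only messages younger than the configured age
        return [m for m in messages if now - m.stored_at <= retention_value]
    if retention_type == "MESSAGES":
        # Keep the newest N messages, dropping the oldest first
        return messages[-retention_value:] if retention_value > 0 else []
    if retention_type == "BYTES":
        # Walk from newest to oldest until the byte budget is used up
        kept, total = [], 0
        for m in reversed(messages):
            if total + len(m.payload) > retention_value:
                break
            total += len(m.payload)
            kept.append(m)
        kept.reverse()  # restore oldest-first order
        return kept
    raise ValueError(f"unknown retention type: {retention_type}")
```

In all three cases the oldest messages are the first to go; only the trigger differs (age, count, or total size).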
Storage types
Memphis currently supports the following message storage types:
memphis.storage_types.DISK
Messages are persisted on disk.
memphis.storage_types.MEMORY
Messages are kept in main memory.
Destroying a Station
Destroying a station removes all of its resources (producers and consumers).
await station.destroy()
Produce and Consume messages
The most common client operations are produce, to send messages, and consume, to receive messages.
Messages are published to a station and consumed from it by creating a consumer. Consumers are pull-based and consume all the messages in a station unless they belong to a consumer group, in which case messages are spread across all members of the group.
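The difference between standalone consumers and consumer groups can be modeled in a few lines. This is a toy sketch with a hypothetical distribute function: every group sees every message, and within a group the messages are shared between members. Round-robin sharing is an assumption made for illustration; the text above only says that messages are spread across the members.

```python
from collections import defaultdict
from itertools import cycle


def distribute(messages, consumers):
    """Model delivery to consumers given as (consumer_name, group_name) pairs.

    Returns a dict mapping each consumer name to the messages it receives.
    """
    groups = defaultdict(list)
    for name, group in consumers:
        groups[group].append(name)

    received = {name: [] for name, _ in consumers}
    for members in groups.values():
        turn = cycle(members)  # assumed round-robin within a group
        for msg in messages:
            received[next(turn)].append(msg)
    return received
```

A consumer created without an explicit group behaves like a group of one (its group defaults to the consumer name), so it receives every message.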
Memphis messages are payload agnostic. Payloads are bytearray.
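Because payloads are raw bytes, the application chooses its own serialization. A minimal sketch using JSON follows; JSON is an arbitrary choice here, and encode_payload and decode_payload are hypothetical helper names, not part of memphis-py.

```python
import json


def encode_payload(obj):
    """Serialize a JSON-compatible object into a bytearray payload."""
    return bytearray(json.dumps(obj).encode("utf-8"))


def decode_payload(data):
    """Turn a bytes-like payload back into a Python object."""
    return json.loads(bytes(data).decode("utf-8"))
```

A producer could then pass encode_payload({"id": 1}) as the message, and a consumer could call decode_payload(msg.get_data()), assuming get_data() returns the same bytes that were produced.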
To stop receiving messages, call consumer.destroy(). Destroy terminates the consumer regardless
of whether there are messages in flight for the client.
Creating a Producer
producer = await memphis.producer(station_name="<station-name>", producer_name="<producer-name>", generate_random_suffix=False)
Producing a message
await producer.produce(
    message='bytearray/protobuf class',  # bytes / protobuf class in case your station is schema validated
    ack_wait_sec=15)                     # defaults to 15
Add headers
headers = Headers()
headers.add("key", "value")
await producer.produce(
    message='bytearray/protobuf class',  # bytes / protobuf class in case your station is schema validated
    headers=headers)                     # defaults to {}
Async produce
With async produce, your application does not wait for the broker's acknowledgement. Use it only if you can tolerate data loss.
await producer.produce(
    message='bytearray/protobuf class',  # bytes / protobuf class in case your station is schema validated
    headers={}, async_produce=True)
Destroying a Producer
await producer.destroy()
Creating a Consumer
consumer = await memphis.consumer(
    station_name="<station-name>",
    consumer_name="<consumer-name>",
    consumer_group="<group-name>",    # defaults to the consumer name
    pull_interval_ms=1000,            # defaults to 1000
    batch_size=10,                    # defaults to 10
    batch_max_time_to_wait_ms=5000,   # defaults to 5000
    max_ack_time_ms=30000,            # defaults to 30000
    max_msg_deliveries=10,            # defaults to 10
    generate_random_suffix=False
)
Processing messages
Once all the messages in the station have been consumed, the msg_handler receives the error Memphis: TimeoutError.
async def msg_handler(msgs, error):
    for msg in msgs:
        print("message: ", msg.get_data())
        await msg.ack()
    if error:
        print(error)

consumer.consume(msg_handler)
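A handler can be exercised without a running broker by feeding it stand-in messages. The sketch below is a test-style illustration under stated assumptions: FakeMessage and make_handler are hypothetical names, FakeMessage only mimics the get_data() and ack() methods used above, and detecting the timeout by looking for "TimeoutError" in the error text is an assumption based on the error message quoted earlier.

```python
import asyncio


class FakeMessage:
    """Stand-in for a Memphis message; mimics only get_data() and ack()."""

    def __init__(self, data):
        self._data = data
        self.acked = False

    def get_data(self):
        return self._data

    async def ack(self):
        self.acked = True


def make_handler(sink):
    """Create a msg_handler that appends each payload to `sink` and acks it."""

    async def msg_handler(msgs, error):
        for msg in msgs or []:
            sink.append(bytes(msg.get_data()))
            await msg.ack()
        # A timeout only means the station is drained for now, so stay quiet
        if error and "TimeoutError" not in str(error):
            print("consume error:", error)

    return msg_handler


async def demo():
    sink = []
    handler = make_handler(sink)
    batch = [FakeMessage(bytearray(b"one")), FakeMessage(bytearray(b"two"))]
    await handler(batch, None)                               # a normal batch
    await handler([], Exception("Memphis: TimeoutError"))    # drained station
    return sink, batch
```

Running asyncio.run(demo()) returns the collected payloads together with the batch, so a test can check that every message was acknowledged. With a real consumer, the handler would be registered as consumer.consume(make_handler(sink)).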
Acknowledge a message
Acknowledging a message tells the Memphis server not to re-send the same message to the same consumer or consumer group.
await message.ack()
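The effect of acknowledgement on redelivery can be pictured with a simplified model. This sketch is an assumption about how the max_msg_deliveries consumer parameter interacts with acks, not a description of the broker's internals: it ignores timing (max_ack_time_ms) entirely, and deliver_until_acked is a hypothetical function.

```python
def deliver_until_acked(handler, message, max_msg_deliveries=10):
    """Deliver `message` repeatedly until the handler acks it or attempts run out.

    `handler(message, attempt)` returns True to acknowledge the message.
    Returns a tuple of (attempts_made, acked).
    """
    for attempt in range(1, max_msg_deliveries + 1):
        if handler(message, attempt):
            return attempt, True   # acked: no further redelivery
    return max_msg_deliveries, False  # gave up after the configured limit
```

For example, a handler that fails twice and succeeds on the third attempt yields (3, True), while a handler that never acks with max_msg_deliveries=2 yields (2, False).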
Get headers
Get headers per message
headers = message.get_headers()
Destroying a Consumer
await consumer.destroy()