A powerful messaging platform for modern developers
Project description
Playground - Sandbox - Docs - Twitter - YouTube
Memphis{dev} is an open-source real-time data processing platform
that provides end-to-end support for in-app streaming use cases using Memphis distributed message broker.
Memphis' platform requires zero ops, enables rapid development, extreme cost reduction,
eliminates coding barriers, and saves a great amount of dev time for data-oriented developers and data engineers.
Installation
$ pip3 install memphis-py
Importing
from memphis import Memphis, Headers
from memphis import retention_types, storage_types
Connecting to Memphis
First, we need to create Memphis object
and then connect with Memphis by using memphis.connect
.
async def main():
try:
memphis = Memphis()
await memphis.connect(
host="<memphis-host>",
username="<application-type username>",
connection_token="<broker-token>",
port="<port>", # defaults to 6666
reconnect=True, # defaults to True
max_reconnect=3, # defaults to 3
reconnect_interval_ms=1500, # defaults to 1500
timeout_ms=1500 # defaults to 1500
)
...
except Exception as e:
print(e)
finally:
await memphis.close()
if __name__ == '__main__':
asyncio.run(main())
Once connected, the entire functionalities offered by Memphis are available.
Disconnecting from Memphis
To disconnect from Memphis, call close()
on the memphis object.
await memphis.close()
Creating a Station
station = memphis.station(
name="<station-name>",
retention_type=retention_types.MAX_MESSAGE_AGE_SECONDS, # MAX_MESSAGE_AGE_SECONDS/MESSAGES/BYTES. Defaults to MAX_MESSAGE_AGE_SECONDS
retention_value=604800, # defaults to 604800
storage_type=storage_types.DISK, # storage_types.DISK/storage_types.MEMORY. Defaults to DISK
replicas=1, # defaults to 1
dedup_enabled=False, # defaults to false
dedup_window_ms: 0, # defaults to 0
)
Retention types
Memphis currently supports the following types of retention:
memphis.retention_types.MAX_MESSAGE_AGE_SECONDS
Means that every message persists for the value set in retention value field (in seconds)
memphis.retention_types.MESSAGES
Means that after max amount of saved messages (set in retention value), the oldest messages will be deleted
memphis.retention_types.BYTES
Means that after max amount of saved bytes (set in retention value), the oldest messages will be deleted
Storage types
Memphis currently supports the following types of messages storage:
memphis.storage_types.DISK
Means that messages persist on disk
memphis.storage_types.MEMORY
Means that messages persist on the main memory
Destroying a Station
Destroying a station will remove all its resources (producers/consumers)
station.destroy()
Produce and Consume messages
The most common client operations are produce
to send messages and consume
to
receive messages.
Messages are published to a station and consumed from it by creating a consumer. Consumers are pull based and consume all the messages in a station unless you are using a consumers group, in this case messages are spread across all members in this group.
Memphis messages are payload agnostic. Payloads are bytearray
.
In order to stop getting messages, you have to call consumer.destroy()
. Destroy will terminate regardless
of whether there are messages in flight for the client.
Creating a Producer
producer = await memphis.producer(station_name="<station-name>", producer_name="<producer-name>", generate_random_suffix=False)
Producing a message
await prod.produce(
message='bytearray/protobuf class', # bytes / protobuf class in case your station is schema validated
ack_wait_sec=15) # defaults to 15
Add headers
headers= Headers()
headers.add("key", "value")
await producer.produce(
message='bytearray/protobuf class', # bytes / protobuf class in case your station is schema validated
headers=headers) # default to {}
Async produce
Meaning your application won't wait for broker acknowledgement - use only in case you are tolerant for data loss
await producer.produce(
message='bytearray/protobuf class', # bytes / protobuf class in case your station is schema validated
headers={}, async_produce=True)
Destroying a Producer
producer.destroy()
Creating a Consumer
consumer = await memphis.consumer(
station_name="<station-name>",
consumer_name="<consumer-name>",
consumer_group="<group-name>", # defaults to the consumer name
pull_interval_ms=1000, # defaults to 1000
batch_size=10, # defaults to 10
batch_max_time_to_wait_ms=5000, # defaults to 5000
max_ack_time_ms=30000, # defaults to 30000
max_msg_deliveries=10, # defaults to 10
generate_random_suffix=False
)
Processing messages
Once all the messages in the station were consumed the msg_handler will receive error: Memphis: TimeoutError
.
async def msg_handler(msgs, error):
for msg in msgs:
print("message: ", msg.get_data())
await msg.ack()
if error:
print(error)
consumer.consume(msg_handler)
Acknowledge a message
Acknowledge a message indicates the Memphis server to not re-send the same message again to the same consumer / consumers group
await message.ack()
Get headers
Get headers per message
``python headers = message.get_headers()
### Destroying a Consumer
```python
consumer.destroy()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
File details
Details for the file memphis-py-0.2.3.tar.gz
.
File metadata
- Download URL: memphis-py-0.2.3.tar.gz
- Upload date:
- Size: 12.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.7.10
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | f2ec3469b464167ce1727869787590bfc4c25a16796da5a948e7c7ea1f6f862f |
|
MD5 | 2791ba1417fb886642e2e08a121e5bbc |
|
BLAKE2b-256 | cd8413c1112cd7f8293402242d474202a1a66a711b381f0347617d352c56693d |