2-way streams for your microservices
What is a stream with feedback?
With Streamback you can implement the producer-consumer model with a twist: the consumer can send feedback messages back to the producer over a feedback stream. This makes it work more like RPC than the one-way stream Kafka is intended to be used as.
How does it work?
Streamback implements two different streams, the main stream and the feedback stream.
- Main stream: the stream on which the producer sends messages to the consumer. It can be Kafka or Redis (or you can implement your own stream).
- Feedback stream: the stream on which the consumer sends messages back to the producer. Kafka is not recommended here, because each time a new consumer is added to the cluster it causes a rebalance. Redis is the recommended stream for feedback.
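The two-stream model can be illustrated with a small in-memory sketch. This is not Streamback's implementation: the queues stand in for Kafka and Redis, and the names `send`, `consume_one`, `main_stream`, and `feedback_streams`, as well as the use of a correlation id to route the reply, are assumptions made for illustration.

```python
import queue
import threading
import uuid

# In-memory stand-ins for the two streams (hypothetical, not Streamback's API).
main_stream = queue.Queue()
feedback_streams = {}  # correlation id -> queue carrying feedback for that message


def send(topic, value):
    """Producer side: publish a message and return the queue its feedback arrives on."""
    correlation_id = str(uuid.uuid4())
    feedback_streams[correlation_id] = queue.Queue()
    main_stream.put({"topic": topic, "value": value, "id": correlation_id})
    return feedback_streams[correlation_id]


def consume_one():
    """Consumer side: take one message off the main stream and answer it."""
    message = main_stream.get(timeout=1)
    reply = "got {value!r}".format(value=message["value"])
    feedback_streams[message["id"]].put(reply)


consumer = threading.Thread(target=consume_one)
consumer.start()
feedback = send("test_hello", "Hello world!")
response = feedback.get(timeout=1)  # raises queue.Empty if no feedback arrives in time
consumer.join()
print(response)
```

Giving each message its own feedback queue is one simple way to make sure a reply reaches the producer that asked for it; a real deployment would need the same routing across processes.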
Why not just use conventional one-way streams?
Streamback does not stop you from using only the main stream and never sending feedback messages; in that case it behaves just like a Kafka producer-consumer. It simply gives you the option of feedback when you need it, to simplify communication between your microservices.
Installation
pip install streamback
Examples
One way stream consumer-producer
Consumer
from streamback import Streamback, KafkaStream, RedisStream

streamback = Streamback(
    "example_consumer_app",
    main_stream=KafkaStream("kafka:9092"),
    feedback_stream=RedisStream("redis:6379"),
)

@streamback.listen("test_hello")
def test_hello(context, message):
    print("received: {value}".format(value=message.value))

streamback.start()
Producer
from streamback import Streamback, KafkaStream, RedisStream

streamback = Streamback(
    "example_producer_app",
    main_stream=KafkaStream("kafka:9092"),
    feedback_stream=RedisStream("redis:6379"),
)

streamback.send("test_hello", "Hello world!")
2-way RPC-like communication
Consumer
from streamback import Streamback, KafkaStream, RedisStream

streamback = Streamback(
    "example_consumer_app",
    main_stream=KafkaStream("kafka:9092"),
    feedback_stream=RedisStream("redis:6379"),
)

@streamback.listen("test_hello_stream")
def test_hello_stream(context, message):
    print("received: {value}".format(value=message.value))
    message.respond("Hello from the consumer!")

streamback.start()
Producer
from streamback import Streamback, KafkaStream, RedisStream

streamback = Streamback(
    "example_producer_app",
    main_stream=KafkaStream("kafka:9092"),
    feedback_stream=RedisStream("redis:6379"),
)

# Call send() on the streamback instance, then wait up to 10 seconds for the feedback message
message = streamback.send("test_hello_stream", "Hello world!").read(timeout=10)
print(message)
2-way RPC-like communication with streaming feedback messages
Consumer
from streamback import Streamback, KafkaStream, RedisStream
import time

streamback = Streamback(
    "example_consumer_app",
    main_stream=KafkaStream("kafka:9092"),
    feedback_stream=RedisStream("redis:6379"),
)

@streamback.listen("test_hello_stream")
def test_hello_stream(context, message):
    print("received: {value}".format(value=message.value))
    for i in range(10):
        message.respond("Hello #{i} from the consumer!".format(i=i))
        time.sleep(2)

streamback.start()
Producer
from streamback import Streamback, KafkaStream, RedisStream

streamback = Streamback(
    "example_producer_app",
    main_stream=KafkaStream("kafka:9092"),
    feedback_stream=RedisStream("redis:6379"),
)

# Iterate over the feedback messages as they arrive
for message in streamback.send("test_hello_stream", "Hello world!").stream():
    print(message)

## OR

# Read the feedback messages one at a time
stream = streamback.send("test_hello_stream", "Hello world!")
message1 = stream.read()
message2 = stream.read()
message3 = stream.read()
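To make the difference between reading one message and iterating over a stream concrete, here is a minimal in-memory sketch of a feedback iterator. It is not Streamback's implementation: how the library decides that a feedback stream has ended is not stated here, so the `END` marker, the timeout fallback, and the helper name `stream` are all assumptions.

```python
import queue

END = object()  # hypothetical end-of-stream marker, not part of Streamback


def stream(feedback_queue, timeout=1.0):
    """Yield feedback messages until END arrives or nothing shows up within `timeout`."""
    while True:
        try:
            item = feedback_queue.get(timeout=timeout)
        except queue.Empty:
            return  # no feedback in time: stop iterating
        if item is END:
            return  # the consumer signalled that it is done
        yield item


# Simulate a consumer that responded three times and then finished.
feedback_queue = queue.Queue()
for i in range(3):
    feedback_queue.put("Hello #{i} from the consumer!".format(i=i))
feedback_queue.put(END)

received = list(stream(feedback_queue))
print(received)
```

A generator keeps the producer code a plain `for` loop while the waiting and termination logic lives in one place.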
Class based consumers
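# Listener is assumed to be importable from the streamback package;
# streamback is an instance created as in the examples above.
@streamback.listen("new_log")
class LogsConsumer(Listener):
    logs = []  # buffered log values

    def consume(self, context, message):
        self.logs.append(message.value)
        if len(self.logs) > 100:
            self.flush_logs()

    def flush_logs(self):
        # database_commit is a placeholder for your own persistence function
        database_commit(self.logs)
        # Clear the buffer so the same entries are not committed again
        del self.logs[:]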
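The buffering pattern used by this consumer can be tried without a running stream. The sketch below is independent of Streamback; `BatchBuffer`, `flush_size`, and `commit` are hypothetical names, and a plain list stands in for the database.

```python
class BatchBuffer(object):
    """Collect items and hand them to `commit` in batches of `flush_size`."""

    def __init__(self, flush_size, commit):
        self.flush_size = flush_size
        self.commit = commit
        self.items = []  # per-instance buffer, not shared between instances

    def add(self, item):
        self.items.append(item)
        if len(self.items) >= self.flush_size:
            self.flush()

    def flush(self):
        if self.items:
            self.commit(list(self.items))  # pass a copy so the batch stays intact
            self.items = []  # start a fresh buffer so nothing is committed twice


committed = []  # stand-in for a database: each entry is one committed batch
buffer = BatchBuffer(flush_size=3, commit=committed.append)
for n in range(7):
    buffer.add("log line {n}".format(n=n))
buffer.flush()  # commit whatever is left, for example on shutdown

print([len(batch) for batch in committed])
```

Keeping the buffer on the instance and resetting it after every commit avoids two easy mistakes with this pattern: sharing one list between consumers and writing the same entries more than once.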
Router
The Router helps with splitting the consumer logic into different files. Using it is optional, but it keeps larger consumer applications organized.
some_consumers.py
from streamback import Router

router = Router()

@router.listen("test_hello")
def test_hello(context, message):
    print("received: {value}".format(value=message.value))
my_consumer_app.py
from streamback import Streamback, KafkaStream, RedisStream
from some_consumers import router as some_consumers_router

streamback = Streamback(
    "example_consumer_app",
    main_stream=KafkaStream("kafka:9092"),
    feedback_stream=RedisStream("redis:6379"),
)

streamback.include_router(some_consumers_router)
streamback.start()
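For readers curious how a router of this kind can work, the following sketch shows the general decorator-registry pattern. It is not Streamback's code: `SketchRouter`, `SketchApp`, and `dispatch` are hypothetical names, and the callbacks take only a value rather than the `context` and `message` arguments used above.

```python
class SketchRouter(object):
    """Collects listener callbacks per topic (hypothetical, for illustration only)."""

    def __init__(self):
        self.listeners = {}  # topic -> list of callbacks

    def listen(self, topic):
        def decorator(func):
            self.listeners.setdefault(topic, []).append(func)
            return func  # leave the decorated function usable as-is
        return decorator


class SketchApp(SketchRouter):
    """An app that can also absorb listeners registered on other routers."""

    def include_router(self, router):
        for topic, callbacks in router.listeners.items():
            self.listeners.setdefault(topic, []).extend(callbacks)

    def dispatch(self, topic, value):
        return [callback(value) for callback in self.listeners.get(topic, [])]


router = SketchRouter()


@router.listen("test_hello")
def on_hello(value):
    return "received: {value}".format(value=value)


app = SketchApp()
app.include_router(router)
results = app.dispatch("test_hello", "Hello world!")
print(results)
```

Because the router only stores callbacks, modules that define consumers do not need to import the application object, which avoids circular imports between files.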
Streams connection string
You can also define the main and feedback streams in a single connection string, which is more compact:
from streamback import Streamback

streamback = Streamback(
    "example_consumer_app",
    streams="main=kafka://kafka:9092&feedback=redis://redis:6379"
)
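The connection string has the shape of a URL query string whose values are themselves URLs. As a rough sketch of how such a string breaks down (this is not how Streamback parses it internally; `parse_streams` is a hypothetical helper built on the Python 3 standard library):

```python
from urllib.parse import parse_qsl, urlparse


def parse_streams(streams):
    """Split 'role=scheme://host:port&...' into {role: (scheme, host, port)}."""
    parsed = {}
    for role, url in parse_qsl(streams, strict_parsing=True):
        parts = urlparse(url)
        parsed[role] = (parts.scheme, parts.hostname, parts.port)
    return parsed


config = parse_streams("main=kafka://kafka:9092&feedback=redis://redis:6379")
print(config["main"])
print(config["feedback"])
```

Here the scheme selects the stream type (`kafka` or `redis`), and the host and port identify the broker or server to connect to.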
Why Python 2.7 compatible?
Streamback was created for use in car.gr's systems, which include some legacy Python 2.7 services. We are planning to move Streamback to Python >3.7 in a later version, but for now Python 2.7 support was crucial, and so async/await support was sacrificed.