2-way streams for your microservices

What is a 2-way stream?


With Streamback you can implement the producer-consumer model with a twist: the consumer can send feedback messages back to the producer over a feedback stream. This makes the exchange behave more like an RPC than the one-way stream Kafka is intended to be.

How does it work?


Streamback implements two different streams, the main stream and the feedback stream.

  • Main stream: The stream on which the producer sends messages to the consumer. It can be Kafka or Redis (or you can implement your own stream).
  • Feedback stream: The stream on which the consumer sends messages back to the producer. Kafka is not recommended here, because each time a new consumer is added to the cluster it causes a rebalance. Redis is the recommended stream for this.
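
The two-stream idea can be sketched with nothing but the standard library. This is a rough in-memory illustration, not Streamback's implementation: the `send` helper, the `correlation_id` field, and the per-message feedback queues are all assumptions made for the example.

```python
import queue
import threading
import uuid

# Hypothetical in-memory stand-ins for the two streams (not Streamback classes).
main_stream = queue.Queue()   # producer -> consumer
feedback_streams = {}         # correlation id -> queue carrying consumer -> producer replies


def send(topic, value):
    """Publish on the main stream and return the queue on which feedback will arrive."""
    correlation_id = uuid.uuid4().hex
    feedback_streams[correlation_id] = queue.Queue()
    main_stream.put({"topic": topic, "value": value, "correlation_id": correlation_id})
    return feedback_streams[correlation_id]


def consumer_loop():
    """Take one message off the main stream and reply on its feedback queue."""
    message = main_stream.get()
    reply = "echo: " + message["value"]
    feedback_streams[message["correlation_id"]].put(reply)


worker = threading.Thread(target=consumer_loop)
worker.start()
feedback = send("test_hello_stream", "Hello world!")
response = feedback.get(timeout=5)
worker.join()
print(response)
```

In a real deployment the main stream would be Kafka or Redis and the feedback stream would typically be Redis, as described above; the queues here only show the direction each message travels.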

Why not just use conventional one-way streams?


Streamback does not stop you from using only the main stream and never sending feedback messages; used that way, it behaves just like a Kafka producer-consumer. It simply gives you the option of feedback when you need it, to simplify communication between your microservices.

Installation


pip install streamback

Examples

One-way stream consumer-producer

Consumer

from streamback import Streamback, KafkaStream, RedisStream

streamback = Streamback(
    "example_consumer_app",
    main_stream=KafkaStream("kafka:9092"),
    feedback_stream=RedisStream("redis:6379"),
)

@streamback.listen("test_hello")
def test_hello(context, message):
    print("received: {value}".format(value=message.value))

streamback.start()

Producer

from streamback import Streamback, KafkaStream, RedisStream

streamback = Streamback(
    "example_producer_app",
    main_stream=KafkaStream("kafka:9092"),
    feedback_stream=RedisStream("redis:6379"),
)

streamback.send("test_hello", "Hello world!")

2-way RPC-like communication

Consumer

from streamback import Streamback, KafkaStream, RedisStream

streamback = Streamback(
    "example_consumer_app",
    main_stream=KafkaStream("kafka:9092"),
    feedback_stream=RedisStream("redis:6379"),
)

@streamback.listen("test_hello_stream")
def test_hello_stream(context, message):
    print("received: {value}".format(value=message.value))
    message.respond("Hello from the consumer!")

streamback.start()

Producer

from streamback import Streamback, KafkaStream, RedisStream

streamback = Streamback(
    "example_producer_app",
    main_stream=KafkaStream("kafka:9092"),
    feedback_stream=RedisStream("redis:6379"),
)

# Call send() on the streamback instance, then wait up to the timeout for the consumer's response
message = streamback.send("test_hello_stream", "Hello world!").read(timeout=10)
print(message)
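
What `read(timeout=...)` returns when no response arrives in time is not stated here, so it is worth deciding how the producer should react. The sketch below shows the general shape of a blocking read with a timeout using only `queue.Queue`; the `read_with_timeout` helper and its choice of returning `None` on timeout are assumptions for illustration, not Streamback behavior.

```python
import queue


def read_with_timeout(feedback_queue, timeout):
    """Block for up to `timeout` seconds; return the next reply, or None if none arrived."""
    try:
        return feedback_queue.get(timeout=timeout)
    except queue.Empty:
        return None


replies = queue.Queue()
replies.put("Hello from the consumer!")

first = read_with_timeout(replies, timeout=0.1)   # a reply is already waiting
second = read_with_timeout(replies, timeout=0.1)  # nothing left, so this times out
print(first, second)
```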

2-way RPC-like communication with streaming feedback messages

Consumer

from streamback import Streamback, KafkaStream, RedisStream
import time

streamback = Streamback(
    "example_consumer_app",
    main_stream=KafkaStream("kafka:9092"),
    feedback_stream=RedisStream("redis:6379"),
)

@streamback.listen("test_hello_stream")
def test_hello_stream(context, message):
    print("received: {value}".format(value=message.value))
    for i in range(10):
        message.respond("Hello #{i} from the consumer!".format(i=i))
        time.sleep(2)

streamback.start()

Producer

from streamback import Streamback, KafkaStream, RedisStream

streamback = Streamback(
    "example_producer_app",
    main_stream=KafkaStream("kafka:9092"),
    feedback_stream=RedisStream("redis:6379"),
)

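# Iterate over the feedback messages as the consumer sends them
for message in streamback.send("test_hello_stream", "Hello world!").stream():
    print(message)

## OR

# Keep the returned stream and read the feedback messages one at a time
stream = streamback.send("test_hello_stream", "Hello world!")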

message1 = stream.read()
message2 = stream.read()
message3 = stream.read()
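
To make the streaming pattern concrete, here is a standard-library sketch of a generator that yields replies as they arrive. It is only an analogy: the `stream` function and its `idle_timeout` stopping rule are assumptions for the example, and how Streamback's own `stream()` decides when to stop is not covered here.

```python
import queue
import threading
import time


def stream(feedback_queue, idle_timeout):
    """Yield replies as they arrive; stop once none arrives within `idle_timeout` seconds."""
    while True:
        try:
            yield feedback_queue.get(timeout=idle_timeout)
        except queue.Empty:
            return


def consumer(feedback_queue):
    """Stand-in for a handler that responds several times to a single message."""
    for i in range(3):
        feedback_queue.put("Hello #{i} from the consumer!".format(i=i))
        time.sleep(0.01)


replies = queue.Queue()
worker = threading.Thread(target=consumer, args=(replies,))
worker.start()
received = list(stream(replies, idle_timeout=0.5))
worker.join()
print(received)
```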

Router

The StreambackRouter helps with splitting the consumer logic across different files. Using it is optional.

some_consumers.py

from streamback import StreambackRouter

router = StreambackRouter()

@router.listen("test_hello")
def test_hello(context, message):
    print("received: {value}".format(value=message.value))

my_consumer_app.py

from streamback import Streamback, KafkaStream, RedisStream

from some_consumers import router as some_consumers_router

streamback = Streamback(
    "example_consumer_app",
    main_stream=KafkaStream("kafka:9092"),
    feedback_stream=RedisStream("redis:6379"),
)

# Register every listener declared on the router, then start consuming
streamback.include_router(some_consumers_router)

streamback.start()
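
For readers curious how a decorator-based router can work, the following is a minimal sketch of the pattern. `MiniRouter`, `MiniApp`, and `dispatch` are hypothetical names invented for this example, and handlers here take only the message value; none of this reflects StreambackRouter's internals.

```python
class MiniRouter:
    """Hypothetical stand-in for a router: collects topic -> handler pairs."""

    def __init__(self):
        self.handlers = {}

    def listen(self, topic):
        def decorator(func):
            # Record the handler under its topic and return it unchanged
            self.handlers.setdefault(topic, []).append(func)
            return func
        return decorator


class MiniApp(MiniRouter):
    """Hypothetical stand-in for the app: it can absorb handlers from routers."""

    def include_router(self, router):
        for topic, funcs in router.handlers.items():
            self.handlers.setdefault(topic, []).extend(funcs)

    def dispatch(self, topic, value):
        # Call every handler registered for the topic and collect the results
        return [func(value) for func in self.handlers.get(topic, [])]


router = MiniRouter()


@router.listen("test_hello")
def handle_hello(value):
    return "received: {value}".format(value=value)


app = MiniApp()
app.include_router(router)
results = app.dispatch("test_hello", "Hello world!")
print(results)
```

The point of the pattern is that each file can declare its listeners against a local router without importing the application object, which avoids circular imports as the number of consumers grows.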
