Two way streams for your microservices

What is a stream with feedback?


With Streamback you can implement the producer-consumer model with a twist: the consumer can send feedback messages back to the producer via a feedback stream. This makes it work more like an RPC than the one-way stream Kafka is typically used as.

How does it work?


Streamback implements two different streams, the main stream and the feedback stream.

  • Main stream: The Kafka stream on which the producer sends messages to the consumer.
  • Feedback stream: The stream on which the consumer sends messages back to the producer. Redis is used for this stream for its simplicity and speed.
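
The message flow between the two streams can be modeled without Kafka or Redis. The following is a minimal sketch using only the Python standard library, in which two in-memory queues stand in for the main and feedback streams. The names main_stream, feedback_stream, and consumer are hypothetical, and this is not how Streamback is implemented; it only illustrates the round trip.

```python
import queue
import threading

# In-memory stand-ins for the two streams (Streamback itself uses Kafka and Redis).
main_stream = queue.Queue()      # producer -> consumer
feedback_stream = queue.Queue()  # consumer -> producer


def consumer():
    # Wait for one message on the main stream, then answer on the feedback stream.
    value = main_stream.get(timeout=5)
    feedback_stream.put("consumer received: {value}".format(value=value))


worker = threading.Thread(target=consumer)
worker.start()

# Producer side: send on the main stream, then wait for the feedback message.
main_stream.put("Hello world!")
reply = feedback_stream.get(timeout=5)
worker.join()

print(reply)
```

If the producer never reads from the feedback queue, the sketch degrades to a plain one-way producer-consumer.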

Why not just use the conventional one way streams?


Streamback does not stop you from using only the main stream and never sending feedback messages; in that case it behaves just like a Kafka producer-consumer. It simply gives you the option of feedback when you need it, to simplify communication between your microservices.

Installation


pip install streamback

Examples

One way stream consumer-producer

Consumer

from streamback import Streamback

streamback = Streamback(
    "example_consumer_app",
    streams="main=kafka://kafka:9092&feedback=redis://redis:6379"
)


@streamback.listen("test_hello")
def test_hello(context, message):
    print("received: {value}".format(value=message.value))


streamback.start()

Producer

from streamback import Streamback

streamback = Streamback(
    "example_producer_app",
    streams="main=kafka://kafka:9092&feedback=redis://redis:6379"
)

streamback.send("test_hello", "Hello world!")
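
The streams argument used in these examples has the shape of a URL query string: each name (main, feedback) maps to a URL whose scheme selects the backend. As a rough illustration of that structure, the sketch below splits such a string with the Python 3 standard library. The helper describe_streams is hypothetical, and how Streamback parses the string internally is not shown here.

```python
from urllib.parse import parse_qsl, urlparse


def describe_streams(streams):
    """Split a streams string into {name: (scheme, host, port)}."""
    described = {}
    for name, url in parse_qsl(streams):
        parts = urlparse(url)
        described[name] = (parts.scheme, parts.hostname, parts.port)
    return described


layout = describe_streams("main=kafka://kafka:9092&feedback=redis://redis:6379")
print(layout["main"])      # ('kafka', 'kafka', 9092)
print(layout["feedback"])  # ('redis', 'redis', 6379)
```

In the examples, kafka and redis are hostnames (for instance, service names on a container network), so replace them with the addresses of your own brokers.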

2-way RPC-like communication

Consumer

from streamback import Streamback

streamback = Streamback(
    "example_consumer_app",
    streams="main=kafka://kafka:9092&feedback=redis://redis:6379"
)


@streamback.listen("test_hello_stream")
def test_hello_stream(context, message):
    print("received: {value}".format(value=message.value))
    message.respond("Hello from the consumer!")


streamback.start()

Producer

from streamback import Streamback

streamback = Streamback(
    "example_producer_app",
    streams="main=kafka://kafka:9092&feedback=redis://redis:6379"
)

message = streamback.send("test_hello_stream", "Hello world!").read(timeout=10)
print(message)

2-way RPC-like communication with streaming feedback messages

Consumer

from streamback import Streamback, KafkaStream, RedisStream
import time

streamback = Streamback(
    "example_consumer_app",
    streams="main=kafka://kafka:9092&feedback=redis://redis:6379"
)


@streamback.listen("test_hello_stream")
def test_hello_stream(context, message):
    print("received: {value}".format(value=message.value))
    for i in range(10):
        message.respond("Hello #{i} from the consumer!".format(i=i))
        time.sleep(2)


streamback.start()

Producer

from streamback import Streamback

streamback = Streamback(
    "example_producer_app",
    streams="main=kafka://kafka:9092&feedback=redis://redis:6379"
)

for message in streamback.send("test_hello_stream", "Hello world!").stream():
    print(message)

## OR

stream = streamback.send("test_hello_stream", "Hello world!")

message1 = stream.read()
message2 = stream.read()
message3 = stream.read()
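
The two producer styles above differ only in who drives the loop: stream() iterates for you, while read() fetches one feedback message per call. The sketch below models that relationship with the standard library. FeedbackChannel, push, close, and the _END marker are all hypothetical names for illustration; how Streamback signals the end of a feedback stream is an assumption here, not documented behavior.

```python
import queue

_END = object()  # hypothetical end-of-stream marker, used only in this sketch


class FeedbackChannel(object):
    """Toy stand-in for the object returned by send(); not Streamback's class."""

    def __init__(self):
        self._messages = queue.Queue()

    def push(self, value):
        self._messages.put(value)

    def close(self):
        self._messages.put(_END)

    def read(self, timeout=None):
        # Block until the next feedback message arrives; return None at the end.
        value = self._messages.get(timeout=timeout)
        return None if value is _END else value

    def stream(self):
        # Iterate by calling read() until the channel is closed.
        while True:
            value = self.read()
            if value is None:
                return
            yield value


channel = FeedbackChannel()
for i in range(3):
    channel.push("Hello #{i} from the consumer!".format(i=i))
channel.close()

first = channel.read()
rest = list(channel.stream())
print(first)
print(rest)
```

Mixing the two styles works in this model because both draw from the same queue: the single read() takes the first message and stream() yields the remainder.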

Class based consumers

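from streamback import Listener  # assumed to be importable from the package root


# `streamback` is the Streamback instance created as in the examples above
@streamback.listen("new_log")
class LogsConsumer(Listener):
    logs = []

    def consume(self, context, message):
        # Buffer incoming log messages and flush them in batches
        self.logs.append(message.value)
        if len(self.logs) > 100:
            self.flush_logs()

    def flush_logs(self):
        # database_commit is a placeholder for your own persistence function
        database_commit(self.logs)
        del self.logs[:]  # empty the buffer so each batch is committed only once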

Router

The Streamback Router helps split consumer logic across different files. It is not required, but it keeps larger applications organized.

some_consumers.py

from streamback import Router

router = Router()


@router.listen("test_hello")
def test_hello(context, message):
    print("received: {value}".format(value=message.value))

my_consumer_app.py

from streamback import Streamback

from some_consumers import router as some_consumers_router

streamback = Streamback(
    "example_consumer_app",
    streams="main=kafka://kafka:9092&feedback=redis://redis:6379"
)

streamback.include_router(some_consumers_router)

streamback.start()

Handling consume exceptions

You can pass an on_exception callback when creating the Streamback object to handle exceptions raised while listeners consume messages.

from streamback import Streamback

def on_exception(listener, context, message, exception):
    print("on_exception:", listener, context, message, exception)

streamback = Streamback(
    "example_consumer_app",
    streams="main=kafka://kafka:9092&feedback=redis://redis:6379",
    on_exception=on_exception
)
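
To make the callback's arguments concrete, here is a minimal sketch of how a dispatcher could route a listener failure to such a callback. The dispatch function and failing_listener are hypothetical and written only for this illustration; in particular, re-raising when no callback is configured is an assumption, not documented Streamback behavior.

```python
def dispatch(listener, context, message, on_exception=None):
    """Call a listener; route any exception to the on_exception callback."""
    try:
        listener(context, message)
    except Exception as exception:
        if on_exception is None:
            raise
        on_exception(listener, context, message, exception)


handled = []


def on_exception(listener, context, message, exception):
    # Record what failed instead of letting the consumer loop crash.
    handled.append((listener.__name__, message, str(exception)))


def failing_listener(context, message):
    raise ValueError("cannot process {message}".format(message=message))


dispatch(failing_listener, {}, "bad payload", on_exception=on_exception)
print(handled)
```

A callback like this is a natural place to log the failure or forward the message to a dead-letter destination of your choosing.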

Why Python 2.7 compatible?

Streamback was created for use in car.gr's systems, which include some legacy Python 2.7 services. We are planning to move Streamback to Python >3.7 in a later version, but for now Python 2.7 support was crucial, and so async/await support was sacrificed.
