Kafka resource for Tamarco microservice framework.

tamarco-kafka

Kafka resource for the Tamarco microservice framework. It runs a confluent-kafka client in a thread.

This repository is a plugin for Tamarco. For more information, see the main Tamarco repository.

Settings

This resource depends on the following configuration:

    system:
        resources:
            kafka:
                bootstrap_servers: kafka:9092

The bootstrap servers are the addresses of the members of a Kafka cluster, separated by commas.
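
As an illustration of the comma-separated format, here is a minimal sketch that splits such a value into host and port pairs. The helper `parse_bootstrap_servers` and its `default_port` argument are hypothetical and not part of tamarco-kafka; 9092 is only the conventional Kafka port.

```python
def parse_bootstrap_servers(value, default_port=9092):
    """Split a comma-separated bootstrap_servers string into (host, port) pairs.

    Hypothetical helper for illustration; tamarco-kafka does not expose it.
    """
    servers = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate trailing commas and stray whitespace
        host, sep, port = entry.rpartition(":")
        if not sep:
            # No port given, so fall back to the assumed default.
            host, port = entry, default_port
        servers.append((host, int(port)))
    return servers
```

For a cluster with several brokers, the setting would then look like `bootstrap_servers: kafka-1:9092,kafka-2:9092`, where the host names are placeholders.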

Inputs and outputs

The inputs and outputs need to be declared in the resource.

Input

The input can be used with two different patterns: as a decorator and as an async stream.

This resource only supports balanced consumer groups with auto commit.
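
To make "balanced consumer group with auto commit" concrete, the sketch below builds the kind of configuration a confluent-kafka consumer accepts for that mode. The property names are standard librdkafka settings, but whether tamarco-kafka sets exactly these keys is an assumption, and the function `consumer_config` is hypothetical.

```python
def consumer_config(bootstrap_servers, group_id, auto_commit_interval_ms=5000):
    """Build a confluent-kafka style configuration for a balanced consumer group.

    Illustrative only: the keys tamarco-kafka really sets may differ.
    """
    return {
        "bootstrap.servers": bootstrap_servers,
        # Consumers sharing a group.id split the topic partitions among themselves.
        "group.id": group_id,
        # Offsets are committed periodically in the background.
        "enable.auto.commit": True,
        "auto.commit.interval.ms": auto_commit_interval_ms,
    }
```

With auto commit, a message may be marked as consumed before the handler finishes processing it, so handlers that must not lose messages need their own safeguards.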

Async stream

This use case uses the input as an asynchronous iterator to consume the metrics stream.

class MyMicroservice(Microservice):
    name = "input_example"

    metrics_input = KafkaInput(topic='metrics', codec=JsonCodec)
    kafka = KafkaResource(inputs=[metrics_input])

    @task
    async def metrics_consumer(self):
        async for metric in self.metrics_input:
            self.logger.info(f'Consumed message from metrics topic: {metric}')
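
Since the Kafka client runs in a thread while the microservice consumes with `async for`, it may help to see how a thread can feed an asynchronous iterator. The following is a standalone sketch using only the standard library. `ThreadFedStream` is a hypothetical stand-in and not the actual KafkaInput implementation.

```python
import asyncio
import threading


class ThreadFedStream:
    """Async iterator fed by a worker thread (stand-in for a Kafka client thread)."""

    _DONE = object()  # sentinel marking the end of the stream

    def __init__(self, messages):
        self._messages = messages
        self._queue = None
        self._loop = None

    def _produce(self):
        # Runs in the worker thread; hand each message to the event loop safely.
        for message in self._messages:
            self._loop.call_soon_threadsafe(self._queue.put_nowait, message)
        self._loop.call_soon_threadsafe(self._queue.put_nowait, self._DONE)

    def __aiter__(self):
        # Called from inside a coroutine, so a running loop is available.
        self._loop = asyncio.get_running_loop()
        self._queue = asyncio.Queue()
        threading.Thread(target=self._produce, daemon=True).start()
        return self

    async def __anext__(self):
        item = await self._queue.get()
        if item is self._DONE:
            raise StopAsyncIteration
        return item


async def consume(stream):
    received = []
    async for message in stream:
        received.append(message)
    return received


def run_demo():
    return asyncio.run(consume(ThreadFedStream([{"cpu": 1}, {"cpu": 2}])))
```

The queue is only touched from the event loop thread, which is why the worker uses `call_soon_threadsafe` instead of calling `put_nowait` directly.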

Decorator

This use case declares a function as the message handler, and the resource automatically opens a coroutine to consume each message.

class MyMicroservice(Microservice):
    name = "input_example"

    # The decorator below attaches the input to this resource.
    kafka = KafkaResource()

    @KafkaInput(resource=kafka, topic='metrics', codec=JsonCodec)
    async def metrics_handler(self, message):
        self.logger.info(f'Consumed message from metrics topic: {message}')
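
The decorator pattern can be pictured as a registration step plus one coroutine per message. The sketch below shows that idea with the standard library only. The `Resource`, `Input` and `Service` classes are hypothetical and simplified, and the real tamarco-kafka internals may work differently.

```python
import asyncio


class Resource:
    """Hypothetical resource that keeps the handlers registered on it."""

    def __init__(self):
        self.handlers = {}

    async def dispatch(self, owner, topic, messages):
        # Open one coroutine per message and wait for all of them to finish.
        handler = self.handlers[topic]
        await asyncio.gather(*(handler(owner, message) for message in messages))


class Input:
    """Hypothetical decorator that registers a coroutine as a topic handler."""

    def __init__(self, resource, topic):
        self.resource = resource
        self.topic = topic

    def __call__(self, handler):
        self.resource.handlers[self.topic] = handler
        return handler


class Service:
    resource = Resource()

    def __init__(self):
        self.seen = []

    @Input(resource=resource, topic="metrics")
    async def metrics_handler(self, message):
        self.seen.append(message)


def run_demo():
    service = Service()
    asyncio.run(Service.resource.dispatch(service, "metrics", ["a", "b"]))
    return service.seen
```

Because the decorator receives the resource as an argument, the handler is attached to it at class definition time and does not need to be listed separately.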

Output

The output is a simple-to-use Kafka producer.

class MyMicroservice(Microservice):
    name = "output_example"
    metrics_output = KafkaOutput(topic='metrics', codec=JsonCodec)
    kafka = KafkaResource(outputs=[metrics_output])

    @task_timer(interval=1000, autostart=True)
    async def metrics_producer(self):
        metrics_message = {'metrics': {'cat': 'MEOW'}}
        await self.metrics_output.push(metrics_message)
        self.logger.info(f'Produced message {metrics_message} to metrics topic')
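
Kafka transports bytes, so the codec decides how a Python object such as `metrics_message` is serialized. The sketch below assumes that a JSON codec behaves like the standard `json` module with UTF-8 encoding. The functions `encode` and `decode` are hypothetical, and the actual JsonCodec in Tamarco may differ in detail.

```python
import json


def encode(message):
    """Serialize a Python object to the bytes sent to Kafka (assumed behavior)."""
    return json.dumps(message).encode("utf-8")


def decode(payload):
    """Rebuild the Python object from the bytes read from Kafka (assumed behavior)."""
    return json.loads(payload.decode("utf-8"))
```

Under this assumption, a consumer using the same codec on the `metrics` topic receives a dictionary equal to the one that was pushed.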

How to run the examples

To run the examples, install the requirements, start the services with docker-compose, and run one of the example scripts.

pip install -r examples/requirements.txt
docker-compose up -d
python examples/microservice_stream_input.py
