tamarco-kafka
Kafka resource for the Tamarco microservice framework. It runs a confluent-kafka client in a thread.
This repository is a plugin for Tamarco; for more information, see the main Tamarco repository.
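The resource is distributed on PyPI and can be installed with pip:

```bash
pip install tamarco-kafka
```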
Settings
This resource depends on the following configuration:
```yaml
system:
  resources:
    kafka:
      bootstrap_servers: kafka:9092
```
The bootstrap_servers setting is a comma-separated list of the addresses of the Kafka cluster members.
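For example, a three-broker cluster would be configured like this (the host names are placeholders):

```yaml
system:
  resources:
    kafka:
      bootstrap_servers: kafka-1:9092,kafka-2:9092,kafka-3:9092
```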
Inputs and outputs
The inputs and outputs need to be declared in the resource.
Input
The input can be used with two different patterns: as a decorator or as an async stream.
This resource only supports balanced consumer groups with automatic offset commit.
Async stream
This pattern uses the input as an asynchronous iterator to consume the metrics stream.
```python
class MyMicroservice(Microservice):
    name = "input_example"

    # Declare the input and register it in the Kafka resource.
    metrics_input = KafkaInput(topic='metrics', codec=JsonCodec)
    kafka = KafkaResource(inputs=[metrics_input])

    @task
    async def metrics_consumer(self):
        # Consume the topic as an asynchronous iterator.
        async for metric in self.metrics_input:
            self.logger.info(f'Consumed message from metrics topic: {metric}')
```
Decorator
This pattern declares a function as the handler of the messages; the resource automatically spawns a coroutine to consume each message.
```python
class MyMicroservice(Microservice):
    name = "input_example"

    kafka = KafkaResource(inputs=[metrics_input])

    # The decorator registers metrics_handler as the handler for the 'metrics' topic.
    @KafkaInput(resource=kafka, topic='metrics', codec=JsonCodec)
    async def metrics_handler(self, message):
        self.logger.info(f'Consumed message from metrics topic: {message}')
```
Output
The output is a Kafka producer that is very simple to use.
```python
class MyMicroservice(Microservice):
    name = "output_example"

    # Declare the output and register it in the Kafka resource.
    metrics_output = KafkaOutput(topic='metrics', codec=JsonCodec)
    kafka = KafkaResource(outputs=[metrics_output])

    # Produce a message periodically on a timer.
    @task_timer(interval=1000, autostart=True)
    async def metrics_producer(self):
        metrics_message = {'metrics': {'cat': 'MEOW'}}
        await self.metrics_output.push(metrics_message)
        self.logger.info(f'Produced message {metrics_message} to metrics topic')
```
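A single resource can also hold inputs and outputs at the same time. Below is a minimal sketch of a microservice that consumes from one topic and produces to another; the topic names and the relaying logic are illustrative, not taken from the library:

```python
class RelayMicroservice(Microservice):
    name = "relay_example"

    # Hypothetical topics, used here only to illustrate combining inputs and outputs.
    events_input = KafkaInput(topic='raw_events', codec=JsonCodec)
    events_output = KafkaOutput(topic='processed_events', codec=JsonCodec)
    kafka = KafkaResource(inputs=[events_input], outputs=[events_output])

    @task
    async def relay(self):
        # Consume each message, annotate it, and push it to the output topic.
        async for event in self.events_input:
            event['processed'] = True
            await self.events_output.push(event)
            self.logger.info(f'Relayed message {event} to processed_events topic')
```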
How to run the examples
To run them, install the requirements, launch the docker-compose stack, and run one of the example scripts:
```bash
pip install -r examples/requirements.txt
docker-compose up -d
python examples/microservice_stream_input.py
```
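The docker-compose.yml used here ships with the repository. If you only want a rough idea of what the stack contains, a minimal single-broker setup along the following lines should also work; the images, versions, and listener settings below are assumptions, not taken from the repository:

```yaml
version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```

With a stand-in like this, point bootstrap_servers at localhost:9092 instead of kafka:9092.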
File details
Details for the file tamarco-kafka-0.1.0.tar.gz.
File metadata
- Download URL: tamarco-kafka-0.1.0.tar.gz
- Upload date:
- Size: 15.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/2.0.0 pkginfo/1.5.0.1 requests/2.22.0 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.36.1 CPython/3.7.1
File hashes
Algorithm | Hash digest
---|---
SHA256 | 4f8bdef609aeff86dfbb47c3ebb8d26ed14744b622f466ead0cdf8c42e0c26db
MD5 | 9ecc49d4d1abc30bef0f5a60e9eaafa4
BLAKE2b-256 | 0a863dd8fdca104969e84cd4427a16ae9c8daf4bc4ef1a58e037b4c1bc1d2aa7