Romeways kafka

This is an extra package for romeways. For more details, see the romeways GitHub page.

This package uses aiokafka to connect to a Kafka server.
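For orientation, here is a minimal sketch of what a plain aiokafka consumer looks like when fed the kind of values this package's configs carry (`topic`, `group_id`, `bootstrap_server`, `client_id`). The helper names `consumer_kwargs` and `print_messages` are hypothetical, and the mapping from `bootstrap_server` to aiokafka's `bootstrap_servers` argument is an assumption for illustration, not a description of how romeways wires things internally.

```python
def consumer_kwargs(bootstrap_server: str, client_id: str, group_id: str) -> dict:
    """Map the config field names used in this README to AIOKafkaConsumer keyword arguments.

    Hypothetical helper: the mapping is an assumption, not romeways internals.
    """
    return {
        "bootstrap_servers": bootstrap_server,
        "client_id": client_id,
        "group_id": group_id,
    }


async def print_messages(topic: str, **kwargs) -> None:
    """Consume a topic with aiokafka and print every record (needs a running broker)."""
    # Imported lazily so consumer_kwargs() stays usable without aiokafka installed.
    from aiokafka import AIOKafkaConsumer

    consumer = AIOKafkaConsumer(topic, **kwargs)
    await consumer.start()
    try:
        async for record in consumer:
            print(record.topic, record.offset, record.value)
    finally:
        # Always stop the consumer so it leaves the group cleanly.
        await consumer.stop()


# Example call (requires a broker on localhost:9094):
# asyncio.run(print_messages("net_topic", **consumer_kwargs("localhost:9094", "1", "my-project")))
```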

Configs

Queue

from dataclasses import dataclass

from romeways import GenericQueueConfig


@dataclass(slots=True, frozen=True)
class KafkaQueueConfig(GenericQueueConfig):
    """
    topic: str See aiokafka.AIOKafkaConsumer doc
     -> https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.AIOKafkaConsumer
    group_id: str See aiokafka.AIOKafkaConsumer doc.
     -> https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.AIOKafkaConsumer
    """
    
    topic: str
    group_id: str
    connector_name: str
    frequency: float
    max_chunk_size: int
    sequential: bool

Connector

from dataclasses import dataclass

from romeways import GenericConnectorConfig


@dataclass(slots=True, frozen=True)
class KafkaConnectorConfig(GenericConnectorConfig):
    """
    bootstrap_server: str | list[str] See aiokafka.AIOKafkaConsumer doc
     -> https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.AIOKafkaConsumer
    client_id: str See aiokafka.AIOKafkaConsumer doc.
     -> https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.AIOKafkaConsumer
    """
        
    bootstrap_server: str
    client_id: str

Use case

import asyncio

import romeways

# Create a queue config
config_q = romeways.KafkaQueueConfig(
    topic="net_topic",
    group_id="my-project",
    connector_name="kafka-dev1",
    frequency=1,
    max_chunk_size=10,
    sequential=False,
)

# Register a controller/consumer for the queue name
@romeways.queue_consumer(queue_name="queue.payment.done", config=config_q)
async def controller(message: romeways.Message):
    print(message)

# Create a connector config; connector_name must match the one used in the queue config
config_p = romeways.KafkaConnectorConfig(
    connector_name="kafka-dev1",
    bootstrap_server="localhost:9094",
    client_id="1",
)

# Register a connector
romeways.connector_register(
    connector=romeways.KafkaQueueConnector, config=config_p, spawn_process=True
)

asyncio.run(romeways.start())
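The use case sets `frequency=1`, `max_chunk_size=10` and `sequential=False`, but this page does not define those fields. The sketch below illustrates one plausible reading, stated here as an assumption: `frequency` is the pause in seconds between polls, `max_chunk_size` caps how many messages one poll hands over, and `sequential` chooses between one-at-a-time and concurrent handling. It uses only the standard library, with an in-memory `deque` standing in for the Kafka topic. `dispatch_chunk` and `poll` are hypothetical names, not romeways functions.

```python
import asyncio
from collections import deque
from typing import Awaitable, Callable

Handler = Callable[[str], Awaitable[None]]


async def dispatch_chunk(
    buffer: deque, handler: Handler, max_chunk_size: int, sequential: bool
) -> int:
    """Hand at most max_chunk_size buffered messages to handler; return how many."""
    chunk = [buffer.popleft() for _ in range(min(max_chunk_size, len(buffer)))]
    if sequential:
        # One message at a time, in arrival order.
        for message in chunk:
            await handler(message)
    else:
        # All messages of the chunk are handled concurrently.
        await asyncio.gather(*(handler(message) for message in chunk))
    return len(chunk)


async def poll(
    buffer: deque, handler: Handler, frequency: float, max_chunk_size: int, sequential: bool
) -> None:
    """Dispatch chunks until the buffer is empty, sleeping `frequency` seconds between them."""
    while buffer:
        await dispatch_chunk(buffer, handler, max_chunk_size, sequential)
        await asyncio.sleep(frequency)
```

Under this reading, 25 buffered messages with `max_chunk_size=10` would take three polls to drain.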

