Skip to main content

No project description provided

Project description

Romeways kafka

This is an extra package for romeways for more details access the romeways Github page.

This package use aiokafka to connect kafka server.

Configs

Queue

from dataclasses import dataclass

from romeways import GenericQueueConfig


@dataclass(slots=True, frozen=True)
class KafkaQueueConfig(GenericQueueConfig):
    """
    topic: str See aiokafka.AIOKafkaConsumer doc
     -> https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.AIOKafkaConsumer
    group_id: str See aiokafka.AIOKafkaConsumer doc.
     -> https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.AIOKafkaConsumer
    """
    
    topic: str
    group_id: str
    connector_name: str
    frequency: float
    max_chunk_size: int
    sequential: bool

Connector

from dataclasses import dataclass

from romeways import GenericConnectorConfig


@dataclass(slots=True, frozen=True)
class KafkaConnectorConfig(GenericConnectorConfig):
    """
    bootstrap_server: str | list[str] See aiokafka.AIOKafkaConsumer doc
     -> https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.AIOKafkaConsumer
    client_id: str See aiokafka.AIOKafkaConsumer doc.
     -> https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.AIOKafkaConsumer
    """
        
    bootstrap_server: str
    client_id: str

Use case

import asyncio

import romeways

# Create a queue config
config_q = romeways.KafkaQueueConfig(
    topic="net_topic",
    group_id="my-project",
    connector_name="kafka-dev1",
    frequency=1,
    max_chunk_size=10,
    sequential=False,
)

# Register a controller/consumer for the queue name
@romeways.queue_consumer(queue_name="queue.payment.done", config=config_q)
async def controller(message: romeways.Message):
    print(message)

config_p = romeways.KafkaConnectorConfig(
    connector_name="kafka-dev1", 
    bootstrap_server="localhost:9094", 
    client_id="1"
)

# Register a connector
romeways.connector_register(
    connector=romeways.KafkaQueueConnector, config=config_p, spawn_process=True
)

asyncio.run(romeways.start())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

romeways_kafka_queue-0.1.0.tar.gz (6.6 kB view details)

Uploaded Source

Built Distribution

romeways_kafka_queue-0.1.0-py3-none-any.whl (9.1 kB view details)

Uploaded Python 3

File details

Details for the file romeways_kafka_queue-0.1.0.tar.gz.

File metadata

  • Download URL: romeways_kafka_queue-0.1.0.tar.gz
  • Upload date:
  • Size: 6.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.7.1 CPython/3.12.0 Linux/6.1.0-13-amd64

File hashes

Hashes for romeways_kafka_queue-0.1.0.tar.gz
Algorithm Hash digest
SHA256 dc94654f6f42e9e006347d29116c1d05d7d14ba0cb52817370d4419f65c18cbb
MD5 a33e8d407ba9d53c8630c8c22d855a12
BLAKE2b-256 12a742645e0e8b16939d33f5bcc8e5133ad45deac551e837840760cf8dba69ba

See more details on using hashes here.

File details

Details for the file romeways_kafka_queue-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for romeways_kafka_queue-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 ee4ea9f6c819101ce0378b8b86fdd10315dc6963d26314d87da151046c298b54
MD5 f3364ca63fefae10845cd1f05141df97
BLAKE2b-256 6c6f18d51f1c36aa78dd8c771275626a8dff48f2e081968ecd361346a680574e

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page