An async Kafka Python Framework based on FastAPI and Confluent Kafka
CitizenK
CitizenK is a simple but powerful Python library for developing reactive async Kafka microservices, built on top of Confluent Kafka Python, FastAPI, and Pydantic.
CitizenK Replicator is an additional tool that we developed using the same technology to simplify data transfer between production and staging environments. It's not a substitute for Confluent's Replicator, which is a much more robust tool for replicating data between multiple production environments.
How we got here...
We are a Python shop. We develop web services in Python, write ETL code in Python, and obviously use Python for data science. A few years ago, when we started working on Lanternn, we were looking for a Python library that could help us build distributed, scalable processing pipelines on top of Kafka with stateless and stateful microservices. The best solution that we could find at the time was Faust. Faust is a stream processing library, porting ideas from Kafka Streams to Python. Additionally, Faust includes a powerful web server, schema validation and management, and is based on an agents/actors approach. With so many goodies, how could we resist?
We built dozens of services using Faust, and all in all, we were pretty happy with it. However, over time we realized that Kafka Streams wasn't really working for us: it's just too complex to manage, and other alternatives for state management, like Redis, are simpler. We were also worried that Faust, without the spirit of its creator Ask Solem, and its underlying Kafka libraries, aiokafka and kafka-python, didn't have a large enough community to support the stability issues we had experienced. In parallel, frameworks like FastAPI and Confluent Kafka, which we used anyway, had large community backing, and we felt we could use them together to build an alternative foundation for our pipelines.
The name CitizenK was chosen to reflect the idea that Python should be a first-class citizen in the Kafka world.
It is also related to Kafka's most famous novel: The Trial, which tells the story of Josef K. a man arrested and prosecuted by a remote, inaccessible authority, with the nature of his crime revealed neither to him nor to the reader.
Existing tools
- Faust
- Fastkafka
Tutorial
You can see an example of how to use CitizenK in the demo app.
Creating a CitizenK app
First, we create a CitizenK app similar to how we create a FastAPI app, but with additional arguments:
- kafka_config: provides configuration for connecting and configuring the Kafka client
- app_name: Mainly used as the consumer group name
- app_type: SINK (consumer only), SOURCE (producer only), or TRANSFORM (producer-consumer)
- auto_generate_apis: Will auto-generate FastAPI endpoints to consume from and produce to workers and topics
- agents_in_thread: Will run the consumer agents in a thread rather than in an async loop
```python
app = CitizenK(
    kafka_config=config.source_kafka,
    app_name="citizenk",
    app_type=AppType.TRANSFORM,
    debug=True,
    title="CitizenK Demo App",
    auto_generate_apis=True,
    agents_in_thread=config.AGENTS_IN_THREAD,
    api_router_prefix=prefix,
    api_port=config.API_PORT,
    schema_registry_url=config.KAFKA_SOURCE_SCHEMA_REGISTRY,
    version=config.VERSION,
    openapi_url=prefix + "/openapi.json",
    docs_url=prefix + "/docs",
    license_info={
        "name": "Apache 2.0",
        "url": "https://www.apache.org/licenses/LICENSE-2.0.html",
    },
)
```
Creating CitizenK topics
Next, we create topics for the app and define the model for each topic using Pydantic.
Topics can be either INPUT, OUTPUT, or BIDIR.
```python
class Video(JSONSchema):
    camera_id: int
    path: str
    timestamp: datetime


class ProcessedVideo(JSONSchema):
    camera_id: int
    path: str
    timestamp: datetime
    valid: bool


t1 = app.topic(name="B", value_type=Video, topic_dir=TopicDir.BIDIR)
t2 = app.topic(name="C", value_type=ProcessedVideo, topic_dir=TopicDir.BIDIR)
t3 = app.topic(name="D", value_type=ProcessedVideo, topic_dir=TopicDir.OUTPUT)
```
Creating CitizenK agents
And lastly, we create agents that process the Kafka messages.
Agents can listen to multiple topics and accept either values or the entire Kafka event (key, value, offset, partition, timestamp...)
```python
@app.agent(topics=t1, batch_size=100)
async def process_videos_t1(events: List[KafkaEvent]):
    # Process incoming videos
    for event in events:
        camera_id = event.value.camera_id
        video_counts[camera_id] += 1
        v = ProcessedVideo(
            camera_id=camera_id,
            path=event.value.path,
            timestamp=event.value.timestamp,
            valid=bool(camera_id % 2),
        )
        t2.send(value=v, key=str(v.camera_id))


@app.agent(topics=t2, batch_size=100)
async def process_videos_t2(values: List[BaseModel]):
    # Forward only the valid videos
    for value in values:
        if value.valid:
            t3.send(value=value, key=str(value.camera_id))
```
Auto endpoints
To help debug and evaluate the service, CitizenK automatically creates web endpoints that help you send messages to topics and agents.
- info: get service info
- topics: send events to topics
- agents: send events directly to agents, bypassing topics
- stats: get Kafka stats for producer and consumer
Creating additional CitizenK endpoints
Just like in any other FastAPI app, you can create GET, POST, and PUT endpoints that either interact with Kafka or perform other, non-Kafka-related tasks.
```python
@router.post("/events", response_class=JSONResponse)
async def produce_video_events(
    values: List[Video],
    topic: str = Query(),
):
    """Sends events to the given topic"""
    if topic not in app.topics:
        raise HTTPException(status_code=400, detail="Topic not supported by app")
    t = app.topics[topic]
    for v in values:
        t.send(value=v, key=str(v.camera_id))
    return {"status": "ok"}
```
```python
@router.get("/topics", response_class=JSONResponse)
async def get_source_topics():
    """Returns the list of topics from the source kafka"""
    admin = KafkaAdapter(config.source_kafka)
    topics = sorted(admin.get_all_broker_topics().keys())
    return {"topics": topics}
```
Multiple workers behind a load balancer
CitizenK includes two special decorators for scenarios where the service has multiple workers behind a load balancer and the web request needs to reach a specific worker that holds a partition.
- topic_router: forwards the request based on the topic and key (JSON / HTML)
- broadcast_router: aggregates the responses from all workers into a single JSON
Both routers support the GET, POST, PUT, and DELETE methods.
```python
@router.get("/topic_test", response_class=JSONResponse)
@app.topic_router(topic=t1, match_info="camera_id")
async def test_topic_router(request: Request, camera_id: int):
    """Returns the video count for the given camera"""
    return {"key": camera_id, "count": video_counts[camera_id]}


@router.get("/broadcast_test", response_class=JSONResponse)
@app.broadcast_router()
async def test_broadcast_router(request: Request):
    """Returns the video counts from all workers"""
    return video_counts
```
Websocket
CitizenK also supports WebSocket agents:
```python
@app.agent(topics=t2, batch_size=100, websocket_route=prefix + "/ws")
async def websocket_agent(values: List[BaseModel]) -> str:
    values = [json.loads(v.json()) for v in values if not v.valid]
    return json.dumps(values, indent=4)
```
This agent exposes a WebSocket endpoint for one or more clients to connect to. It then processes incoming Kafka messages from topic t2 and sends the returned string to all live WebSocket connections on "/ws". The main use case is bridging between Kafka and WebSocket, for example to send filtered Kafka events to a web app or mobile app.
The other direction, frontend to Kafka, is probably easier to implement with a normal REST POST endpoint and is not supported yet.
Things to be aware of...
CitizenK is a single-threaded async app, i.e. if a coroutine spends too much time processing without awaiting I/O, it will block other coroutines from running. In particular, when using a load balancer with health checks, make sure the interval between health checks is longer than the longest-running agent. This can be mitigated with agents_in_thread.
To help tune the service, CitizenK includes the concept of batch size: how many events to consume and process in every batch, across all agents.
Additionally, like any other Kafka service, it's important to tune several Kafka consumer and producer configs. In particular, ensure rebalancing is not triggered unintentionally:
Consumer:
- fetch.max.bytes (50 MB): The maximum amount of data the server should return for a fetch request. Reduce if processing each record takes significant time.
- max.poll.records (500): The maximum number of records returned in a single call to poll().
- max.poll.interval.ms (5 min): The maximum delay between invocations of poll() when using consumer group management.
Producer:
- linger.ms (0): Important to set to 0/5/10/50/200 under moderate/high load.
- batch.size (16 KB): Increase if sending large buffers to Kafka.
Both:
- compression.type(none): gzip, snappy, or lz4
More explanation here: Solving My Weird Kafka Rebalancing Problems & Explaining What Is Happening and Why?
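Put together, the tuning knobs above could be expressed as plain confluent-kafka client configs. This is an illustrative sketch: the broker address is an assumption, and the values are starting points to adjust for your message sizes and processing times, not recommendations.

```python
# Illustrative confluent-kafka client configs for the tuning knobs above.
# "localhost:9092" and all values are assumptions, not recommendations.
consumer_config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "citizenk",
    "fetch.max.bytes": 50 * 1024 * 1024,    # reduce if each record is slow to process
    "max.poll.interval.ms": 5 * 60 * 1000,  # must exceed the longest batch processing time
}

producer_config = {
    "bootstrap.servers": "localhost:9092",
    "linger.ms": 50,            # batch small messages under moderate/high load
    "batch.size": 64 * 1024,    # increase when sending large buffers
    "compression.type": "lz4",  # none / gzip / snappy / lz4
}
```

Note that max.poll.interval.ms is the setting that most often triggers unintentional rebalances: if an agent batch takes longer than this to process, the broker assumes the consumer is dead and reassigns its partitions.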
Limitations
At the moment the library only supports JSON schema
CitizenK Replicator
Scenarios
Staging environment
- I have a staging environment and I want to replicate some production topics to it.
- At some point, I want to produce the staging topics in the staging environment using a staging service, so I switch off the replication and populate the same staging topic with real data.
- When I finish testing in staging, I want to switch back to production, so that I can save on costs.
- If the workload is high, I want to replicate most (e.g. 90%) of the messages from production and produce only a little (e.g. 10%) of the data from staging. This way, the same topic will hold mixed data, and potentially mixed schemas, from the two environments.
- When switching between environments (i.e. on configuration change), I want to change the offset to the latest on the new topic, so that the handover is not too chaotic.
- I also want to delete the consumer group of the service in staging, so that when it comes back up again, it won't see a lag.
- Additionally, I sometimes want to migrate data between production and staging due to schema changes or different identities.
Dev environment + live data
- When I test a service locally or in a dev environment, possibly with a local Kafka, I want the local Kafka to have real data, so that I can test the service for a long period of time with live data.
- Theoretically, I could connect the dev service to the staging or production Kafka cluster; however, this presents a stability/security risk to the remote cluster. There is also a risk that the service will join a consumer group and accidentally participate in the remote workload. This approach also prevents parallel testing, as there can be a conflict between the consumers.
- So one solution is to replicate the topics from staging to the local/dev Kafka, possibly with some filtering to reduce the load, so that the local service is not overwhelmed with too much data.
On-premise Kafka to cloud Kafka bridge
- I have a local Kafka and I want to replicate some topics to the remote cloud.
- You can use this tool for this scenario; however, Confluent Replicator or Kafka MirrorMaker are probably more suitable.
Dev environment + replayed data
- When I test a service locally or in a dev environment, possibly with a local Kafka, I want the local Kafka to replay historical/simulated messages from a file.
- This scenario is a bit different from the previous ones, as there is no Kafka consumer, just a producer; you could say it is more of a tool than a service.
- The messages are read from a file with a timestamp (one file for each topic) and injected into the right topic with the original timing, keeping the same gap between now and the initial timestamp.
Cluster to cluster replication
- You can use this tool for this scenario; however, Confluent Replicator or Kafka MirrorMaker are probably more suitable.
Existing tools
- Confluent Replicator: looks like a good tool, but not open source and expensive
- Kafka MirrorMaker: open source, but doesn't support filtering
- kcat: a nice tool, but not for these scenarios
Implementation details
- Containerised Python
- Based on the Confluent Kafka API + FastAPI
- Does not create topics or partitions automatically; it assumes they exist and are configured
- Deployed as a distributed service
- Filters based on JMESPath for JSON messages
- Allows two consumer options: with or without a consumer group
- Code written following DDD principles
Configuration
- LOG_LEVEL: service log level
- JSON_LOGGING: Use json logging
- API_PREFIX: API prefix
- FILE_DATA_PATH: Location of json files when reading and writing topics from file
- KAFKA_SOURCE_SERVER_URL: Source Bootstrap Servers
- KAFKA_SOURCE_USE_TLS: Enable Source SSL: 0,1
- KAFKA_SOURCE_SASL_MECHANISM: Source SASL mechanism: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
- KAFKA_SOURCE_SASL_USERNAME: Source SASL username
- KAFKA_SOURCE_SASL_PASSWORD: Source SASL password
- KAFKA_SOURCE_GROUP_NAME: Source group name, or leave empty to consume without a consumer group
- KAFKA_SOURCE_EXTRA_CONFIG_<KAFKA_CONFIG_KEY>: Any valid kafka consumer config (uppercase, replace . with _)
- KAFKA_TARGET_SERVER_URL: Target Bootstrap Servers
- KAFKA_TARGET_USE_TLS: Enable Target SSL
- KAFKA_TARGET_SASL_MECHANISM: Target SASL mechanism: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
- KAFKA_TARGET_SASL_USERNAME: Target SASL username
- KAFKA_TARGET_SASL_PASSWORD: Target SASL password
- KAFKA_TARGET_EXTRA_CONFIG_<KAFKA_CONFIG_KEY>: Any valid kafka producer config (uppercase, replace . with _)
- READ_MAPPINGS_EVERY_SECONDS: How often to check for new mappings in the file system
- CACULATE_STATS_EVERY_SECONDS: How often to calculate stats
- DELETE_GROUPS_EVERY_SECONDS: How often to check for new group deletion
Current solution limitations
- Currently only supports JSON schema.
API
User Interface
The user interface allows you to add a new mapping and edit/delete an existing mapping.
Usage
Provide a JSON list of topic mappings in this format:
```json
[
    {
        "name": "File A to B",
        "source_topic_name": "A",
        "target_topic_name": "B",
        "source_is_file": true
    },
    {
        "name": "Topic B to C",
        "source_topic_name": "B",
        "target_topic_name": "C"
    },
    {
        "name": "Topic C to D Using filter",
        "source_topic_name": "C",
        "target_topic_name": "D",
        "valid_jmespath": "key == 'hello' && value.msg == 'world'",
        "enabled": true
    },
    {
        "name": "TopicCtoD",
        "source_topic_name": "C",
        "target_topic_name": "D",
        "enabled": false,
        "target_service_consumer_group": "service"
    },
    {
        "name": "Topic D to File E",
        "source_topic_name": "D",
        "target_topic_name": "E",
        "target_is_file": true
    },
    {
        "name": "Disabled Topic U to File V",
        "source_topic_name": "U",
        "target_topic_name": "V",
        "target_is_file": true,
        "enabled": false
    }
]
```
- name: The mapping name
- enabled: Enable / disable mapping
- source_topic_name: The topic to read from in the source cluster
- target_topic_name: The topic to write to in the target cluster
- valid_jmespath: filter criteria
- source_is_file: If the source is a json file
- target_is_file: If the target is a json file
- target_service_consumer_group: The service consumer group to delete when replication is enabled
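The fields above can be modelled roughly as the following dataclass. This is a hypothetical sketch for illustration only; the replicator's internal model (and its defaults) may differ.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TopicMapping:
    """One entry of the replicator's mapping list.

    Field names follow the JSON format above; the defaults are assumptions.
    """
    name: str
    source_topic_name: str
    target_topic_name: str
    enabled: bool = True
    valid_jmespath: Optional[str] = None
    source_is_file: bool = False
    target_is_file: bool = False
    target_service_consumer_group: Optional[str] = None


# Equivalent to the "Topic B to C" entry in the JSON example above
mapping = TopicMapping(
    name="Topic B to C",
    source_topic_name="B",
    target_topic_name="C",
)
```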
Topic Level Mapping
Topic-level mappings allow mapping of keys/values when replicating a topic. This can be useful if, for example, the schemas, enums, or keys differ between the environments.
To support this, the replicator accepts a mapping document for each topic that it consumes from the source, in the JSON format below. A mapping entry can target:
- key: map a source key to a target key
- a dotted value path such as value.payload.product_id: map a source value inside the message payload to a target value
- key:partition: map a source key to a target partition (the entire message is dropped if the partition equals -1000)
```json
{
    "key": {
        "1": 10,
        "2": 12
    },
    "value.payload.product_id": {
        "1001": 1,
        "1002": 12,
        "1003": 14
    },
    "value.payload.user_name": {
        "A": "A name",
        "B": "B name",
        "C": "C name"
    },
    "key:partition": {
        "1": 0,
        "2": 0,
        "3": -1000,
        "4": 1,
        "5": 1,
        "6": 1
    }
}
```
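To make the mapping semantics concrete, here is a minimal sketch of how such a document could be applied to one message. It is an illustration under stated assumptions, not the replicator's actual code: in particular, we assume the partition lookup uses the original (pre-mapping) key, and that unmapped keys and values pass through unchanged.

```python
DROP_PARTITION = -1000  # sentinel from the mapping format above


def apply_mapping(mapping: dict, key, value: dict):
    """Apply a topic-level mapping document to one message.

    Returns (key, value, partition) or None when the message should be
    dropped. Illustrative sketch; the replicator's real logic may differ.
    """
    # 1. Partition routing / drop, based on the original key (assumption)
    partition = None
    part_table = mapping.get("key:partition", {})
    if str(key) in part_table:
        partition = part_table[str(key)]
        if partition == DROP_PARTITION:
            return None
    # 2. Key remapping; unmapped keys pass through unchanged
    key = mapping.get("key", {}).get(str(key), key)
    # 3. Dotted-path value remapping, e.g. "value.payload.product_id"
    for path, table in mapping.items():
        if not path.startswith("value."):
            continue
        parts = path.split(".")[1:]
        node = value
        for p in parts[:-1]:
            node = node.get(p, {})
        leaf = parts[-1]
        if leaf in node and str(node[leaf]) in table:
            node[leaf] = table[str(node[leaf])]
    return key, value, partition
```

With the example document above, a message with key 1 would be rerouted to partition 0 with its key rewritten to 10, while a message with key 3 would be dropped entirely.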
Stats
Returns a list of JSON stats for each mapping in the following format:
```json
{
    "time": "2023-05-25 18:20:43.875557",
    "started": "2023-05-25 08:08:35.728313",
    "queue": 180,
    "mappings": [
        {
            "name": "Topic B to C",
            "source_topic_name": "B",
            "target_topic_name": "C",
            "valid_jmespath": null,
            "target_service_consumer_group": null,
            "consumer_group_up": false,
            "assignments": [0, 1, 2],
            "lag": 1258,
            "source_count": 27739,
            "target_count": 27739
        }
    ]
}
```
Grafana Integration
To view the stats in Grafana, use the Infinity data source.
Consumer API
To simplify debugging and to support other use cases, the replicator also includes an endpoint to consume messages from a given topic.