Safely consume and process data.
aiosafeconsumer is a library that provides abstractions, and some ready-made implementations, for consuming data from a source and processing it.
Features:
Based on AsyncIO
Type annotated
Uses logging with contextual information
Abstractions:
DataSource - waits for data and returns batches of records using a Python generator
DataProcessor - accepts a batch of records and processes it
DataTransformer - accepts a batch of records, transforms it, and calls another processor to process the result. Extends DataProcessor
Worker - abstract worker. Performs a long-running task
ConsumerWorker - connects DataSource and DataProcessor. Extends Worker
DataWriter - base abstraction to perform data synchronization. Extends DataProcessor
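The way these abstractions fit together can be sketched with plain asyncio. This is a minimal illustration, not the library's actual API: the class and function names (ListSource, CollectProcessor, UpperTransformer, consume, demo) and their method signatures are hypothetical and only mirror the roles described above.

```python
import asyncio
from collections.abc import AsyncIterator


class ListSource:
    """Hypothetical source: yields pre-built batches from an async generator."""

    def __init__(self, batches: list[list[str]]) -> None:
        self._batches = batches

    async def read(self) -> AsyncIterator[list[str]]:
        for batch in self._batches:
            await asyncio.sleep(0)  # stand-in for waiting on real I/O
            yield batch


class CollectProcessor:
    """Hypothetical processor: stores every record it receives."""

    def __init__(self) -> None:
        self.seen: list[str] = []

    async def process(self, batch: list[str]) -> None:
        self.seen.extend(batch)


class UpperTransformer:
    """Hypothetical transformer: transforms a batch, then delegates it."""

    def __init__(self, target: CollectProcessor) -> None:
        self._target = target

    async def process(self, batch: list[str]) -> None:
        await self._target.process([record.upper() for record in batch])


async def consume(source: ListSource, processor: UpperTransformer) -> None:
    """Plays the consumer-worker role: connects a source to a processor."""
    async for batch in source.read():
        await processor.process(batch)


def demo() -> list[str]:
    sink = CollectProcessor()
    source = ListSource([["a", "b"], ["c"]])
    asyncio.run(consume(source, UpperTransformer(sink)))
    return sink.seen
```

Because the transformer exposes the same process method as a processor, any number of transformers can be chained in front of the final writer without changing the consuming loop.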
Current implementations:
KafkaSource - reads data from Kafka
RedisStreamSource - reads data from Redis Streams
RedisWriter - synchronizes data in Redis
ElasticsearchWriter - synchronizes data in Elasticsearch
MongoDBWriter - synchronizes data in MongoDB
PostgresWriter - synchronizes data in PostgreSQL
WorkerPool - controller that sets up and runs workers in parallel. Handles worker failures and restarts a worker when it fails or exits.
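The restart behaviour described for WorkerPool can be pictured with a small asyncio supervisor. This is a sketch under stated assumptions, not the library's implementation: supervise, run_pool, demo_pool, and the max_restarts limit are hypothetical names introduced for illustration, and a real pool may restart workers indefinitely or apply a delay between restarts.

```python
import asyncio
from collections.abc import Awaitable, Callable

WorkerFactory = Callable[[], Awaitable[None]]


async def supervise(
    name: str, worker: WorkerFactory, max_restarts: int, log: list[str]
) -> None:
    """Run one worker and start it again after a failure or a normal exit."""
    for attempt in range(max_restarts + 1):
        try:
            await worker()
            log.append(f"{name}: exited (run {attempt + 1})")
        except Exception as exc:
            log.append(f"{name}: failed with {exc!r} (run {attempt + 1})")
        await asyncio.sleep(0)  # a real pool would likely back off here


async def run_pool(
    workers: dict[str, WorkerFactory], max_restarts: int = 2
) -> list[str]:
    """Supervise all workers concurrently and return the collected log."""
    log: list[str] = []
    await asyncio.gather(
        *(supervise(name, w, max_restarts, log) for name, w in workers.items())
    )
    return log


def demo_pool() -> list[str]:
    calls = {"flaky": 0}

    async def flaky() -> None:
        # Fails on its first run only, then exits normally.
        calls["flaky"] += 1
        if calls["flaky"] == 1:
            raise RuntimeError("boom")

    async def steady() -> None:
        await asyncio.sleep(0)

    return asyncio.run(
        run_pool({"flaky": flaky, "steady": steady}, max_restarts=1)
    )
```

Each worker is supervised in its own task, so a failure in one worker is caught and logged without interrupting the others.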
Install:
pip install aiosafeconsumer
Install with Kafka:
pip install aiosafeconsumer[kafka]
Install with Redis:
pip install aiosafeconsumer[redis]
Install with Elasticsearch:
pip install aiosafeconsumer[elasticsearch]
Install with MongoDB:
pip install aiosafeconsumer[mongo]
Install with PostgreSQL:
pip install aiosafeconsumer[postgres]
Links:
Producer library: https://github.com/lostclus/django-kafka-streamer
Example application: https://github.com/lostclus/WeatherApp
TODO:
Support for the "enumerate IDs" message type in the PostgreSQL writer
ClickHouse writer