A toolkit for creating lightweight event processing pipelines
event-collector
Event-collector is a toolkit for creating lightweight event processing pipelines.
This library implements an in-memory competing consumer queue using only Python standard library functions and asyncio primitives. The collector queue runs as asyncio background tasks and can autoscale according to load. Using collector queues, you can set up data processing workflows quickly without deploying a heavy stack of external services.
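For readers unfamiliar with the competing consumer pattern, the following is a minimal standard-library sketch of the general idea: several asyncio tasks pull work from one shared queue, so each item is handled by exactly one consumer. It is not event-collector's implementation; the names `consumer` and `run_pipeline` are hypothetical, and autoscaling is left out.

```python
import asyncio


async def consumer(name: str, queue: asyncio.Queue, results: list) -> None:
    """Pull items off the shared queue until the task is cancelled."""
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)  # simulated processing delay
            results.append((name, item * 2))
        finally:
            # Mark the item as done so queue.join() can return.
            queue.task_done()


async def run_pipeline(items: list[int], n_consumers: int = 3) -> list:
    """Process all items with a fixed pool of competing consumers."""
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    # Start the consumers as background tasks; they all wait on one queue.
    tasks = [
        asyncio.create_task(consumer(f"consumer-{i}", queue, results))
        for i in range(n_consumers)
    ]
    for item in items:
        queue.put_nowait(item)
    # Block until every queued item has been processed.
    await queue.join()
    # Shut the consumers down.
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return results


if __name__ == "__main__":
    out = asyncio.run(run_pipeline([1, 2, 3, 4, 5]))
    print(sorted(value for _, value in out))
```

Each item is delivered to whichever consumer is free first, which is what lets a pool of consumers share the load.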
Requirements
Python >= 3.13
The only external dependencies are the following:
- orjson
- structlog
Documentation
How to install
Using pip:
pip install event-collector
Or, if using uv:
uv add event-collector
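To confirm that the install succeeded, one option is to look up the distribution with the standard library's `importlib.metadata`. This is a small sketch; the helper name `installed_version` is hypothetical and not part of event-collector.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Prints the version string if the package is installed, otherwise None.
    print(installed_version("event-collector"))
```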
Quickstart example
To run a collector queue with an event collector, perform the following steps:
- create a Collector with an event handling function to process events;
- configure a CollectorQueue to run background consumer tasks that will wait for input events;
- run the collector queue and publish events to the queue.
"""
example.py
"""
import asyncio
import json
import sys

from event_collector import (
    Collector,
    CollectorQueue,
    Context,
    Payload,
    Ok,
    Result,
)


async def event_handler(payload: Payload, ctx: Context) -> Result:
    """
    Extract the input data and return a result dict wrapped in
    an "Ok" Result object. Also simulate a backend processing delay.
    """
    data = payload.value
    result = {
        "data": data,
        "status": "OK",
    }
    await asyncio.sleep(1)
    return Ok(result)


async def main() -> None:
    """
    Create a Collector with the event handler and initialize
    a CollectorQueue with it. Run the queue and send a batch
    of event payloads to the collector.

    We'll set "report_interval" to a positive value (a
    report interval in seconds) so the collector queue will
    print runtime metrics at the specified interval on the
    stdout log output.
    """
    collector = Collector("demo-collector", event_handler)
    async with CollectorQueue(
        [collector],
        min_consumers=1,
        max_consumers=100,
        report_interval=1.0,
    ) as collector_queue:
        # Send event payloads to the event handler.
        publisher = collector_queue.get_publisher()
        for i in range(3):
            payload = Payload({"payload": f"payload data {i}"})
            event = await publisher.send(payload)
            # The publisher.send() call returns a CollectorEvent
            # object. We await the event object -- it will block
            # until the result is ready -- then fetch the
            # results from a ResultList.
            result_list = await event.wait()
            result = result_list.first().unwrap()
            # Print the result in JSON format.
            print(json.dumps(result))


if __name__ == "__main__":
    asyncio.run(main())
    sys.exit(0)
The collector queue will emit logs with runtime metrics.
Run the example code using the following command (we'll pipe the output to jq to format the log output):
python example.py | jq .
Here is the printed result with part of the metrics log output:
(.venv) [event-collector]$ python example.py | jq .
...
[snip]
{
  "logger": "CollectorQueue-107974cacc474a3298202c299d1497d8",
  "metrics_timestamp": "2026-05-06T05:57:01.322421+00:00",
  "qsize": 0,
  "busy": 1,
  "consumers": 2,
  "load": 0.5,
  "avg_load": 0.5,
  "service_time": 1.0007,
  "wait_time": 1.0023,
  "arrival_rate": 0.0,
  "departure_rate": 0.0,
  "wip": 0.0,
  "t_delta": 0.0,
  "event_count": 2,
  "event": "collector queue metrics",
  "level": "info",
  "timestamp": "2026-05-06T05:57:01.399931Z"
}
{
  "data": {
    "payload": "payload data 0"
  },
  "status": "OK"
}
{
  "data": {
    "payload": "payload data 1"
  },
  "status": "OK"
}
{
  "data": {
    "payload": "payload data 2"
  },
  "status": "OK"
}
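Because the metrics log records and the handler results share stdout, it can be handy to separate them downstream. The sketch below assumes that, before jq formats it, each record is a single-line JSON object, and that metrics records carry the "event": "collector queue metrics" field shown above. The function name `split_output` is hypothetical, and the sample lines are abbreviated from the output above.

```python
import json
from typing import Iterable


def split_output(lines: Iterable[str]) -> tuple[list[dict], list[dict]]:
    """Separate metrics log records from handler results."""
    metrics: list[dict] = []
    results: list[dict] = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip anything that is not a JSON document
        if not isinstance(record, dict):
            continue
        if record.get("event") == "collector queue metrics":
            metrics.append(record)
        elif "status" in record:
            # The example handler's results carry a "status" key.
            results.append(record)
    return metrics, results


if __name__ == "__main__":
    sample = [
        '{"qsize": 0, "busy": 1, "consumers": 2, "load": 0.5, '
        '"event": "collector queue metrics", "level": "info"}',
        '{"data": {"payload": "payload data 0"}, "status": "OK"}',
    ]
    metrics, results = split_output(sample)
    print(len(metrics), len(results))
```

To use it on live output, the same function could be fed `sys.stdin` in place of the sample list.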
Contact
- Kevin Chan <kc@kchan.io>
File details
Details for the file event_collector-0.2.3.tar.gz.
File metadata
- Download URL: event_collector-0.2.3.tar.gz
- Upload date:
- Size: 24.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 7d8f753d72662a4fd09addf5224de677210d223965a8ec12ba5969d9aff2f04b |
| MD5 | 8f5f62381f9ef066694fce5ab4e2f1e2 |
| BLAKE2b-256 | 8b23ad9065b40b8ccdd6a9cf0c6af8c0cf16edcc79efbb0125ff51c3daf4bf51 |
File details
Details for the file event_collector-0.2.3-py3-none-any.whl.
File metadata
- Download URL: event_collector-0.2.3-py3-none-any.whl
- Upload date:
- Size: 29.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 44fad323ac7ce4e2b5120773d7ebb4dcfb7bd5452bedd9451af6c5a0db15cbab |
| MD5 | 7cc5e9b074c0cf0a441459dd9ca9158f |
| BLAKE2b-256 | ae1f0ab4706f94e9812a5580c4e4ee930a7451505c560c181c2dad4408f6fc10 |