A toolkit for creating lightweight event processing pipelines

event-collector

Event-collector is a toolkit for creating lightweight event processing pipelines.

This library implements an in-memory competing consumer queue using only Python standard library functions and asyncio primitives. The collector queue runs as asyncio background tasks and can autoscale according to load. Using collector queues, you can set up data processing workflows quickly without deploying a heavy stack of external services.
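
To illustrate the competing consumer pattern mentioned above, here is a minimal sketch built only on asyncio.Queue and background tasks. It is not event-collector's implementation: the names consumer and run_demo are hypothetical, the number of consumers is fixed, and autoscaling is left out.

```python
import asyncio


async def consumer(name: str, queue: asyncio.Queue, results: list) -> None:
    """Pull items off the shared queue until the task is cancelled."""
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)  # simulate processing work
            results.append((name, item))
        finally:
            # Mark the item as done so queue.join() can return.
            queue.task_done()


async def run_demo(n_items: int = 6, n_consumers: int = 3) -> list:
    """Hypothetical demo: several consumers compete for items on one queue."""
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []

    # Start the consumers as background tasks sharing the same queue.
    tasks = [
        asyncio.create_task(consumer(f"consumer-{i}", queue, results))
        for i in range(n_consumers)
    ]

    for item in range(n_items):
        await queue.put(item)

    # Block until every queued item has been processed.
    await queue.join()

    # Shut the consumers down.
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return results


if __name__ == "__main__":
    for name, item in asyncio.run(run_demo()):
        print(name, item)
```

Each item is taken by exactly one consumer, so adding consumers raises throughput without processing any item twice.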

Requirements

  • Python >= 3.13

The only external dependencies are the following:

  • orjson
  • structlog

Documentation

Quickstart example

To run a collector queue with an event collector, we need to perform the following steps:

  1. create a Collector with an event handling function to process events;
  2. configure a CollectorQueue to run background consumer tasks that will wait for input events;
  3. run the collector queue and publish events to the queue.

"""
example.py
"""

import asyncio
import json
import sys

from event_collector import (
    Collector,
    CollectorQueue,
    Context,
    Payload,
    Ok,
    Result,
)


async def event_handler(payload: Payload, ctx: Context) -> Result:
    """
    Extract the input data and return a result dict wrapped in
    an "Ok" Result object. Also simulate a backend processing delay.
    """
    data = payload.value
    result = {
        "data": data,
        "status": "OK",
    }
    await asyncio.sleep(1)
    return Ok(result)


async def main() -> None:
    """
    Create a Collector with the event handler and initialize
    a CollectorQueue with it. Run the queue and send a batch
    of event payloads to the collector.

    We'll set "report_interval" to a positive value (the
    interval in seconds) so that the collector queue writes
    runtime metrics to the log output on stdout at that
    interval.
    """
    collector = Collector("demo-collector", event_handler)
    async with CollectorQueue(
        [collector],
        min_consumers=1,
        max_consumers=100,
        report_interval=1.0,
    ) as collector_queue:

        # Send event payloads to the event handler.
        publisher = collector_queue.get_publisher()

        for i in range(3):
            payload = Payload({"payload": f"payload data {i}"})
            event = await publisher.send(payload)

            # The publisher.send() call returns a CollectorEvent
            # object. We await event.wait(), which blocks until
            # the result is ready, then fetch the first result
            # from the returned ResultList.
            result_list = await event.wait()
            result = result_list.first().unwrap()

            # Print the result in JSON format.
            print(json.dumps(result))


if __name__ == "__main__":
    asyncio.run(main())
    sys.exit(0)

The collector queue will emit logs with runtime metrics.

Run the example code using the following command (we'll pipe the output to jq to format the log output):

python example.py | jq .

Here is the printed result with part of the metrics log output:

(.venv) [event-collector]$ python example.py | jq .
...
[snip]
{
  "logger": "CollectorQueue-107974cacc474a3298202c299d1497d8",
  "metrics_timestamp": "2026-05-06T05:57:01.322421+00:00",
  "qsize": 0,
  "busy": 1,
  "consumers": 2,
  "load": 0.5,
  "avg_load": 0.5,
  "service_time": 1.0007,
  "wait_time": 1.0023,
  "arrival_rate": 0.0,
  "departure_rate": 0.0,
  "wip": 0.0,
  "t_delta": 0.0,
  "event_count": 2,
  "event": "collector queue metrics",
  "level": "info",
  "timestamp": "2026-05-06T05:57:01.399931Z"
}
{
  "data": {
    "payload": "payload data 0"
  },
  "status": "OK"
}
{
  "data": {
    "payload": "payload data 1"
  },
  "status": "OK"
}
{
  "data": {
    "payload": "payload data 2"
  },
  "status": "OK"
}
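
Because the metrics logs and the printed results share stdout, it can help to separate them before further processing. The sketch below assumes that each record is written as a single JSON object per line (the multi-line layout above comes from jq) and that metrics records carry the "event" value shown in the sample output. The function name split_output is hypothetical and not part of event-collector.

```python
import json


def split_output(lines):
    """Separate metrics log records from handler results.

    Assumes one JSON object per input line; blank lines are skipped.
    """
    metrics, results = [], []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        record = json.loads(line)
        # Metrics records are identified by the "event" field seen in
        # the sample log output; everything else counts as a result.
        is_metrics = (
            isinstance(record, dict)
            and record.get("event") == "collector queue metrics"
        )
        if is_metrics:
            metrics.append(record)
        else:
            results.append(record)
    return metrics, results


if __name__ == "__main__":
    sample = [
        '{"busy": 1, "consumers": 2, "load": 0.5, '
        '"event": "collector queue metrics", "level": "info"}',
        '{"data": {"payload": "payload data 0"}, "status": "OK"}',
    ]
    metrics, results = split_output(sample)
    print(len(metrics), "metrics record(s),", len(results), "result(s)")
```

Passing sys.stdin as lines would let the function read the piped output of example.py directly.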

Contact

  • Kevin Chan <kc@kchan.io>
