A toolkit for creating lightweight event processing pipelines

event-collector

Event-collector is a toolkit for creating lightweight event processing pipelines.

This library implements an in-memory competing consumer queue using only Python standard library functions and asyncio primitives. The collector queue runs as asyncio background tasks and can autoscale according to load. Using collector queues, you can set up data processing workflows quickly without deploying a heavy stack of external services.
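
For readers unfamiliar with the competing consumer pattern, here is a minimal standard-library sketch of the general idea. It is not event-collector's implementation: the names `consumer` and `run_pool` are hypothetical, the pool size is fixed (no autoscaling), and the short sleep only simulates work. Several tasks wait on one shared `asyncio.Queue`, and each item is handled by exactly one of them.

```python
import asyncio


async def consumer(name: str, queue: asyncio.Queue, results: list) -> None:
    """Pull items until cancelled; each item goes to exactly one consumer."""
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)  # simulate processing work
            results.append((name, item))
        finally:
            queue.task_done()


async def run_pool(items: list, n_consumers: int = 3) -> list:
    """Process all items with a fixed pool of competing consumer tasks."""
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    tasks = [
        asyncio.create_task(consumer(f"consumer-{i}", queue, results))
        for i in range(n_consumers)
    ]
    for item in items:
        queue.put_nowait(item)
    await queue.join()  # wait until every item has been marked done
    for task in tasks:
        task.cancel()  # consumers are idle in queue.get() at this point
    await asyncio.gather(*tasks, return_exceptions=True)
    return results


if __name__ == "__main__":
    processed = asyncio.run(run_pool(list(range(6))))
    print(sorted(item for _, item in processed))
```

Which consumer handles which item depends on scheduling, so only the set of processed items is deterministic, not the pairing of consumers to items.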

Requirements

  • Python >= 3.13

The only external dependencies are the following:

  • orjson
  • structlog

Documentation

How to install

Using pip:

pip install event-collector

Or, if using uv:

uv add event-collector

Quickstart example

To run a collector queue with an event collector, we need to perform the following steps:

  1. create a Collector with an event handling function to process events;
  2. configure a CollectorQueue to run background consumer tasks that will wait for input events;
  3. run the collector queue and publish events to the queue.
"""
example.py
"""

import asyncio
import json
import sys

from event_collector import (
    Collector,
    CollectorQueue,
    Context,
    Payload,
    Ok,
    Result,
)


async def event_handler(payload: Payload, ctx: Context) -> Result:
    """
    Extract the input data and return a result dict wrapped in
    an "Ok" Result object. Also simulate a backend processing delay.
    """
    data = payload.value
    result = {
        "data": data,
        "status": "OK",
    }
    await asyncio.sleep(1)
    return Ok(result)


async def main() -> None:
    """
    Create a Collector with the event handler and initialize
    a CollectorQueue with it. Run the queue and send a batch
    of event payloads to the collector.

    We set "report_interval" to a positive number of seconds
    so that the collector queue logs its runtime metrics to
    stdout at that interval.
    """
    collector = Collector("demo-collector", event_handler)
    async with CollectorQueue(
        [collector],
        min_consumers=1,
        max_consumers=100,
        report_interval=1.0,
    ) as collector_queue:

        # Send event payloads to the event handler.
        publisher = collector_queue.get_publisher()

        for i in range(3):
            payload = Payload({"payload": f"payload data {i}"})
            event = await publisher.send(payload)

            # The publisher.send() call returns a CollectorEvent
            # object. We await the event object -- it will block
            # until the result is ready -- then fetch the
            # results from a ResultList.
            result_list = await event.wait()
            result = result_list.first().unwrap()

            # Print the result in JSON format.
            print(json.dumps(result))


if __name__ == "__main__":
    asyncio.run(main())
    sys.exit(0)

The collector queue will emit logs with runtime metrics.

Run the example with the following command, piping the output to jq to format the JSON:

python example.py | jq .

Here is the printed result with part of the metrics log output:

(.venv) [event-collector]$ python example.py | jq .
...
[snip]
{
  "logger": "CollectorQueue-107974cacc474a3298202c299d1497d8",
  "metrics_timestamp": "2026-05-06T05:57:01.322421+00:00",
  "qsize": 0,
  "busy": 1,
  "consumers": 2,
  "load": 0.5,
  "avg_load": 0.5,
  "service_time": 1.0007,
  "wait_time": 1.0023,
  "arrival_rate": 0.0,
  "departure_rate": 0.0,
  "wip": 0.0,
  "t_delta": 0.0,
  "event_count": 2,
  "event": "collector queue metrics",
  "level": "info",
  "timestamp": "2026-05-06T05:57:01.399931Z"
}
{
  "data": {
    "payload": "payload data 0"
  },
  "status": "OK"
}
{
  "data": {
    "payload": "payload data 1"
  },
  "status": "OK"
}
{
  "data": {
    "payload": "payload data 2"
  },
  "status": "OK"
}
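
Because the metrics logs and the printed results share stdout, it can be handy to separate them before further processing. The sketch below is one way to do that with the standard library. It assumes the raw output (before jq) has one JSON object per line and that metrics records carry the "event": "collector queue metrics" field seen in the sample above. The helper name `split_output` is hypothetical and not part of event-collector.

```python
import json
from typing import Iterable


def split_output(lines: Iterable[str]) -> tuple[list[dict], list]:
    """Split JSON lines into (metrics records, everything else)."""
    metrics: list[dict] = []
    others: list = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip anything that is not valid JSON
        # Assumption: metrics logs are identified by their "event" field.
        if isinstance(record, dict) and record.get("event") == "collector queue metrics":
            metrics.append(record)
        else:
            others.append(record)
    return metrics, others


if __name__ == "__main__":
    # Abbreviated records modelled on the sample output above.
    sample = [
        '{"qsize": 0, "busy": 1, "consumers": 2, "load": 0.5, '
        '"event": "collector queue metrics", "level": "info"}',
        '{"data": {"payload": "payload data 0"}, "status": "OK"}',
    ]
    metrics, results = split_output(sample)
    print(len(metrics), len(results))
```

To use it on a real run, the lines could be read from `sys.stdin` with the example's output piped in instead of the in-line sample.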

Contact

  • Kevin Chan <kc@kchan.io>
