Broker-less event streaming over HTTP
Project description
ZeroEventHub
This README file contains information specific to the Python port of the ZeroEventHub. Please see the main readme file for an overview of what this project is about.
Client
We recommend that you store the latest checkpoint/cursor for each partition in the client's database. Example of simple single-partition consumption. Note about the example:
- Things starting with "my" is supplied by you
- Things starting with "their" is supplied by the service you connect to
>>> import zeroeventhub
>>> import httpx
>>> import asyncio
>>> from typing import Sequence
>>> from unittest.mock import MagicMock, Mock, PropertyMock
>>> my_db = MagicMock()
>>> my_person_event_repository = Mock()
>>> my_person_event_repository.read_cursors_from_db.return_value = None
# Step 1: Setup
>>> their_partition_count = 1 # documented contract with server
>>> their_service_url = "https://localhost:8192/person/feed/v1"
>>> my_zeh_session = httpx.AsyncClient() # you can setup the authentication on the session
>>> client = zeroeventhub.Client(their_service_url, their_partition_count, my_zeh_session)
# Step 2: Load the cursors from last time we ran
>>> cursors = my_person_event_repository.read_cursors_from_db()
>>> if not cursors:
... # we have never run before, so we can get all events with FIRST_CURSOR
... # (if we just want to receive new events from now, we would use LAST_CURSOR)
... cursors = [
... zeroeventhub.Cursor(partition_id, zeroeventhub.FIRST_CURSOR)
... for partition_id in range(their_partition_count)
... ]
# Step 3: Enter listening loop...
>>> my_still_want_to_read_events = PropertyMock(side_effect=[True, False])
>>> async def poll_for_events(cursors: Sequence[zeroeventhub.Cursor]) -> None:
... page_of_events = zeroeventhub.PageEventReceiver()
... while my_still_want_to_read_events():
... # Step 4: Use ZeroEventHub client to fetch the next page of events.
... await zeroeventhub.receive_events(page_of_events,
... client.fetch_events(cursors),
... )
...
... # Step 5: Write the effect of changes to our own database and the updated
... # cursor value in the same transaction.
... with my_db.begin_transaction() as tx:
... my_person_event_repository.write_effect_of_events_to_db(tx, page_of_events.events)
... my_person_event_repository.write_cursors_to_db(tx, page_of_events.latest_checkpoints)
... tx.commit()
...
... cursors = page_of_events.latest_checkpoints
... page_of_events.clear()
>>> asyncio.run(poll_for_events(cursors))
Server
This library makes it easy to setup a zeroeventhub feed endpoint with FastAPI.
>>> from typing import Annotated, Any, AsyncGenerator, Dict, Optional, Sequence
>>> from fastapi import Depends, FastAPI, Request
>>> from fastapi.responses import StreamingResponse
>>> from zeroeventhub import (
... Cursor,
... DataReader,
... ZeroEventHubFastApiHandler,
... )
>>> from unittest.mock import Mock
>>> app = FastAPI()
>>> PersonEventRepository = Mock
>>> class PersonDataReader(DataReader):
... def __init__(self, person_event_repository: PersonEventRepository) -> None:
... self._person_event_repository = person_event_repository
...
... def get_data(
... self, cursors: Sequence[Cursor], headers: Optional[Sequence[str]], page_size: Optional[int]
... ) -> AsyncGenerator[Dict[str, Any], Any]:
... return (
... self._person_event_repository.get_events_since(cursors[0].cursor)
... .take(page_size)
... .with_headers(headers)
... )
>>> def get_person_data_reader() -> PersonDataReader:
... return PersonDataReader(PersonEventRepository())
>>> PersonDataReaderDependency = Annotated[
... PersonDataReader,
... Depends(get_person_data_reader, use_cache=True),
... ]
>>> @app.get("person/feed/v1")
... async def feed(request: Request, person_data_reader: PersonDataReaderDependency) -> StreamingResponse:
... api_handler = ZeroEventHubFastApiHandler(data_reader=person_data_reader, server_partition_count=1)
... return api_handler.handle(request)
Development
To run the test suite, assuming you already have Python 3.10 or later installed and on your PATH:
pip install poetry==1.8.4
poetry config virtualenvs.in-project true
poetry install --sync
poetry run coverage run --branch -m pytest
poetry run coverage html
Then, you can open the htmlcov/index.html file in your browser to look at the code coverage report.
Also, to pass the CI checks, you may want to run the following before pushing your changes:
poetry run ruff format
poetry run ruff check
poetry run pyright
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file zeroeventhub-0.2.4.tar.gz.
File metadata
- Download URL: zeroeventhub-0.2.4.tar.gz
- Upload date:
- Size: 12.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/4.0.2 CPython/3.11.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
59fb74e05aa6a97284b85867ab1128fcb8f3c86cd1a08d26148328e953e419b4
|
|
| MD5 |
4a21c33c434fac9a2fbb56213c1baa85
|
|
| BLAKE2b-256 |
146856f1ab751d80de876db97f210eb219248b3b9ca4a134764cffa969c9ebb8
|
File details
Details for the file zeroeventhub-0.2.4-py3-none-any.whl.
File metadata
- Download URL: zeroeventhub-0.2.4-py3-none-any.whl
- Upload date:
- Size: 12.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/4.0.2 CPython/3.11.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b4bbb0b8339db58cd1aed910ec2b2ec4c02e869f5de41e496cc763485b1ab99e
|
|
| MD5 |
3ba53901a98b5e9da2eaae2898f63a67
|
|
| BLAKE2b-256 |
764b51460e0e317bb8689718dbe31a0ceb971175074bf4463551a429eff62095
|