A Python library for real-time PostgreSQL event-driven cache invalidation.
Project description
PGCacheWatch
PGCacheWatch is a Python library that enhances applications with real-time PostgreSQL event notifications, enabling efficient cache invalidation. It leverages the existing PostgreSQL infrastructure to simplify cache management while ensuring performance and data consistency.
Example with FastAPI
This example illustrates the integration of PGCacheWatch with FastAPI to dynamically invalidate cache following database changes, thus maintaining the freshness and consistency of your application's data.
import contextlib
import typing
import asyncpg
from fastapi import FastAPI
from pgcachewatch import decorators, listeners, models, strategies
# Initialize a PGEventQueue listener to listen for database events.
listener = listeners.PGEventQueue()
@contextlib.asynccontextmanager
async def app_setup_teardown(_: FastAPI) -> typing.AsyncGenerator[None, None]:
"""
Asynchronous context manager for FastAPI app setup and teardown.
This context manager is used to establish and close the database connection
at the start and end of the FastAPI application lifecycle, respectively.
"""
# Establish a database connection using asyncpg.
conn = await asyncpg.connect()
# Connect the listener to the database using the specified channel.
await listener.connect(conn)
yield # Yield control back to the event loop.
await conn.close() # Ensure the database connection is closed on app teardown.
# Create an instance of FastAPI, specifying the app setup and teardown actions.
APP = FastAPI(lifespan=app_setup_teardown)
# Decorate the cached_query function with cache invalidation logic.
@decorators.cache(
strategy=strategies.Greedy(
listener=listener,
# Invalidate the cache only for 'update' operations on the database.
predicate=lambda x: x.operation == "update",
)
)
async def cached_query(user_id: int) -> dict[str, str]:
"""
Simulates a database query that benefits from cache invalidation.
This function is decorated to use PGCacheWatch's cache invalidation, ensuring
that the data returned is up-to-date following any relevant 'update' operations
on the database.
"""
# Return a mock data response.
return {"data": "query result"}
# Define a FastAPI route to fetch data, utilizing the cached_query function.
@APP.get("/data")
async def get_data(user_id: int) -> dict:
"""
This endpoint uses the cached_query function to return data, demonstrating
how cache invalidation can be integrated into a web application route.
"""
# Fetch and return the data using the cached query function.
return await cached_query(user_id)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
PGCacheWatch-0.2.5.tar.gz
(151.6 kB
view hashes)
Built Distribution
Close
Hashes for PGCacheWatch-0.2.5-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 278c3302007722d9d7cbed17ecf5f039229953f44638e1d9efebc1d439580703 |
|
MD5 | 0088a4eb15739795e947c83ab7598b11 |
|
BLAKE2b-256 | b24c01e280f851e01857d05a910039830b0c41b21e02e986101a1d947ba1d5cd |