DAO for GCP PubSub service.

GCP Pub/Sub DAO

The library provides DAO classes for the GCP Pub/Sub publisher and subscriber, in both sync and async variants.

Installation

pip install gcp-pubsub-dao

Usage

  • sync subscriber:
from gcp_pubsub_dao import PubSubSubscriberDAO, Message

dao = PubSubSubscriberDAO(project_id="project-dev", subscription_id="subscription")
messages: list[Message] = dao.get_messages(messages_count=2)

for message in messages:
    print(message.data)

# Assumes both requested messages were returned.
dao.ack_messages(ack_ids=[messages[0].ack_id])      # acknowledge the first message
dao.nack_messages(ack_ids=[messages[1].ack_id])     # negatively acknowledge the second

dao.close()     # to clean up connections
  • sync publisher:
from gcp_pubsub_dao import PubSubPublisherDAO

dao = PubSubPublisherDAO(project_id="project-dev")
try:
    dao.publish_message(topic_name="topic", payload=b"asdfsdf", attributes={"kitId": "AW12345678"})
except Exception as ex:
    print(ex)
  • async subscriber:
from gcp_pubsub_dao import AsyncPubSubSubscriberDAO, Message

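# Run this inside a coroutine (for example, under asyncio.run).
dao = AsyncPubSubSubscriberDAO(project_id="project-dev", subscription_id="subscription")
messages: list[Message] = await dao.get_messages(messages_count=2)

for message in messages:
    print(message.data)

# Assumes both requested messages were returned.
await dao.ack_messages(ack_ids=[messages[0].ack_id])      # acknowledge the first message
await dao.nack_messages(ack_ids=[messages[1].ack_id])     # negatively acknowledge the second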
  • async publisher:
from gcp_pubsub_dao import AsyncPubSubPublisherDAO

dao = AsyncPubSubPublisherDAO(project_id="project-dev")
try:
    await dao.publish_message(topic_name="topic", payload=b"asdfsdf", attributes={"kitId": "AW12345678"})
except Exception as ex:
    print(ex)
  • async worker pool:
import asyncio
import sys

sys.path.append("./")

from gcp_pubsub_dao import AsyncPubSubSubscriberDAO
from gcp_pubsub_dao.worker_pool import WorkerPool, WorkerTask, HandlerResult
from gcp_pubsub_dao.entities import Message


async def handler1(message: Message):
    print(f"handler1: {message}")
    await asyncio.sleep(2)
    return HandlerResult(ack_id=message.ack_id, is_success=True)


async def handler2(message: Message):
    print(f"handler2: {message}")
    await asyncio.sleep(5)
    return HandlerResult(ack_id=message.ack_id, is_success=True)


def heartbeat_func():
    print("Heartbeat: Worker is alive")


async def main():
    tasks = [
        WorkerTask(
            subscriber_dao=AsyncPubSubSubscriberDAO(project_id="ash-dev-273120", subscription_id="http-sender-sub"),
            handler=handler1,
        ),
        WorkerTask(
            subscriber_dao=AsyncPubSubSubscriberDAO(project_id="ash-dev-273120", subscription_id="email-sender-sub"),
            handler=handler2,
        ),
    ]
    
    # Create worker pool with heartbeat function
    wp = WorkerPool(heartbeat_func=heartbeat_func)
    
    # Run in async mode (default) - all tasks run concurrently
    await wp.run(tasks=tasks)
    
    # Or run in sync mode - tasks run one by one in order
    # await wp.run(tasks=tasks, mode="sync")


if __name__ == "__main__":
    asyncio.run(main())

Worker Pool Features

The WorkerPool provides two execution modes:

Async Mode (default)

  • All tasks run concurrently using asyncio.TaskGroup
  • Tasks can execute in any order or simultaneously
  • Best for independent tasks that don't need to be processed in sequence

Sync Mode

  • Tasks run one by one in the order they are provided
  • Each task completes before the next one starts
  • Useful when tasks need to be processed in a specific sequence
  • Note: Message processing within each task is still asynchronous
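
The difference between the two modes can be illustrated without Pub/Sub at all. The sketch below is not the library's implementation: it uses plain coroutines with hypothetical names (fake_task, run_concurrently, run_sequentially) and asyncio.gather as a stand-in for asyncio.TaskGroup, so that it also runs on Python versions older than 3.11.

```python
import asyncio


async def fake_task(name: str, delay: float, log: list) -> None:
    """Hypothetical stand-in for one worker task: wait, then record completion."""
    await asyncio.sleep(delay)
    log.append(name)


async def run_concurrently(specs, log) -> None:
    # Roughly what the async mode does: start everything at once.
    await asyncio.gather(*(fake_task(name, delay, log) for name, delay in specs))


async def run_sequentially(specs, log) -> None:
    # Roughly what the sync mode does: finish one task before starting the next.
    for name, delay in specs:
        await fake_task(name, delay, log)


def demo():
    specs = [("slow", 0.05), ("fast", 0.01)]
    concurrent_log, sequential_log = [], []
    asyncio.run(run_concurrently(specs, concurrent_log))
    asyncio.run(run_sequentially(specs, sequential_log))
    return concurrent_log, sequential_log
```

When run concurrently, the shorter task finishes first regardless of its position in the list; when run sequentially, completion order follows the list order.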

Heartbeat Function

  • Optional callback function that gets called during worker execution
  • Useful for monitoring worker health and activity
  • Called before processing messages in each iteration
  • Can be used for logging, metrics, or health checks
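
As one possible use of the heartbeat for health checks, the callback can record when it was last called so that another part of the application can tell whether the worker has gone quiet. This is a minimal sketch: HeartbeatMonitor and its 60-second default are hypothetical and not part of the library, and it assumes the callback is invoked with no arguments, as heartbeat_func is in the example above.

```python
import time
from typing import Optional


class HeartbeatMonitor:
    """Hypothetical helper that tracks the most recent heartbeat."""

    def __init__(self, max_silence_seconds: float = 60.0) -> None:
        self.max_silence_seconds = max_silence_seconds
        self.last_beat: Optional[float] = None
        self.beats = 0

    def beat(self) -> None:
        # Intended to be passed as the heartbeat callback.
        self.last_beat = time.monotonic()
        self.beats += 1

    def is_healthy(self) -> bool:
        # Unhealthy until the first beat, or if the last beat is too old.
        if self.last_beat is None:
            return False
        return time.monotonic() - self.last_beat <= self.max_silence_seconds
```

Under that assumption, the monitor would be wired in as WorkerPool(heartbeat_func=monitor.beat), and is_healthy() could back a liveness probe.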

WorkerTask Configuration

  • subscriber_dao: The async subscriber DAO instance
  • handler: Async function that processes messages and returns HandlerResult
  • batch_size: Number of messages to fetch per batch (default: 10)
  • return_immediately: Whether to return immediately if no messages (default: False)
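
A handler does not have to report success unconditionally. The sketch below shows a handler that reports failure when the payload is not a JSON object. The Message and HandlerResult dataclasses here are stand-ins that mirror only the fields used in the examples above (ack_id, data, is_success) so the snippet runs without the library; json_handler is a hypothetical name.

```python
import asyncio
import json
from dataclasses import dataclass


# Stand-ins so the sketch runs on its own. In real code, import Message from
# gcp_pubsub_dao.entities and HandlerResult from gcp_pubsub_dao.worker_pool.
@dataclass
class Message:
    ack_id: str
    data: bytes


@dataclass
class HandlerResult:
    ack_id: str
    is_success: bool


async def json_handler(message: Message) -> HandlerResult:
    """Report success only if the payload decodes to a JSON object."""
    try:
        payload = json.loads(message.data)
    except ValueError:
        # Covers both invalid JSON and undecodable bytes.
        return HandlerResult(ack_id=message.ack_id, is_success=False)
    return HandlerResult(ack_id=message.ack_id, is_success=isinstance(payload, dict))
```

Such a handler would be passed as handler=json_handler in a WorkerTask. How the pool treats a result with is_success=False is not stated here; a nack is the likely behavior, but that is an assumption.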
