
DAO for the GCP Pub/Sub service.

GCP Pub/Sub DAO

The library provides DAO (data access object) classes for the GCP Pub/Sub publisher and subscriber, in both sync and async variants.

Installation

pip install gcp-pubsub-dao

Usage

  • sync subscriber:
from gcp_pubsub_dao import PubSubSubscriberDAO, Message

dao = PubSubSubscriberDAO(project_id="project-dev", subscription_id="subscription")
messages: list[Message] = dao.get_messages(messages_count=2)

for message in messages:
    print(message.data)

# Acknowledge the first message and nack the second so Pub/Sub redelivers it.
# Assumes both requested messages were returned.
dao.ack_messages(ack_ids=[messages[0].ack_id])
dao.nack_messages(ack_ids=[messages[1].ack_id])

dao.close()     # to clean up connections
  • sync publisher:
from gcp_pubsub_dao import PubSubPublisherDAO

dao = PubSubPublisherDAO(project_id="project-dev")
try:
    dao.publish_message(topic_name="topic", payload=b"asdfsdf", attributes={"kitId": "AW12345678"})
except Exception as ex:
    print(ex)
  • async subscriber:
import asyncio

from gcp_pubsub_dao import AsyncPubSubSubscriberDAO, Message


async def main():
    dao = AsyncPubSubSubscriberDAO(project_id="project-dev", subscription_id="subscription")
    messages: list[Message] = await dao.get_messages(messages_count=2)

    for message in messages:
        print(message.data)

    # Assumes both requested messages were returned.
    await dao.ack_messages(ack_ids=[messages[0].ack_id])
    await dao.nack_messages(ack_ids=[messages[1].ack_id])


asyncio.run(main())
  • async publisher:
import asyncio

from gcp_pubsub_dao import AsyncPubSubPublisherDAO


async def main():
    dao = AsyncPubSubPublisherDAO(project_id="project-dev")
    try:
        await dao.publish_message(topic_name="topic", payload=b"asdfsdf", attributes={"kitId": "AW12345678"})
    except Exception as ex:
        print(ex)


asyncio.run(main())
  • async worker:
import asyncio

from gcp_pubsub_dao import AsyncPubSubSubscriberDAO
from gcp_pubsub_dao.worker_pool import WorkerPool, WorkerTask, HandlerResult
from gcp_pubsub_dao.entities import Message


async def handler1(message: Message):
    print(f"handler1: {message}")
    await asyncio.sleep(2)
    return HandlerResult(ack_id=message.ack_id, is_success=True)


async def handler2(message: Message):
    print(f"handler2: {message}")
    await asyncio.sleep(5)
    return HandlerResult(ack_id=message.ack_id, is_success=True)


async def main():
    tasks = [
        WorkerTask(
            subscriber_dao=AsyncPubSubSubscriberDAO(project_id="ash-dev-273120", subscription_id="http-sender-sub"),
            handler=handler1,
        ),
        WorkerTask(
            subscriber_dao=AsyncPubSubSubscriberDAO(project_id="ash-dev-273120", subscription_id="email-sender-sub"),
            handler=handler2,
        ),
    ]
    wp = WorkerPool()
    await wp.run(tasks=tasks)


if __name__ == "__main__":
    asyncio.run(main())
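
Each handler in the worker example returns a HandlerResult carrying an ack_id and an is_success flag. The sketch below shows one plausible way such results could be routed to ack and nack calls. It is not taken from the library's source: Message and HandlerResult are local stand-ins with only the fields used above, and process_batch is a hypothetical helper, so the snippet runs without GCP credentials.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Message:
    """Hypothetical stand-in for gcp_pubsub_dao.entities.Message."""
    data: bytes
    ack_id: str


@dataclass
class HandlerResult:
    """Hypothetical stand-in for the library's HandlerResult."""
    ack_id: str
    is_success: bool


async def handler(message: Message) -> HandlerResult:
    # Treat an empty payload as a failure so both outcomes appear.
    await asyncio.sleep(0)
    return HandlerResult(ack_id=message.ack_id, is_success=bool(message.data))


async def process_batch(messages, handler):
    """Run the handler on all messages concurrently and split the ack IDs."""
    # asyncio.gather returns results in the same order as its inputs.
    results = await asyncio.gather(*(handler(m) for m in messages))
    ack_ids = [r.ack_id for r in results if r.is_success]
    nack_ids = [r.ack_id for r in results if not r.is_success]
    return ack_ids, nack_ids


batch = [Message(b"ok", "id-1"), Message(b"", "id-2"), Message(b"ok", "id-3")]
ack_ids, nack_ids = asyncio.run(process_batch(batch, handler))
print(ack_ids, nack_ids)
```

With a real subscriber DAO, the two lists would be passed to ack_messages(ack_ids=...) and nack_messages(ack_ids=...) as in the subscriber examples above.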
