DAO for GCP PubSub service.

GCP Pub/Sub DAO

This library provides DAO (data access object) classes for the GCP Pub/Sub publisher and subscriber, in both sync and async variants.

Installation

pip install gcp-pubsub-dao

Usage

  • sync subscriber:
from gcp_pubsub_dao import PubSubSubscriberDAO, Message

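dao = PubSubSubscriberDAO(project_id="prodect-dev", subscription_id="subscription")
messages: list[Message] = dao.get_messages(messages_count=2)  # pull up to two messages

for message in messages:
    print(message.data)

# Assumes both requested messages were returned.
dao.ack_messages(ack_ids=[messages[0].ack_id])   # acknowledge: processed successfully
dao.nack_messages(ack_ids=[messages[1].ack_id])  # negative acknowledgement: request redelivery

dao.close()  # clean up connections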
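The ack/nack calls above can be driven by whether each message was processed successfully. The following is a minimal sketch, assuming only the subscriber methods shown above (`get_messages`, `ack_messages`, `nack_messages`, `close`) and that each message exposes `ack_id`. The names `process_batch` and `handler` are hypothetical and not part of the library.

```python
from contextlib import closing
from typing import Any, Callable


def process_batch(
    dao: Any,
    handler: Callable[[Any], None],
    messages_count: int = 10,
) -> tuple[list[str], list[str]]:
    """Pull one batch, ack messages whose handler succeeded, nack the rest.

    `dao` is any object with the subscriber methods used in the example
    above; `handler` is a hypothetical callback that raises on failure.
    """
    acked: list[str] = []
    nacked: list[str] = []
    # closing() calls dao.close() on exit, even if something below raises.
    with closing(dao):
        for message in dao.get_messages(messages_count=messages_count):
            try:
                handler(message)
            except Exception:
                nacked.append(message.ack_id)
            else:
                acked.append(message.ack_id)
        # Assumption: skip the call entirely when there is nothing to send.
        if acked:
            dao.ack_messages(ack_ids=acked)
        if nacked:
            dao.nack_messages(ack_ids=nacked)
    return acked, nacked
```

Passing a `PubSubSubscriberDAO` instance as `dao` should work if the library's methods behave as in the example above. Because the function closes the DAO on exit, create a new DAO for each call.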
  • sync publisher:
from gcp_pubsub_dao import PubSubPublisherDAO

dao = PubSubPublisherDAO(project_id="prodect-dev")
try:
    dao.publish_message(topic_name="topic", payload=b"asdfsdf", attributes={"kitId": "AW12345678"})
except Exception as ex:
    print(ex)
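Since `payload` is passed as bytes, structured data has to be serialized before publishing. The sketch below shows one way to do that with JSON, assuming only the `publish_message(topic_name=..., payload=..., attributes=...)` call shown above. `encode_json_payload` and `publish_json` are hypothetical helper names, and whether the library accepts an empty `attributes` dict is an assumption.

```python
import json
from typing import Any, Mapping, Optional


def encode_json_payload(obj: Any) -> bytes:
    """Serialize a JSON-compatible object to compact UTF-8 bytes."""
    return json.dumps(obj, separators=(",", ":"), sort_keys=True).encode("utf-8")


def publish_json(
    dao: Any,
    topic_name: str,
    obj: Any,
    attributes: Optional[Mapping[str, str]] = None,
) -> None:
    """Publish `obj` as JSON through any DAO exposing `publish_message`.

    Pub/Sub attributes are string key/value pairs, so non-string values
    should be converted by the caller.
    """
    dao.publish_message(
        topic_name=topic_name,
        payload=encode_json_payload(obj),
        attributes=dict(attributes or {}),  # assumption: an empty dict is accepted
    )
```

A subscriber can reverse the encoding with `json.loads(message.data)`, assuming `message.data` holds the published bytes unchanged.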
  • async subscriber:
from gcp_pubsub_dao import AsyncPubSubSubscriberDAO, Message

dao = AsyncPubSubSubscriberDAO(project_id="prodect-dev", subscription_id="subscription")
# The awaits below must run inside a coroutine (for example, one started with asyncio.run).
messages: list[Message] = await dao.get_messages(messages_count=2)

for message in messages:
    print(message.data)

# Assumes both requested messages were returned.
await dao.ack_messages(ack_ids=[messages[0].ack_id])
await dao.nack_messages(ack_ids=[messages[1].ack_id])
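With the async subscriber, the messages in a batch can be handled concurrently rather than one at a time. The sketch below assumes only the awaitable methods shown above (`get_messages`, `ack_messages`, `nack_messages`) and that `get_messages` returns a list. `process_batch_concurrently` and `handler` are hypothetical names, not part of the library.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def process_batch_concurrently(
    dao: Any,
    handler: Callable[[Any], Awaitable[None]],
    messages_count: int = 10,
) -> tuple[list[str], list[str]]:
    """Run `handler` on every pulled message at once, then ack or nack."""
    messages = await dao.get_messages(messages_count=messages_count)
    # return_exceptions=True collects each handler's exception as a result
    # instead of raising the first one, so every message gets an outcome.
    results = await asyncio.gather(
        *(handler(m) for m in messages), return_exceptions=True
    )
    acked = [m.ack_id for m, r in zip(messages, results) if not isinstance(r, BaseException)]
    nacked = [m.ack_id for m, r in zip(messages, results) if isinstance(r, BaseException)]
    if acked:
        await dao.ack_messages(ack_ids=acked)
    if nacked:
        await dao.nack_messages(ack_ids=nacked)
    return acked, nacked
```

The batch takes roughly as long as its slowest handler, so `messages_count` also bounds how many handlers run at once.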
  • async publisher:
from gcp_pubsub_dao import AsyncPubSubPublisherDAO

dao = AsyncPubSubPublisherDAO(project_id="prodect-dev")
try:
    await dao.publish_message(topic_name="topic", payload=b"asdfsdf", attributes={"kitId": "AW12345678"})
except Exception as ex:
    print(ex)
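Instead of only printing the exception, a caller may want to retry a failed publish. The following is a rough sketch of retrying with exponential backoff, assuming only the awaitable `publish_message` call shown above. `publish_with_retry`, `attempts`, and `base_delay` are hypothetical names, and whether the underlying client already retries on its own is not stated here.

```python
import asyncio
from typing import Any, Mapping


async def publish_with_retry(
    dao: Any,
    topic_name: str,
    payload: bytes,
    attributes: Mapping[str, str],
    attempts: int = 3,
    base_delay: float = 0.5,
) -> int:
    """Publish, retrying on any exception; return the attempt that succeeded."""
    for attempt in range(1, attempts + 1):
        try:
            await dao.publish_message(
                topic_name=topic_name, payload=payload, attributes=dict(attributes)
            )
            return attempt
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the last error to the caller
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("attempts must be at least 1")
```

Retrying can publish the same message more than once if an earlier attempt succeeded but its confirmation was lost, so subscribers should tolerate duplicates.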
  • async worker:
import asyncio
import sys

sys.path.append("./")

from gcp_pubsub_dao import AsyncPubSubSubscriberDAO
from gcp_pubsub_dao.worker_pool import WorkerPool, WorkerTask, HandlerResult
from gcp_pubsub_dao.entities import Message


async def handler1(message: Message):
    print(f"handler1: {message}")
    await asyncio.sleep(2)
    return HandlerResult(ack_id=message.ack_id, is_success=True)


async def handler2(message: Message):
    print(f"handler2: {message}")
    await asyncio.sleep(5)
    return HandlerResult(ack_id=message.ack_id, is_success=True)


async def main():
    tasks = [
        WorkerTask(
            subscriber_dao=AsyncPubSubSubscriberDAO(project_id="ash-dev-273120", subscription_id="http-sender-sub"),
            handler=handler1,
        ),
        WorkerTask(
            subscriber_dao=AsyncPubSubSubscriberDAO(project_id="ash-dev-273120", subscription_id="email-sender-sub"),
            handler=handler2,
        ),
    ]
    wp = WorkerPool()
    await wp.run(tasks=tasks)


if __name__ == "__main__":
    asyncio.run(main())
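The handlers above always report success. A handler that can fail might instead report the failure through its result. The sketch below is a hypothetical decorator (`failing_as_result` is not part of the library) that converts an exception into a result built with `is_success=False`. It assumes the result class takes the `ack_id` and `is_success` keyword arguments used above. How `WorkerPool` treats an unsuccessful result is not described here.

```python
import functools
from typing import Any, Awaitable, Callable


def failing_as_result(result_factory: Callable[..., Any]):
    """Decorator factory: turn a handler's exception into a failed result.

    `result_factory` is called as result_factory(ack_id=..., is_success=False),
    matching how HandlerResult is constructed in the example above.
    """

    def decorator(handler: Callable[[Any], Awaitable[Any]]):
        @functools.wraps(handler)
        async def wrapper(message: Any) -> Any:
            try:
                return await handler(message)
            except Exception as ex:
                print(f"{handler.__name__} failed: {ex!r}")
                return result_factory(ack_id=message.ack_id, is_success=False)

        return wrapper

    return decorator
```

Under these assumptions, writing `@failing_as_result(HandlerResult)` above `handler1` would let the handler body raise freely while still returning a result for every message.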

Download files

Source Distribution

gcp_pubsub_dao-0.3.2.tar.gz (4.0 kB)

Built Distribution

gcp_pubsub_dao-0.3.2-py3-none-any.whl (6.3 kB)

File details

Details for the file gcp_pubsub_dao-0.3.2.tar.gz.

File metadata

  • Download URL: gcp_pubsub_dao-0.3.2.tar.gz
  • Upload date:
  • Size: 4.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.6.1 CPython/3.11.4 Linux/6.5.0-1025-azure

File hashes

Hashes for gcp_pubsub_dao-0.3.2.tar.gz
Algorithm    Hash digest
SHA256       368ffcd81188c4f2738b8fa7f3f8db5edf711e199bdff748a3e9be3ef66282a2
MD5          1135b74f0ccd52eacb887182abafca2d
BLAKE2b-256  533be543231322471e4697210c0e4574574a98c47747e40d59ad2a8060f88448
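
A downloaded file can be checked against the SHA256 digest listed above using the standard library alone. This is a minimal sketch. `sha256_of` and `verify_sha256` are hypothetical helper names, and the file path in the usage note is an assumption about where the archive was saved.

```python
import hashlib
from pathlib import Path
from typing import Union


def sha256_of(path: Union[str, Path], chunk_size: int = 1 << 20) -> str:
    """Return the hex SHA256 digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with Path(path).open("rb") as f:
        # iter() with a sentinel stops when read() returns empty bytes (EOF).
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify_sha256(path: Union[str, Path], expected_hex: str) -> bool:
    """Compare a file's digest with an expected hex string, ignoring case."""
    return sha256_of(path) == expected_hex.strip().lower()
```

For example, `verify_sha256("gcp_pubsub_dao-0.3.2.tar.gz", "368ffcd81188c4f2738b8fa7f3f8db5edf711e199bdff748a3e9be3ef66282a2")` should return `True` for an intact copy of the source distribution.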

File details

Details for the file gcp_pubsub_dao-0.3.2-py3-none-any.whl.

File metadata

  • Download URL: gcp_pubsub_dao-0.3.2-py3-none-any.whl
  • Upload date:
  • Size: 6.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.6.1 CPython/3.11.4 Linux/6.5.0-1025-azure

File hashes

Hashes for gcp_pubsub_dao-0.3.2-py3-none-any.whl
Algorithm    Hash digest
SHA256       faa21fcdce4d408681567c076d71900ea1085c6532d35a81c476504865e5e07e
MD5          a93be707fca79cad9aeb41d728a88b05
BLAKE2b-256  a4dafe7d3130d92ee1b28c3dbb975145c14329a81555081b4bd42e60c8b112b2
