A Pubsub

Project description

RizaPubsub

An in-house wrapper around Google Cloud (GCP) Pub/Sub.

publish

from pubsub import PubSub

# Configuration dict passed to PubSub (left empty in this example)
c = {}

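# Create the wrapper and obtain a publisher from it
p = PubSub(c)
publisher = p.pub()

# One-time setup: uncomment to create a topic and a subscription
# xpub = p.create_topic('test_001')
# print(xpub)
# xsub = p.create_subscription('test_001', 'test_sub_001')
# print(xsub)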

pid = p.send(
    publisher, {
        'topic': 'test001',
        'namespace': 'auth',
        'subname': 'auth_register',
        'delay': 1,
        'data': {
            'fullname': 'mas joko',
            'email': 'panas@gmail.com'
        }
    })
print('send pubsub:', pid)
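
The dictionary passed to send() above has a fixed set of keys, so it can help to assemble and check it in one place before publishing. The following is a minimal sketch under stated assumptions: build_message and encode_data are hypothetical helpers, not part of rizapubsub, and JSON is only one plausible way to turn the 'data' field into the bytes that Pub/Sub message bodies require. How the package itself serializes the payload is not stated here.

```python
import json


def build_message(topic, namespace, subname, data, delay=0):
    """Assemble a dict in the shape the publish example passes to send().

    Hypothetical helper for illustration; it is not part of rizapubsub.
    """
    if not topic or not subname:
        raise ValueError("topic and subname must be non-empty")
    if delay < 0:
        raise ValueError("delay must not be negative")
    return {
        "topic": topic,
        "namespace": namespace,
        "subname": subname,
        "delay": delay,
        "data": data,
    }


def encode_data(data):
    """Serialize a payload to UTF-8 JSON bytes.

    Assumption: JSON is used here only for illustration; the encoding
    rizapubsub applies internally is not documented in this README.
    """
    return json.dumps(data, sort_keys=True).encode("utf-8")


message = build_message(
    topic="test001",
    namespace="auth",
    subname="auth_register",
    data={"fullname": "mas joko", "email": "panas@gmail.com"},
    delay=1,
)
print(message["topic"], encode_data(message["data"]))
```

The resulting dict could then be handed to p.send(publisher, message) as in the example above.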

subscribe

import asyncio
from datetime import datetime
from pubsub import PubSub

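# sublistener() reads c["project_id"], so set it to your GCP project ID.
# Any other keys PubSub expects are not documented here.
c = {"project_id": "your-project-id"}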


async def executor(data):
    '''Handle one received message: print its raw payload and the time.'''
    print(data.data, datetime.now())


async def sublistener(config, subname, count=1):
    '''Listen for tasks on a Pub/Sub subscription.'''
    p = PubSub(config)
    while True:
        subscrb = p.sub()
        with subscrb:
            subpath = subscrb.subscription_path(config["project_id"], subname)

            # get messages
            response = subscrb.pull(request={
                'subscription': subpath,
                'max_messages': count
            })

            ack_ids = []
            for msg in response.received_messages:

                # print("Received: {}".format(msg.message))
                await executor(msg.message)

                ack_ids.append(msg.ack_id)

            # Acknowledges the received messages so they will not be sent again.
            tot = len(response.received_messages)
            if tot > 0:
                subscrb.acknowledge(request={
                    "subscription": subpath,
                    "ack_ids": ack_ids,
                })

        await asyncio.sleep(2)


# Entry point: poll 'test001-sub', pulling up to 10 messages per request
async def main_pubsub(c):
    await sublistener(c, 'test001-sub', 10)


if __name__ == '__main__':
    try:
        # asyncio.run creates the event loop and closes it on exit
        asyncio.run(main_pubsub(c))
    except KeyboardInterrupt:
        # Ctrl+C stops the listener
        pass
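
In the listener above, an exception raised by executor() ends the whole loop before anything is acknowledged. One possible variation, sketched below, acknowledges only the messages whose handler succeeded, so failed ones stay on the subscription for redelivery. Everything here is illustrative: FakeSubscriber is a hypothetical stand-in that mimics only the pull/acknowledge calls used in the example, and handler and pull_once are made-up names. The JSON payload format is an assumption as well.

```python
import asyncio
import json
from types import SimpleNamespace


class FakeSubscriber:
    """Hypothetical stand-in for the subscriber client, for illustration only."""

    def __init__(self, payloads):
        self._pending = [
            SimpleNamespace(
                ack_id=f"ack-{i}",
                message=SimpleNamespace(data=json.dumps(p).encode("utf-8")),
            )
            for i, p in enumerate(payloads)
        ]
        self.acked = []

    def pull(self, request):
        # Return at most max_messages of the pending messages
        batch = self._pending[: request["max_messages"]]
        return SimpleNamespace(received_messages=batch)

    def acknowledge(self, request):
        # Acknowledged messages are removed and will not be pulled again
        self.acked.extend(request["ack_ids"])
        self._pending = [
            m for m in self._pending if m.ack_id not in request["ack_ids"]
        ]


async def handler(message):
    """Decode the payload (assumed to be JSON) and require an 'email' key."""
    payload = json.loads(message.data.decode("utf-8"))
    if "email" not in payload:
        raise ValueError("missing email")
    return payload


async def pull_once(subscriber, subpath, count):
    """Pull one batch and acknowledge only the messages the handler accepted."""
    response = subscriber.pull(
        request={"subscription": subpath, "max_messages": count}
    )
    ack_ids = []
    for received in response.received_messages:
        try:
            await handler(received.message)
        except Exception as exc:
            # Leave the message unacknowledged so it can be delivered again
            print("handler failed, message left unacked:", exc)
            continue
        ack_ids.append(received.ack_id)
    if ack_ids:
        subscriber.acknowledge(
            request={"subscription": subpath, "ack_ids": ack_ids}
        )
    return ack_ids


fake = FakeSubscriber([{"email": "panas@gmail.com"}, {"fullname": "mas joko"}])
acked = asyncio.run(pull_once(fake, "projects/demo/subscriptions/test001-sub", 10))
print(acked)
```

With a real client, the same pull_once body could be called from the while loop in sublistener() in place of the inline pull and acknowledge steps.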

