Skip to main content

A Pubsub

Project description

RizaPubsub

An in-house helper for Google Cloud Pub/Sub (GCP).

publish

import time
from pubsub import PubSub

# Configuration handed to PubSub; left empty in this example.
c = {}

# begin
p = PubSub(c)
publisher = p.pub()

# Optional setup, kept commented out. The names match the topic published to
# below and the 'test001-sub' subscription used by the subscribe example.
# xpub = p.create_topic('test001')
# print(xpub)
# xsub = p.create_subscription('test001', 'test001-sub')
# print(xsub)

# Publish one message description through the publisher client and print
# whatever p.send() returns.
pid = p.send(
    publisher, {
        'topic': 'test001',
        'namespace': 'auth',
        'subname': 'auth_register',
        'delay': 1,
        'data': {
            'fullname': 'mas joko',
            'email': 'panas@gmail.com'
        }
    })
print('send pubsub:', pid)

subscribe

import asyncio
from datetime import datetime
from pubsub import PubSub

# Configuration dict passed to main_pubsub() below. sublistener() reads
# config["project_id"] from it, so that key must be filled in before running.
c = {}


async def executor(data):
    """Print the ``data`` attribute of *data* followed by the current time."""
    payload = data.data
    text = f"{payload}"
    stamp = datetime.now()
    print(text, stamp)


async def sublistener(config, subname, count=1):
    """Poll a Pub/Sub subscription forever, passing each message to ``executor``.

    Each cycle opens a subscriber client, pulls up to ``count`` messages,
    awaits ``executor`` on every message, acknowledges the processed ones,
    and then sleeps for 2 seconds.

    Args:
        config: Settings mapping. It is passed to ``PubSub`` and must contain
            the key ``"project_id"``.
        subname: Name of the subscription to pull from.
        count: Maximum number of messages requested per pull.

    The loop has no exit condition, so this coroutine only ends when an
    exception propagates out of it or it is cancelled.
    """
    # Build the wrapper from the caller's config instead of the module-level
    # `c`, so the function does not depend on a hidden global.
    p = PubSub(config)
    while True:
        subscrb = p.sub()
        with subscrb:
            subpath = subscrb.subscription_path(config["project_id"], subname)

            # get messages
            # NOTE(review): pull() is called without await, so it presumably
            # blocks the event loop while waiting - confirm against the client.
            response = subscrb.pull(request={
                'subscription': subpath,
                'max_messages': count
            })

            ack_ids = []
            for msg in response.received_messages:
                await executor(msg.message)
                # Collected only after executor() finished without raising.
                ack_ids.append(msg.ack_id)

            # Acknowledge the received messages so they will not be sent again.
            if ack_ids:
                subscrb.acknowledge(request={
                    "subscription": subpath,
                    "ack_ids": ack_ids,
                })

        await asyncio.sleep(2)


# begin
async def main_pubsub(c):
    """Run the listener on the 'test001-sub' subscription, 10 messages per pull."""
    await sublistener(c, subname='test001-sub', count=10)


if __name__ == '__main__':

    loop = asyncio.new_event_loop()
    try:
        # run_until_complete (rather than create_task + run_forever) lets an
        # exception raised inside main_pubsub propagate here instead of
        # leaving the loop idling forever with a dead task.
        loop.run_until_complete(main_pubsub(c))
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the listener; exit quietly.
        pass
    finally:
        # Single close for every exit path.
        loop.close()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rizapubsub-0.0.1.tar.gz (3.7 kB view hashes)

Uploaded Source

Built Distribution

rizapubsub-0.0.1-py3-none-any.whl (5.0 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page