A package containing my utilities
cbaxter1988_utils
This package is a collection of helpful utilities I use in numerous projects.
Usage
Running Tests
git clone https://github.com/byt3-m3/utils.git
cd utils
pip install -r requirements.txt
python invoke_tests.py
Installation
Using PIP
pip install cbaxter1988-utils
From Source
git clone https://github.com/byt3-m3/utils.git
cd utils
python setup.py install
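After either install route, one way to confirm that the distribution is visible to the current interpreter is to look up its version with the standard library. This is only a sketch: `installed_version` is a hypothetical helper name and is not part of this package; the distribution name is taken from the pip command above.

```python
from importlib import metadata
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent.

    Hypothetical helper for illustration; not provided by cbaxter1988_utils.
    """
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


# Prints a version string if the package is installed, otherwise None.
print(installed_version("cbaxter1988-utils"))
```

Returning None instead of raising keeps the check usable in scripts that only need to branch on whether the package is present.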
Utilities
core_utils
Cloning Objects:
from cbaxter1988_utils import core_utils
original_list = [1, 2, 3, 4]
cloned_list = core_utils.clone_object(original_list)
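The example above does not say whether `clone_object` makes a shallow or a deep copy. The sketch below shows why the difference matters, using a local stand-in built on `copy.deepcopy`. Deep-copy behaviour is an assumption here, not something the package documents, and the stand-in is not the package's implementation.

```python
import copy


def clone_object(obj):
    """Local stand-in for illustration only (assumes deep-copy semantics)."""
    return copy.deepcopy(obj)


original = {"ids": [1, 2, 3]}
cloned = clone_object(original)

# Mutating the nested list on the clone leaves the original untouched,
# which would not hold for a shallow copy.
cloned["ids"].append(4)

print(original["ids"])  # [1, 2, 3]
print(cloned["ids"])    # [1, 2, 3, 4]
```

If the real `clone_object` turns out to be shallow, nested containers would be shared between the original and the clone, so it is worth checking before relying on it for nested data.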
enviornment_utils
pika_utils
Utilities for publishing and consuming messages with pika.
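from pika.adapters.blocking_connection import BlockingChannel
from pika.spec import Basic, BasicProperties

from cbaxter1988_utils import pika_utils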
AMQP_USER = 'guest'
AMQP_PW = 'guest'
AMQP_HOST = '127.0.0.1'
AMQP_PORT = 5672
EXCHANGE_NAME = 'test_exchange'
ROUTING_KEY_NAME = 'test_routing_key'
QUEUE_NAME = 'QUEUE_NAME'
AMQP_URL = pika_utils.make_amqp_url(amqp_user=AMQP_USER, amqp_pw=AMQP_PW, amqp_host=AMQP_HOST, amqp_port=AMQP_PORT)
publisher = pika_utils.make_basic_pika_publisher(
    amqp_url=AMQP_URL,
    queue=QUEUE_NAME,
    exchange=EXCHANGE_NAME,
    routing_key=ROUTING_KEY_NAME
)
def message_handler(ch: BlockingChannel, method: Basic.Deliver, properties: BasicProperties, body):
    # Do work
    pass
subscriber = pika_utils.make_basic_pika_consumer(
    amqp_url=AMQP_URL,
    queue=QUEUE_NAME,
    on_message_callback=message_handler
)
publisher.publish_message(body={"test": "data"})
subscriber.run()
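The handler in the example above is left empty. A minimal sketch of what it might do is shown here, assuming the published dict arrives as a UTF-8 JSON byte string; the README does not state how `publish_message` serializes the body, so treat that as an assumption. The `received` list is a hypothetical name used only for illustration. Acknowledgement is deliberately left out because the consumer's ack mode is not documented.

```python
import json

received = []  # collected payloads, for illustration only


def message_handler(ch, method, properties, body):
    """Decode a JSON message body and record it.

    Follows pika's on_message_callback argument order:
    (channel, method, properties, body), where body is bytes.
    """
    try:
        payload = json.loads(body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Assumption: malformed messages are simply skipped.
        return
    received.append(payload)


# Simulate two deliveries without a broker; the first three arguments
# are unused by this handler, so None is passed for them.
message_handler(None, None, None, b'{"test": "data"}')
message_handler(None, None, None, b"not json")

print(received)  # [{'test': 'data'}]
```

Keeping the decoding step inside a try block means one bad message does not stop the consumer loop.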