CenturyLink Managed Services Anywhere Python Utilities
Project description
Getting Started
To install the clc_msa_utils package, use the command below.:
pip3 install clc_msa_utils
KVStore
This is a utlitiy class that abstracts loading a configuration from Consul or ETCD. This class supports perodically reloading the configuration from the configured key-value store, and notifing a callback method after reloading. The following may be passed into the constructor, or pulled from env variables:
Constructor Arg |
Environment Variable |
Description |
Default |
---|---|---|---|
consul_host |
CONSUL_HOST |
Host for Consul |
None |
consul_port |
CONSUL_PORT |
Port for Consul |
8500 |
etcd_host |
ETCD_HOST |
Host for etcd |
localhost |
etcd_port |
ETCD_PORT |
Port for etcd |
2379 |
kv_prefix |
KV_PREFIX |
Prefix for config path |
“” |
reload_seconds |
RELOAD_CONFIG_PERIOD |
Seconds between config reloads |
20 |
reload_enabled |
RELOAD_ENABLED |
If true, reloads the config periodically. |
False |
TODO: Future Features
Logging Configuration: Will enable configuring logging by updating the specified configuration mechanism.
Nested Configurations: Will enable you specify a list of prefixes to use to overlay configuration values.
Example Usage
1from clc_msa_utils.kv_store import KVStore
23
# Create config store
4kv_store = KVStore(
5kv_prefix=os.getenv('CONSUL_PREFIX') or os.getenv('ETCD_PREFIX', '/config/retry-listener'),
6reload_enabled=True
7)
89
# Setup on_reload handler
10def initialize():
11kv_store.on_reload(dynamic_configuration)
1213
# Implement reload handler to check if attributes changed, and then perform some logic.
14def dynamic_configuration(old, new):
15if not old or old.get('exchange_configs') != new.get('exchange_configs') \
16or kv_store.attribute_changed("rabbit_host","rabbit_port","rabbit_user","rabbit_password","rabbit_queue_name"):
17setup_queue()
1819
# Use kv_store to pull configuration values.
20def setup_queue():
21rabbit_host = kv_store.get('rabbit_host', 'localhost')
22rabbit_port = int(kv_store.get('rabbit_port', 5672))
QueueFactory
This is a utility class that abstracts the creation of Queue Producers and Queue Consumers/Listeners. The producers and consumers are constructed based on a configuration passed into their respective methods as a parameter. The following is an example JSON configuration of a Queue Consumer configuration that could be stored in a key-value store such as ETCD or Consul. Notice that the queue_config attribute is an array and can be all of the necessary configuration for both your Consumer and Producers.
{
"queue_config": [
{
"name": "make_managed_request",
"type": "consumer",
"exchange": {
"name": "managed_server",
"type": "x-delayed-message",
"arguments": {"x-delayed-type": "topic"},
"durable": true
},
"queue": "make_managed_mos_cmdb",
"binding_key": "requested.make_managed",
"host": "rabbitmq.managed-services-dev.skydns.local",
"port": "5672",
"auth": {
"user": "guest",
"password": "guest"
}
}
]
}
Example Usage
1from clc_msa_utils.queueing import QueueFactory
23
# Get config (eg. from kv_store)
4queue_config = kv_store.get('queue-config')
56
# Initialize QueueFactory
7q_factory = QueueFactory()
89
# Generate Queue Consumers (QueueConsumer)
10consumers = q_factory.create_consumers(queue_config)
1112
# Generate Queue Producers (QueueProducer)
13producers = q_factory.create_producers(queue_config)
1415
# Retrieve and use consumer based on name configured
16consumers['make_managed_request'].listen(callback_function)
1718
# Retrieve and use producer based on name configured
19producers['error'].publish({"error_details": "message about how you messed things up..."})
2021
22
23
def callback_function(ch, method, properties, body):
24...
Multi-Threaded Example
1queue_factory = None
23
def setup_queue:
45
# If the queue_factory was already created, stop_consuming.
6# Clean up the existing connections before creating new ones
7# on a configuration change.
8if queue_factory:
9queue_factory.stop_consuming()
1011
# Create one configuration per thread, with a unique name for each.
12queue_factory_config = {
13"queue_config": []
14}
1516
amqp_connections = int(kv_store.get('amqp_connections', '10'))
17x = 0
1819
while x < amqp_connections:
20queue_config = {
21"name": "notify_worker_thread_" + str(x),
22"type": "consumer",
23"queue": "my_queue",
24"host": "localhost",
25"port": "5672",
26"exchange": {
27"name": "managed_server",
28"type": "x-delayed-message",
29"arguments": {"x-delayed-type": "topic"},
30"durable": true
31},
32"auth": {
33"user": "guest",
34"password": "guest"
35}
36}
3738
queue_factory_config["queue_config"].append(queue_config)
39x = x + 1
4041
# Create the QueueFactory, and pass in the configuration and worker function.
42queue_factory = QueueFactory()
43queue_factory.create_consumers(queue_factory_config)
44queue_factory.start_consuming(do_work)
4546
# Wait for all threads to stop before stopping the main thread.
47for queue_consumer in queue_factory.consumers():
48queue_consumer.thread().join()
4950
...
5152
def do_work(ch, method, properties, body):
53# Worker code goes here
54pass
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.