CenturyLink Managed Services Anywhere Python Utilities
Getting Started
To install the clc_msa_utils package, use the command below:

```shell
pip3 install clc_msa_utils
```
KVStore
This is a utility class that abstracts loading a configuration from Consul or etcd. This class supports periodically reloading the configuration from the configured key-value store and notifying a callback method after reloading. The following may be passed into the constructor or pulled from environment variables:
Constructor Arg | Environment Variable | Description | Default
---|---|---|---
consul_host | CONSUL_HOST | Host for Consul | None
consul_port | CONSUL_PORT | Port for Consul | 8500
etcd_host | ETCD_HOST | Host for etcd | localhost
etcd_port | ETCD_PORT | Port for etcd | 2379
kv_prefix | KV_PREFIX | Prefix for config path | ""
reload_seconds | RELOAD_CONFIG_PERIOD | Seconds between config reloads | 20
reload_enabled | RELOAD_ENABLED | If true, reloads the config periodically. | False
TODO: Future Features
Logging Configuration: Will enable configuring logging by updating the specified configuration mechanism.
Nested Configurations: Will let you specify a list of prefixes used to overlay configuration values.
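The nested-configuration feature above is not yet implemented, but its described semantics can be sketched in plain Python. Here, `overlay_configs` is a hypothetical name and each prefix is assumed to resolve to a flat dict, with later prefixes in the list overriding earlier ones:

```python
# Hypothetical sketch of the planned "nested configurations" feature:
# later prefixes in the list override values from earlier ones.
def overlay_configs(configs_by_prefix, prefixes):
    """Merge the config dicts for each prefix, in order, so that
    values found under later prefixes win."""
    merged = {}
    for prefix in prefixes:
        merged.update(configs_by_prefix.get(prefix, {}))
    return merged

# Example: a shared default config overlaid with an app-specific one.
configs = {
    "/config/defaults": {"rabbit_host": "localhost", "rabbit_port": 5672},
    "/config/retry-listener": {"rabbit_host": "rabbit.internal"},
}
merged = overlay_configs(configs, ["/config/defaults", "/config/retry-listener"])
# merged keeps rabbit_port from the defaults, but rabbit_host from the overlay
```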
Example Usage
```python
import os

from clc_msa_utils.kv_store import KVStore

# Create config store
kv_store = KVStore(
    kv_prefix=os.getenv('CONSUL_PREFIX') or os.getenv('ETCD_PREFIX', '/config/retry-listener'),
    reload_enabled=True
)


# Set up the on_reload handler
def initialize():
    kv_store.on_reload(dynamic_configuration)


# Implement a reload handler that checks whether attributes changed, then performs some logic.
def dynamic_configuration(old, new):
    if not old or old.get('exchange_configs') != new.get('exchange_configs') \
            or kv_store.attribute_changed("rabbit_host", "rabbit_port", "rabbit_user",
                                          "rabbit_password", "rabbit_queue_name"):
        setup_queue()


# Use kv_store to pull configuration values.
def setup_queue():
    rabbit_host = kv_store.get('rabbit_host', 'localhost')
    rabbit_port = int(kv_store.get('rabbit_port', 5672))
```
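The reload-and-notify cycle used above can be illustrated in plain Python. `PollingConfig` below is a stand-in sketch, not the library's implementation; the real KVStore polls Consul or etcd on a background timer every `reload_seconds`:

```python
# A minimal sketch (not the library's actual code) of the reload-and-notify
# pattern KVStore describes: take a fresh snapshot of the backing store,
# diff it against the previous one, and notify registered callbacks.
class PollingConfig:
    def __init__(self, fetch):
        self._fetch = fetch        # callable returning the current config dict
        self._config = fetch()
        self._callbacks = []

    def on_reload(self, callback):
        self._callbacks.append(callback)

    def get(self, key, default=None):
        return self._config.get(key, default)

    def reload(self):
        # In the real KVStore this runs periodically when reload_enabled=True.
        old, new = self._config, self._fetch()
        self._config = new
        if old != new:
            for callback in self._callbacks:
                callback(old, new)


# Simulate a key-value store with a plain dict.
store = {"rabbit_host": "localhost"}
config = PollingConfig(lambda: dict(store))
config.on_reload(lambda old, new: print("config changed:", old, "->", new))

store["rabbit_host"] = "rabbit.internal"
config.reload()  # fires the callback because the snapshot changed
```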
LogManager
This is a utility class that uses a KVStore to configure the Python logging facility. It relies on KVStore's dynamic configuration reloading to update Python's logging configuration at runtime.
Constructor Arg | Description | Default
---|---|---
kv_store | Backing KVStore used to configure logging. | None/Required
KVStore configurations relative to the kv_prefix
Key | Description | Default
---|---|---
logging_filename | The file where logs are written. | None (Std Out)
logging_filemode | The file mode if a filename is specified | None
logging_format | The format of the logging line | [%(threadName)s] %(asctime)s - %(levelname)s - %(name)s - %(message)s
logging_datefmt | The datefmt of the date written | %m/%d/%Y %I:%M:%S %p
logging_level | Root logging level | INFO
logging_config/log_name_1 | Logging level for <log_name_1> | None
logging_config/log_name_2 | Logging level for <log_name_2> | None
logging_config/log_name_n | Logging level for <log_name_n> | None
Example Usage
Here are the available configurations for logging provided by KVStore, using an example prefix of /config/local_config:

```json
{
  "config": {
    "local_config": {
      "logging_level": "INFO",
      "logging_config": {
        "default": "DEBUG",
        "KVStore": "DEBUG",
        "LogManager": "DEBUG"
      }
    }
  }
}
```
```python
import os

from clc_msa_utils.kv_store import KVStore
from clc_msa_utils.log_manager import LogManager

kv_store = KVStore(
    kv_prefix=os.getenv('CONSUL_PREFIX') or
              os.getenv('ETCD_PREFIX') or
              os.getenv('KV_PREFIX', '/config/local_config'),
    reload_enabled=True
)

log_manager = LogManager(kv_store=kv_store)
```
QueueFactory
This is a utility class that abstracts the creation of queue producers and queue consumers/listeners. The producers and consumers are constructed from a configuration passed into their respective methods as a parameter. The following is an example JSON configuration for a queue consumer that could be stored in a key-value store such as etcd or Consul. Notice that the queue_config attribute is an array and can contain all of the necessary configuration for both your consumers and producers.
```json
{
  "queue_config": [
    {
      "name": "make_managed_request",
      "type": "consumer",
      "exchange": {
        "name": "managed_server",
        "type": "x-delayed-message",
        "arguments": {"x-delayed-type": "topic"},
        "durable": true
      },
      "queue": "make_managed_mos_cmdb",
      "binding_key": "requested.make_managed",
      "host": "rabbitmq.managed-services-dev.skydns.local",
      "port": "5672",
      "auth": {
        "user": "guest",
        "password": "guest"
      }
    }
  ]
}
```
Example Usage
```python
from clc_msa_utils.queueing import QueueFactory

# Get config (e.g. from kv_store)
queue_config = kv_store.get('queue-config')

# Initialize QueueFactory
q_factory = QueueFactory()

# Generate queue consumers (QueueConsumer)
consumers = q_factory.create_consumers(queue_config)

# Generate queue producers (QueueProducer)
producers = q_factory.create_producers(queue_config)


# Define the callback invoked for each consumed message.
def callback_function(ch, method, properties, body):
    ...


# Retrieve and use a consumer based on the configured name
consumers['make_managed_request'].listen(callback_function)

# Retrieve and use a producer based on the configured name
producers['error'].publish({"error_details": "message about how you messed things up..."})
```
Multi-Threaded Example
```python
from clc_msa_utils.queueing import QueueFactory

queue_factory = None


def setup_queue():
    global queue_factory

    # If the queue_factory was already created, stop_consuming.
    # Clean up the existing connections before creating new ones
    # on a configuration change.
    if queue_factory:
        queue_factory.stop_consuming()

    # Create one configuration per thread, with a unique name for each.
    queue_factory_config = {
        "queue_config": []
    }

    amqp_connections = int(kv_store.get('amqp_connections', '10'))
    x = 0

    while x < amqp_connections:
        queue_config = {
            "name": "notify_worker_thread_" + str(x),
            "type": "consumer",
            "queue": "my_queue",
            "host": "localhost",
            "port": "5672",
            "exchange": {
                "name": "managed_server",
                "type": "x-delayed-message",
                "arguments": {"x-delayed-type": "topic"},
                "durable": True
            },
            "auth": {
                "user": "guest",
                "password": "guest"
            }
        }

        queue_factory_config["queue_config"].append(queue_config)
        x = x + 1

    # Create the QueueFactory, and pass in the configuration and worker function.
    queue_factory = QueueFactory()
    queue_factory.create_consumers(queue_factory_config)
    queue_factory.start_consuming(do_work)

    # Wait for all threads to stop before stopping the main thread.
    for queue_consumer in queue_factory.consumers():
        queue_consumer.thread().join()

...


def do_work(ch, method, properties, body):
    # Worker code goes here
    pass
```
QueueWorker
This is a utility class that creates a KVStore and a LogManager, configures exchanges and queues, and starts consuming. It provides convenience methods to publish success and error messages, handles catching and reporting exceptions raised in your callback method, and acknowledges the message when done.
Here are the parameters available when creating a QueueWorker:

Parameter | Description | Default
---|---|---
consul_host | Consul host used to initialize the KVStore. | None
consul_port | Consul port used to initialize the KVStore. | None
etcd_host | Etcd host used to initialize the KVStore. | None
etcd_port | Etcd port used to initialize the KVStore. | None
kv_prefix | The prefix used to initialize the KVStore. | None
rabbit_host_key | The key in the kv store that contains the RabbitMQ host. | rabbit_host
rabbit_port_key | The key in the kv store that contains the RabbitMQ port. | rabbit_port
rabbit_user_key | The key in the kv store that contains the RabbitMQ user. | rabbit_user
rabbit_password_key | The key in the kv store that contains the RabbitMQ password. | rabbit_password
amqp_connection_key | The key in the kv store that contains the number of connections to RabbitMQ. | amqp_connections
listen_exchange_key | The key in the kv store that contains the exchange to listen on when consuming messages. | exchange
listen_routing_key_key | The key in the kv store that contains the routing key to bind to when consuming messages. | listen_routing_key
queue_name_key | The key in the kv store that contains the queue name to listen on when consuming messages. | queue
done_exchange_key | The key in the kv store that contains the exchange to publish to on success. | done_exchange
done_routing_key_key | The key in the kv store that contains the routing key to publish to on success. | done_routing_key
error_exchange_key | The key in the kv store that contains the exchange to publish to on error. | error_exchange
error_routing_key_key | The key in the kv store that contains the routing key to publish to on error. | error_routing_key
data_key_on_error_payload | The key in the error payload that will contain the original message when publishing to the error exchange. | data
initialize_log_manager | When true, creates a LogManager using the kv store created or specified. | True
kv_store | When specified, this kv_store is used instead of creating a new one. | None
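The error-publishing behavior described above (an error message carrying the exception text, the stack trace under error_details/errorDetails, and the original message under the data_key_on_error_payload key) can be sketched as follows. `build_error_payload` is a hypothetical illustration of the payload shape, not the library's actual code:

```python
import traceback

# Hypothetical sketch of the error payload a QueueWorker might publish when a
# callback raises. Field names follow the comments in the example below:
# error_details/errorDetails hold the stack trace, and the original message is
# nested under the configured data_key_on_error_payload key.
def build_error_payload(exc, original_body, data_key="data"):
    details = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    return {
        "error": str(exc),
        "error_details": details,
        "errorDetails": details,
        data_key: original_body,
    }


try:
    raise Exception("Raising an error.")
except Exception as e:
    payload = build_error_payload(e, {"task": "register"}, data_key="original_payload")
```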
Example Usage
```python
import logging
import time

from clc_msa_utils.queueing import QueueWorker

logger = logging.getLogger("default")

register_queue_worker = QueueWorker(rabbit_host_key="rabbit-host",
                                    rabbit_port_key="rabbit-port",
                                    rabbit_user_key="rabbit-user",
                                    rabbit_password_key="rabbit-password",
                                    amqp_connection_key="amqp-connection-count",
                                    queue_name_key="queue",
                                    done_exchange_key="done-exchange",
                                    error_exchange_key="error-exchange",
                                    listen_exchange_key="listen-exchange",
                                    listen_routing_key_key="listen-routing-key",
                                    done_routing_key_key="done-routing-key",
                                    error_routing_key_key="error-routing-key",
                                    data_key_on_error_payload="original_payload")

# Use the same kv_store as above, and don't initialize another log_manager
unregister_queue_worker = QueueWorker(rabbit_host_key="rabbit-host",
                                      rabbit_port_key="rabbit-port",
                                      rabbit_user_key="rabbit-user",
                                      rabbit_password_key="rabbit-password",
                                      amqp_connection_key="amqp-un-connection-count",
                                      queue_name_key="un-queue",
                                      listen_exchange_key="listen-exchange",
                                      done_exchange_key="done-exchange",
                                      error_exchange_key="error-exchange",
                                      listen_routing_key_key="listen-un-routing-key",
                                      done_routing_key_key="done-un-routing-key",
                                      error_routing_key_key="error-un-routing-key",
                                      data_key_on_error_payload="original_payload",
                                      initialize_log_manager=False,
                                      kv_store=register_queue_worker.kv_store())


# Initializes the listeners
def initialize():
    logger.debug("Initializing worker...")

    # Register the callbacks with the queue workers; this initializes the workers and starts consuming.
    register_queue_worker.set_callback(register_listener)
    unregister_queue_worker.set_callback(unregister_listener)

    logger.debug("Done initializing worker")


def register_listener(ch, method, properties, body):
    _do_work(ch, method, properties, body, "register", register_queue_worker)


def unregister_listener(ch, method, properties, body):
    _do_work(ch, method, properties, body, "unregister", unregister_queue_worker)


def _do_work(ch, method, properties, body, task_name, queue_worker, sleep_seconds=5):
    logger.info("[{0}] Received the following message: {1}".format(task_name, body.decode("utf-8")))
    logger.info("[{0}] Pretending to do something for {1} seconds...".format(task_name, str(sleep_seconds)))

    time.sleep(sleep_seconds)

    logger.info("[{0}] Done pretending to do something.".format(task_name))

    payload = {
        "task_name": task_name,
        "sleep_seconds": sleep_seconds,
        "original_message": body.decode("utf-8"),
        "properties": properties,
        "method": method
    }

    # No need to catch an error; the queue worker will publish the error for you.
    # The error message will contain 'Exception: Raising an error.', the error_details and
    # errorDetails will contain the stack trace, and the `data_key_on_error_payload` property will contain the
    # original payload.
    if "error" in str(body.decode("utf-8")):
        raise Exception("Raising an error.")

    # Publish a success message, propagating the properties
    queue_worker.publish_success(payload, properties)

    # If I need to manually publish an error message, there is a method to do so.
    queue_worker.publish_error(payload)

    # The queue worker acknowledges the message, so no need to do it here!
    logger.info("[{0}] Acknowledged that I am done pretending to do something.".format(task_name))


if __name__ == '__main__':
    initialize()
```