CenturyLink Managed Services Anywhere Python Utilities

Getting Started

To install the clc_msa_utils package, use the command below:

pip3 install clc_msa_utils

KVStore

This is a utility class that abstracts loading a configuration from Consul or ETCD. It supports periodically reloading the configuration from the configured key-value store and notifying a callback method after each reload. The following settings may be passed into the constructor or pulled from environment variables:

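===============  ====================  =========================================  =========
Constructor Arg  Environment Variable  Description                                Default
===============  ====================  =========================================  =========
consul_host      CONSUL_HOST           Host for Consul                            None
consul_port      CONSUL_PORT           Port for Consul                            8500
etcd_host        ETCD_HOST             Host for etcd                              localhost
etcd_port        ETCD_PORT             Port for etcd                              2379
kv_prefix        KV_PREFIX             Prefix for config path                     ""
reload_seconds   RELOAD_CONFIG_PERIOD  Seconds between config reloads             20
reload_enabled   RELOAD_ENABLED        If true, reloads the config periodically.  False
===============  ====================  =========================================  =========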
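
The lookup order implied by the table can be sketched as follows. This is a minimal illustration, not the library's code: `resolve_setting` and `parse_bool` are hypothetical helpers, and the precedence (constructor argument first, then environment variable, then default) is an assumption.

```python
import os


# Hypothetical helper, NOT part of clc_msa_utils. It illustrates an assumed
# precedence: explicit argument, then environment variable, then default.
def resolve_setting(arg_value, env_name, default=None, cast=str):
    if arg_value is not None:
        return arg_value
    raw = os.environ.get(env_name)
    if raw is not None:
        return cast(raw)
    return default


def parse_bool(text):
    # Treat common truthy spellings as True; everything else is False.
    return text.strip().lower() in ("1", "true", "yes", "on")


# Simulate an environment where only some variables are set.
os.environ["ETCD_PORT"] = "2380"
os.environ["RELOAD_ENABLED"] = "true"
os.environ.pop("CONSUL_PORT", None)

etcd_port = resolve_setting(None, "ETCD_PORT", default=2379, cast=int)
consul_port = resolve_setting(None, "CONSUL_PORT", default=8500, cast=int)
reload_enabled = resolve_setting(None, "RELOAD_ENABLED", default=False, cast=parse_bool)
explicit_port = resolve_setting(9999, "ETCD_PORT", default=2379, cast=int)
```

Here `etcd_port` comes from the environment, `consul_port` falls back to the table's default, and `explicit_port` keeps the value passed in directly.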

TODO: Future Features

  • Nested Configurations: Will enable you to specify a list of prefixes used to overlay configuration values.

Example Usage

    import os

    from clc_msa_utils.kv_store import KVStore

    # Create the config store.
    kv_store = KVStore(
        kv_prefix=os.getenv('CONSUL_PREFIX') or os.getenv('ETCD_PREFIX', '/config/retry-listener'),
        reload_enabled=True
    )

    # Set up the on_reload handler.
    def initialize():
        kv_store.on_reload(dynamic_configuration)

    # Implement the reload handler to check whether attributes changed, and then perform some logic.
    def dynamic_configuration(old, new):
        if not old or old.get('exchange_configs') != new.get('exchange_configs') \
                or kv_store.attribute_changed("rabbit_host", "rabbit_port", "rabbit_user",
                                              "rabbit_password", "rabbit_queue_name"):
            setup_queue()

    # Use kv_store to pull configuration values.
    def setup_queue():
        rabbit_host = kv_store.get('rabbit_host', 'localhost')
        rabbit_port = int(kv_store.get('rabbit_port', 5672))
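
The idea behind a reload handler that reacts only to relevant changes can be sketched without the library. In this illustration `any_key_changed` is a hypothetical stand-in, not the library's `attribute_changed` method, and the configuration snapshots are plain dictionaries.

```python
# Hypothetical helper, not the library's attribute_changed(): reports whether
# any of the named keys differ between two configuration snapshots.
def any_key_changed(old, new, *keys):
    old = old or {}
    new = new or {}
    return any(old.get(key) != new.get(key) for key in keys)


reconnects = []


def on_reload(old, new):
    # Reconnect on the first load, or when a connection setting changed.
    if not old or any_key_changed(old, new, "rabbit_host", "rabbit_port"):
        reconnects.append(new.get("rabbit_host"))


first = {"rabbit_host": "a", "rabbit_port": "5672"}
same = {"rabbit_host": "a", "rabbit_port": "5672"}
moved = {"rabbit_host": "b", "rabbit_port": "5672"}

on_reload(None, first)   # first load: reconnect
on_reload(first, same)   # nothing changed: no reconnect
on_reload(same, moved)   # host changed: reconnect
```

Comparing only the keys a component depends on avoids tearing down connections on every reload.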

LogManager

This is a utility class that uses a KVStore to configure the Python logging facility. It relies on KVStore's ability to reload its configuration dynamically to keep Python's logging settings up to date.

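===============  ==========================================  =============
Constructor Arg  Description                                 Default
===============  ==========================================  =============
kv_store         Backing KVStore used to configure logging.  None/Required
===============  ==========================================  =============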

KVStore configurations relative to the kv_prefix

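=========================  =========================================  =====================================================================
Key                        Description                                Default
=========================  =========================================  =====================================================================
logging_filename           The file where logs are written.           None (Std Out)
logging_filemode           The file mode if a filename is specified.  None
logging_format             The format of the logging line.            [%(threadName)s] %(asctime)s - %(levelname)s - %(name)s - %(message)s
logging_datefmt            The date format of the date written.       %m/%d/%Y %I:%M:%S %p
logging_level              Root logging level.                        INFO
logging_config/log_name_1  Logging level for <log_name_1>.            None
logging_config/log_name_2  Logging level for <log_name_2>.            None
logging_config/log_name_n  Logging level for <log_name_n>.            None
=========================  =========================================  =====================================================================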

Example Usage

Here is an example logging configuration as stored in the key-value store, using a kv_prefix of /config/local_config:

{
  "config" : {
     "local_config" : {
        "logging_level": "INFO",
        "logging_config": {
           "default": "DEBUG",
           "KVStore": "DEBUG",
           "LogManager": "DEBUG"
        }
     }
  }
}

    import os

    from clc_msa_utils.kv_store import KVStore
    from clc_msa_utils.log_manager import LogManager

    kv_store = KVStore(
        kv_prefix=os.getenv('CONSUL_PREFIX') or
                  os.getenv('ETCD_PREFIX') or
                  os.getenv('KV_PREFIX', '/config/local_config'),
        reload_enabled=True
    )

    log_manager = LogManager(kv_store=kv_store)

QueueFactory

This is a utility class that abstracts the creation of queue producers and queue consumers/listeners. The producers and consumers are constructed from a configuration passed to their respective creation methods. The following is an example JSON configuration for a queue consumer that could be stored in a key-value store such as ETCD or Consul. Notice that the queue_config attribute is an array, so it can hold all of the necessary configuration for both your consumers and your producers.

{
  "queue_config": [
    {
      "name": "make_managed_request",
      "type": "consumer",
      "exchange": {
        "name": "managed_server",
        "type": "x-delayed-message",
        "arguments": {"x-delayed-type": "topic"},
        "durable": true
      },
      "queue": "make_managed_mos_cmdb",
      "binding_key": "requested.make_managed",
      "host": "rabbitmq.managed-services-dev.skydns.local",
      "port": "5672",
      "auth": {
        "user": "guest",
        "password": "guest"
      }
    }
  ]
}
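
Because queue_config is a list that may mix entry types, a factory has to sort entries by type and index them by name. The sketch below illustrates that idea only; `split_queue_config` is hypothetical, and the `"producer"` type value and the `error` entry are assumptions, since the example above shows only a consumer.

```python
def split_queue_config(config):
    """Group queue_config entries into consumers and producers, keyed by name."""
    consumers, producers = {}, {}
    for entry in config.get("queue_config", []):
        if entry.get("type") == "consumer":
            consumers[entry["name"]] = entry
        elif entry.get("type") == "producer":  # "producer" is an assumed type value
            producers[entry["name"]] = entry
    return consumers, producers


config = {
    "queue_config": [
        {"name": "make_managed_request", "type": "consumer", "queue": "make_managed_mos_cmdb"},
        {"name": "error", "type": "producer", "exchange": {"name": "managed_server_error"}},
    ]
}
consumers, producers = split_queue_config(config)
```

Indexing by the configured name is what makes lookups such as `consumers['make_managed_request']` possible.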

Example Usage

    from clc_msa_utils.queueing import QueueFactory

    # Define the callback before registering it with a consumer.
    def callback_function(ch, method, properties, body):
        ...

    # Get config (e.g. from kv_store)
    queue_config = kv_store.get('queue-config')

    # Initialize QueueFactory
    q_factory = QueueFactory()

    # Generate Queue Consumers (QueueConsumer)
    consumers = q_factory.create_consumers(queue_config)

    # Generate Queue Producers (QueueProducer)
    producers = q_factory.create_producers(queue_config)

    # Retrieve and use consumer based on name configured
    consumers['make_managed_request'].listen(callback_function)

    # Retrieve and use producer based on name configured
    producers['error'].publish({"error_details": "message about how you messed things up..."})

Multi-Threaded Example

    from clc_msa_utils.queueing import QueueFactory

    queue_factory = None

    def setup_queue():
        # Rebind the module-level queue_factory rather than a local variable.
        global queue_factory

        # If the queue_factory was already created, stop consuming.
        # Clean up the existing connections before creating new ones
        # on a configuration change.
        if queue_factory:
            queue_factory.stop_consuming()

        # Create one configuration per thread, with a unique name for each.
        queue_factory_config = {
            "queue_config": []
        }

        # kv_store is the KVStore created earlier.
        amqp_connections = int(kv_store.get('amqp_connections', '10'))

        for x in range(amqp_connections):
            queue_config = {
                "name": "notify_worker_thread_" + str(x),
                "type": "consumer",
                "queue": "my_queue",
                "host": "localhost",
                "port": "5672",
                "exchange": {
                    "name": "managed_server",
                    "type": "x-delayed-message",
                    "arguments": {"x-delayed-type": "topic"},
                    "durable": True
                },
                "auth": {
                    "user": "guest",
                    "password": "guest"
                }
            }

            queue_factory_config["queue_config"].append(queue_config)

        # Create the QueueFactory, and pass in the configuration and worker function.
        queue_factory = QueueFactory()
        queue_factory.create_consumers(queue_factory_config)
        queue_factory.start_consuming(do_work)

        # Wait for all threads to stop before stopping the main thread.
        for queue_consumer in queue_factory.consumers():
            queue_consumer.thread().join()

    ...

    def do_work(ch, method, properties, body):
        # Worker code goes here
        pass

QueueWorker

This is a utility class that creates a KVStore and a LogManager, configures exchanges and queues, and starts consuming. It also supports multi-threaded queue consumers, with the number of threads set by the number of AMQP connections. It provides convenience methods to publish success and error messages, catches and reports exceptions without requiring extra code in the callback method, and acknowledges the message when done.

Here are the parameters available when creating a QueueWorker:

==========================  ==========================================================================================  ==================
Parameter                   Description                                                                                 Default
==========================  ==========================================================================================  ==================
consul_host                 Consul host used to initialize the KVStore.                                                 None
consul_port                 Consul port used to initialize the KVStore.                                                 None
etcd_host                   Etcd host used to initialize the KVStore.                                                   None
etcd_port                   Etcd port used to initialize the KVStore.                                                   None
kv_prefix                   The prefix used to initialize the KVStore.                                                  None
rabbit_host_key             The key in the kv store that contains the RabbitMQ host.                                    rabbit_host
rabbit_port_key             The key in the kv store that contains the RabbitMQ port.                                    rabbit_port
rabbit_user_key             The key in the kv store that contains the RabbitMQ user.                                    rabbit_user
rabbit_password_key         The key in the kv store that contains the RabbitMQ password.                                rabbit_password
amqp_connection_key         The key in the kv store that contains the number of connections to RabbitMQ.                amqp_connections
listen_exchange_key         The key in the kv store that contains the exchange to listen on when consuming messages.    exchange
listen_routing_key_key      The key in the kv store that contains the routing key to bind to when consuming messages.   listen_routing_key
queue_name_key              The key in the kv store that contains the queue name to listen on when consuming messages.  queue
done_exchange_key           The key in the kv store that contains the exchange to publish to on success.                done_exchange
done_routing_key_key        The key in the kv store that contains the routing key to publish to on success.             done_routing_key
error_exchange_key          The key in the kv store that contains the exchange to publish to on error.                  error_exchange
error_routing_key_key       The key in the kv store that contains the routing key to publish to on error.               error_routing_key
data_key_on_error_payload   The key in the error payload that holds the original payload when publishing an error.      data
initialize_log_manager      When true, creates a LogManager using the kv store created or specified.                    True
kv_store                    When specified, this kv_store is used instead of creating a new one.                        None
rabbit_host_default         The default value of the RabbitMQ host.                                                     localhost
rabbit_port_default         The default value of the RabbitMQ port.                                                     5672
rabbit_user_default         The default value of the RabbitMQ user.                                                     guest
rabbit_password_default     The default value of the RabbitMQ password.                                                 guest
amqp_connection_default     The default value of the number of connections to RabbitMQ.                                 10
listen_exchange_default     The default value of the exchange to listen on when consuming messages.                     main_exchange
listen_routing_key_default  The default value of the routing key to bind to when consuming messages.                    listen.key
queue_name_default          The default value of the queue name to listen on when consuming messages.                   default_queue
done_exchange_default       The default value of the exchange to publish to on success.                                 main_exchange
done_routing_key_default    The default value of the routing key to publish to on success.                              done.key
error_exchange_default      The default value of the exchange to publish to on error.                                   error_exchange
error_routing_key_default   The default value of the routing key to publish to on error.                                error.key
==========================  ==========================================================================================  ==================
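
Most settings come as a pair: a `*_key` parameter naming where to look in the kv store, and a `*_default` parameter giving the fallback. A minimal sketch of that pairing, assuming the fallback applies whenever the key is absent, might look like this. The `lookup` helper and the plain dict standing in for the kv store are illustrative only.

```python
def lookup(kv, key_name, default):
    """Return kv[key_name] when present, otherwise the supplied default."""
    value = kv.get(key_name)
    return default if value is None else value


# A plain dict stands in for the kv store contents under kv_prefix.
kv = {"rabbit_host": "rabbitmq.internal", "amqp_connection_count": "4"}

# rabbit_host_key="rabbit_host", rabbit_host_default="localhost"
rabbit_host = lookup(kv, "rabbit_host", "localhost")

# rabbit_port_key="rabbit_port", rabbit_port_default=5672 (key absent here)
rabbit_port = int(lookup(kv, "rabbit_port", 5672))

# amqp_connection_key="amqp_connection_count", amqp_connection_default=10
connections = int(lookup(kv, "amqp_connection_count", 10))
```

Keeping the key name configurable lets several workers share one kv store while reading different keys from it.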

Example Usage

worker.py

    import logging
    import os
    import time

    from clc_msa_utils.queueing import QueueWorker

    logger = logging.getLogger("default")

    unregister_queue_worker = QueueWorker(
        kv_prefix=os.getenv("ETCD_PREFIX", "/config/billing-listener"),

        # Rabbit Connection Info
        rabbit_host_key="rabbit_host", rabbit_host_default="rabbitmq.rabbitmq",
        rabbit_port_key="rabbit_port", rabbit_port_default=15672,
        rabbit_user_key="rabbit_user", rabbit_user_default="guest",
        rabbit_password_key="rabbit_password", rabbit_password_default="guest",
        amqp_connection_key="amqp_connection_count", amqp_connection_default=10,

        # Listen Config
        listen_exchange_key="main_exchange", listen_exchange_default="managed_server",
        listen_routing_key_key="main_exchange_stop_billing_routing_key", listen_routing_default="requested.make_unmanaged",
        queue_name_key="rabbit_stop_billing_queue_name", queue_name_default="stop_billing",

        # Done Config
        done_exchange_key="main_exchange", done_exchange_default="managed_server",
        done_routing_key_key="main_exchange_done_stop_billing_routing_key",
        done_routing_key_default="billing.make_unmanaged",

        # Error Config
        error_exchange_key="dead_letter_exchange", error_exchange_default="managed_server_error",
        error_routing_key_key="dead_letter_exchange_stop_billing_routing_key",
        error_routing_key_default="monitoring_config.make_managed",
        data_key_on_error_payload="server")

    # Use the same kv_store as above, and don't initialize another log_manager
    register_queue_worker = QueueWorker(
        # Rabbit Connection Info
        rabbit_host_key="rabbit_host", rabbit_host_default="rabbitmq.rabbitmq",
        rabbit_port_key="rabbit_port", rabbit_port_default=15672,
        rabbit_user_key="rabbit_user", rabbit_user_default="guest",
        rabbit_password_key="rabbit_password", rabbit_password_default="guest",
        amqp_connection_key="amqp_connection_count", amqp_connection_default=10,

        # Listen Config
        listen_exchange_key="main_exchange", listen_exchange_default="managed_server",
        listen_routing_key_key="main_exchange_routing_key", listen_routing_default="requested.make_managed",
        queue_name_key="rabbit_queue_name", queue_name_default="start_billing",

        # Done Config
        done_exchange_key="main_exchange", done_exchange_default="managed_server",
        done_routing_key_key="main_exchange_done_routing_key", done_routing_key_default="billing.make_managed",

        # Error Config
        error_exchange_key="dead_letter_exchange", error_exchange_default="managed_server_error",
        error_routing_key_key="dead_letter_exchange_routing_key", error_routing_key_default="billing.make_managed",
        data_key_on_error_payload="server",

        # Reuse configs
        initialize_log_manager=False, kv_store=unregister_queue_worker.kv_store())

    # Use the same kv_store for my configurations.
    kv_store = unregister_queue_worker.kv_store()

    # Use all defaults.
    all_defaults_queue_worker = QueueWorker(rabbit_host_default="rabbitmq.rabbitmq")


    # Initializes the listener
    def initialize():
        logger.debug("Initializing worker...")

        # Register the callbacks with the queue workers; this initializes the worker and starts consuming.
        register_queue_worker.set_callback(register_listener)
        unregister_queue_worker.set_callback(unregister_listener)
        all_defaults_queue_worker.set_callback(all_defaults_listener)

        logger.debug("Done initializing worker")


    def register_listener(ch, method, properties, body):
        _do_work(ch, method, properties, body, "register", register_queue_worker)


    def unregister_listener(ch, method, properties, body):
        _do_work(ch, method, properties, body, "unregister", unregister_queue_worker)


    def all_defaults_listener(ch, method, properties, body):
        _do_work(ch, method, properties, body, "all_defaults", all_defaults_queue_worker)


    def _do_work(ch, method, properties, body, task_name, queue_worker, sleep_seconds=8):
        logger.info("[{0}] Received the following message: {1}".format(task_name, body.decode("utf-8")))
        logger.info("[{0}] Pretending to do something for {1} seconds...".format(task_name, str(sleep_seconds)))

        time.sleep(sleep_seconds)

        logger.info("[{0}] Done pretending to do something.".format(task_name))

        payload = {
            "task_name": task_name,
            "sleep_seconds": sleep_seconds,
            "original_message": body.decode("utf-8"),
            "properties": properties,
            "method": method
        }

        # No need to catch an error, the QueueWorker will publish the error for you.
        # The error message will contain 'Exception: Raising an error.', the error_details and
        # errorDetails will contain the stack trace, and the `data_key_on_error_payload` property will contain the
        # original payload.
        if "error" in str(body.decode("utf-8")):
            raise Exception("Raising an error.")

        # Publish a success message, propagating the properties
        queue_worker.publish_success(payload, properties)

        # If I need to manually publish an error message, there is a method to do so.
        queue_worker.publish_error(payload)

        # The queue worker acknowledges the message, so there is no need to do it here!
        logger.info("[{0}] Acknowledged that I am done pretending to do something.".format(task_name))


    if __name__ == '__main__':
        initialize()
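
The error handling described in the comments above can be pictured as a wrapper around the user callback. The sketch below is an illustration under assumptions, not QueueWorker's actual code: `wrap_callback`, `publish_error`, and `ack` are hypothetical names, and the `"message"` payload key is assumed. The `error_details` and `errorDetails` keys and the configurable data key follow the comments in worker.py.

```python
import traceback


def wrap_callback(callback, publish_error, ack, data_key="data"):
    """Return a callback that reports exceptions and always acknowledges."""
    def wrapped(ch, method, properties, body):
        try:
            callback(ch, method, properties, body)
        except Exception as exc:
            details = traceback.format_exc()
            publish_error({
                "message": "{0}: {1}".format(type(exc).__name__, exc),  # assumed key name
                "error_details": details,
                "errorDetails": details,
                data_key: body.decode("utf-8"),  # original payload under the data key
            })
        finally:
            ack(method)  # acknowledge whether the callback succeeded or failed
    return wrapped


errors, acked = [], []


def failing(ch, method, properties, body):
    raise Exception("Raising an error.")


# Lists stand in for the error exchange and the acknowledgement channel.
handler = wrap_callback(failing, errors.append, acked.append, data_key="server")
handler(None, "delivery-1", None, b'{"id": 1}')
```

With a wrapper like this the callback body stays free of try/except blocks and acknowledgement calls.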

worker_UT.py

    import unittest
    import worker


    class WorkerTests(unittest.TestCase):

        def setUp(self):
            pass

        def tearDown(self):
            # Stop reloading so the test will end.
            worker.kv_store.disable_reloading()

        def test_something(self):
            pass

utils.dict_replace_empty_values()

This utility method removes or replaces empty strings in a dictionary. Optionally, you may also replace None values.

positional parameters

  1. The dictionary to process

arguments

  • process_none_values: When true, replace or remove attributes that have a value of None/Null, default=False
  • clone_dict: When true clones the input dictionary, processes it, and returns the clone leaving the original untouched, default=False
  • remove_values: When true, removes attributes that are empty or optionally None, default=False
  • replace_with: The replacement value, default=None
  • replace_float_with_decimal: When true, replaces float values with decimal values, default=None

Example Usage

    from utils import dict_replace_empty_values

    def process_dict(my_dict):
        # Return a clone of my_dict removing None values and empty strings.
        dict_a = dict_replace_empty_values(my_dict,
                                           process_none_values=True,
                                           clone_dict=True,
                                           remove_values=True)

        # Return a clone of my_dict replacing None values and empty strings with "EMPTY".
        dict_b = dict_replace_empty_values(my_dict,
                                           process_none_values=True,
                                           clone_dict=True,
                                           replace_with="EMPTY")

        # Return a clone of my_dict replacing None values and empty strings with "EMPTY", and replace floats with decimal.
        dict_c = dict_replace_empty_values(my_dict,
                                           process_none_values=True,
                                           clone_dict=True,
                                           replace_with="EMPTY",
                                           replace_float_with_decimal=True)
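
To make the described behavior concrete, here is a rough standard-library re-implementation. It is a sketch, not the library's source: `replace_empty_values` is a hypothetical name, and recursing into nested dictionaries and converting floats through `Decimal(str(value))` are both assumptions.

```python
import copy
from decimal import Decimal


def replace_empty_values(data, process_none_values=False, clone_dict=False,
                         remove_values=False, replace_with=None,
                         replace_float_with_decimal=False):
    """Illustrative only: handles nested dicts (an assumption), not lists."""
    result = copy.deepcopy(data) if clone_dict else data
    for key in list(result.keys()):
        value = result[key]
        if isinstance(value, dict):
            # Nested dicts are processed in place; the clone was made above.
            replace_empty_values(value, process_none_values, False,
                                 remove_values, replace_with,
                                 replace_float_with_decimal)
        elif value == "" or (process_none_values and value is None):
            if remove_values:
                del result[key]
            else:
                result[key] = replace_with
        elif replace_float_with_decimal and isinstance(value, float):
            result[key] = Decimal(str(value))
    return result


original = {"name": "", "owner": None, "cpu": 0.5, "tags": {"env": ""}}

cleaned = replace_empty_values(original, process_none_values=True,
                               clone_dict=True, remove_values=True)
filled = replace_empty_values(original, clone_dict=True, replace_with="EMPTY",
                              replace_float_with_decimal=True)
```

Because both calls pass `clone_dict=True`, `original` is left untouched. In `filled`, the `owner` entry stays `None` because `process_none_values` was not enabled.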

utils.log_dict_types()

Logs the type for every attribute in the specified dictionary.

positional parameters

  1. The dictionary for which to log types

arguments

  • types: Which types to show; when omitted, all types are shown, default=None
  • use_logger: The logger to use, default=logger
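
A minimal sketch of what such a helper could do is shown below. The names `describe_types` and `log_types` are hypothetical, and interpreting `types` as a collection of Python types to filter on is an assumption.

```python
import logging

logger = logging.getLogger(__name__)


def describe_types(data, types=None):
    """Return {key: type name}, optionally restricted to the given types."""
    return {key: type(value).__name__
            for key, value in data.items()
            if types is None or isinstance(value, tuple(types))}


def log_types(data, types=None, use_logger=logger):
    # Emit one log line per attribute with the name of its type.
    for key, type_name in describe_types(data, types).items():
        use_logger.info("%s: %s", key, type_name)


record = {"name": "web-01", "cpu": 0.5, "cores": 4}
all_types = describe_types(record)
float_only = describe_types(record, types=[float])
log_types(record, types=[float])
```

Filtering by type is handy when tracking down values, such as floats, that a downstream store may reject.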

[1] This document is formatted using reStructuredText, with reStructuredText directives.

Files for clc-msa-utils, version 0.10.8