Skip to main content

CenturyLink Managed Services Anywhere Python Utilities

Project description

Getting Started

To install the clc_msa_utils package, use the command below.:

pip3 install clc_msa_utils

KVStore

This is a utlitiy class that abstracts loading a configuration from Consul or ETCD. This class supports perodically reloading the configuration from the configured key-value store, and notifing a callback method after reloading. The following may be passed into the constructor, or pulled from env variables:

Constructor Arg

Environment Variable

Description

Default

consul_host

CONSUL_HOST

Host for Consul

None

consul_port

CONSUL_PORT

Port for Consul

8500

etcd_host

ETCD_HOST

Host for etcd

localhost

etcd_port

ETCD_PORT

Port for etcd

2379

kv_prefix

KV_PREFIX

Prefix for config path

“”

reload_seconds

RELOAD_CONFIG_PERIOD

Seconds between config reloads

20

reload_enabled

RELOAD_ENABLED

If true, reloads the config periodically.

False

TODO: Future Features

  • Logging Configuration: Will enable configuring logging by updating the specified configuration mechanism.

  • Nested Configurations: Will enable you specify a list of prefixes to use to overlay configuration values.

Example Usage

 1    from clc_msa_utils.kv_store import KVStore
 2 
 3      # Create config store
 4      kv_store = KVStore(
 5          kv_prefix=os.getenv('CONSUL_PREFIX') or os.getenv('ETCD_PREFIX', '/config/retry-listener'),
 6          reload_enabled=True
 7      )
 8 
 9      # Setup on_reload handler
10      def initialize():
11          kv_store.on_reload(dynamic_configuration)
12 
13      # Implement reload handler to check if attributes changed, and then perform some logic.
14      def dynamic_configuration(old, new):
15          if not old or old.get('exchange_configs') != new.get('exchange_configs') \
16              or kv_store.attribute_changed("rabbit_host","rabbit_port","rabbit_user","rabbit_password","rabbit_queue_name"):
17          setup_queue()
18 
19  # Use kv_store to pull configuration values.
20  def setup_queue():
21      rabbit_host = kv_store.get('rabbit_host', 'localhost')
22      rabbit_port = int(kv_store.get('rabbit_port', 5672))

QueueFactory

This is a utility class that abstracts the creation of Queue Producers and Queue Consumers/Listeners. The producers and consumers are constructed based on a configuration passed into their respective methods as a parameter. The following is an example JSON configuration of a Queue Consumer configuration that could be stored in a key-value store such as ETCD or Consul. Notice that the queue_config attribute is an array and can be all of the necessary configuration for both your Consumer and Producers.

{
  "queue_config": [
    {
      "name": "make_managed_request",
      "type": "consumer",
      "exchange": {
        "name": "managed_server",
        "type": "x-delayed-message",
        "arguments": {"x-delayed-type": "topic"},
        "durable": true
      },
      "queue": "make_managed_mos_cmdb",
      "binding_key": "requested.make_managed",
      "host": "rabbitmq.managed-services-dev.skydns.local",
      "port": "5672",
      "auth": {
        "user": "guest",
        "password": "guest"
      }
    }
  ]
}

Example Usage

 1      from clc_msa_utils.queueing import QueueFactory
 2 
 3      # Get config (eg. from kv_store)
 4      queue_config = kv_store.get('queue-config')
 5 
 6      # Initialize QueueFactory
 7      q_factory = QueueFactory()
 8 
 9      # Generate Queue Consumers (QueueConsumer)
10      consumers = q_factory.create_consumers(queue_config)
11 
12      # Generate Queue Producers (QueueProducer)
13      producers = q_factory.create_producers(queue_config)
14 
15      # Retrieve and use consumer based on name configured
16      consumers['make_managed_request'].listen(callback_function)
17 
18      # Retrieve and use producer based on name configured
19      producers['error'].publish({"error_details": "message about how you messed things up..."})
20 
21 
22 
23      def callback_function(ch, method, properties, body):
24      ...

Multi-Threaded Example

 1    queue_factory = None
 2 
 3    def setup_queue:
 4 
 5       # If the queue_factory was already created, stop_consuming.
 6       # Clean up the existing connections before creating new ones
 7       # on a configuration change.
 8       if queue_factory:
 9           queue_factory.stop_consuming()
10 
11       # Create one configuration per thread, with a unique name for each.
12       queue_factory_config = {
13           "queue_config": []
14       }
15 
16       amqp_connections = int(kv_store.get('amqp_connections', '10'))
17       x = 0
18 
19       while x < amqp_connections:
20           queue_config = {
21               "name": "notify_worker_thread_" + str(x),
22               "type": "consumer",
23               "queue": "my_queue",
24               "host": "localhost",
25               "port": "5672",
26               "exchange": {
27                   "name": "managed_server",
28                   "type": "x-delayed-message",
29                   "arguments": {"x-delayed-type": "topic"},
30                   "durable": true
31               },
32               "auth": {
33                   "user": "guest",
34                   "password": "guest"
35               }
36           }
37 
38           queue_factory_config["queue_config"].append(queue_config)
39           x = x + 1
40 
41       # Create the QueueFactory, and pass in the configuration and worker function.
42       queue_factory = QueueFactory()
43       queue_factory.create_consumers(queue_factory_config)
44       queue_factory.start_consuming(do_work)
45 
46       # Wait for all threads to stop before stopping the main thread.
47       for queue_consumer in queue_factory.consumers():
48           queue_consumer.thread().join()
49 
50    ...
51 
52    def do_work(ch, method, properties, body):
53       # Worker code goes here
54       pass

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

clc_msa_utils-0.6.1.tar.gz (8.7 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page