Messaging service around insights-core.
Construct an insights archive processing application by providing a configuration file that specifies its components. The building blocks are described below.
Engine
An engine encapsulates the process of evaluating an archive with insights. It has parameters for its result formatter, a subset of the loaded components to evaluate, an archive extraction timeout, and a working directory for it to use during analysis. A consumer feeds an engine one archive at a time and uses a publisher to publish the results.
Consumer
A consumer retrieves one message at a time from a source, extracts a URL from it, downloads an archive using the configured downloader, and passes the file to an internal engine for processing. It retrieves results from the engine and publishes them with the configured publisher.
import logging

from . import Consumer

log = logging.getLogger(__name__)


class Interactive(Consumer):
    def run(self):
        while True:
            input_msg = input("Input Archive Name: ")
            if not input_msg:
                break
            self.process(input_msg)

    def get_url(self, input_msg):
        return input_msg
Requeuer
A requeuer allows a consumer to raise a Requeue exception to indicate that
it couldn't handle the input and would like the requeuer to do something with
it. What the requeuer does is open ended: it could put the message onto a
different topic, send it to a different message broker, store it in a
database, etc.
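The hand-off described above can be sketched in a few lines. Everything in this block is hypothetical and self-contained: the `Requeue` class defined here, the `requeue` method name, the `handle` function, and the in-memory list standing in for a topic or database are assumptions for illustration, not the project's actual interface.

```python
class Requeue(Exception):
    """Hypothetical stand-in for the exception a consumer raises."""


class ListRequeuer:
    """Hypothetical requeuer that keeps rejected messages in memory."""

    def __init__(self):
        self.retry = []

    def requeue(self, input_msg, exc):
        # A real requeuer might publish to another topic, send the message
        # to a different broker, or write it to a database.
        self.retry.append((input_msg, str(exc)))


def handle(input_msg, requeuer):
    """Process one message, handing it to the requeuer on Requeue."""
    try:
        if not input_msg.endswith(".tar.gz"):
            raise Requeue("unsupported archive type")
        return "processed"
    except Requeue as exc:
        requeuer.requeue(input_msg, exc)
        return "requeued"
```

The point of the pattern is that the consumer only signals that it cannot handle a message; the decision about where the message goes next stays with the requeuer.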
Publisher
A publisher stores results in some way. For example, it could publish them to a database, post them to a message queue, or display them. It is given the original request and the raw result string from the configured formatter.
from . import Publisher


class StdOut(Publisher):
    def publish(self, input_msg, results):
        print(results)
Format
A format is used by the engine to monitor insights components during analysis, capture interesting information about them, and convert the results into a string that will make up the body of the application's response.
Downloader
A downloader is used by a consumer to download archives. The project provides downloaders for HTTP endpoints, S3 buckets, and the local file system. Here's the one for S3.
import os
import shutil

from contextlib import contextmanager
from tempfile import NamedTemporaryFile

from s3fs import S3FileSystem


class S3Downloader(object):
    def __init__(self, tmp_dir=None, chunk_size=16 * 1024, **kwargs):
        self.tmp_dir = tmp_dir
        self.chunk_size = chunk_size
        self.fs = S3FileSystem(**kwargs)

    @contextmanager
    def get(self, src):
        with self.fs.open(src) as s:
            with NamedTemporaryFile(dir=self.tmp_dir) as d:
                shutil.copyfileobj(s, d, length=self.chunk_size)
                d.flush()
                yield d.name
Here's one for the local file system.
import os

from contextlib import contextmanager


class LocalFS(object):
    @contextmanager
    def get(self, src):
        path = os.path.realpath(os.path.expanduser(src))
        yield path
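Both downloaders above expose the same shape: a `get(src)` context manager that yields a local path. A downloader for plain URLs could follow the same pattern. The sketch below is not the project's HTTP downloader; the `UrlDownloader` name and the `timeout` parameter are assumptions, and it uses only the standard library.

```python
import shutil
from contextlib import contextmanager
from tempfile import NamedTemporaryFile
from urllib.request import urlopen


class UrlDownloader:
    """Hypothetical downloader; not the project's HTTP downloader."""

    def __init__(self, tmp_dir=None, chunk_size=16 * 1024, timeout=30):
        self.tmp_dir = tmp_dir
        self.chunk_size = chunk_size
        self.timeout = timeout

    @contextmanager
    def get(self, src):
        # Stream the response into a temporary file. The file is removed
        # automatically when the caller leaves the with block.
        with urlopen(src, timeout=self.timeout) as s:
            with NamedTemporaryFile(dir=self.tmp_dir) as d:
                shutil.copyfileobj(s, d, length=self.chunk_size)
                d.flush()
                yield d.name
```

A caller would use it as `with UrlDownloader().get(url) as path: ...` and must finish with the file before the block exits, since the temporary copy does not outlive it.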
Watchers
Watchers monitor events from the consumer or engine. A consumer watcher might track the total number of archives, the number that succeeded, and the number that failed. An engine watcher might track component execution times. Look at the watchers package for the possible callbacks.
from pprint import pprint

from insights import dr
from insights_messaging.watchers import EngineWatcher


class LocalStatWatcher(EngineWatcher):
    def __init__(self):
        self.archives = 0

    def on_engine_complete(self, broker):
        self.archives += 1
        times = {dr.get_name(k): v for k, v in broker.exec_times.items() if k in broker}
        pprint({"times": times, "archives": self.archives})
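A consumer watcher that tracks totals, successes, and failures, as described above, might look like the sketch below. It is self-contained and does not subclass anything from the project. The callback names and their arguments are assumptions; check the watchers package for the callbacks that actually exist.

```python
class CountingConsumerWatcher:
    """Hypothetical consumer watcher; callback names are assumptions."""

    def __init__(self):
        self.total = 0
        self.succeeded = 0
        self.failed = 0

    def on_consumer_success(self, input_msg, broker, results):
        # Called after an archive was processed and published.
        self.total += 1
        self.succeeded += 1

    def on_consumer_failure(self, input_msg, exception):
        # Called when downloading, processing, or publishing raised.
        self.total += 1
        self.failed += 1
```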
Logging
Standard logging configuration can be specified under the logging key.
You can programmatically modify the logging configuration specified above by
providing a logging_configurator key. It should specify a function that
returns another function. The returned function must accept the existing log
configuration above and return a modified version of it.
service:
    logging_configurator:
        name: insights_messaging.tests.test_get_logging_config.custom_log_config
        args: []
        kwargs: {}
Example function:
def custom_log_config(*args, **kwargs):
    def inner(config):
        config["custom_log_stuff"] = "custom config here"
        return config
    return inner
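To make the two-step call concrete, here is a sketch of how such a factory could be applied to an existing logging dictionary. The `level` keyword argument and the variable names are hypothetical; only `logging.config.dictConfig` is a real standard-library call, and how the application itself invokes the configurator is assumed here.

```python
import logging
import logging.config


def custom_log_config(*args, **kwargs):
    # Factory: called once with the args and kwargs from the config file.
    level = kwargs.get("level", "DEBUG")  # hypothetical kwarg

    def inner(config):
        # Receives the existing logging dict and returns a modified one.
        config.setdefault("loggers", {}).setdefault("", {})["level"] = level
        return config

    return inner


existing = {
    "version": 1,
    "disable_existing_loggers": False,
    "loggers": {"": {"level": "WARN"}},
}
configure = custom_log_config(level="INFO")
updated = configure(existing)
logging.config.dictConfig(updated)
```

Returning a closure lets the factory capture values from `args` and `kwargs` once, while the inner function stays a simple dict-in, dict-out transformation.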
Environment Variable Substitution
Environment variables may be used in any value position. They can be specified in one of three ways:
foo: $SOME_ENV
foo: ${SOME_ENV}
foo: ${SOME_ENV:<default value>}
If the environment variable isn't defined, the string value is not modified unless a default has been specified. If the environment variable is defined, it will be substituted even if its value is nothing.
The default value is everything from the first colon (:) to the first closing brace (}). Closing braces cannot be escaped.
We first try to convert the value to a boolean if it is "true" or "false" (case insensitive), then an int, then a float. If all conversions fail, it's returned as a string.
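The rules above can be sketched as a small function. This is an illustration written from the description, not the project's implementation; the names `ENV_PATTERN`, `substitute`, and `convert` are hypothetical, and applying the type conversion to the whole string after substitution is an assumption.

```python
import os
import re

# Matches ${NAME}, ${NAME:default}, or $NAME.
ENV_PATTERN = re.compile(r"\$\{(\w+)(?::([^}]*))?\}|\$(\w+)")


def convert(text):
    """Try bool, then int, then float; fall back to the string."""
    if text.lower() in ("true", "false"):
        return text.lower() == "true"
    for cast in (int, float):
        try:
            return cast(text)
        except ValueError:
            pass
    return text


def substitute(value, env=os.environ):
    """Expand environment references in value, then convert its type."""

    def replace(match):
        name = match.group(1) or match.group(3)
        default = match.group(2)
        if name in env:
            return env[name]  # substituted even when the value is empty
        if default is not None:
            return default
        return match.group(0)  # undefined and no default: leave as-is

    return convert(ENV_PATTERN.sub(replace, value))
```

For example, with `CONSUMER_PORT` unset, `substitute("${CONSUMER_PORT:5672}")` yields the integer `5672`, and `substitute("$CONSUMER_PORT")` returns the string unchanged.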
Example Configuration
The plugins section of the configuration contains standard insights configuration. The service section contains the components that make up the application.
The consumer, publisher, downloader, and watchers must contain a full component
name, and they may contain a list called args and a dictionary called
kwargs. If provided, the args and kwargs are used to construct the component.
In the case of a consumer, they are provided after the standard args of
publisher, downloader, and engine.
The format value is the class name of the result formatter the engine should
use. It defaults to insights.formats.text.HumanReadableFormat.
The target_components list can be used to constrain which loaded components
are executed. The dependency graph of components whose full names start with
any element will be executed. If it is empty or the target_components key
doesn't exist, all loaded components are evaluated.
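The prefix matching can be illustrated as follows. This sketch covers only the name selection step, not the resolution of the dependency graph, and the `select_components` function is hypothetical rather than part of the project.

```python
def select_components(loaded, target_components=None):
    """Return the loaded component names matching any target prefix."""
    if not target_components:
        # Empty or missing list: every loaded component is a target.
        return list(loaded)
    prefixes = tuple(target_components)
    return [name for name in loaded if name.startswith(prefixes)]


loaded = [
    "examples.rules.bash_version.report",
    "foo.bar.rules.disk_check.report",
    "insights.parsers.redhat_release.RedhatRelease",
]
selected = select_components(loaded, ["foo.bar.rules"])
```

Here `selected` contains only `foo.bar.rules.disk_check.report`; the component names in `loaded` are made up for the example.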
extract_tmp_dir is where the engine will extract archives for analysis. It
will use /tmp if no path is provided.
extract_timeout is the maximum number of seconds the engine will spend
extracting an archive. It raises an exception if the timeout is exceeded. If
no timeout is specified, extraction can run indefinitely.
The engine section specifies which engine class to use. If it exists, it
takes default configuration from format, target_components,
extract_tmp_dir, and extract_timeout at the same level as the engine
key. If it has a kwargs key, any values there override those defaults. If the
engine section doesn't exist, insights_messaging.engine.Engine is used and
takes the default configs.
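The precedence described above (service-level keys as defaults, engine kwargs as overrides) can be sketched like this. The `resolve_engine` function and the constant names are hypothetical; only the two default class names come from the text above.

```python
DEFAULT_ENGINE = "insights_messaging.engine.Engine"
DEFAULT_FORMAT = "insights.formats.text.HumanReadableFormat"
ENGINE_KEYS = ("format", "target_components", "extract_tmp_dir", "extract_timeout")


def resolve_engine(service):
    """Return (engine class name, engine kwargs) for a service section."""
    # Start from the keys that sit at the same level as the engine key.
    defaults = {"format": DEFAULT_FORMAT}
    defaults.update({k: service[k] for k in ENGINE_KEYS if k in service})

    spec = service.get("engine", {})
    name = spec.get("name", DEFAULT_ENGINE)
    # Values under engine.kwargs win over the service-level defaults.
    kwargs = {**defaults, **spec.get("kwargs", {})}
    return name, kwargs
```

With this reading, a config can set `extract_timeout: 10` once at the service level and override it for a custom engine by listing a different value under the engine's kwargs.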
Custom Engine Config
# insights.parsers.redhat_release must be loaded and enabled for
# insights_messaging.formats.rhel_stats.Stats to collect product and version
# info. This is also true for insights.formats._json.JsonFormat.
plugins:
    default_component_enabled: true
    packages:
        - insights.specs.default
        - insights.specs.insights_archive
        - insights.parsers.redhat_release
        - examples.rules
    configs:
        - name: examples.rules.bash_version.report
          enabled: true
service:
    engine:
        name: examples.engine.CustomEngine
        kwargs:
            format: insights_stats_worker.rhel_stats.Stats
            target_components:
                - foo.bar.rules
            extract_timeout: 10
            extract_tmp_dir: ${TMP_DIR:/tmp}
    consumer:
        name: insights_stats_worker.consumer.Consumer
        kwargs:
            queue: test_job
            conn_params:
                host: ${CONSUMER_HOST:localhost}
                port: ${CONSUMER_PORT:5672}
    requeuer:
        name: example.requeuer.Requeuer
        kwargs:
            queue: retry
            conn_params:
                host: ${CONSUMER_HOST:localhost}
                port: ${CONSUMER_PORT:5672}
    publisher:
        name: insights_messaging.publishers.rabbitmq.RabbitMQ
        kwargs:
            queue: test_job_response
            conn_params:
                host: localhost
                port: 5672
    downloader:
        name: insights_messaging.downloaders.localfs.LocalFS
    watchers:
        - name: insights_messaging.watchers.stats.LocalStatWatcher
    logging:
        version: 1
        disable_existing_loggers: false
        loggers:
            "":
                level: WARN
Default Engine Config
# insights.parsers.redhat_release must be loaded and enabled for
# insights_messaging.formats.rhel_stats.Stats to collect product and version
# info. This is also true for insights.formats._json.JsonFormat.
plugins:
    default_component_enabled: true
    packages:
        - insights.specs.default
        - insights.specs.insights_archive
        - insights.parsers.redhat_release
        - examples.rules
    configs:
        - name: examples.rules.bash_version.report
          enabled: true
service:
    format: insights_stats_worker.rhel_stats.Stats
    target_components:
        - foo.bar.rules
    extract_timeout: 10
    extract_tmp_dir: ${TMP_DIR:/tmp}
    consumer:
        name: insights_stats_worker.consumer.Consumer
        kwargs:
            queue: test_job
            conn_params:
                host: ${CONSUMER_HOST:localhost}
                port: ${CONSUMER_PORT:5672}
    requeuer:
        name: example.requeuer.Requeuer
        kwargs:
            queue: retry
            conn_params:
                host: ${CONSUMER_HOST:localhost}
                port: ${CONSUMER_PORT:5672}
    publisher:
        name: insights_messaging.publishers.rabbitmq.RabbitMQ
        kwargs:
            queue: test_job_response
            conn_params:
                host: localhost
                port: 5672
    downloader:
        name: insights_messaging.downloaders.localfs.LocalFS
    watchers:
        - name: insights_messaging.watchers.stats.LocalStatWatcher
    logging:
        version: 1
        disable_existing_loggers: false
        loggers:
            "":
                level: WARN
    logging_configurator:
        name: insights_messaging.tests.test_get_logging_config.custom_log_config
        args: []
        kwargs: {}