Skip to main content

Service factory for DVGroup

Project description

#Состав библиотеки

  1. Класс Factory для получения экземпляров сервисов без указания путей к сервисам и секретов. Для работы нужно сконфигурировать переменные окружения VAULT_URL и VAULT_TOKEN, или явно передать url и token при создании экземпляра класса). Далее секреты автоматически достаются из vault, пути из consul
  2. Декоратор @log(logger=you_logger)
  3. Декоратор @retry(count=10, sleep=0.5)

Установка


Requests is available on PyPI:
$ python -m pip install dvgroup_factory
from dvgroup_factory import factory

##Порядок работы:

  1. Получить объект фабрики:
    1. fc = factory.Factory(vault_url=url, vault_token=token)
    2. 2.1 fc = factory.Factory(), если определены переменные окружения VAULT_URL и VAULT_TOKEN
  2. Получить объект сервиса (в kwargs передаются параметры не связанные с url и secrets):
    1. ch_client = fc.clickhouse_client(secure=True, database="db1", verify=False)
    2. kafka_p = fc.kafka_producer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
  3. По умолчанию, если ранее уже был создан объект сервиса, то при следующем запросе, будет возвращен ранее созданный.
  4. Для получения нового объекта (если ранее уже был получен экземпляр), требуется переддать параметр new=True:
    1. kafka_p2 = fc.kafka_producer(value_serializer=lambda v: json.dumps(v).encode('utf-8'), new=True)
  5. Для получения именованного экземпляра требуется указать параметр instance_name
    1. kafka_p2 = fc.kafka_producer(value_serializer=lambda v: json.dumps(v).encode('utf-8'), instance_name="Name")
  6. Для понимания, какие методы (классы сервисов) реализованы, следует вызвать метод info(), который возвратит след.информацию:

Создание экземпляра:\

ins = Factory(vault_url=url, vault_token=token)\

Методы:
1 ins.vault_client(url: str, token: str)
2 ins.consul(**kwargs)
3 ins.kafka_producer(**kwargs)
4 ins.kafka_consumer(*topics,**kwargs)
3 ins.aiokafka_producer(**kwargs)
4 ins.aiokafka_consumer(*topics,**kwargs)
5 ins.clickhouse_client(**kwargs)
6 ins.azure_container_client(**kwargs)
7 ins.loki_handler(**kwargs)
8 ins.gp_connection()\ Для создания нового экземпляра укажите в kwargs: new=True
Пути настроек в consul:
{"clickhouse": "env/databases/clickhouse", "kafka": "env/databases/kafka", "ms-azure-se": "env/databases/ms-azure-se", "loki": "env/databases/loki"}

#Пример кода:

from dvgrop_factory import factory as fc

###Получаю экземпляр фабрики factory = fc.Factory()

###Consul consul = factory.consul()
kafka_config = consul.kv["env/databases/kafka"]

###Clickhouse ch = factory.clickhouse_client(database="db1", ca_certs="CA.pem")
rs = ch.execute("SELECT COUNT(*) FROM db1.atol")

###KafkaProducer k_p = factory.kafka_producer()

###KafkaConsumer k_c = factory.kafka_consumer()

###azure.storage.blob.ContainerClient a_cc = factory.azure_container_client(container_name="output")
print(f'k_c = {a_cc}')

###azure.storage.blob.BlobCliennt a_cc = factory.azure_blob_client(container_name="output", blob_name = "nm")

###logging_loki.LokiHandler loki = factory.loki_handler(tags={"application": "atol-connector"}, version="1")
loki.setLevel(logging.DEBUG)
_log_format = f"%(asctime)s - [%(levelname)s] - %(name)s - (%(filename)s).%(funcName)s(%(lineno)d) - %(message)s"
loki.setFormatter(logging.Formatter(_log_format))
logger = logging.getLogger('segments-api')
logger.addHandler(loki)

###GreenPlum Connection conn = factory.gp_connection() cursor = conn.cursor() cursor.execute('SELECT COUNT(*) FROM raw_atol') rs = cursor.fetchone()

###AIOKafka async def aiostart():
consumer = factory.aiokafka_consumer("test-atol1", auto_offset_reset='earliest', enable_auto_commit=False, )
producer = factory.aiokafka_producer()
await consumer.start()
await producer.start()
try:
future = await producer.send("test-atol", value={"ASYNC": "start"})
#record_metadata = await future
key = None
async for msg in consumer:
print(f"async key {key} msg = {msg}")
msg.value["consumer-producer"] = True
msg.value["ASYNC"] = True
print(f"async msg = {msg}")
future = await producer.send("test-atol1", value=msg.value)
finally:
await consumer.stop()
await producer.stop()\

ioloop = asyncio.get_event_loop()
tasks = [ioloop.create_task(aiostart())]
ioloop.run_until_complete(asyncio.wait(tasks))
ioloop.close()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dvgroup_factory-0.0.60.tar.gz (15.1 kB view hashes)

Uploaded Source

Built Distribution

dvgroup_factory-0.0.60-py3-none-any.whl (15.2 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page