Service factory for DVGroup
Project description
#Состав библиотеки
- Класс Factory для получения экземпляров сервисов без указания путей к сервисам и секретов. Для работы нужно сконфигурировать переменные окружения VAULT_URL и VAULT_TOKEN, или явно передать url и token при создании экземпляра класса). Далее секреты автоматически достаются из vault, пути из consul
- Декоратор @log(logger=you_logger)
- Декоратор @retry(count=10, sleep=0.5)
Установка
Requests is available on PyPI:
$ python -m pip install dvgroup_factory
from dvgroup_factory import factory
##Порядок работы:
- Получить объект фабрики:
- fc = factory.Factory(vault_url=url, vault_token=token)
- 2.1 fc = factory.Factory(), если определены переменные окружения VAULT_URL и VAULT_TOKEN
- Получить объект сервиса (в kwargs передаются параметры не связанные с url и secrets):
- ch_client = fc.clickhouse_client(secure=True, database="db1", verify=False)
- kafka_p = fc.kafka_producer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
- По умолчанию, если ранее уже был создан объект сервиса, то при следующем запросе, будет возвращен ранее созданный.
- Для получения нового объекта (если ранее уже был получен экземпляр), требуется переддать параметр new=True:
- kafka_p2 = fc.kafka_producer(value_serializer=lambda v: json.dumps(v).encode('utf-8'), new=True)
- Для получения именованного экземпляра требуется указать параметр instance_name
- kafka_p2 = fc.kafka_producer(value_serializer=lambda v: json.dumps(v).encode('utf-8'), instance_name="Name")
- Для понимания, какие методы (классы сервисов) реализованы, следует вызвать метод info(), который возвратит след.информацию:
Создание экземпляра:\
ins = Factory(vault_url=url, vault_token=token)\
Методы:
1 ins.vault_client(url: str, token: str)
2 ins.consul(**kwargs)
3 ins.kafka_producer(**kwargs)
4 ins.kafka_consumer(*topics,**kwargs)
3 ins.aiokafka_producer(**kwargs)
4 ins.aiokafka_consumer(*topics,**kwargs)
5 ins.clickhouse_client(**kwargs)
6 ins.azure_container_client(**kwargs)
7 ins.loki_handler(**kwargs)
8 ins.gp_connection()\
Для создания нового экземпляра укажите в kwargs: new=True
Пути настроек в consul:
{"clickhouse": "env/databases/clickhouse", "kafka": "env/databases/kafka", "ms-azure-se": "env/databases/ms-azure-se", "loki": "env/databases/loki"}
#Пример кода:
from dvgrop_factory import factory as fc
###Получаю экземпляр фабрики factory = fc.Factory()
###Consul
consul = factory.consul()
kafka_config = consul.kv["env/databases/kafka"]
###Clickhouse
ch = factory.clickhouse_client(database="db1", ca_certs="CA.pem")
rs = ch.execute("SELECT COUNT(*) FROM db1.atol")
###KafkaProducer k_p = factory.kafka_producer()
###KafkaConsumer k_c = factory.kafka_consumer()
###azure.storage.blob.ContainerClient
a_cc = factory.azure_container_client(container_name="output")
print(f'k_c = {a_cc}')
###azure.storage.blob.BlobCliennt a_cc = factory.azure_blob_client(container_name="output", blob_name = "nm")
###logging_loki.LokiHandler
loki = factory.loki_handler(tags={"application": "atol-connector"}, version="1")
loki.setLevel(logging.DEBUG)
_log_format = f"%(asctime)s - [%(levelname)s] - %(name)s - (%(filename)s).%(funcName)s(%(lineno)d) - %(message)s"
loki.setFormatter(logging.Formatter(_log_format))
logger = logging.getLogger('segments-api')
logger.addHandler(loki)
###GreenPlum Connection conn = factory.gp_connection() cursor = conn.cursor() cursor.execute('SELECT COUNT(*) FROM raw_atol') rs = cursor.fetchone()
###AIOKafka
async def aiostart():
consumer = factory.aiokafka_consumer("test-atol1", auto_offset_reset='earliest', enable_auto_commit=False, )
producer = factory.aiokafka_producer()
await consumer.start()
await producer.start()
try:
future = await producer.send("test-atol", value={"ASYNC": "start"})
#record_metadata = await future
key = None
async for msg in consumer:
print(f"async key {key} msg = {msg}")
msg.value["consumer-producer"] = True
msg.value["ASYNC"] = True
print(f"async msg = {msg}")
future = await producer.send("test-atol1", value=msg.value)
finally:
await consumer.stop()
await producer.stop()\
ioloop = asyncio.get_event_loop()
tasks = [ioloop.create_task(aiostart())]
ioloop.run_until_complete(asyncio.wait(tasks))
ioloop.close()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file dvgroup_factory-0.0.60.tar.gz.
File metadata
- Download URL: dvgroup_factory-0.0.60.tar.gz
- Upload date:
- Size: 15.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.7.1 importlib_metadata/4.8.2 pkginfo/1.8.1 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.8.6
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
3224b3a9ba62c51dcf1db65d700484ecebfab449972b843581104fda28df4d93
|
|
| MD5 |
1bc1fe77af48f1a1d4a262bfda4eedf9
|
|
| BLAKE2b-256 |
53c1fdbd610daa40325fd3b3f1078179f25a80cb9a590c37bb3f20c049cb3f50
|
File details
Details for the file dvgroup_factory-0.0.60-py3-none-any.whl.
File metadata
- Download URL: dvgroup_factory-0.0.60-py3-none-any.whl
- Upload date:
- Size: 15.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.7.1 importlib_metadata/4.8.2 pkginfo/1.8.1 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.8.6
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0d2b54f0b9b7b2e65f1fcf4c9ff5696986ce6bce36db991d9b0ee03941fd629d
|
|
| MD5 |
ee2403e46d3a2ac94c13984323b61981
|
|
| BLAKE2b-256 |
ba73e3a356742f529c2a68d62778a81d540a1a8609ca0f23ce99f70762a2748c
|