Python DSL for setting up Flask app CDC
Project description
How to use
from devlibx_avro_helper.month_data import MonthDataAvroHelper
input = "PAg4LTE23AEIOC0xN94BCDgtMTTYAQg4LTE12gEIOC0xOOABCDgtMTniAQg4LTMw"
"+AEIOC0zMfoBCDgtMTLUAQg4LTEz1gEIOC0xMNABCDgtMTHSAQY5LTH8AQY5LTL"
"+AQY5LTOAAgg4LTI38gEGOS00ggIGOC02yAEIOC0yOPQBBjgtN8oBCDgtMjXuAQY4LTjMAQg4LTI28AEGOC05zgEIOC0yOfYBCDgtMjDkAQg4LTIz6gEIOC0yNOwBCDgtMjHmAQg4LTIy6AEAEGhhcmlzaF8x "
helper = MonthDataAvroHelper()
result = helper.process(input)
print(result)
# Result
# {'days': {'8-16': 110, '8-17': 111, '8-14': 108, '8-15': 109, '8-18': 112, '8-19': 113, '8-30': 124, '8-31': 125, '8-12': 106, '8-13': 107, '8-10': 104, '8-11': 105, '9-1': 126, '9-2': 127, '9-3': 128, '8-27': 121, '9-4': 129, '8-6': 100, '8-28': 122, '8-7': 101, '8-25': 119, '8-8': 102, '8-26': 120, '8-9': 103, '8-29': 123, '8-20': 114, '8-23': 117, '8-24': 118, '8-21': 115, '8-22': 116}, 'entity_id': 'harish_1'}
Get the last N days of data
In this example the data is Base64-encoded. We will get the last 29 and 7 days of data from it.
# How to use - the full example is given below
from devlibx_avro_helper.month_data import MonthDataAvroHelper
helper = MonthDataAvroHelper()
result = helper.process_and_return_last_n_days_from_time(date_time_obj, base64Str, 30)
from devlibx_avro_helper.month_data import MonthDataAvroHelper
from datetime import datetime
def test_collect_data_for_n_days(self):
base64Str = "PAg4LTE23AEIOC0xN94BCDgtMTTYAQg4LTE12gEIOC0xOOABCDgtMTniAQg4LTMw"
"+AEIOC0zMfoBCDgtMTLUAQg4LTEz1gEIOC0xMNABCDgtMTHSAQY5LTH8AQY5LTL"
"+AQY5LTOAAgg4LTI38gEGOS00ggIGOC02yAEIOC0yOPQBBjgtN8oBCDgtMjXuAQY4LTjMAQg4LTI28AEGOC05zgEIOC0yOfYBCDgtMjDkAQg4LTIz6gEIOC0yNOwBCDgtMjHmAQg4LTIy6AEAEGhhcmlzaF8x "
date_time_str = '03/09/22 01:55:19'
date_time_obj = datetime.strptime(date_time_str, '%d/%m/%y %H:%M:%S')
helper = MonthDataAvroHelper()
result = helper.process_and_return_last_n_days_from_time(date_time_obj, base64Str, 30)
self.assertEqual(29, len(result))
print("result of test_collect_data_for_n_days 30 days", result)
# Result = [100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128]
result = helper.process_and_return_last_n_days_from_time(date_time_obj, base64Str, 7)
self.assertEqual(7, len(result))
print("result of test_collect_data_for_n_days 7 days", result)
# Result = [122, 123, 124, 125, 126, 127, 128]
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Close
Hashes for avro-helper-devlibx-0.0.4.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | 624623747cf4660bbf26d064a8eae8cc050b4b781cfdc383251ad2467975f05a |
|
MD5 | 93a668eeda9c282b56ce49649640b64a |
|
BLAKE2b-256 | ed08e00891275322b47ffe3e7bf78a1010888e4248fdba6cb3ed18d86b4648e7 |
Close
Hashes for avro_helper_devlibx-0.0.4-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 1bfca4a047bc220e3d44207c832d0f80332a35525fc6266c344bc66d6094ce0b |
|
MD5 | 85cce994258b77225a07276d856838ed |
|
BLAKE2b-256 | 198f0d9b62ec5991255e9d3b9779ea2fc247ae5de1b27e9c3dec5d38149f98f1 |