Python DSL for setting up Flask app CDC
Project description
How to use
from devlibx_avro_helper.month_data import MonthDataAvroHelper
input = "PAg4LTE23AEIOC0xN94BCDgtMTTYAQg4LTE12gEIOC0xOOABCDgtMTniAQg4LTMw"
"+AEIOC0zMfoBCDgtMTLUAQg4LTEz1gEIOC0xMNABCDgtMTHSAQY5LTH8AQY5LTL"
"+AQY5LTOAAgg4LTI38gEGOS00ggIGOC02yAEIOC0yOPQBBjgtN8oBCDgtMjXuAQY4LTjMAQg4LTI28AEGOC05zgEIOC0yOfYBCDgtMjDkAQg4LTIz6gEIOC0yNOwBCDgtMjHmAQg4LTIy6AEAEGhhcmlzaF8x "
helper = MonthDataAvroHelper()
result = helper.process(input)
print(result)
# Result
# {'days': {'8-16': 110, '8-17': 111, '8-14': 108, '8-15': 109, '8-18': 112, '8-19': 113, '8-30': 124, '8-31': 125, '8-12': 106, '8-13': 107, '8-10': 104, '8-11': 105, '9-1': 126, '9-2': 127, '9-3': 128, '8-27': 121, '9-4': 129, '8-6': 100, '8-28': 122, '8-7': 101, '8-25': 119, '8-8': 102, '8-26': 120, '8-9': 103, '8-29': 123, '8-20': 114, '8-23': 117, '8-24': 118, '8-21': 115, '8-22': 116}, 'entity_id': 'harish_1'}
Get last N days data
In this example we would have data in base 64 encoding. We will get last 29 nad 7 days data from.
# How to use - the full example is given below
from devlibx_avro_helper.month_data import MonthDataAvroHelper
helper = MonthDataAvroHelper()
result = helper.process_and_return_last_n_days_from_time(date_time_obj, base64Str, 30)
from devlibx_avro_helper.month_data import MonthDataAvroHelper
from datetime import datetime
def test_collect_data_for_n_days(self):
base64Str = "PAg4LTE23AEIOC0xN94BCDgtMTTYAQg4LTE12gEIOC0xOOABCDgtMTniAQg4LTMw"
"+AEIOC0zMfoBCDgtMTLUAQg4LTEz1gEIOC0xMNABCDgtMTHSAQY5LTH8AQY5LTL"
"+AQY5LTOAAgg4LTI38gEGOS00ggIGOC02yAEIOC0yOPQBBjgtN8oBCDgtMjXuAQY4LTjMAQg4LTI28AEGOC05zgEIOC0yOfYBCDgtMjDkAQg4LTIz6gEIOC0yNOwBCDgtMjHmAQg4LTIy6AEAEGhhcmlzaF8x "
date_time_str = '03/09/22 01:55:19'
date_time_obj = datetime.strptime(date_time_str, '%d/%m/%y %H:%M:%S')
helper = MonthDataAvroHelper()
result = helper.process_and_return_last_n_days_from_time(date_time_obj, base64Str, 30)
self.assertEqual(29, len(result))
print("result of test_collect_data_for_n_days 30 days", result)
# Result = [100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128]
result = helper.process_and_return_last_n_days_from_time(date_time_obj, base64Str, 7)
self.assertEqual(7, len(result))
print("result of test_collect_data_for_n_days 7 days", result)
# Result = [122, 123, 124, 125, 126, 127, 128]
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Close
Hashes for avro-helper-devlibx-0.0.5.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | eea90e91a9917d8c8c682471796a5dae06fd5930418fa61380058cc3491fd55a |
|
MD5 | d34008eeb0035bd943775888dfd2c8f3 |
|
BLAKE2b-256 | 77f9ec808abb64d204e83a3ac80a5ea7e4574d8ec1310aad62c93d14da40b43a |
Close
Hashes for avro_helper_devlibx-0.0.5-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | ac930a98f2068dc62faf7629860664d5574200a70a20f6562146ec0074b6dd53 |
|
MD5 | 952a87cd007d37b2b19289223b8a8fb8 |
|
BLAKE2b-256 | 14256c6fca9bda9f6dd904adb57b095f53b63e5db8c75188ded65a0502cb91b8 |