Python DSL for setting up Flask app CDC
Project description
How to use
Install pip install avro-helper-devlibx
Quick example
from devlibx_avro_helper.month_data import MonthDataAvroHelper
base64Str = "BgY3LTMCBjYtNgIGNy01BAAAAAI="
helper = MonthDataAvroHelper()
result = helper.process(base64Str)
print(result)
# Result
# {'days': {'8-16': 110, '8-17': 111, '8-14': 108, '8-15': 109, '8-18': 112, '8-19': 113, '8-30': 124, '8-31': 125, '8-12': 106, '8-13': 107, '8-10': 104, '8-11': 105, '9-1': 126, '9-2': 127, '9-3': 128, '8-27': 121, '9-4': 129, '8-6': 100, '8-28': 122, '8-7': 101, '8-25': 119, '8-8': 102, '8-26': 120, '8-9': 103, '8-29': 123, '8-20': 114, '8-23': 117, '8-24': 118, '8-21': 115, '8-22': 116}, 'entity_id': 'harish_1'}
Get data for this month
In this example we would have data in base 64 encoding. We will get aggregated data for this month
from devlibx_avro_helper.month_data import MonthDataAvroHelper
from datetime import datetime
def test_process_and_return_aggregation_for_month(self):
base64Str = "AgoGNy0xAgY3LTICBjctMwIGNy00AgY3LTUKAAAAAAI="
helper = MonthDataAvroHelper()
result = helper.process(base64Str)
print(result)
# Output = {'days': {'7-1': 1, '7-2': 1, '7-3': 1, '7-4': 1, '7-5': 5}, 'days_str': None, 'entity_id': None, 'sub_entity_id': None, 'version': 1}
date_time_str = '05/07/22 01:55:19'
date_time_obj = datetime.strptime(date_time_str, '%d/%m/%y %H:%M:%S')
# if you are looking for data for this month then use can use
# helper.process_and_return_aggregation_for_this_month(base64Str)
result = helper.process_and_return_aggregation_for_month(date_time_obj, base64Str)
self.assertEqual(9, result, "result should be 9")
# Output = 9
Get data for this week (last 7 days)
from devlibx_avro_helper.month_data import MonthDataAvroHelper
from datetime import datetime
def test_process_and_return_aggregation_for_week(self):
# Test 1 - data from generateDataFor_test_parsing_Test_2
base64Str = "Ag4INi0yOQIGNy0xAgY3LTICCDYtMzACBjctMwIGNy00AgY3LTUKAAAAAAI=="
helper = MonthDataAvroHelper()
result = helper.process(base64Str)
print(result)
# Output = {'days': {'6-29': 1, '7-1': 1, '7-2': 1, '6-30': 1, '7-3': 1, '7-4': 1, '7-5': 5}, 'days_str': None, 'entity_id': None, 'sub_entity_id': None, 'version': 1}
date_time_str = '05/07/22 01:55:19'
date_time_obj = datetime.strptime(date_time_str, '%d/%m/%y %H:%M:%S')
# if you are looking for data for this month then use can use
# helper.process_and_return_aggregation_for_this_week(base64Str)
result = helper.process_and_return_aggregation_for_week(date_time_obj, base64Str)
self.assertEqual(11, result, "result should be 9")
# Output = 11
Get data for this week (last 7 days)
from devlibx_avro_helper.month_data import MonthDataAvroHelper
from datetime import datetime
def test_process_and_return_for_day(self):
# Test 1 - data from generateDataFor_test_parsing_Test_2
base64Str = "Ag4INi0yOQIGNy0xAgY3LTICCDYtMzACBjctMwIGNy00AgY3LTUKAAAAAAI=="
helper = MonthDataAvroHelper()
result = helper.process(base64Str)
print(result)
# Output = {'days': {'6-29': 1, '7-1': 1, '7-2': 1, '6-30': 1, '7-3': 1, '7-4': 1, '7-5': 5}, 'days_str': None, 'entity_id': None, 'sub_entity_id': None, 'version': 1}
date_time_str = '05/07/22 01:55:19'
date_time_obj = datetime.strptime(date_time_str, '%d/%m/%y %H:%M:%S')
# if you are looking for data for this month then use can use
# helper.process_and_return_aggregation_for_this_month(base64Str)
result = helper.process_and_return_for_today(date_time_obj, base64Str)
self.assertEqual(5, result, "result should be 9")
# Output = 5
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Close
Hashes for avro-helper-devlibx-0.0.10.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | 93c365c3f97660ea789ac3541a90a2af13c1110f3230d0d8a552bb82bbef12d6 |
|
MD5 | 03a003b25b8c8f17be540a5dc86b1faa |
|
BLAKE2b-256 | 9c652e0314be29d3d202d5a4a30e78871e7299cd1c9b67eaced0d899ae7c06e8 |
Close
Hashes for avro_helper_devlibx-0.0.10-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | cc3bb24f8b20945f7681588b93b92c4ee2a46a9406b4fb39faa51b6853f04439 |
|
MD5 | ea9917facd707ba0753625619ad70625 |
|
BLAKE2b-256 | 27f232d7d254a70277596c7d85ace50447b9a49a53f5d087aef8b59ef214d792 |