Bigdata Utility Code.
Project description
Bigdata Utility
Install
pip install py-bigdate-util --upgrade
Usage
Config
读取本地配置文件并加载配置
#!/usr/bin/env python3
# -*- coding=utf-8 -*-
import unittest
from bigdata_util.util import get_absolute_path
from bigdata_util.util import BaseConfig, Singleton
'''
# config.test_conf
a = a
b = 1
c = c
'''
class MyConfig(BaseConfig, metaclass=Singleton):
    """Configuration class that combines an optional dict with ``config.test_conf``."""

    def __init__(self, cfg: dict = None):
        """
        Forward the dict and the config-file path to ``BaseConfig``.

        :param cfg: optional dict of settings handed to ``BaseConfig``
        """
        # The config file follows the ConfigObj format:
        # https://configobj.readthedocs.io/en/latest/configobj.html#the-config-file-format
        conf_path = get_absolute_path(__file__, '../config.test_conf')
        super().__init__(cfg, conf_path)
class ConfigTest(unittest.TestCase):
    """Checks that MyConfig serves settings from both the dict and the config file."""

    def test_config(self):
        """Every key resolves to its expected value; for ``c`` the dict value is returned."""
        overrides = {'aa': 'aa', 'bb': 2, 'c': 'cc'}
        my_config = MyConfig(cfg=overrides)

        expectations = (
            ('a', 'a'),    # listed only in config.test_conf
            ('aa', 'aa'),  # supplied only through the dict
            ('b', 1),      # listed only in config.test_conf
            ('bb', 2),     # supplied only through the dict
            ('c', 'cc'),   # present in both sources
        )
        for key, expected in expectations:
            self.assertEqual(my_config.get(key, 'default'), expected)
Connectors
MaxcomputeConnector功能点
- odps_ins = MaxcomputeConnector(cfg.get('connector.odps')) 连接odps,可配置专网logview地址,同时支持http、socks代理配置
- odps_ins.get_table_data 拉取表数据,并缓存在本目录 .connector_maxcompute_cache 中,监控last_modified_time决定是否更新
- odps_ins.update_mr_jar 自动从本地java工程目录拿到jar包并上传到odps作为资源,返回资源名
- odps_ins.update_udf 更新udf
- odps_ins.run_sql_with_logview_return_plain_json 运行sql,打印logview,并返回结果
- odps_ins.run_sql_in_file 运行.sql文件,支持文件中传入 ${变量} 或 ${类实例.属性}
- odps_ins.create_ots_external_table 创建ots外表
odps和postgres配合使用
from bigdata_util.connector import MaxcomputeConnector, PostGreConnector
from .my_config import MyConfig
# Build both connectors from their entries in the project config.
cfg = MyConfig()
odps = MaxcomputeConnector(cfg.get('connector.odps'))
postgre = PostGreConnector(cfg.get('connector.postgres'))

# Fetch and cache data.
data_list = odps.get_table_data('dual_point', partition='dt=1', ignore_cache=True)

# On repeated reads, the remote table's last_modified_time decides whether the
# data is fetched again. ignore_cache is switched off here so that the cache is
# actually consulted.
# NOTE(review): assumes ignore_cache=True bypasses the cache check - confirm.
data_list = odps.get_table_data('dual_point', partition='dt=1', ignore_cache=False)

# Write the rows to the 'dual_point' table, using the 'wkt' column as point geometry.
postgre.save_geometry_table('dual_point', data_list, col_name_wkt='wkt', geometry_type='point')
ots使用
from bigdata_util.connector import TableStoreConnector
# Build the OTS client from the 'connector.ots' config entry.
# NOTE(review): cfg is not defined in this snippet; presumably the MyConfig()
# instance from the earlier examples - confirm.
ots_cli = TableStoreConnector(cfg.get('connector.ots'))
# Read one row from table 'dual'; presumably [('id', 1)] is the primary key - confirm.
consumed, return_row, next_token = ots_cli.get_row('dual', [('id', 1)])
运行java mr
已在java代码中写好mr逻辑,然后在python中调用
define
#!/usr/bin/env python3
# -*- coding=utf-8 -*-
from bigdata_util.util import get_logger, get_absolute_path
from bigdata_util.connector import MaxComputeConnector
from .my_config import CustomConfig
from bigdata_util.mr_launcher import MapReduceLauncher
from typing import List
# Logger for this file, obtained from bigdata_util's get_logger.
logger = get_logger(__file__)
# Project configuration instance; the launcher hooks below read their settings from it.
cfg = CustomConfig()
# MaxCompute connector built from the 'connector.odps_cst' config entry;
# init_maxcompute_ins below returns this instance.
odps_cst = MaxComputeConnector(cfg.get('connector.odps_cst'))
class CalculateDistanceOfPools(MapReduceLauncher):
    """
    MapReduce launcher for the pool-distance job whose logic lives in Java.

    Each ``init_*`` method returns the setting this job uses. How the base
    class consumes these values is defined in ``MapReduceLauncher`` (not
    shown here).
    """

    def init_mr_jar_base_path(self, mr_jar_base_path: str = None):
        """Return the ``mr_jar_base_path`` entry of the project config."""
        return cfg.get('mr_jar_base_path')

    def init_project_base_path(self, mr_jar_base_path: str = None):
        """
        Return the ``project_base_path`` entry of the project config.

        NOTE(review): the parameter is named ``mr_jar_base_path``, which looks
        copied from the method above; it is kept so callers are unaffected.
        """
        return cfg.get('project_base_path')

    @staticmethod
    def init_odps_conf_file_name():
        """Return the file name of the odps configuration file."""
        return 'odps_cst_conf.ini'

    def init_mapper_class(self, mapper_class: str = None):
        """Return the fully qualified name of the Java mapper class."""
        return 'com.aliyun.citybrain.traffic.commonmapper.RawMapper'

    def init_reducer_class(self, reducer_class: str = None):
        """Return the fully qualified name of the Java reducer class."""
        return 'com.aliyun.citybrain.traffic.pooldistance.Reducer'

    def init_maxcompute_ins(self, maxcompute_ins: MaxComputeConnector = None):
        """Return the module-level MaxCompute connector ``odps_cst``."""
        return odps_cst

    def init_mr_jar_name(self, mr_jar_name: str = None):
        """Return the file name of the jar that contains the MR classes."""
        return 'traffic-algo-mg-1.0-SNAPSHOT-jar-with-dependencies.jar'

    def init_system_parameters(self, system_parameters: dict = None):
        """
        Only the following three parameters are controlled; their defaults are:
            -splitSize 32
            -reduceCnt 100
            -reduceMem 4096

        :param system_parameters: {'splitSize': '16', 'reduceCnt': '900', 'reduceMem': 4096}
        :return: may be empty
        """
        return {
            'reduceCnt': 900
        }

    def init_ddl_file_path(self, ddl_file_path: str = None):
        """Return the absolute path of the .osql DDL file located next to this file."""
        # Imported locally because this example never imports os at module level.
        import os

        here = os.path.dirname(os.path.realpath(__file__))
        return os.path.abspath(os.path.join(
            here,
            'step_3_pool_clustering_euclidean_distance_between_pool_mr.osql'
        ))

    def run(self):
        """Start the job by delegating to ``launch()``."""
        self.launch()
if __name__ == '__main__':
    # Executed as a script: build the launcher and run the job.
    job = CalculateDistanceOfPools()
    job.run()
use
from . import CalculateDistanceOfPools
# Configure the launcher defined above and start the job.
# NOTE(review): TableInfo and cfg are neither imported nor defined in this
# snippet; they must already be in scope - confirm where they come from.
mr = CalculateDistanceOfPools()
mr.set_init_parameters({
'mapper.key': 'pool_id:string',
'mapper.value': 'pool_id:string,sub_polygon:string',
'geometry_col_name': 'sub_polygon',
'geometry_gen_type': 'wkt',
'input_table': TableInfo('algtmp_tfc_vhcpool_info', partition=cfg.COMMON_PARTITION_WITH_DT),
'output_table': TableInfo('algtmp_rltn_euclidean_distance_pool_pool', partition=cfg.COMMON_PARTITION_WITH_DT),
'overlap_length': 1, # original note: at least 50 m of overlap - NOTE(review): the value is 1, confirm unit/intent
'overlap_ratio': 0.1,
'threshold.MAX_DISTANCE': 0
})
mr.launch()
# The logview of the MR run is printed...
Plot
from . import MyConfig
from bigdata_util.plot import PlotLine
from bigdata_util.connector import MaxcomputeConnector
# Build a PlotLine on top of a MaxCompute connector taken from the 'connector.odps' config entry.
cfg = MyConfig()
plot_line = PlotLine(MaxcomputeConnector(cfg.get('connector.odps')))
# The query aliases its columns as x, y and label
# (presumably the column names plot_line expects - confirm).
plot_line.plot_line('''
select
stat_time x,
value y,
name label
from dual
''')
connectors依赖
odps
pip install pyodps==0.9.1
hive
pip install impyla==0.16.3
pip install thrift_sasl==0.4.2 (mac安装请参考: https://github.com/albin3/book-notes/issues/2)
hive kerberos
安装依赖:
mac: brew install krb5
linux: apt install -y krb5-user
pip install bit_array
pip install thrift
pip install thrift_sasl
pip install impyla
pip install krbcontext
pip install hdfs[kerberos] -i https://mirrors.aliyun.com/pypi/simple
pip install pykerberos -i https://mirrors.aliyun.com/pypi/simple
kafka
pip install kafka-python==2.0.1
pg with proxy
pip install asyncpg
usage:
from bigdata_util.connector.postgre_async import PostgreAsyncConnector
# Name of the scratch table used by this example; it is dropped at the end.
table_name = 'algtmp_connector_test_pg'
# First argument: PostgreSQL connection URL; second argument: socks5 proxy URL.
pg_conn = PostgreAsyncConnector(
'postgresql://<user_name>:<passwd>@<host_name>/<data_base>',
'socks5://<host_name>:<port>'
)
# Create the table if it does not exist yet; all four columns form the primary key.
pg_conn.execute_sql(f'''
create table if not exists {table_name} (
a varchar,
b varchar,
c varchar,
d varchar,
PRIMARY KEY (a, b, c, d)
)
''')
# Save one row, given as a dict keyed by column name.
pg_conn.save_data(table_name, [
{'a': 'a1', 'b': 'b1', 'c': 'c2', 'd': 'd1'}
])
# Read the table content back.
data_list = pg_conn.run_sql_return_plain_json(f'''
select * from {table_name}
''')
# Clean up the scratch table.
pg_conn.drop_table(table_name)
mysql with proxy
暂时只支持查询,不支持写数据
pip install PyMySQL
usage:
from bigdata_util.connector.mysql import MysqlConnector
# Connect to MySQL; the proxy argument takes a socks5 URL.
mysql_conn = MysqlConnector(
host='localhost',
port=3306,
user='root',
password='password',
database='default',
proxy='socks5://<host_name>:<port>',
)
# Run a read-only statement (this connector currently supports queries only, not writes).
data_list = mysql_conn.run_sql_return_plain_json('''
show tables;
''')
Install on Python 3.6 (py36)
安装前可能需要先手动安装 numpy pandas shapely
> pip install numpy pandas shapely
> pip install py-bigdate-util
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
py-bidata-util-0.1.1.tar.gz
(43.5 kB
view hashes)
Built Distribution
Close
Hashes for py_bidata_util-0.1.1-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 67b6d3ae84dc0463d67ec7b7ba989e13be84a38d204a1b9f4246a8b5e55ccd0f |
|
MD5 | dd1d94c5640056fed4a92e47152970f8 |
|
BLAKE2b-256 | 72bd5864808ab86b7330b4ade46085919aa4239974378036d895e456ddf3c4fc |