Skip to main content

Bigdata Utility Code.

Project description

Bigdata Utility

Install

pip install py-bidata-util --upgrade

Usage

Config

读取本地配置文件并加载配置

#!/usr/bin/env python3
# -*- coding=utf-8 -*-
import unittest

from bigdata_util.util import get_absolute_path
from bigdata_util.util import BaseConfig, Singleton

'''
# config.test_conf
a = a
b = 1
c = c
'''

class MyConfig(BaseConfig, metaclass=Singleton):
    """Configuration class that uses the ``Singleton`` metaclass and reads ``config.test_conf``."""

    def __init__(self, cfg: dict = None):
        """
        Forward ``cfg`` and the config file path to ``BaseConfig``.

        :param cfg: optional in-memory settings handed to ``BaseConfig`` unchanged
        """
        # The config file follows the ConfigObj format:
        # https://configobj.readthedocs.io/en/latest/configobj.html#the-config-file-format
        config_file = get_absolute_path(__file__, '../config.test_conf')
        super().__init__(cfg, config_file)


class ConfigTest(unittest.TestCase):
    """Checks key lookups on a ``MyConfig`` built from an in-memory dict."""

    def test_config(self):
        """Every key must resolve to its expected value instead of the fallback."""
        overrides = dict(aa='aa', bb=2, c='cc')
        loaded = MyConfig(cfg=overrides)

        # 'a' and 'b' are absent from ``overrides``; 'c' is expected to come
        # back as the value given in ``overrides``.
        expected_by_key = (
            ('a', 'a'),
            ('aa', 'aa'),
            ('b', 1),
            ('bb', 2),
            ('c', 'cc'),
        )
        for key, expected in expected_by_key:
            self.assertEqual(loaded.get(key, 'default'), expected)

Connectors

MaxcomputeConnector功能点

  • odps_ins = MaxcomputeConnector(cfg.get('connector.odps')) 连接odps,可配置专网logview地址,同时支持http、socks代理配置
  • odps_ins.get_table_data 拉取表数据,并缓存在本目录 .connector_maxcompute_cache 中,监控last_modified_time决定是否更新
  • odps_ins.update_mr_jar 自动从本地java工程目录拿到jar包并上传到odps作为资源,返回资源名
  • odps_ins.update_udf 更新udf
  • odps_ins.run_sql_with_logview_return_plain_json 运行sql,打印logview,并返回结果
  • odps_ins.run_sql_in_file 运行.sql文件,支持文件中传入 ${变量} 或 ${类实例.属性}
  • odps_ins.create_ots_external_table 创建ots外表

odps和postgres配合使用

from bigdata_util.connector import MaxcomputeConnector, PostGreConnector
from .my_config import MyConfig

# Build both connectors from the 'connector.odps' and 'connector.postgres' config entries.
cfg = MyConfig()
odps = MaxcomputeConnector(cfg.get('connector.odps'))
postgre = PostGreConnector(cfg.get('connector.postgres'))

# Fetch and cache data.
data_list = odps.get_table_data('dual_point', partition='dt=1', ignore_cache=True)
# On repeated reads, the remote table's last_modified_time decides whether the data is pulled again.
# NOTE(review): both calls pass ignore_cache=True, which presumably bypasses that cache check —
# confirm whether the second call is meant to omit it.
data_list = odps.get_table_data('dual_point', partition='dt=1', ignore_cache=True)
# Hand the fetched rows to Postgres; presumably 'wkt' names the column holding the geometry text.
postgre.save_geometry_table('dual_point', data_list, col_name_wkt='wkt', geometry_type='point')

ots使用

from bigdata_util.connector import TableStoreConnector

# NOTE(review): cfg is not defined in this snippet — presumably the MyConfig instance
# created in the earlier example; confirm before running this on its own.
ots_cli = TableStoreConnector(cfg.get('connector.ots'))
# The result of get_row is unpacked into three values.
consumed, return_row, next_token = ots_cli.get_row('dual', [('id', 1)])

运行java mr

已在java代码中写好mr逻辑,然后在python中调用

define

#!/usr/bin/env python3
# -*- coding=utf-8 -*-

from bigdata_util.util import get_logger, get_absolute_path
from bigdata_util.connector import MaxComputeConnector
from .my_config import CustomConfig
from bigdata_util.mr_launcher import MapReduceLauncher
from typing import List


# Module-level objects shared by the launcher class below.
logger = get_logger(__file__)
cfg = CustomConfig()
# Connector built from the 'connector.odps_cst' config entry.
odps_cst = MaxComputeConnector(cfg.get('connector.odps_cst'))


class CalculateDistanceOfPools(MapReduceLauncher):
    """
    ``MapReduceLauncher`` subclass that supplies fixed settings through ``init_*`` hooks.

    NOTE(review): how and when ``MapReduceLauncher`` calls these hooks is not
    visible in this snippet — confirm against the base class.
    """

    def init_mr_jar_base_path(self, mr_jar_base_path: str = None):
        """Return the ``mr_jar_base_path`` config entry; the argument is ignored."""
        return cfg.get('mr_jar_base_path')

    def init_project_base_path(self, mr_jar_base_path: str = None):
        """Return the ``project_base_path`` config entry; the argument is ignored."""
        # NOTE(review): the parameter name looks copied from init_mr_jar_base_path;
        # it is left as-is so existing keyword callers keep working.
        return cfg.get('project_base_path')

    @staticmethod
    def init_odps_conf_file_name():
        """Return the file name ``odps_cst_conf.ini``."""
        return 'odps_cst_conf.ini'

    def init_mapper_class(self, mapper_class: str = None):
        """Return the fully qualified mapper class name; the argument is ignored."""
        return 'com.aliyun.citybrain.traffic.commonmapper.RawMapper'

    def init_reducer_class(self, reducer_class: str = None):
        """Return the fully qualified reducer class name; the argument is ignored."""
        return 'com.aliyun.citybrain.traffic.pooldistance.Reducer'

    def init_maxcompute_ins(self, maxcompute_ins: MaxComputeConnector = None):
        """Return the module-level ``odps_cst`` connector; the argument is ignored."""
        return odps_cst

    def init_mr_jar_name(self, mr_jar_name: str = None):
        """Return the jar file name; the argument is ignored."""
        return 'traffic-algo-mg-1.0-SNAPSHOT-jar-with-dependencies.jar'

    def init_system_parameters(self, system_parameters: dict = None):
        """
        Return the system parameters for the job.

        Only the following three parameters are controlled; their defaults are:
          -splitSize 32
          -reduceCnt 100
          -reduceMem 4096

        :param system_parameters: e.g. {'splitSize': '16', 'reduceCnt': '900', 'reduceMem': 4096}
        :return: the parameters to set; may be empty
        """
        return {
            'reduceCnt': 900
        }

    def init_ddl_file_path(self, ddl_file_path: str = None):
        """Return the absolute path of the ``.osql`` file located next to this module."""
        # Imported here because the snippet's module-level imports do not include
        # ``os``; without it this method raised NameError.
        import os

        module_dir = os.path.dirname(os.path.realpath(__file__))
        return os.path.abspath(os.path.join(
            module_dir,
            'step_3_pool_clustering_euclidean_distance_between_pool_mr.osql'
        ))

    def run(self):
        """Start the job by calling ``launch()``."""
        self.launch()


# Run the job when this file is executed as a script.
if __name__ == '__main__':
    CalculateDistanceOfPools().run()

use

        # NOTE(review): TableInfo and cfg are not imported or defined in this snippet —
        # presumably they come from the surrounding module; confirm before copying.
        from . import CalculateDistanceOfPools
        mr = CalculateDistanceOfPools()
        mr.set_init_parameters({
            'mapper.key': 'pool_id:string',
            'mapper.value': 'pool_id:string,sub_polygon:string',
            'geometry_col_name': 'sub_polygon',
            'geometry_gen_type': 'wkt',
            'input_table': TableInfo('algtmp_tfc_vhcpool_info', partition=cfg.COMMON_PARTITION_WITH_DT),
            'output_table': TableInfo('algtmp_rltn_euclidean_distance_pool_pool', partition=cfg.COMMON_PARTITION_WITH_DT),
            'overlap_length': 1,       # at least 50 m of overlap — NOTE(review): value is 1, confirm the unit
            'overlap_ratio': 0.1,
            'threshold.MAX_DISTANCE': 0
        })
        mr.launch()

# Prints the logview of the MR run...

Plot

from . import MyConfig
from bigdata_util.plot import PlotLine
from bigdata_util.connector import MaxcomputeConnector

cfg = MyConfig()
# Connector built from the 'connector.odps' config entry and handed to PlotLine.
odps_connector = MaxcomputeConnector(cfg.get('connector.odps'))
plot_line = PlotLine(odps_connector)

# The query aliases its three columns as x, y and label.
line_sql = '''
    select
      stat_time x,
      value y,
      name label
    from dual
'''
plot_line.plot_line(line_sql)

fig

connectors依赖

odps

pip install pyodps==0.9.1

hive

pip install impyla==0.16.3
pip install thrift_sasl==0.4.2 (mac安装请参考: https://github.com/albin3/book-notes/issues/2)

hive kerberos

安装依赖:

mac: brew install krb5
linux: apt install -y krb5-user

pip install bit_array
pip install thrift
pip install thrift_sasl
pip install impyla
pip install krbcontext
pip install hdfs[kerberos] -i https://mirrors.aliyun.com/pypi/simple
pip install pykerberos -i https://mirrors.aliyun.com/pypi/simple

kafka

pip install kafka-python==2.0.1

pg with proxy

pip install asyncpg

usage:

from bigdata_util.connector.postgre_async import PostgreAsyncConnector

table_name = 'algtmp_connector_test_pg'

# Connection URL and SOCKS5 proxy URL; replace the <...> placeholders.
pg_url = 'postgresql://<user_name>:<passwd>@<host_name>/<data_base>'
proxy_url = 'socks5://<host_name>:<port>'
pg_conn = PostgreAsyncConnector(pg_url, proxy_url)

# Create the test table if it does not exist yet.
create_table_sql = f'''
    create table if not exists {table_name} (
        a varchar,
        b varchar,
        c varchar,
        d varchar,
        PRIMARY KEY (a, b, c, d)
    )
'''
pg_conn.execute_sql(create_table_sql)

# Save one sample row.
sample_rows = [
    {'a': 'a1', 'b': 'b1', 'c': 'c2', 'd': 'd1'}
]
pg_conn.save_data(table_name, sample_rows)

# Read everything back, then drop the test table.
select_all_sql = f'''
    select * from {table_name}
'''
data_list = pg_conn.run_sql_return_plain_json(select_all_sql)
pg_conn.drop_table(table_name)

mysql with proxy

暂时只支持查询,不支持写数据

pip install PyMySQL

usage:

from bigdata_util.connector.mysql import MysqlConnector

# Connection settings, including the SOCKS5 proxy URL; replace the <...> placeholders.
connection_settings = dict(
    host='localhost',
    port=3306,
    user='root',
    password='password',
    database='default',
    proxy='socks5://<host_name>:<port>',
)
mysql_conn = MysqlConnector(**connection_settings)

show_tables_sql = '''
    show tables;
'''
data_list = mysql_conn.run_sql_return_plain_json(show_tables_sql)

install in p36

安装前可能需要先手动安装 numpy pandas shapely

> pip install numpy pandas shapely
> pip install py-bidata-util

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

py-bidata-util-0.1.1.tar.gz (43.5 kB view hashes)

Uploaded Source

Built Distribution

py_bidata_util-0.1.1-py3-none-any.whl (55.6 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page