
Bigdata Utility Code.


Bigdata Utility

Install

pip install py-bigdata-util --upgrade

Usage

Config

Load configuration from a local config file. Values passed in as a dict are merged with the values read from the file, and the dict takes precedence when both define the same key, as the test below shows.

#!/usr/bin/env python3
# -*- coding=utf-8 -*-
import unittest

from bigdata_util.util import get_absolute_path
from bigdata_util.util import BaseConfig, Singleton

'''
# config.test_conf
a = a
b = 1
c = c
'''

class MyConfig(BaseConfig, metaclass=Singleton):
    def __init__(self, cfg: dict = None):
        """
        :type cfg: dict
        """
        super(MyConfig, self).__init__(
            cfg,
            # config.test_conf follows the ConfigObj file format
            # https://configobj.readthedocs.io/en/latest/configobj.html#the-config-file-format
            get_absolute_path(__file__, '../config.test_conf')
        )
        pass


class ConfigTest(unittest.TestCase):

    def test_config(self):
        config = {
            'aa': 'aa',
            'bb': 2,
            'c': 'cc'
        }
        c = MyConfig(cfg=config)
        self.assertEqual(c.get('a', 'default'), 'a')
        self.assertEqual(c.get('aa', 'default'), 'aa')
        self.assertEqual(c.get('b', 'default'), 1)
        self.assertEqual(c.get('bb', 'default'), 2)
        self.assertEqual(c.get('c', 'default'), 'cc')
        pass

Connectors

MaxcomputeConnector features

  • odps_ins = MaxcomputeConnector(cfg.get('connector.odps')) connects to ODPS; a private-network logview address can be configured, and both HTTP and SOCKS proxies are supported
  • odps_ins.get_table_data pulls table data and caches it in the local .connector_maxcompute_cache directory; the remote table's last_modified_time is checked to decide whether the cache needs refreshing
  • odps_ins.update_mr_jar picks the jar up from a local Java project directory, uploads it to ODPS as a resource and returns the resource name
  • odps_ins.update_udf updates a UDF
  • odps_ins.run_sql_with_logview_return_plain_json runs SQL, prints the logview and returns the result (see the sketch after this list)
  • odps_ins.run_sql_in_file runs a .sql file; ${variable} or ${instance.attribute} placeholders in the file are supported
  • odps_ins.create_ots_external_table creates an OTS external table
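
A minimal sketch of the two SQL helpers from the list above, reusing the MyConfig class from the Config section. The SQL text and the query.sql file name are placeholders, and since this README does not show how template variables are passed to run_sql_in_file, only the file path argument is used:

from bigdata_util.util import get_absolute_path
from bigdata_util.connector import MaxcomputeConnector
from .my_config import MyConfig

cfg = MyConfig()
odps_ins = MaxcomputeConnector(cfg.get('connector.odps'))

# Run a SQL statement, print its logview and get the rows back as plain dicts.
rows = odps_ins.run_sql_with_logview_return_plain_json('select * from dual limit 10')

# Run a .sql file located next to this script; ${variable} placeholders in the file are supported.
odps_ins.run_sql_in_file(get_absolute_path(__file__, './query.sql'))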

Using ODPS together with PostgreSQL

from bigdata_util.connector import MaxcomputeConnector, PostGreConnector
from .my_config import MyConfig

cfg = MyConfig()
odps = MaxcomputeConnector(cfg.get('connector.odps'))
postgre = PostGreConnector(cfg.get('connector.postgres'))

# Force a fresh pull and cache the data in .connector_maxcompute_cache.
data_list = odps.get_table_data('dual_point', partition='dt=1', ignore_cache=True)
# On later reads the remote table's last_modified_time decides whether to re-fetch.
data_list = odps.get_table_data('dual_point', partition='dt=1')
postgre.save_geometry_table('dual_point', data_list, col_name_wkt='wkt', geometry_type='point')

Using OTS (TableStore)

from bigdata_util.connector import TableStoreConnector

ots_cli = TableStoreConnector(cfg.get('connector.ots'))
consumed, return_row, next_token = ots_cli.get_row('dual', [('id', 1)])

Running a Java MapReduce job

The MR logic is already written in Java; it is then invoked from Python.

define

#!/usr/bin/env python3
# -*- coding=utf-8 -*-

import os

from bigdata_util.util import get_logger, get_absolute_path
from bigdata_util.connector import MaxcomputeConnector
from .my_config import CustomConfig
from bigdata_util.mr_launcher import MapReduceLauncher
from typing import List


logger = get_logger(__file__)
cfg = CustomConfig()
odps_cst = MaxcomputeConnector(cfg.get('connector.odps_cst'))


class CalculateDistanceOfPools(MapReduceLauncher):

    def init_mr_jar_base_path(self, mr_jar_base_path: str = None):
        return cfg.get('mr_jar_base_path')

    def init_project_base_path(self, mr_jar_base_path: str = None):
        return cfg.get('project_base_path')

    @staticmethod
    def init_odps_conf_file_name():
        return 'odps_cst_conf.ini'

    def init_mapper_class(self, mapper_class: str = None):
        return 'com.aliyun.citybrain.traffic.commonmapper.RawMapper'

    def init_reducer_class(self, reducer_class: str = None):
        return 'com.aliyun.citybrain.traffic.pooldistance.Reducer'

    def init_maxcompute_ins(self, maxcompute_ins: MaxcomputeConnector = None):
        return odps_cst

    def init_mr_jar_name(self, mr_jar_name: str = None):
        return 'traffic-algo-mg-1.0-SNAPSHOT-jar-with-dependencies.jar'


    def init_system_parameters(self, system_parameters: dict = None):
        """
        Only controls the three parameters below; the defaults are:
          -splitSize 32
          -reduceCnt 100
          -reduceMem 4096
        :param system_parameters: e.g. {'splitSize': '16', 'reduceCnt': '900', 'reduceMem': 4096}
        :return: may be empty
        """
        return {
            'reduceCnt': 900
        }

    def init_ddl_file_path(self, ddl_file_path: str = None):
        return os.path.abspath(os.path.join(
            os.path.split(os.path.realpath(__file__))[0],
            'step_3_pool_clustering_euclidean_distance_between_pool_mr.osql'
        ))

    def run(self):
        self.launch()


if __name__ == '__main__':
    CalculateDistanceOfPools().run()

use

from . import CalculateDistanceOfPools

mr = CalculateDistanceOfPools()
mr.set_init_parameters({
    'mapper.key': 'pool_id:string',
    'mapper.value': 'pool_id:string,sub_polygon:string',
    'geometry_col_name': 'sub_polygon',
    'geometry_gen_type': 'wkt',
    'input_table': TableInfo('algtmp_tfc_vhcpool_info', partition=cfg.COMMON_PARTITION_WITH_DT),
    'output_table': TableInfo('algtmp_rltn_euclidean_distance_pool_pool', partition=cfg.COMMON_PARTITION_WITH_DT),
    'overlap_length': 1,       # at least 50 meters of overlap
    'overlap_ratio': 0.1,
    'threshold.MAX_DISTANCE': 0
})
mr.launch()

# prints the logview of the MR run...

Plot

from . import MyConfig
from bigdata_util.plot import PlotLine
from bigdata_util.connector import MaxcomputeConnector

cfg = MyConfig()
plot_line = PlotLine(MaxcomputeConnector(cfg.get('connector.odps')))
plot_line.plot_line('''
    select
      stat_time x,
      value y,
      name label
    from dual
''')

(figure: line chart produced by the query above)

Connector dependencies

odps

pip install pyodps==0.9.1
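
The MaxcomputeConnector shown earlier builds on pyodps. For reference, connecting with pyodps directly looks roughly like this; the access keys, project name and endpoint are placeholders:

from odps import ODPS

o = ODPS('<access_id>', '<access_key>',
         project='<project>',
         endpoint='http://service.odps.aliyun.com/api')

# Plain pyodps SQL execution, independent of this library's helpers.
with o.execute_sql('select * from dual limit 10').open_reader() as reader:
    for record in reader:
        print(record)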

hive

pip install impyla==0.16.3
pip install thrift_sasl==0.4.2 (for installation on macOS see: https://github.com/albin3/book-notes/issues/2)
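
The Hive connector relies on impyla. As a reference for what the dependency itself provides, a plain (non-Kerberos) HiveServer2 connection with impyla looks like this; host, port and credentials are placeholders:

from impala.dbapi import connect

conn = connect(host='<hive_host>', port=10000,
               auth_mechanism='PLAIN',
               user='<user>', password='<passwd>')
cursor = conn.cursor()
cursor.execute('show tables')
print(cursor.fetchall())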

hive kerberos

Install the system dependencies first:

macOS: brew install krb5
Linux: apt install -y krb5-user

pip install bit_array
pip install thrift
pip install thrift_sasl
pip install impyla
pip install krbcontext
pip install hdfs[kerberos] -i https://mirrors.aliyun.com/pypi/simple
pip install pykerberos -i https://mirrors.aliyun.com/pypi/simple
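
Once the packages above are installed, a Kerberos-authenticated HiveServer2 connection can be sketched with krbcontext plus impyla; the principal, keytab path and host below are placeholders:

from krbcontext import krbcontext
from impala.dbapi import connect

# Obtain a ticket from a keytab, then open a GSSAPI-authenticated connection.
with krbcontext(using_keytab=True,
                principal='hive_user@EXAMPLE.COM',
                keytab_file='/path/to/hive_user.keytab'):
    conn = connect(host='<hive_host>', port=10000,
                   auth_mechanism='GSSAPI',
                   kerberos_service_name='hive')
    cursor = conn.cursor()
    cursor.execute('show databases')
    print(cursor.fetchall())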

kafka

pip install kafka-python==2.0.1
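
The Kafka dependency is kafka-python. As a reference for the underlying client, a minimal kafka-python consumer looks like this; the topic name and broker address are placeholders:

from kafka import KafkaConsumer

# Read from the beginning of the topic and print each message.
consumer = KafkaConsumer('<topic>',
                         bootstrap_servers='<host>:9092',
                         auto_offset_reset='earliest')
for message in consumer:
    print(message.topic, message.offset, message.value)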

pg with proxy

pip install asyncpg

usage:

from bigdata_util.connector.postgre_async import PostgreAsyncConnector

table_name = 'algtmp_connector_test_pg'
pg_conn = PostgreAsyncConnector(
    'postgresql://<user_name>:<passwd>@<host_name>/<data_base>',
    'socks5://<host_name>:<port>'
)
pg_conn.execute_sql(f'''
    create table if not exists {table_name} (
        a varchar,
        b varchar,
        c varchar,
        d varchar,
        PRIMARY KEY (a, b, c, d)
    )
''')
pg_conn.save_data(table_name, [
    {'a': 'a1', 'b': 'b1', 'c': 'c2', 'd': 'd1'}
])
data_list = pg_conn.run_sql_return_plain_json(f'''
    select * from {table_name}
''')
pg_conn.drop_table(table_name)

mysql with proxy

For now only queries are supported; writing data is not supported yet.

pip install PyMySQL

usage:

from bigdata_util.connector.mysql import MysqlConnector

mysql_conn = MysqlConnector(
    host='localhost',
    port=3306,
    user='root',
    password='password',
    database='default',
    proxy='socks5://<host_name>:<port>',
)
data_list = mysql_conn.run_sql_return_plain_json('''
    show tables;
''')

Install on Python 3.6 (p36)

You may need to install numpy, pandas and shapely manually before installing:

> pip install numpy pandas shapely
> pip install py-bigdata-util
