Skip to main content

No project description provided

Project description

./docs/upload/images/coverage.svg

树融数据融合平台数据上传基础模块,作为是数据源和目标表的连接器,实施团队用于编写上传脚本,存储平台用于调用上传脚本。

docs/uploads/images/uploads工作流程.png

install

download from fusionflow.upload or pip install directly:

pip install git+https://codeup.teambition.com/fusiontree/fusionplatform/fusionflow.upload.git

configure

script level configure

some data-realted configuration must be specified to ensure data transformed correctly for you bussiness logic like “data region” related which timezone used to when save date information. table object will use these configs when transform data from src to dest.

global variables following pattern __upload_{name}__ consider as configure settings, fusionflow.upload.SrcTables will fill up these configurations to tables found in scripts:

__upload_timezone__ = 'GMT+08'

instance and subclass level configs instance and subclass also can override these settings by pass parameters directly through instantiation and subclass attribute. like other required parameters:

SubTable(Table):
    timezone = 'GMT+08'


table = Table(timezone='GMT+08')

Caution!

subclass can directly interpret these settings on their declared module, however instance can use module level configurations themself.

subclass declaration must be global variables

usage

1. write source scripts

there are two ways to write source table extract scripts

  1. by subclass fusionflow.upload.Table:

    from fusionflow.upload import Table
    
    class Employ(Table):
        def extract(self):
            yield [{"colname": "colvalue"}, ...]
    
    ...
    
  2. by instantiation package-include subclass:

    python
    from datetime import *
    from fusionflwo.upload.network import ApiTable
    
    def parse_api_data(data):
        rows = data["dataList"]
        {'cioStatus': 2, 'inTime': '2020-02-24 00:00:22', 'outTime': '2020-02-24 00:23:00', 'plateNo': '沪BT2741', 'recordId': '33930092'}
        for row in rows:
            d = {}
            for key, value in row.items():
                if key in ("inTime", "outTime"):
                    d[key] = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
                elif key == "recordId":
                    d[key] = int(value)
                else:
                    d[key] = value
            yield d
    parkcar_table = ApiTable(uri, table_name='parkcar_table', params={"bizDate": (lambda : (date.today() - timedelta(days=1)).strftime("%Y%m%d"))}, parser=parse_api_data)
    

2. parse source scripts

get table objects through scripts:

from fusionflow.upload import SrcTables

src_tables = SrcTable("path/to/src/script")
print(src_tables.tables)

extract data from table:

table1 = src_tables.tables[0]

# get table1 field type
fields = table1.fields

# extract data
# table object is iterator, can use in for loop
for row in table1:
    print(row)

Django Itergration

Table class also provide a attribute django_fields to convert field type to django.db.models.fields.Field, to use this feature, use must install django moduel manually, or through:

git clone git@codeup.teambition.com:fusiontree/fusionplatform/fusionflow.upload.git
pip install .[db]

the django_feilds can access from:

from fusionflow.upload.db.oracle import OraceDatabaseTable

table = OracleDatabaseTable(username=user,password=passwd, host='localhost')
print(table.django_fields)

3. validation

tables can add validator to check input value is business or logical leagal , validator can added through subclass parameter validators, instantiation parameter validators, or register through table.registe_validator,

must register all validators before fetch any data

currect support two types of validator:

  1. row-leve validator (not implemented currently)
  2. field value validator

example

subclass method:

class FileTable(Table):
    def extract(self):
        for data in emp_table():
            yield data

    def validate_empno(self, value):
        assert int(value) < 7800, value

register in run time:

# at instantiation
def emp_ge_7600(value):
    assert int(value) > 7600
f = FileTable(validate_empno=[emp_ge_7600])

# through api
validator_str = """def empno_is_digit(value):
   assert value.isdigit()
"""
f.registe_validator('empno', validator_str)

```

test

use pytest test after clone this repo, run:

./test

test also itegrate with setuptools, afte git clone source run:

python setup.py test

support format

there types of easy-used subtable class provided, directly instantiate them

  • DatabaseTable – RDMS table source
  • ApiTable – application programming iterface, load from remote
  • FileTable – excel, csv, tsv, xml like file

contact

logan

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for fusionflow-upload, version 0.1.5
Filename, size File type Python version Upload date Hashes
Filename, size fusionflow_upload-0.1.5-py3-none-any.whl (23.0 kB) File type Wheel Python version py3 Upload date Hashes View

Supported by

Pingdom Pingdom Monitoring Google Google Object Storage and Download Analytics Sentry Sentry Error logging AWS AWS Cloud computing DataDog DataDog Monitoring Fastly Fastly CDN DigiCert DigiCert EV certificate StatusPage StatusPage Status page