
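The base data-upload module of the 树融 data fusion platform. It acts as the connector between data sources and target tables: implementation teams use it to write upload scripts, and the storage platform uses it to invoke those scripts.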

Figure: uploads workflow (docs/uploads/images/uploads工作流程.png)

install

Download the source from fusionflow.upload, or install directly with pip:

pip install git+https://codeup.teambition.com/fusiontree/fusionplatform/fusionflow.upload.git

configure

script level configure

Some data-related configuration must be specified so that data is transformed correctly for your business logic. For example, the "data region" setting determines which timezone is used when saving date information. Table objects use these settings when transforming data from source to destination.

Global variables that follow the pattern __upload_{name}__ are treated as configuration settings. fusionflow.upload.SrcTables applies these settings to the tables it finds in a script:

__upload_timezone__ = 'GMT+08'
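As a rough illustration of the naming pattern, the sketch below collects every `__upload_{name}__` global from a namespace. The helper `collect_upload_settings` is hypothetical and is not part of fusionflow.upload; how SrcTables actually reads these globals may differ.

```python
import re

# Hypothetical helper, for illustration only: matches names like __upload_timezone__.
_UPLOAD_SETTING = re.compile(r"^__upload_(?P<name>\w+?)__$")


def collect_upload_settings(namespace):
    """Return {name: value} for every global named __upload_{name}__."""
    settings = {}
    for key, value in namespace.items():
        match = _UPLOAD_SETTING.match(key)
        if match:
            settings[match.group("name")] = value
    return settings


# A stand-in for the globals of an upload script.
script_globals = {"__upload_timezone__": "GMT+08", "__name__": "demo", "other": 1}
print(collect_upload_settings(script_globals))
```

Only names that both start with `__upload_` and end with `__` are picked up, so ordinary dunder names such as `__name__` are ignored.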

instance and subclass level configs

Instances and subclasses can also override these settings, either by passing parameters at instantiation or by setting subclass attributes, just like other required parameters:

from fusionflow.upload import Table

class SubTable(Table):
    timezone = 'GMT+08'


table = Table(timezone='GMT+08')
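One way to picture how the three levels could interact is sketched below. The classes are self-contained stand-ins, not the real fusionflow.upload.Table, and the precedence shown (instance over subclass over script level) is an assumption rather than documented behavior.

```python
# Stand-in classes for illustration only; the real Table lives in fusionflow.upload.
class Table:
    timezone = None  # left unset so a script-level setting can fill it in

    def __init__(self, **overrides):
        # Instance-level parameters shadow class attributes.
        for name, value in overrides.items():
            setattr(self, name, value)


class SubTable(Table):
    timezone = "GMT+08"  # subclass-level override


def apply_script_settings(table, settings):
    """Fill in only the settings the table has not already defined."""
    for name, value in settings.items():
        if getattr(table, name, None) is None:
            setattr(table, name, value)
    return table


script_settings = {"timezone": "GMT+00"}  # as if read from __upload_timezone__
plain = apply_script_settings(Table(), script_settings)
sub = apply_script_settings(SubTable(), script_settings)
inst = apply_script_settings(Table(timezone="GMT+09"), script_settings)
print(plain.timezone, sub.timezone, inst.timezone)
```

In this sketch the script-level value is only a fallback: it is used when neither the subclass nor the instance supplies its own.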

usage

1. write source scripts

There are two ways to write source table extract scripts:

  1. by subclassing fusionflow.upload.Table:

    from fusionflow.upload import Table
    
    class Employ(Table):
        def extract(self):
            yield [{"colname": "colvalue"}, ...]
    
    ...
  2. by instantiating a subclass included in the package:

    from datetime import date, datetime, timedelta
    from fusionflow.upload.network import ApiTable
    
    def parse_api_data(data):
        rows = data["dataList"]
        # example row:
        # {'cioStatus': 2, 'inTime': '2020-02-24 00:00:22', 'outTime': '2020-02-24 00:23:00', 'plateNo': '沪BT2741', 'recordId': '33930092'}
        for row in rows:
            d = {}
            for key, value in row.items():
                if key in ("inTime", "outTime"):
                    # timestamps arrive as strings; convert them to datetime
                    d[key] = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
                elif key == "recordId":
                    d[key] = int(value)
                else:
                    d[key] = value
            yield d
    
    # uri is the API endpoint URL and must be defined before this line
    parkcar_table = ApiTable(
        uri,
        table_name='parkcar_table',
        # bizDate is a callable that returns yesterday's date as YYYYMMDD
        params={"bizDate": (lambda: (date.today() - timedelta(days=1)).strftime("%Y%m%d"))},
        parser=parse_api_data,
    )
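To see what the parser produces without calling a real API, the sketch below runs the same parsing logic on a payload built from the sample record in the script above. It does not import fusionflow.upload; the `payload` wrapper and the `biz_date` name are illustrative assumptions.

```python
from datetime import date, datetime, timedelta


def parse_api_data(data):
    """Yield one typed dict per row of the payload's "dataList"."""
    for row in data["dataList"]:
        parsed = {}
        for key, value in row.items():
            if key in ("inTime", "outTime"):
                parsed[key] = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
            elif key == "recordId":
                parsed[key] = int(value)
            else:
                parsed[key] = value
        yield parsed


# Sample payload built from the example record shown in the script.
payload = {
    "dataList": [
        {
            "cioStatus": 2,
            "inTime": "2020-02-24 00:00:22",
            "outTime": "2020-02-24 00:23:00",
            "plateNo": "沪BT2741",
            "recordId": "33930092",
        }
    ]
}

rows = list(parse_api_data(payload))
print(rows[0]["inTime"], rows[0]["recordId"])

# The bizDate parameter is a callable: each call returns yesterday as YYYYMMDD.
biz_date = lambda: (date.today() - timedelta(days=1)).strftime("%Y%m%d")
print(biz_date())
```

Passing a callable rather than a fixed string presumably lets the date be evaluated each time a request is made, although when ApiTable actually calls it is not stated in this document.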

2. parse source scripts

get table objects through scripts:

from fusionflow.upload import SrcTables

src_tables = SrcTables("path/to/src/script")
print(src_tables.tables)

extract data from table:

table1 = src_tables.tables[0]

# get table1 field type
fields = table1.fields

# extract data
# a table object is iterable, so it can be used in a for loop
for row in table1:
    print(row)
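The iteration behavior can be pictured with a small stand-in class. `SketchTable` is hypothetical, the sample rows are made up, and the assumption that iterating flattens the batches yielded by `extract` into single rows is not confirmed by this document.

```python
class SketchTable:
    """Hypothetical stand-in for a Table subclass; not the library's implementation."""

    def extract(self):
        # Each yield is one batch (a list of row dicts), as in the Employ example.
        yield [{"empno": "7369", "ename": "SMITH"}, {"empno": "7499", "ename": "ALLEN"}]
        yield [{"empno": "7521", "ename": "WARD"}]

    def __iter__(self):
        # Assumption: iteration flattens batches into individual rows.
        for batch in self.extract():
            yield from batch


for row in SketchTable():
    print(row)
```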

Django Integration

The Table class also provides a django_fields attribute that converts field types to django.db.models.fields.Field. To use this feature, you must install the django module manually, or install it together with the package:

git clone git@codeup.teambition.com:fusiontree/fusionplatform/fusionflow.upload.git
cd fusionflow.upload
pip install ".[db]"

django_fields can be accessed as follows:

from fusionflow.upload.db.oracle import OracleDatabaseTable

table = OracleDatabaseTable(username=user, password=passwd, host='localhost')
print(table.django_fields)
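The kind of conversion django_fields performs can be sketched as a lookup from Python value types to Django field class names. The mapping below is an assumption made for illustration (the package's actual rules are not documented here), and it returns class names as strings so the sketch runs without Django installed.

```python
from datetime import date, datetime

# Assumed mapping for illustration; the package's real conversion may differ.
# The values are names of classes that exist in django.db.models.
PYTHON_TO_DJANGO_FIELD = {
    int: "IntegerField",
    float: "FloatField",
    bool: "BooleanField",
    str: "CharField",
    date: "DateField",
    datetime: "DateTimeField",
}


def django_field_names(row):
    """Map each column of a sample row to a Django field class name."""
    return {
        name: PYTHON_TO_DJANGO_FIELD.get(type(value), "TextField")
        for name, value in row.items()
    }


sample_row = {"recordId": 33930092, "plateNo": "沪BT2741", "inTime": datetime(2020, 2, 24, 0, 0, 22)}
print(django_field_names(sample_row))
```

The lookup uses the exact type of each value, so a `bool` is not mistaken for an `int` and a `datetime` is not mistaken for a `date`.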

3. validation

Tables can use validators to check that input values are valid in business or logical terms. Validators can be added through the subclass parameter validators, through the instantiation parameter validators, or by registering them with table.registe_validator.

All validators must be registered before any data is fetched.

Two types of validator are currently supported:

  1. row-level validator (not implemented currently)

  2. field value validator

example

subclass method:

class FileTable(Table):
    def extract(self):
        for data in emp_table():
            yield data

    def validate_empno(self, value):
        assert int(value) < 7800, value

register at run time:

# at instantiation
def emp_ge_7600(value):
    assert int(value) > 7600
f = FileTable(validate_empno=[emp_ge_7600])

# through api
validator_str = """def empno_is_digit(value):
   assert value.isdigit()
"""
f.registe_validator('empno', validator_str)
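The sketch below shows one way field value validators like these could be collected and applied. `ValidatingTable`, `register_from_source`, and `check_row` are hypothetical names, not the fusionflow.upload API; the sketch only assumes, as the examples above suggest, that a validator signals failure by raising AssertionError and that a validator can be supplied as a string of source code.

```python
class ValidatingTable:
    """Hypothetical stand-in showing one way field validators could be dispatched."""

    def __init__(self):
        self._validators = {}  # field name -> list of callables
        # Pick up validate_<field> methods defined on the class.
        for attr in dir(self):
            if attr.startswith("validate_") and callable(getattr(self, attr)):
                field = attr[len("validate_"):]
                self._validators.setdefault(field, []).append(getattr(self, attr))

    def register_from_source(self, field, source):
        """Compile validator functions from source text (trusted input only)."""
        namespace = {}
        exec(source, namespace)
        funcs = [v for k, v in namespace.items() if callable(v) and not k.startswith("__")]
        self._validators.setdefault(field, []).extend(funcs)

    def check_row(self, row):
        """Return (field, value, validator name) for every failed assertion."""
        errors = []
        for field, value in row.items():
            for validator in self._validators.get(field, []):
                try:
                    validator(value)
                except AssertionError:
                    errors.append((field, value, validator.__name__))
        return errors


class EmpTable(ValidatingTable):
    def validate_empno(self, value):
        assert int(value) < 7800, value


table = EmpTable()
table.register_from_source("empno", "def empno_is_digit(value):\n    assert value.isdigit()\n")
print(table.check_row({"empno": "7700"}))
print(table.check_row({"empno": "7900"}))
```

Because the string form is executed as code, a real implementation would need to restrict it to trusted sources.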


test

Tests use pytest. After cloning this repo, run:

./test

Tests are also integrated with setuptools. After cloning the source, run:

python setup.py test

support format

Three types of easy-to-use Table subclasses are provided; instantiate them directly:

  • DatabaseTable – RDBMS table source

  • ApiTable – application programming interface; loads data from a remote source

  • FileTable – Excel, CSV, TSV, XML, and similar files

contact

logan
