Skip to main content

数据智脑开源计算引擎dataprime

Project description

数据智脑计算引擎dataprime

dataprime是一款数据计算引擎,提供了数据钻取、过滤、生成、采样、聚合、排序等功能,输入dataframe对象,调用不同方法,即可返回相应的结果, 并且可以结合celery进行异步调用 ​

一 快速开始

  • 安装

pip install dataprime

  • 使用
import json

from dataprime.dataprime import DataPrime
import pandas as pd

# 准备数据
data = [
        {"name": "test1", "time": "2021-01-05", "score": 10},
        {"name": "test2", "time": "2021-01-12", "score": 20},
        {"name": "test3", "time": "2021-01-24", "score": 30}
]

# 过滤参数
filter_list = [
    {
        "column": "score",
        "dtype": "int",
        "filter": {
            "condition": "greater_than",
            "value": "10",
            "max": 0,
            "min": 0
        }
    }
]

data_frame = pd.read_json(json.dumps(data))

# 初始化一个DataPrime对象
dp = DataPrime(dataframe=data_frame)

# 调用方法进行处理, 返回一个dataframe对象
result = dp.filter(filter_list).dataframe

print(result)

二 使用文档

1.钻取 Drill

钻取是以一个时间字段为维度,将数据按年、月、日进行细分 ​

  • 参数字段

drill_granularity_list 钻取粒度列表,包含年、月、日 drill_node_list 钻取节点列表 dimensions 钻取字段 ​

  • 参数示例:
drill_granularity_list = [
    {
        "value": "YEAR",
        "label": "年"
    },
    {
        "value": "MONTH",
        "label": "月"
    },
    {
        "value": "DAY",
        "label": "日"
    }
]

drill_node_list = [
    {
        "granularity": "",
        "point": "YEAR",
    },
    {
        "granularity": "YEAR",
        "point": "2021",
    },
    {
        "granularity": "MONTH",
        "point": "01",
    }
]

dimensions = [
    {
        "column": "time",
        "dtype": "datetime",
    }
]
  • 使用示例
result = dp.drill(drill_granularity_list, drill_node_list, dimensions).dataframe

2.过滤 Filter

过滤是根据某一个度量字段,通过一定条件来筛选需要的数据

  • 参数示例
filter_list = [
    {
        "column": "score",             # 过滤字段
        "dtype": "int",                # 字段类型
        "filter": {
            "condition": "less_than",  # 过滤条件
            "value": "100",            # 过滤值
            "max": 50,                 # 最大值, 区间判断时用到
            "min": 0                   # 最小值, 区间判断时用到
        }
    }
]
  • 使用示例
result = dp.filter(filter_list).dataframe
  • condition选项
条件
等于 equal
不等于 not_equal
区间内 in_between
区间外 out_between
大于 greater_than
大于等于 equal_greater_than
小于 less_than
小于等于 equal_less_than
包含 contains
不包含 not_contains

3.生成器 Generator

生成器是在dataframe中新增一列,提供求和、求平均值、计算行等结果,例如有a、b两列,可以通过生成器生成"a-b-平均值"列 ​

  • 参数示例
generated_metric_list = [
    {
        "dtype": "float",
        "operate": {
            "label": "均值",
            "value": "Avg"          # 生成类型
        },
        "source_col_list": [        # 源数据列
            {
                "column": "age",
                "dtype": "int"
            },
            {
                "column": "score",
                "dtype": "int"
            }
        ],
        "column": "age-score-均值"  # 生成列名称
    }
]
  • 使用示例
result = dp.generate(generated_metric_list).dataframe
  • 生成类型
生成类型 值value
平均值 Avg
求和 Sum
计算行 COUNT_COL

4.采样 Sampling

采样是指通过等距采样随机采样等方式,对数据进行采样,获取一定数量的行 ​

  • 参数示例
sample = {
    "number": 5,      # 采样数量
    "type": "random"  # 采样方式, 随机:random  等距:equidistant
}
  • 使用示例
result = dp.sample(5, "random").dataframe

5.聚合 Aggregate

聚合是对数值(度量)列进行聚合,可以进行计数、求和、求平均值等聚合操作 ​

  • 参数示例
aggregation_list = [
    {
        "column": "age",          # 源数据列
        "dtype": "int",
        "aggregation_option": {
            "label": "均值",
            "value": "mean"       # 聚合方法
        }
    },
    {
        "column": "score",
        "dtype": "int",
        "aggregation_option": {
            "label": "均值",
            "value": "mean"
        }
    }
]
  • 可用的聚合方法

['count', 'sum', 'mean', 'median', 'std', 'var', 'max', 'min'] ​

  • 使用示例
result = dp.aggregate(aggregation_list).dataframe

6.排序 Sorting

排序是根据某一个数据列进行升降排序 ​

  • 参数示例
sort = {
	"condition": "desc",  # 升序:desc  降序:asc
    "column": "time"      # 排序字段
}
  • 使用示例
result = dp.order_by("time", "desc").dataframe

7.top N

top N是指按照某一列的值进行排序,取出前几行 ​

  • 参数示例
top_n_param = {
    "column": "score",     # 排序字段
    "condition": "desc",   # 排序方式, 升序:desc  降序:asc
    "limit": 3             # 取出数量
}
  • 使用示例
result = dp.top_n("score", "desc", 3).dataframe

8.统一入口

除了上述的单个方法外,也可以使用dataprime.process_df_operator()来同时进行多个数据处理操作,参数字段与上面相同 ​

  • 使用示例
dataprime = DataPrime(
    dataframe=data_frame,
    **{
        "dimensions": dimension_list,
        "metrics": metric_list,
        "calculation_flow_nodes": calculation_flow_nodes,
        "generated_metric_list": generated_metric_list,
        "filter_list": filter_list,
        "aggregation_list": aggregation_list,
        "sample": sample,
        "drill_granularity_list": drill_granularity_list,
        "drill_node_list": drill_node_list,
        "sort": sort_param,
        "topn": top_n_param
    }
)

result = dataprime.process_df_operator()
  • calculation_flow_nodes为计算节点列表, 可选值有:

    DRILLING 钻取

    SAMPLING 采样

    FILTER 过滤

    AGGREGATION 聚合

    例如需要钻取、采样两种计算,则calculation_flow_nodes=["DRILLING", "SAMPLING"], 同时也需要传递这2个计算节点对应的参数drill_node_list drill_granularity_list filter_list. 使用多个计算节点时, 按照列表中元素的先后顺序来依次调用

9.链式调用

上述的单个数据处理方法可以进行链式调用,如:

result = dp.order_by("time", "desc").sample(5, "random").dataframe

三 异步调用

dataprime可以结合celery进行异步调用, 主要通过AsyncDataprime提供的接口来调用, 其方法名称与同步调用时一致, 返回结果为celery任务的id,使用示例如下:

  • 初始化celery工程文件

    执行以下代码

    from dataprime.dataprime import DataPrime
    
    dp = DataPrime()
    dp.init_celery()
    

    将会在当前目录下生成 dataprime_celery文件夹, 包含以下文件:

    celery_app.py celery主入口文件

    config.py celery配置文件, 需要手动配置broker_url和result_backend

    task.py celery任务文件

  • celery的其余使用配置参考其官方文档:

    https://docs.celeryproject.org/en/stable/

  • 启动celery worker

    配置好celery的broker, backend等配置之后,执行以下命令启动

    celery -A dataprime_celery.celery_app worker -l INFO

  • 发送异步任务请求  

from async_dataprime.executor import AsyncDataPrime

# 初始化
dataframe="A dataframe object"
adp = AsyncDataPrime()

filter_list = [
    {
        "column": "score",
        "dtype": "int",
        "filter": {
            "condition": "greater_than",
            "value": "10",
            "max": 0,
            "min": 0
        }
    }
]

# 调用方法进行处理, 返回异步任务的任务id
# 异步任务的方法和同步调用时的方式名称一致,调用对象换成了AsyncDataPrime对象
task_id = adp.filter(dataframe=dataframe, filter_list=filter_list)
print(task_id)
  • 获取结果

  异步任务的结果是以json的格式存储在事先配置的redis backend中, 通过task_id即可获取:

# 执行任务
adp = AsyncDataPrime()
task_id = adp.filter(data_frame, filter_list)

# 等待任务完成

# 获取结果, datatype可选"json"或"dataframe"
res = adp.get_async_result(task_id, data_type="dataframe")
print(res)

四 开源协议

本项目采用 木兰宽松许可证

http://license.coscl.org.cn/MulanPSL2/

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dataprime-0.1.9.tar.gz (17.5 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page