嵌入式管道模块, 类似于logstash
Project description
嵌入式管道模块, 类似于logstash
使用方法类似logstash, 配置化操作
安装
pip install zpipe
配置管道框架
创建一个目录, 这个目录作为你的管道组织主路径
mkdir /home/mypipe
mkdir /home/mypipe/pipe_config
mkdir /home/mypipe/plugin
创建框架配置文件
touch /etc/zpipe/config.json
# win系统: "c:/zpipe/config.json"
# 其他系统: "/etc/zpipe/config.json"
# 可以使用环境变量 ZPIPE_CONFIG 指定框架配置文件路径, 优先级最高
修改框架配置文件为你的设置
{ "pipe_config_path": "/home/mypipe/pipe_config", "plugin_path": "/home/mypipe/plugin" }
- pipe_config_path 表示管道配置文件路径, 可以是文件或文件夹, 每个配置文件是一个json文件, 最后框架会将所有json合并为一个更大的json
- plugin_path 表示插件路径, 只能是文件夹
- 路径可以使用相对路径(相对于框架配置文件的路径), 多路径用分号 [;] 隔开
大功告成, 终于可以开始制作管道了
一个简单的管道
- 在管道配置目录下创建一个文件 my_pipe.py, 写入如下内容
{ "test": { "output": "log" } }
- 新建一个py文件, 写入如下内容
from zpipe import pipe_server a = pipe_server() pipe = a.get_pipe('test') result = pipe.process("测试消息")
运行它, 随后你可以在控制台看到打印出来的消息
这个管道做了什么
{
"test": { #定义管道名
"output": "log" #定义输出插件
}
}
from zpipe import pipe_server #导入框架服务 a = pipe_server() #创建框架服务的实例 pipe = a.get_pipe('test') #获取管道实例 result = pipe.process("测试消息") #向管道发送数据并获取管道结果
管道配置详解
对插件设置参数, 执行顺序为 pipe > codec > filter.codec > filter > output.codec > output
{ "管道1": { "codec": { "name": "解码器名", "参数名": "参数值" }, "filter": { "name": "过滤器名", "参数名": "参数值", "codec": { "name": "解码器名", "参数名": "参数值" } }, "output": { "name": "输出插件名", "参数名": "参数值", "codec": { "name": "解码器名", "参数名": "参数值" } }, "参数名": "参数值" }, "管道2":{ } }
如果不设置参数可以简化配置
{ "管道名": { "codec": "解码器名", "filter": "过滤器名", "output": "输出插件名", "参数名": "参数值" } }
自制插件, 简单插件实例
# 自制插件必须遵循以下规则 # 插件所在的目录必须添加到框架配置文件的plugin_path里 # 插件类型目前只有三种['codec', 'filter', 'output'] # 插件文件名必须为规则:[zpipe_插件类型_插件名.py] # 插件类名必须和插件文件名相同并且继承于[zpipe.plugin_base] # zpipe_output_test.py from zpipe import plugin_base class zpipe_output_test(plugin_base): def plugin_init(self, **kw): ''' 插件初始化 :param kw: 这里接收管道定义的插件参数 ''' pass def plugin_distroy(self): ''' 插件销毁时 ''' pass def process(self, data): ''' 插件过程 :param data: 收到的数据 :return: 结果会顺序传递给下一个插件, 如果是最后一个插件则会返回给管道的调用者 ''' print('收到数据', data) return data
内置codec插件
- raw
只需要调用者发送的数据, 忽略管道接收数据时对数据的打包, 没有任何参数
- dict
将调用者发送的数据认为是一个dict, 并将它更新到数据的顶层
- json
将调用者发送的数据认为可以被json.loads函数加载, 结果为dict, 并将它更新到数据的顶层, 参数配置参考json.loads
- msgpack
将调用者发送的数据认为可以被msgpack.loads函数加载, 结果如果为dict, 会将它更新到数据的顶层, 参数配置参考msgpack.loads
内置filter插件
- default
参数名 | 参数类型 | 默认值 | 说明 |
---|---|---|---|
remove | str or list | 移除指定的数据, 允许链式表达数据位置,如 "a.b.c" 表示 del data[a][b][c] |
内置output插件
- log
日志输出插件
参数名 | 参数类型 | 默认值 | 说明 |
---|---|---|---|
log_name | str | zpipe_output_log | 日志名 |
write_stream | bool | True | 是否输出到流(控制台) |
write_file | bool | False | 是否输出到文件 |
file_dir | str | . | 输出文件所在的路径, 允许相对路径(相对于框架配置文件的路径) |
level | str | debug | 日志可见等级(debug,info,warn,error,fatal) |
interval | int | 1 | 间隔多少天新建一个日志文件 |
backupCount | int | 2 | 保留多少个历史日志文件 |
info_level | str | 日志消息等级(debug,info,warn,error,fatal), 允许从消息内提取, 如 "raw.__level__" |
更新日志
发布时间 | 版本 | 发布说明 |
---|---|---|
19-04-12 | 0.0.1 | 发布第一版, 所有功能均已实现, 但是插件很少, 允许用户自定义插件, 插件开发简单 |
本项目仅供所有人学习交流使用, 禁止用于商业用途
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Filename, size | File type | Python version | Upload date | Hashes |
---|---|---|---|---|
Filename, size zpipe-0.0.1-py3-none-any.whl (24.9 kB) | File type Wheel | Python version py3 | Upload date | Hashes View |