Skip to main content

嵌入式管道模块, 类似于logstash

Project description

嵌入式管道模块, 类似于logstash

使用方法类似logstash, 配置化操作

安装

pip install zpipe

配置管道框架

创建一个目录, 这个目录作为你的管道组织主路径

mkdir /home/mypipe
mkdir /home/mypipe/pipe_config
mkdir /home/mypipe/plugin

创建框架配置文件

touch /etc/zpipe/config.json
# win系统: "c:/zpipe/config.json"
# 其他系统: "/etc/zpipe/config.json"
# 可以使用环境变量 ZPIPE_CONFIG 指定框架配置文件路径, 优先级最高

修改框架配置文件为你的设置

{
  "pipe_config_path": "/home/mypipe/pipe_config",
  "plugin_path": "/home/mypipe/plugin"
}
  • pipe_config_path 表示管道配置文件路径, 可以是文件或文件夹, 每个配置文件是一个json文件, 最后框架会将所有json合并为一个更大的json
  • plugin_path 表示插件路径, 只能是文件夹
  • 路径可以使用相对路径(相对于框架配置文件的路径), 多路径用分号 [;] 隔开

大功告成, 终于可以开始制作管道了

一个简单的管道

  • 在管道配置目录下创建一个文件 my_pipe.py, 写入如下内容
{
  "test": {
    "output": "log"
  }
}
  • 新建一个py文件, 写入如下内容
from zpipe import pipe_server

a = pipe_server()
pipe = a.get_pipe('test')
result = pipe.process("测试消息")

运行它, 随后你可以在控制台看到打印出来的消息

这个管道做了什么

{
  "test": {           #定义管道名
    "output": "log"   #定义输出插件
  }
}
from zpipe import pipe_server      #导入框架服务

a = pipe_server()                  #创建框架服务的实例
pipe = a.get_pipe('test')          #获取管道实例
result = pipe.process("测试消息")   #向管道发送数据并获取管道结果

管道配置详解

对插件设置参数, 执行顺序为 pipe > codec > filter.codec > filter > output.codec > output

{
  "管道1": {
    "codec": {
      "name": "解码器名",
      "参数名": "参数值"
    },
    "filter": {
      "name": "过滤器名",
      "参数名": "参数值",
      "codec": {
        "name": "解码器名",
        "参数名": "参数值"
      }
    },
    "output": {
      "name": "输出插件名",
      "参数名": "参数值",
      "codec": {
        "name": "解码器名",
        "参数名": "参数值"
      }
    },
    "参数名": "参数值"
  },
  "管道2":{
  }
}

如果不设置参数可以简化配置

{
  "管道名": {
    "codec": "解码器名",
    "filter": "过滤器名",
    "output": "输出插件名",
    "参数名": "参数值"
  }
}

自制插件, 简单插件实例

# 自制插件必须遵循以下规则
# 插件所在的目录必须添加到框架配置文件的plugin_path里
# 插件类型目前只有三种['codec', 'filter', 'output']
# 插件文件名必须为规则:[zpipe_插件类型_插件名.py]
# 插件类名必须和插件文件名相同并且继承于[zpipe.plugin_base]

# zpipe_output_test.py

from zpipe import plugin_base

class zpipe_output_test(plugin_base):
    def plugin_init(self, **kw):
        '''
        插件初始化
        :param kw: 这里接收管道定义的插件参数
        '''
        pass

    def plugin_distroy(self):
        '''
        插件销毁时
        '''
        pass

    def process(self, data):
        '''
        插件过程
        :param data: 收到的数据
        :return: 结果会顺序传递给下一个插件, 如果是最后一个插件则会返回给管道的调用者
        '''
        print('收到数据', data)
        return data

内置codec插件

  • raw

只需要调用者发送的数据, 忽略管道接收数据时对数据的打包, 没有任何参数

  • dict

将调用者发送的数据认为是一个dict, 并将它更新到数据的顶层

  • json

将调用者发送的数据认为可以被json.loads函数加载, 结果为dict, 并将它更新到数据的顶层, 参数配置参考json.loads

  • msgpack

将调用者发送的数据认为可以被msgpack.loads函数加载, 结果如果为dict, 会将它更新到数据的顶层, 参数配置参考msgpack.loads

内置filter插件

  • default
参数名 参数类型 默认值 说明
remove str or list 移除指定的数据, 允许链式表达数据位置,如 "a.b.c" 表示 del data[a][b][c]

内置output插件

  • log

日志输出插件

参数名 参数类型 默认值 说明
log_name str zpipe_output_log 日志名
write_stream bool True 是否输出到流(控制台)
write_file bool False 是否输出到文件
file_dir str . 输出文件所在的路径, 允许相对路径(相对于框架配置文件的路径)
level str debug 日志可见等级(debug,info,warn,error,fatal)
interval int 1 间隔多少天新建一个日志文件
backupCount int 2 保留多少个历史日志文件
info_level str 日志消息等级(debug,info,warn,error,fatal), 允许从消息内提取, 如 "raw.__level__"

更新日志

发布时间 版本 发布说明
19-04-12 0.0.1 发布第一版, 所有功能均已实现, 但是插件很少, 允许用户自定义插件, 插件开发简单

本项目仅供所有人学习交流使用, 禁止用于商业用途

Project details


Release history Release notifications

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for zpipe, version 0.0.1
Filename, size File type Python version Upload date Hashes
Filename, size zpipe-0.0.1-py3-none-any.whl (24.9 kB) File type Wheel Python version py3 Upload date Hashes View

Supported by

Pingdom Pingdom Monitoring Google Google Object Storage and Download Analytics Sentry Sentry Error logging AWS AWS Cloud computing DataDog DataDog Monitoring Fastly Fastly CDN DigiCert DigiCert EV certificate StatusPage StatusPage Status page