Skip to main content

A tool to import data from mysql into elasticsearch

Project description

Data2es数据导入工具

本项目是一个用来把数据从mysq导入到es中的小工具。程序使用pip 安装,程序作为后台程序执行,可以脱离当前命令行环境。启动时需要指定配置文件。程序的日志文件默认为 /tmp/data2es.log 此文件也可以在启动时指定。

命令行执行方法如下

usage: data2esd {start|stop|restart|kill} [-c]

positional arguments:
  {start|stop|restart|kill}
                        This control command can be used with start or stop or
                        restart or kill. Used to control the running of
                        data2esd daemons

optional arguments:
  -h, --help            show this help message and exit
  -c CONFIGFILE_PATH, --config CONFIGFILE_PATH
                        Specify configuration file path.
  -l LOG_FILE_PATH, --logfile LOG_FILE_PATH
                        Specify log file path.
  -v, --version         show program's version number and exit

安装

pip install data2es

启动

data2esd start -c 'config.conf'

其中config是必须指定的

停止

data2esd stop

重启

data2esd restart

配置文件范例

以下配置文件所有配置项都要有,值可以为空。

input:{
    mysql:{
        provider = "mysql" #数据源类型        
        db_host = "127.0.0.1"
        db_port=3110
        db_user="root"
        db_password="root"
        db_name="db_test"
        #是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
        record_last_run = true
        #追踪的字段
        tracking_column = "id"        
        #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
        last_run_metadata_path = "/tmp/base_parameter.txt"
        #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
        #clean_run = false
        #是否将 column 名称转小写
        lowercase_column_names = false
        # sql 语句文件,分页和增量跟新都写在sql语句中
        statement = "select *from table where `id` > {last_run_id} order by `id` limit 5000"
        trigger = "schedule" #定时器触发
        #trigger = "webhook" #webhook触发
        # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
        schedule = "* * * * * *"
        #设置webhook的访问令牌,为空时采用默认的令牌 ”amituofo@xfjl“
        webhook_token=""
        #设置webhook的访问令牌,为0时采用默认的端口号9899
        webhook_port=0        
        #指定对应的输出
        output_name="elasticsearch"
    }   
}

output:{ #elasticsearch服务器和index的配置
      elasticsearch:{
        hosts="127.0.0.1:9200"        
        index="test-index1"
        document_id = "id"
        username="elastic"
        password="elastic"
      }       
    }
webhook:{#有两种事件,一个是开始执行一次任务,一个是任务完成,如果指定了url那么在事件发生时会通知到这两个地址
  start=""
  finished=""
  error=""
}

执行触发

提供两种触发方式,一种是定时任务,一种是webhook触发。定时任务触发采用类似corn的方式设置。webhook的方式下,系统会运行一个http服务器,监听指定的端口,调用地址为 http://<ipaddress>:port/hook?token=''

webhook

如果在配置文件中配置了webhook地址,那个在事件发生时会通知设置的地址。通知内容如下(兼容钉钉和企业微信群机器人):

start

{
   "msgtype": "text",
   "text": {
        "content": "0000-00-00 00:00:00 从第 xxxxx 开始同步数据。"
    }
}

finished

{
   "msgtype": "text",
   "text": {
        "content": "0000-00-00 00:00:00 完成数据从 xxxx 到 xxxx 的同步。"
    }
}

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

data2es-0.0.4.tar.gz (12.2 kB view hashes)

Uploaded Source

Built Distribution

data2es-0.0.4-py3-none-any.whl (14.4 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page