博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Elasticsearch JDBC的使用-MySQL 数据源导入和增量索引、更新
阅读量:6121 次
发布时间:2019-06-21

本文共 15197 字,大约阅读时间需要 50 分钟。

hot3.png

在使用 Elasticsearch 的时候,经常会涉及到要将其它数据源的数据导入到 Elasticsearch 中,今天就来介绍一下关于 Elasticsearch 从 MySQL 导入数据和增量索引的实现 

这里要用到一个 Elasticsearch 的插件 elasticsearch-jdbc

需要的资源和版本 

Elasticsearch 版本:2.2.0  
elasticsearch-jdbc 版本 : 2.2 

一、安装 jdbc

jdbc 的压缩包我已经放在了 /usr/local/src/ 目录下,可以去它的  获取对应版本的压缩包

cd /usr/local/src/unzip ./elasticsearch-jdbc-2.2.0.0-dist.zipcp -r ./elasticsearch-jdbc-2.2.0.0 /usr/local/elasticsearch-2.2.0/jdbc2.2
  • 1
  • 2
  • 3

这样就可以使用啦,jdbc 还提供了一些常用的例子,在 【ES安装目录/jdbc2.2/bin/ 】这个文件夹下,改一改就可以用,都是bash 文件,记得加运行权限哦

二、使用jdbc

我们先在 MySQL中创建一个用于测试的数据表 article ,并添加几条数据 

(注意, update_time 字段我加了ON UPDATE CURRENT_TIMESTAMP,数据发生改变就会更新此字段)

DROP TABLE IF EXISTS `article`;CREATE TABLE `article` (  `id` mediumint(8) unsigned NOT NULL AUTO_INCREMENT,  `subject` varchar(150) NOT NULL,  `author` varchar(15) DEFAULT NULL,  `create_time` timestamp NULL DEFAULT NULL,  `update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;# 数据INSERT INTO `article` VALUES ('1', '"闺蜜"崔顺实被韩检方传唤 韩总统府促彻查真相', 'jam', '2016-10-31 17:49:21', '2016-10-31 17:50:21');INSERT INTO `article` VALUES ('2', '韩举行"护国训练" 青瓦台:决不许国家安全出问题', 'jam00', '2016-10-31 17:50:39', '2016-10-31 17:50:51');INSERT INTO `article` VALUES ('3', '媒体称FBI已经取得搜查令 检视希拉里电邮', 'tomi', '2016-10-31 17:51:03', '2016-10-31 17:51:08');INSERT INTO `article` VALUES ('4', '村上春树获安徒生奖 演讲中谈及欧洲排外问题', 'jason', '2016-10-31 17:51:38', '2016-10-31 17:51:41');INSERT INTO `article` VALUES ('5', '希拉里团队炮轰FBI 参院民主党领袖批其“违法”', 'tommy', '2016-10-31 17:52:07', '2016-10-31 17:52:09');
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

1、数据源导入

首先执行全部数据导入(注:ES 使用的是默认配置) 

我们写一个名叫 mysql-article.sh 的bash脚本,并放在 /usr/local/elasticsearch-2.2.0/jdbc2.2/bin/mysql-article.sh 下面,脚本内容如下(内容注释会在后面给出)

#执行/usr/local/elasticsearch-2.2.0/jdbc2.2/bin/mysql-article.sh #文件内容如下#!/bin/shDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"bin=${DIR}/../binlib=${DIR}/../libecho '{    "type" : "jdbc",    "jdbc" : {        "url" : "jdbc:mysql://localhost:3306/test",        "user" : "root",        "password" : "123456",        "sql" : "select *, id as _id from article",        "index" : "jdbctest",        "type" : "article",        "index_settings" : {            "analysis" : {                "analyzer" : {                    "ik" : {                        "tokenizer" : "ik"                    }                }            }        },        "type_mapping": {            "article" : {                "properties" : {                    "id" : {                        "type" : "integer",                        "index" : "not_analyzed"                    },                    "subject" : {                        "type" : "string",                        "analyzer" : "ik"                    },                    "author" : {                        "type" : "string",                        "analyzer" : "ik"                    },                    "create_time" : {                        "type" : "date"                    },                    "update_time" : {                        "type" : "date"                    }                }            }        }    }}' | java \    -cp "${lib}/*" \    -Dlog4j.configurationFile=${bin}/log4j2.xml \    org.xbib.tools.Runner \    org.xbib.tools.JDBCImporter
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

执行后会自动创建 jdbctest 索引(若不存在) ,article 类型 和几个对应的字段,这里因为有中文,我使用了 ik 分词器() 

若执行失败,请查看日志文件,jdbc 的日志存放在 /usr/local/elasticsearch-2.2.0/logs/jdbc.log 
查看是否导入成功

curl -XGET 'http://localhost:9200/jdbctest/article/_search?pretty'#返回{  "took" : 33,  "timed_out" : false,  "_shards" : {    "total" : 5,    "successful" : 5,    "failed" : 0  },  "hits" : {    "total" : 5,    "max_score" : 1.0,    "hits" : [ {      "_index" : "jdbctest",      "_type" : "article",      "_id" : "5",      "_score" : 1.0,      "_source" : {        "id" : 5,        "subject" : "希拉里团队炮轰FBI 参院民主党领袖批其“违法”",        "author" : "tommy",        "create_time" : "2016-10-31T17:52:07.000+08:00",        "update_time" : "2016-10-31T17:52:09.000+08:00"      }    }, {      "_index" : "jdbctest",      "_type" : "article",      "_id" : "2",      "_score" : 1.0,      "_source" : {        "id" : 2,        "subject" : "韩举行"护国训练" 青瓦台:决不许国家安全出问题",        "author" : "jam00",        "create_time" : "2016-10-31T17:50:39.000+08:00",        "update_time" : "2016-10-31T17:50:51.000+08:00"      }    }, {      "_index" : "jdbctest",      "_type" : "article",      "_id" : "4",      "_score" : 1.0,      "_source" : {        "id" : 4,        "subject" : "村上春树获安徒生奖 演讲中谈及欧洲排外问题",        "author" : "jason",        "create_time" : "2016-10-31T17:51:38.000+08:00",        "update_time" : "2016-10-31T17:51:41.000+08:00"      }    }, {      "_index" : "jdbctest",      "_type" : "article",      "_id" : "1",      "_score" : 1.0,      "_source" : {        "id" : 1,        "subject" : ""闺蜜"崔顺实被韩检方传唤 韩总统府促彻查真相",        "author" : "jam",        "create_time" : "2016-10-31T17:49:21.000+08:00",        "update_time" : "2016-10-31T17:50:21.000+08:00"      }    }, {      "_index" : "jdbctest",      "_type" : "article",      "_id" : "3",      "_score" : 1.0,      "_source" : {        "id" : 3,        "subject" : "媒体称FBI已经取得搜查令 检视希拉里电邮",        "author" : "tomi",        "create_time" : "2016-10-31T17:51:03.000+08:00",        "update_time" : "2016-10-31T17:51:08.000+08:00"      }    } ]  }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76

内容已成功导入到 Elasticsearch 中

2、增量索引、更新

如果我们对数据做了更改或是有新数据加入,若再执行全部导入,就有点得不偿失了 

这里我们就要用到jdbc 的两个属性 statefile(状态文件) 和 schedule(计划任务时间),并且 sql 语句也要改成动态的 
改动如下

"statefile" : "statefile-article.json","schedule" : "0 0-59 0-23 ? * *","sql" : [    {        "statement" : "select *, id as _id from article where update_time > ?",        "parameter" : [ "$metrics.lastexecutionstart" ]    }],
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

改动后的完整文件 mysql-article.sh

#!/bin/shDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"bin=${DIR}/../binlib=${DIR}/../libecho '{    "type" : "jdbc",    "jdbc" : {        "url" : "jdbc:mysql://localhost:3306/test",        "user" : "root",        "password" : "123456",        "statefile" : "statefile-article.json",        "schedule" : "0 0-59 0-23 ? * *",        "sql" : [            {                "statement" : "select *, id as _id from article where update_time > ?",                "parameter" : [ "$metrics.lastexecutionstart" ]            }        ],        "index" : "jdbctest",        "type" : "article",        "index_settings" : {            "analysis" : {            "analyzer" : {                "ik" : {                    "tokenizer" : "ik"                }            }        }        },        "type_mapping": {            "article" : {                "properties" : {                    "id" : {                        "type" : "integer",                        "index" : "not_analyzed"                    },                    "subject" : {                        "type" : "string",                        "analyzer" : "ik"                    },                    "author" : {                        "type" : "string",                        "analyzer" : "ik"                    },                    "create_time" : {                        "type" : "date"                    },                    "update_time" : {                        "type" : "date"                    }                }            }        }    }}' | java \    -cp "${lib}/*" \    -Dlog4j.configurationFile=${bin}/log4j2.xml \    org.xbib.tools.Runner \    org.xbib.tools.JDBCImporter
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

运行该文件 :/usr/local/elasticsearch-2.2.0/jdbc2.2/bin/mysql-article.sh 

可以看到 命令行端 被占用,一直在运行,并且在 mysql-article.sh 的同级目录下生成了一个 statefile-article.json 的文件,sql 语句中需要的数据 lastexecutionstart 就保存在该文件中 
现在我们来改动一下MySQL 中的数据,增加一条数据,并修改一条 id 等于 5 的数据

INSERT INTO article() VALUES(NULL,'测试JDBC','jam00','2016-11-01 13:34:15','2016-11-01 13:34:15');UPDATE article SET `subject`='测试JDBC-改动' WHERE id=5;
  • 1
  • 2

最多等一分钟,再看看ES 中的数据

curl -XGET 'http://localhost:9200/jdbctest/article/_search?pretty' -d '{    "sort": {         "id": { "order": "desc" }    }}'# 返回 ..."hits" : [ {      "_index" : "jdbctest",      "_type" : "article",      "_id" : "6",      "_score" : null,      "_source" : {        "id" : 6,        "subject" : "测试JDBC",        "author" : "jam00",        "create_time" : "2016-11-01T13:34:15.000+08:00",        "update_time" : "2016-11-01T13:34:15.000+08:00"      },      "sort" : [ 6 ]    }, {      "_index" : "jdbctest",      "_type" : "article",      "_id" : "5",      "_score" : null,      "_source" : {        "id" : 5,        "subject" : "测试JDBC-改动",        "author" : "tommy",        "create_time" : "2016-10-31T17:52:07.000+08:00",        "update_time" : "2016-11-01T13:35:41.000+08:00"      },      "sort" : [ 5 ]    }...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

测试成功。 

为了让 mysql-article.sh 后台执行,我们可以使用 nohup 命令

nohup /usr/local/elasticsearch-2.2.0/jdbc2.2/bin/mysql-article.sh &
  • 1

当我们想停止执行的时候。

ps aux |grep jdbc2.2#返回root     26118  0.0  0.1 106092  1212 pts/0    S    14:03   0:00 /bin/sh /usr/local/elasticsearch-2.2.0/jdbc2.2/bin/mysql-article.shroot     26123 11.0  4.4 1079192 44932 pts/0   Sl   14:03   0:00 java -cp /usr/local/elasticsearch-2.2.0/jdbc2.2/bin/../lib/* -Dlog4j.configurationFile=/usr/local/elasticsearch-2.2.0/jdbc2.2/bin/../bin/log4j2.xml org.xbib.tools.Runner org.xbib.tools.JDBCImporter# 使用 kill 命令关闭进程, 26123 就是上面一句返回的进程号,不用杀掉 26118 ,杀掉26123 这个进程,26118 进程会自动关闭kill -9 26123
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

至此,MySQL 数据源的 增量索引和更新就完成了。

3、bash 文件释义

增量索引的bash文件注释如下,更多详细配置请查阅

#!/bin/sh# 当前脚本的绝对路径DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"bin=${DIR}/../binlib=${DIR}/../libecho '{    "type" : "jdbc",    "jdbc" : {        # 链接 mysql 的 test 数据库        "url" : "jdbc:mysql://localhost:3306/test",        # mysql 用户        "user" : "root",        # mysql 密码        "password" : "123456",        # 计划任务状态文件        "statefile" : "statefile-article.json",        # 计划任务时间 这里是每分钟执行一次        "schedule" : "0 0-59 0-23 ? * *",        # 执行导入的sql 语句        "sql" : [            {                "statement" : "select *, id as _id from article where update_time > ?",                "parameter" : [ "$metrics.lastexecutionstart" ]            }        ],        # 索引名称 jdbctest        "index" : "jdbctest",        # 类型名称 article        "type" : "article",        # 类型设置        "index_settings" : {            "analysis" : {            "analyzer" : {                "ik" : {                        # 涉及到中文使用ik 分词                    "tokenizer" : "ik"                }            }        }        },        # 类型中的字段映射        "type_mapping": {            # 类型名称            "article" : {                "properties" : {                    # 对应的字段                    "id" : {                        # 字段类型                        "type" : "integer",                        # 当成一个准确的值进行索引(全匹配)                        "index" : "not_analyzed"                    },                    "subject" : {                        "type" : "string",                        "analyzer" : "ik"                    },                    "author" : {                        "type" : "string",                        "analyzer" : "ik"                    },                    "create_time" : {                        "type" : "date"                    },                    "update_time" : {                        "type" : "date"                    }                }            }        }    }}' | java \    -cp "${lib}/*" \    -Dlog4j.configurationFile=${bin}/log4j2.xml \    org.xbib.tools.Runner \    org.xbib.tools.JDBCImporter
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78

这里选几个属性来介绍一下 

url:数据库链接串,所以把这个链接串改成其它数据源,这个脚本也可以使用(前提是那个数据源中有对应的 article 表) 
statefile :计划任务状态文件名称。它长这样:

{  "type" : "jdbc",  "jdbc" : {    "index_settings" : {      "analysis" : {        "analyzer" : {          "ik" : {            "tokenizer" : "ik"          }        }      }    },    "index" : "jdbctest",    "schedule" : "0 0-59 0-23 ? * *",    "sql" : [ {      "statement" : "select *, id as _id from article where update_time > ?",      "parameter" : [ "$metrics.lastexecutionstart" ]    } ],    "metrics" : {      "lastexecutionend" : "2016-11-01T06:01:01.441Z",      "lastexecutionstart" : "2016-11-01T06:01:01.125Z",      "counter" : "23"    },    "type" : "article",    "statefile" : "statefile-article.json",    "user" : "root",    "password" : "123456",    "url" : "jdbc:mysql://localhost:3306/test",    "type_mapping" : {      "article" : {        "properties" : {          "create_time" : {            "type" : "date"          },          "id" : {            "type" : "integer",            "index" : "not_analyzed"          },          "author" : {            "type" : "string",            "analyzer" : "ik"          },          "update_time" : {            "type" : "date"          },          "subject" : {            "type" : "string",            "analyzer" : "ik"          }        }      }    }  }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

其实 jdbc 每次执行的就是这个文件,执行完成后就覆盖此文件,改变的只是 metrics 属性内的时间,而 lastexecutionstart 这个时间就是我们下面 sql 语句要用到的最后更新时间 

schedule : 计划任务时间表。表示多久执行一次更新。下面有几个例子 
0 0-59 0-23 ? * *:每分钟执行一次 
0 0/5 0-23 ? * * :每五分钟执行一次;当分钟等于 0,5,10,15…55的时候执行 
我还是贴一个官方的字段描述

字段名称 允许的值 允许的特殊字符
Seconds 0-59 , - * /
Minutes 0-59 , - * /
Hours 0-23 , - * /
Day-of-month 1-31 , - * ? / L W
Month 1-12 or JAN-DEC , - * /
Day-of-Week 1-7 or SUN-SAT , - * ? / L #
Year (Optional) empty, 1970-2199 , - * /

详细注释请

sql:支持两种方式,一种是直接写sql语句,一种是有条件的sql语句。一般我们会在sql语句中使用”field as _id “这样的方式来指定这条数据在ES 中的唯一标识(field字段为唯一标识) 

parameter 属性中的可选的动态参数有

$now - the current timestamp$state - the state, one of: BEFORE_FETCH, FETCH, AFTER_FETCH, IDLE, EXCEPTION$metrics.counter - a counter$lastrowcount - number of rows from last statement$lastexceptiondate - SQL timestamp of last exception$lastexception - full stack trace of last exception$metrics.lastexecutionstart - SQL timestamp of the time when last execution started$metrics.lastexecutionend - SQL timestamp of the time when last execution ended$metrics.totalrows - total number of rows fetched$metrics.totalbytes - total number of bytes fetched$metrics.failed - total number of failed SQL executions$metrics.succeeded - total number of succeeded SQL executions
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

在上面例子中的 sql

select *, id as _id from article where update_time > ?

表示获取更新时间(update_time)大于 最后执行时间($metrics.lastexecutionstart)的所有数据

其它如 index、type_mapping 之类的属性就不一一介绍了,很容易理解

转载于:https://my.oschina.net/xiaominmin/blog/1786111

你可能感兴趣的文章
【XCode7+iOS9】http网路连接请求、MKPinAnnotationView自定义图片和BitCode相关错误--备用...
查看>>
各大公司容器云的技术栈对比
查看>>
记一次eclipse无法启动的排查过程
查看>>
【转】jmeter 进行java request测试
查看>>
读书笔记--MapReduce 适用场景 及 常见应用
查看>>
SignalR在Xamarin Android中的使用
查看>>
Eclipse和MyEclipse使用技巧--Eclipse中使用Git-让版本管理更简单
查看>>
[转]响应式表格jQuery插件 – Responsive tables
查看>>
8个3D视觉效果的HTML5动画欣赏
查看>>
C#如何在DataGridViewCell中自定义脚本编辑器
查看>>
【linux】crontab定时命令
查看>>
Android UI优化——include、merge 、ViewStub
查看>>
Office WORD如何取消开始工作右侧栏
查看>>
Android Jni调用浅述
查看>>
CodeCombat森林关卡Python代码
查看>>
第一个应用程序HelloWorld
查看>>
(二)Spring Boot 起步入门(翻译自Spring Boot官方教程文档)1.5.9.RELEASE
查看>>
Android Annotation扫盲笔记
查看>>
React 整洁代码最佳实践
查看>>
聊聊架构设计做些什么来谈如何成为架构师
查看>>