
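Incrementally Syncing MySQL Data to Elasticsearch from Scratch with canal adapter (binlog)

This article walks through how the author set up, from scratch on a single machine, incremental synchronization of MySQL data to Elasticsearch using canal adapter (binlog-based).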

 

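Implementation steps

(1) Install MySQL

(2) Enable MySQL binlog in ROW format and start MySQL

(3) Install the JDK

(4) Install and start Elasticsearch (I installed 6.4.0, mainly because canal adapter 1.1.3 does not yet support 7.0.0)

(5) Install and start Kibana

(6) Install and start canal-server

(7) Install and start canal-adapter

The operating system used here is CentOS 7.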

 

1. Install MySQL via yum

(1) Check the official site for the latest installation package

https://dev.mysql.com/downloa…

(2) Download the MySQL repository package

wget http://dev.mysql.com/get/mysql57-community-release-el7-11.noarch.rpm

 

Newer versions are available by now, but I am using 5.7.

Install the MySQL repository:

yum -y install mysql57-community-release-el7-11.noarch.rpm

 

Check the result:

yum repolist enabled | grep "mysql.*"

 

(3) Install the MySQL server

yum install mysql-community-server

 

(4) Start the MySQL service

systemctl start mysqld.service

Check the status of the MySQL service:

systemctl status mysqld.service

 

(5) Find the initial password

grep "password" /var/log/mysqld.log

Log in:

mysql -u root -p

 

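(6) Grant remote access (do not skip this step; for convenience I use the root account throughout rather than creating a dedicated canal account)

Until access is granted, the database only accepts connections from localhost.

GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '123456' WITH GRANT OPTION;
FLUSH PRIVILEGES;

Username: root

Password: 123456

Host: % matches any IP; you can also enter a specific IP here to restrict access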
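
The article uses root everywhere for convenience. For readers who prefer the dedicated canal account mentioned above, here is a minimal sketch that prints the statements for one. The function name print_canal_grants, the account name, and the password are hypothetical placeholders, and the privilege list (SELECT, REPLICATION SLAVE, REPLICATION CLIENT) is an assumption about what a binlog reader typically needs, not something this article tested.

```shell
#!/bin/sh
# Print SQL that creates a replication-only account for canal.
# Account name and password are illustrative placeholders, not values from the article.
print_canal_grants() {
    canal_user="$1"
    canal_password="$2"
    cat <<EOF
CREATE USER '${canal_user}'@'%' IDENTIFIED BY '${canal_password}';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '${canal_user}'@'%';
FLUSH PRIVILEGES;
EOF
}

# Usage (pipes the statements into the mysql client as root):
#   print_canal_grants canal 'Choose-A-Password' | mysql -u root -p
```

If you go this way, canal.instance.dbUsername and canal.instance.dbPassword in the canal-server configuration later in the article would need to match the account you create.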

 

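2. Enable MySQL binlog in ROW format

Locate the my.cnf file; on my machine it is /etc/my.cnf.

Add the following under the [mysqld] section:

log-bin=mysql-bin
binlog-format=ROW
server-id=1

Then restart MySQL and check that binlog is enabled correctly: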

show variables like '%log_bin%';
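
Before restarting, it can help to confirm that the three settings actually landed in the option file. The following is a rough sketch that only greps the file; the function name check_binlog_conf is hypothetical, and it assumes each setting sits on its own line in key=value form.

```shell
#!/bin/sh
# Check that a MySQL option file contains the three binlog settings from this section.
# Usage: check_binlog_conf /etc/my.cnf
check_binlog_conf() {
    conf_file="$1"
    missing=0
    # MySQL accepts both '-' and '_' in option names, so match either.
    for pattern in '^[[:space:]]*log[-_]bin[[:space:]]*=' \
                   '^[[:space:]]*binlog[-_]format[[:space:]]*=[[:space:]]*ROW' \
                   '^[[:space:]]*server[-_]id[[:space:]]*=[[:space:]]*[1-9]'; do
        if ! grep -Eiq "$pattern" "$conf_file"; then
            echo "missing setting matching: $pattern"
            missing=1
        fi
    done
    return "$missing"
}
```

The function returns a non-zero status and names the missing pattern when a setting is absent, so it can be used in a script before systemctl restart mysqld.service.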

 

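3. Install the JDK

The JDK version installed here is 1.8.0_202.

Download URL:

https://www.oracle.com/techne…

(1) Place jdk-8u202-linux-x64.tar.gz in the /usr/local directory

(2) Extract it and clean up

cd /usr/local
tar -xzvf jdk-8u202-linux-x64.tar.gz
mv jdk1.8.0_202 jdk
rm -rf jdk-8u202-linux-x64.tar.gz

The archive unpacks to a directory named jdk1.8.0_202, which is renamed to jdk here. After these commands finish, there is a jdk directory under /usr/local.

(3) Configure environment variables

vi /etc/profile

Add:

export JAVA_HOME=/usr/local/jdk
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
export PATH=$JAVA_HOME/bin:$PATH

Then run source /etc/profile (or log in again) so the variables take effect.

(4) Check that the JDK was installed successfully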

java -version

 

4. Install and start Elasticsearch

Official site: https://www.elastic.co/downlo…

Run the following commands; you can also download the package manually and upload it to the server.

cd /usr/local
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.4.0.tar.gz
tar -xzvf elasticsearch-6.4.0.tar.gz
mv elasticsearch-6.4.0 elasticsearch
rm -rf elasticsearch-6.4.0.tar.gz

After these commands finish, there is an elasticsearch directory under /usr/local.

Elasticsearch cannot be started as root, so run the following commands:

useradd elasticsearch
chown -R elasticsearch /usr/local/elasticsearch
su elasticsearch

ES is started as the elasticsearch user.

 

(1) Adjust Linux parameters

vim /etc/security/limits.conf

Add:

* soft nofile 65536
* hard nofile 65536
* soft nproc 2048
* hard nproc 4096
# To lock memory and avoid swapping, add two more lines to this file
elasticsearch soft memlock unlimited
elasticsearch hard memlock unlimited

vim /etc/sysctl.conf

Add:

vm.max_map_count=655360
fs.file-max=655360

Note: afterwards, run sysctl -p as root so that the system settings take effect.
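
To confirm that the kernel setting took effect after sysctl -p, a small check like the one below may be useful. It is only a sketch: the function name map_count_ok is hypothetical, and the 262144 threshold is the minimum Elasticsearch's bootstrap checks are generally documented to expect, stated here as an assumption rather than something from this article.

```shell
#!/bin/sh
# Minimum vm.max_map_count assumed to be required by Elasticsearch's bootstrap checks.
ES_MIN_MAP_COUNT=262144

# Succeeds when the given value meets the minimum.
map_count_ok() {
    [ "$1" -ge "$ES_MIN_MAP_COUNT" ]
}

# Usage on the ES host:
#   map_count_ok "$(sysctl -n vm.max_map_count)" && echo "vm.max_map_count is high enough"
```

The value 655360 configured above is comfortably over that threshold.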

 

(2) Edit the ES configuration file (my IP is 192.168.254.131; substitute your own)

vim /usr/local/elasticsearch/config/elasticsearch.yml

# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
#       Before you set out to tweak and tune the configuration, make sure you
#       understand what are you trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please consult the documentation for further information on configuration options:
# https://www.elastic.co/guide/en/elasticsearch/reference/index.html
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: my-application
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: node-1
#
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
path.data: /usr/local/elasticsearch/data
#
# Path to log files:
#
path.logs: /usr/local/elasticsearch/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
#
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
network.host: 192.168.254.131
#
# Set a custom port for HTTP:
#
http.port: 9200
#
# For more information, consult the network module documentation.
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when new node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
discovery.zen.ping.unicast.hosts: ["192.168.254.131"]
#
# Prevent the "split brain" by configuring the majority of nodes (total number of master-eligible nodes / 2 + 1):
#
#discovery.zen.minimum_master_nodes:
#
# For more information, consult the zen discovery module documentation.
#
# ---------------------------------- Gateway -----------------------------------
#
# Block initial recovery after a full cluster restart until N nodes are started:
#
#gateway.recover_after_nodes: 3
#
# For more information, consult the gateway module documentation.
#
# ---------------------------------- Various -----------------------------------
#
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true
transport.tcp.port: 9300
transport.tcp.compress: true
http.cors.enabled: true
http.cors.allow-origin: "*"

 

(3) Start Elasticsearch

cd /usr/local/elasticsearch
./bin/elasticsearch -d

Check that it started successfully:

curl http://192.168.254.131:9200
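
Elasticsearch started with -d can take a little while before it answers on port 9200, so a single curl right after startup may fail even when nothing is wrong. Below is a minimal retry sketch; the helper name wait_for and the attempt and delay values in the usage line are illustrative assumptions.

```shell
#!/bin/sh
# Retry a command until it succeeds or the attempt limit is reached.
# Usage: wait_for <max_attempts> <sleep_seconds> <command...>
wait_for() {
    max_attempts="$1"
    delay="$2"
    shift 2
    attempt=1
    while [ "$attempt" -le "$max_attempts" ]; do
        # Run the command quietly; stop as soon as it succeeds.
        if "$@" >/dev/null 2>&1; then
            return 0
        fi
        attempt=$((attempt + 1))
        sleep "$delay"
    done
    return 1
}

# Usage: poll the ES HTTP port for up to about a minute.
#   wait_for 30 2 curl -fs http://192.168.254.131:9200 && echo "Elasticsearch is up"
```

The same helper can be pointed at the Kibana port (5601) in the next section.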

 

5. Install and start Kibana

Official site: https://www.elastic.co/downlo…

Run the following commands; you can also download the package manually and upload it to the server.

cd /usr/local
wget https://artifacts.elastic.co/downloads/kibana/kibana-6.4.0-linux-x86_64.tar.gz
tar -xzvf kibana-6.4.0-linux-x86_64.tar.gz
mv kibana-6.4.0-linux-x86_64 kibana
rm -rf kibana-6.4.0-linux-x86_64.tar.gz

After these commands finish, there is a kibana directory under /usr/local.

Edit the Kibana configuration file:

vim /usr/local/kibana/config/kibana.yml

 

# Kibana is served by a back end server. This setting specifies the port to use.
server.port: 5601

# Specifies the address to which the Kibana server will bind. IP addresses and host names are both valid values.
# The default is 'localhost', which usually means remote machines will not be able to connect.
# To allow connections from remote users, set this parameter to a non-loopback address.
server.host: "192.168.254.131"

# Enables you to specify a path to mount Kibana at if you are running behind a proxy.
# Use the `server.rewriteBasePath` setting to tell Kibana if it should remove the basePath
# from requests it receives, and to prevent a deprecation warning at startup.
# This setting cannot end in a slash.
#server.basePath: ""

# Specifies whether Kibana should rewrite requests that are prefixed with
# `server.basePath` or require that they are rewritten by your reverse proxy.
# This setting was effectively always `false` before Kibana 6.3 and will
# default to `true` starting in Kibana 7.0.
#server.rewriteBasePath: false

# The maximum payload size in bytes for incoming server requests.
#server.maxPayloadBytes: 1048576

# The Kibana server's name.  This is used for display purposes.
#server.name: "your-hostname"

# The URL of the Elasticsearch instance to use for all your queries.
elasticsearch.url: "http://192.168.254.131:9200"

# When this setting's value is true Kibana uses the hostname specified in the server.host
# setting. When the value of this setting is false, Kibana uses the hostname of the host
# that connects to this Kibana instance.
#elasticsearch.preserveHost: true

# Kibana uses an index in Elasticsearch to store saved searches, visualizations and
# dashboards. Kibana creates a new index if the index doesn't already exist.
kibana.index: ".kibana6"

# The default application to load.
#kibana.defaultAppId: "home"

# If your Elasticsearch is protected with basic authentication, these settings provide
# the username and password that the Kibana server uses to perform maintenance on the Kibana
# index at startup. Your Kibana users still need to authenticate with Elasticsearch, which
# is proxied through the Kibana server.
#elasticsearch.username: "user"
#elasticsearch.password: "pass"

# Enables SSL and paths to the PEM-format SSL certificate and SSL key files, respectively.
# These settings enable SSL for outgoing requests from the Kibana server to the browser.
#server.ssl.enabled: false
#server.ssl.certificate: /path/to/your/server.crt
#server.ssl.key: /path/to/your/server.key

# Optional settings that provide the paths to the PEM-format SSL certificate and key files.
# These files validate that your Elasticsearch backend uses the same key files.
#elasticsearch.ssl.certificate: /path/to/your/client.crt
#elasticsearch.ssl.key: /path/to/your/client.key

# Optional setting that enables you to specify a path to the PEM file for the certificate
# authority for your Elasticsearch instance.
#elasticsearch.ssl.certificateAuthorities: [ "/path/to/your/CA.pem" ]

# To disregard the validity of SSL certificates, change this setting's value to 'none'.
#elasticsearch.ssl.verificationMode: full

# Time in milliseconds to wait for Elasticsearch to respond to pings. Defaults to the value of
# the elasticsearch.requestTimeout setting.
#elasticsearch.pingTimeout: 1500

# Time in milliseconds to wait for responses from the back end or Elasticsearch. This value
# must be a positive integer.
#elasticsearch.requestTimeout: 30000

# List of Kibana client-side headers to send to Elasticsearch. To send *no* client-side
# headers, set this value to [] (an empty list).
#elasticsearch.requestHeadersWhitelist: [ authorization ]

# Header names and values that are sent to Elasticsearch. Any custom headers cannot be overwritten
# by client-side headers, regardless of the elasticsearch.requestHeadersWhitelist configuration.
#elasticsearch.customHeaders: {}

# Time in milliseconds for Elasticsearch to wait for responses from shards. Set to 0 to disable.
#elasticsearch.shardTimeout: 30000

# Time in milliseconds to wait for Elasticsearch at Kibana startup before retrying.
#elasticsearch.startupTimeout: 5000

# Logs queries sent to Elasticsearch. Requires logging.verbose set to true.
#elasticsearch.logQueries: false

# Specifies the path where Kibana creates the process ID file.
#pid.file: /var/run/kibana.pid

# Enables you specify a file where Kibana stores log output.
#logging.dest: stdout

# Set the value of this setting to true to suppress all logging output.
#logging.silent: false

# Set the value of this setting to true to suppress all logging output other than error messages.
#logging.quiet: false

# Set the value of this setting to true to log all events, including system usage information
# and all requests.
#logging.verbose: false

# Set the interval in milliseconds to sample system and process performance
# metrics. Minimum is 100ms. Defaults to 5000.
#ops.interval: 5000

# The default locale. This locale can be used in certain circumstances to substitute any missing
# translations.
#i18n.defaultLocale: "en"

 

Start Kibana:

cd /usr/local/kibana
nohup ./bin/kibana &

Check that it started successfully:

Open http://192.168.254.131:5601 in a browser.

 

6. Install and start canal-server

For details, see the official documentation:

https://github.com/alibaba/ca…

(1) Download canal

Direct download

Visit https://github.com/alibaba/canal/releases, which lists every released package. For example, for version 1.1.3:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

or

Build it yourself

git clone git@github.com:alibaba/canal.git
cd canal
mvn clean install -Dmaven.test.skip -Denv=release

After the build completes, target/canal.deployer-$version.tar.gz is produced in the project root.

 

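(2) Extract the archive (replace $version with the version you downloaded, for example 1.1.3)

mkdir /usr/local/canal
tar zxvf canal.deployer-$version.tar.gz -C /usr/local/canal

(3) Edit the configuration

cd /usr/local/canal
vim conf/example/instance.properties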

 

 

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=192.168.254.131:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

 

(4) Start canal-server

cd /usr/local/canal
./bin/startup.sh

View the log:

cat logs/canal/canal.log

2019-05-03 10:58:31.938 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2019-05-03 10:58:32.106 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2019-05-03 10:58:32.120 [main] INFO  c.a.o.c.d.monitor.remote.RemoteConfigLoaderFactory - ## load local canal configurations
2019-05-03 10:58:32.143 [main] INFO  com.alibaba.otter.canal.deployer.CanalStater - ## start the canal server.
2019-05-03 10:58:32.277 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.254.131:11111]
2019-05-03 10:58:34.235 [main] WARN  o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2019-05-03 10:58:35.470 [main] ERROR com.alibaba.druid.pool.DruidDataSource - testWhileIdle is true, validationQuery not set
2019-05-03 10:58:36.317 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2019-05-03 10:58:36.317 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2019-05-03 10:58:37.106 [main] INFO  com.alibaba.otter.canal.deployer.CanalStater - ## the canal server is running now ......
2019-05-03 10:58:37.239 [destination = example , address = /192.168.254.131:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2019-05-03 10:58:37.241 [destination = example , address = /192.168.254.131:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position by switch ::1556597413000
2019-05-03 10:58:39.239 [destination = example , address = /192.168.254.131:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000004,position=4450,serverId=1,gtid=,timestamp=1556596874000] cost : 1915ms , the next step is binlog dump
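
Rather than reading the whole log by eye, you can look for the startup marker that appears in the output above ("the canal server is running now"). This is a small sketch under that assumption; the function name canal_log_status is hypothetical, and other canal versions may word the message differently.

```shell
#!/bin/sh
# Report whether a canal-server log contains the startup marker seen in the sample output.
# Usage: canal_log_status /usr/local/canal/logs/canal/canal.log
canal_log_status() {
    log_file="$1"
    if grep -q "the canal server is running now" "$log_file"; then
        echo "started"
    else
        echo "not started"
    fi
}
```

Note that the sample log contains an ERROR line from DruidDataSource even though the server started, so the presence of ERROR alone is not treated as failure here.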

 

7. Install and start canal-adapter

(1) Download canal-adapter

Visit https://github.com/alibaba/canal/releases, which lists every released package. For example, for version 1.1.3:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.adapter-1.1.3.tar.gz

(2) Extract the archive

mkdir /usr/local/canal-adapter
tar zxvf canal.adapter-1.1.3.tar.gz -C /usr/local/canal-adapter

(3) Edit the configuration

cd /usr/local/canal-adapter
vim conf/application.yml

 



server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # kafka rocketMQ
  canalServerHost: 192.168.254.131:11111
#  zookeeperHosts: slave1:2181
#  mqServers: 127.0.0.1:9092 #or rocketmq
#  flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.254.131:3306/mytest?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es
        hosts: 192.168.254.131:9300
        properties:
          cluster.name: my-application

vim conf/es/mytest_user.yml

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: mytest_user
  _type: _doc
  _id: _id
  upsert: true
#  pk: id
  sql: "select a.id as _id, a.name, a.role_id, a.c_time from user a"
#  objFields:
#    _labels: array:;
#  etlCondition: "where a.c_time>='{0}'"
  commitBatch: 3000

 

(4) Create the MySQL table user and the index mytest_user first; otherwise canal-adapter reports an error on startup

create database mytest;
use mytest;
create table user (
  `id` int(10) NOT NULL,
  `name` varchar(100) DEFAULT NULL,
  `role_id` int(10) NOT NULL,
  `c_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  `c_utime` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
);

PUT /mytest_user
{
  "mappings": {
    "_doc": {
      "properties": {
        "name": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword"
            }
          }
        },
        "role_id": {
          "type": "long"
        },
        "c_time": {
          "type": "date"
        }
      }
    }
  }
}
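
The PUT request above is written in Kibana Dev Tools console syntax. If you would rather create the index from the shell, the same mapping can be sent with curl. This is a sketch only: the function name print_mytest_user_mapping is hypothetical, and the usage line assumes the ES address used throughout this article.

```shell
#!/bin/sh
# Print the mytest_user index mapping from this step as a JSON document.
print_mytest_user_mapping() {
    cat <<'EOF'
{
  "mappings": {
    "_doc": {
      "properties": {
        "name": {
          "type": "text",
          "fields": { "keyword": { "type": "keyword" } }
        },
        "role_id": { "type": "long" },
        "c_time": { "type": "date" }
      }
    }
  }
}
EOF
}

# Usage (ES 6.x requires an explicit JSON Content-Type header):
#   print_mytest_user_mapping | curl -s -X PUT "http://192.168.254.131:9200/mytest_user" \
#       -H 'Content-Type: application/json' --data-binary @-
```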

 

(5) Start canal-adapter

cd /usr/local/canal-adapter
./bin/startup.sh

View the log:

cat logs/adapter/adapter.log

 

(6) Test that incremental sync works

Before any data changes:

GET /mytest_user/_search

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 0,
    "max_score": null,
    "hits": []
  }
}

Insert a row:

insert into user(id, name, role_id) values(7, 'test', 7);

GET /mytest_user/_doc/7

{
  "_index": "mytest_user",
  "_type": "_doc",
  "_id": "7",
  "_version": 1,
  "found": true,
  "_source": {
    "name": "test",
    "role_id": 7,
    "c_time": "2019-05-04T06:11:31-05:00"
  }
}

Update a row:

update user set name = 'zhengguo' where id = 7;

GET /mytest_user/_doc/7

{
  "_index": "mytest_user",
  "_type": "_doc",
  "_id": "7",
  "_version": 2,
  "found": true,
  "_source": {
    "name": "zhengguo",
    "role_id": 7,
    "c_time": "2019-05-04T06:11:31-05:00"
  }
}

Delete a row:

delete from user where id = 7;

GET /mytest_user/_doc/7

{
  "_index": "mytest_user",
  "_type": "_doc",
  "_id": "7",
  "found": false
}

As you can see, every operation was synced successfully.
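
The checks above can also be scripted. The sketch below only inspects the "found" field of a document response like the ones shown in this step; the function name doc_found is hypothetical, and matching with grep instead of a JSON parser is a simplification.

```shell
#!/bin/sh
# Succeeds when the JSON read from stdin contains "found": true.
doc_found() {
    grep -Eq '"found"[[:space:]]*:[[:space:]]*true'
}

# Usage after the insert (expects the document to exist):
#   curl -s http://192.168.254.131:9200/mytest_user/_doc/7 | doc_found && echo "row 7 was synced"
# Usage after the delete (expects the document to be gone):
#   curl -s http://192.168.254.131:9200/mytest_user/_doc/7 | doc_found || echo "row 7 was removed"
```

Sync is asynchronous, so a check run immediately after the SQL statement may need a short wait or a retry.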

 

A pitfall I ran into

canal may fix this in a later release.

With adapter 1.1.3, if the Elasticsearch version is 7.X.X, incremental sync fails with an error saying it cannot connect to ES:

ESSyncService - sync error, es index: mytest_user, DML : Dml{destination='example', database='mytest', table='user', type='INSERT', es=1556597413000, ts=1556597414139, sql='', data=[{id=4, name=junge, role_id=4, c_time=2019-04-30 00:10:13.0, c_utime=2019-04-30 00:10:13.0}], old=null}
ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterWorker - NoNodeAvailableException[None of the configured nodes are available: [{#transport#-1}{lTIHs6ZsTe-PqHs9CToQYQ}{192.168.254.131}{192.168.254.131:9300}]]

In other words, incremental sync against version 7 is not supported yet. Switching to 6.X.X fixes it.
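
To catch this mismatch before starting the adapter, you could read the version from the ES root endpoint and check that it is on the 6.x line that worked in this article. A minimal sketch follows; the function names extract_es_version and es_is_6x are hypothetical, and it assumes the root response contains a "number" field holding the version string.

```shell
#!/bin/sh
# Read an ES root-endpoint response on stdin and print the value of its "number" field.
extract_es_version() {
    sed -n 's/.*"number"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p'
}

# Succeeds when the given version string is on the 6.x line.
es_is_6x() {
    major="${1%%.*}"
    [ "$major" = "6" ]
}

# Usage:
#   version=$(curl -s http://192.168.254.131:9200 | extract_es_version)
#   es_is_6x "$version" || echo "adapter 1.1.3 was only verified against ES 6.x, found $version"
```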

 
