1、数据库配置
首先使用canal需要修改数据库配置
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replication 需要定义,不要和 canal 的 slaveId 重复
创建canal数据库用户
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
2、安装canal
下载:https://github.com/alibaba/canal/releases
解压(版本号请按实际下载的文件修改,需先创建目标目录):mkdir -p ./canal && tar zxvf canal.deployer-1.1.4.tar.gz -C ./canal
配置开放服务器端口:11110、11111、11112
修改canal配置文件(这里设置了两个instance,即两个数据库):
vi canal/conf/canal.properties
canal.destinations = example1,example2
配置instance:
cp -R canal/conf/example canal/conf/example1
mv canal/conf/example canal/conf/example2
第一个数据库配置
vi canal/conf/example1/instance.properties
canal.instance.master.address=32.1.2.140:3306
第二个数据库配置
vi canal/conf/example2/instance.properties
canal.instance.master.address=32.1.2.140:3307
#如果需要新增一个instance,只需要修改canal.properties文件,并新增一个instance配置即可,无需重启canal。
运行:
sh canal/bin/startup.sh
# 查看日志
cat canal/logs/canal/canal.log
3、Java使用样例
引入pom依赖,需要与安装的canal版本一致
<dependencies> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.4</version> </dependency> </dependencies>
示例代码(异步打印两个数据库的修改内容):
package cn.spicybar.dblog; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.Message; import java.net.InetSocketAddress; import java.util.List; public class CanalClient { public static void main(String[] args) { new Thread(() -> initConnector("example1")).start(); new Thread(() -> initConnector("example2")).start(); } private static void initConnector(String destination) { CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("32.1.0.237", 11111), destination, "", ""); try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); while (true) { Message message = connector.getWithoutAck(1000); if (message.getId() != -1 && message.getEntries().size() > 0) { printEntry(message.getEntries()); } connector.ack(message.getId()); } } finally { connector.disconnect(); } } private static void printEntry(List<Entry> entries) { for (Entry entry : entries) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); System.out.println(rowChange.getSql()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser error, data:" + entry.toString(), e); } } }