Canal 是阿里巴巴开源的一个基于 MySQL 二进制日志(Binlog)的增量数据订阅和消费组件,借助它可以实现 MySQL 与 Elasticsearch 的数据同步。
MySQL配置
开启 Binlog:编辑 MySQL 配置文件(通常是 my.cnf 或 my.ini),添加或修改以下配置:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1
创建 Canal 用户:创建一个具有 SELECT、REPLICATION SLAVE、REPLICATION CLIENT 权限的用户,供 Canal 以从库(slave)身份连接 MySQL 并读取 Binlog。
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
canal配置
配置 Canal:编辑 canal.properties 文件,配置 Canal 的基本信息,如端口号等。然后编辑 instance.properties 文件,配置 MySQL 连接信息和要监听的数据库:
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.defaultDatabaseName=your_database_name
数据同步代码
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.xcontent.XContentType;
/**
 * Replicates MySQL row changes to Elasticsearch by consuming binlog events from a Canal server.
 *
 * <p>Each INSERT/UPDATE/DELETE row event is applied to a single Elasticsearch index. The
 * document {@code _id} is derived from the row's primary-key column(s), so replaying a batch
 * is idempotent and updates/deletes address the same document that the insert created.
 *
 * <p>Batches are fetched without auto-acknowledgement and are acknowledged only after every
 * row in the batch has been written to Elasticsearch; on failure the batch is rolled back so
 * Canal delivers it again on the next run.
 */
public class EsMysqlSync {
    private static final String CANAL_SERVER_HOST = "127.0.0.1";
    private static final int CANAL_SERVER_PORT = 11111;
    private static final String CANAL_DESTINATION = "example";
    private static final String CANAL_USERNAME = "";
    private static final String CANAL_PASSWORD = "";
    private static final String ES_HOST = "localhost";
    private static final int ES_PORT = 9200;
    /** Target Elasticsearch index for all replicated rows. */
    private static final String ES_INDEX = "your_index_name";
    /** Maximum number of binlog entries requested from Canal per fetch. */
    private static final int BATCH_SIZE = 100;
    /** Pause between polls when Canal has no new data, to avoid a busy loop. */
    private static final long IDLE_SLEEP_MILLIS = 1000L;

    /**
     * Connects to Canal and Elasticsearch, then polls for row changes until the thread is
     * interrupted or an error occurs.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Create the Canal connection.
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(CANAL_SERVER_HOST, CANAL_SERVER_PORT),
                CANAL_DESTINATION, CANAL_USERNAME, CANAL_PASSWORD);
        // try-with-resources guarantees the ES client (and its HTTP connections) is closed.
        try (RestHighLevelClient esClient = new RestHighLevelClient(
                org.elasticsearch.client.RestClient.builder(
                        new org.apache.http.HttpHost(ES_HOST, ES_PORT, "http")))) {
            connector.connect();
            // Subscribe to changes of every table in every schema.
            connector.subscribe(".*\\..*");
            // Return to the last acknowledged position so batches that were fetched but
            // never acknowledged by a previous run are delivered again.
            connector.rollback();
            // Create the ES index if it does not exist yet.
            createIndexIfNotExists(esClient, ES_INDEX);

            while (!Thread.currentThread().isInterrupted()) {
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                List<CanalEntry.Entry> entries = message.getEntries();
                if (batchId == -1 || entries == null || entries.isEmpty()) {
                    if (batchId != -1) {
                        // A real but empty batch still has to be acknowledged.
                        connector.ack(batchId);
                    }
                    Thread.sleep(IDLE_SLEEP_MILLIS);
                    continue;
                }
                try {
                    for (CanalEntry.Entry entry : entries) {
                        applyEntry(esClient, ES_INDEX, entry);
                    }
                    // Acknowledge only after the whole batch reached Elasticsearch.
                    connector.ack(batchId);
                } catch (IOException | RuntimeException e) {
                    // Leave the batch unconsumed so it is not lost.
                    connector.rollback(batchId);
                    throw e;
                }
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            System.err.println("MySQL -> Elasticsearch sync stopped: " + e);
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Applies one Canal entry to Elasticsearch. Entries that are not row data (for example
     * transaction begin/end markers) and event types other than INSERT/UPDATE/DELETE are ignored.
     *
     * @param esClient  Elasticsearch client
     * @param indexName target index
     * @param entry     binlog entry received from Canal
     * @throws IOException if the entry payload cannot be parsed or an ES request fails
     */
    private static void applyEntry(RestHighLevelClient esClient, String indexName,
            CanalEntry.Entry entry) throws IOException {
        if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
            return;
        }
        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        CanalEntry.EventType eventType = rowChange.getEventType();
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            if (eventType == CanalEntry.EventType.INSERT) {
                syncInsertToEs(esClient, indexName, rowData);
            } else if (eventType == CanalEntry.EventType.UPDATE) {
                syncUpdateToEs(esClient, indexName, rowData);
            } else if (eventType == CanalEntry.EventType.DELETE) {
                syncDeleteToEs(esClient, indexName, rowData);
            }
        }
    }

    /**
     * Creates the index when it does not exist and reports an unacknowledged creation.
     *
     * @param esClient  Elasticsearch client
     * @param indexName index to check and, if missing, create
     * @throws IOException if an ES request fails
     */
    private static void createIndexIfNotExists(RestHighLevelClient esClient, String indexName)
            throws IOException {
        boolean exists = esClient.indices().exists(
                new org.elasticsearch.client.indices.GetIndexRequest(indexName),
                RequestOptions.DEFAULT);
        if (exists) {
            return;
        }
        CreateIndexRequest request = new CreateIndexRequest(indexName);
        CreateIndexResponse response = esClient.indices().create(request, RequestOptions.DEFAULT);
        if (!response.isAcknowledged()) {
            System.out.println("Failed to create index: " + indexName);
        }
    }

    /**
     * Indexes the after-image of an inserted row. When the row has a primary key it is used
     * as the document id; otherwise Elasticsearch generates one.
     *
     * @param esClient  Elasticsearch client
     * @param indexName target index
     * @param rowData   row change whose after-columns hold the inserted values
     * @throws IOException if the ES request fails
     */
    private static void syncInsertToEs(RestHighLevelClient esClient, String indexName,
            CanalEntry.RowData rowData) throws IOException {
        List<CanalEntry.Column> after = rowData.getAfterColumnsList();
        IndexResponse response = indexDocument(esClient, indexName, buildDocumentId(after), after);
        System.out.println("Inserted to ES: " + response.getId());
    }

    /**
     * Replaces the document of an updated row with its after-image. If the primary key itself
     * changed, the document stored under the old key is removed. Rows without a primary key
     * are skipped because the document to replace cannot be identified.
     *
     * @param esClient  Elasticsearch client
     * @param indexName target index
     * @param rowData   row change holding before- and after-columns
     * @throws IOException if an ES request fails
     */
    private static void syncUpdateToEs(RestHighLevelClient esClient, String indexName,
            CanalEntry.RowData rowData) throws IOException {
        List<CanalEntry.Column> after = rowData.getAfterColumnsList();
        String newId = buildDocumentId(after);
        if (newId == null) {
            System.err.println("Skipped update: row has no primary key column");
            return;
        }
        String oldId = buildDocumentId(rowData.getBeforeColumnsList());
        if (oldId != null && !oldId.equals(newId)) {
            esClient.delete(new DeleteRequest(indexName, oldId), RequestOptions.DEFAULT);
        }
        IndexResponse response = indexDocument(esClient, indexName, newId, after);
        System.out.println("Updated in ES: " + response.getId());
    }

    /**
     * Deletes the document that corresponds to a deleted row, identified by the primary key
     * found in the before-image. Rows without a primary key are skipped.
     *
     * @param esClient  Elasticsearch client
     * @param indexName target index
     * @param rowData   row change whose before-columns hold the deleted values
     * @throws IOException if the ES request fails
     */
    private static void syncDeleteToEs(RestHighLevelClient esClient, String indexName,
            CanalEntry.RowData rowData) throws IOException {
        String id = buildDocumentId(rowData.getBeforeColumnsList());
        if (id == null) {
            System.err.println("Skipped delete: row has no primary key column");
            return;
        }
        esClient.delete(new DeleteRequest(indexName, id), RequestOptions.DEFAULT);
        System.out.println("Deleted from ES: " + id);
    }

    /**
     * Sends one index request built from the given columns.
     *
     * @param id document id, or {@code null} to let Elasticsearch generate one
     * @return the ES response for the index request
     * @throws IOException if the ES request fails
     */
    private static IndexResponse indexDocument(RestHighLevelClient esClient, String indexName,
            String id, List<CanalEntry.Column> columns) throws IOException {
        // Passing a Map lets the client serialize the document, so quotes, backslashes and
        // control characters in column values are escaped correctly.
        IndexRequest request = new IndexRequest(indexName).source(toDocument(columns));
        if (id != null) {
            request.id(id);
        }
        return esClient.index(request, RequestOptions.DEFAULT);
    }

    /**
     * Converts row columns to a field map in column order. Values keep Canal's string
     * representation; SQL NULL becomes a {@code null} field value.
     */
    private static Map<String, Object> toDocument(List<CanalEntry.Column> columns) {
        Map<String, Object> document = new LinkedHashMap<>();
        for (CanalEntry.Column column : columns) {
            document.put(column.getName(), column.getIsNull() ? null : column.getValue());
        }
        return document;
    }

    /**
     * Builds a document id from the primary-key columns, joining composite keys with
     * {@code '_'}.
     *
     * @return the id, or {@code null} when none of the columns is flagged as a key
     */
    private static String buildDocumentId(List<CanalEntry.Column> columns) {
        StringBuilder id = new StringBuilder();
        boolean hasKey = false;
        for (CanalEntry.Column column : columns) {
            if (!column.getIsKey()) {
                continue;
            }
            if (hasKey) {
                id.append('_');
            }
            id.append(column.getValue());
            hasKey = true;
        }
        return hasKey ? id.toString() : null;
    }
}