binlog+canal实现es数据增量更新

Canal 是阿里巴巴开源的一个基于 MySQL 二进制日志(Binlog)的增量数据订阅和消费组件,借助它可以实现 MySQL 与 Elasticsearch的数据同步。

MySQL配置

开启 Binlog:编辑 MySQL 配置文件(通常是 my.cnf 或 my.ini),添加或修改以下配置:

[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1

创建 Canal 用户:创建一个具有 SELECT、REPLICATION SLAVE、REPLICATION CLIENT 权限的用户,供 Canal 以从库身份读取 Binlog。

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

canal配置

配置 Canal:编辑canal.properties文件,配置 Canal 的基本信息,如端口号等。然后编辑instance.properties文件,配置 MySQL 连接信息和要监听的数据库:

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.defaultDatabaseName=your_database_name

数据同步代码

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.http.HttpHost;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Mirrors MySQL row changes, delivered by a Canal server, into a single Elasticsearch index.
 *
 * <p>Each row is stored as one document whose {@code _id} is derived from the row's primary-key
 * column(s). That makes every write idempotent, which in turn allows batches to be acknowledged
 * to Canal only after they have been applied (at-least-once delivery).
 *
 * <p>NOTE: the subscription filter matches every table, and all rows are written to the same
 * index. If several tables are replicated, rows with equal primary keys will overwrite each
 * other; narrow the filter or route per table before using this beyond a single table.
 */
public class EsMysqlSync {

    private static final String CANAL_SERVER_HOST = "127.0.0.1";
    private static final int CANAL_SERVER_PORT = 11111;
    private static final String CANAL_DESTINATION = "example";
    private static final String CANAL_USERNAME = "";
    private static final String CANAL_PASSWORD = "";
    private static final String ES_HOST = "localhost";
    private static final int ES_PORT = 9200;
    private static final String ES_INDEX = "your_index_name";
    /** Maximum number of binlog entries requested from Canal per poll. */
    private static final int BATCH_SIZE = 100;
    /** Pause between polls when Canal has nothing to deliver, to avoid a busy loop. */
    private static final long IDLE_SLEEP_MILLIS = 1000L;
    /** Batch id Canal reports when no data is available. */
    private static final long EMPTY_BATCH_ID = -1L;

    /**
     * Connects to Canal and Elasticsearch, then polls for row changes until the thread is
     * interrupted or an error occurs.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(CANAL_SERVER_HOST, CANAL_SERVER_PORT),
                CANAL_DESTINATION, CANAL_USERNAME, CANAL_PASSWORD);
        // try-with-resources so the ES client's HTTP connections are released on every exit path.
        try (RestHighLevelClient esClient = new RestHighLevelClient(
                RestClient.builder(new HttpHost(ES_HOST, ES_PORT, "http")))) {
            connector.connect();
            // Subscribe to changes on every schema/table.
            connector.subscribe(".*\\..*");
            // Rewind to the last acknowledged position so a batch that was fetched but never
            // acknowledged by a previous run is delivered again.
            connector.rollback();
            createIndexIfNotExists(esClient, ES_INDEX);

            while (!Thread.currentThread().isInterrupted()) {
                // Fetch without acknowledging: the batch is confirmed only after ES accepted it.
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                List<CanalEntry.Entry> entries = message.getEntries();

                if (entries == null || entries.isEmpty()) {
                    if (batchId != EMPTY_BATCH_ID) {
                        connector.ack(batchId);
                    }
                    Thread.sleep(IDLE_SLEEP_MILLIS);
                    continue;
                }

                for (CanalEntry.Entry entry : entries) {
                    applyEntry(esClient, ES_INDEX, entry);
                }
                // If anything above threw, this line is skipped and the batch stays
                // unacknowledged, so it is re-delivered after the next connect + rollback.
                connector.ack(batchId);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag for whoever owns this thread, then shut down.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Applies every row change contained in one Canal entry to Elasticsearch.
     * Entries that do not carry row data (e.g. transaction begin/end markers) are ignored,
     * as are event types other than INSERT, UPDATE and DELETE.
     *
     * @throws IOException if the entry payload cannot be parsed or an ES request fails
     */
    private static void applyEntry(RestHighLevelClient esClient, String indexName, CanalEntry.Entry entry)
            throws IOException {
        if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
            return;
        }
        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        CanalEntry.EventType eventType = rowChange.getEventType();
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            switch (eventType) {
                case INSERT:
                    syncInsertToEs(esClient, indexName, rowData);
                    break;
                case UPDATE:
                    syncUpdateToEs(esClient, indexName, rowData);
                    break;
                case DELETE:
                    syncDeleteToEs(esClient, indexName, rowData);
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * Creates {@code indexName} with default settings when it does not exist yet.
     *
     * @throws IOException if either ES request fails
     */
    private static void createIndexIfNotExists(RestHighLevelClient esClient, String indexName) throws IOException {
        if (esClient.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) {
            return;
        }
        CreateIndexRequest request = new CreateIndexRequest(indexName);
        CreateIndexResponse response = esClient.indices().create(request, RequestOptions.DEFAULT);
        if (!response.isAcknowledged()) {
            System.out.println("Failed to create index: " + indexName);
        }
    }

    /**
     * Indexes the inserted row. The document id is the row's primary key when one is present;
     * otherwise Elasticsearch assigns an id.
     *
     * @throws IOException if the ES request fails
     */
    private static void syncInsertToEs(RestHighLevelClient esClient, String indexName, CanalEntry.RowData rowData) throws IOException {
        List<CanalEntry.Column> after = rowData.getAfterColumnsList();
        IndexResponse response = indexDocument(esClient, indexName, documentId(after), after);
        System.out.println("Inserted to ES: " + response.getId());
    }

    /**
     * Replaces the document for the updated row with the row's after-image. If the update
     * changed the primary key, the document stored under the old key is removed first.
     * Rows without a primary key are skipped because no existing document can be addressed.
     *
     * @throws IOException if an ES request fails
     */
    private static void syncUpdateToEs(RestHighLevelClient esClient, String indexName, CanalEntry.RowData rowData) throws IOException {
        List<CanalEntry.Column> after = rowData.getAfterColumnsList();
        String newId = documentId(after);
        if (newId == null) {
            System.out.println("Skipped update without primary key");
            return;
        }
        String oldId = documentId(rowData.getBeforeColumnsList());
        if (oldId != null && !oldId.equals(newId)) {
            esClient.delete(new DeleteRequest(indexName, oldId), RequestOptions.DEFAULT);
        }
        IndexResponse response = indexDocument(esClient, indexName, newId, after);
        System.out.println("Updated in ES: " + response.getId());
    }

    /**
     * Deletes the document that corresponds to the removed row, addressed by the primary key
     * found in the row's before-image. Rows without a primary key are skipped.
     *
     * @throws IOException if the ES request fails
     */
    private static void syncDeleteToEs(RestHighLevelClient esClient, String indexName, CanalEntry.RowData rowData) throws IOException {
        String id = documentId(rowData.getBeforeColumnsList());
        if (id == null) {
            System.out.println("Skipped delete without primary key");
            return;
        }
        esClient.delete(new DeleteRequest(indexName, id), RequestOptions.DEFAULT);
        System.out.println("Deleted from ES: " + id);
    }

    /**
     * Sends one index request built from {@code columns}.
     *
     * @param id document id, or {@code null} to let Elasticsearch generate one
     */
    private static IndexResponse indexDocument(RestHighLevelClient esClient, String indexName, String id,
                                               List<CanalEntry.Column> columns) throws IOException {
        // Passing a Map lets the client serialize the document, so quotes, backslashes and
        // control characters in column values are escaped correctly.
        IndexRequest request = new IndexRequest(indexName).source(toDocument(columns));
        if (id != null) {
            request.id(id);
        }
        return esClient.index(request, RequestOptions.DEFAULT);
    }

    /**
     * Converts a column list into a field-name to value map, preserving column order.
     * SQL NULL columns become {@code null}; all other values keep Canal's string form.
     */
    private static Map<String, Object> toDocument(List<CanalEntry.Column> columns) {
        Map<String, Object> document = new LinkedHashMap<>();
        for (CanalEntry.Column column : columns) {
            document.put(column.getName(), column.getIsNull() ? null : column.getValue());
        }
        return document;
    }

    /**
     * Builds a document id from the primary-key columns, joining composite keys with {@code _}.
     *
     * @return the id, or {@code null} when none of the columns is flagged as a key
     */
    private static String documentId(List<CanalEntry.Column> columns) {
        StringBuilder id = new StringBuilder();
        for (CanalEntry.Column column : columns) {
            if (column.getIsKey()) {
                if (id.length() > 0) {
                    id.append('_');
                }
                id.append(column.getValue());
            }
        }
        return id.length() == 0 ? null : id.toString();
    }
}

相关文章

开始在上面输入您的搜索词,然后按回车进行搜索。按ESC取消。

返回顶部