Easy-es导入数据到ES并实现增量更新

实体类映射es索引

import cn.easyes.annotation.IndexField;
import cn.easyes.annotation.IndexId;
import cn.easyes.annotation.IndexName;
import cn.easyes.common.enums.FieldType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * Database-side entity mapped to the {@code t_product} table via MyBatis-Plus.
 *
 * <p>Getters, setters, {@code equals}/{@code hashCode} and {@code toString} are
 * generated by Lombok's {@code @Data}.
 */
@Data
@TableName("t_product")
public class DbProduct implements Serializable {
    // Primary key column (marked with @TableId).
    @TableId
    private Long id;
    private String name;
    private String category;
    private Double price;
    private LocalDateTime createTime;
    // Used by the incremental import to select rows changed since the last sync.
    private LocalDateTime updateTime;
    // NOTE(review): meaning of the status values is not visible here; this field
    // has no counterpart in EsProduct, so it is not copied into the index.
    private Integer status;
}

/**
 * Elasticsearch-side document mapped to the {@code product_index} index via Easy-Es.
 *
 * <p>Field names mirror {@code DbProduct} so that values can be copied by
 * property name; the id is a {@code String} here while the database id is a
 * {@code Long}.
 */
@Data
@IndexName("product_index")
public class EsProduct implements Serializable {
    // Document id; the import service fills it with the database id's string form.
    @IndexId
    private String id;

    // Full-text field analyzed with the "ik_max_word" analyzer.
    // NOTE(review): presumably requires the IK analysis plugin on the cluster — confirm.
    @IndexField(fieldType = FieldType.TEXT, analyzer = "ik_max_word")
    private String name;

    // Keyword field (not analyzed).
    @IndexField(fieldType = FieldType.KEYWORD)
    private String category;

    @IndexField(fieldType = FieldType.DOUBLE)
    private Double price;

    // NOTE(review): no dateFormat is declared for the DATE fields; verify that the
    // serialized LocalDateTime format matches what the index mapping accepts.
    @IndexField(fieldType = FieldType.DATE)
    private LocalDateTime createTime;

    @IndexField(fieldType = FieldType.DATE)
    private LocalDateTime updateTime;
}

Mapper(MyBatis-Plus 与 Easy-es)

数据库mapper

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.springframework.stereotype.Repository;

/**
 * MyBatis-Plus mapper for {@link DbProduct}.
 *
 * <p>Declares no methods of its own; everything available on it comes from
 * {@code BaseMapper<DbProduct>}.
 */
@Repository
public interface DbProductMapper extends BaseMapper<DbProduct> {
    // Custom query methods can be declared here.
}

Easy-es的Mapper

import cn.easyes.core.biz.EsPageInfo;
import cn.easyes.core.conditions.interfaces.BaseEsMapper;
import org.springframework.stereotype.Repository;

/**
 * Easy-Es mapper for {@link EsProduct}.
 *
 * <p>Must extend {@code BaseEsMapper} (the Easy-Es base interface imported by this
 * file), not MyBatis-Plus' {@code BaseMapper}: only {@code BaseEsMapper} provides
 * the Elasticsearch CRUD operations such as {@code insertBatch} and {@code delete}
 * that the import service calls.
 */
@Repository
public interface EsProductMapper extends BaseEsMapper<EsProduct> {
    // Basic CRUD methods are inherited from BaseEsMapper.
}

业务代码实现

import cn.easyes.core.conditions.LambdaEsQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Copies product rows from the database into the Elasticsearch product index.
 *
 * <p>Two entry points are offered: {@link #fullImport()} rebuilds the index from
 * scratch, and {@link #incrementalImport()} runs on a schedule and pushes rows
 * whose {@code updateTime} is newer than the last recorded sync time.
 */
@Service
@Slf4j
public class DataImportService {
    /** Maximum number of rows read per page and documents sent per batch insert. */
    private static final int BATCH_SIZE = 1000;

    @Autowired
    private DbProductMapper dbProductMapper;

    @Autowired
    private EsProductMapper esProductMapper;

    /**
     * In-memory watermark of the last successful incremental sync; {@code null}
     * until the first successful run. It is lost on restart, so real deployments
     * should persist it (configuration table, Redis, ...).
     */
    private volatile LocalDateTime lastSyncTime;

    /**
     * Full import: clears the ES index and reloads every database row.
     *
     * <p>Rows are read with keyset pagination ({@code id > lastId ORDER BY id LIMIT n})
     * rather than {@code LIMIT offset, n}: without an ORDER BY the offset form gives
     * no guarantee that pages neither overlap nor skip rows, and it gets slower as
     * the offset grows.
     */
    @Transactional(readOnly = true)
    public void fullImport() {
        log.info("开始全量导入数据到ES...");

        // Remove the documents currently in the index before reloading.
        // NOTE(review): relies on an empty wrapper matching every document — confirm
        // for the Easy-Es version in use.
        esProductMapper.delete(new LambdaEsQueryWrapper<>());

        long importedTotal = 0;
        int page = 0;
        Long lastId = null;  // id of the last row of the previous page; null for the first page

        while (true) {
            LambdaQueryWrapper<DbProduct> queryWrapper = new LambdaQueryWrapper<>();
            if (lastId != null) {
                queryWrapper.gt(DbProduct::getId, lastId);
            }
            queryWrapper.orderByAsc(DbProduct::getId).last("LIMIT " + BATCH_SIZE);

            List<DbProduct> dbProducts = dbProductMapper.selectList(queryWrapper);
            if (CollectionUtils.isEmpty(dbProducts)) {
                break;
            }

            page++;
            List<EsProduct> esProducts = convertToEsProducts(dbProducts);
            // insertBatch reports the number of documents written, not a boolean.
            Integer successCount = esProductMapper.insertBatch(esProducts);
            log.info("第{}页数据导入ES,共{}条,结果: {}", page, esProducts.size(), successCount);

            importedTotal += dbProducts.size();
            lastId = dbProducts.get(dbProducts.size() - 1).getId();

            // A short page means the table is exhausted; skip the extra empty query.
            if (dbProducts.size() < BATCH_SIZE) {
                break;
            }
        }

        log.info("全量导入完成,共导入{}条数据", importedTotal);
    }

    /**
     * Incremental import (scheduled task, re-run 60 seconds after the previous run ends).
     *
     * <p>Selects rows whose {@code updateTime} is later than the last sync time and
     * writes them to ES in batches. The new watermark is captured <em>before</em>
     * the query so that rows modified while this run is in progress are picked up
     * by the next run instead of being skipped. If a batch insert throws, the
     * watermark is left unchanged and the same window is retried next time.
     *
     * <p>NOTE(review): re-importing a row assumes {@code insertBatch} indexes by
     * document id so the existing document is overwritten — confirm for the
     * Easy-Es version in use. The watermark uses the application clock while
     * {@code updateTime} presumably comes from the database; verify clock alignment.
     */
    @Scheduled(fixedDelay = 60 * 1000)
    public void incrementalImport() {
        log.info("开始增量导入数据到ES...");

        LocalDateTime since = getLastSyncTime();
        // Taken before querying: anything updated after this instant belongs to the next run.
        LocalDateTime syncStartTime = LocalDateTime.now();

        LambdaQueryWrapper<DbProduct> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.gt(DbProduct::getUpdateTime, since);

        List<DbProduct> dbProducts = dbProductMapper.selectList(queryWrapper);

        if (CollectionUtils.isNotEmpty(dbProducts)) {
            List<EsProduct> esProducts = convertToEsProducts(dbProducts);

            // Send in bounded batches to keep each bulk request small.
            for (List<EsProduct> batch : splitList(esProducts, BATCH_SIZE)) {
                Integer successCount = esProductMapper.insertBatch(batch);
                log.info("批量导入{}条数据到ES,结果: {}", batch.size(), successCount);
            }

            updateLastSyncTime(syncStartTime);
            log.info("增量导入完成,共导入{}条数据", dbProducts.size());
        } else {
            log.info("增量导入完成,没有新数据");
        }
    }

    /**
     * Converts database entities to ES documents.
     *
     * <p>Same-named properties are copied with {@code BeanUtils.copyProperties};
     * the id is then set explicitly because it is a {@code Long} on the database
     * side and a {@code String} on the ES side.
     *
     * @param dbProducts rows to convert; each row must have a non-null id
     * @return one ES document per input row, in the same order
     */
    private List<EsProduct> convertToEsProducts(List<DbProduct> dbProducts) {
        return dbProducts.stream().map(dbProduct -> {
            EsProduct esProduct = new EsProduct();
            BeanUtils.copyProperties(dbProduct, esProduct);
            esProduct.setId(dbProduct.getId().toString());
            return esProduct;
        }).collect(Collectors.toList());
    }

    /**
     * Splits a list into consecutive sub-lists of at most {@code batchSize} elements.
     *
     * <p>The returned sub-lists are views over {@code list} (see {@link List#subList}),
     * not copies.
     *
     * @param list      list to split; {@code null} or empty yields an empty result
     * @param batchSize maximum size of each sub-list, must be positive
     * @return the sub-lists in original order
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    private <T> List<List<T>> splitList(List<T> list, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }

        List<List<T>> result = new ArrayList<>();
        if (CollectionUtils.isEmpty(list)) {
            return result;
        }

        int totalSize = list.size();
        for (int start = 0; start < totalSize; start += batchSize) {
            // Computed from the remaining count so start + batchSize cannot overflow.
            int end = start + Math.min(batchSize, totalSize - start);
            result.add(list.subList(start, end));
        }

        return result;
    }

    /**
     * Returns the last sync time (example implementation).
     *
     * <p>Uses the in-memory watermark when one has been recorded; otherwise falls
     * back to one hour ago. A real project would read it from a configuration
     * table or Redis.
     */
    private LocalDateTime getLastSyncTime() {
        LocalDateTime recorded = lastSyncTime;
        return recorded != null ? recorded : LocalDateTime.now().minusHours(1);
    }

    /**
     * Records the sync time (example implementation).
     *
     * <p>Stores the value in memory only; a real project would save it to a
     * configuration table or Redis.
     *
     * @param syncTime instant up to which data is considered synchronized
     */
    private void updateLastSyncTime(LocalDateTime syncTime) {
        lastSyncTime = syncTime;
        log.info("更新同步时间为: {}", syncTime);
    }
}

启动类

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Spring Boot entry point for the data import application.
 *
 * <p>Scheduling is enabled so that {@code @Scheduled} methods are executed, and
 * the class acts as a {@link CommandLineRunner} to trigger one full import
 * during startup.
 */
@EnableScheduling
@SpringBootApplication
public class DataImportApplication implements CommandLineRunner {

    /** Launches the Spring application context. */
    public static void main(String[] args) {
        SpringApplication.run(DataImportApplication.class, args);
    }

    @Autowired
    private DataImportService dataImportService;

    /**
     * Startup hook: performs a single full import.
     *
     * @param args command-line arguments (unused)
     */
    @Override
    public void run(String... args) throws Exception {
        dataImportService.fullImport();
    }
}

相关文章

开始在上面输入您的搜索词,然后按回车进行搜索。按ESC取消。

返回顶部