实体类:数据库表映射与ES索引映射
import cn.easyes.annotation.IndexField;
import cn.easyes.annotation.IndexId;
import cn.easyes.annotation.IndexName;
import cn.easyes.common.enums.FieldType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.io.Serializable;
import java.time.LocalDateTime;
// Database table mapping: MyBatis-Plus entity bound to the "t_product" table.
// Lombok's @Data generates the getters/setters for every field below.
@Data
@TableName("t_product")
public class DbProduct implements Serializable {
// Primary key of the row (marked with @TableId).
@TableId
private Long id;
// Product name.
private String name;
// Product category.
private String category;
// Product price.
// NOTE(review): if this is a monetary amount, BigDecimal would avoid
// floating-point rounding — confirm against the column type.
private Double price;
// Row creation timestamp.
private LocalDateTime createTime;
// Last modification timestamp.
private LocalDateTime updateTime;
// Row status flag; the meaning of individual values is not defined in this file.
private Integer status;
}
// ES index mapping: Easy-Es document type stored in the "product_index" index.
// Lombok's @Data generates the getters/setters for every field below.
// Unlike DbProduct, this type declares no "status" field.
@Data
@IndexName("product_index")
public class EsProduct implements Serializable {
// Document id, held as a String (marked with @IndexId).
@IndexId
private String id;
// Full-text field analyzed with the "ik_max_word" analyzer.
// NOTE(review): presumably requires the IK analysis plugin on the ES cluster — confirm.
@IndexField(fieldType = FieldType.TEXT, analyzer = "ik_max_word")
private String name;
// Keyword field (not analyzed).
@IndexField(fieldType = FieldType.KEYWORD)
private String category;
// Numeric field mapped as an ES double.
@IndexField(fieldType = FieldType.DOUBLE)
private Double price;
// Date field.
// NOTE(review): no date format is declared here — confirm that the serialized
// LocalDateTime form matches what the index mapping accepts.
@IndexField(fieldType = FieldType.DATE)
private LocalDateTime createTime;
// Date field; same format caveat as createTime.
@IndexField(fieldType = FieldType.DATE)
private LocalDateTime updateTime;
}
Mapper(MyBatis-Plus 与 Easy-Es)
数据库mapper
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.springframework.stereotype.Repository;
// MyBatis-Plus mapper for DbProduct. It declares no methods of its own;
// everything callers use comes from BaseMapper<DbProduct>.
@Repository
public interface DbProductMapper extends BaseMapper<DbProduct> {
// Custom query methods can be declared here.
}
Easy-Es 的 Mapper(需继承 BaseEsMapper)
import cn.easyes.core.biz.EsPageInfo;
import cn.easyes.core.conditions.interfaces.BaseEsMapper;
import org.springframework.stereotype.Repository;
/**
 * Easy-Es mapper for {@link EsProduct} documents.
 *
 * <p>It must extend {@code BaseEsMapper} (not MyBatis-Plus's {@code BaseMapper}):
 * the Elasticsearch operations used by the import service, such as
 * {@code insertBatch} and {@code delete(LambdaEsQueryWrapper)}, are declared
 * only on {@code BaseEsMapper}.
 */
@Repository
public interface EsProductMapper extends BaseEsMapper<EsProduct> {
    // Extending BaseEsMapper is enough to obtain the basic CRUD methods.
}
业务代码实现
import cn.easyes.core.conditions.LambdaEsQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Copies product rows from the relational database into the Elasticsearch index.
 *
 * <p>Two entry points are provided: {@link #fullImport()} rebuilds the index from
 * scratch, and {@link #incrementalImport()} periodically pushes rows whose
 * {@code updateTime} is newer than the last recorded sync time.
 */
@Service
@Slf4j
public class DataImportService {

    /** Maximum number of rows read from the database / written to ES per batch. */
    private static final int BATCH_SIZE = 1000;

    @Autowired
    private DbProductMapper dbProductMapper;

    @Autowired
    private EsProductMapper esProductMapper;

    /**
     * Full import: deletes every document currently in the ES index, then copies
     * all database rows into it in batches of {@link #BATCH_SIZE}.
     *
     * <p>Rows are read with keyset pagination ({@code id > lastId ORDER BY id LIMIT n}).
     * Paging by offset without an explicit ordering gives no guarantee that pages
     * neither overlap nor skip rows, and gets slower as the offset grows.
     */
    @Transactional(readOnly = true)
    public void fullImport() {
        log.info("开始全量导入数据到ES...");

        // Remove the existing ES documents first.
        esProductMapper.delete(new LambdaEsQueryWrapper<>());

        Long lastId = null; // id of the last row of the previous page; null on the first page
        long imported = 0;
        int page = 0;
        while (true) {
            LambdaQueryWrapper<DbProduct> queryWrapper = new LambdaQueryWrapper<>();
            if (lastId != null) {
                queryWrapper.gt(DbProduct::getId, lastId);
            }
            queryWrapper.orderByAsc(DbProduct::getId).last("LIMIT " + BATCH_SIZE);

            List<DbProduct> dbProducts = dbProductMapper.selectList(queryWrapper);
            if (CollectionUtils.isEmpty(dbProducts)) {
                break;
            }
            page++;

            List<EsProduct> esProducts = convertToEsProducts(dbProducts);
            int indexed = indexBatch(esProducts);
            imported += indexed;
            log.info("第{}页数据导入ES,共{}条,结果: {}", page, esProducts.size(), indexed);

            lastId = dbProducts.get(dbProducts.size() - 1).getId();
            if (dbProducts.size() < BATCH_SIZE) {
                break; // short page: nothing left to read
            }
        }
        log.info("全量导入完成,共导入{}条数据", imported);
    }

    /**
     * Incremental import (scheduled task): indexes every row whose
     * {@code updateTime} is later than the last sync time.
     */
    @Scheduled(fixedDelay = 60 * 1000) // runs again one minute after the previous run finishes
    public void incrementalImport() {
        log.info("开始增量导入数据到ES...");

        // Last sync time (in a real project, read it from a config table or Redis).
        LocalDateTime lastSyncTime = getLastSyncTime();
        // Capture the new watermark BEFORE querying: a row modified while this run
        // is in progress then has updateTime > watermark and is picked up next time
        // instead of being silently skipped.
        // NOTE(review): assumes the application clock and the clock that stamps
        // update_time agree — confirm.
        LocalDateTime syncStartedAt = LocalDateTime.now();

        LambdaQueryWrapper<DbProduct> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.gt(DbProduct::getUpdateTime, lastSyncTime);
        List<DbProduct> dbProducts = dbProductMapper.selectList(queryWrapper);

        if (CollectionUtils.isEmpty(dbProducts)) {
            log.info("增量导入完成,没有新数据");
            return;
        }

        // Index in batches so a large change set is not sent in a single request.
        List<EsProduct> esProducts = convertToEsProducts(dbProducts);
        for (List<EsProduct> batch : splitList(esProducts, BATCH_SIZE)) {
            int indexed = indexBatch(batch);
            log.info("批量导入{}条数据到ES,结果: {}", batch.size(), indexed);
        }

        updateLastSyncTime(syncStartedAt);
        log.info("增量导入完成,共导入{}条数据", dbProducts.size());
    }

    /**
     * Sends one batch to ES and returns the number of documents reported as indexed.
     * A mismatch with the batch size is logged as a warning.
     */
    private int indexBatch(List<EsProduct> batch) {
        // insertBatch reports a success count (Integer), not a boolean.
        Integer reported = esProductMapper.insertBatch(batch);
        int indexed = reported == null ? 0 : reported;
        if (indexed != batch.size()) {
            log.warn("ES batch insert indexed {} of {} documents", indexed, batch.size());
        }
        return indexed;
    }

    /**
     * Converts database entities to ES entities.
     *
     * @param dbProducts rows read from the database
     * @return one {@link EsProduct} per input row, in the same order
     */
    private List<EsProduct> convertToEsProducts(List<DbProduct> dbProducts) {
        return dbProducts.stream().map(dbProduct -> {
            EsProduct esProduct = new EsProduct();
            BeanUtils.copyProperties(dbProduct, esProduct);
            // The id types differ (Long vs String), so set the ES id explicitly.
            esProduct.setId(dbProduct.getId().toString());
            return esProduct;
        }).collect(Collectors.toList());
    }

    /**
     * Splits a list into consecutive sub-lists of at most {@code batchSize} elements.
     * The sub-lists are views over {@code list}, not copies.
     *
     * @param list      the list to split; {@code null} or empty yields an empty result
     * @param batchSize maximum size of each sub-list, must be positive
     * @return the sub-lists in original order
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    private <T> List<List<T>> splitList(List<T> list, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        List<List<T>> result = new ArrayList<>();
        if (CollectionUtils.isEmpty(list)) {
            return result;
        }
        int totalSize = list.size();
        for (int start = 0; start < totalSize; start += batchSize) {
            result.add(list.subList(start, Math.min(start + batchSize, totalSize)));
        }
        return result;
    }

    /**
     * Returns the last sync time (sample implementation: always one hour ago).
     */
    private LocalDateTime getLastSyncTime() {
        // In a real project, read this from a config table or Redis.
        return LocalDateTime.now().minusHours(1);
    }

    /**
     * Records the sync time (sample implementation: only logs it).
     */
    private void updateLastSyncTime(LocalDateTime syncTime) {
        // In a real project, persist this to a config table or Redis.
        log.info("更新同步时间为: {}", syncTime);
    }
}
启动类
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
 * Spring Boot entry point for the data import application.
 *
 * <p>Scheduling is enabled on this class, and because it implements
 * {@link CommandLineRunner} its {@link #run(String...)} method triggers
 * one full import.
 */
@EnableScheduling
@SpringBootApplication
public class DataImportApplication implements CommandLineRunner {

    @Autowired
    private DataImportService dataImportService;

    /**
     * Performs a single full import.
     *
     * @param args command-line arguments (unused)
     */
    @Override
    public void run(String... args) throws Exception {
        dataImportService.fullImport();
    }

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(DataImportApplication.class, args);
    }
}