springboot项目集成Elasticsearch写入数据库数据
【代码】springboot项目集成Elasticsearch写入数据库数据。
·
引入Maven依赖:
<!-- Spring Data Elasticsearch starter: brings in the ES client and repository support. -->
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    </dependency>
</dependencies>
添加ES配置
spring:
  elasticsearch: # Spring Boot default is localhost:9200
    rest:
      # uris: separate multiple hosts with commas
      uris: http://****:9200
      username: elastic
      password: 123456
  data:
    elasticsearch:
      repositories:
        enabled: true
      # ES cluster address and port; separate multiple nodes with commas.
      # NOTE(review): cluster-nodes/cluster-name/cluster-password target the legacy
      # 9300 TransportClient settings, deprecated in recent Spring Data Elasticsearch;
      # the REST "uris" block above should normally suffice — confirm the library version.
      cluster-nodes: 10.0.5.135:9300
      cluster-name: elastic
      cluster-password: 123456
代码
全文检索实体类
/**
 * Elasticsearch document for place-name / address full-text search.
 * The index name is the configured prefix (ElasticSearchConfig#elasticIndexPerf,
 * resolved via SpEL) concatenated with "address".
 */
@Data
@ApiModel("地名地址")
@Document(indexName = "#{@elasticSearchConfig.elasticIndexPerf}" + "address", useServerConfiguration = true)
public class EsAddress implements Serializable {

    /** Explicit serialVersionUID so serialization is stable across recompiles. */
    private static final long serialVersionUID = 1L;

    @ApiModelProperty(value = "主键")
    @Id
    private String id;

    @ApiModelProperty(value = "地名")
    private String name;

    // NOTE(review): "adress" is misspelled, but it must match the source VO
    // (AdressDmpManageVO) for BeanUtils.copyProperties to map it — do not
    // rename without migrating the ES mapping and the VO together.
    @ApiModelProperty(value = "地址信息")
    private String adress;

    @ApiModelProperty(value = "地名类别")
    private String type;

    @ApiModelProperty(value = "户室号")
    private String room;
}
Es索引前缀
/**
 * Holds the configurable prefix used to build Elasticsearch index names.
 * Referenced from entity @Document annotations via SpEL:
 * #{@elasticSearchConfig.elasticIndexPerf}.
 */
@Configuration
@Data
public class ElasticSearchConfig {

    /**
     * Index-name prefix, read from the "elastic_index_perf" property.
     * The default lives in the placeholder ("...:spatial_"): a plain Java field
     * initializer would be overwritten by injection, and without a placeholder
     * default the application fails to start when the property is absent.
     */
    @Value("${elastic_index_perf:spatial_}")
    public String elasticIndexPerf = "spatial_";
}
Es全文检索继承ElasticsearchRepository类
/**
 * Spring Data repository for EsAddress documents (String id); inherits
 * standard CRUD and search operations from ElasticsearchRepository.
 */
public interface AddressRepository extends ElasticsearchRepository<EsAddress, String> {
}
ES增删改查
@Autowired
private AddressRepository addressRepository;

/**
 * Real-time bulk sync: copies each VO onto an EsAddress document and saves
 * the whole batch into Elasticsearch. A null or empty list is a no-op.
 *
 * @param managedHouseList source records to index
 */
public void realTimeSync(List<AdressDmpManageVO> managedHouseList) {
    if (CollectionUtil.isNotEmpty(managedHouseList)) {
        List<EsAddress> result = new ArrayList<>(managedHouseList.size());
        for (AdressDmpManageVO vo : managedHouseList) {
            EsAddress es = new EsAddress();
            BeanUtils.copyProperties(vo, es);
            result.add(es);
        }
        try {
            addressRepository.saveAll(result);
        } catch (Exception exception) {
            // Workaround: older ES REST clients throw on a successful "200 OK"
            // response they fail to parse — swallow only that case. The message
            // is null-checked so an NPE never masks the real failure.
            String message = exception.getMessage();
            if (message == null || !message.contains("200 OK")) {
                throw exception;
            }
        }
    }
}
/**
 * Updates one address document in ES, only if it already exists.
 * (Method name typo "udpate" is kept — callers depend on it.)
 *
 * @param vo source VO whose properties are copied onto the ES document
 */
public void udpateToEs(AdressDmpManageVO vo) {
    try {
        if (addressRepository.existsById(vo.getId())) {
            EsAddress es = new EsAddress();
            BeanUtils.copyProperties(vo, es);
            addressRepository.save(es);
        }
    } catch (Exception exception) {
        // Swallow only the "200 OK" parse quirk of older ES REST clients;
        // null-check the message so an NPE never masks the real failure.
        String message = exception.getMessage();
        if (message == null || !message.contains("200 OK")) {
            throw exception;
        }
    }
}
/**
 * Deletes one address document from ES by id, only if it exists.
 * NOTE(review): exists-then-delete is not atomic; a concurrent delete between
 * the two calls is harmless here since the end state is the same.
 *
 * @param addressId document id to delete
 */
public void deleteByAddressId(String addressId) {
    try {
        if (addressRepository.existsById(addressId)) {
            addressRepository.deleteById(addressId);
        }
    } catch (Exception exception) {
        // Swallow only the "200 OK" parse quirk of older ES REST clients;
        // null-check the message so an NPE never masks the real failure.
        String message = exception.getMessage();
        if (message == null || !message.contains("200 OK")) {
            throw exception;
        }
    }
}
/**
 * Clears the whole index asynchronously on a background thread.
 * NOTE(review): an exception rethrown inside the thread only reaches the
 * thread's uncaught-exception handler — callers never observe it. Consider
 * submitting to a shared executor and logging failures instead.
 */
public void deleteAll() {
    new Thread(() -> {
        try {
            addressRepository.deleteAll();
        } catch (Exception exception) {
            // Swallow only the "200 OK" parse quirk of older ES REST clients;
            // null-check the message so an NPE never masks the real failure.
            String message = exception.getMessage();
            if (message == null || !message.contains("200 OK")) {
                throw exception;
            }
        }
    }).start();
}
数据库数据同步到ES库
@GetMapping("/sync-address-list")
@ApiOperationSupport(order = 1)
@ApiOperation("数据_同步到ES")
// Kicks off a full DB-to-ES sync as a background job, guarded by a cache-log
// "is running" flag so only one sync per index runs at a time.
public R syncHouseList() {
    // Cache key: the ES index name (prefix + "address") doubles as the job key.
    String cacheKey = elasticSearchConfig.getElasticIndexPerf() + "address";
    // Check whether a sync for this key is already in progress.
    boolean isRunCache = iDataCahceLogService.isRunCache(cacheKey);
    if (!isRunCache) {
        try {
            String title = "数据";
            // Decide whether to resume the previous run or start over.
            boolean isRestartCache = iDataCahceLogService.startOrRestartCache(cacheKey, title, DataCacheLogEnum.DATA_CACHE_TYPE_ES.key);
            // NOTE(review): raw Thread, fire-and-forget — failures inside
            // importHouse are only visible via the cache log, not this response.
            new Thread(() -> {
                esAddressService.importHouse(isRestartCache, cacheKey);
            }).start();
        } catch (Exception e) {
            // NOTE(review): prefer a logger over printStackTrace — confirm the
            // controller has an SLF4J logger available.
            e.printStackTrace();
            return R.fail("数据同步失败...");
        }
        return R.success("数据正在同步中,请稍后...");
    } else {
        // NOTE(review): the two success messages look swapped — this branch
        // runs when a sync is ALREADY running, yet says "starting"; confirm intent.
        return R.success("数据开始同步...");
    }
}
/***
 * Starts a new cache run, or resumes a paused/failed one, for the given key.
 *
 * @param hashKey cache key (the ES index name)
 * @param title   display title of the cache job
 * @param type    cache type (see DataCacheLogEnum)
 * @return true: resume the previous cache run; false: start a fresh run
 */
@Override
@Transactional(rollbackFor = Exception.class)
public boolean startOrRestartCache(String hashKey, String title, String type) {
    boolean result = false;
    String id = null;
    DataCacheLog lastTimeCache = getLastTimeCache(hashKey);
    if (lastTimeCache != null) {
        if (DataCacheLogEnum.DATA_CACHE_STATUS_STOP.key.equals(lastTimeCache.getStatus())) {
            // Previous run was paused: reuse its log entry and resume.
            id = lastTimeCache.getId();
            result = true;
        } else if (DataCacheLogEnum.DATA_CACHE_STATUS_FAILED.key.equals(lastTimeCache.getStatus())) {
            // Previous run failed: create a NEW log entry (id stays null) but
            // still resume — already-cached pages are skipped by the caller.
            result = true;
        }
    }
    DataCacheLog dataCacheLog = new DataCacheLog();
    dataCacheLog.setKey(hashKey);
    dataCacheLog.setStartTime(new Date());
    dataCacheLog.setTitle(title);
    dataCacheLog.setType(type);
    dataCacheLog.setStatus(DataCacheLogEnum.DATA_CACHE_STATUS_RUN.key);
    if (StringUtil.isNotBlank(id)) {
        // Paused run: update the existing log row in place.
        dataCacheLog.setId(id);
        this.updateById(dataCacheLog);
    } else {
        // Fresh or post-failure run: persist a brand-new log row.
        dataCacheLog.setId(UUID.get32UUID());
        this.save(dataCacheLog);
    }
    return result;
}
/***
 * Full sync of all address rows from the database into Elasticsearch,
 * partitioned page-by-page across a thread pool. Each completed page is
 * recorded so an interrupted run can be resumed.
 *
 * @param isRestartCache true = resume (skip pages already recorded), false = fresh run
 * @param titleKey       cache/log key identifying this sync job (the ES index name)
 */
@Override
public void importHouse(boolean isRestartCache, String titleKey) {
    // NOTE(review): raw QueryWrapper — should be new QueryWrapper<AdressDmpManage>()
    // (or Wrappers.lambdaQuery()) to avoid the unchecked raw-type warning.
    LambdaQueryWrapper<AdressDmpManage> wrapper = new QueryWrapper().lambda();
    // Total row count drives the page partitioning below.
    long houseTotal = adressDmpManageService.count(wrapper);
    // Rows per page.
    long pageSize = 2000;
    int threadSize = 10;
    // Split the page range evenly across the worker threads.
    List<CountThreadPage> threadList = CalculationUtil.countThreadPage(houseTotal, pageSize, threadSize);
    List<Future<?>> futures = new ArrayList<>();
    try {
        for (int i = 0; i < threadList.size(); i++) {
            CountThreadPage threadPage = threadList.get(i);
            futures.add(
                executorService.submit(() -> {
                    long startPage = threadPage.getStartPage();
                    long endPage = threadPage.getEndPage();
                    for (long j = startPage; j <= endPage; j++) {
                        if (isRestartCache) {
                            boolean hasCacheRecord = iDataCacheRecordService.hasCacheRecord(titleKey, titleKey, j + "");
                            if (hasCacheRecord) {
                                // Page already synced in a previous run — skip it.
                                continue;
                            }
                        }
                        List<EsAddress> esAddressList = syncListByPage(j, pageSize);
                        // Check whether an operator stopped the job.
                        boolean isStopCache = iDataCahceLogService.isStopCache(titleKey);
                        if (isStopCache) {
                            // NOTE(review): shutting down the SHARED executor from inside
                            // a task kills it for all sibling tasks and future runs; it is
                            // also shut down again in the finally block. Prefer a
                            // cooperative stop flag checked by each worker.
                            executorService.shutdown();
                            break;
                        }
                        if (CollectionUtils.isNotEmpty(esAddressList)) {
                            try {
                                elasticsearchRestTemplate.save(esAddressList);
                            } catch (Exception exception) {
                                // "200 OK" parse quirk of older ES REST clients; NOTE(review):
                                // getMessage() may be null here — an NPE would mask the cause.
                                if (!(exception.getMessage()).contains("200 OK")) {
                                    throw exception;
                                }
                            }
                            log.info("[elastic-house-delta] {} 分页数据_全量同步完成,当前页:{},大小:{}", Thread.currentThread().getName(), j, esAddressList.size());
                            // Record the page so a resumed run can skip it.
                            iDataCacheRecordService.addCacheRecord(titleKey, titleKey, j + "");
                        }
                    }
                })
            );
        }
        // Block until every worker finishes (or one throws).
        for (Future<?> future : futures) {
            future.get();
        }
        // Mark the sync as complete in the cache log.
        completeSync(titleKey);
        log.info("===================数据列表_同步完成=========================");
    } catch (Exception e) {
        // NOTE(review): use log.error("...", e) instead of printStackTrace.
        e.printStackTrace();
        log.info("===================数据列表_同步失败=========================");
        iDataCahceLogService.cacheFailed(titleKey, e.toString());
    } finally {
        // NOTE(review): shutting down here makes the injected executor single-use;
        // a second call to importHouse would submit to a dead pool.
        executorService.shutdown();
    }
}
/***
 * Splits the total row count into per-thread page ranges.
 *
 * @param total      total number of rows
 * @param pageSize   rows per page; must be positive
 * @param threadSize number of worker threads
 * @return one page range per thread
 * @throws IllegalArgumentException if pageSize is zero or negative
 *         (previously surfaced as a bare ArithmeticException / endless pages)
 */
public static List<CountThreadPage> countThreadPage(long total, long pageSize, long threadSize) {
    if (pageSize <= 0) {
        throw new IllegalArgumentException("pageSize must be positive: " + pageSize);
    }
    // Ceiling division: a partial last page still counts as a page.
    long totalPage = total / pageSize;
    if (total % pageSize != 0) {
        totalPage++;
    }
    return countThreadPage(totalPage, threadSize);
}
/***
 * Persists one cache record marking a (module, key, value) combination —
 * e.g. a synced page number — as done.
 *
 * @param titleKey module key of the cached data
 * @param cacheKey cache key of the cached data
 * @param value    cached value (e.g. the page number as a string)
 */
@Override
@Transactional(rollbackFor = Exception.class)
public void addCacheRecord(String titleKey, String cacheKey, String value) {
    DataCacheRecord record = new DataCacheRecord();
    record.setId(UUID.get32UUID());
    record.setCacheTime(new Date());
    record.setTitleKey(titleKey);
    record.setCacheKey(cacheKey);
    record.setCacheValue(value);
    this.save(record);
}
/***
 * Checks whether a cache record exists for the (module, key, value) combination.
 *
 * @param titleKey module key of the cached data
 * @param cacheKey cache key of the cached data
 * @param value    cached value to look up
 * @return true if at least one matching record exists
 */
@Override
public boolean hasCacheRecord(String titleKey, String cacheKey, String value) {
    // Typed wrapper (the original used a raw QueryWrapper) and a direct
    // boolean return instead of a mutable result flag.
    LambdaQueryWrapper<DataCacheRecord> wrapper = new QueryWrapper<DataCacheRecord>().lambda()
            .eq(DataCacheRecord::getTitleKey, titleKey)
            .eq(DataCacheRecord::getCacheKey, cacheKey)
            .eq(DataCacheRecord::getCacheValue, value);
    return this.count(wrapper) > 0;
}
/***
 * Reports whether caching for the given key has been stopped.
 *
 * @param key cache key
 * @return true: caching stopped (or never ran with a non-RUN status); false: still running or no log
 */
@Override
public boolean isStopCache(String key) {
    DataCacheLog latest = getLastTimeCache(key);
    if (latest == null) {
        // No log entry at all — treat as "not stopped".
        return false;
    }
    // Stopped means the latest status is anything other than RUN.
    return !DataCacheLogEnum.DATA_CACHE_STATUS_RUN.key.equals(latest.getStatus());
}

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐
所有评论(0)