最近在做一个需求,一开始以为用 ES 脚本就能搞定,结果耽搁了一天半的时间。
后来改用 Java client 的 API 来做,效率高多了。
package com.XXX.XXXX.XXX;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.assertj.core.util.Arrays;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class HestiaApplicationTests {
@Resource
protected RestHighLevelClient client;
@Test
public void contextLoads() {
}
/**
* *先检索、再更新文档
**/
@Test
public void search() throws IOException {
try {
SearchRequest searchRequest = new SearchRequest("zm_prod");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// 第几页
searchSourceBuilder.from(0);
Map<String, Object> params = new HashMap<>();
//课程标签:1.大班课;2.小班课;3.AI课;4.磨课
// Script script = new Script(ScriptType.INLINE, "painless", "ctx._source.values.base_info.course_mark == 2", params);
// searchSourceBuilder.query(QueryBuilders.scriptQuery(script));
// searchSourceBuilder.query(QueryBuilders.matchQuery("_source.values.base_info.course_mark",2));
searchSourceBuilder.query(QueryBuilders.termQuery("_id", 55074));
// searchSourceBuilder.query(QueryBuilders.matchAllQuery());
// 每页多少条数据
searchSourceBuilder.size(1000);
// 设置超时时间为2s
searchSourceBuilder.timeout(new TimeValue(2000));
// 设置request要搜索的索引和类型
// searchRequest.indices("zm_prod").types("news");
// 设置SearchSourceBuilder查询属性
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest);
SearchHit[] searchHits = searchResponse.getHits().getHits();
if (ArrayUtils.isEmpty(searchHits)) {
log.info("searchHits = 666666666666666666666");
}
for (SearchHit hit : searchHits) {
//文档的主键
String id = hit.getId();
String index = hit.getIndex();
String type = hit.getType();
String sourceAsString = hit.getSourceAsString();
if(!sourceAsString.contains("course_mark")){
continue;
}
//源文档内容
Map<String, Object> sourceAsMap = hit.getSource();
// String s = JSON.toJSONString(sourceAsMap);
Map<String, Object> values = (Map)sourceAsMap.get("values");
Map<String, Object> base_info = (Map)values.get("base_info");
Integer course_mark = (Integer) base_info.get("course_mark");
String base_infostr = JSON.toJSONString(base_info);
if(null!=course_mark){
if(course_mark==1||course_mark==2||course_mark==3){
log.info(" base_infostr = " + base_infostr);
//TODO 这个地方反向的在map里面添加字段,然后重新组装,然后更新当前的数据
base_info.put("BBB",7777);
values.put("base_info",base_info);
UpdateRequest uRequest = new UpdateRequest()
.index(index)
.type(type)
.id(id)
.doc(XContentFactory.jsonBuilder()
.startObject().field("values", values).endObject());
BulkRequest blkRequest = new BulkRequest();
blkRequest.add(uRequest);
// 执行
BulkResponse bulkResponse = client.bulk(blkRequest);
// for (UpdateRequest uprequest : list) {
// bulkResponse.add(uprequest);
// }
// BulkResponse bulkResponse = bulkResponse.execute().actionGet();
if (bulkResponse.hasFailures()) {
System.out.println("批量错误!");
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
log.error(ExceptionUtils.getMessage(e));
}
}
}
-------------------------------------------------------------------------------------------------------------------
以下内容参考了其他博主的文章:
最近在学习 ElasticSearch,前些天在工作中遇到一个难以解决的问题:正如标题所示,在使用 Java TransportClient 更新 ES 中复杂数据结构的数组时出错。最后请教了大佬,问题才得以解决。此文章将详细描述问题并提供解决办法。
博主要更新的数据格式大致如下:
原数据:一个嵌套类型的数组(见下方 JSON)
更新后的数据:将"商场01"对应的元素从数组中删除,即只保留前两个元素
"list":[
{
"code": "9111364",
"name": "企业01"
},
{
"code": "900662",
"name": "智能01"
},
{
"code": "9000300",
"name": "商场01"
}
]
博主是ES小白,对于此类型的数据不知道如何正确使用 UpdateRequest进行更新。
于是使用了如下方法,其中 value 表示更新后的数据(即去掉"商场01"之后的 list)。value 的类型博主试过 Object 和 List<自定义对象>,甚至把 list 转成 JSON 字符串(这样字段值会变成一个字符串而不是数组),结果都不行。
// XXXXXXXXX表示要更新的数据
// List value = XXXXXXXXX;
// XXXXXXXXX表示要更新的数据
// Object value = XXXXXXXXX;
List> value = XXXXXXXXX; // 这个好用
updateRequest.doc(XContentFactory.jsonBuilder()
.startObject()
.field("name", value)
.field(flag, 1)
.endObject());
最后大佬告诉我,要将 List<自定义对象> value 转成 List<Map<String, Object>> value。也就是说,使用 UpdateRequest 更新对象数组类型的字段时,如果 List 的元素是自定义 Java 对象,XContentBuilder 无法直接序列化它们(会报错);要把 List 中的元素转成 Map 类型,ES 才能识别。
// 将嵌套数组中的对象转成 Map 并放入 List(用 Set 也可以),否则无法进行更新(会报错)
// Convert each nested-array element (an arbitrary Java object) into a Map so that
// XContentBuilder can serialize it. A List is used; a Set would work as well.
List<Map<String, Object>> set = new ArrayList<>();
for (Object obj : setArry) {
    Map<String, Object> map = new HashMap<>();
    // Reflection copies every declared instance field, so this works for any element type.
    // Note: getDeclaredFields() does not include fields inherited from superclasses.
    for (Field field : obj.getClass().getDeclaredFields()) {
        // Static fields (e.g. serialVersionUID) and compiler-generated fields are not document data.
        if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
            continue;
        }
        field.setAccessible(true);
        // field.get throws IllegalAccessException; the enclosing method must handle or declare it.
        map.put(field.getName(), field.get(obj));
    }
    set.add(map);
}
由于此种数组类型较多,博主使用反射,可以兼容每种数组类型。最后成功更新数据。
---------------------------------------------------------------------------------------------------------------
同时还参考了以下内容:
如果只更新一条文档,并且已知文档 id,直接使用 UpdateRequest 即可实现,代码如下:
/**
 * Updates a single document whose id is already known.
 *
 * <p>Sends the JSON below as the partial document of an UpdateRequest for document
 * {@code SvjgP24BndtcmnpzbiuL} in index {@code sub_bank1031}, type {@code sub_bank},
 * and prints the update result.
 *
 * @throws IOException if the update request fails
 */
@Test
public void test() throws IOException {
    final String index = "sub_bank1031";
    final String type = "sub_bank";
    final String docId = "SvjgP24BndtcmnpzbiuL";
    final String partialDoc = "{\"aliasName\":\"中国农业发展银行林州市支行444\",\"bankType\":\"ADB\",\"bankTypeName\":\"中国农业发展银行\",\"cityId\":\"410500\",\"cityName\":\"安阳市\",\"createTime\":1515719190000,\"createUser\":\"system\",\"id\":\"000238a326b044e9ae10cfe4298f4c44\",\"isEnabled\":\"1\",\"name\":\"中国农业发展银行林州市支行\",\"provinceId\":\"410000\",\"provinceName\":\"河南省\",\"unionNumber\":\"203496100010\"}";

    UpdateRequest updateRequest = new UpdateRequest(index, type, docId);
    updateRequest.doc(partialDoc, XContentType.JSON);
    UpdateResponse updateResponse = highLevelClient.update(updateRequest, RequestOptions.DEFAULT);
    println(updateResponse.getResult());
}
但如果不知道文档 id,又想使用 UpdateRequest 更新文档,就需要先用 SearchRequest 按某个条件查询出符合条件的文档,然后循环逐个更新即可。
/**
 * Search first, then update each matching document.
 *
 * <p>Looks up document {@code SvjgP24BndtcmnpzbiuL} in {@code sub_bank1031}, then sends one
 * UpdateRequest per hit with the partial document below and prints each update result.
 * Every update targets the index and type reported by the hit itself. A hard-coded type
 * would miss hits of another type, and a hard-coded name that is an alias over several
 * indices cannot be used as an update target.
 *
 * @throws IOException if the search or any update request fails
 */
@Test
public void search() throws IOException {
    // The same partial document is applied to every hit, so build it once.
    final String partialDoc = "{\"aliasName\":\"中国农业发展银行林州市支行444\",\"bankType\":\"ADB\",\"bankTypeName\":\"中国农业发展银行\",\"cityId\":\"410500\",\"cityName\":\"安阳市\",\"createTime\":1515719190000,\"createUser\":\"system\",\"id\":\"000238a326b044e9ae10cfe4298f4c44\",\"isEnabled\":\"1\",\"name\":\"中国农业发展银行林州市支行\",\"provinceId\":\"410000\",\"provinceName\":\"河南省\",\"unionNumber\":\"203496100010\"}";

    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    // The ids query is the dedicated way to look documents up by _id.
    searchSourceBuilder.query(QueryBuilders.idsQuery().addIds("SvjgP24BndtcmnpzbiuL"));
    searchSourceBuilder.size(2);
    SearchRequest searchRequest = new SearchRequest("sub_bank1031");
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);

    for (SearchHit hit : searchResponse.getHits().getHits()) {
        UpdateRequest request = new UpdateRequest(hit.getIndex(), hit.getType(), hit.getId());
        request.doc(partialDoc, XContentType.JSON);
        UpdateResponse resp = highLevelClient.update(request, RequestOptions.DEFAULT);
        println(resp.getResult());
    }
}
上面的操作略显麻烦:一次查询加上每个文档各一次 update,需要多次 HTTP 请求才能完成。要更新的文档很多时,会大大降低系统响应速度。这时可以使用 ES 的 UpdateByQueryRequest,在服务端一次完成查询和更新。
/**
 * Updates documents with an update-by-query script in a single request.
 *
 * <p>Matches documents with {@code cityId = 511000} in index {@code sub_bank1031} (type
 * {@code sub_bank}) and processes at most 2 of them. On those whose {@code bankType} is
 * {@code 'BOC'} it sets {@code aliasName = 'hello'}. The other matched documents are marked
 * {@code ctx.op = 'noop'}; otherwise they would be reindexed even though nothing changed.
 *
 * @throws IOException if the request fails
 * @throws IllegalStateException if Elasticsearch reports bulk or search failures
 */
@Test
public void updateByQueryRequest() throws IOException {
    UpdateByQueryRequest request = new UpdateByQueryRequest("sub_bank1031");
    request.setDocTypes("sub_bank");
    request.setQuery(new TermQueryBuilder("cityId", "511000"));
    // Caps the number of documents processed by the whole operation (not a page size).
    request.setSize(2);
    request.setScript(
            new Script(
                    ScriptType.INLINE, "painless",
                    "if (ctx._source.bankType == 'BOC') {ctx._source.aliasName='hello'} else {ctx.op='noop'}",
                    Collections.emptyMap()));
    BulkByScrollResponse resp = highLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
    // update_by_query reports partial failures in the response instead of throwing.
    if (!resp.getBulkFailures().isEmpty() || !resp.getSearchFailures().isEmpty()) {
        throw new IllegalStateException("update_by_query failed: bulkFailures=" + resp.getBulkFailures()
                + ", searchFailures=" + resp.getSearchFailures());
    }
    System.out.println("updated=" + resp.getUpdated() + ", noops=" + resp.getNoops());
}
PS:
pom文件es相关依赖如下:
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.8.0</version>
</dependency>
<!-- elasticsearch high level -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
</exclusions>
<version>6.8.0</version>
</dependency>