ElasticSearch 集成 Spring 之 RestHighLevelClient 示例
RestHighLevelClient
RestHighLevelClient 是官方指定的连接 API。
另外一个是 TransportClient,但是 TransportClient 这个是已经废弃不用的,所以会在 ES8.0 之后完全移除,也就是说 8.0 之后就无法使用了。
引入依赖:
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
配置连接
配置文件 application.properties
:
spring.data.elasticsearch.host=192.168.10.31:192.168.10.32:192.168.10.33:192.168.10.34
spring.data.elasticsearch.port=9200
spring.data.elasticsearch.username=elastic
spring.data.elasticsearch.password=123456
配置 Java 类:
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ElasticsearchConfiguration implements FactoryBean<RestHighLevelClient>, InitializingBean, DisposableBean {
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfiguration.class);
@Value("${spring.data.elasticsearch.host}")
private String host;
@Value("${spring.data.elasticsearch.port}")
private int port;
@Value("${spring.data.elasticsearch.username}")
private String username;
@Value("${spring.data.elasticsearch.password}")
private String password;
private RestHighLevelClient restHighLevelClient;
@Override
public void destroy() throws Exception {
try {
LOGGER.info("Closing elasticSearch client");
if (restHighLevelClient != null) {
restHighLevelClient.close();
}
} catch (final Exception e) {
LOGGER.error("Error closing ElasticSearch client: ", e);
}
}
@Override
public RestHighLevelClient getObject() throws Exception {
return restHighLevelClient;
}
@Override
public Class<RestHighLevelClient> getObjectType() {
return RestHighLevelClient.class;
}
@Override
public boolean isSingleton() {
return false;
}
@Override
public void afterPropertiesSet() throws Exception {
buildClient();
}
protected void buildClient() {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
HttpHost[] hostArray = new HttpHost[host.split(":").length];
int index = 0;
for (String httpHost : host.split(":")) {
hostArray[index] = new HttpHost(httpHost, port);
index++;
}
restHighLevelClient = new RestHighLevelClient(RestClient.builder(hostArray).setHttpClientConfigCallback(
httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)));
LOGGER.info("elasticSearch client buildClient...");
}
}
引入 RestHighLevelClient
@Autowired
private RestHighLevelClient restHighLevelClient;
索引相关
判断索引是否存在
GetIndexRequest request = new GetIndexRequest(indexName);
boolean exists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
列出所有索引
GetAliasesRequest request = new GetAliasesRequest();
GetAliasesResponse getAliasesResponse = restHighLevelClient.indices().getAlias(request,RequestOptions.DEFAULT);
Map<String, Set<AliasMetaData>> map = getAliasesResponse.getAliases();
Set<String> indices = map.keySet();
创建索引
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.settings(Settings.builder()
.put("index.number_of_replicas", 1) // 有1个备份
.put("index.number_of_shards", 5)); // 有5个碎片
XContentBuilder mappingBuilder = JsonXContent.contentBuilder()
.startObject()
.startObject("properties")
.startObject("title").field("type", "text").field("index", "true").endObject()
.startObject("content").field("type", "text").field("index", "true").endObject()
.endObject()
.endObject();
request.mapping(mappingBuilder);
CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
System.out.println(response.isAcknowledged());
查看索引 Mapping:
$ curl http://127.0.0.1:9200/wyqtest/_mapping?pretty
{
"wyqtest" : {
"mappings" : {
"properties" : {
"content" : {
"type" : "text"
},
"title" : {
"type" : "text"
}
}
}
}
}
删除索引
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
request.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
AcknowledgedResponse response = restHighLevelClient.indices().delete(request, RequestOptions.DEFAULT);
log.info("result: {}", response.isAcknowledged());
查询示例
聚合汇总
构建 BoolQueryBuilder
BoolQueryBuilder bool = new BoolQueryBuilder(); bool.must(QueryBuilders.rangeQuery("@timestamp").from(start.getTime())); bool.must(QueryBuilders.rangeQuery("@timestamp").to(end.getTime()));
设置分组 TermsAggregationBuilder
TermsAggregationBuilder aggregationBuilderGroupBy = AggregationBuilders.terms("agg_count").field("module.keyword").size(200);
分组查询
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().trackTotalHits(true).query(bool).aggregation(aggregationBuilderGroupBy); SearchRequest searchRequest = new SearchRequest(esIndexName).source(sourceBuilder); SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); Aggregations aggregations = response.getAggregations();
获取查询结果
Aggregation sourceType = aggregations.get("agg_count"); for (Terms.Bucket bucket : ((Terms) sourceType).getBuckets()) { logger.info("[LogIndex]"+bucket.getKeyAsString()+"[Count]"+bucket.getDocCount()); }
执行结果:
[LogIndex]nlp-model[Count]101520 [LogIndex]web-admin[Count]1106
分页查询
构建 BoolQueryBuilder
BoolQueryBuilder bool = new BoolQueryBuilder(); bool.must(QueryBuilders.matchQuery("module", dto.getModule()).minimumShouldMatch("100%")); bool.must(QueryBuilders.termQuery("level", dto.getLevel().toLowerCase()));
设置查询 SearchSourceBuilder
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().trackTotalHits(true); // 设置查询条件BoolQueryBuilder sourceBuilder.query(bool); // 设置分组,需注意 es 的分页是从 0 开始的 sourceBuilder.from(page); sourceBuilder.size(perPage); // 设置排序 sourceBuilder.sort("@timestamp", SortOrder.DESC);
进行查询
SearchRequest searchRequest = new SearchRequest(index); searchRequest.source(sourceBuilder); SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
获取查询结果
// 获取结果集 SearchHits hits = response.getHits(); // 获取总条数 paginator.setItems(Integer.valueOf(String.valueOf(hits.getTotalHits().value))); // 转换结果集 for (SearchHit hit : response.getHits().getHits()) { PhishingLogDto mailServer = new PhishingLogDto(); mailServer.jsonToDto(mailServer, JSONObject.parseObject(hit.getSourceAsString())); list.add(mailServer); }
相关文章