ElasticSearch 集成 Spring 之 RestHighLevelClient 示例

RestHighLevelClient

RestHighLevelClient 是官方指定的连接 API。

另一个客户端是 TransportClient,但它已被官方废弃,并在 ES 8.0 中被完全移除,也就是说从 8.0 开始就无法使用了。

引入依赖:

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>

配置连接

配置文件 application.properties

spring.data.elasticsearch.host=192.168.10.31:192.168.10.32:192.168.10.33:192.168.10.34
spring.data.elasticsearch.port=9200
spring.data.elasticsearch.username=elastic
spring.data.elasticsearch.password=123456

配置 Java 类:

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

/**
 * Spring factory bean that builds, exposes and closes a single
 * {@link RestHighLevelClient}.
 *
 * <p>The client is created once in {@link #afterPropertiesSet()} from the
 * {@code spring.data.elasticsearch.*} properties, returned unchanged by every
 * {@link #getObject()} call, and closed in {@link #destroy()}.
 */
@Configuration
public class ElasticsearchConfiguration implements FactoryBean<RestHighLevelClient>, InitializingBean, DisposableBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfiguration.class);

    /** Separator between the host names listed in {@code spring.data.elasticsearch.host}. */
    private static final String HOST_SEPARATOR = ":";

    /** Host names or addresses of the cluster nodes, joined with {@link #HOST_SEPARATOR}. */
    @Value("${spring.data.elasticsearch.host}")
    private String host;
    /** HTTP port applied to every configured host. */
    @Value("${spring.data.elasticsearch.port}")
    private int port;
    /** User name for HTTP basic authentication. */
    @Value("${spring.data.elasticsearch.username}")
    private String username;
    /** Password for HTTP basic authentication. */
    @Value("${spring.data.elasticsearch.password}")
    private String password;

    /** The one client instance created by {@link #buildClient()}. */
    private RestHighLevelClient restHighLevelClient;

    /**
     * Closes the client if it was created. Failures are logged rather than
     * propagated so that a close error does not abort the rest of shutdown.
     */
    @Override
    public void destroy() throws Exception {
        try {
            LOGGER.info("Closing elasticSearch client");
            if (restHighLevelClient != null) {
                restHighLevelClient.close();
            }
        } catch (final Exception e) {
            LOGGER.error("Error closing ElasticSearch client: ", e);
        }
    }

    /**
     * @return the client built in {@link #afterPropertiesSet()}; the same
     *         instance on every call
     */
    @Override
    public RestHighLevelClient getObject() throws Exception {
        return restHighLevelClient;
    }

    /** @return {@code RestHighLevelClient.class} */
    @Override
    public Class<RestHighLevelClient> getObjectType() {
        return RestHighLevelClient.class;
    }

    /**
     * Reports the product as a singleton, because {@link #getObject()} always
     * returns the same instance and {@link #destroy()} closes exactly that one.
     *
     * @return {@code true}
     */
    @Override
    public boolean isSingleton() {
        return true;
    }

    /** Builds the client once all {@code @Value} properties have been injected. */
    @Override
    public void afterPropertiesSet() throws Exception {
        buildClient();
    }

    /**
     * Creates the {@link RestHighLevelClient} for all configured hosts, using
     * the configured user name and password as default credentials.
     */
    protected void buildClient() {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));

        // Split the host list once; trim so stray whitespace around a
        // separator does not end up inside a host name.
        final String[] hostNames = host.split(HOST_SEPARATOR);
        final HttpHost[] hostArray = new HttpHost[hostNames.length];
        for (int i = 0; i < hostNames.length; i++) {
            hostArray[i] = new HttpHost(hostNames[i].trim(), port);
        }

        restHighLevelClient = new RestHighLevelClient(RestClient.builder(hostArray).setHttpClientConfigCallback(
                httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)));
        LOGGER.info("elasticSearch client buildClient...");
    }
}

引入 RestHighLevelClient

// Inject the RestHighLevelClient bean by type.
@Autowired
private RestHighLevelClient restHighLevelClient;

索引相关

判断索引是否存在

// Ask the cluster whether the index named by indexName exists.
GetIndexRequest request = new GetIndexRequest(indexName);
boolean exists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);

列出所有索引

// Issue an alias request with no filter and use the keys of the returned
// map as the set of index names.
GetAliasesRequest request = new GetAliasesRequest();
GetAliasesResponse getAliasesResponse =  restHighLevelClient.indices().getAlias(request,RequestOptions.DEFAULT);
Map<String, Set<AliasMetaData>> map = getAliasesResponse.getAliases();
Set<String> indices = map.keySet();

创建索引

// Create the index named by indexName with explicit settings and a mapping.
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.settings(Settings.builder()
                 .put("index.number_of_replicas", 1) // 1 replica
                 .put("index.number_of_shards", 5)); // 5 shards
// Mapping with two text fields, "title" and "content", both marked as indexed.
XContentBuilder mappingBuilder = JsonXContent.contentBuilder()
                    .startObject()
                    .startObject("properties")
                    .startObject("title").field("type", "text").field("index", "true").endObject()
                    .startObject("content").field("type", "text").field("index", "true").endObject()
                    .endObject()
                    .endObject();
request.mapping(mappingBuilder);
CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
// Print whether the cluster acknowledged the request.
System.out.println(response.isAcknowledged());

查看索引 Mapping:

$ curl http://127.0.0.1:9200/wyqtest/_mapping?pretty
{
  "wyqtest" : {
    "mappings" : {
      "properties" : {
        "content" : {
          "type" : "text"
        },
        "title" : {
          "type" : "text"
        }
      }
    }
  }
}

删除索引

// Delete the index named by indexName and log whether the request was acknowledged.
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
// NOTE(review): LENIENT_EXPAND_OPEN presumably makes the call tolerate a
// missing index instead of failing - confirm against the IndicesOptions docs.
request.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
AcknowledgedResponse response = restHighLevelClient.indices().delete(request, RequestOptions.DEFAULT);
log.info("result: {}", response.isAcknowledged());

查询示例

聚合汇总

  1. 构建 BoolQueryBuilder

    // Restrict the search to a time window on "@timestamp": one must-clause
    // sets the lower bound (start), a second one sets the upper bound (end).
    BoolQueryBuilder bool = new BoolQueryBuilder();
    bool.must(QueryBuilders.rangeQuery("@timestamp").from(start.getTime()));
    bool.must(QueryBuilders.rangeQuery("@timestamp").to(end.getTime()));
  2. 设置分组 TermsAggregationBuilder

    // Terms aggregation named "agg_count" on the "module.keyword" field, with size 200.
    TermsAggregationBuilder aggregationBuilderGroupBy = AggregationBuilders.terms("agg_count").field("module.keyword").size(200);
  3. 分组查询

    // Combine the bool query and the terms aggregation into one search source,
    // run it against esIndexName and keep the aggregation part of the response.
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().trackTotalHits(true).query(bool).aggregation(aggregationBuilderGroupBy);
    SearchRequest searchRequest = new SearchRequest(esIndexName).source(sourceBuilder);
    SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    Aggregations aggregations = response.getAggregations();
  4. 获取查询结果

    // Look up the aggregation by the name it was registered under ("agg_count")
    // and treat it as a terms aggregation.
    Terms moduleTerms = (Terms) aggregations.get("agg_count");
    // Log one line per bucket: the bucket key followed by its document count.
    for (Terms.Bucket entry : moduleTerms.getBuckets()) {
        logger.info("[LogIndex]" + entry.getKeyAsString() + "[Count]" + entry.getDocCount());
    }
  5. 执行结果:

    [LogIndex]nlp-model[Count]101520
    [LogIndex]web-admin[Count]1106

分页查询

  1. 构建 BoolQueryBuilder

    // Both conditions must hold: a match query on "module" with
    // minimum_should_match set to "100%", and a term query on "level".
    BoolQueryBuilder bool = new BoolQueryBuilder();
    bool.must(QueryBuilders.matchQuery("module", dto.getModule()).minimumShouldMatch("100%"));
    // NOTE(review): the level is lower-cased before the term lookup, presumably
    // because the indexed value is lower-case - confirm against the mapping.
    bool.must(QueryBuilders.termQuery("level", dto.getLevel().toLowerCase()));
  2. 设置查询 SearchSourceBuilder

    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().trackTotalHits(true);
    // Use the BoolQueryBuilder built in the previous step as the search condition.
    sourceBuilder.query(bool);
    // Paging: from() takes the offset of the first hit to return (a document
    // index starting at 0), not a page number. page is assumed to be a
    // zero-based page index, so the offset is page * perPage.
    sourceBuilder.from(page * perPage);
    sourceBuilder.size(perPage);
    // Sort by "@timestamp" in descending order.
    sourceBuilder.sort("@timestamp", SortOrder.DESC);
  3. 进行查询

    // Run the prepared search source against the target index.
    SearchRequest searchRequest = new SearchRequest(index);
    searchRequest.source(sourceBuilder);
    SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  4. 获取查询结果

    // The hits container provides both the total count and the returned documents.
    SearchHits hits = response.getHits();
    // Hand the total hit count to the paginator as an Integer.
    long totalCount = hits.getTotalHits().value;
    paginator.setItems(Integer.valueOf(Long.toString(totalCount)));
    // Convert the JSON source of every hit into a DTO and collect it.
    for (SearchHit searchHit : hits.getHits()) {
        PhishingLogDto logDto = new PhishingLogDto();
        JSONObject source = JSONObject.parseObject(searchHit.getSourceAsString());
        logDto.jsonToDto(logDto, source);
        list.add(logDto);
    }