ElasticSearch 集成 Spring 之 RestHighLevelClient 示例
RestHighLevelClient
RestHighLevelClient 是官方指定的连接 API。
另外一个是 TransportClient,但是 TransportClient 这个是已经废弃不用的,所以会在 ES8.0 之后完全移除,也就是说 8.0 之后就无法使用了。
引入依赖:
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
配置连接
配置文件 application.properties
:
spring.data.elasticsearch.host=192.168.10.31:192.168.10.32:192.168.10.33:192.168.10.34
spring.data.elasticsearch.port=9200
spring.data.elasticsearch.username=elastic
spring.data.elasticsearch.password=123456
配置 Java 类:
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ElasticsearchConfiguration implements FactoryBean<RestHighLevelClient>, InitializingBean, DisposableBean {
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfiguration.class);
@Value("${spring.data.elasticsearch.host}")
private String host;
@Value("${spring.data.elasticsearch.port}")
private int port;
@Value("${spring.data.elasticsearch.username}")
private String username;
@Value("${spring.data.elasticsearch.password}")
private String password;
private RestHighLevelClient restHighLevelClient;
@Override
public void destroy() throws Exception {
try {
LOGGER.info("Closing elasticSearch client");
if (restHighLevelClient != null) {
restHighLevelClient.close();
}
} catch (final Exception e) {
LOGGER.error("Error closing ElasticSearch client: ", e);
}
}
@Override
public RestHighLevelClient getObject() throws Exception {
return restHighLevelClient;
}
@Override
public Class<RestHighLevelClient> getObjectType() {
return RestHighLevelClient.class;
}
@Override
public boolean isSingleton() {
return false;
}
@Override
public void afterPropertiesSet() throws Exception {
buildClient();
}
protected void buildClient() {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
HttpHost[] hostArray = new HttpHost[host.split(":").length];
int index = 0;
for (String httpHost : host.split(":")) {
hostArray[index] = new HttpHost(httpHost, port);
index++;
}
restHighLevelClient = new RestHighLevelClient(RestClient.builder(hostArray).setHttpClientConfigCallback(
httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)));
LOGGER.info("elasticSearch client buildClient...");
}
}
引入 RestHighLevelClient
@Autowired
private RestHighLevelClient restHighLevelClient;
索引相关
判断索引是否存在
GetIndexRequest request = new GetIndexRequest(indexName);
boolean exists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
列出所有索引
GetAliasesRequest request = new GetAliasesRequest();
GetAliasesResponse getAliasesResponse = restHighLevelClient.indices().getAlias(request,RequestOptions.DEFAULT);
Map<String, Set<AliasMetaData>> map = getAliasesResponse.getAliases();
Set<String> indices = map.keySet();
创建索引
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.settings(Settings.builder()
.put("index.number_of_replicas", 1) // 有1个备份
.put("index.number_of_shards", 5)); // 有5个碎片
XContentBuilder mappingBuilder = JsonXContent.contentBuilder()
.startObject()
.startObject("properties")
.startObject("title").field("type", "text").field("index", "true").endObject()
.startObject("content").field("type", "text").field("index", "true").endObject()
.endObject()
.endObject();
request.mapping(mappingBuilder);
CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
System.out.println(response.isAcknowledged());
查看索引 Mapping:
$ curl http://127.0.0.1:9200/wyqtest/_mapping?pretty
{
"wyqtest" : {
"mappings" : {
"properties" : {
"content" : {
"type" : "text"
},
"title" : {
"type" : "text"
}
}
}
}
}
删除索引
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
request.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
AcknowledgedResponse response = restHighLevelClient.indices().delete(request, RequestOptions.DEFAULT);
log.info("result: {}", response.isAcknowledged());
查询示例
分页查询
构建 BoolQueryBuilder
BoolQueryBuilder bool = new BoolQueryBuilder(); bool.must(QueryBuilders.matchQuery("module", dto.getModule()).minimumShouldMatch("100%")); bool.must(QueryBuilders.termQuery("level", dto.getLevel().toLowerCase()));
设置查询 SearchSourceBuilder
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().trackTotalHits(true); // 设置查询条件BoolQueryBuilder sourceBuilder.query(bool); // 设置分组,需注意 es 的分页是从 0 开始的 sourceBuilder.from(page); sourceBuilder.size(perPage); // 设置排序 sourceBuilder.sort("@timestamp", SortOrder.DESC);
进行查询
SearchRequest searchRequest = new SearchRequest(index); searchRequest.source(sourceBuilder); SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
获取查询结果
// 获取结果集 SearchHits hits = response.getHits(); // 获取总条数 paginator.setItems(Integer.valueOf(String.valueOf(hits.getTotalHits().value))); // 转换结果集 for (SearchHit hit : response.getHits().getHits()) { PhishingLogDto mailServer = new PhishingLogDto(); mailServer.jsonToDto(mailServer, JSONObject.parseObject(hit.getSourceAsString())); list.add(mailServer); }
根据 IDs 查询数据集
MultiGetRequest request = new MultiGetRequest();
for (int i = 0; i < indexArray.size(); i++) {
request.add(new MultiGetRequest.Item(index, idArray.getString(i)));
}
MultiGetResponse response = restHighLevelClient.mget(request, RequestOptions.DEFAULT);
MultiGetItemResponse[] itemResponses = response.getResponses();
for (MultiGetItemResponse itemResponse : itemResponses) {
GetResponse getResponse = itemResponse.getResponse();
if (getResponse.isExists()) {
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
for (String key : sourceAsMap.keySet()) {
Object value = sourceAsMap.get(key);
try {
if (Constants.ONLINE_SUSPECT_DATE_COLUMN.contains(key)) { // 定义时间格式
value = Constants.ONLINE_SUSPECT_DATE_FORMAT.parse(value.toString()); // 将时间字符串解析为Date对象
}
BeanUtils.setProperty(onlineSuspect, key, value);
} catch (Exception e) {
log.error("[key]{} [Message]{}", key, e.getMessage(), e);
}
}
}
}
相关文章