基于High-level-rest-client使用JavaAPI完成对Elasticsearch的聚合查询
基于High-level-rest-client使用JavaAPI完成对Elasticsearch的聚合查询
原本是采用
transportClient
来写的,但一是官方说明该方式在5.x
以后的版本中将逐步不再支持,二是因为实际环境启用了加密认证,无法再通过transportclient
的方式进行查询了,所以综合了一下采用了High-level-rest-client
的方式。同时在使用High-level-rest-client
的方式创建 client 的时候务必注意版本的情况,我这里使用的是5.6
版本的,不同版本之间创建 client 的方式的差别还是比较大的。
下面进入正题。因为我使用的是 maven 项目,所以首先给出
pom.xml
,里面是我们可能会用上的一些 jar 依赖;其次,因为这种构建客户端的方式需要java1.8
及以上版本,所以我们在 pom.xml 中指定了 java 的版本,同时请确认你自己的运行环境中已成功安装java1.8
及以上版本。
pom.xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.8.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <!-- The REST client used below requires Java 1.8 or newer. -->
    <java.version>1.8</java.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- Elasticsearch core library. All Elasticsearch artifacts below must share one version. -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>5.6.11</version>
    </dependency>
    <!-- Low-level REST client -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-client</artifactId>
        <version>5.6.11</version>
    </dependency>
    <!-- Node sniffer for the low-level REST client (version aligned with the client itself) -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-client-sniffer</artifactId>
        <version>5.6.11</version>
        <scope>compile</scope>
    </dependency>
    <!-- High-level REST client -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>5.6.11</version>
    </dependency>
    <!-- Spring Boot hot restart during development -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- Generates metadata for the configuration properties -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>
ElasticsearchConfig
配置使用了
lombok
的jar包,配置信息保存在application.properties
中。
/**
 * Holds the Elasticsearch connection settings injected from the
 * {@code elasticsearch.*} configuration properties.
 *
 * <p>Every setting, including the numeric ones, is stored as a raw
 * {@code String}. Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@Configuration
public class ElasticsearchConfig {

    /** Value of the {@code elasticsearch.host} property. */
    @Value("${elasticsearch.host}")
    private String host;

    /** Value of the {@code elasticsearch.port} property. */
    @Value("${elasticsearch.port}")
    private String port;

    /** Value of the {@code elasticsearch.schema} property. */
    @Value("${elasticsearch.schema}")
    private String schema;

    /** Value of the {@code elasticsearch.timeout} property. */
    @Value("${elasticsearch.timeout}")
    private String timeout;

    /** Value of the {@code elasticsearch.size} property. */
    @Value("${elasticsearch.size}")
    private String size;

    /** Value of the {@code elasticsearch.username} property. */
    @Value("${elasticsearch.username}")
    private String username;

    /** Value of the {@code elasticsearch.password} property. */
    @Value("${elasticsearch.password}")
    private String password;
}
ElasticsearchService
在指定的时间范围内(timestamp)查询字段
uri
的值匹配filtervalue
的文档,timestamp
参数以“起始时间,结束时间”的形式传入,两个时间均以时间戳表示。
@Component
public class ElasticsearchService {
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchService.class);
@Autowired
private ElasticsearchConfig elasticsearchConfig;
public Boolean getSourceByFilters(String topic, String index, String filtervalue, String timestamp){
//Host
String elasticsearchhost = elasticsearchConfig.getHost();
//Port
int elasticsearchport = Integer.parseInt(elasticsearchConfig.getPort());
//Schema
String elasticsearchschema = elasticsearchConfig.getSchema();
//Timeout (seconds)
int elasticsearchtimeout = Integer.parseInt(elasticsearchConfig.getTimeout());
//Single request quantity
int elasticsearchsize = Integer.parseInt(elasticsearchConfig.getSize());
//From what time
String gte = timestamp.split(",")[0];
//To what time
String lte = timestamp.split(",")[1];
//login es username
String elasticsearchusername = elasticsearchConfig.getUsername();
//login es password
String elasticsearchpassword = elasticsearchConfig.getPassword();
//topicname prefix
String fulltopicname = kafkaConfig.getTopicPrefix() + topic;
try {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
//es账号密码
new UsernamePasswordCredentials(elasticsearchusername, elasticsearchpassword));
//自动扫描网段,使用该服务可以在配置信息中仅配置一个es地址即可
SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();
//Low Level Client init
RestClient lowLevelRestClient = RestClient.builder(
new HttpHost(elasticsearchhost, elasticsearchport, elasticsearchschema)
).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
})
//监听同网段服务
.setFailureListener(sniffOnFailureListener)
.build();
SnifferBuilder snifferBuilder = Sniffer.builder(restClient).setSniffIntervalMillis(elasticsearchConfig.getProducerSnifferinterval());
if (elasticsearchConfig.getProducerFailuredelay() > 0) {
snifferBuilder.setSniffAfterFailureDelayMillis(elasticsearchConfig.getProducerFailuredelay());
}
sniffOnFailureListener.setSniffer(snifferBuilder.build());
//High Level Client init
RestHighLevelClient client = new RestHighLevelClient(lowLevelRestClient);
final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
SearchRequest searchRequest = new SearchRequest(index);
searchRequest.scroll(scroll);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
//Aggregate statement
searchSourceBuilder.query(QueryBuilders.boolQuery()
//.must(QueryBuilders.queryStringQuery(filters))
//.must(QueryBuilders.termsQuery("uri",filters))
.must(QueryBuilders.matchPhraseQuery("uri",filtervalue))
.must(QueryBuilders.rangeQuery("@timestamp").gte(gte).lte(lte))
)
.timeout(new TimeValue(elasticsearchtimeout,TimeUnit.SECONDS))
.size(elasticsearchsize);
searchRequest.source(searchSourceBuilder);
//Print the executed DSL statement, which can be used directly in kibana
//LOGGER.info(searchSourceBuilder.toString());
SearchResponse searchResponse = client.search(searchRequest);
if (searchResponse.getHits().totalHits == 0){
return null;
}else {
Boolean kafkaresstatus = new Boolean(true);
if ("OK".equals(searchResponse.status().toString())){
List list = new ArrayList<>();
String scrollId = searchResponse.getScrollId();
SearchHit[] searchHits = searchResponse.getHits().getHits();
for (SearchHit hit : searchResponse.getHits().getHits()) {
String res = hit.getSourceAsString();
}
long totalHits = searchResponse.getHits().getTotalHits();
long length = searchResponse.getHits().getHits().length;
LOGGER.info("A total of [{}] data was retrieved, and the number of data processed [{}]",totalHits, length);
while (searchHits != null && searchHits.length > 0) {
SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
scrollRequest.scroll(scroll);
searchResponse = client.searchScroll(scrollRequest);
for (SearchHit hit : searchResponse.getHits().getHits()) {
String res = hit.getSourceAsString();
}
length += searchResponse.getHits().getHits().length;
LOGGER.info("A total of [{}] data was retrieved, and the number of data processed [{}]",totalHits, length);
if (length == totalHits){
break;
}
}
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest);
boolean succeeded = clearScrollResponse.isSucceeded();
return kafkaresstatus;
}else {
return false;
}
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
}