ElasticSearch-java-restClient-sniffer嗅探器
2025-01-22 08:19:30    1k 字   
This post is also available in English and alternative languages.

ElasticSearch版本:6.5.0(点击跳转官方文档)


1. elasticsearch restClient sniffer 嗅探器

一般情况,像下面这样子创建客户端;

1
2
3
4
5
HttpHost httpHost2 = new HttpHost("172.16.*.*", 9200, "http");
HttpHost httpHost1 = new HttpHost("172.16.*.*", 9200, "http");
HttpHost httpHost3 = new HttpHost("172.16.*.*", 9200, "http");
httpHosts = new HttpHost[]{httpHost1, httpHost2, httpHost3};
RestClientBuilder clientBuilder = RestClient.builder(httpHosts);

集群节点数量少还好,如果集群有上百上千节点,岂不是要写很多,即便是用配置文件,也很麻烦。

sniffer 嗅探器,它可以从运行中的Elasticsearch集群中自动发现节点,并将它们设置到现有的RestClient实例。

sniffer 支持从Elasticsearch 2.x及其以后的版本获取节点列表。


2. Maven

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.5.0</version>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client-sniffer</artifactId>
<version>6.5.0</version>
</dependency>

3. 周期性嗅探 Demo

Sniffer 嗅探器 与 restClient 相关联。

当程序启动,restClient连接上elasticsearch集群后,就开始周期性嗅探(根据 setSniffIntervalMillis 设置的时间)

就相当于启动了一个线程,定时获取集群中所有节点信息,然后更新到restClient中。

周期性嗅探好处是,节点故障(下线)、新增(上线) 可以快速感知,更新restClient中的设置。

坏处也很明显,多了一个定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* 创建 elasticsearch high level client
* <p>
* 设置: <br>
* 超时时间<br>
* 线程数<br>
* 定时嗅探器(周期性嗅探):集群中节点故障(下线)、新增(上线),都可以快速感知<br>
*
* @param connectTimeout 超时时间
* @param maxRetryTimeoutMillis 最大重试超时时间
* @param threadCount 线程数
* @param snifferMillis 嗅探器刷新时间
*/
public RestHighClient(int connectTimeout, int maxRetryTimeoutMillis, int threadCount, int snifferMillis) {

RestClientBuilder restClientBuilder = RestClient.builder(httpHosts)
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom()
//设置线程数
.setIoThreadCount(threadCount)
//连接超时
.setConnectTimeout(connectTimeout)
.build());
}
});

//设置 最大重试超时时间
restClientBuilder.setMaxRetryTimeoutMillis(maxRetryTimeoutMillis);

//每次节点发生故障时收到通知的侦听器。
restClientBuilder.setFailureListener(new RestClient.FailureListener() {
@Override
public void onFailure(Node node) {
LOGGER.error("es node is failure , hostName:{},port:{}", new Object[]{node.getHost().getHostName(), node.getHost().getPort()});
}
});

restHighLevelClient = new RestHighLevelClient(restClientBuilder);

restClient = restHighLevelClient.getLowLevelClient();

//设置嗅探器
Sniffer.builder(restClient)
//设置连续两次普通嗅探执行之间的间隔(以毫秒为单位)
.setSniffIntervalMillis(snifferMillis)
.build();
}

4. 触发性嗅探

触发性嗅探,需要设置一个监听器 SniffOnFailureListener ,通过 setFailureListener 关联到 restClient 中。

在集群节点发生故障(下线)后,触发执行。

执行的延迟通过 setSniffAfterFailureDelayMillis 设置。

当到达延迟时间点,会快速进行两次嗅探,之后间隔五分钟嗅探一次。

When sniffing on failure, not only do the nodes get updated after each failure, but an additional sniffing round is also scheduled sooner than usual

注意:
在触发性嗅探中,setSniffAfterFailureDelayMillis 和 setSniffIntervalMillis 不建议一起使用。
本地测试时候发现,配置了 setSniffIntervalMillis 之后,既有周期性嗅探,又有触发性嗅探。(如有错误,望指正)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* 创建 elasticsearch high level client
* <p>
* 设置:<br>
* 超时时间<br>
* 线程数<br>
* 嗅探器(触发嗅探)<br>
*
* @param connectTimeout 超时时间
* @param maxRetryTimeoutMillis 最大重试超时时间
* @param threadCount 线程数
* @param useSniffer 是否启用嗅探器
* @param sniffAfterFailureDelayMillis 节点故障后计划执行嗅探的延迟(ms)
*/
public RestHighClient(int connectTimeout, int maxRetryTimeoutMillis, int threadCount, boolean useSniffer,
int sniffAfterFailureDelayMillis) {

SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();

RestClientBuilder restClientBuilder = RestClient.builder(httpHosts)
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom()
//设置线程数
.setIoThreadCount(threadCount)
//连接超时
.setConnectTimeout(connectTimeout)
.build());
}
});

//设置 最大重试超时时间
restClientBuilder.setMaxRetryTimeoutMillis(maxRetryTimeoutMillis);

if (useSniffer) {
restClientBuilder.setFailureListener(sniffOnFailureListener);
}

restHighLevelClient = new RestHighLevelClient(restClientBuilder);
restClient = restHighLevelClient.getLowLevelClient();

if (useSniffer) {
Sniffer sniffer = Sniffer
.builder(restHighLevelClient.getLowLevelClient())
//设置计划执行嗅探的延迟(从收到节点故障监听开始,以毫秒为单位)
.setSniffAfterFailureDelayMillis(sniffAfterFailureDelayMillis)
.build();
sniffOnFailureListener.setSniffer(sniffer);
}
}

5. Reference