This post is also available in English and alternative languages.ElasticSearch版本:6.5.0(点击跳转官方文档)
1. elasticsearch restClient sniffer 嗅探器
一般情况,像下面这样子创建客户端;
1 2 3 4 5
| HttpHost httpHost2 = new HttpHost("172.16.*.*", 9200, "http"); HttpHost httpHost1 = new HttpHost("172.16.*.*", 9200, "http"); HttpHost httpHost3 = new HttpHost("172.16.*.*", 9200, "http"); httpHosts = new HttpHost[]{httpHost1, httpHost2, httpHost3}; RestClientBuilder clientBuilder = RestClient.builder(httpHosts);
|
集群节点数量少还好,如果集群有上百上千节点,岂不是要写很多,即便是用配置文件,也很麻烦。
sniffer 嗅探器,它可以从运行中的Elasticsearch集群中自动发现节点,并将它们设置到现有的RestClient实例。
sniffer 支持从Elasticsearch 2.x及其以后的版本获取节点列表。
2. Maven
1 2 3 4 5 6 7 8 9 10 11
| <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>6.5.0</version> </dependency>
<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-client-sniffer</artifactId> <version>6.5.0</version> </dependency>
|
3. 周期性嗅探 Demo
Sniffer 嗅探器 与 restClient 相关联。
当程序启动,restClient连接上elasticsearch集群后,就开始周期性嗅探(根据 setSniffIntervalMillis 设置的时间)
就相当于启动了一个线程,定时获取集群中所有节点信息,然后更新到restClient中。
周期性嗅探好处是,节点故障(下线)、新增(上线) 可以快速感知,更新restClient中的设置。
坏处也很明显,多了一个定时任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
|
public RestHighClient(int connectTimeout, int maxRetryTimeoutMillis, int threadCount, int snifferMillis) {
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts) .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { @Override public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) { return httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom() .setIoThreadCount(threadCount) .setConnectTimeout(connectTimeout) .build()); } });
restClientBuilder.setMaxRetryTimeoutMillis(maxRetryTimeoutMillis);
restClientBuilder.setFailureListener(new RestClient.FailureListener() { @Override public void onFailure(Node node) { LOGGER.error("es node is failure , hostName:{},port:{}", new Object[]{node.getHost().getHostName(), node.getHost().getPort()}); } });
restHighLevelClient = new RestHighLevelClient(restClientBuilder);
restClient = restHighLevelClient.getLowLevelClient();
Sniffer.builder(restClient) .setSniffIntervalMillis(snifferMillis) .build(); }
|
4. 触发性嗅探
触发性嗅探,需要设置一个监听器 SniffOnFailureListener ,通过 setFailureListener 关联到 restClient 中。
在集群节点发生故障(下线)后,触发执行。
执行的延迟通过 setSniffAfterFailureDelayMillis 设置。
当到达延迟时间点,会快速进行两次嗅探,之后间隔五分钟嗅探一次。
When sniffing on failure, not only do the nodes get updated after each failure, but an additional sniffing round is also scheduled sooner than usual
注意:
在触发性嗅探中,setSniffAfterFailureDelayMillis 和 setSniffIntervalMillis 不建议一起使用。
本地测试时候发现,配置了 setSniffIntervalMillis 之后,既有周期性嗅探,又有触发性嗅探。(如有错误,望指正)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
|
public RestHighClient(int connectTimeout, int maxRetryTimeoutMillis, int threadCount, boolean useSniffer, int sniffAfterFailureDelayMillis) {
SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts) .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { @Override public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) { return httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom() .setIoThreadCount(threadCount) .setConnectTimeout(connectTimeout) .build()); } });
restClientBuilder.setMaxRetryTimeoutMillis(maxRetryTimeoutMillis);
if (useSniffer) { restClientBuilder.setFailureListener(sniffOnFailureListener); }
restHighLevelClient = new RestHighLevelClient(restClientBuilder); restClient = restHighLevelClient.getLowLevelClient();
if (useSniffer) { Sniffer sniffer = Sniffer .builder(restHighLevelClient.getLowLevelClient()) .setSniffAfterFailureDelayMillis(sniffAfterFailureDelayMillis) .build(); sniffOnFailureListener.setSniffer(sniffer); } }
|
5. Reference