ElasticSearch源码-03-索引数据
2025-01-22 08:19:30    2.3k 字   
This post is also available in English and alternative languages.

ElasticSearch版本:6.5.0
纸上谈兵,不定时补充、更新,看到哪里写到哪里。文中图片较多、较大(已经过压缩),打不开多刷新几次。
目前仅涉及ElasticSearch内部,不涉及Lucene。


1. ElasticSearch 索引过程

elasticsearch-index-大致流程

ElasticSearch的写请求,会经过几个不同的节点:协调节点 → 主分片节点 → 副本分片节点。

02-es-基本概念 一篇中,曾介绍过,协调节点负责节点查询、索引时的请求转发、数据收集、合并、聚合等操作,并且ElasticSearch集群中默认所有的节点都是协调节点。

2. 相关概念

2.1. refresh、translog、flush

  • **refresh:**为了搜索可见性;提升搜索实时性,segment(索引段)不刷磁盘,但可以及时被搜索;
  • translog:为了数据可靠性;将所有未落磁盘的写相关操作都落磁盘记录(和refreshflush不相关);
  • **flush:**Lucene commit;segment(索引段)落磁盘的操作,在这个操作的同时会触发一次 refresh,落磁盘后也会把 translog 清空;

2.2. Rest*Action模块

定义某个URI应该由哪个Action模块处理,Rest请求的处理类命名规范为:Rest*Action。

每一种Rest*Action,都对应着某一类型的rest请求,例如:

  • RestSearchAction:处理检索文档相关的rest请求

  • RestUpdateAction:处理更新文档相关的rest请求

  • RestDeleteAction:处理删除文档相关的rest请求

2.2.1. Rest*Action类关系

RestIndexAction的关系类图如下:

RestIndexActionClassDiagram

2.2.1.1. RestHandler

是个接口(顶层抽象),定义的主要方法是:handleRequest(),该方法用来处理、包装用户request请求。

2.2.1.2. BaseRestHandler

它实现了RestHandler接口的抽象类。它实现了RestHandler#handleRequest方法,定义了rest基础处理流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {

//请求执行准备
// prepare the request for execution; has the side effect of touching the request parameters
final RestChannelConsumer action = prepareRequest(request, client);

//参数格式化、校验
// validate unconsumed params, but we must exclude params used to format the response
// use a sorted set so the unconsumed parameters appear in a reliable sorted order
final SortedSet<String> unconsumedParams =
request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));

//无效参数处理
// validate the non-response params
if (!unconsumedParams.isEmpty()) {
final Set<String> candidateParams = new HashSet<>();
candidateParams.addAll(request.consumedParams());
candidateParams.addAll(responseParams());
throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
}

//计数
usageCount.increment();

//执行action
// execute the action
action.accept(channel);
}

该类中还有个重要的抽象方法:BaseRestHandler#prepareRequest,用来组装操作索引的参数。子类实现该方法后用来组装不同操作所需的对象。

BaseRestHandler抽象类几乎是所有rest处理器的父类。

BaseRestHandlerClassDiagram

2.2.1.3. RestIndexAction

RestIndexAction处理器继承了BaseRestHandler抽象类。

是实现了RestIndexAction#prepareRequest方法,组装操作索引的参数。

2.3. TranSport模块

TranSport模块(传输模块)用于集群内节点之间的内部通信。从一个节点到另一个节点的调用都是用传输模块;

TranSport传输机制是异步的,意味着没有阻塞线程等待;

命名规范:TranSport*Action。

2.4. Action模块

例如:IndexAction,这些anction并未真正实现功能,类似代理作用。

每一种action模块(例如:IndexAction),都会映射、关联 上面两个模块类

Rest*Action → *Action → TranSport*Action。

如 RestIndexAction → IndexAction → TranSportIndexAction。

在 org.elasticsearch.action.ActionModule 类中对 Rest*Action → *Action → TranSport*Action 这种关系进行映射、注册 关联。

3. 接收用户请求

除开特殊配置(设置特殊节点),否则几乎每个节点都有可能收到用户请求(协调节点)。

ElasticSearch Server接收用户请求起始点:Netty4HttpRequestHandler#channelRead0

在接受用户请求后,es会根据用户请求参数进行调度、分派,选择对应的Rest*Action(handler)进行处理。

代码流程如下:

接收用户请求代码流程

3.1. 接收用户请求

收到用户原始请求后,会对参数进行初步包装,然后准备进行调度

Netty4Http接受用户请求

3.2. 遍历所有处理程序(Handler)

尝试分派请求,遍历所有可能的处理程序(Handler),这里还进行了简单的校验

遍历所有handler处理器

3.3. 分派请求

在dispatchRequest方法中,根据RestHandler查找对应RestHandler的实现。进而调用handleRequest方法

分派请求给Rest*Action处理

4. 源码流程图

elasticsearch-index-源码流程图

5. 大体处理过程

Netty4HttpRequestHandler#channelRead0收到index索引请求。传递给RestController,然后将请求分发给相应的RestHandler处理。

请求URL的模式符合:“/{index}/{type}/{id}”(上面列出的几种模式),被分配给RestIndexAction处理。

RestIndexAction会从request中获取参数、数据,然后根据不同的action,映射到具体的TransportAction实例(几乎每一个RestAction都有一个TransportAction),RestIndexAction 映射的是 TransportIndexAction。

然后调用TransportIndexAction#execute发起操作请求。TransportIndexAction类,继承了TransportSingleItemBulkWriteAction抽象类。

其实调用的是TransportSingleItemBulkWriteAction#doExecute(批处理)。但该类被标注了@Deprecated注解,最后实际是TransportBulkAction(TransportBulkAction#doExecute)。

5.1. 1、RestIndexAction

RestIndexAction类中,主要是RestIndexAction#prepareRequest方法,组装操作index索引的相关参数,

5.2. 2、TransportIndexAction

在 AbstractClient#index 和 NodeClient#executeLocally 方法中,会完成 RestIndexAction → IndexAction → TransportIndexAction 的映射。

IndexAction 和 TransportIndexAction 的映射关系,在ActionModule#setupActions中配置。

NodeClient#transportAction方法中,TransportIndexAction调用execute发起操作请求。

TransportIndexAction:执行索引操作,并且设置autoCreateIndex(是否自动创建索引)、allowIdGeneration(是否允许生成id)

IndexActionTransportAction

TransportIndexAction类继承了TransportSingleItemBulkWriteAction抽象类。

因此,由TransportSingleItemBulkWriteAction#doExecute负责处理。

TransportIndexAction

5.3. 3、TransportSingleItemBulkWriteAction#doExecute

红框1处,对原request进行了封装,改成了批处理的request。

由于TransportSingleItemBulkWriteAction被标注了@Deprecated注解,最后实际是TransportBulkAction进行处理(TransportBulkAction#doExecute)。

TransportSingletemBulkWriteAction

5.4. 4、TransportBulkAction#doExecute

  • 校验自动创建节点配置
  • 筛选indexName
  • 基于当前集群信息,检查索引是否需要创建
  • 有索引需要创建,则走创建索引分支
  • 没有索引需要创建,则走 预处理和批处理 分支

needToCheck()方法:判断是否允许自动创建索引(对应elasticsearch.yml配置文件中的action.auto_create_index);

shouldAutoCreate()方法:校验是否需要创建索引;

TransportBulkAction-doExecute

5.5. 5、TransportBulkAction#executeIngestAndBulk

  • 检查request请求中是否有预处理标识
  • request请求中没有预处理标识,则从当前集群的索引配置检查,是否有配置预处理策略(默认是none)。
  • 不需要预处理的话,走批处理分支

TransportBulkAction#executeIngestAndBulk

5.6. 6、TransportBulkAction#executeBulk

executeBulk方法中调用了 BulkOperation 对象的run()方法,该对象实现了Runnable线程。

BulkOperation 的核心是doRun()方法

TransportBulkAction#executeBulk

5.7. 7、TransportBulkAction-BulkOperation#doRun

(方法较长,分两部分)

5.7.1. 第一部分

循环所有文档,校验文档,Index写索引业务的特定业务处理,如:routing路由、mapping校验、文档id生成。

TransportBulkAction-BulkOperation#doRun

在IndexRequest#process方法中,如果没有指定文档id,那在这里会自动生成id。

TransportBulkAction-BulkOperation#doRun#process

5.7.2. 第二部分

循环所有docRequest,确定其所属分片的shardId(分片id),然后根据shardId进行归类,就是将相同shard id的请求合并

Map.computeIfAbsent 方法是JDK8中的新方法

接着循环已经归类好的request,设定集群节点通信的必要参数。

交给 TransportShardBulkAction#execute 准备进行转发给各个主分片,在listener等待响应。

TransportBulkAction-BulkOperation#doRun-2

5.8. 8、TransportShardBulkAction#execute

协调节点转发数据节点(主分片节点)的具体实现,在其父类 TransportReplicationAction#doExecute 中

ReroutePhase类,实现了Runnable,是个线程,核心方法doRun。

TransportReplicationAction#doExecute

  • 获取最新集群状态信息
  • 检查索引是否关闭状态
  • 从路由表中找到目标主分片,根据shardId
  • 如果主分片是本机,则本机执行;非本机,则转发到主分片节点(数据节点)。

TransportReplicationAction#doRun

(由于我这里是本地源码debug,所以是本机执行。)

不论是本机执行,还是发送到主分片节点,都会调用 performAction 方法;该方法参数列表中有一个isPrimaryAction字段,用来标识是否本机执行。

至此,协调节点已处理完成,将消息发送至主分片节点(数据节点)进行处理。

TransportReplicationAction#doRun-2

5.9. 9、TransportReplicationAction.PrimaryOperationTransportHandler#messageReceived

这里是接收集群内其他节点请求的地方(协调节点 -> 数据节点)。

AsyncPrimaryAction类继承Runnable,主方法doRun。

TransportReplicationAction-PrimaryOperationTransportHandler#messageReceived

5.10. 10-TransportReplicationAction#acquirePrimaryShardReference

获取索引引用,准备执行写索引动作。在本地(主分片)执行写操作,然后发送给副本分片。

在写索引操作之前,有好几个地方会 primaryShardReference.isRelocated()方法,这个方法的作用是检查当前分片是否是主分片,防止在最后写分片的一刻,分片角色切换(主分片变为副本分片),如果当前分片不再是主分片,则会获取最新集群路由状态,将消息再次转发。

TransportReplicationAction#acquirePrimaryShardReference

检查当前分片角色,如果发生角色切换,获取最新集群路由状态,将请求转发至新主分片。

TransportReplicationAction-AsyncPrimaryAction#onResponse

5.11. 11、ReplicationOperation#execute

检查此次写操作涉及的 shard,活跃 shard 数量是否足够。

然后执行,写主分片操作。写主分片成功后,向副本分片分发请求。

ReplicationOperation#execute

5.12. 12、TransportReplicationAction.PrimaryShardReference#perform

perform方法调用抽象方法:shardOperationOnPrimary。

进入TransportShardBulkAction 类,它对这个方法进行了实现。

TransportReplicationAction-PrimaryShardReference#perform

执行批量reuqest写请求

TransportShardBulkAction-shardOperationOnPrimary

5.13. 13、TransportShardBulkAction#executeIndexRequestOnPrimary

TransportShardBulkAction-exechteIndexRequestOnPrimary

5.14. 14、IndexShard#index

IndexShadr-Index

补上一张InternalEngine#index方法的代码,在写完lucene索引后,es再写translog。

InternalEngine-Index

5.15. 15、InternalEngine#indexIntoLucene

到这里就正式写Lucene索引了。

调用 org.apache.lucene.index.IndexWriter#addDocument 方法

InternalEngine-indexIntoLucene