ElasticSearch版本:6.5.0
纸上谈兵,不定时补充、更新,看到哪里写到哪里。文中图片较多、较大(已经过压缩),打不开多刷新几次。
目前仅涉及ElasticSearch内部,不涉及Lucene。
1. ElasticSearch 索引过程
ElasticSearch的写请求,会经过几个不同的节点:协调节点 → 主分片节点 → 副本分片节点。
在 02-es-基本概念 一篇中,曾介绍过,协调节点负责节点查询、索引时的请求转发、数据收集、合并、聚合等操作,并且ElasticSearch集群中默认所有的节点都是协调节点。
2. 相关概念
2.1. refresh、translog、flush
- **refresh:**为了搜索可见性;提升搜索实时性,segment(索引段)不刷磁盘,但可以及时被搜索;
- translog:为了数据可靠性;将所有未落磁盘的写相关操作都落磁盘记录(和refresh、flush不相关);
- **flush:**Lucene commit;segment(索引段)落磁盘的操作,在这个操作的同时会触发一次 refresh,落磁盘后也会把 translog 清空;
2.2. Rest*Action模块
定义某个URI应该由哪个Action模块处理,Rest请求的处理类命名规范为:Rest*Action。
每一种Rest*Action,都对应着某一类型的rest请求,例如:
RestSearchAction:处理检索文档相关的rest请求
RestUpdateAction:处理更新文档相关的rest请求
RestDeleteAction:处理删除文档相关的rest请求
…
2.2.1. Rest*Action类关系
RestIndexAction的关系类图如下:
2.2.1.1. RestHandler
是个接口(顶层抽象),定义的主要方法是:handleRequest(),该方法用来处理、包装用户request请求。
2.2.1.2. BaseRestHandler
它实现了RestHandler接口的抽象类。它实现了RestHandler#handleRequest方法,定义了rest基础处理流程。
1 |
|
该类中还有个重要的抽象方法:BaseRestHandler#prepareRequest,用来组装操作索引的参数。子类实现该方法后用来组装不同操作所需的对象。
BaseRestHandler抽象类几乎是所有rest处理器的父类。
2.2.1.3. RestIndexAction
RestIndexAction处理器继承了BaseRestHandler抽象类。
是实现了RestIndexAction#prepareRequest方法,组装操作索引的参数。
2.3. TranSport模块
TranSport模块(传输模块)用于集群内节点之间的内部通信。从一个节点到另一个节点的调用都是用传输模块;
TranSport传输机制是异步的,意味着没有阻塞线程等待;
命名规范:TranSport*Action。
2.4. Action模块
例如:IndexAction,这些anction并未真正实现功能,类似代理作用。
每一种action模块(例如:IndexAction),都会映射、关联 上面两个模块类
Rest*Action → *Action → TranSport*Action。
如 RestIndexAction → IndexAction → TranSportIndexAction。
在 org.elasticsearch.action.ActionModule 类中对 Rest*Action → *Action → TranSport*Action 这种关系进行映射、注册 关联。
3. 接收用户请求
除开特殊配置(设置特殊节点),否则几乎每个节点都有可能收到用户请求(协调节点)。
ElasticSearch Server接收用户请求起始点:Netty4HttpRequestHandler#channelRead0
在接受用户请求后,es会根据用户请求参数进行调度、分派,选择对应的Rest*Action(handler)进行处理。
代码流程如下:
3.1. 接收用户请求
收到用户原始请求后,会对参数进行初步包装,然后准备进行调度
3.2. 遍历所有处理程序(Handler)
尝试分派请求,遍历所有可能的处理程序(Handler),这里还进行了简单的校验
3.3. 分派请求
在dispatchRequest方法中,根据RestHandler查找对应RestHandler的实现。进而调用handleRequest方法
4. 源码流程图
5. 大体处理过程
Netty4HttpRequestHandler#channelRead0收到index索引请求。传递给RestController,然后将请求分发给相应的RestHandler处理。
请求URL的模式符合:“/{index}/{type}/{id}”(上面列出的几种模式),被分配给RestIndexAction处理。
RestIndexAction会从request中获取参数、数据,然后根据不同的action,映射到具体的TransportAction实例(几乎每一个RestAction都有一个TransportAction),RestIndexAction 映射的是 TransportIndexAction。
然后调用TransportIndexAction#execute发起操作请求。TransportIndexAction类,继承了TransportSingleItemBulkWriteAction抽象类。
其实调用的是TransportSingleItemBulkWriteAction#doExecute(批处理)。但该类被标注了@Deprecated注解,最后实际是TransportBulkAction(TransportBulkAction#doExecute)。
5.1. 1、RestIndexAction
RestIndexAction类中,主要是RestIndexAction#prepareRequest方法,组装操作index索引的相关参数,
5.2. 2、TransportIndexAction
在 AbstractClient#index 和 NodeClient#executeLocally 方法中,会完成 RestIndexAction → IndexAction → TransportIndexAction 的映射。
IndexAction 和 TransportIndexAction 的映射关系,在ActionModule#setupActions中配置。
NodeClient#transportAction方法中,TransportIndexAction调用execute发起操作请求。
TransportIndexAction:执行索引操作,并且设置autoCreateIndex(是否自动创建索引)、allowIdGeneration(是否允许生成id)
TransportIndexAction类继承了TransportSingleItemBulkWriteAction抽象类。
因此,由TransportSingleItemBulkWriteAction#doExecute负责处理。
5.3. 3、TransportSingleItemBulkWriteAction#doExecute
红框1处,对原request进行了封装,改成了批处理的request。
由于TransportSingleItemBulkWriteAction被标注了@Deprecated注解,最后实际是TransportBulkAction进行处理(TransportBulkAction#doExecute)。
5.4. 4、TransportBulkAction#doExecute
- 校验自动创建节点配置
- 筛选indexName
- 基于当前集群信息,检查索引是否需要创建
- 有索引需要创建,则走创建索引分支
- 没有索引需要创建,则走 预处理和批处理 分支
needToCheck()方法:判断是否允许自动创建索引(对应elasticsearch.yml配置文件中的action.auto_create_index);
shouldAutoCreate()方法:校验是否需要创建索引;
5.5. 5、TransportBulkAction#executeIngestAndBulk
- 检查request请求中是否有预处理标识
- request请求中没有预处理标识,则从当前集群的索引配置检查,是否有配置预处理策略(默认是none)。
- 不需要预处理的话,走批处理分支
5.6. 6、TransportBulkAction#executeBulk
executeBulk方法中调用了 BulkOperation 对象的run()方法,该对象实现了Runnable线程。
BulkOperation 的核心是doRun()方法
5.7. 7、TransportBulkAction-BulkOperation#doRun
(方法较长,分两部分)
5.7.1. 第一部分
循环所有文档,校验文档,Index写索引业务的特定业务处理,如:routing路由、mapping校验、文档id生成。
在IndexRequest#process方法中,如果没有指定文档id,那在这里会自动生成id。
5.7.2. 第二部分
循环所有docRequest,确定其所属分片的shardId(分片id),然后根据shardId进行归类,就是将相同shard id的请求合并
Map.computeIfAbsent 方法是JDK8中的新方法
接着循环已经归类好的request,设定集群节点通信的必要参数。
交给 TransportShardBulkAction#execute 准备进行转发给各个主分片,在listener等待响应。
5.8. 8、TransportShardBulkAction#execute
协调节点转发数据节点(主分片节点)的具体实现,在其父类 TransportReplicationAction#doExecute 中
ReroutePhase类,实现了Runnable,是个线程,核心方法doRun。
- 获取最新集群状态信息
- 检查索引是否关闭状态
- 从路由表中找到目标主分片,根据shardId
- 如果主分片是本机,则本机执行;非本机,则转发到主分片节点(数据节点)。
(由于我这里是本地源码debug,所以是本机执行。)
不论是本机执行,还是发送到主分片节点,都会调用 performAction 方法;该方法参数列表中有一个isPrimaryAction字段,用来标识是否本机执行。
至此,协调节点已处理完成,将消息发送至主分片节点(数据节点)进行处理。
5.9. 9、TransportReplicationAction.PrimaryOperationTransportHandler#messageReceived
这里是接收集群内其他节点请求的地方(协调节点 -> 数据节点)。
AsyncPrimaryAction类继承Runnable,主方法doRun。
5.10. 10-TransportReplicationAction#acquirePrimaryShardReference
获取索引引用,准备执行写索引动作。在本地(主分片)执行写操作,然后发送给副本分片。
在写索引操作之前,有好几个地方会 primaryShardReference.isRelocated()方法,这个方法的作用是检查当前分片是否是主分片,防止在最后写分片的一刻,分片角色切换(主分片变为副本分片),如果当前分片不再是主分片,则会获取最新集群路由状态,将消息再次转发。
检查当前分片角色,如果发生角色切换,获取最新集群路由状态,将请求转发至新主分片。
5.11. 11、ReplicationOperation#execute
检查此次写操作涉及的 shard,活跃 shard 数量是否足够。
然后执行,写主分片操作。写主分片成功后,向副本分片分发请求。
5.12. 12、TransportReplicationAction.PrimaryShardReference#perform
perform方法调用抽象方法:shardOperationOnPrimary。
进入TransportShardBulkAction 类,它对这个方法进行了实现。
执行批量reuqest写请求
5.13. 13、TransportShardBulkAction#executeIndexRequestOnPrimary
5.14. 14、IndexShard#index
补上一张InternalEngine#index方法的代码,在写完lucene索引后,es再写translog。
5.15. 15、InternalEngine#indexIntoLucene
到这里就正式写Lucene索引了。
调用 org.apache.lucene.index.IndexWriter#addDocument 方法