Hexiaoqiao

Focus on BigData,Distributed System,Hadoop 2.*

HDFS HA Using QJM原理解析

一、前言

Hadoop 1.0时代,Hadoop核心组件HDFS NameNode和MapReduce JobTracker都存在单点问题(SPOF),其中以NameNode SPOF尤为严重。HDFS作为整个Hadoop生态的基础组件,一旦NameNode发生故障,包括HDFS在内及其上层依赖组件(YARN/MapReduce/Spark/Hive/Pig/HBase等)都将无法正常工作,另外NameNode恢复过程非常耗时,尤其对大型集群,NameNode重启时间非常可观,给HDFS的可用性带来了极大的挑战,同时在使用场景上也带来明显的限制。

Hadoop 2.0时期,Hadoop的所有单点问题基本得到了解决。其中HDFS的高可用(High Availability,HA)在社区经过多种方案的讨论后,最终Cloudera的HA using Quorum Journal Manager(QJM)方案被社区接收,并在Hadoop 2.0版本中发布。

本文将从HDFS HA发展路径,架构原理,实现细节及使用过程中可能遇到的问题几个方面对HDFS NameNode高可用机制(主要是HDFS using QJM,不包含ZKFC)进行简单梳理和分析。

二、HDFS HA发展路径

在Hadoop 1.0时代,HDFS的架构相对简单,组件单一,主要包含NameNode,Seconday NameNode,DataNode和DFSClient四个核心组件(见图1 Hadoop 1.0中HDFS架构),其中NameNode提供元数据管理服务,Secondary NameNode以冷备的状态为NameNode分担Checkpoint工作(定期合并FsImage和Editlog),为避免出现歧义后来也称Secondary NameNode为Checkpoint Node。在这种架构下NameNode是整个系统的单点,一旦出现故障对可用性是致命的。


此后,社区讨论过多种HA方案来解决HDFS NameNode的单点问题,其中以2010年Facebook提出的AvatarNode方案(见图2 Facebook提出的HDFS AvatarNode架构图)较为典型。在AvatarNode方案中使用Avatar Primary NameNode,Avatar Standby NameNode及NFS配合通过共享的方式管理HDFS部分元数据EditLog。其中Avatar Primary NameNode提供读写服务,并将Editlog写入到共享存储NFS上,Avatar Standby NameNode从共享存储NFS上读取Editlog数据并回放,这样尽可能保持与Primary之间状态一致。

AvatarNode方案第一次使HDFS NameNode具备了热备能力,一旦Primary NameNode出现故障,Avatar Standby NameNode可以在极短时间内接管HDFS的读写请求,真正实现了HDFS的高可用。

但是AvatarNode这种方案并不是完美的。它的问题主要是共享存储NFS成为新的SPOF,必须保证其高可用;同时AvatarNode不具备自动Failover能力,一旦Avatar Primary NameNode出现故障,需要运维人员介入手动处理,当然Facebook这样设计有自己的考虑,这里就不再展开;另外一点,昂贵的NFS设备引入与HDFS最初构建在“inexpensive commodity hardware”设计初衷多少有些出入。

虽然存在问题,但是AvatarNode架构相比Hadoop 1.0已经有了本质的区别,也正是因为其在HA特性上的优秀表现,AvatarNode方案被国内外很多团队采纳和使用。


2012年初,Cloudera工程师Todd Lipcon主导设计的HA using QJM方案进入社区(见图3 HDFS HA using QJM架构图)。

QJM的思想最初来源于Paxos协议,摒弃了AvatarNode方案中的共享存储设备,改用多个JournalNode节点组成集群来管理和共享EditLog。与Paxos协议类似,当NameNode向JournalNode请求读写时,要求至少大多数(Majority)成功返回才认为本次请求成功。对于一个由2N+1台JournalNode组成的集群,可以容忍最多N台JournalNode节点挂掉。从这个角度来看,QJM相比AvatarNode方案具备了更强的HA能力。

同时QJM方案也继承了社区早前实施过的HA方案中优秀特性,通过引入Zookeeper实现的选主功能,使NameNode真正具备了自动Failover的能力。

对比AvatarNode,HA using QJM方案不再存在SPOF问题,High Availability特性明显提升,同时具备了自动Failover能力。正是其良好的设计和容错能力,HDFS HA using QJM方案在2012年随Hadoop 2.0正式发布,此后一直是Hadoop社区默认HDFS HA方案。


当然除了上面提到两种典型的HDFS HA方案外,社区其实在HA特性上进行过多次尝试。

开始于2008年,并随Hadoop 0.21发布的BackupNode方案(参考:HADOOP-4539);2011年通过单独分支HDFS HA Branch开发的功能非常丰富的HDFS HA方案,并在Hadoop 0.23发布(参考:HDFS-1623),其中LinuxHA和BookKeeper方案都出自这里;等等不一而足。

三、HA using QJM原理

3.1 HA using QJM架构

HA using Quorum Journal Manager (QJM)是当前主流HDFS HA方案,由Cloudera工程师Todd Lipcon在2012年发起,随Hadoop 2.0.3发布,此后一直被视为HDFS HA默认方案。

在HA using QJM方案中,涉及到的核心组件(见图3)包括:

Active NameNode(ANN):在HDFS集群中,对外提供读写服务的唯一Master节点。ANN将客户端请求过来的写操作通过EditLog写入共享存储系统(即JournalNode Cluster),为Standby NameNode及时同步数据提供支持;

Standby NameNode(SBN):与ANN相互形成热备,SBN及时从共享存储系统中读取EditLog数据并更新内存,以保证当前状态尽可能与ANN同步。当前在整个HDFS集群中最多一台处于Active状态,最多一台处于Standby状态;

JournalNode Cluster(JNs):ANN与SBN之间共享Editlog的一致性存储系统,是HDFS NameNode高可用的核心组件。借助JournalNode集群ANN可以尽可能及时同步元数据到SBN。其中ANN采用Push模式将EditLog写入JN,SBN通过Pull模式从JN拉取数据,整个过程中JN不主动进行数据交换;

ZKFailoverController(ZKFC):ZKFailoverController以独立进程运行,对NameNode主备切换进行控制,正常情况ANN和SBN分别对应各自ZKFC进程。ZKFC主要功能:NameNode健康状况检测;借助Zookeeper实现NameNode自动选主;操作NameNode进行主从切换;

Zookeeper(ZK):为ZKFC实现自动选主功能提供统一协调服务。

需要说明的是,在HA using QJM架构下,DataNode从仅向单个NameNode进行数据交互升级到同时向ANN和SBN进行数据交互,区别是仅执行ANN下发的指令,其他逻辑未发生大变化。

3.2 JournalNode Cluster实现

前面提到NameNode与JournalNode Cluster进行数据读写时核心点是需要大多数JN成功返回才可认为本次请求有效。所以同Zookeeper部署类似,实际部署JournalNode Cluster时也建议奇数节点(大多数场景选择3~5个)。当然奇数节点并非强制,事实上偶数节点组成的JournalNode Cluster也能工作,但是极端的情况会存在问题,比如活锁(原理和细节可参考Paxos made simple[2]),所以不建议这么做。

在JournalNode Cluster中所有JN之间完全对等,不存在Primary/Secondary区别。功能上看也极其简单,Hadoop-2.7.1分支中总共2478行代码实现了完整的JournalNode功能,详细模块见图4JournalNode功能模块。


JournalNode对外提供RPC和HTTP两类数据服务接口,其中JournalNodeHttpServer提供的HTTP服务,除暴露常规Metrics信息外,同时提供了被动Editlog数据同步功能(后面会详细介绍)。JournalNodeRpcServer提供的RPC Server,为NameNode向JournalNode的数据读写和状态获取请求准备了完备的RPC接口,详见QJournalProtocol.proto。Journal模块是JournalNodeRpcServer的具体实现,在Journal模块里,除了简单维护JournalNode状态信息,核心实现是抽象底层存储介质的读写操作。

整体上看,JournalNode模块清晰,实现简单,其中有几处非常讨巧的实现:

1、为了降低读写操作相互影响,Journal采用了DoubleBuffer技术管理实时过来的Editlog数据,通过DoubleBuffer可以为高速设备(内存)与低速设备(磁盘/SSD)之间建立缓存区和管道,避免数据写入被低速设备阻塞影响性能;

2、NameNode到JournalNode的所有数据写入请求都会直接落盘,当然写入请求的数据可以是批量,只有数据持久化完成才能认为本次请求有效和成功,这一点在数据恢复时非常关键;

3、与Pasox/Zookeeper类似,所有到达JournalNode的读写请求,第一件事情是合法性校验,包括EpochNum,CommitTxid等在内的状态信息,只有校验通过才能被处理,状态校验是强一致保证的基础;

一句话总结JournalNode是一套提供读写服务并实时持久化序列数据的有状态存储系统。

3.3 Active NameNode端实现

整个HA using QJM方案核心部分都集中在NameNode端,也可以认为是QJournal的Client端,这里集中了所有关于数据一致性保证和状态合理转换的主要内容。其中Active NameNode因为是写入端,所以实现逻辑也较复杂。

ANN按照响应客户端写请求的粒度实时顺序持久化到JournalNode,也是ANN请求JournalNode的最小粒度FSEditLogOp(HDFS写操作序列化数据)。这里首先需要权衡关于如何在JournalNode落地数据的问题:

1、将所有的FSEditLogOp数据落到同一个文件;
2、将每一条FSEditLogOp数据落到一个文件;
3、折中方案;

前面已经提到,QJournal必须保证完整事务和数据一致性,这就要求具备数据恢复的能力。如果将所有FSEditLogOp都落到一个文件,势必会带来管理成本上额外的开销,一旦出现数据不一致情况恢复大文件的代价非常高,同时同一文件累计了大量事务请求,写入失败风险非常高(后续会详细介绍);另一种极端情况是对每一条FSEditLogOp写入一个文件,这种方式在容错和数据异常恢复会非常方便,但显然读写效率极差。所以必须对两种极端情况做折中(计算机领域内大多问题都在tradeoff)。

按照前面的分析,折中的唯一办法就是对连续FSEditLogOp进行分段管理。事前给每一条FSEditLogOp分配唯一且连续的事务ID称为txid,连续多个txid组成Segment,Segment持久化后对应一个EditLog文件,这样一来,任何时间JournalNode上有且仅有一个Segment处于Inprogress状态,其他均为Finalized状态。即使存在数据不一致仅需恢复Inprogress状态的Segment,Finalized Segment一定是大多数JournalNode成功写入。这样权衡可以较好解决两种极端情况的问题。

在ANN进程的整个生命周期里,按照不同阶段,与JournalNode交互的逻辑可以简单划分成三个部分,如图5所示Active NameNode与JournalNode之间交互的状态转换图。


ANN与JournalNode在整个过程中RPC交互非常频繁,所有RPC请求均采用异步的方式,NameNode端只要收到大多数的JournalNode请求响应即认为本次请求成功,如果部分JournalNode请求失败或者超时该节点将在当前Segment内被置为异常节点。

Recovery

QJournal继承了Paxos一致性模中的EpochNum技术,每次做主从切换时,尝试切换为Active的NameNode从JournalNode端读取当前的EpochNum,自增1后尝试写入JournalNode。当出现多个NameNode均尝试切换为Active时会因为EpochNum问题最多只有一个成功,这从Qjournal的角度可以彻底杜绝出现Brain-Split问题。

NameNode操作主从切换后,首先需要确认所有JournalNode上的Editlog数据保持同步,前面提到只有最后一个Segment可能出现不一致情况,之后才能从JournalNode上拉取最新未同步的Editlog,更新内存以达到最新状态。其中保证所有JournalNode的Segment同步即是Recovery过程,整个过程如图6。

1、NameNode向JournalNode请求获取当前Epoch,从结果中选取出最大的Epoch;

2、将最大的Epoch自增1后尝试请求写入JournalNode;

如前述,以上两步可以在QJournal防止NameNode Brain-Split。这种竞争写入的方式可以保证任意时间最多只能存在一个NameNode有能力写入JournalNode,也是QJournal强一致的基础(关于该算法强一致性模型的形式化证明可参考Paxos made Simple[2])。

3、向JournalNode请求获取最新Segment的起始事务编号(txid),具体实现时这个步骤与前一步合并,可以减少一次RPC请求;

4、取前一步所有返回的txid最大值segmentTxId作为本次Recovery输入进入数据恢复阶段(特殊情况是集群刚初始化时,所有的JournalNode实际上并没有Segment文件,所以数据恢复可直接跳过,判断是否需要做数据恢复的条件是有没有取到txid):

(1)向JournalNode请求PrepareRecovery,参数是segmentTxId,JournalNode根据自己的实际情况返回满足起始txid为segmentTxId的Segment信息:{startTxId, endTxId, isInProgress},分别描述了起始事务编号,最后事务编号及当前Segment是否Inprogress;

(2)根据SegmentRecoveryComparator算法选择所有JournalNode中最优Segment(bestSegment)准备执行Recovery;(SegmentRecoveryComparator算法:优先选择Finalized Segment,再次选择JournalNode端可见最大Epoch的Segment,最后比较endTxId最大者)

(3)使用前一步选出的bestSegment组装出可定位到该Segment(hostname + NamespaceInfo + segmentTxId)的URL(具体JournalNode存储Segment的位置),使用URL作为输入向JournalNode请求AcceptRecovery,当JournalNode接收到AcceptRecovery请求后,经过合法性和URL简单检查,包括startTxId,endTxid是否与本地数据存在包含关系等,通过后拿URL通过HTTP请求将目标JournalNode的Segment拉到本地,并替换本地最新一个Segment,到这里所有JournalNode上的Editlog基本一致;之所以这么说是因为最后一个Segment的数据确实是完全同步的,但是已经Finalized的Segment只能保证大多数JournalNode之间完全一致。

(4)请求JournalNode将最新Segment操作Finalized,最后一步操作相当于跟之前写入的数据彻底“划清界限”;

到这里完成了第一阶段对JournalNode数据一致性检查和修复;

5、NameNode进入数据更新阶段,因为有前面一致性检查和修复,另外FsImage也记录过其对应到的txid,这个阶段只要从JournalNode取回该txid之后写入的数据回放一遍,内存状态即可达到最新,这个过程称为TailEdits;当然回放完成后需要记录nextTxId,为开启ANN写操作做准备;

6、到这里基本具备了开放读写服务的能力,因为前面已经将所有的Segment操作了Finalized,所以开启新的Segment,即请求JournalNode初始化新Segment,并建立到JournalNode的长连接管道startLogSegment,以便数据实时写入;

所有的准备工作完成,NameNode正式进入Active状态,开启HDFS读写服务。


Log Synchronization

ANN正式开启读写服务后,所有读请求在NameNode端即可完成,但是写请求需要实时同步给JournalNode,以便SBN能够及时读取并回放,以保持与Active几乎接近的状态。

ANN写FSEditLogOp的整体流程如图7所示:

1、首先在ANN处理客户端写请求的最后一个阶段,根据当前的请求类型和请求参数及当前的txid组成FSEditLogOp;

2、ANN端将FSEditLogOp写入到DoubleBuffer,这个阶段在NameNode整个FSNamesystem锁内,所以在任意时间仅有唯一的线程可以写入数据。可以看到DoubleBuffer技术在NameNode和JournalNode端均有使用,这里使用的目的与前面讲过的原因基本相当,一方面为低速设备(网络通道)和高速设备(内存)之间建立缓存区和管道,另一方面DoubleBuffer也可以降低锁的粒度,以到达更好的性能;

3、ANN处理客户端写请求的锁外,NameNode请求执行一次对FSEditLogOp的Sync操作,也就是说NameNode端写缓存是单线程操作,但是Sync可能是批量操作,但是从客户端角度来看写请求都会保证数据安全落到JouranlNode后才会返回,所以整体上看NameNode具备强一致性保证,当然整个文件系统强一致保证由多个部分共同支撑,这里不再展开。ANN端的Sync首先从DoubleBuffer里读出所有准备就绪的数据,通过RPC请求送达JournalNode,这类请求到达JournalNode端会第一时间持久化,保证数据可靠;


RollEditLog

前面也提到,单个Segment不可能无限大,是按照区间进行划分的,当然这个区间的划分一定不只有一条标准,默认情况下,ANN端内独立线程每间隔5min做一次检查,如果当前累计写入的FSEditLogOp超过2,000,000条操作Segment滚动,可以看到,事实上这个区间的大小可能会超出2,000,000(每间隔5min检查一次),ANN内的检查机制是为了防止SBN不工作时的补偿机制。

当然,请求执行RollEditLog不单单ANN端的线程,事实上SBN也会触发RollEditLog,SBN默认每1min操作执行一次EditLog回放,在回放EditLog前如果发现超过两分钟没有RollEditLog且期间有新增的FSEditLogOp,先请求ANN强制进行RollEditLog。在正常情况下,通过SBN请求执行RollEditLog是控制Segment规模的直接方法。

RollEditLog与前面的Recovery过程最后一步类似,首先将当前正在写入Segment操作Finalized,然后请求JournalNode初始化新Segment,建立到JournalNode的长连接管道并startLogSegment。

3.4 Standby NameNode端实现

SBN作为ANN的热备,需要尽可能保持与ANN状态一致,做好随时接管ANN任务的准备。当然在不损失一致性保证的前提下如果能分担ANN的部分请求处理会更好。

如前述,SBN为了保持与ANN状态接近甚至一致,默认每间隔1min回放一次EditLog,回放的第一步是从合适的JournalNode拉取Segment。

在Active NameNode端实现一节已经提过ANN写到JournalNode的Segment包含了startTxId和endTxId,所以SBN每回放完一个Segment会记录当前已经回放到的txid。这为接下来继续拉取Segment提供了起始位置,也是本次从JournalNode拉取数据的唯一输入。

首先,SBN根据当前的txid+1从JournalNode端获取所有包含了txid+1或者startTxId大于txid+1的Segment列表;

之后,根据取回Segment集合做简单过滤和排序,过滤是为了找到更合适的Segment,比如相同txid集合的Segment,回放Finalized更安全;保留多个Segment并排序是为了更好的容错;

最后,按照已排好序Segment,逐个FSEditLogOp尝试取Segment内容,按照FSEditLogOp粒度读取并回放,直到改Segment回放完成;

当前实现中,SBN仅回放Finalized状态的Segment。


3.5 数据一致性保证

从前面的分析来看,QJM本质上是一种极其简化版的Paxos协议,所以基本具备了Paxos优势。如果按照分布式系统经典理论CAP来评估,QJM是一种强一致性、高可用的去中心化分布式协议。

高可用:QJM的高可用特性其实是完全继承自Paxos,但在具体实现中为实现强一致性实际上牺牲了少部分高可用性。Paxos中Quorum理论在QJM依然稳定,也就是对于JournalNode Cluster,最多可以容忍小于一半的节点故障,系统仍可正常运行。但是如前面提到为了实现强一致性,QJM放大了故障范围(只要出现一次请求响应失败或者超时即标记该JournalNode在当前Segment范围内失效outOfSync),而且对于故障完全没有恢复能力,虽然通过限定Segment范围尽力补救故障影响范围,但是不可否认因为故障被放大,且没有恢复机制,不可用的风险同故障范围被线性放大。

高度去中心化:前面的流程分析过程不难看出,JournalNode整个生命周期,仅初始化阶段由ANN触发过一次选主以及JournalNode间的交互过程,其他阶段JournalNode之间完全对等,完全无中心化节点,这样也就不存在SPOF的问题,所以也具备了非常好Partition Tolerance特性。

强一致性:

1、强一致性是Paxos及各衍生系统最明显的优势特性,QJM的一致性思想主要来源于Paxos,另外为了达到更好的性能,在Pasox基础上又放松了很多限制条件。比如相比Paxos仅需要竞争一次,完成后继承结果,不需要为强一致每一轮数据读写都先竞选影响性能,从这个角度看,QJM与Raft也有相似的特征(是不是存在更本质一致性算法);当然,QJournal放松诸多限制条件跟HDFS的使用场景强相关,比如最多只存在两个QJournal Client,竞争发生的条件非常严苛,即使极端情况产生竞争也能够在一轮竞选完成;

2、延后读保证数据强一致,SBN当前仅读取和回放Finalized状态Segment,可以保证Finalized数据强一致,借鉴Paxos思想在数据恢复阶段可以做到Inprogress状态数据强一致,同时QJM采用了一种尽力而为的恢复机制,对不确定状态定义了统一恢复策略;

3、全局有序和递增的Epoch序号,任意时间仅一个竞争者(QJournal Client即NameNode)胜出,保证写入端的唯一性和合法性。

四、QJM的问题及解决思路

虽然HA using QJM方案作为HDFS默认HA方案已经在社区稳定运行了超过5年时间,但实际上还是存在一些问题:

1、Paxos算法在出现竞争时,收敛速度慢,甚至可能出现活锁的情况,QJM并没有针对这种问题进行过优化。极端情况下如果出现两个NameNode同时竞争写入,也可能陷入僵局,虽然不至于污染数据(Brain-Split),但是存在永远竞争陷入僵局的可能。

2、QJM放大了故障范围,且在Segment周期里没有任何恢复机制,虽然通过限制Segment大小进行了补偿,但是风险被线性放大,尤其对JournalNode小集群及配置Segment较大事务区间;

3、在线升级JournalNode让QJournal可用性风险成倍放大,原因同问题2;

4、QJM对JournalNode在单Segment故障没有恢复机制,ANN一旦遇到写入失败,只能操作主从切换甚至重启,对规模较大的Hadoop集群,重启NameNode成本非常高;

针对上述问题,实际上业界已有一些对应的解决办法,不过需要强调的是所有解决办法都在尽力权衡CAP:

1、对于Paxos在竞争情况下收敛慢和活锁问题,在HDFS场景里出现的概率极小,而且最多只有两个竞争者,且两个竞争者同时出现的唯一可能是误操作尝试将两个NameNode均切换成ANN,这种问题应该尽可能在运维中避免;从技术的角度看,虽然可以通过Leader选择,加快收敛速度和避免活锁,但是在HDFS场景下为解决极端情况牺牲可用性是否有必要值得商榷;

2、JournalNode故障快速恢复完全可以借鉴RollEditLog的办法,增加触发RollEditLog的条件,同时需要考虑Journal出现故障后频繁RollEditLog带来的性能损失;

3、实际场景里,最需要解决的还是NameNode因为Journal写入失败造成进程退出的问题,可借鉴的方案也很多,这里列出两种代价较小方案供参考: (1)所有Journal写入失败,强制NameNode退出竞争者,进入Standby状态; (2)延长请求JournalNode超时时间,用损失极端情况下性能损失换取NameNode重启成本;

五、总结

本文从HDFS HA的发展过程,各种方案设计背后的考虑,以及社区选择的默认HA using QJM方案原理和其中存在的问题及解决思路简单分析。
通过QJM原理梳理和实现细节分析,可以深入理解HDFS HA现状和存在问题,为后续运维甚至优化改进积累经验。

六、参考

[1] https://hadoop.apache.org
[2] https://www.microsoft.com/en-us/research/publication/paxos-made-simple/
[3] http://hadoop.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html
[4] https://issues.apache.org/jira/secure/attachment/12435811/AvatarNodeDescription.txt
[5] http://www.cloudera.com/blog/2009/07/hadoop-ha-configuration/
[6] http://www.cloudera.com/blog/2012/03/high-availability-for-the-hadoop-distributed-file-system-hdfs/
[7] http://zookeeper.apache.org/bookkeeper/
[8] https://github.com/apache/hadoop/tree/branch-2.7.1
[9] https://issues.apache.org/jira/browse/HDFS-3077
[10] https://issues.apache.org/jira/browse/HDFS-1623
[11] https://issues.apache.org/jira/browse/HADOOP-4539
[12] https://issues.apache.org/jira/browse/HDFS-976

HDFS NameNode重启优化

一、背景

在Hadoop集群整个生命周期里,由于调整参数、Patch、升级等多种场景需要频繁操作NameNode重启,不论采用何种架构,重启期间集群整体存在可用性和可靠性的风险,所以优化NameNode重启非常关键。

本文基于Hadoop-2.xHA with QJM社区架构和系统设计(如图1所示),通过梳理NameNode重启流程,并在此基础上,阐述对NameNode重启优化实践。


二、NameNode重启流程

在HDFS的整个运行期里,所有元数据均在NameNode的内存集中管理,但是由于内存易失特性,一旦出现进程退出、宕机等异常情况,所有元数据都会丢失,给整个系统的数据安全会造成不可恢复的灾难。为了更好的容错能力,NameNode会周期进行Checkpoint,将其中的一部分元数据(文件系统的目录树Namespace)刷到持久化设备上,即二进制文件FSImage,这样的话即使NameNode出现异常也能从持久化设备上恢复元数据,保证了数据的安全可靠。

但是仅周期进行Checkpoint仍然无法保证所有数据的可靠,如前次Checkpoint之后写入的数据依然存在丢失的问题,所以将两次Checkpoint之间对Namespace写操作实时写入EditLog文件,通过这种方式可以保证HDFS元数据的绝对安全可靠。

事实上,除Namespace外,NameNode还管理非常重要的元数据BlocksMap,描述数据块Block与DataNode节点之间的对应关系。NameNode并没有对这部分元数据同样操作持久化,原因是每个DataNode已经持有属于自己管理的Block集合,将所有DataNode的Block集合汇总后即可构造出完整BlocksMap。

HA with QJM架构下,NameNode的整个重启过程中始终以SBN(StandbyNameNode)角色完成。与前述流程对应,启动过程分以下几个阶段:
0、加载FSImage;
1、回放EditLog;
2、执行Checkpoint;(非必须步骤,结合实际情况和参数确定,后续详述)
3、收集所有DataNode的注册和数据块汇报;

默认情况下,NameNode会保存两个FSImage文件,与此对应,也会保存对应两次Checkpoint之后的所有EditLog文件。一般来说,NameNode重启后,通过对FSImage文件名称判断,选择加载最新的FSImage文件及回放该Checkpoint之后生成的所有EditLog,完成后根据加载的EditLog中操作条目数及距上次Checkpoint时间间隔(后续详述)确定是否需要执行Checkpoint,之后进入等待所有DataNode注册和元数据汇报阶段,当这部分数据收集完成后,NameNode的重启流程结束。

从线上NameNode历次重启时间数据看,各阶段耗时占比基本接近如图2所示。


经过优化,在元数据总量540M(目录树240M,数据块300M),超过4K规模的集群上重启NameNode总时间~35min,其中加载FSImage耗时~15min,秒级回放EditLog,数据块汇报耗时~20min,基本能够满足生产环境的需求。

2.1 加载FSImage

如前述,FSImage文件记录了HDFS整个目录树Namespace相关的元数据。从Hadoop-2.4.0起,FSImage开始采用Google Protobuf编码格式描述(HDFS-5698),详细描述文件见fsimage.proto。根据描述文件和实现逻辑,FSImage文件格式如图3所示。


从fsimage.proto和FSImage文件存储格式容易看到,除了必要的文件头部校验(MAGIC)和尾部文件索引(FILESUMMARY)外,主要包含以下核心数据:

(0)NS_INFO(NameSystemSection):记录HDFS文件系统的全局信息,包括NameSystem的ID,当前已经分配出去的最大BlockID以及TransactionId等信息;
(1)INODE(INodeSection):整个目录树所有节点数据,包括INodeFile/INodeDirectory/INodeSymlink等所有类型节点的属性数据,其中记录了如节点id,节点名称,访问权限,创建和访问时间等等信息;
(2)INODE_DIR(INodeDirectorySection):整个目录树中所有节点之间的父子关系,配合INODE可构建完整的目录树;
(3)FILES_UNDERCONSTRUCTION(FilesUnderConstructionSection):尚未完成写入的文件集合,主要为重启时重建Lease集合;
(4)SNAPSHOT(SnapshotSection):记录Snapshot数据,快照是Hadoop 2.1.0引入的新特性,用于数据备份、回滚,以防止因用户误操作导致集群出现数据问题;
(5)SNAPSHOT_DIFF(SnapshotDiffSection):执行快照操作的目录/文件的Diff集合数据,与SNAPSHOT一起构建较完整的快照管理能力;
(6)INODE_REFERENCE(INodeReferenceSection):当目录/文件被操作处于快照,且该目录/文件被重命名后,会存在多条访问路径,INodeReference就是为了解决该问题;
(7)SECRET_MANAGER(SecretManagerSection):记录DelegationKey和DelegationToken数据,根据DelegationKey及由DelegationToken构造出的DelegationTokenIdentifier方便进一步计算密码,以上数据可以完善所有合法Token集合;
(8)CACHE_MANAGER(CacheManagerSection):集中式缓存特性全局信息,集中式缓存特性是Hadoop-2.3.0为提升数据读性能引入的新特性;
(9)STRING_TABLE(StringTableSection):字符串到id的映射表,维护目录/文件的Permission字符到ID的映射,节省存储空间;

NameNode执行Checkpoint时,遵循Protobuf定义及上述文件格式描述,重启加载FSImage时,同样按照Protobuf定义的格式从文件流中读出相应数据构建整个目录树Namespace及其他元数据。将FSImage文件从持久化设备加载到内存并构建出目录树结构后,实际上并没有完全恢复元数据到最新状态,因为每次Checkpoint之后还可能存在大量HDFS写操作。

2.2 回放EditLog

NameNode在响应客户端的写请求前,会首先更新内存相关元数据,然后再把这些操作记录在EditLog文件中,可以看到内存状态实际上要比EditLog数据更及时。

记录在EditLog之中的每个操作又称为一个事务,对应一个整数形式的事务编号。在当前实现中多个事务组成一个Segment,生成独立的EditLog文件,其中文件名称标记了起止的事务编号,正在写入的EditLog文件仅标记起始事务编号。EditLog文件的格式非常简单,没再通过Google Protobuf描述,文件格式如图4所示。


一个完整的EditLog文件包括四个部分内容,分别是:
(0)LAYOUTVERSION:版本信息;
(1)OP_START_LOG_SEGMENT:标识文件开始;
(2)RECORD:顺序逐个记录HDFS写操作的事务内容;
(3)OP_END_LOG_SEGMENT:标记文件结束;

NameNode加载FSImage完成后,即开始对该FSImage文件之后(通过比较FSImage文件名称中包含的事务编号与EditLog文件名称的起始事务编号大小确定)生成的所有EditLog严格按照事务编号从小到大逐个遵循上述的格式进行每一个HDFS写操作事务回放。

NameNode加载完所有必需的EditLog文件数据后,内存中的目录树即恢复到了最新状态。

2.3 DataNode注册汇报

经过前面两个步骤,主要的元数据被构建,HDFS的整个目录树被完整建立,但是并没有掌握从数据块Block与DataNode之间的对应关系BlocksMap,甚至对DataNode的情况都不掌握,所以需要等待DataNode注册,并完成对从DataNode汇报上来的数据块汇总。待汇总的数据量达到预设比例(dfs.namenode.safemode.threshold-pct)后退出Safemode。

NameNode重启经过加载FSImage和回放EditLog后,所有DataNode不管进程是否发生过重启,都必须经过以下两个步骤:
(0)DataNode重新注册RegisterDataNode;
(1)DataNode汇报所有数据块BlockReport;

对于节点规模较大和元数据量较大的集群,这个阶段的耗时会非常可观。主要有三点原因:
(0)处理BlockReport的逻辑比较复杂,相对其他RPC操作耗时较长。图5对比了BlockReport和AddBlock两种不同RPC的处理时间,尽管AddBlock操作也相对复杂,但是对比来看,BlockReport的处理时间显著高于AddBlock处理时间;
(1)NameNode对每一个BlockReport的RPC请求处理都需要持有全局锁,也就是说对于BlockReport类型RPC请求实际上是串行处理;
(2)NameNode重启时所有DataNode集中在同一时间段进行BlockReport请求;


前文NameNode内存全景中详细描述过Block在NameNode元数据中的关键作用及与Namespace/DataNode/BlocksMap的复杂关系,从中也可以看出,每个新增Block需要维护多个关系,更何况重启过程中所有Block都需要建立同样复杂关系,所以耗时相对较高。

三、重启优化

根据前面对NameNode重启过程的简单梳理,在各个阶段可以适当的实施优化以加快NameNode重启过程。

0、HDFS-7097 解决重启过程中SBN执行Checkpoint时不能处理BlockReport请求的问题;

Fix:2.7.0
Hadoop-2.7.0版本前,SBN(StandbyNameNode)在执行Checkpoint操作前会先获得全局读写锁fsLock,在此期间,BlockReport请求由于不能获得全局写锁会持续处于等待状态,直到Checkpoint完成后释放了fsLock锁后才能继续。NameNode重启的第三个阶段,同样存在这种情况。而且对于规模较大的集群,每次Checkpoint时间在分钟级别,对整个重启过程影响非常大。实际上,Checkpoint是对目录树的持久化操作,并不涉及BlocksMap数据结构,所以Checkpoint期间是可以让BlockReport请求直接通过,这样可以节省期间BlockReport排队等待带来的时间开销,HDFS-7097正是将锁粒度放小解决了Checkpoint过程不能处理BlockReport类型RPC请求的问题。

HDFS-7097相对,另一种思路也值得借鉴,就是重启过程尽可能避免出现Checkpoint。触发Checkpoint有两种情况:时间周期或HDFS写操作事务数,分别通过参数dfs.namenode.checkpoint.period和dfs.namenode.checkpoint.txns控制,默认值分别是3600s和1,000,000,即默认情况下一个小时或者写操作的事务数超过1,000,000触发一次Checkpoint。为了避免在重启过程中频繁执行Checkpoint,可以适当调大dfs.namenode.checkpoint.txns,建议值10,000,000 ~ 20,000,000,带来的影响是EditLog文件累计的个数会稍有增加。从实践经验上看,对一个有亿级别元数据量的NameNode,回放一个EditLog文件(默认1,000,000写操作事务)时间在秒级,但是执行一次Checkpoint时间通常在分钟级别,综合权衡减少Checkpoint次数和增加EditLog文件数收益比较明显。

1、HDFS-6763 解决SBN每间隔1min全局计算和验证Quota值导致进程Hang住数秒的问题;

Fix:2.8.0
ANN(ActiveNameNode)将HDFS写操作实时写入JN的EditLog文件,为同步数据,SBN默认间隔1min从JN拉取一次EditLog文件并进行回放,完成后执行全局Quota检查和计算,当Namespace规模变大后,全局计算和检查Quota会非常耗时,在此期间,整个SBN的Namenode进程会被Hang住,以至于包括DN心跳和BlockReport在内的所有RPC请求都不能及时处理。NameNode重启过程中这个问题影响突出。
实际上,SBN在EditLog Tailer阶段计算和检查Quota完全没有必要,HDFS-6763将这段处理逻辑后移到主从切换时进行,解决SBN进程间隔1min被Hang住的问题。
从优化效果上看,对一个拥有接近五亿元数据量,其中两亿数据块的NameNode,优化前数据块汇报阶段耗时~30min,其中触发超过20次由于计算和检查Quota导致进程Hang住~20s的情况,整个BlockReport阶段存在超过5min无效时间开销,优化后可到~25min。

2、HDFS-7980 简化首次BlockReport处理逻辑优化重启时间;

Fix:2.7.1
NameNode加载完元数据后,所有DataNode尝试开始进行数据块汇报,如果汇报的数据块相关元数据还没有加载,先暂存消息队列,当NameNode完成加载相关元数据后,再处理该消息队列。对第一次块汇报的处理比较特别(NameNode重启后,所有DataNode的BlockReport都会被标记成首次数据块汇报),为提高处理速度,仅验证块是否损坏,之后判断块状态是否为FINALIZED,若是建立数据块与DataNode的映射关系,建立与目录树中文件的关联关系,其他信息一概暂不处理。对于非初次数据块汇报,处理逻辑要复杂很多,对报告的每个数据块,不仅检查是否损坏,是否为FINALIZED状态,还会检查是否无效,是否需要删除,是否为UC状态等等;验证通过后建立数据块与DataNode的映射关系,建立与目录树中文件的关联关系。

初次数据块汇报的处理逻辑独立出来,主要原因有两方面:
(0)加快NameNode的启动时间;测试数据显示含~500M元数据的NameNode在处理800K个数据块的初次块汇报的处理时间比正常块汇报的处理时间可降低一个数量级;
(1)启动过程中,不提供正常读写服务,所以只要确保正常数据(整个Namespace和所有FINALIZED状态Blocks)无误,无效和冗余数据处理完全可以延后到IBR(IncrementalBlockReport)或下次BR(BlockReport);

这本来是非常合理和正常的设计逻辑,但是实现时NameNode在判断是否为首次数据块块汇报的逻辑一直存在问题,导致这段非常好的改进点逻辑实际上长期并未真正执行到,直到HDFS-7980在Hadoop-2.7.1修复该问题。HDFS-7980的优化效果非常明显,测试显示,对含80K Blocks的BlockReport RPC请求的处理时间从~500ms可优化到~100ms,从重启期整个BlockReport阶段看,在超过600M元数据,其中300M数据块的NameNode显示该阶段从~50min优化到~25min。

3、HDFS-7503 解决重启前大删除操作会造成重启后锁内写日志降低处理能力;

Fix:2.7.0
若NameNode重启前产生过大删除操作,当NameNode加载完FSImage并回放了所有EditLog构建起最新目录树结构后,在处理DataNode的BlockReport时,会发现有大量Block不属于任何文件,Hadoop-2.7.0版本前,对于这类情况的输出日志逻辑在全局锁内,由于存在大量IO操作的耗时,会严重拉长处理BlockReport的处理时间,影响NameNode重启时间。HDFS-7503的解决办法非常简单,把日志输出逻辑移出全局锁外。线上效果上看对同类场景优化比较明显,不过如果重启前不触发大的删除操作影响不大。

4、防止热备节点SBN(StandbyNameNode)/冷备节点SNN(SecondaryNameNode)长时间未正常运行堆积大量Editlog拖慢NameNode重启时间;

不论选择HA热备方案SBN(StandbyNameNode)还是冷备方案SNN(SecondaryNameNode)架构,执行Checkpoint的逻辑几乎一致,如图6所示。如果SBN/SNN服务长时间未正常运行,Checkpoint不能按照预期执行,这样会积压大量EditLog。积压的EditLog文件越多,重启NameNode需要加载EditLog时间越长。所以尽可能避免出现SNN/SBN长时间未正常服务的状态。


在一个有500M元数据的NameNode上测试加载一个200K次HDFS事务操作的EditLog文件耗时~5s,按照默认2min的EditLog滚动周期,如果一周时间SBN/SNN未能正常工作,则会累积~5K个EditLog文件,此后一旦发生NameNode重启,仅加载EditLog文件的时间就需要~7h,也就是整个集群存在超过7h不可用风险,所以切记要保证SBN/SNN不能长时间故障。

5、HDFS-6425 HDFS-6772 NameNode重启后DataNode快速退出blockContentsStale状态防止PostponedMisreplicatedBlocks过大影响对其他RPC请求的处理能力;

Fix: 2.6.0, 2.7.0
当集群中大量数据块的实际存储副本个数超过副本数时(跨机房架构下这种情况比较常见),NameNode重启后会迅速填充到PostponedMisreplicatedBlocks,直到相关数据块所在的所有DataNode汇报完成且退出Stale状态后才能被清理。如果PostponedMisreplicatedBlocks数据量较大,每次全遍历需要消耗大量时间,且整个过程也要持有全局锁,严重影响处理BlockReport的性能,HDFS-6425HDFS-6772分别将可能在BlockReport逻辑内部遍历非常大的数据结构PostponedMisreplicatedBlocks优化到异步执行,并在NameNode重启后让DataNode快速退出blockContentsStale状态避免PostponedMisreplicatedBlocks过大入手优化重启效率。

6、降低BlockReport时数据规模;

NameNode处理BlockReport的效率低主要原因还是每次BlockReport所带的Block规模过大造成,所以可以通过调整Block数量阈值,将一次BlockReport分成多盘分别汇报,以提高NameNode对BlockReport的处理效率。可参考的参数为:dfs.blockreport.split.threshold,默认值1,000,000,即当DataNode本地的Block个数超过1,000,000时才会分盘进行汇报,建议将该参数适当调小,具体数值可结合NameNode的处理BlockReport时间及集群中所有DataNode管理的Block量分布确定。

7、重启完成后对比检查数据块上报情况;

前面提到NameNode汇总DataNode上报的数据块量达到预设比例(dfs.namenode.safemode.threshold-pct)后就会退出Safemode,一般情况下,当NameNode退出Safemode后,我们认为已经具备提供正常服务的条件。但是对规模较大的集群,按照这种默认策略及时执行主从切换后,容易出现短时间丢块的问题。考虑在200M数据块的集群,默认配置项dfs.namenode.safemode.threshold-pct=0.999,也就是当NameNode收集到200M*0.999=199.8M数据块后即可退出Safemode,此时实际上还有200K数据块没有上报,如果强行执行主从切换,会出现大量的丢块问题,直到数据块汇报完成。应对的办法比较简单,尝试调大dfs.namenode.safemode.threshold-pct到1,这样只有所有数据块上报后才会退出Safemode。但是这种办法一样不能保证万无一失,如果启动过程中有DataNode汇报完数据块后进程挂掉,同样存在短时间丢失数据的问题,因为NameNode汇总上报数据块时并不检查副本数,所以更稳妥的解决办法是利用主从NameNode的JMX数据对比所有DataNode当前汇报数据块量的差异,当差异都较小后再执行主从切换可以保证不发生上述问题。

8、其他;

除了优化NameNode重启时间,实际运维中还会遇到需要滚动重启集群所有节点或者一次性重启整集群的情况,不恰当的重启方式也会严重影响服务的恢复时间,所以合理控制重启的节奏或选择合适的重启方式尤为关键,HDFS集群启动方式分析一文对集群重启方式进行了详细的阐述,这里就不再展开。

经过多次优化调整,从线上NameNode历次的重启时间监控指标上看,收益非常明显,图7截取了其中几次NameNode重启时元数据量及重启时间开销对比,图中直观显示在500M元数据量级下,重启时间从~4000s优化到~2000s。


这里罗列了一小部分实践过程中可以有效优化重启NameNode时间或者重启全集群的点,其中包括了社区成熟Patch和相关参数优化,虽然实现逻辑都很小,但是实践收益非常明显。当然除了上述提到,NameNode重启还有很多可以优化的地方,比如优化FSImage格式,并行加载等等,社区也在持续关注和优化,部分讨论的思路也值得关注、借鉴和参考。

四、总结

NameNode重启甚至全集群重启在整个Hadoop集群的生命周期内是比较频繁的运维操作,优化重启时间可以极大提升运维效率,避免可能存在的风险。本文通过分析NameNode启动流程,并结合实践过程简单罗列了几个供参考的有效优化点,借此希望能给实践过程提供可优化的方向和思路。

五、参考

[1] NameNode内存全景. http://tech.meituan.com/namenode.html
[2] NameNode内存详解. http://tech.meituan.com/namenode-memory-detail.html
[3] Apache Hadoop. http://hadoop.apache.org/
[4] Hadoop Source. https://github.com/apache/hadoop
[5] HDFS Issues. https://issues.apache.org/jira/browse/HDFS
[6] Cloudera Blog. http://blog.cloudera.com/

Apache Kylin精确去重指标优化

前篇《Apache Kylin精确计数与全局字典揭秘》从精确计数使用场景到当前方案存在的问题及Apache Kylin在解决精确计数问题的思路进行了详细介绍。在此基础上,来自团队同学也是Apache Kylin社区Committer的@Kangkaisen更进一步,将Apache Kylin超高基数的精确去重指标查询提速数十倍。

一、问题背景

某业务方的Cube有12个维度,35个指标,其中13个是精确去重指标,并且有一半以上的精确去重指标单天基数在千万级别,Cube单天数据量1.5亿行左右。

但是一个结果仅有21行的精确去重查询竟然需要12秒多:

1
2
3
4
SELECT A, B, count(distinct uuid), 
FROM table
WHERE dt = 17150
GROUP BY A, B

跟踪整个查询执行过程发现,HBase端耗时~6s,Kylin的Query Server端耗时~5s。

精确去重指标已经在美团点评生产环境大规模使用,精确去重的查询的确比普通Sum指标慢,但是差异并不明显。但是这个查询的性能表现已超出预期,决定分析一下,到底慢在哪。

二、优化过程

2.1 将精确去重指标拆分HBase列族

首先确认了这个Cube的维度设计是合理的,这个查询也精准匹配了cuboid,并且在HBase端也只扫描了21行数据。

那么问题是,为什么在HBase端只扫描21行数据需要~6s?一个显而易见的原因是Kylin的精确去重指标是用bitmap存储的明细数据,而这个Cube有13个精确去重指标,并且基数都很大。

从两方面验证了这个猜想:
(1)同样SQL的查询Sum指标只需要120毫秒,并且HBase端Scan仅需2毫秒;
(2)用HBase HFile命令行工具查看并计算出HFile单个KeyValue的大小,发现普通的指标列族的每个KeyValue大小是29B,精确去重指标列族的每个KeyValue大小是37M;

所以第一个优化就是将精确去重指标拆分到多个HBase列族,优化后效果十分明显。查询时间从12s减少到5.7s,HBase端耗时从6s减少到1.3s,不过Query Server耗时依旧有~4.5s。

2.2 移除不必要的toString避免bitmap deserialize

Kylin的Query Server耗时依旧有4.5s,猜测还是和bitmap比较大有关,但是为什么bitmap大会导致如此耗时呢?

为了分析Query Server端查询处理的时间到底花在了哪,利用[Java Mission Control][1]进行了性能分析。

JMC分析很简单,在Kylin的启动进程中增加以下参数:

1
-XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints -XX:StartFlightRecording=delay=20s,duration=300s,name=kylin,filename=myrecording.jfr,settings=profile

获得myrecording.jfr文件后,在本机执行jmc,然后打开myrecording.jfr文件就可以进行性能分析。

热点代码的分析如图:


从图中我们可以发现,耗时最多的代码是一个毫无意义的toString。

1
Preconditions.checkState(comparator.compare(last, fetched) <= 0, "Not sorted! last: " + last + " fetched: " + fetched);

其中last和fetched就是一个bitamp。 去掉这个toString之后,Query Server的耗时减少超过1s。

2.3 获取bitmap的字节长度时避免deserialize

在去掉无意义的toString之后,热点代码已经变成了对bitmap的deserialize。

不过bitmap的deserialize共有两处,一处是bitmap本身的deserialize,一处是在获取bitmap的字节长度。

于是很自然的想法就是在获取bitmap的字节长度时避免deserialize bitmap,当时有两种思路: (1)在serialize bitmap时就写入bitmap的字节长度;
(2)在MutableRoaringBitmap序列化的头信息中获取bitmap的字节长度。(Kylin的精确去重使用的bitmap是[RoaringBitmap][2]);

思路1中一个显然问题是如何保证向前兼容,这里向前兼容的方法就是根据MutableRoaringBitmap deserialize时的cookie头信息来确认版本,并在新的serialize方式中写入了版本号,便于之后序列化方式的更新和向前兼容。

经过这个优化后,Kylin Query Server端耗时再次减少超过1s。

2.4 无需上卷聚合的精确去重查询优化

从精确去重指标在美团点评大规模使用以来,我们发现部分用户的应用场景并没有跨Segment上卷聚合的需求,即只需要查询单天的去重值,或是每次全量构建的Cube,也无需跨Segment上卷聚合。

所以我们希望对无需上卷聚合的精确去重查询进行优化,当时考虑了两种可行方案:

方案1:精确去重指标新增一种返回类型

一个极端的做法是对无需跨segment上卷聚合的精确去重查询,我们只存储最终的去重值。

优点:
(1)存储成本会极大降低;
(2)查询速度会明显提高;

缺点:
(1)无法支持上卷聚合,与Kylin指标的设计原则不符合;
(2)无法支持segment的merge,因为要进行merge必须要存储明细的bitmap;
(3)新增一种返回类型,对不清楚的用户可能会有误导;
(4)查询需要上卷聚合时直接报错,用户体验不好,尽管使用这种返回类型的前提是无需上聚合卷;

实现难点: 如果能够接受以上缺点,实现成本并不高,目前没有想到明显的难点。

方案2:serialize bitmap的同时写入distinct count值

优点:
(1)对用户无影响;
(2)符合现在Kylin指标和查询的设计;

缺点:
(1)存储依然需要存储明细的bitmap;
(2)查询速度提升有限,因为即使不进行任何bitmap serialize,bitmap本身太大也会导致HBase scan,网络传输等过程变慢;

实现难点: 如何根据是否需要上卷聚合来确定是否需要serialize bitmap?开始的思路是从查询过程入手,确认在整个查询过程中,哪些地方需要进行上卷聚合。

为此,仔细阅读了Kylin Query Server端的查询代码,HBase Coprocessor端的查询代码,看了Calcite的example例子。发现在HBase端和Kylin Query Server端,Cube build时都有可能需要指标的聚合。

此时又意识到另外一个问题:即使清晰的知道了何时需要聚合,我又该如何把是否聚合的标记传递到精确去重的反序列方法中呢?

现在精确去重的deserialize方法参数只有一个ByteBuffer,如果加参数,就要改变整个kylin指标deserialize的接口,这将会影响所有指标类型,并会造成大范围的改动。所以把这个思路放弃了。

后来想到既然目标是优化无需上卷的精确去重指标,那为什么还要费劲去deserialize出整个bitmap呢,只要个distinct count值就可以。

所以目标就集中在BitmapCounter本身的deserialize上,并联想到早前在Kylin前端加载速度提升十倍的核心思想:延迟加载,就改变了BitmapCounter的deserialize方法,默认只读出distinct count值,不进行bitmap的deserialize,并将那个buffer保留,等到的确需要上卷聚合的时候再根据buffer deserialize 出bitmap。

当然,这个思路可行有一个前提,就是buffer内存拷贝的开销是远小于bitmap deserialize的开销,庆幸的是事实的确如此。

最终经过这个优化,对于无需上卷聚合的精确去重查询,查询速度也有了较大提升。

显然,这个优化加速查询的同时加大了需要上卷聚合的精确去重查询的内存开销。解决的办法:
(1)对于超大数据集并且需要上卷的精确去重查询,用户在分析查询时返回的结果行数应该不会太多;
(2)我们需要做好Query Server端的内存控制;

三、总结

通过总共4个优化,在向前兼容的前提下,后端仅通过100多行的代码改动,对Kylin超高基数的精确去重指标查询有了明显提升,测试中最明显的查询超过50倍的性能提升。

四、反思

(1)善于利用各类命令和工具,快速分析和定位问题;
(2)重写toString,hashCode,equals等基础方法一定要轻量化,避免复杂操作;
(3)设计序列化,通信协议,存储格式时,一定要有版本信息,便于之后的更新和兼容;

五、参考

[1] http://blog.takipi.com/oracle-java-mission-control-the-ultimate-guide/
[2] https://github.com/RoaringBitmap/RoaringBitmap
[3] https://issues.apache.org/jira/browse/KYLIN-2308
[4] https://issues.apache.org/jira/browse/KYLIN-2337
[5] https://issues.apache.org/jira/browse/KYLIN-2349
[6] https://issues.apache.org/jira/browse/KYLIN-2353

Apache Kylin精确计数与全局字典揭秘

Apache Kylin是基于Hadoop之上的SQL多维分析引擎,具有支持海量数据、秒级响应、高并发等特点。Kylin中使用的精确去重计数和全局字典技术在相关领域里非常具有借鉴意义。本文作者来自Apache Kylin社区PMC成员@Sunyerui,谢绝转载。

一、问题背景

在OLAP数据分析中,去重计数(count distinct)是非常常见的需求,且通常希望统计的结果是精确值,也就是说需要精确去重计数。

但在超大的数据集规模上,实现精确去重计数是一件非常困难的事,一句话总结:超大规模数据集下精确去重与快速响应之间的矛盾:

(1)超大规模数据集:意味着需要处理的数据量很多;
(2)精确去重:意味着要保留所有数据细节,这样才能使结果可上卷;
(3)快速响应:需要通过预计算或者内存计算等手段,加快速度;

上述条件相互约束,很难处理。因此目前主流OLAP系统都是采用近似去重计数的方式。

二、现状分析

在Apache Kylin中,目前提供的也是近似去重计数,是基于HLL(HyperLogLog)实现的。

简单来说,每个需要被计数的值都会经过特定Hash函数的计算,将得到的哈希值放入到byte数组中,最后根据特定算法对byte数据的内容进行统计,就可以得到近似的去重结果。

这种方式的好处是,不论数据有多少,byte数组的大小是有理论上限的,通常不会超过128KB,存储压力非常小。也就是说能够满足超大数据集和快速响应的要求。

但最大的问题就是结果是非精确的,这是因为保存的都是经过hash计算的值,而一旦使用hash就一定会有冲突,这就是导致结果不精确的直接原因。此外在实践中发现,hash函数的计算是计算密集型的任务,需要大量的CPU资源。

目前Apache Kylin提供的近似去重计数最高精度误差为1.44%,这在很多场景下是不能满足需求的,这促使我们考虑是否能在现有框架下实现精确去重计数的功能。

三、具体实现

3.1 Int型数据精确去重计数

回顾文初提到三个问题,为了保证去重结果是精确,统计的结果必须要保留细节,而信息保存的最小单位是bit,也就是说在最理想情况下,每个用于统计的值都在结果中用一个bit来保存。这样的分析使我们很容易想到bitmap这种数据结构,它可以以bit为单位保存信息,且可以根据数据分布进行有效压缩,节省最后的存储量。但bitmap通常只能实现对int类型(包括byte和short类型)的处理,所以我们可以先尝试只针对Int型数据实现精确去重计数。

目前业界有很多成熟的bitmap库可用,比如RoaringBitmap,ConciseSet等,其中RoaringBitmap的读写速度最快,存储效率也很高,目前已经在Spark,Drill等开源项目中使用,因此我们也选择了RoaringBitmap。经过试验,证明了我们的想法是可行的,通过实现一种新的基于bitmap的去重计算指标,确实能够实现对Int型数据的精确去重计数。在百万量级的数据量下,存储的结果可以控制在几兆,同时查询能够在秒级内完成,且可以支持任意粒度的上卷聚合计算。

3.2 Trie树与AppendTrie树

上述的结果能够满足Int类型数据的需求,但更多的数据是其它类型的,比如String类型的uuid。那么是否能够采用类似方式支持其它类型的精确去重计数呢?答案是肯定的。只要能将所有数据都统一映射到int类型的数据,那么就可以同样采用bitmap的方式解决问题。这样的映射关系有很多种实现方式,比如基于格式的编码,hash编码等,其中空间和性能效率都比较高,且通用性更强的是基于Trie树的字典编码方式。目前Apache Kylin之前已经实现了TrieDictionary,用于维度值的编码。

Trie树又名前缀树,是是一种有序树,一个节点的所有子孙都有相同的前缀,也就是这个节点对应的字符串,根节点对应空字符串,而每个字符串对应的编码值由对应节点在树中的位置来决定。图1是一棵典型的Trie树示意图,注意并不是每个节点都有对应的字符串值。


在构造Trie树时,将每个值依次加入到树中,可以分为三种情况,如图2所示。


上述的是Trie树在构建过程中的内存模型,当所有数据加入之后,需要将整棵Trie树的数据序列化并持久化。具体的格式如图3所示。


从图3中可以按照,整棵树按照广度优先的顺序依次序列化,其中childOffset和nValuesBeneath的长度是可变的,这是为了根据整棵树的大小尽可能使用更短的数据,降低存储量。此外需要注意到,childOffset的最高两位是标志位,分别标识当前节点是否是最后一个child,以及当前节点是否对应了原始数据的一个值。

当需要从字典中检索某个值的映射id时,直接从序列化后的数据读取即可,不需要反序列化整棵Trie树,这样能在保证检索速度的同时保持较低的内存用量。整个过程如图4所示。


通过上述对Trie树的分析可以看到,Trie树的效率很高,但有一个问题,Trie树的内容是不可变的。也就是说,当一颗Trie树构建完成后,不能再追加新的数据进去,也不能删除现有数据,这是因为每个原始数据对应的映射id是由对应节点在树中的位置决定的,一旦树的结构发生变化,那么会导致部分原始数据的映射id发生变化,从而导致错误的结果。为此,我们需要对Trie树进行改造,使之能够持续追加数据,也就是AppendTrie树。

图5是AppendTrie的序列化格式,可以看到和传统Trie树相比,主要区别在于不保留每个节点的子value数,转而将节点对应的映射id保存到序列化数据中,这样虽然增大了存储空间,但使得整棵树是可以追加的。


经过改造,AppendTrie树已经满足了我们的需求,能够支持我们实现对所有数据的统一编码,进而支持精确去重计数。但从实际情况来看,当统计的数据量超过千万时,整颗树的内存占用和序列化数据都会变得很大,因此需要考虑分片,将整颗树拆成多颗子树,通过控制每颗子树的大小,来实现整棵树的容量扩展。为了实现这个目标,当一棵树的大小超过设定阈值后,需要通过分裂算法将一棵树分裂成两棵子树,图6展示了整个过程。


在一棵AppendTrie树是由多棵子树构成的基础上,我们很容易想到通过LRU之类的算法来控制所有子树的加载和淘汰行为。为此我们基于guava的LoadingCache实现了特定的数据结构CachedTreeMap。这种map继承于TreeMap,同时value通过LoadingCache管理,可以根据策略加载或换出,从而在保证功能的前提下降低了整体的内存占用。

此外,为了支持字典数据的读写并发,数据持久化采用mvcc的理念,每次构建持久化的结果作为一个版本,版本一旦生成就不可再更改,后续更新必须复制版本数据后进行,并持久化为更新的版本。每次读取时则选择当前最新的版本读取,这样就避免了读写冲突。同时设置了基于版本个数和存活时间的版本淘汰机制,保证不会占用过多存储。

经过上述的改进和优化后,AppendTrie树完全达到了我们的要求,可以对所有类型的数据统一做映射编码,支持的数据量可以到几十亿甚至更多,且保持内存占用量的可控,这就是我们的可追加通用字典AppendTrieDictionary。

3.3 全局字典与全类型精确去重计数

基于AppendTrieDictionary,我们可以将任意类型的数据放到一个字典里进行统一映射。在Apache Kylin的现有实现中,Cube的每个Segment都会创建独立的字典,这种方式会导致相同数据在不同Segment字典中被映射成不同的值,这会导致最终的去重结果出错。为此,我们需要在所有Segment中使用同一个字典实例,也就是全局字典GlobaDictionary。

具体来说,根据字典的资源路径(元数据名+库名+表名+列名)可以从元数据中获取同一个字典实例,后续的数据追加也是基于这个唯一的字典实例创建的builder进行的。

另外,经常出现一个cube中需要对多列计算精确去重,这些列的数据基于同一数据源,且所有列是其中某列数据的子集,比如uuid类型的数据,此时可以针对包含全集数据的列建一份全局字典,其它列复用这一个字典即可。

图7展示了整个字典构建到应用的全过程。


四、结论与展望

通过对Bitmap和Trie树的合理运用和针对性优化,以及cache置换策略的应用,我们在Apache Kylin中成功实现了亿级规模的精确去重计数功能,支持任意粒度的上卷聚合,且保证内存可控。这里并没有全新的技术,只是对现有相关技术的组合运用,但解决了业界顶尖的技术难题。这也启发我们除了需要关注新技术,更应该多思考如何挖掘现有技术的潜力。

当然,目前的全局字典还存在一些问题和可改进的点,也欢迎更多讨论和贡献,其中包括:
(1)目前在内存有限,且字典较大时,容易出现字典分片被频繁换入换成的抖动,导致整体效率不高;
(2)全局字典只支持进程内的并发构建,但还不支持跨机器的并发构建;
(3)实际场景中,很多列的数据有高度相似性或属于同一来源,有可能共享同一个全局字典;

Apache CarbonData初探

CarbonData是由华为开源并支持Hadoop的列式存储文件格式,支持索引、压缩以及解编码等。其目的是为了实现同一份数据达到多种需求,而且能够实现更快的交互查询,目前该项目正处于Apache孵化阶段。本文在简单介绍CarbonData基础上,利用SSB基准测试工具对CarbonData与其他多种列式文件存储格式进行简单测试。部分内容来自@大月同学。

一、简介

1.1 背景

针对数据的需求分析主要有以下5点要求:
(1)支持海量数据扫描提取其中某些列;
(2)支持根据主键进行查找的低于秒级响应;
(3)支持海量数据进行交互式查询的秒级响应;
(4)支持快速地抽取单独记录,并且从该记录中获取到所有列信息;
(5)支持HDFS,可以与Hadoop集群进行很好的无缝兼容。
现有的Hadoop生态系统中没有同时满足这五点要求文件格式。比如Parquet/ORC的文件能够满足第一和第五条要求,其他的要求无法满足,基于这些事实华为开发了CarbonData。

1.2 优势

CarbonData文件格式是基于列式存储的,并存储在HDFS之上;其包含了现有列式存储文件格式的许多优点,比如:可分割、可压缩、支持复杂数据类型等。

CarbonData为了解决前面提到的几点要求,加入了许多独特的特性,主要概括为以下四点:
(1)数据及索引:在有过滤的查询中,它可以显著地加速查询性能,减少I/O和CPU资源;CarbonData的索引由多级索引组成,计算引擎可以利用这些索引信息来减少调度和一些处理的开销;扫描数据的时候可以仅仅扫描更细粒度的单元(称为blocklet),而不再是扫描整个文件;
(2)可操作的编码数据:通过支持高效的压缩和全局编码模式,它可以直接在压缩或者编码的数据上查询,仅仅在需要返回结果的时候才进行转换,更好的查询下推;
(3)列组:支持列组,并且使用行格式进行存储,减少查询时行重建的开销;
(4)多种使用场景:顺序存取、随机访问、类OLAP交互式查询等。

1.3 文件格式

一个CarbonData文件是由一系列被称为blocklet组成的,除了blocklet,还有许多其他的元信息,比如模式、偏移量以及索引信息等,这些元信息是存储在CarbonData文件中的footer里。
当在内存中建立索引的时候都需要读取footer里面的信息,因为可以利用这些信息优化后续所有的查询。

每个blocklet又是由许多Data Chunks组成。Data Chunks里面的数据可以按列或者行的形式存储;数据既可以是单独的一列也可以是多列。文件中所有blocklets都包含相同数量和类型的Data Chunks。CarbonData文件格式如图1所示。

CarbonData文件格式

每个Data Chunk又是由许多被称为Pages的单元组成。总共有三种类型的pages:
(1)Data Page:包含一列或者列组的编码数据;
(2)Row ID Page:包含行id的映射,在Data Page以反向索引的形式存储时会被使用;
(3)RLE Page:包含一些额外的元信息,只有在Data Page使用RLE编码的时候会被使用。

二、SSB介绍

SSB全称Star Schema Benchmark,顾名思义,是一套用于测试数据库产品在星型模式下性能表现的基准测试规范,目前在学术界和工业界都得到了广泛的使用。提到数据仓库系统(更广义地说,决策支持系统)的基准测试规范,最权威的莫过于TPC-H和TPC-DS这两套规范,他们都由非营利组织TPC(事务处理性能理事会)发布。SSB实际上就是基于TPC-H修改而来的,将TPC-H的雪花模式简化为了星型模式,将基准查询由TPC-H的复杂Ad-Hoc查询改为了结构更固定的OLAP查询。

SSB的设定为零售业订单的产品、供应商分析场景,Schema包含一张事实表「订单lineorder」和四张维表:「消费者customer」, 「供应商supplier」, 「零件part」, 「日期date」,构成了一个典型的星型模式。图1中,表名下方的"SF * 30,000"代表各表的数据行数。例如,当SF=1时,事实表lineorder包含6,000,000行数据,维表customer包含30,000行数据,Date表的行数固定,不随SF变化。

SSB Schema

SSB的基准查询专注于星型模式下的一类典型查询:读取事实表一次,连接各个维表,对某些维度属性做过滤,最后对某些维度属性分组聚集。在此基础上,SSB重点关注以下方面:
(1)提升「功能覆盖率」:基准查询集合应当尽可能地覆盖对星型模式的各种查询类型。SSB的基准查询分为四组,分别测试带2、3、4个维度属性过滤的情况,基本覆盖了多数场景;
(2)提升「选择度覆盖率」:某个查询最终需要读取的事实表行数(即事实表的选择度)由各个维度过滤条件的FF(过滤因子)决定。基准查询应当覆盖不同的选择度。SSB的基准查询的选择度大到1.9%,小到百万分之1,覆盖了很宽的范围;
(3)减小缓存的影响:如果相邻的两个基准查询扫描的事实数据有很大的重合,后者很有可能直接从缓存中读取数据,这会影响最终的测试结果。因此应当尽量避免基准查询读取重合的事实数据。

SSB的完整规范见http://www.cs.umb.edu/~poneil/StarSchemaB.PDF%E3%80%82

简单说明:
SF(Scale Factor) :生成测试数据集时传入的数据量规模因子,决定了各表最终生成的行数。
FF(Filter Factor):每个where过滤条件筛选出一部分行,被筛选出的行数占过滤前行数的比例叫做FF。在过滤列彼此独立的条件下,表的FF为该表上各个过滤条件FF的乘积。

三、安装使用

SSB的安装是指测试数据集生成工具dbgen的安装,步骤如下:
(1)下载代码。官方提供的代码不支持在Mac上编译,这里我们使用Presto开发人员修改过的版本;

ssb-dbgen
1
2
$ git clone https://github.com/electrum/ssb-dbgen
$ cd ssb-dbgen

(2)如果在Mac上编译,直接运行make即可。如果在Linux上编译,需要修改makefile,将“MACHINE =MAC”改为“MACHINE =LINUX”;
(3)编译后的可执行程序为dbgen,它依赖一个数据分布文件dists.dss。dbgen默认将测试数据生成在当前目录,如果需要生成到其他目录,可以将dbgen和dists.dss拷贝到对应目录使用;
(4)使用dbgen生成测试数据。


四、基准测试用例

SSB提供了一套标准对各个数据仓库/OLAP系统进行性能测试和比较,其最大的特点:使用星型模式、基准查询代表性强、可以生成任意量级的测试数据。SSB的基准查询集分为4组,共13个查询。每组的查询结构类似,但「选择度」不同。这里列出每个查询的SQL和FF以及需要的事实表行数。

ssbquery-set
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
Q1.1
select sum(lo_extendedprice*lo_discount) as revenue
from lineorder, date
where lo_orderdate = d_datekey
and d_year = 1993
and lo_discount between 1 and 3
and lo_quantity < 25;
FF = (1/7)*0.5*(3/11) = 0.0194805
# of lineorder = 0.0194805*6,000,000  116,883

Q1.2
select sum(lo_extendedprice*lo_discount) as revenue
from lineorder, date
where lo_orderdate = d_datekey
and d_yearmonthnum = 199401
and lo_discount between 4 and 6
and lo_quantity between 26 and 35;
FF = (1/84)*(3/11)*0.2 = 0.00064935
# of lineorder = 0.00064935*6,000,000  3896

Q1.3
select sum(lo_extendedprice*lo_discount) as revenue
from lineorder, date
where lo_orderdate = d_datekey
and d_weeknuminyear = 6
and d_year = 1994
and lo_discount between 5 and 7
and lo_quantity between 26 and 35;
FF = (1/364)*(3/11)*0.1 = 0.000075
# of lineorder = 0.000075*6,000,000  450

Q2.1
select sum(lo_revenue), d_year, p_brand1
from lineorder, date, part, supplier
where lo_orderdate = d_datekey and lo_partkey = p_partkey and lo_suppkey = s_suppkey
and p_category = 'MFGR#12'
and s_region = 'AMERICA'
group by d_year, p_brand1
order by d_year, p_brand1;
pcategory = 'MFGR#12', FF = 1/25; sregion, FF=1/5.
FF = (1/25)*(1/5) = 1/125
# of lineorder = (1/125)*6,000,000  48,000

Q2.2
select sum(lo_revenue), d_year, p_brand1
from lineorder, date, part, supplier
where lo_orderdate = d_datekey and lo_partkey = p_partkey and lo_suppkey = s_suppkey
and p_brand1 between 'MFGR#2221' and 'MFGR#2228'
and s_region = 'ASIA'
group by d_year, p_brand1
order by d_year, p_brand1;
FF = (1/125)*(1/5) = 1/625
# of lineorder = (1/625)*6,000,000  9600

Q2.3
select sum(lo_revenue), d_year, p_brand1
from lineorder, date, part, supplier
where lo_orderdate = d_datekey and lo_partkey = p_partkey and lo_suppkey = s_suppkey
and p_brand1 = 'MFGR#2221'
and s_region = 'EUROPE'
group by d_year, p_brand1
order by d_year, p_brand1;
FF = (1/1000)*(1/5) = 1/5000
# of lineorder = (1/5000)*6,000,000  1200

Q3.1
select c_nation, s_nation, d_year, sum(lo_revenue) as revenue
from lineorder, customer, supplier, date
where lo_custkey = c_custkey and lo_suppkey = s_suppkey and lo_orderdate = d_datekey
and c_region = 'ASIA'
and s_region = 'ASIA'
and d_year >= 1992 and d_year <= 1997
group by c_nation, s_nation, d_year
order by d_year asc, revenue desc;
FF = (1/5)*(1/5)*(6/7) = 6/175
# of lineorder = (6/175)*6,000,000 ≈ 205,714

Q3.2
select c_city, s_city, d_year, sum(lo_revenue) as revenue
from lineorder, customer, supplier, date
where lo_custkey = c_custkey and lo_suppkey = s_suppkey and lo_orderdate = d_datekey
and c_nation = 'UNITED STATES'
and s_nation = 'UNITED STATES'
and d_year >= 1992 and d_year <= 1997
group by c_city, s_city, d_year
order by d_year asc, revenue desc;
FF = (1/25)*(1/25)*(6/7) = 6/4375
# of lineorder = (6/4375)*6,000,000 ≈ 8,228

Q3.3
select c_city, s_city, d_year, sum(lo_revenue) as revenue
from lineorder, customer, supplier, date
where lo_custkey = c_custkey and lo_suppkey = s_suppkey and lo_orderdate = d_datekey
and (c_city='UNITED KI1' or c_city='UNITED KI5')
and (s_city='UNITED KI1' or s_city='UNITED KI5')
and d_year >= 1992 and d_year <= 1997
group by c_city, s_city, d_year
order by d_year asc, revenue desc;
FF = (1/125)*(1/125)*(6/7) = 6/109375
# of lineorder = (6/109375)*6,000,000 ≈ 329

Q3.4 "needle-in-haystack"
select c_city, s_city, d_year, sum(lo_revenue) as revenue
from lineorder, customer, supplier, date
where lo_custkey = c_custkey and lo_suppkey = s_suppkey and lo_orderdate = d_datekey
and (c_city='UNITED KI1' or c_city='UNITED KI5')
and (s_city='UNITED KI1' or s_city='UNITED KI5')
and d_yearmonth = 'Dec1997'
group by c_city, s_city, d_year
order by d_year asc, revenue desc;
FF = (1/125)*(1/125)*(1/84) = 1/1,312,500
# of lineorder = (1/1,312,500)*6,000,000 ≈ 5

Q4.1
select d_year, c_nation, sum(lo_revenue - lo_supplycost) as profit
from lineorder, date, customer, supplier, part
where lo_custkey = c_custkey and lo_suppkey = s_suppkey and lo_partkey = p_partkey and lo_orderdate = d_datekey
and c_region = 'AMERICA'
and s_region = 'AMERICA'
and (p_mfgr = 'MFGR#1' or p_mfgr = 'MFGR#2')
group by d_year, c_nation
order by d_year, c_nation;
FF = (1/5)(1/5)*(2/5) = 2/125
# of lineorder = (2/125)*6,000,000 ≈ 96000

Q4.2 "Drill Down to Category in 2 Specific Years"
select d_year, s_nation, p_category, sum(lo_revenue - lo_supplycost) as profit
from lineorder, date, customer, supplier, part
where lo_custkey = c_custkey and lo_suppkey = s_suppkey and lo_partkey = p_partkey and lo_orderdate = d_datekey
and c_region = 'AMERICA'
and s_region = 'AMERICA'
and (d_year = 1997 or d_year = 1998)
and (p_mfgr = 'MFGR#1' or p_mfgr = 'MFGR#2')
group by d_year, s_nation, p_category
order by d_year, s_nation, p_category;
FF = (2/7)*(2/125) = 4/875
# of lineorder = (4/875)*6,000,000 ≈ 27,428

Q4.3 "Further Drill Down to cities in US"
select d_year, s_city, p_brand1, sum(lo_revenue - lo_supplycost) as profit
from lineorder, date, customer, supplier, part
where lo_custkey = c_custkey and lo_suppkey = s_suppkey and lo_partkey = p_partkey and lo_orderdate = d_datekey
and c_region = 'AMERICA'
and s_nation = 'UNITED STATES'
and (d_year = 1997 or d_year = 1998)
and p_category = 'MFGR#14'
group by d_year, s_city, p_brand1
order by d_year, s_city, p_brand1;
FF = (1/5)*(1/25)*(2/7)*(1/25) = 2/21875
# of lineorder = (2/21875)*6,000,000 ≈ 549

五、测试过程

5.1 测试范围

为了尽可能覆盖多种列式存储系统的性能表现,本次测试选择了ORC,Parquet和CarbonData。

5.2 测试数据规模

我们希望测到千万级、亿级和十亿级事实表的规模,因此分别选择了2、20、200的SF(Scale Factor),对应的各表行数如下:


5.3 测试数据准备

由于CarbonData的数据导入必须依赖csv文件,但是dbgen生成的是"|“分割的文本数据,所以利用dbgen生成数据后需要先对测试数据进行一次预处理,将其转换成各系统均可识别的格式。最后,为避免测试过程中,数据缓存对测试结果的影响,为不同的存储系统和SF建立不同的测试表{table}_{format}_{scale},如lineorder_orc_2。
综上考虑,以SF=2为例,数据准备分为三步:
(1)使用dbgen生成文本的测试数据;
(2)预处理生成的测试数据将其转换成csv格式;
(3)将csv格式的测试数据按照{table}_{format}_{scale}导入不同存储格式的表中; 最终在测试库下会生成包括文本存储格式数据在内共60张测试表{table}_{format}_{scale}; 测试数据在HDFS上副本因子统一为3;

附:建表语句

create ssb tables
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
USE ssb;

DROP TABLE IF EXISTS `LINEORDER`;
CREATE TABLE `LINEORDER` (
  LO_ORDERKEY       bigint,
  LO_LINENUMBER     int,
  LO_CUSTKEY        bigint,
  LO_PARTKEY        bigint,
  LO_SUPPKEY        bigint,
  LO_ORDERDATE      int,
  LO_ORDERPRIOTITY  string,
  LO_SHIPPRIOTITY   int,
  LO_QUANTITY       int,
  LO_EXTENDEDPRICE  int,
  LO_ORDTOTALPRICE  int,
  LO_DISCOUNT       int,
  LO_REVENUE        int,
  LO_SUPPLYCOST     int,
  LO_TAX            int,
  LO_COMMITDATE     int,
  LO_SHIPMODE       string
)
STORED BY 'carbondata';

DROP TABLE IF EXISTS `CUSTOMER`;
CREATE TABLE `CUSTOMER` (
  C_CUSTKEY     bigint,
  C_NAME        string,
  C_ADDRESS     string,
  C_CITY        string,
  C_NATION      string,
  C_REGION      string,
  C_PHONE       string,
  C_MKTSEGMENT  string
)
STORED BY 'carbondata';

DROP TABLE IF EXISTS `DATE`;
CREATE TABLE `DATE` (
  D_DATEKEY          int,
  D_DATE             string,
  D_DAYOFWEEK        string,
  D_MONTH            string,
  D_YEAR             int,
  D_YEARMONTHNUM     int,
  D_YEARMONTH        string,
  D_DAYNUMINWEEK     int,
  D_DAYNUMINMONTH    int,
  D_DAYNUMINYEAR     int,
  D_MONTHNUMINYEAR   int,
  D_WEEKNUMINYEAR    int,
  D_SELLINGSEASON    string,
  D_LASTDAYINWEEKFL  int,
  D_LASTDAYINMONTHFL int,
  D_HOLIDAYFL        int,
  D_WEEKDAYFL        int
)
STORED BY 'carbondata';

DROP TABLE IF EXISTS `PART`;
CREATE TABLE `PART`  (
  P_PARTKEY     bigint,
  P_NAME        string,
  P_MFGR        string,
  P_CATEGORY    string,
  P_BRAND1      string,
  P_COLOR       string,
  P_TYPE        string,
  P_SIZE        int,
  P_CONTAINER   string
)
STORED BY 'carbondata';

DROP TABLE IF EXISTS `SUPPLIER`;
CREATE TABLE `SUPPLIER` (
  S_SUPPKEY     bigint,
  S_NAME        string,
  S_ADDRESS     string,
  S_CITY        string,
  S_NATION      string,
  S_REGION      string,
  S_PHONE       string
)
STORED BY 'carbondata';

5.4 测试方法

使用13个基准查询对各系统进行测试,测试期间尽量避免队列资源存在竞争情况。每个基准查询跑三遍,结果取均值。
为减少不同计算引擎之间的差异,本次测试基于Spark 1.5.2进行,详细配置如下:

spark-1.5.2
1
$SPARK_HOME/bin/spark-submit --master yarn-client --queue root.hadoop-hdp.test --executor-cores 4 --executor-memory 8G --num-executors 8 --name "SSB Colume Test"

对于CarbonData选择相同的资源选项。

carbondata-0.1.0
1
$CARBON_HOME/bin/carbon-spark-sql --master yarn-client --queue root.hadoop-hdp.test --executor-cores 4 --executor-memory 8G --num-executors 8 --conf spark.carbon.storepath=hdfs:///user/hive/warehouse/ssb_compress.db --name "SSB Column Test"

附Hadoop集群环境:
(1)JDK:1.7.0_76 HotSpot™ 64-Bit Server
(2)Hadoop:2.7.1
(3)Spark:1.5.2
(4)HDFS Replica Ratio:3

5.5 测试结果

1、数据导入时间对比

数据导入时间可参考:Apache CarbonData Performance Benchmark,数据导入效率表现基本一致,orc与parquet无明显差别,这里不再详述。

2、存储资源占用对比




从数据压缩率的测试结果可以看出:
(1)orc和parquet的数据压缩率与数据本身的关系并不大,基本可以控制在0.3以内,但是CarbonData对数据的压缩能力并不好,对于SSB的测试数据集,压缩率主要集中在0.4上下;
(2)需要关注的的是CarbonData中部分数据甚至超过的原始文本数据大小;

3、查询效率对比

注:下面关于查询效率的测试数据中包含了作业提交时间、JVM启动时间等,数据本身存在少许误差。


从SF=2数据规模下查询效率对比数据来看:
(1)多种存储系统的查询性能差别并不大;
(2) 整体来看,CarbonData稍微优于其他存储系统;
如果考虑到作业提交时间及JVM启动时间的误差,在SF2数据规模下的结果基本没有明显差异;


注:在SF=20数据规模下的测试结果看,结论与SF=2保持一致;


在SF=200的数据规模查询效率对比数据来看:
(1)所有查询模式中,carbon明显优于其他存储系统;
(2)从整体上看,ORC/Parquet没有明显区别,基本可认为性能表现相似;
如果考虑到作业提交时间及JVM启动时间的误差,作业执行时间的差异表现差异非常明显,CarbonData具有非常好的优势;

六、结论

1、从查询性能上看,CarbonData具备非常好的优势,主要原因:
(1)MDK,Min-Max and Inverted Index等辅助信息可对结果数据进行更快速的查询,也能高效重建结果数据;
(2)列组(Column group)技术消除了行数据重建时隐式Join操作;
(3)使用全局字典编码加快计算速度,处理/查询引擎直接在编码数据上进行处理而不需要转换;
(4)延迟解码使得聚合操作更快,只有需要返回结果给用户时才进行解码转换。

2、由于CarbonData在加载数据时需要建立索引和全局字典编码,所以在数据加载和压缩率上比ORC和Parquet都要差,尤其在对一些特殊结构的数据表现较差,更适合一次写多次读且对存储资源使用不敏感的场景。
3、截止2016.10.01最新发布版本carbondata-0.1.0,在功能完备性、系统稳定性等方面还有提升空间,社区也在持续改进完善。
4、从整体上看,CarbonData优异的查询性能表现及社区的持续改进优化,未来非常值得期待。

七、参考

[1] https://github.com/apache/incubator-carbondata
[2] https://cwiki.apache.org/confluence/display/CARBONDATA/CarbonData+Home
[3] https://www.iteblog.com/archives/1806

NameNode RepicationMonitor异常追查

集群版本从2.4.1升级到2.7.1之后,出现了一个诡异的问题,虽然没有影响到线上正常读写服务,但是潜在的问题还是比较严重,经过追查彻底解决,这里简单整理追查过程。

一、问题描述

异常初次出现时收集到的集群异常表现信息有两条:

1、两个关键数据结构持续堆积,监控显示UnderReplicatedBlocks和PendingDeletionBlocks表现明显。

NameNode UnderReplicatedBlocks数据结构变化趋势

NameNode PendingBlocks数据结构变化趋势

说明:没有找到异常同一时间段的监控图,可将上图时间点简单匹配,基本不影响后续的分析。

2、从NameNode的jstack获得信息ReplicationMonitor线程在长期执行chooseRandom函数;

namenode.jstack
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
"org.apache.hadoop.hdfs.server.blockmanagement.BlockManager$ReplicationMonitor@254e0df1" daemon prio=10 tid=0x00007f59b4364800 nid=0xa7d9 runnable [0x00007f2baf40b000]
   java.lang.Thread.State: RUNNABLE
        at java.util.AbstractCollection.toArray(AbstractCollection.java:195)
        at java.lang.String.split(String.java:2311)
        at org.apache.hadoop.net.NetworkTopology$InnerNode.getLoc(NetworkTopology.java:282)
        at org.apache.hadoop.net.NetworkTopology$InnerNode.getLoc(NetworkTopology.java:292)
        at org.apache.hadoop.net.NetworkTopology$InnerNode.access$000(NetworkTopology.java:82)
        at org.apache.hadoop.net.NetworkTopology.getNode(NetworkTopology.java:539)
        at org.apache.hadoop.net.NetworkTopology.countNumOfAvailableNodes(NetworkTopology.java:775)
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault.chooseRandom(BlockPlacementPolicyDefault.java:707)
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault.chooseTarget(BlockPlacementPolicyDefault.java:383)
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault.chooseTarget(BlockPlacementPolicyDefault.java:432)
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault.chooseTarget(BlockPlacementPolicyDefault.java:225)
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault.chooseTarget(BlockPlacementPolicyDefault.java:120)
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager$ReplicationWork.chooseTargets(BlockManager.java:3783)
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager$ReplicationWork.access$200(BlockManager.java:3748)
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.computeReplicationWorkForBlocks(BlockManager.java:1408)
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.computeReplicationWork(BlockManager.java:1314)
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.computeDatanodeWork(BlockManager.java:3719)
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager$ReplicationMonitor.run(BlockManager.java:3671)
        at java.lang.Thread.run(Thread.java:745)

由于线上环境的日志级别为INFO,而ReplicationMonitor中INFO级别之上的日志非常少,从中几乎不能获取到任何有用信息;

异常出现场景:
1、坏盘、DataNode Decommision或进程异常退出,但不能稳定复现;
2、外部环境无任何变化和异常,正常读写服务期偶发。

二、追查过程

2.1 处理线上问题

ReplicationMonitor线程运行异常,造成数据块的副本不能及时补充,如果异常长期存在,极有可能出现丢数据的情况,在没有其他信息辅助解决的情况下,唯一的办法就是重启NameNode(传说中的“三大招”之一),好在HA架构的支持,不至于影响到正常数据生产。

2.2 日志

缺少日志,不能定位问题出现的场景,所以首先需要在关键路径上留下必要的信息,方便追查。由于ReplicationMonitor属于独立线程,合理的日志量输出不至于影响服务性能,经过多次调整基本确定需要收集的日志信息:

1、根据NameNode多次jstack信息,怀疑chooseRandom时不停计算countNumOfAvailableNodes,可能存在死循环,尝试输出两类信息:
(1)ReplicationMonitor当前处理的整体参数及正在处理的Block;

BlockManager.javagithub
1
2
3
4
5
LOG.info("numlive = " + numlive);
LOG.info("blockToProcess = " + blocksToProcess);
LOG.info("nodeToProcess = " + nodesToProcess);
LOG.info("blocksInvalidateWorkPct = " + this.blocksInvalidateWorkPct);
LOG.info("workFound = " + workFound);

(2)chooseRandom逻辑中循环体内(调用了countNumOfAvailableNodes)运行超过1min输出该函数入口的所有参数;问题复现后,日志并没有输出,说明异常并不在chooseRandom逻辑本身;

2、结合NameNode的jstack信息并跟进实现逻辑时发现NetworkTopology.InnerNode#getLoc(String loc)的实现存在性能问题:

NetworkTopology.javagithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/** Given a node's string representation, return a reference to the node
 * @param loc string location of the form /rack/node
 * @return null if the node is not found or the childnode is there but
 * not an instance of {@link InnerNode}
 */
private Node getLoc(String loc) {
  if (loc == null || loc.length() == 0) return this;
  String[] path = loc.split(PATH_SEPARATOR_STR, 2);
  Node childnode = null;
  for(int i=0; i<children.size(); i++) {
    if (children.get(i).getName().equals(path[0])) {
      childnode = children.get(i);
    }
  }
  if (childnode == null) return null; // non-existing node
  if (path.length == 1) return childnode;
  if (childnode instanceof InnerNode) {
    return ((InnerNode)childnode).getLoc(path[1]);
  } else {
    return null;
  }
}

这段逻辑的用意是通过集群网络拓扑结构中节点的字符串标识(如:/IDC/Rack/hostname)获取该节点的对象。实现方法是从拓扑结构中根节点开始逐层向下搜索,直到找到对应的目标节点,逻辑本身没有问题,但是在line286处应该正常break,实现时出现遗漏,其结果是多出一些不必要的时间开销,对于小集群可能影响不大,但是对于IO比较密集的大集群其实影响还是比较大,线下模拟~5K节点的集群拓扑结构,对于NetworkTopology.InnerNode#getLoc(String loc)本身,break可以提升一半的时间开销。

3、通过前面两个阶段仍然不能完全解决问题,只能继续追加日志,这里再次怀疑可能BlockManager.computeReplicationWorkForBlocks(List<List> blocksToReplicate)在调用chooseRandom方法时耗时严重,所以在chooseRandom结束后增加了关键的几条日志:

BlockManager.javagithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
LOG.info("ReplicationMonitor: block = " + rw.block);
LOG.info("ReplicationMonitor: priority = " + rw.priority);
LOG.info("ReplicationMonitor: srcNode = " + rw.srcNode);
LOG.info("ReplicationMonitor: storagepolicyid = " + rw.bc.getStoragePolicyID());
if (rw.targets == null || rw.targets.length == 0) {
  LOG.info("ReplicationMonitor: targets is empty");
} else {
  LOG.info("ReplicationMonitor: targets.length = " + rw.targets.length);
  for (int i = 0; i < rw.targets.length; i++) {
    LOG.info("ReplicationMonitor: target = " + rw.targets[i] + ", StorageType = " + rw.targets[i].getStorageType());
  }
}
for (Iterator<Node> iterator = excludedNodes.iterator(); iterator.hasNext();) {
  DatanodeDescriptor node = (DatanodeDescriptor) iterator.next();
  LOG.info("ReplicationMonitor: exclude = " + node);
}

包括当前正在处理的Block,优先级(标识缺块的严重程度),源和目标节点集合;(遗漏了关键的信息:Block的Numbytes,后面后详细解释。)

通过这一步基本上能够收集到异常现场信息,同时也可确定异常时ReplicationMonitor的运行情况,从后续的日志也能说明这一点:

namenode.log
1
2
3
4
5
6
7
8
9
10
11
12
2016-04-19 20:08:52,328 WARN org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy: Failed to place enough replicas, still in need of 7 to reach 10 (unavailableStorages=[], storagePolicy=BlockStoragePolicy{HOT:7, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]}, newBlock=false) For more information, please enable DEBUG log level on org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy
2016-04-19 20:08:52,328 WARN org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy: Failed to place enough replicas, still in need of 7 to reach 10 (unavailableStorages=[DISK], storagePolicy=BlockStoragePolicy{HOT:7, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]}, newBlock=false) For more information, please enable DEBUG log level on org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy
2016-04-19 20:08:52,328 WARN org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy: Failed to place enough replicas, still in need of 7 to reach 10 (unavailableStorages=[DISK, ARCHIVE], storagePolicy=BlockStoragePolicy{HOT:7, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]}, newBlock=false) All required storage types are unavailable: unavailableStorages=[DISK, ARCHIVE], storagePolicy=BlockStoragePolicy{HOT:7, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]}
2016-04-19 20:08:52,328 INFO org.apache.hadoop.hdfs.server.blockmanagement.BlockManager: ReplicationMonitor: block = blk_8206926206_7139007477
2016-04-19 20:08:52,328 INFO org.apache.hadoop.hdfs.server.blockmanagement.BlockManager: ReplicationMonitor: priority = 2
2016-04-19 20:08:52,329 INFO org.apache.hadoop.hdfs.server.blockmanagement.BlockManager: ReplicationMonitor: srcNode = 10.16.*.*:*
2016-04-19 20:08:52,329 INFO org.apache.hadoop.hdfs.server.blockmanagement.BlockManager: ReplicationMonitor: storagepolicyid = 0
2016-04-19 20:08:52,329 INFO org.apache.hadoop.hdfs.server.blockmanagement.BlockManager: ReplicationMonitor: targets is empty
2016-04-19 20:08:52,329 INFO org.apache.hadoop.hdfs.server.blockmanagement.BlockManager: ReplicationMonitor: exclude = 10.16.*.*:*
2016-04-19 20:08:52,329 INFO org.apache.hadoop.hdfs.server.blockmanagement.BlockManager: ReplicationMonitor: exclude = 10.16.*.*:*
2016-04-19 20:08:52,329 INFO org.apache.hadoop.hdfs.server.blockmanagement.BlockManager: ReplicationMonitor: exclude = 10.16.*.*:*
2016-04-19 20:08:52,329 INFO org.apache.hadoop.hdfs.server.blockmanagement.BlockManager: ReplicationMonitor: exclude = 10.16.*.*:**

日志中容易看到当前发生异常时的场景:

(1)Block的副本数尝试从3调整的10;(异常时还有其他副本增加的请求)
(2)ReplicationMonitor尝试进行副本调整时失败,原因是遍历全集群后并没有找到一个合适的节点给该Block提供副本存储,日志显示全集群的节点均被加入到了exclude;

基本能够确定由于chooseRandom函数遍历了全集群导致处理某(些)Block耗时严重,类似情况累积会恶化这种问题;

到这里基本可以解释为什么几个关键数据结构(UnderReplicatedBlocks和PendingDeletionBlocks)的量持续增加,根本原因在于ReplicationMonitor在尝试对某个Block进行副本调整时,遍历全集群不能选出合适的节点,导致处理一个Block都会耗时严重,如果多个类似Block累积会滚雪球式使情况恶化,而且更加糟糕的是UnderReplicatedBlocks本质是一个优先级队列,如果正好这些Block的优先级较高,处理失败发生超时后还会回到原来的优先队列里,导致后续正常Block也会被阻塞,即使在超时时间范围内ReplicationMonitor可以正常工作,限于其本身的限流及周期(3s)运行机制,实际上可处理的规模非常小,而UnderReplicatedBlocks及PendingDeletionBlocks的生产者丝毫没有变慢,所以造成了数据源源不断的进入队列,但是消费非常缓慢。线上监控数据看到某次极端情况一度累积到1000K规模的UnderReplicatedBlocks,其实风险已经非常高了。

虽然从日志能够解释通UnderReplicatedBlocks和PendingDeletionBlocks持续升高了,但是仍然遗留了一个关键问题:为什么在副本调整时全集群遍历都没有选出合适的节点?

2.3 暴力破解

此前已经在社区找到类似问题反馈: https://issues.apache.org/jira/browse/HDFS-8718 但是很遗憾没看到解决方案;

尝试从各种可能和怀疑中解释前面留下的问题并在线下进行各种场景复现:
(1)线下模拟了~5000节点集群规模遍历的时间开销,基本能够反映线上的情况;
(2)构造负载严重不均衡时节点选择的场景,不能复现;
(3)异构存储实现逻辑可能造成的chooseRandom遍历全集群,尝试构造各种异构存储组合并,不能复现;
(4)并发进行删除和副本调整,没有复现;(后面详细介绍)

其实(2)和(3)的验证必要性不是很大,负载问题通过源码简单分析即可,异构存储线上并没有开启。复现结果是:没有结果。

不得已选择临时解决方案:在BlockManager.computeReplicationWorkForBlocks(List<List> blocksToReplicate)的第二个阶段,针对需要调整副本的Block集合批量进行chooseTargets时加入时间判断,并设定了阈值,当超时发生时退出本轮目标选择逻辑,可以解决PendingDeletionBlocks长时间不能被处理到的问题,代价是牺牲少量处理UnderReplicatedBlocks的时间;上线后符合预期,PendingDeletionBlocks规模得到了有效控制,但是UnderReplicatedBlocks的问题依然存在。

2.4 调整参数

期间,我们从前面新增的日志里同时发现了一个有意思的现象,正常情况下workFound的值相对较高,但是一旦出现异常,开始严重下降。workFound标识的是ReplicationMonitor本轮可以调度出去的Block数,影响该值的三个关键参数如下:

hdfs-site.xmlapache
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<property>
  <name>dfs.namenode.replication.work.multiplier.per.iteration</name>
  <value>5</value>
</property>
<property>
  <name>dfs.namenode.replication.max-streams</name>
  <value>50</value>
  <description>
        The maximum number of outgoing replication streams a given node should have
        at one time considering all but the highest priority replications needed.
  </description>
</property>
<property>
  <name>dfs.namenode.replication.max-streams-hard-limit</name>
  <value>100</value>
  <description>
        The maximum number of outgoing replication streams a given node should have
        at one time.
  </description>
</property>

为控制单DN并发数默认值为<2,2,4>,此外可调度的Block数与集群规模正相关,正常情况其实完全满足运行需求,但是由于存在Block不符预期,所以造成workFound量会下降。
结合集群实际基础环境,尝试大幅提高并发度,设置为<5,50,100>,提高ReplicationMonitor每一轮的处理效率。参数调整后,情况得到了明显改善。

说明:结合实际情况谨慎调整该参数,可能会给集群内的网络带来压力。

虽然通过一系列调整能够暂缓和改善线上情况,但是依然没有回答前面留下的疑问,也没有彻底解决问题。只有从头再来梳理流程。

三、ReplicationMonitor工作流程

ReplicationMonitor是NameNode内部线程,负责维护数据块的副本数稳定,包括清理无效块和对不符预期副本数的Block进行增删工作。ReplicationMonitor是周期运行线程,默认每3s执行一次,主要由两个关键函数组成:computeDatanodeWork和processPendingReplications(rescanPostponedMisreplicatedBlocks在NameNode启动/主从切换被调用,不包括在本次异常分析范围内)。

BlockManager.javagithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
privateclass ReplicationMonitor implements Runnable {

  @Override
  publicvoid run() {
    while (namesystem.isRunning()) {
      try {
        // Process replication work only when active NN is out of safe mode.
        if (namesystem.isPopulatingReplQueues()) {
          computeDatanodeWork();
          processPendingReplications();
          rescanPostponedMisreplicatedBlocks();
        }
        Thread.sleep(replicationRecheckInterval);
      } catch (Throwable t) {
        ......
      }
    }
  }
}

computeDatanodeWork
(1)从UnderReplicatedBlocks中取出给定阈值(默认为集群节点总数的2倍)数量范围内需要进行复制的Block集合;由于UnderReplicatedBlocks是一个优先级队列,所以每次一定是按照优先级从高到低获取;
(2)遍历选出的Block集合,对于每一个Block,根据当前副本分布及chooseTarget策略,选择合适的DataNode集合作为目标节点,准备副本复制;
(3)将Block进行副本复制的指令分发到NameNode里对应DatanodeDescriptor数据结构中,待该DataNode下次heartbeat过来后及时下发,同时将该Block从UnderReplicatedBlocks拿出来暂存到pendingReplications;
(4)DataNode接收到指令后把对应Block复制到目标节点,复制结束后,目标节点向NameNode汇报RECEIVED_BLOCK,此后便可以从pendingReplications中删除对应的Block;这里引入pendingReplications的目的是防止Block在复制过程中出现异常后超时,当在给定时间内(默认为5min)仍没有完成复制,需要将其从pendingReplications转移到timedOutItems集合中;超时检查的工作由PendingReplicationBlocks#PendingReplicationMonitor负责。
(5)将InvalidateBlocks中待删除的Blocks按照DataNode分组后取出分发到NameNode里对应DatanodeDescriptor数据结构中,同样待该DataNode的heartbeat过来后及时下发删除指令;

processPendingReplications
computeDatanodeWork步骤4出现超时后,将对应的Block从pendingReplications转移到timedOutItems后并没有其他处理逻辑,但是Block复制的事情还得继续,所以还需要将Block再拿回到UnderReplicatedBlocks后重复前面的工作;从timedOutItems拿回到UnderReplicatedBlocks的工作即由processPendingReplications来负责;

可以看出computeDatanodeWork,processPendingReplications和PendingReplicationMonitor组成了一个生产者消费者的环,下图可以说明这个过程。

ReplicationMonitor相关数据流动图示

ReplicationMonitor涉及到两个关键的数据结构:UnderReplicatedBlocks和InvalidateBlocks;这两个数据结构到底是什么,数据哪里来。

(1)UnderReplicatedBlocks:副本数不足的Block集合;
* 写数据完成时进行副本检查,副本不足Block;
* 用户调用setReplication增加副本;
* DataNode节点异常,其上的所有Block;

(2)InvalidateBlocks:无效Block集合;
* 文件删除操作;
* 用户调用setReplication降低副本;

可以简单理解副本调整和数据删除本质上是一个异步操作,当NameNode接收到客户端的setReplication或delete请求后,简单处理后即可返回,实际的工作是由ReplicationMonitor周期异步进行处理。

四、根本原因

继续追踪日志时,针对触发问题的Block检索了所有日志,发现另外一个现象,每次chooseTarget目标节点选择失败后,总会紧跟一条删除操作的日志:

namenode.log
1
2016-04-19 09:23:08,453 INFO BlockStateChange: BLOCK* addToInvalidates: blk_8197702254_7129783384 10.16.*.*:* 10.16.*.*:* 10.16.*.*:**

审计日志中也能对照到同一时间点,对应的文件确实有删除操作。这个现象在多次异常复现时稳定发生。可以猜测与删除操作有关联。

删除操作的流程是先把目录树的节点删除,根据删除结果收集到的Block集合,删除每一个Block。

其中在逐个Block进行删除过程中,发现其逻辑有疑点,主要在block.setNumBytes(BlockCommand.NO_ACK),其中NO_ACK=Long.MAX_VALUE,也即先将该Block的numbytes设置为最大值,再后续的操作。

BlockManager.javagithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void removeBlock(Block block) {
  assert namesystem.hasWriteLock();
  // No need to ACK blocks that are being removed entirely
  // from the namespace, since the removal of the associated
  // file already removes them from the block map below.
  block.setNumBytes(BlockCommand.NO_ACK);
  addToInvalidates(block);
  removeBlockFromMap(block);
  // Remove the block from pendingReplications and neededReplications
  pendingReplications.remove(block);
  neededReplications.remove(block, UnderReplicatedBlocks.LEVEL);
  if (postponedMisreplicatedBlocks.remove(block)) {
    postponedMisreplicatedBlocksCount.decrementAndGet();
  }
}

虽然对目录树的操作及removeBlock的操作均会持有全局写锁,但是很自然将Block的NumBytes设置成Long.MAX_VALUE的逻辑与chooseTarget遍历全集群仍不能选出合适节点的事实结合起来。

接下来自然是验证chooseTarget的处理逻辑:

BlockManager.javagithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
......
} finally {
  namesystem.writeUnlock();
}
final Set<Node> excludedNodes = new HashSet<Node>();
for(ReplicationWork rw : work){
  // Exclude all of the containing nodes from being targets.
  // This list includes decommissioning or corrupt nodes.
  excludedNodes.clear();
  for (DatanodeDescriptor dn : rw.containingNodes) {
    excludedNodes.add(dn);
  }
  // choose replication targets: NOT HOLDING THE GLOBAL LOCK
  // It is costly to extract the filename for which chooseTargets is called,
  // so for now we pass in the block collection itself.
  rw.chooseTargets(blockplacement, storagePolicySuite, excludedNodes);
}
namesystem.writeLock();

上面的实现可以看出,chooseTargets之前释放了全局锁,chooseTargets后重新申请到全局锁,唯独中间的chooseTargets在锁之外。至此,问题触发条件、场景等基本清楚:

(1)setReplication将文件的副本调大,此时会有一批属于该文件的Block进入UnderReplicatedBlocks等待ReplicationMonitor处理;
(2)ReplicationMonitor从UnderReplicatedBlocks中取出部分Block,并在前期根据处理逻辑初始化相关参数,将每个Block打包成ReplicationWork,取出的所有Block完成打包后组成ReplicationWork集合,这个过程持有全局锁;
(3)当步骤2释放完全局锁后,被删除请求的RPC抢到全局锁,恰好这次删除操作对应文件即是步骤(1)中的文件,此时Block的NumBytes被设置成Long.MAX_VALUE,并被从BlocksMap,pendingReplications及UnderReplicatedBlocks中删除,但是该Block对象的引用还被步骤(2)中的ReplicationWork集合持有,不会被JVM回收,不同的是ReplicationWork集合中对应Block的NumBytes已经被修改成Long.MAX_VALUE;
(4)ReplicationMonitor中computeReplicationWorkForBlocks继续进行chooseTarget时显然已经不可能在集群中选出合适的节点,即使遍历完整个集群,本质上还是由于块大小已经是Long.MAX_VALUE,不可能有节点能满足需求。

通过单元测试对该场景能够稳定复现。

五、解决方式

问题分析完后,解决办法其实比较简单:

(1)如果ReplicationMonitor遇到了Block的NumBytes=BlockCommand.NO_ACK,直接将该Block从UnderReplicatedBlocks中删除;
(2)如果chooseTarget时遇到了Block的NumBytes=BlockCommand.NO_ACK,直接返回空,无需再遍历整个集群节点;

至此彻底解决了线上隐藏将近了一个月的Bug。线上再没有出现该异常。详细Patch见:https://issues.apache.org/jira/browse/HDFS-10453

六、经验

回头看追查的整个过程,有几点值得总结的经验:

1、日志经过多次才调整到位,中间遗漏了关键的信息(block.getNumBytes),如果开始及时收集到这个信息,可以省去很多时间,所以如果能够准确快速收集关键数据,问题已经解决一半;

2、场景复现时提高并发其实是可以复现的,当时仅利用小工具模拟简单的场景,没有在真实环境进行高并发复现,错过一次可以定位的机会,合理的假设怀疑和严谨的场景复现很重要;

3、虽然问题在线上存在了超过两周时间,但是并没有实际影响到集群正常服务,得益于中间合理可控的缓解手段。如果不能彻底解决可以尝试通过各种方法缓解或绕过问题值得借鉴,这种方法论随处可见,但是只有亲自趟过坑后才能印象深刻。

4、回过头再看整个问题,解决问题的思路没有问题,但追查过程其实存在一个严重Bug,不再展开详述。

HDFS集中式缓存管理

一、背景

Hadoop设计之初借鉴GFS/MapReduce的思想:移动计算的成本远小于移动数据的成本。所以调度通常会尽可能将计算移动到拥有数据的节点上,在作业执行过程中,从HDFS角度看,计算和数据通常是同一个DataNode节点,即存在大量的本地读写。

但是HDFS最初实现时,并没有区分本地读和远程读,二者的实现方式完全一样,都是先由DataNode读取数据,然后通过DFSClient与DataNode之间的Socket管道进行数据交互。这样的实现方式很显然由于经过DataNode中转对数据读性能有一定的影响。

社区很早也关注到了这一问题,先后提出过两种方案来提升性能,分别是HDFS-347HDFS-2246

HDFS-2246是比较直接和自然的想法,既然DFSClient和DataNode在同一个节点上,当DFSClient对数据发出读请求后,DataNode提供给DFSClient包括文件路径,偏移量和长度的三元组(path,offset,length),DFSClient拿到这些信息后直接从文件系统读取数据,从而绕过DataNode避免一次数据中转的过程。但是这个方案存在两个问题,首先,HDFS需要为所有用户配置白名单,赋予其可读权限,当增加新用户需要更新白名单,维护不方便;其次,当为用户赋权后,意味着用户拥有了访问所有数据的权限,相当于超级用户,从而导致数据存在安全漏洞。

HDFS-347使用UNIX提供的Unix Domain Socket进程通讯机制实现了安全的本地短路读取。DFSClient向DataNode请求数据时,DataNode打开块文件和元数据文件,通过Unix Domain Socket将对应的文件描述符传给DFSClient,而不再是路径、偏移量和长度等三元组。文件描述符是只读的,DFSClient不能随意修改接收到的文件。同时由于DFSClient自身无法访问块所在的目录,也就不能访问未授权数据。

虽然本地短路读在性能上有了明显的提升,但是从全集群看,依然存在几个性能问题:
(1)DFSClient向DataNode发起数据读请求后,DataNode在OS Buffer对数据会进行Cache,但是数据Cache的分布情况并没有显式暴漏给上层,对任务调度透明,造成Cache浪费。比如同一Block多个副本可能被Cache在多个存储这些副本的DataNode OS Buffer,造成内存资源浪费。
(2)由于Cache的分布对任务调度透明,一些低优先级任务的读请求有可能将高优先级任务正在使用的数据从Cache中淘汰出去,造成数据必须从磁盘读,增加读数据的开销从而影响任务的完成时间,甚至影响到关键生产任务SLA。

针对这些问题,社区在2013年提出集中式缓存方案(Centralized cache management)HDFS-4949,由NameNode对DataNode的Cache进行统一集中管理,并将缓存接口显式暴漏给上层应用,该功能在2.3.0发布。这个功能对于提升HDFS读性能和上层应用的执行效率与实时性有很大帮助。

集中式缓存方案的主要优势:
(1)用户可以指定常用数据或者高优先级任务对应的数据常驻内存,避免被淘汰到磁盘。例如在数据仓库应用中事实表会频繁与其他表JOIN,如果将这些事实表常驻内存,当DataNode内存使用紧张的时候也不会把这些数据淘汰出去,可以很好的实现了对于关键生产任务的SLA保障;
(2)由NameNode统一进行缓存的集中管理,DFSClient根据Block被Cache分布情况调度任务,尽可能实现本地内存读,减少资源浪费;
(3)明显提升读性能。当DFSClient要读取的数据被Cache在同一DataNode时,可以通过ZeroCopy直接从内存读,略过磁盘IO和checksum校验等环节,从而提升读性能;
(4)由于NameNode统一管理并作为元数据的一部分进行持久化处理,即使DataNode节点出现宕机,Block移动,集群重启,Cache不会受到影响。

二、部署与使用

2.1 部署

集群开启HDFS集中式缓存特性非常简单,虽然HDFS本身为集中式缓存在NameNode/DataNode端均提供了多个配置参数,但是大多不是必须配置项,最核心的配置项是DataNode侧一个参数。

hdfs-site.xmlhdfs-default.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
<property>
  <name>dfs.datanode.max.locked.memory</name>
  <value>0</value>
  <description>
    The amount of memory in bytes to use for caching of block replicas in
    memory on the datanode. The datanode's maximum locked memory soft ulimit
    (RLIMIT_MEMLOCK) must be set to at least this value, else the datanode
    will abort on startup.
    By default, this parameter is set to 0, which disables in-memory caching.
    If the native libraries are not available to the DataNode, this
    configuration has no effect.
  </description>
</property>

如配置项描述所述,该配置的默认值为0,表示集中式缓存特性处于关闭状态,选择适当的值打开该特性。

开启集中式缓存特性需要注意两个前提:
(1)DataNode的native库必须可用;因为集中式缓存特性通过系统调用mmap/mlock实现,DataNode需要通过native库支持完成系统调用,否则会导致该特性不生效。
(2)系统memlock至少与配置值相同;因为集中式缓存特性通过系统调用mmap/mlock实现,所以系统最大锁定内存空间需要至少与DataNode配置的锁定空间大小相同,否则会导致DataNode进行启动失败。

此外,HDFS还包括了其他可选的配置项:

hdfs-site.xmlhdfs-default.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<property>
<name>dfs.namenode.path.based.cache.refresh.interval.ms</name>
<value>30000</value>
<description>The amount of milliseconds between subsequent path cache rescans. Path cache rescans are when we calculate which blocks should be cached, and on what datanodes. By default, this parameter is set to 30 seconds.</description>
</property>
<property>
<name>
dfs.namenode.path.based.cache.block.map.allocation.percent
</name>
<value>0.25</value>
<description>The percentage of the Java heap which we will allocate to the cached blocks map. The cached blocks map is a hash map which uses chained hashing. Smaller maps may be accessed more slowly if the number of cached blocks is large; larger maps will consume more memory.</description>
</property>
<property>
<name>dfs.cachereport.intervalMsec</name>
<value>10000</value>
<description>Determines cache reporting interval in milliseconds. After this amount of time, the DataNode sends a full report of its cache state to the NameNode. The NameNode uses the cache report to update its map of cached blocks to DataNode locations. This configuration has no effect if in-memory caching has been disabled by setting dfs.datanode.max.locked.memory to 0 (which is the default). If the native libraries are not available to the DataNode, this configuration has no effect.</description>
</property>
property>
<name>dfs.datanode.fsdatasetcache.max.threads.per.volume</name>
<value>4</value>
<description>The maximum number of threads per volume to use for caching new data on the datanode. These threads consume both I/O and CPU. This can affect normal datanode operations.</description>
</property>

2.2 使用

HDFS集中式缓存对数据读写接口并没有影响,正常调用已缓存数据的读写即可使用缓存特性。 为了便于数据管理,HDFS通过CacheAdmin对外暴露了一系列缓存管理的接口,其中CLI接口如下。

CacheAdmin
1
2
3
4
5
6
7
8
9
10
11
Usage: bin/hdfs cacheadmin [COMMAND]
          [-addDirective -path <path> -pool <pool-name> [-force] [-replication <replication>] [-ttl <time-to-live>]]
          [-modifyDirective -id <id> [-path <path>] [-force] [-replication <replication>] [-pool <pool-name>] [-ttl <time-to-live>]]
          [-listDirectives [-stats] [-path <path>] [-pool <pool>] [-id <id>]
          [-removeDirective <id>]
          [-removeDirectives -path <path>]
          [-addPool <name> [-owner <owner>] [-group <group>] [-mode <mode>] [-limit <limit>] [-maxTtl <maxTtl>]
          [-modifyPool <name> [-owner <owner>] [-group <group>] [-mode <mode>] [-limit <limit>] [-maxTtl <maxTtl>]]
          [-removePool <name>]
          [-listPools [-stats] [<name>]]
          [-help <command-name>]

CacheAdmin主要对用户暴露的是对缓存数据/缓存池(在系统架构及原理进行详细解释)的增删改查功能,另外还提供了相关的缓存策略,供用户灵活使用。
如这里期望能够缓存数据仓库中被频繁访问的user表数据:

CacheAdmin CLI
1
2
$HADOOP_HOME/bin/hdfs cacheadmin -addPool factPool -owner hadoop-user -group hadoop-user -mod 777 -limit 1024000000 -ttl 2d
$HADOOP_HOME/bin/hdfs cacheadmin -addDirective -path /user/hive/warehouse/dw.db/user -pool factPool -force -replication 3 -ttl 1d

首先新建名称为factPool的缓存池,并赋予相关的用户组及权限等信息,另外限制该缓存池可以缓存的最大空间及缓存数据的最大TTL等;
然后将user表数据加入到缓存池factPool进行缓存,并指定缓存时间为1天,缓存3个副本;
之后当有读user表数据的请求过来后即可调度到缓存节点上从内存直接读取,从而提升读性能。其它CLI的用法可类比这里不再一一罗列。

2.3 适用场景

当前内存相比HDD成本还比较高,另外对于Hadoop集群,节点的内存大部分是分配给YARN供计算使用,所以剩余的内存资源其实非常有限,能够提供给HDFS集中式缓存使用的部分更少,为了使有限的资源发挥出最好的效率,这里提供几点建议:
(1)数据仓库中存在一部分事实表被频繁访问,与其他事实表/维度表JOIN,将访问频率较高的部分事实表进行缓存,可以提高数据生产的效率;
(2)根据局部性原理,最近写入的数据最容易被访问到,从数据仓库应用来看,每天有大量报表统计任务,需要读取前一天数据做分析,事实上大量表都是按天进行分区,可以把符合要求的热点分区数据做缓存处理,过期后清理缓存,也能大幅提升生产和统计效率;
(3)资源数据,当前存在非常多的计算框架依赖JAR/SO等一些公共资源,传统的做法是将这些资源数据写入到HDFS,通过Distributed Cache进行全局共享,也便于管理,如Spark/Tez/Hive/Kafaka等使用到的公共JAR包。如果将这部分资源数据进行长期缓存,可以优化JVM初始化时间,进而提升效率;
(4)其他;

三、系统架构及原理

3.1 架构

设计文档中定义集中式缓存机制(Centralized cache management):

An explicit caching mechanism that allows users to specify paths to be cached by HDFS.

其中包含了若干具体的目标:

Strong semantics for cached paths
Exposing cache state to framework schedulers
Defer cache policy decisions to higher­ level frameworks
Backwards compatibility
Management, debugging, metrics
Security
Quotas

为了实现上述目标,首先引入两个重要的概念:CacheDirective,CachePool。其中CacheDirective定义了缓存基本单元,本质上是文件系统的目录或文件与具体缓存策略等属性的集合;为了便于灵活管理,将属性类似的一组CacheDirective组成缓存池(CachePool),在缓存池CachePool上可以进行权限、Quota、缓存策略和统计信息等灵活控制。
在具体展开集中式缓存的系统架构和原理前,首先梳理对CacheDirective缓存的详细流程,具体如图1:
(1)用户通过调用客户端接口向NameNode发起对CacheDirective(Directory/File)缓存请求;
(2)NameNode接收到缓存请求后,将CacheDirective转换成需要缓存的Block集合,并根据一定的策略并将其加入到缓存队列中;
(3)NameNode接收到DataNode心跳后,将缓存Block的指令下发到DataNode;
(4)DataNode接收到NameNode发下的缓存指令后根据实际情况触发Native JNI进行系统调用,对Block数据进行实际缓存;
(5)此后DataNode定期(默认10s)向NameNode进行缓存汇报,更新当前节点的缓存状态;
(6)上层调度尽可能将任务调度到数据所在的DataNode,当客户端进行读数据请求时,通过DFSClient直接从内存进行ZeroCopy,从而显著提升性能;

HDFS集中式缓存管理流程图

HDFS集中式缓存的架构如图2所示,这里主要涉及到NameNode和DataNode两侧的管理和实现,NameNode对数据缓存的统一集中管理,并根据策略调度合适的DataNode对具体的数据进行数据的缓存和置换,DataNode根据NameNode的指令执行对数据的实际缓存和置换。

HDFS集中式缓存架构图

3.2 DataNode

DataNode是执行缓存和置换的具体执行者,具体来说即cacheBlock和uncacheBlock调用。FsDatasetImpl#FsDatasetCache类是该操作的执行入口。

Manages caching for an FsDatasetImpl by using the mmap(2) and mlock(2) system calls to lock blocks into memory. Block checksums are verified upon entry into the cache.

FsDatasetCache的核心是称为mappableBlockMap的HashMap,用于维护当前缓存的Block集合,其中Key为标记该Block的ExtendedBlockId,为了能够实现与Federation的兼容,在blockid的基础上增加了blockpoolid,这样多个blockpool的block不会因为blockid相同产生冲突;Value是具体的缓存数据及当前的缓存状态。

FsDatasetCache#mappableBlockMap
1
private final HashMap<ExtendedBlockId, Value> mappableBlockMap = new HashMap<ExtendedBlockId, Value>();

mappableBlockMap更新只有FsdatasetCache提供的两个具体的函数入口:

FsDatasetCache.java
1
2
synchronized void cacheBlock(long blockId, String bpid, String blockFileName, long length, long genstamp, Executor volumeExecutor)
synchronized void uncacheBlock(String bpid, long blockId)

可以看出,对mappableBlockMap的并发控制实际上放在了cacheBlock和uncacheBlock两个方法上,虽然锁粒度比较大,但是并不会对并发读写带来影响。原因是:cacheBlock在系统调用前构造空Value结构加入mappableBlockMap中,此时该Value维护的Block状态是CACHING,之后将真正缓存数据的任务加入异步任务CachingTask去完成,所以锁很快会被释放,当处于CACHING状态的Block被访问的时候会退化到从HDD访问,异步任务CachingTask完成数据缓存后将其状态置为CACHED;uncacheBlock是同样原理。所以,虽然锁的粒度比较大,但是并不会阻塞后续的数据缓存任务,也不会对数据读写带来额外的开销。

顺着自底向上的思路,再来看触发cacheBlock和uncacheBlock的场景,通过函数调用关系容易看到缓存和置换的触发场景都比较简单。
(1)cacheBlock:唯一的入口是从ANN(HA Active NameNode)下发的指令;


(2)uncacheBlock:与cacheBlock不同,uncacheBlock存在三个入口:append,invalidate和uncache。其中uncache也是来自ANN下发的指令;append和invalidate触发uncacheBlock的原因是:append会导致数据发生变化,缓存失效需要清理后重新缓存,invalidate来自删除操作,缓存同样失效需要清理。


DataNode的处理逻辑比较简单,到这里整个实现的主路径基本梳理完成。

3.3 NameNode

相比DataNode的实现逻辑,NameNode侧要复杂的多,缓存管理继承了NameNode一贯的模块化思路,通过CacheManager实现了整个集中式缓存在管理端的复杂处理逻辑。
CacheManager通过几个关键数据结构组织对数据缓存的实现。

CacheManager.javagithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * Cache directives, sorted by ID.
 */
private final TreeMap<Long, CacheDirective> directivesById = new TreeMap<Long, CacheDirective>();
/**
 * Cache directives, sorted by path
 */
private final TreeMap<String, List<CacheDirective>> directivesByPath = new TreeMap<String, List<CacheDirective>>();
/**
 * Cache pools, sorted by name.
 */
private final TreeMap<String, CachePool> cachePools = new TreeMap<String, CachePool>();
/**
 * All cached blocks.
 */
private final GSet<CachedBlock, CachedBlock> cachedBlocks;
private CacheReplicationMonitor monitor;

通过上述的几个核心集合类数据结构,很容易实现用户层对缓存数据/缓存池的增删改查功能调用的支持。

虽然上述的几个核心集合类数据结构能够容易支持用户对相关接口的调用,但是从整个集中式缓存全局来看,并没有将用户调用接口对目录/文件的缓存同DataNode实际数据缓存建立起有效连接。CacheReplicationMonitor发挥的即是这种作用,CacheReplicationMonitor使得缓存接口调用、核心数据结构以及数据缓存真正有序流动起来。

如果对BlockManager#ReplicationMonitor比较熟悉的话,可以发现CacheReplicationMonitor的工作模式几乎从ReplicationMonitor复制而来,CacheReplicationMonitor本质上也是一个定时线程,与ReplicationMonitor稍微不同的是,CacheReplicationMonitor除了定时触发外用户的缓存调用也会触发。其核心是其rescan方法,具体来看主要做三个具体工作:
(1)resetStatistics:对CacheManager从CacheDirective和CachePool两个维度对统计信息进行更新;
(2)rescanCacheDirectives:顺序扫描CacheManager#directivesById数据结构(与CacheManager#directivesByPath实际上等价),检查哪些CacheDirective(目录或文件)需要进行cache处理,一旦发现有文件需要进行缓存,立即将该文件的所有Block加入到CacheReplicationMonitor#cachedBlocks(GSet<CachedBlock, CachedBlock> cachedBlocks)中,后续工作由rescanCachedBlockMap接着进行;
(3)rescanCachedBlockMap:顺序扫描CacheReplicationMonitor#cachedBlocks的所有CacheBlock,由于CacheBlock也有副本个数的概念,rescanCachedBlockMap在扫描的过程中会发现实际缓存的Block副本数与预设的缓存副本有差异,比如新增缓存请求/节点宕机/心跳异常/节点下线等等导致Cache Block副本数与期望值之间产生差异,所以需要CacheReplicationMonitor进行周期检查和修复,根据差异多少关系将对应的CacheBlock加入到DatanodeManager管理的对应DatanodeDescriptor#{pendingCached,pendingUncached}数据结构中,待对应的DataNode心跳过来后将其转化成对应的执行指令下发给DataNode实际执行。关于心跳与指令下发的细节已经在之前文章中多处提到,这里不再展开。

这里还遗留一个问题,由于Block与CacheBlock均存在多副本的关系,如何选择具体的DataNode执行缓存或置换。
(1)uncacheBlock:uncacheBlock选择对应的DataNode其实比较简单,顺序遍历所有已经缓存了该Block的DataNode,为其准备uncacheBlock指令,直到缓存副本达到预期即可;
(2)cacheBlock:cacheBlock稍微复杂,先从Block副本所在的所有DataNode集合中排除DecommissionInProgress/Decommissioned/CorruptBlock所在DataNode/已经缓存了该Block的DataNode之后,在剩下的DataNode集合中随机进行选择即可。

DataNode实际执行完成NameNode下发的cacheBlock/uncacheBlock指令后,在下次cacheReport(默认时间间隔10s)汇报给NameNode,CacheManager根据汇报情况对缓存执行情况对CacheManager#cachedBlocks进行更新。

至此,NameNode端的集中式缓存逻辑形成了合理有效的闭环,基本实现了设计目标。

3.3 DFSClient

客户端本身的读数据逻辑基本没有变化,与传统的读模式区别在于,当客户端向DataNode发出REQUEST_SHORT_CIRCUIT_FDS请求到达DataNode后,DataNode会首先判断数据是否被缓存,如果数据已经缓存,将缓存信息返回到客户端后续直接进行内存数据的ZeroCopy;如果数据没有缓存采用传统方式进行ShortCircuit-Read。

四、总结

通过前述分析,可以看到HDFS集中式缓存优势非常明显:
1、显著提升数据读性能;
2、提升集群内存利用率;

虽然优势明显,但是HDFS集中式缓存目前还有一些不足:
1、内存置换策略(LRU/LFU/etc.)尚不支持,如果内存资源不足,缓存会失败;
2、集中式缓存与Balancer之间还不能很好的兼容,大量的Block迁移会造成频繁内存数据交换;
3、缓存能力仅对读有效,对写来说其实存在副作用,尤其是Append;
4、与Federation的兼容非常不友好;

总之,内存模式在存储和计算的多个方向上已经成为业界广泛关注的方向和趋势,Hadoop社区也在投入精力持续在内存上发力,从集中式缓存来看,收益非常明显,但是还存在一些小问题,在实际应用过程中还需要优化和改进。

五、引用

[1] https://issues.apache.org/jira/browse/HDFS-4949
[2] http://hadoop.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html
[3] https://issues.apache.org/jira/secure/attachment/12610186/caching-design-doc-2013-10-24.pdf
[4] https://en.wikipedia.org/wiki/Zero-copy
[5] https://en.wikipedia.org/wiki/Mmap
[6] https://issues.apache.org/jira/browse/HDFS-347
[7] https://issues.apache.org/jira/browse/HDFS-2246

NameNode内存详解

一、背景

NameNode内存数据主要对整个文件系统元数据的管理。Namenode目前元数据管理可以分成两个层次,一个是Namespace的管理层,这一层负责管理HDFS分布式文件系统中的树状目录和文件结构;另一层则为Block管理层,这一层负责管理HDFS分布式文件系统中存储文件到物理块之间的映射关系BlocksMap元数据。其中对Namespace的管理数据除在内存常驻外,会定期Flush到持久化设备中;对BlocksMap元数据的管理只存在内存;当NameNode发生重启,需要从持久化设备中读取Namespace管理数据,并重新构造BlocksMap。这两部分数据结构占用巨大的JVM Heap空间。

除了对文件系统本身元数据的管理外,NameNode还需要维护DataNode本身的元数据,这部分空间相对固定,且占用空间较小。

从实际Hadoop集群环境历史数据看,当Namespace中包含INode(目录和文件总量)~140M,数据块数量~160M,常驻内存使用量达在~50G。随着数据规模的持续增长,内存占用接近同步线性增长。在整个HDFS服务中,NameNode的核心作用及内存数据结构的重要地位,所以分析内存使用情况对维护HDFS服务稳定性至关重要。

这里在《NameNode内存全景》基础上,进一步对NameNode内存中关键数据结构的细节进行详细解读。

二、内存分析

2.1 NetworkTopology

NameNode除对文件系统本身元数据的管理外还需要维护DataNode的信息,主要通过NetworkTopology中DatanodeDescriptor对DataNode进行表示,该类继承结构如下图示:


在整个继承机构中各个类的内存占用情况:


其中DatanodeDescriptor还包括一部分非常驻内存对象,这里没有详细统计,所以结果可能会有少许误差。

假设集群中包括1000个DataNode节点,仅DataNode部分占用内存情况:

(48+88+64)*1000=200000=195+K

所以仅NetworkTopology维护的DataNode信息,相比整个NameNode所占的内存空间微乎其微。

2.2 Namespace

与传统单机文件系统相似,HDFS对文件系统的目录结构也是按照树状结构维护,Namespace保存的正是目录树及每个目录/文件节点的属性,包括:名称(name),编号(id),所属用户(user),所属组(group),权限(permission),修改时间(mtime),访问时间(atime),子目录/文件(children)等信息。

下图为整个Namespace中INode的类图结构,从类图可以看出,INodeFile和INodeDirectory的继承关系。其中目录在内存中由INodeDirectory对象来表示,并用List children成员列表来标识该目录下的子目录和文件;文件在内存中则由INodeFile来表示,并用BlockInfo[] blocks成员数组来标识该文件由那些blocks分块组成。其他的目录/文件属性在该类继承结构的各个相应子类成员变量标识。


在整个继承关系中不同类的内存占用情况:


其中,除了上面提到的结构外,如目录节点的附加属性等非通用数据结构,没有在统计范围内。另外,INodeFile和INodeDirectory.withQuotaFeature在内存中使用最为广泛的两个结构。

假设Namespace包含INode数为1亿,仅Namespace占用内存情况:
(24+104+8+avg(32+44+ 8 * 2, 56+ 8 * 2)) * 100000000=21800000000=20.30GB

Namespace数据会定期持久化到外存设备上,内存也会常驻,在整个NameNode的生命周期内一直缓存在内存中,随着HDFS中存储的数据增多,文件数/目录树也会随之增加,占用内存空间也会同步增加。NameNode进程、单机内存容量及JVM对内存管理能力将成为制约HDFS的主要瓶颈。

2.3 BlocksMap

在HDFS中,每个block都对应多个副本,存储在不同的存储节点DataNode上。在NameNode元数据管理上需要维护从Block到DataNode列表的对应关系,描述每一个Block副本实际存储的物理位置,当前BlocksMap解决的即是从Block对对应DataNode列表的映射关系。

BlocksMap内部数据结构如下图示:


随着存储量规模不断增加,BlocksMap在内存中占用的空间会随之增加,社区在BlocksMap的数据结构使用上做过优化,最初直接使用HashMap解决从Block到BlockInfo的映射关系,之后经过优化使用重新实现的LightWeightGSet代替HashMap,该数据结构通过数据保存元素信息,利用链表解决碰撞冲突,达到更少的内存使用。

该数据结构里Block对象中只记录了blockid,blocksize和timestamp。BlockInfo继承自Block,除了Block对象中保存的信息外,最重要的是该block对应的DataNode的列表信息。

内存占用情况如下:


LightWeightGSet与HashMap相比,减少了HashMap在load factor避免冲突的额外内存开销,即使经过优化,BlocksMap也是占用了大量的内存空间,假设HDFS中存在1亿Block ,其占用内存情况:

(40+120+8)* 100000000=16800000000=15.65GB

BlocksMap数据常驻内存,在整个NameNode生命周期内一直缓存内存中,随着数据规模的增加,对应Namespace和Block数会随之增多,NameNode进程、单机内存容量及JVM对内存管理能力将成为主要瓶颈。

2.4 小结

根据前述对当前线上集群数据量:Namespace中包含INode(目录和文件总量):~140M,数据块数量:160M,计算内存使用情况:

Namespace:(24+104+8+avg(32+44+ 8 * 2, 56+ 8 * 2)) * 140M = ~29.0 GB
BlocksMap:(40+120+8) * 160M = ~25.0 GB
二者组合结果:29.0 GB + 25.0 GB = 53.0 GB

结果来看与监控常驻内存~50GB接近,基本符合实际情况。

从前面的讨论可以看出,在NameNode整个内存对象里,占用空间最大的两个结构即为Namespace和BlocksMap,当数据规模增加后,巨大的内存占用势必会制约NameNode进程的服务能力,尤其对JVM的内存管理会带来极大的挑战。

据了解业界在NameNode上JVM最大使用到180G,结合前面的计算可以得知,元数据总量700M基本上是服务的上限。

三、结论

1、NameNode内存使用量预估模型:Total=218 * num(INode) + 168 * num(Blocks);
2、受JVM可管理内存上限等物理因素,180G内存下,NameNode服务上限的元数据量约700M。

HDFS升级过程中重启方式选择

一、背景

集群在运行过程中,由于升级等原因难免会遇到重启NameNode或整个集群节点的情况,不同的重启方式会影响到整个运维操作的效率。
刚接触HDFS时的某次全集群内DataNode升级,遇到一次非预期内的NameNode重启。升级时,NameNode首先进入Safemode模式,全集群禁止写操作。DataNode数据包和配置更新后操作了所有DataNode一次性重启,之后NameNode间歇性不能响应,持续高负载达~45min,之后不得不通过重启NameNode,之后~35min全集群启动完成,服务恢复正常。

由此引出问题:
1、造成全集群DataNode重启后NameNode不能正常响应的根本原因是什么?
2、重启NameNode为什么能够实现恢复服务?

二、原因分析

1、现场还原

在集群重启过程中,不管以什么方式进行重启,避免不了DataNode向NameNode进行BlockReport的交互,从NameNode现场截取两个时间段里部分BlockReport日志。

表1 不同重启方式NameNode处理BR时间统计

DN Restart Only - - With NN Restart - -
Date Blocks ProcessTime Date Blocks ProcessTime
2015-04-28 13:15:51 49428 646 msecs 2015-04-28 14:00:57 66392 73 msecs
2015-04-28 13:15:51 49428 646 msecs 2015-04-28 14:00:57 66392 73 msecs
2015-04-28 13:15:51 49068 644 msecs 2015-04-28 14:00:57 69978 84 msecs
2015-04-28 13:15:52 51822 638 msecs 2015-04-28 14:00:57 78222 98 msecs
2015-04-28 13:15:53 83131 1214 msecs 2015-04-28 14:00:57 64663 71 msecs
2015-04-28 13:15:53 90088 169 msecs 2015-04-28 14:00:57 85106 99 msecs
2015-04-28 13:15:54 82024 1107 msecs 2015-04-28 14:00:57 87346 96 msecs
2015-04-28 13:15:55 48114 637 msecs 2015-04-28 14:00:57 87802 96 msecs
2015-04-28 13:15:55 49457 84 msecs 2015-04-28 14:00:57 65646 71 msecs
2015-04-28 13:15:56 82989 457 msecs 2015-04-28 14:00:57 71025 86 msecs
2015-04-28 13:15:57 84634 1181 msecs 2015-04-28 14:00:57 66144 73 msecs
2015-04-28 13:15:58 67321 885 msecs 2015-04-28 14:00:57 72652 90 msecs
2015-04-28 13:15:59 70668 924 msecs 2015-04-28 14:00:57 66118 76 msecs
2015-04-28 13:15:59 73114 138 msecs 2015-04-28 14:00:58 67011 74 msecs
2015-04-28 13:15:59 28215 692 msecs 2015-04-28 14:00:58 78216 84 msecs
2015-04-28 13:16:00 30080 321 msecs 2015-04-28 14:00:58 60988 66 msecs
2015-04-28 13:16:00 30435 329 msecs 2015-04-28 14:00:58 52376 58 msecs
2015-04-28 13:16:00 34350 360 msecs 2015-04-28 14:00:58 66801 73 msecs
2015-04-28 13:16:01 32487 344 msecs 2015-04-28 14:00:58 49134 53 msecs
2015-04-28 13:16:01 28244 308 msecs 2015-04-28 14:00:58 66928 73 msecs
2015-04-28 13:16:01 29138 308 msecs 2015-04-28 14:00:58 75560 82 msecs
2015-04-28 13:16:02 29765 301 msecs 2015-04-28 14:00:58 83880 92 msecs
2015-04-28 13:16:02 28699 309 msecs 2015-04-28 14:00:58 82989 93 msecs
2015-04-28 13:16:02 35377 370 msecs 2015-04-28 14:00:58 56210 60 msecs
2015-04-28 13:16:03 49204 626 msecs 2015-04-28 14:00:58 65517 78 msecs
2015-04-28 13:16:03 27554 438 msecs 2015-04-28 14:00:58 76159 78 msecs
2015-04-28 13:16:04 27285 326 msecs 2015-04-28 14:00:58 59725 58 msecs

左半部分记录的是重启全集群DataNode后,NameNode处理单个BlockReport请求耗时,右半部分为重启NameNode后,处理单个BlockReport请求耗时。这里只列了部分数据,虽不具统计意义,但是在处理时间的量级上可信。

从数据上可以看到,对于BlockReport类型的RPC请求,不同的重启方式,RPC的处理时间有明显差异。

2、深度分析

前面也提到从数据上看,对于BlockReport类型的RPC请求,重启全集群DataNode与重启NameNode,RPC处理时间有一个数量级的差别。这种差别通过代码得到验证。

BlockManager.javaGithub
1
2
3
4
5
6
7
if (storageInfo.getBlockReportCount() == 0) {
  // The first block report can be processed a lot more efficiently than
  // ordinary block reports.  This shortens restart times.
  processFirstBlockReport(node, storage.getStorageID(), newReport);
} else {
  invalidatedBlocks = processReport(node, storage, newReport);
}

可以看到NameNode对BlockReport的处理方式仅区别于是否为初次BlockReport。初次BlockReport显然只发生在NameNode重启期间。
processFirstBlockReport:对Standby节点(NameNode重启期间均为Standby),如果汇报的数据块相关元数据还没有加载,会将报告的块信息暂存队列,当Standby节点完成加载相关元数据后,再处理该消息队列; 对第一次块汇报的处理比较特别,为提高处理效率,仅验证块是否损坏,然后判断块状态是否为FINALIZED状态,如果是建立块与DN节点的映射,其他信息一概暂不处理。
processReport:对于非初次块汇报,处理逻辑要复杂很多;对报告的每个块信息,不仅会建立块与DN的映射,还会检查是否损坏,是否无效,是否需要删除,是否为UC状态等等。

初次块汇报的处理逻辑单独拿出来,主要原因有两方面:
1、加快NameNode的启动时间;统计数据也能说明,初次块汇报的处理时间比正常块汇报的处理时间能节省约一个数量级的时间。
2、由于启动过程中,不提供正常读写服务,所以只要确保正常数据(整个Namespace和所有FINALIZED状态Blocks)无误,无效和冗余数据处理完全可以延后到IBR或next BR。
说明:
1、是否选择processFirstBlockReport处理逻辑不会因为NameNode当前为safemode或者standby发生变化,仅NameNode重启生效;
2、BlockReport的处理时间与DataNode数据规模正相关,当前DataNode中Block数处于:200,000 ~ 1,000,000。
如果不操作NameNode重启,BlockReport处理时间会因为处理逻辑复杂带来额外的处理时间,统计数据显示,约一个数量级的差别。

NameNode对非第一次BlockReport的复杂处理逻辑只是NameNode负载持续处于高位的诱因,在其诱发下发生了一系列“滚雪球”式的异常放大。
1、所有DataNode进程被关闭后,NameNode的CallQueue(默认大小:3200)会被快速消费完;


2、所有DataNode进程被重启后,NameNode的CallQueue会被迅速填充,主要来自DataNode重启后正常流程里的VersionRequest和registerDataNode两类RPC请求,由于均较轻量,所以也会被迅速消费完;


3、之后DataNode进入BlockReport流程,NameNode的CallQueue填充内容开始从VersionRequest和registerDataNode向BlockReport过渡;


直到CallQueue里几乎被所有BlockReport填充满。


前面的统计数据显示,NameNode不重启对BlockReport的处理时间~500ms,另一个关键数据是Client看到的RPC超时时间,默认为60s;在默认的RPC超时时间范围内,CallQueue里最多可能被处理的BlockReport数~120个,其它均会发生超时。 当发生超时后,Client端(DataNode)会尝试重试,所以NameNode的CallQueue会被持续打满;另一方面,如果NameNode发现RPC Request出现超时会被忽略(可以从日志证实),直到存在未超时的请求,此时从CallQueue拿出来的BlockReport请求虽未超时,但也处于即将超时的边缘,即使处理完成其中的少数几个,CallQueue中的剩余大部分也会出现超时。

namenode.lgo
1
2
3
4
5
6
7
8
9
2015-04-28 13:14:48,709 INFO org.apache.hadoop.ipc.Server: IPC Server handler 9 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.blockReport from 10.16.45.38:37649 Call#650 Retry#0
2015-04-28 13:14:48,709 INFO org.apache.hadoop.ipc.Server: IPC Server handler 9 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.blockReport from 10.16.53.5:14839 Call#659 Retry#0
2015-04-28 13:14:48,709 INFO org.apache.hadoop.ipc.Server: IPC Server handler 9 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.blockReport from 10.16.62.5:55833 Call#702 Retry#0
2015-04-28 13:14:48,709 INFO org.apache.hadoop.ipc.Server: IPC Server handler 9 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.blockReport from 10.16.51.31:41016 Call#655 Retry#0
2015-04-28 13:14:48,709 INFO org.apache.hadoop.ipc.Server: IPC Server handler 29 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.blockReport from 10.16.62.36:53163 Call#702 Retry#0
2015-04-28 13:14:48,709 INFO org.apache.hadoop.ipc.Server: IPC Server handler 9 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.blockReport from 10.16.46.32:53530 Call#662 Retry#0
2015-04-28 13:14:48,710 INFO org.apache.hadoop.ipc.Server: IPC Server handler 29 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.blockReport from 10.16.55.11:52372 Call#662 Retry#0
2015-04-28 13:14:48,710 INFO org.apache.hadoop.ipc.Server: IPC Server handler 9 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.blockReport from 10.16.55.44:30295 Call#666 Retry#0
2015-04-28 13:14:48,710 INFO org.apache.hadoop.ipc.Server: IPC Server handler 29 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.blockReport from 10.16.50.4:37880 Call#674 Retry#0

通过前面的分析,从整个Timeline上看,NameNode长期处于满负荷运行状态,但是有效处理能力非常低(仅针对BlockReport)。这也是为什么1000+ DataNode(每一个DataNode管理的Block数均未超过1,000,000),也即1000+有效BlockReport请求,在~50min内依然没有被处理完成。

如果DataNode进程处于正常运行状态下,重启NameNode后会发生完全不同的情况。
1、NameNode重启后,首先加载FsImage,此时,除Namespace外NameNode的元数据几乎为空,此后开始接收DataNode过来的RPC请求(绝大多数为Heartbeat);


2、NameNode接收到Heartbeat后由于在初始状态会要求DataNode重新注册;由于Heartbeat间隔是3s,所以从NameNode的角度看,所有DataNode的后续一系列RPC请求会被散列到3s时间线上;


3、DataNode向NameNode注册完成后立即开始BlockReport;由于步骤2里提到的3s时间线散列关系,队列里后半部分BlockReport请求和VersionRequest/registerDataNode请求会出现相互交叉的情况;


4、如前述,处理BlockReport时部分RPC请求一样会发生超时;


5、由于超时重试,所以部分BlockReport和registerDataNode需要重试;可以发现不同于重启所有DataNode时重试的RPC几乎都是BlockReport,这里重试的RPC包括了VersionRequest/registerDataNode(可以从日志证实),这就大幅降低了NameNode的负载,避免了“滚雪球”式高负载RPC堆积,使异常有效收敛。

namenode.log
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2015-04-28 14:01:19,302 INFO org.apache.hadoop.ipc.Server: IPC Server handler 31 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.versionRequest from 10.32.73.39:16329 Call#2893 Retry#0
2015-04-28 14:01:19,302 INFO org.apache.hadoop.ipc.Server: IPC Server handler 31 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.blockReport from 10.32.20.15:54831 Call#2889 Retry#0
2015-04-28 14:01:19,302 INFO org.apache.hadoop.ipc.Server: IPC Server handler 31 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.versionRequest from 10.16.62.38:10818 Call#2835 Retry#0
2015-04-28 14:01:19,302 INFO org.apache.hadoop.ipc.Server: IPC Server handler 31 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.versionRequest from 10.16.52.18:59462 Call#2818 Retry#0
2015-04-28 14:01:19,302 INFO org.apache.hadoop.ipc.Server: IPC Server handler 31 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.registerDatanode from 10.16.39.24:13728 Call#2864 Retry#0
2015-04-28 14:01:19,302 INFO org.apache.hadoop.ipc.Server: IPC Server handler 31 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.versionRequest from 10.32.27.8:58789 Call#2883 Retry#0
2015-04-28 14:01:19,302 INFO org.apache.hadoop.ipc.Server: IPC Server handler 31 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.registerDatanode from 10.32.73.40:56606 Call#2889 Retry#0
2015-04-28 14:01:19,302 INFO org.apache.hadoop.ipc.Server: IPC Server handler 31 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.versionRequest from 10.16.40.21:19961 Call#2843 Retry#0
2015-04-28 14:01:19,302 INFO org.apache.hadoop.ipc.Server: IPC Server handler 31 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.versionRequest from 10.16.43.13:22644 Call#2870 Retry#0
2015-04-28 14:01:19,302 INFO org.apache.hadoop.ipc.Server: IPC Server handler 31 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.registerDatanode from 10.16.43.26:16289 Call#2876 Retry#0
2015-04-28 14:01:19,302 INFO org.apache.hadoop.ipc.Server: IPC Server handler 31 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.versionRequest from 10.16.61.30:31968 Call#2825 Retry#0
2015-04-28 14:01:19,302 INFO org.apache.hadoop.ipc.Server: IPC Server handler 31 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.versionRequest from 10.32.21.5:47752 Call#2879 Retry#0
2015-04-28 14:01:19,302 INFO org.apache.hadoop.ipc.Server: IPC Server handler 31 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.blockReport from 10.32.49.11:46892 Call#2904 Retry#0
2015-04-28 14:01:19,302 INFO org.apache.hadoop.ipc.Server: IPC Server handler 31 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.versionRequest from 10.16.36.24:12326 Call#2859 Retry#0
2015-04-28 14:01:19,302 INFO org.apache.hadoop.ipc.Server: IPC Server handler 31 on 8020: skipped org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.versionRequest from 10.16.56.4:55321 Call#2833 Retry#0

3、避免重启大量DataNode时雪崩

从前面的分析过程,可以得出两个结论:
(1)NameNode对正常BlockReport处理效率是造成可能雪崩的根本原因;
(2)BlockReport的堆积让问题完全失控;

从这两个结论出发可以推导出相应的解决办法:

1、解决效率问题:
(1)优化代码逻辑;这块代码相对成熟,可优化的空间不大,另外所需的时间成本较高,暂可不考虑;
(2)降低BlockReport时数据规模;NameNode处理BR的效率低主要原因还是每次BR所带的Block规模过大造成,所以可以通过调整Block数量阈值,将一次BlockReport分成多盘分别汇报,提高NameNode处理效率。可参考的参数为:dfs.blockreport.split.threshold,默认为1,000,000,当前集群DataNode上Block规模数处于240,000 ~ 940,000,建议调整为500,000;另一方面,可以通过在同一个物理节点上部署多个DataNode实例,分散数据,达到缩小规模的目的,但是这种方案仅能解决当前问题,长期来看依然不能避免,且影响范围比较大,需要多方面权衡。

2、解决堆积问题:
(1)控制重启DataNode的数量;按照当前节点数据规模,如果大规模重启DataNode,可采取滚动方式,以~15/单位间隔~1min滚动重启,如果数据规模增长,需要适当调整实例个数;
(2)定期清空CallQueue;如前述,当大规模DataNode实例被同时重启后,如果不采取措施一定会发生“雪崩”,若确实存在类似需求或场景,可以通过定期清空CallQueue(dfsadmin -refreshCallQueue)的方式,避免堆积效应;这种方案的弊端在于不能有选择的清空RPC Request,所以当线上服务期时,存在数据读写请求超时、作业失败的风险。

3、选择合适的重启方式:
(1)当需要对全集群的DataNode重启操作,且规模较大(包括集群规模和数据规模)时,建议在重启DataNode进程之后将NameNode重启,避免前面的“雪崩”问题;
(2)当灰度操作部分DataNode或者集群规模和数据规模均较小时,可采取滚动重启DataNode进程的方式;

三、总结

1、重启所有DataNode时,由于处理BlockReport逻辑不同,及由此诱发的“雪崩式”效应,导致重启进度极度缓慢;
2、在数据规模达到10K~100K,重启一台DataNode都会给NameNode的正常服务造成瞬时抖动;
3、在数据规模到100K量级时,同时重启~15以内DataNode不会对集群造成雪崩式灾难,但是可能出现短时间内服务不可用状态;
4、全集群升级时,建议NameNode和DataNode均重启,在预期时间内可恢复服务。

NameNode内存全景

一、概述

从整个HDFS系统架构上看,NameNode是其中最重要、最复杂也是最容易出现问题的地方,而且一旦NameNode出现故障,整个Hadoop集群就将处于不可服务的状态,同时随着数据规模和集群规模的持续增长,很多小量级时被隐藏的问题逐渐暴露出来。所以,从更高层次掌握NameNode的内部结构和运行机制尤其重要。除特别说明外,本文主要基于社区版本Hadoop-2.4.1[1][2]。

NameNode管理着整个HDFS文件系统的元数据。从架构设计上看,元数据大致分成两个层次:Namespace管理层,负责管理文件系统中的树状目录结构以及文件与数据块的映射关系;块管理层,负责管理文件系统中文件的物理块与实际存储位置的映射关系BlocksMap,如图1所示[1]。Namespace管理的元数据除内存常驻外,也会周期Flush到持久化设备上FsImage文件;BlocksMap元数据只在内存中存在;当NameNode发生重启,首先从持久化设备中读取FsImage构建Namespace,之后根据DataNode的汇报信息重新构造BlocksMap。这两部分数据结构是占据了NameNode大部分JVM Heap空间。

HDFS结构图

除了对文件系统本身元数据的管理之外,NameNode还需要维护整个集群的机架及DataNode的信息、Lease管理以及集中式缓存引入的缓存管理等等。这几部分数据结构空间占用相对固定,且占用较小。

测试数据显示,Namespace目录和文件总量到2亿,数据块总量到3亿后,常驻内存使用量超过90GB。

二、内存全景

如前述,NameNode整个内存结构大致可以分成四大部分:Namespace,BlocksMap,NetworkTopology,LeaseManager & SnapshotManager & CacheManager及其他,图2为各数据结构内存逻辑分布图示。

namenode内存全景图

Namespace:维护整个文件系统的目录树结构,及目录树上的状态变化;
BlocksManager:维护整个文件系统中与数据块相关的信息,及数据块的状态变化;
NetworkTopology:维护机架拓扑及DataNode信息,机架感知的基础;
LeaseManager:读写的互斥同步就是靠Lease实现,支持HDFS的Write-Once-Read-Many的核心数据结构;
CacheManager:Hadoop 2.3.0引入的集中式缓存新特性,支持集中式缓存的管理,实现memory-locality提升读性能;
SnapshotManager:Hadoop 2.1.0引入的Snapshot新特性,用于数据备份、回滚,以防止因用户误操作导致集群出现数据问题;
DelegationTokenSecretManager:管理HDFS的安全访问;
其他:临时数据信息、统计信息metrics等等。

NameNode常驻内存主要被Namespace和BlockManager使用,二者使用占比分别接近50%。其他部分内存开销较小且相对固定,与Namespace和BlockManager相比基本可以忽略。

三、内存分析

3.1 Namespace

与单机文件系统相似,HDFS对文件系统的目录结构也是按照树状结构维护,Namespace保存了目录树及每个目录/文件节点的属性。除在内存常驻外,这部分数据会定期flush到持久化设备上,生成一个新的FsImage文件,方便NameNode发生重启时,从FsImage及时恢复整个Namespace。图3所示为Namespace内存结构。前述集群中目录和文件总量即整个Namespace目录树中包含的节点总数,可见Namespace本身其实是一棵非常巨大的树。

Namespace内存结构

在整个Namespace目录树中存在两种不同类型的INode数据结构:INodeDirectory和INodeFile。其中INodeDirectory标识的是目录树中的目录,INodeFile标识的是目录树中的文件。由于二者均继承自INode,所以具备大部分相同的公共信息INodeWithAdditionalFields,除常用基础属性外,其中还提供了扩展属性features,如Quota,Snapshot等均通过Feature增加,如果以后出现新属性也可通过Feature方便扩展。不同的是,INodeFile特有的标识副本数和数据块大小组合的header(2.6.1之后又新增了标识存储策略ID的信息)及该文件包含的有序Blocks数组;INodeDirectory则特有子节点的列表children。这里需要特别说明children是默认大小为5的ArrayList,按照子节点name有序存储,虽然在插入时会损失一部分写性能,但是可以方便后续快速二分查找提高读性能,对一般存储系统,读操作比写操作占比要高。具体的继承关系见图4所示。

INode继承关系

3.2 BlockManager

BlocksMap在NameNode内存空间占据很大比例,由BlockManager统一管理,相比Namespace,BlockManager管理的这部分数据要复杂的多。Namespace与BlockManager之间通过前面提到的INodeFile有序Blocks数组关联到一起。图5所示BlockManager管理的内存结构。

BlockManager管理的内存结构

每一个INodeFile都会包含数量不等的Block,具体数量由文件大小及每一个Block大小(默认为64M)比值决定,这些Block按照所在文件的先后顺序组成BlockInfo数组,如图5所示的BlockInfo[A~K],BlockInfo维护的是Block的元数据,结构如图6所示,数据本身是由DataNode管理,所以BlockInfo需要包含实际数据到底由哪些DataNode管理的信息,这里的核心是名为triplets的Object数组,大小为3*replicas,其中replicas是Block副本数量。triplets包含的信息:

triplets[i]:Block所在的DataNode;
triplets[i+1]:该DataNode上前一个Block;
triplets[i+2]:该DataNode上后一个Block;

其中i表示的是Block的第i个副本,i取值[0,replicas)。

BlockInfo继承关系

从前面描述可以看到BlockInfo几块重要信息:文件包含了哪些Block,这些Block分别被实际存储在哪些DataNode上,DataNode上所有Block前后链表关系。

如果从信息完整度来看,以上数据足够支持所有关于HDFS文件系统的正常操作,但还存在一个使用场景较多的问题:不能通过blockid快速定位Block,所以引入了BlocksMap。

BlocksMap底层通过LightWeightGSet实现,本质是一个链式解决冲突的哈希表。为了避免rehash过程带来的性能开销,初始化时,索引空间直接给到了整个JVM可用内存的2%,并且不再变化。集群启动过程,DataNode会进行BR(BlockReport),根据BR的每一个Block计算其HashCode,之后将对应的BlockInfo插入到相应位置逐渐构建起来巨大的BlocksMap。前面在INodeFile里也提到的BlockInfo集合,如果我们将BlocksMap里的BlockInfo与所有INodeFile里的BlockInfo分别收集起来,可以发现两个集合完全相同,事实上BlocksMap里所有的BlockInfo就是INodeFile中对应BlockInfo的引用;通过Block查找对应BlockInfo时,也是先对Block计算HashCode,根据结果快速定位到对应的BlockInfo信息。至此涉及到HDFS文件系统本身元数据的问题基本上已经解决了。

前面提到部分都属于静态数据部分,NameNode内存中所有数据都要随读写情况发生变化,BlockManager当然也需要管理这部分动态数据。主要是当Block发生变化不符合预期时需要及时调整Blocks的分布。这里涉及几个核心的数据结构:

excessReplicateMap:若某个Block实际存储的副本数多于预设副本数,这时候需要删除多余副本,这里多余副本会被置于excessReplicateMap中。excessReplicateMap是从DataNode的StorageID到Block集合的映射集。 neededReplications:若某个Block实际存储的副本数少于预设副本数,这时候需要补充缺少副本,这里哪些Block缺少多少个副本都统一存在neededReplications里,本质上neededReplications是一个优先级队列,缺少副本数越多的Block之后越会被优先处理。 invalidateBlocks:若某个Block即将被删除,会被置于invalidateBlocks中。invalidateBlocks是从DataNode的StorageID到Block集合的映射集。如某个文件被客户端执行了删除操作,该文件所属的所有Block会先被置于invalidateBlocks中。 corruptReplicas:有些场景Block由于时间戳/长度不匹配等等造成Block不可用,会被暂存在corruptReplicas中,之后再做处理。

前面几个涉及到Block分布情况动态变化的核心数据结构,这里的数据实际上是过渡性质的,BlocksManager内部的ReplicationMonitor线程(图5标识Thread/Monitor)会持续从其中取出数据并通过逻辑处理后分发给具体的DatanodeDescriptor对应数据结构(3.3 NetworkTopology里会有简单介绍),当对应DataNode的心跳过来之后,NameNode会遍历DatanodeDescriptor里暂存的数据,将其转换成对应指令返回给DataNode,DataNode收到任务并执行完成后再反馈回NameNode,之后DatanodeDescriptor里对应信息被清除。如BlockB预设副本数为3,由于某种原因实际副本变成4(如之前下线的DataNode D重新上线,其中B正好有BlockB的一个副本数据),BlockManager能及时发现副本变化,并将多余的DataNode D上BlockB副本放置到excessReplicateMap中,ReplicationMonitor线程定期检查时发现excessReplicateMap中数据后将其移到DataNode D对应DatanodeDescriptor中invalidateBlocks里,当DataNode D下次心跳过来后,随心跳返回删除Block B的指令,DataNode D收到指令实际删除其上的Block B数据并反馈回NameNode,此后BlockManager将DataNode D上的Block B从内存中清除,至此Block B的副本符合预期,整个流程如图7所示。

副本数异常时处理过程

3.3 NetworkTopology

前面多次提到Block与DataNode之间的关联关系,事实上NameNode确实还需要管理所有DataNode,不仅如此,由于数据写入前需要确定数据块写入位置,NameNode还维护着整个机架拓扑NetworkTopology。图8所示内存中机架拓扑图。

NetworkTopology内存结构

从图8可以看出这里包含两个部分:机架拓扑结构NetworkTopology和DataNode节点信息。其中树状的机架拓扑是根据机架感知(一般都是外部脚本计算得到)在集群启动完成后建立起来,整个机架的拓扑结构在NameNode的生命周期内一般不会发生变化;另一部分是比较关键的DataNode信息,BlockManager已经提到每一个DataNode上的Blocks集合都会形成一个双向链表,更准确的应该是DataNode的每一个存储单元DatanodeStorageInfo上的所有Blocks集合会形成一个双向链表,这个链表的入口就是机架拓扑结构叶子节点即DataNode管理的DatanodeStorageInfo。此外由于上层应用对数据的增删查随时发生变化,随之DatanodeStorageInfo上的Blocks也会动态变化,所以NetworkTopology上的DataNode对象还会管理这些动态变化的数据结构,如replicateBlocks/recoverBlocks/invalidateBlocks,这些数据结构正好和BlockManager管理的动态数据结构对应,实现了数据的动态变化由BlockManager传达到DataNode内存对象最后通过指令下达到物理DataNode实际执行的流动过程,流程在3.2 BlockManager已经介绍。

这里存在一个问题,为什么DatanodeStorageInfo下所有Block之间会以双向链表组织,而不是其他数据结构?如果结合实际场景就不难发现,对每一个DatanodeStorageInfo下Block的操作集中在快速增加/删除(Block动态增减变化)及顺序遍历(BlockReport期间),所以双向链表是非常合适的数据结构。

3.4 LeaseManager

Lease 机制是重要的分布式协议,广泛应用于各种实际的分布式系统中。HDFS支持Write-Once-Read-Many,对文件写操作的互斥同步靠Lease实现。Lease实际上是时间约束锁,其主要特点是排他性。客户端写文件时需要先申请一个Lease,一旦有客户端持有了某个文件的Lease,其他客户端就不可能再申请到该文件的Lease,这就保证了同一时刻对一个文件的写操作只能发生在一个客户端。NameNode的LeaseManager是Lease机制的核心,维护了文件与Lease、客户端与Lease的对应关系,这类信息会随写数据的变化实时发生对应改变。

LeaseManager的内存数据结构

图9所示为LeaseManager内存结构,包括以下三个主要核心数据结构:

sortedLeases:Lease集合,按照时间先后有序组织,便于检查Lease是否超时;
leases:客户端到Lease的映射关系;
sortedLeasesByPath:文件路径到Lease的映射关系;

其中每一个写数据的客户端会对应一个Lease,每个Lease里包含至少一个标识文件路径的Path。Lease本身已经维护了其持有者(客户端)及该Lease正在操作的文件路径集合,之所以增加了leases和sortedLeasesByPath为提高通过Lease持有者或文件路径快速索引到Lease的性能。

由于Lease本身的时间约束特性,当Lease发生超时后需要强制回收,内存中与该Lease相关的内容要被及时清除。超时检查及超时后的处理逻辑由LeaseManager.Monitor统一执行。LeaseManager中维护了两个与Lease相关的超时时间:软超时(softLimit)和硬超时(hardLimit),使用场景稍有不同。

正常情况下,客户端向集群写文件前需要向NameNode的LeaseManager申请Lease;写文件过程中定期更新Lease时间,以防Lease过期,周期与softLimit相关;写完数据后申请释放Lease。整个过程可能发生两类问题:(1)写文件过程中客户端没有及时更新Lease时间;(2)写完文件后没有成功释放Lease。两个问题分别对应为softLimit和hardLimit。两种场景都会触发LeaseManager对Lease超时强制回收。如果客户端写文件过程中没有及时更新Lease超过softLimit时间后,另一客户端尝试对同一文件进行写操作时触发Lease软超时强制回收;如果客户端写文件完成但是没有成功释放Lease,则会由LeaseManager的后台线程LeaseManager.Monitor检查是否硬超时后统一触发超时回收。不管是softLimit还是hardLimit超时触发的强制Lease回收,处理逻辑都一样:FSNamesystem.internalReleaseLease,逻辑本身比较复杂,这里不再展开,简单的说先对Lease过期前最后一次写入的Block进行检查和修复,之后释放超时持有的Lease,保证后面其他客户端的写入能够正常申请到该文件的Lease。

NameNode内存数据结构非常丰富,这里对几个重要的数据结构进行了简单的描述,除了前面罗列之外,其实还有如SnapShotManager/CacheManager等,由于其内存占用有限且有一些特性还尚未稳定,这里不再展开。

四、问题

随着集群中数据规模的不断积累,NameNode内存占用随之成比例增长。不可避免的NameNode内存将逐渐成为集群发展的瓶颈,并开始暴漏诸多问题。

1、启动时间变长。NameNode的启动过程可以分成FsImage数据加载、editlogs回放、Checkpoint、DataNode的BlockReport几个阶段。数据规模较小时,启动时间可以控制在~10min以内,当元数据规模达到5亿(Namespace中INode数超过2亿,Block数接近3亿),FsImage文件大小将接近到20GB,加载FsImage数据就需要~14min,Checkpoint需要~6min,再加上其他阶段整个重启过程将持续~50min,极端情况甚至超过60min,虽然经过多轮优化重启过程已经能够稳定在~30min,但也非常耗时。如果数据规模继续增加,启动过程将同步增加。

2、性能开始下降。HDFS文件系统的所有元数据相关操作基本上均在NameNode端完成,当数据规模的增加致内存占用变大后,元数据的增删改查性能会出现下降,且这种下降趋势会因规模效应及复杂的处理逻辑被放大,相对复杂的RPC请求(如addblock)性能下降更加明显。

3、NameNode JVM FGC(Full GC)风险较高。主要体现在两个方面:(1)FGC频率增加;(2)FGC时间增加且风险不可控。针对NameNode的应用场景,目前看CMS内存回收算法比较主流,正常情况下,对超过100GB内存进行回收处理时,可以控制到秒级别的停顿时间,但是如果回收失败被降级到串行内存回收时,应用的停顿时间将达到数百秒,这对应用本身是致命的。

4、超大JVM Heap Size调试问题。如果线上集群性能表现变差,不得不通过分析内存才能得到结论时,会成为一件异常困难的事情。且不说Dump本身极其费时费力,Dump超大内存时存在极大概率使NameNode不可服务。

针对NameNode内存增长带来的诸多问题,社区和业界都在持续关注并尝试不同的解决方案。整体上两个思路:(1)扩展NameNode分散单点负载;(2)引入外部系统支持NameNode内存数据。

从2010年开始社区就投入大量精力持续解决,Federation方案[3]通过对NameNode进行水平扩展分散单点负载的方式解决NameNode的问题,经过几年的发展该方案逐渐稳定,目前已经被业界广泛使用。除此之外,社区也在尝试将Namespace存储值外部的KV存储系统如LevelDB[4],从而降低NameNode内存负载。

除社区外,业界也在尝试自己的解决方案。Baidu HDFS2[5]将元数据管理通过主从架构的集群形式提供服务,本质上是将原生NameNode管理的Namespace和BlockManagement进行物理拆分。其中Namespace负责管理整个文件系统的目录树及文件到BlockID集合的映射关系,BlockID到DataNode的映射关系是按照一定的规则分到多个服务节点分布式管理,这种方案与Lustre有相似之处(Hash-based Partition)。Taobao HDFS2[6]尝试过采用另外的思路,借助高速存储设备,将元数据通过外存设备进行持久化存储,保持NameNode完全无状态,实现NameNode无限扩展的可能。其他类似的诸多方案不一而足。

尽管社区和业界均对NameNode内存瓶颈有成熟的解决方案,但是不一定适用所有的场景,尤其是中小规模集群。结合实践过程和集群规模发展期可能遇到的NameNode内存相关问题这里有几点建议:

1、合并小文件。正如前面提到,目录/文件和Block均会占用NameNode内存空间,大量小文件会降低内存使用效率;另外,小文件的读写性能远远低于大文件的读写,主要原因对小文件读写需要在多个数据源切换,严重影响性能。

2、调整合适的BlockSize。主要针对集群内文件较大的业务场景,可以通过调整默认的Block Size大小(参数:dfs.blocksize,默认128M),降低NameNode的内存增长趋势。

3、HDFS Federation方案。当集群和数据均达到一定规模时,仅通过垂直扩展NameNode已不能很好的支持业务发展,可以考虑HDFS Federation方案实现对NameNode的水平扩展,在解决NameNode的内存问题的同时通过Federation可以达到良好的隔离性,不会因为单一应用压垮整集群。

五、总结

NameNode在整个HDFS系统架构中占据举足轻重的位置,内部数据和处理逻辑相对复杂,本文简单梳理了NameNode的内存全景及对其中几个关键数据结构,从NameNode内存核心数据视角对NameNode进行了简单的解读,并结合实际场景介绍了随着数据规模的增加NameNode内存可能遇到的问题及业界各种可借鉴的解决方案。

六、参考

[1] Apache Hadoop. https://hadoop.apache.org/.
[2] Apache Hadoop Source Code. https://github.com/apache/hadoop/tree/branch-2.4.1/.
[3] HDFS Federation. https://issues.apache.org/jira/browse/HDFS-1052.
[4] NemeNode Scalability. https://issues.apache.org/jira/browse/HDFS-5389.
[5] Baidu HDFS2. http://static.zhizuzhefu.com/wordpress_cp/uploads/2013/04/a9.pdf.
[6] Taobao HDFS2. https://github.com/taobao/ADFS.