Hexiaoqiao

Focus on BigData,Distributed System,Hadoop Ecosystem

NameNode FSImage加载优化

一、背景

HDFS NameNode重启效率是另一个被长期诟病的问题,尤其对超大规模集群,动辄数小时的重启时间对整个集群的稳定性和可用性都存在极大的潜在风险,HDFS NameNode重启优化一文对NameNode启动效率提升的优化办法做过简单梳理和探讨,但是从实践情况来看,虽然有提升,但是依然存在优化空间。

从线上一组NameNode重启的抽样数据为例来看,整个重启时间依然非常可观,具体到重启每阶段的时间分布如图1所示(时间占比分布情况受元数据量、NameNode本地存储介质和集群规模等诸多因素影响,不具备一般性,仅供参考)。从时间分布来看,占比较大的是加载FSImage和BlockReport两个阶段,其中FSImage加载的实际时间开销超过小时。所以优化和提升FSImage加载效率对整个进程重启有很大帮助。


本文将在介绍FSImage原有加载方式基础上,分析效率低的原因,并结合加载方式的演进和实践效果对FSImage的并行加载优化进行简单说明,期望能有借鉴和参考价值。

二、重启流程

在HDFS的整个运行期里,整个文件系统的目录树均在NameNode内存中集中管理,但由于内存易失特性,一旦出现进程退出、宕机等异常情况,所有元数据都会丢失,将给整个系统的数据安全会造成不可恢复的灾难。为了更好的容错能力,NameNode(SBN/Secondary)会周期进行Checkpoint,将文件系统的目录树持久化到外部设备,即二进制文件FSImage,这样即使NameNode出现异常也能从持久化设备上恢复元数据,保证了数据的安全可靠。详细分析参考HDFS NameNode重启优化一文。

在HA with QJM架构下,NameNode重启始终以SBN(StandbyNameNode)角色开始。启动过程大致分成以下几个阶段:
0、加载FSImage:从最新持久化的FSImage中恢复文件系统的目录树结构;
1、回放EditLog:通过回放EditLog对齐最新的目录树结构;
2、执行Checkpoint:可选操作;
3、收集所有DataNode的注册和数据块汇报:重建文件的数据块内容具体分布;

FSImage完整记录了文件系统目录树相关的数据。从Hadoop-2.4.0起,FSImage开始使用Google Protocol Buffers编码格式描述(HDFS-5698),详细描述文件见fsimage.proto,当然在后续的版本中也有调整,但整体上没有本质差异。根据描述文件和实现逻辑,FSImage文件组织格式如图2所示。


从fsimage.proto和FSImage文件存储格式容易看到,除了文件头部校验(MAGIC)和尾部文件索引(FILESUMMARY)等管理信息外,核心数据都是与文件系统目录树强相关。其中INODE和INODE_DIR是整个FSImage文件中最核心且也是规模最大的两个部分。

NameNode重启的第一个步骤就是加载FSImage,传统的加载完全按照串行方式执行:
(1)FSImage文件MD5值校验;
(2)读取FSImage文件的Summary数据;
(3)根据Summary的信息对每个Section依次读取、反序列化并构建内存数据结构;
其中两个规模最大的两个Section即INODE和INODE_DIR。假设针对包含1亿个INode目录树的加载过程:需要先将1亿个INode从FSImage文件中按序读入并反序列化;完成后再将包含1亿条父子关系的INODE_DIR按序读入并反序列化,根据反序列化结果将所有子节点的引用按照二分查找的方式插入到父节点维护的数组中,到这里基本上目录树就构建起来(当然如果开启了如SNAPSHOT和Cache等特性的话,还需要将这部分数据加载完成),目录树构建完成后的内存组织情况详情参考NameNode内存全景

使用传统的FSImage加载模式,测试验证~3亿节点规模的目录树,FSImage文件大小~30GB,加载过程时间开销统计:
- MD5校验耗时~125sec;
- FSImage加载时间~811sec;

三、并行加载优化

如果分析FSImage的整个加载过程,尤其是占比最大的INODE和INODE_DIR两个Section容易发现两个特点:
(1)INODE_DIR加载依赖INODE完成,即INODE和INODE_DIR两个Section之间存在严格的先后顺序;
(2)INODE和INODE_DIR两个Section内部Entry(目录树节点数据和节点之间父子关系信息)相互之间其实完全独立;
根据这两个特点,我们可以把INODE和INODE_DIR内部结构进一步做逻辑拆分,切割成多个INODE_SUB和多个INODE_DIR_SUB便于后续并行处理,其中:

将INODE和INODE_DIR两个Section进行逻辑拆分后其实不影响FSImage物理上的组织结构。为了能把INODE_SUB和INODE_DIR_SUB真正的分配给独立的线程且不重不漏,只需要在FSImage文件的FILESUMMARY索引里对逻辑SUB_SECTION(INODE_SUB+INODE_DIR_SUB)做好记录:偏移量+长度。


FSImage文件索引数据就绪后,当再次重启触发加载时,根据SUB_SECTION的个数及配置的加载线程数进行均衡拆分:
(1)确保多个线程之间分配到的SUB_SECTION尽可能相同;
(2)每一个SUB_SECTION只被一个线程独立消费;
通过这种方式拆分,FSImage的加载过程可以演变成如下图4所示流程。


从整个优化思路可以看到,FSImage文件物理结构没有大调整,仅对FILESUMMARY做了简单扩展,核心数据组织上没有做任何改变,所以兼容能力上相对更好。
(1)使用原有逻辑加载新格式的FSImage文件时仅需要在读取FILESUMMARY时将INODE_SUB和INODE_DIR_SUB两类SECTION的索引过滤掉就可以,事实上在实现时也是这么处理的,即使是较早版本的实现也非常容易修复;
(2)并行逻辑加载原有格式FSImage文件时因为在FILESUMMARY中没有SUB_SECTION描述,所以及时升级到并行加载逻辑在真正执行时也是单线程完成;

整体上看,这种并行加载方案在效率和兼容性上能做到兼顾。

四、其他优化

除了对INODE和INODE_DIR并行加载优化外,其实社区参与者还提出了其他实现逻辑的优化,其中效果较好的主要有:

(1)异步MD5检查;
在图4的FSImage加载流程里,为了检查FSImage文件合法性,第一步需要对其进行MD5校验。这个步骤需要执行一次完整的FSImage文件读操作,如果文件较大,IO开销比较可观。为了提升加载效率,HDFS-13694提出将MD5检查逻辑从主流程中摘出来,用独立的线程异步执行检查,减少文件合法性检查引入的时间开销。

(2)加载INODE_DIR/INODE_DIR_SUB去掉二分查找逻辑;
如前述,INODE_DIR Section实际上是为了构建目录树节点之间的父子关系。为了提升检索效率,父节点使用数组按照字符序维护子节点引用的集合,这样处理后读取时容易通过二分查找的方式在O(logn)时间复杂度内就完成检索;为了满足子节点有序的条件,传统方式当加载INODE_DIR/INODE_DIR_SUB时也是先按照二分查找的方法定位到每一个子节点引用应该插入到数组的具体位置,再执行插入操作。但事实上,对目录树序列化操作时(即执行Checkpoint)子节点本身都已经是有序持久化到FSImage的INODE_DIR Section内,所以加载的时候每一次二分查找的目标位置一定是数组尾部。这种情况下其实二分查找定位目标位置的逻辑完全没有必要。HDFS-13693提出按照INODE_DIR内Entry的加载顺序逐个插入子节点数组的尾部,去掉二分查找逻辑。

五、效果验证

测试环境

1、基础环境:
CPU: Intel® Xeon® CPU 2.60GHz
OS: CentOS 6.6
FS: EXT4 on HDD
JDK: Java HotSpot™ 64-Bit Server VM 1.8.0

2、数据规模:
INode总数:~3亿
Block数:~3亿
FSImage大小:~30GB

3、测试场景:
原生加载
并行加载(默认线程数12)
并行加载+异步MD5校验
并行加载+异步MD5校验+跳过二分查找

结果对比

下图5是针对同一份元数据使用不同策略执行加载过程耗时情况。


结果说明:
(1)将INODE和INODE_DIR两个最大规模的元数据并行化加载收益最明显,使用12个线程数加载效率有~40%的提升;实际场景中具体应该使用多少线程执行并行加载需要综合考虑服务器核数和元数据规模等。
(2)MD5检查是容易被忽略的部分,尤其针对较大规模元数据场景,因为需要一次完整IO过程,所以开销也比较可观~10%;这里的主要开销在IO,所以与FSImage文件大小强相关。
(3)虽然在加载INODE_DIR构建节点之间父子关系时可以跳过二分查找,直接进入数组尾部,但是效果并不明显。原因在于:一般场景子节点的规模都不大,另外二分查找本身的开销非常低;在常规服务器上即使对1亿规模的数据执行1亿次二分查找的耗时不超过10秒;

使用参数

FSImage并行加载特性使用的主要参数如下:
1、dfs.image.parallel.load: 描述是否开启并行加载特性;
2、dfs.image.parallel.target.sections: 描述开启并行加载特性后新生成的FSImage里包含的SUB_SECTION个数,一般建议设置为dfs.image.parallel.threads的整数倍;
3、dfs.image.parallel.inode.threshold: 描述开启并行加载特性所需节点数的最小阈值,对小规模元数据并行加载并不会有很好的效果,所以默认在1000000节点规模下,并行加载特性不会开启;
4、dfs.image.parallel.threads: 描述并行加载使用的线程数,需要综合考虑元数据规模和NameNode进程所在服务器的承载能力适当调整。

hdfs-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
  <property>
    <name>dfs.image.parallel.load</name>
    <value>true</value>
    <description>
      If true, write sub-section entries to the fsimage index so it can
      be loaded in parallel. Also controls whether parallel loading
      will be used for an image previously created with sub-sections.
      If the image contains sub-sections and this is set to false,
      parallel loading will not be used.
      Parallel loading is not compatible with image compression,
      so if dfs.image.compress is set to true this setting will be
      ignored and no parallel loading will occur.
      Enabling this feature may impact rolling upgrades and downgrades if
      the previous version does not support this feature. If the feature was
      enabled and a downgrade is required, first set this parameter to
      false and then save the namespace to create a fsimage with no
      sub-sections and then perform the downgrade.
    </description>
  </property>

  <property>
    <name>dfs.image.parallel.target.sections</name>
    <value>12</value>
    <description>
      Controls the number of sub-sections that will be written to
      fsimage for each section. This should be larger than
      dfs.image.parallel.threads, otherwise all threads will not be
      used when loading. Ideally, have at least twice the number
      of target sections as threads, so each thread must load more
      than one section to avoid one long running section affecting
      the load time.
    </description>
  </property>

  <property>
    <name>dfs.image.parallel.inode.threshold</name>
    <value>1000000</value>
    <description>
      If the image contains less inodes than this setting, then
      do not write sub-sections and hence disable parallel loading.
      This is because small images load very quickly in serial and
      parallel loading is not needed.
    </description>
  </property>

  <property>
    <name>dfs.image.parallel.threads</name>
    <value>12</value>
    <description>
      The number of threads to use when dfs.image.parallel.load is
      enabled. This setting should be less than
      dfs.image.parallel.target.sections. The optimal number of
      threads will depend on the hardware and environment.
    </description>
  </property>

社区在优化和解决FSImage加载问题的讨论持续了较长时间,其中比较典型的解决方案还有如HDFS-7784HDFS-13700等,这些解决方案虽然没有最终合入主干代码,但是都提供了非常不错的想法。综合考虑性能、稳定性和兼容能力,HDFS-14617优势明显,另外从实践效果上看HDFS-14617也有较好的表现。如果集群规模和元数据规模较大,且重启加载FSImage阶段耗时严重,并行加载特性值得一试。

六、参考

[1] https://issues.apache.org/jira/browse/HDFS-14617
[2] https://issues.apache.org/jira/browse/HDFS-13694
[3] https://issues.apache.org/jira/browse/HDFS-13693

HDFS锁机制优化方向讨论

一、背景

众所周知,NameNode全局锁(FSNamesystemLock)问题一直是制约HDFS性能尤其是NameNode处理能力的主要原因。为此,社区和业界经过多次尝试,试图解决NameNode全局锁问题,但是从结果来看,都不理想。

本文将首先梳理NameNode当前的锁机制以及解决全局锁问题所面临的困难,结合经典分布式文件系统在这个问题上的一般解法,尝试给出可能的解决思路。

二、全局锁机制

NameNode是整个HDFS的核心组件[1],集中管理HDFS集群的所有元数据,主要包括文件系统的目录树、数据块集合和分布以及整个集群的拓扑结构。


同GFS一样HDFS采用了”一次写多次读“的读写模型来满足离线数据处理场景的存储需求,在此基础上,进一步放松一致性模型简化文件系统。在具体实现上,相比GFS1.0,HDFS做了更大胆取舍,锁机制上使用全局锁来统一来控制并发读写。这样处理的优势非常明显,全局锁进一步简化锁模型,不需要额外考虑锁依赖关系,同时降低复杂度,减少工程量。但是问题比优势更加突出,核心问题就是全局唯一锁制约性能提升。

为了更好地理解使用全局锁存在的问题,首先梳理全局锁管理的主要数据结构,大致分成三类:
(1)目录树:文件系统的全局目录视图。获取目录树上任一节点的信息必须先拿到全局读锁;目录树上任一节点新增、删除、修改都必须先拿到全局写锁。
(2)数据块集合:文件系统的全量数据信息。获取其中任一数据块信息必须先拿到全局读锁;新增、删除,修改都必须先拿到全局写锁。
(3)集群信息:HDFS集群节点信息的集合。获取节点信息等必须先拿到全局读锁;注册,下线或者变更节点信息请求处理时必须先拿到全局写锁。当然为了减少对全局影响,后续版本里少数如生命线等RPC请求不再获取全局锁,部分不适合使用全局锁的处理逻辑,将并发控制下放到具体的节点信息,尝试提升处理能力。


具体实现上,NameNode使用了JDK提供的可重入读写锁(ReentrantReadWriteLock),我们知道ReentrantReadWriteLock对并行请求有严格限制,简单来说:读锁并行写锁排它。

针对不同RPC请求的处理逻辑,按照需要获取锁粒度,我们可以把所有请求抽象为读(Read Handler,获取全局读锁)和写(Write Handler,获取全局写锁)两类。
Read Handler:客户端请求(getListing/getBlockLocations/getFileInfo)、服务管理接口(monitorHealth/getServiceStatus)和主从节点之间请求(getTransactionID)等;
Write Handler:客户端请求(create/mkdir/rename/append/truncate/complete/recoverLease)、服务管理接口(transitionToActive/transitionToStandby/setSafeMode)和主从节点之间请求(rollEditLog)等;
这里只列了一些常用请求类型,其他如Cache/Snapshot/ACL/XAttr/Quota/Lease及NameNode内部线程调用等需要获取锁的逻辑没有再详细列出和归类。


NameNode的锁控制如图3所示。核心处理逻辑路径上有两把锁:FSNamesystemLock和FSEditLogLock。其中FSNamesystemLock即为通常所说的全局锁,采用ReentrantReadWriteLock机制实现;另外为了实现高可靠/高可用的目的,NameNode需要将对部分元数据的修改实时同步到EditLog,为了提升性能,EditLog读写不在FSNamesystemLock锁内执行,独立维护锁控制并行读写,暂称为FSEditLogLock,采用Synchronized排它机制实现。
(1)获取全局锁(FSNamesystemLock)入口:外部RPC请求从IPC层进入NameNode和内部线程请求;
(2)获取局部锁(FSEditLogLock)入口:主要来源外部RPC请求对元数据的写操作;

以RPC请求#mkdir为例:
(1)RPC请求从IPC层进入NameNode;
(2)获取全局写锁(FSNamesystemLock#writeLock#lock),如果持有读锁或者写锁的请求正在被处理,排队等待;
(3)更新内存目录树结构;
(4)释放全局写锁(FSNamesystemLock#writeLock#unlock);
(5)获取EditLog排它锁;
(6)写EditLog;
(7)释放EditLog排它锁;
(8)通过IPC层将结果返回客户端;

可以看到,单个RPC请求处理流程经过了两次获取锁阶段。虽然二者相互独立,但其中任意一处如果不能及时获取到锁,RPC将处于排队等待状态,直到成功获得锁。等锁时间直接影响请求响应性能,极端场景下如果长时间不能获得锁,将造成IPC队列堆积,TCP连接队列被打满,客户端出现请求超时或者失败重试,新建连接超时失败等各种异常问题。
另外从全局来看,写锁因为排它对性能影响更加明显。如图3所示,如果当前有写请求正在被处理,其他所有请求都必须排队等待,直到写请求被处理完成释放锁后再竞争全局锁。

通常情况下,FSNamesystemLock锁范围要远大于FSEditLogLock锁范围。考虑负载较高的大规模集群,按照9:1读写比预估,只有10%请求需要同时获取FSNamesystemLock和FSEditLogLock,但是100%请求需要获取全局锁FSNamesystemLock。再加上新型硬件(SSD/3DPoint/PM)对IO性能的支持,EditLog写入性能远高于实际需求。所以从整体上看,当集群规模增加和负载增高后,全局锁FSNamesystemLock将逐渐成为NameNode性能瓶颈。如果能彻底解决NameNode全局锁问题,HDFS性能将得到极大提升。

三、拆锁复杂度

如前述,NameNode全局锁的拆分能带来非常可观的收益,Hadoop社区和业界也尝试过多次,但是从结果来看,效果都不理想。就我个人理解,其中问题复杂度客观存在,当然也有一些主观因素。总结下来有几个方面:

1、问题复杂度
Hadoop发展到今天已经超过十年,其中HDFS经过多次迭代演进,架构已经非常复杂。图4所示为HDFS项目包含和依赖的不完全组件列表,即使从事HDFS开发和运维的专业人员,想要完整了解和掌握HDFS的所有组件绝非易事。


仅针对NameNode组件,架构上模块划分不够清晰,内部核心数据结构和工作线程之间耦合非常严重。比如:
(1)INodeFile通过对象引用关联Block,这种引用关系存在天然的耦合,很难通过不同锁进行并发访问控制;
(2)数据块写入完成后除了直接更新Block状态外,还需要再次回去更新文件属性,比如存储空间占用;
(3)在可靠性上使用到的FSImage和FSEditLog两份持久化数据内Namespace和BlocksMap数据共存;
实现细节上,还存在大量相互依赖,不一而足。

除了问题本身的复杂度,工程复杂度也比较高,据不完全统计,trunk分支上仅HDFS项目代码量超过1000K LOC,其中非测试代码量超过760K LOC,包括了超过2000类文件,要想优雅实现锁粒度拆分工程量很大。

2、实际需求
以社区版本branch-2.7为例,经过性能优化,NameNode处理能力可以达到5000TPS(写请求)或200000QPS(读请求),这种处理能力能够满足大多数公司的实际需求。如果负载超过这个量级一般也能通过Federation架构做横向扩展解决(虽然Federation架构在使用上会遇到很多意想不到的问题)。真正有实际需求,并需要尝试降低NameNode全局锁粒度解决性能问题的场景并不多。
NOTE:性能数据是具体场景读写比例压测结果,不具备通用性,请谨慎参考。

3、社区动力不足
社区在全局锁和扩展性问题上做过多次尝试。比较有代表性的几类工作如下:

HDFS-8966:Separate the lock used in namespace and block management layer
HDFS-5453:Support fine grain locking in FSNamesystem
HDFS-8286:Scaling out the namespace using KV store
HDFS-7836:BlockManager Scalability Improvements

几类方案中都描述了非常好的愿景,但是这些工作多数只推进了其中一部分,有的甚至还处于方案讨论阶段。总之,从几次尝试工作的结果来看,社区在这个方向上的动力并不足,投入有限。

4、历史问题
HDFS最初设计时为了实现简单方便做了很多取舍,其中全局锁是对后续的发展影响较大的一个。之后架构迭代中,大量工程实现都在全局锁基础上构建,确实对开发工作有很多便捷,但是如果想尝试梳理清楚和优雅拆分难度较大。

四、拆锁讨论

事实上,在分布式文件系统中,为实现解决数据一致性,通常都会不可避免遇到锁问题。不同的是,对于适合不同场景的文件系统,做的妥协或采用的方法有很大差异。借鉴成熟文件系统的锁模型,可以为HDFS拆锁工作提供一些参考和借鉴。其中Alluxio是非常好的参考对象,本章在调研Alluxio锁模型基础上,分析降低NameNode全局锁粒度的可能发展方向。

4.1 Alluxio内存锁模型

Alluxio[2]是一个基于内存的分布式文件系统,得益于云计算场景下的良好表现,被广泛部署和应用。
同HDFS类似,Alluxio也使用了Master-Slave的架构,其中Master管理Alluxio集群所有的元数据,包括目录树结构、数据块集合和分布及集群节点信息。实现上,FileSystemMaster负责管理整个目录树,BlockMaster管理数据块集合和分布,集群节点信息由BlockMaster中单独的集合数据结构mWorkers独立管理。

整体框架上与HDFS非常相似,但是具体到实现上,差异比较明显。
(1)FileSystemMaster和BlockMaster完全独立,通过blockid关联;
(2)mWorkers与FileSystemMaster/BlockMaster之间不存在复杂的耦合关系;
为了实现数据一致性,FileSystemMaster/BlockMaster/mWorkers之间独立加锁,以达到最好的并行性能。具体来看:

1、FileSystemMaster中目录树上所有节点各自维护读写锁(ReentrantReadWriteLock),控制并发读写:
(1)一元操作符:按照路径从根目录开始顺序加锁,写锁只加到最后一级目录,其他目录均加读锁;
(2)二元操作符:对公共目录非最后一级加读锁,最后一级根据操作符加读/写锁,剩余目录按照最后一级公共目录顺序加锁。
(3)为避免死锁,对于二元或者多元操作符先按路径排序,根据排序结果顺序对路径分别加锁;


2、BlockMaster完全独立于FileSystemMaster,核心数据结构mBlocks使用ConcurrentHashMap控制并发读写,具体到单个Block操作使用synchronized控制;

3、mWorker本身使用线程安全的集合数据结构管理,涉及到注册心跳等操作时,为每一个worker独立加锁;

从整个锁逻辑上看,有几点非常值得借鉴的地方:
(1)所有模块之间耦合度极低,核心逻辑不存在排它锁影响性能;
(2)为了将锁影响控制到最低,使用了大量在具体对象(block/worker)上加锁逻辑,而不是全局;

当然,凡是都有利弊,降低锁冲突提升性能一定是需要付出代价的:
(1)内存开销,因为在FileSystemMaster中目录树所有节点上独立使用读写锁(ReentrantReadWriteLock),会存在大量的内存对象的开销,制约Alluxio集群规模;在64bit环境上统计数据结构ReentrantReadWriteLock的footprint:

ReentrantReadWriteLock footprint
1
2
3
4
5
6
7
8
java.util.concurrent.locks.ReentrantReadWriteLock@29444d75d footprint:
     COUNT       AVG       SUM   DESCRIPTION
         1        24        24   java.util.concurrent.locks.ReentrantReadWriteLock
         1        48        48   java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync
         1        16        16   java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock
         1        16        16   java.util.concurrent.locks.ReentrantReadWriteLock$Sync$ThreadLocalHoldCounter
         1        16        16   java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock
         5                 120   (total)

对于一个10亿节点的目录树,仅ReentrantReadWriteLock对象的内存开销就将到~120GB,显然是一个巨大的开销。
(2)Alluxio的Master节点为了实现高可用,本身采用集群方式部署,为了保证一致性,所有元数据必须同步。这里涉及到FileSystemMaster/BlockMaster的Journal独立持久化逻辑,Alluxio实现时,将这部分逻辑都放在了锁内,对写请求处理的性能影响较大。

4.2 GFS锁模型

重新回顾GFS1.0是如何管理目录树和目录锁。下面是从论文《The Google File System》[4]中摘抄的有关目录树和锁机制的描述段落。

Many master operations can take a long time: for example, a snapshot operation has to revoke chunkserver leases on all chunks covered by the snapshot. We do not want to delay other master operations while they are running. Therefore, we allow multiple operations to be active and use locks over regions of the namespace to ensure proper serialization. Unlike many traditional file systems, GFS does not have a per-directory data structure that lists all the files in that directory. Nor does it support aliases for the same file or directory (i.e, hard or symbolic links in Unix terms). GFS logically represents its namespace as a lookup table mapping full pathnames to metadata. With prefix compression, this table can be efficiently represented in memory. Each node in the namespace tree (either an absolute file name or an absolute directory name) has an associated read-write lock. Each master operation acquires a set of locks before it runs. Typically, if it involves /d1/d2/…/dn/leaf, it will acquire read-locks on the directory names /d1, /d1/d2, …, /d1/d2/…/dn, and either a read lock or a write lock on the full pathname /d1/d2/…/dn/leaf. Note that leaf may be a file or directory depending on the operation. We now illustrate how this locking mechanism can prevent a file /home/user/foo from being created while /home/user is being snapshotted to /save/user. The snapshot operation acquires read locks on /home and /save, and write locks on /home/user and /save/user. The file creation acquires read locks on /home and /home/user, and a write lock on /home/user/foo. The two operations will be serialized properly because they try to obtain conflicting locks on /home/user. File creation does not require a write lock on the parent directory because there is no “directory”, or inode-like, data structure to be protected from modification. The read lock on the name is sufficient to protect the parent directory from deletion. One nice property of this locking scheme is that it allows concurrent mutations in the same directory. For example, multiple file creations can be executed concurrently in the same directory: each acquires a read lock on the directory name and a write lock on the file name. The read lock on the directory name suffices to prevent the directory from being deleted, renamed, or snapshotted. The write locks on file names serialize attempts to create a file with the same name twice. Since the namespace can have many nodes, read-write lock objects are allocated lazily and deleted once they are not in use. Also, locks are acquired in a consistent total order to prevent deadlock: they are first ordered by level in the namespace tree and lexicographically within the same level.

从这段描述里我们可以看到GFS对锁的管理:
(1)目录树中节点各自独立管理锁来控制并发;
(2)对锁模型更加激进,比如创建文件在整条路径上只使用读锁;
(3)为了避免死锁,对多元操作符按照路径排序后顺序加锁;
整体来看,Alluxion目录树锁机制与GFS锁机制异曲同工,将并行处理能力最大化。

4.3 HDFS拆锁讨论

借鉴和参考前面两类文件系统锁机制实现并结合HDFS现状,我个人认为HDFS降低全局锁粒度的可能发展路线:
1、垂直拆分[3]
NameNode内存几个核心数据结构里,DataNodeManager管理的内容相对独立,比较容易独立拆分出去,事实上社区现在基本完成了这个工作,下面只考虑两个核心数据结构Namespace和BlocksMap:
(1)按照HDFS Federation架构的思路,在单NameNode进程内实施Federation;
(2)将Namespace按照Range进行垂直切分;
(3)Namespace变化成两级管理结构;

Double-level-struction
1
2
RangeMap:Range-GSet
GSet:key-INode/BlockInfo

(4)Range内独享锁,Range之间可并行访问;
(5)跨Range多元操作符按照Range排序后顺序加锁避免死锁;
(6)当单进程整体负载较高时,Range重新分配独立进程,实现动态切分目录树的效果;

目录树的垂直切分思路到最后可以跟HDFS Federation很好的结合起来(虽然HDFS Federation架构存在很多问题)实现类似Ceph中简化版Dynamic Subtree Partitioning目标。



2、水平拆分
NameNode全局锁水平拆分的思路可以借鉴GFS1.0和Alluxio解决思路,按照两个阶段降低NameNode锁粒度:
第一阶段:对NameNode核心数据结构进行分层解耦,不同层独立持锁;
第二阶段:降低Namespace层锁粒度;

第一阶段分层解耦:
(1)Namespace层维护与目录树有关的所有数据结构(INodeMap,Lease等),核心是INodeMap,目录树文件节点上通过List BlockIds即数据块序号维护与数据块的关系,取代对象索引;
(2)BlocksManager层维护与数据块相关的所有数据结构(BlocksMap,ReplicationMonitor,NetworkTopology等),核心是BlocksMap:GSet<BlockId, BlockInfo>;将副本数和存储策略等与数据块有关的属性统一下沉到BlockInfo内,降低Namespace与BlocksManager的耦合;(一部分工作社区已经完成)
(3)DataNodeManager层仅维护集群节点数据结构,不维护拓扑结构(非重点,当前的实现已经不在锁内);
(4)每一层维护独立锁,开放接口以线程安全方式对外暴露。

拆分后同一进程内会出现多把独立锁,不可避免会存在锁内相互调用的问题,为了避免出现死锁,可以做简单约束:
(1)单次请求处理涉及数据结构, 或者<Namespace,BlocksMap>;
(2)尽可能减少或避免锁内跨层调用(如Alluxio);
(3)特殊场景需要锁内跨层调用时,仅允许Namespace到BlocksMap单向调用;


第二阶段降低锁粒度:
(1)目录树全局锁下沉到目录树节点,不过如前述因为ReentrantReadWriteLock的footprint较大,直接使用容易造成内存瓶颈。可以选择以下优化和改进:
* 按照满足读写锁能力的最小资源重新实现Lock,降低整颗目录树节点使用锁后的内存占用;
* 维护独立的目录锁动态子树;因为NameNode进程内提供的请求处理线程数有限,目录锁子树规模非常小,几乎没有管理和遍历的成本;
(2)写锁仅持有写操作的最后一级目录,其他父目录均加读锁;
(3)多元操作符按照请求目录排序后顺序加锁避免死锁;


全局操作类型,比如safemode/haadmin/metasave因为都是superuser类请求,频率非常低,不需要再维护独立锁。为了简化,对大部分superuser管理类型的请求可以同时获取两把写锁,对整体性能不会有影响。

五、总结

NameNode全局锁一直是影响HDFS性能的关键问题,尽管社区在这方面做过多次尝试,但是结果都不是很理想。其中的问题难度客观存在。
(1)HDFS架构快速迭代和演进,丰富的功能和更加复杂组件让HDFS内部模块之间存在千丝万缕的耦合关系,完全梳理清楚成本较高;
(2)HDFS项目代码量除单元测试外接近780K LOC,工程量很大;
(3)设计之初为实现简单做了很多取舍,比如全局锁及在此基础上的大量工程实现(“战术的勤奋掩盖战略的懒惰”的实例);
虽有难度但也存在办法,本文在参考其他分布式文件系统锁模型的基础上,结合当前HDFS实际情况和业界正在尝试的方向,期望提供两种降低全局锁粒度的思路和可能演进方向,两种演进方向相互没有依赖,可以并行演进。

当然,提升性能或者扩展能力,拆分NameNode全局锁并不是唯一解。比如由LinkedIn和Hortorworks分别在推进的Observer NameNode和OZone都是非常好的思路。
Observer NameNode通过开放Standby读能力提升NameNode整体QPS。
Hadoop OZone通过引入对象存储思路,将文件系统的元数据进行分解下沉,期望能够实现良好的性能和扩展能力。

六、参考

[1] https://hadoop.apache.org/docs/r3.2.0/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html
[2] https://www.alluxio.org/
[3] https://engineering.linkedin.com/blog/2019/02/the-present-and-future-of-apache-hadoop--a-community-meetup-at-l
[4] https://static.googleusercontent.com/media/research.google.com/en//archive/gfs-sosp2003.pdf

大数据职位招聘

数据平台 - Hadoop实习工程师

职位描述

1、搭建维护Hadoop、HBase、Spark等集群并进行必要的troubleshooting,保障系统正常运行;
2、提高集群的使用效率及稳定性,并对整个系统做合理分离,满足各种应用场景的使用;
3、构建集群监控体系、可视化体系、调度系统,保障集群安全和数据安全;
4、负责搭建BI系统,数据仓库模型的ETL实施;
5、数据使用开发平台;
6、各个业务线的数据接入及数据一致性建设;
7、不断解决规模增长带来的技术和业务问题,确保数据平台稳定性可靠性和线性扩展能力,驱动业务发展。

职位要求

1、对技术有着永无止境的追求,自认为是技术Geek,具备很强的问题解决能力;
2、熟悉Hadoop生态系统开源项目,至少精读过其中某一个的源码,对大规模数据处理具有独到的理解,有Patch源代码经验者优先;
3、有Hadoop经验优先。

金融安全部 - 数据PM岗

职位描述

1、负责数据治理体系的规划,设计,和实现落地;
2、负责数据合法合规、数据安全、内外部业务方面的需求管理和分析,识别需求本质,采用产品化思维解决问题;
3、沟通和协调各部门资源,组织跨团队协作并推进产品实施,驱动项目进度,按期保质完成项目各阶段目标;
4、其他数据相关工作等。

职位要求

1、有数据分析、产品经理或金融行业工作经验;
2、数学、统计、金融、计算机或信息技术等相关专业本科及以上学历;
3、具有一定的产品文档能力,能够产出标准的产品需求文档,熟练掌握Axure等原型设计工具;
4、具备优秀的沟通、协调能力,和团队合作精神;思路清晰,善于提炼、分析并解决问题;
5、具备极强的责任心、抗压能力、沟通理解能力及协同推动能力。

加分项

1、对数据驱动业务有一定理解,对数据与安全业务有一定敏感性,有较强的逻辑分析能力、独立思考能力和创新能力;
2、具有互联网行业的数据产品经验者,或安全业务数据分析经验,或金融行业从业经验者优先;
3、熟悉Oracle、Mysql等数据库,精通SQL;
4、熟悉数据挖掘基本原理,数据挖掘方法论和基本算法;精通至少一种数据分析/挖掘工具,如SAS等。

金融安全部 - 数据流通岗

职位描述

1、负责内外部数据项目管理工作,组织推进各部门各角色快速高效完成数据类项目;
2、负责数据管理制度的拟定、发布及推动落地;
3、其他数据相关工作等。

职位要求

1、至少1年以上项目管理或数据安全管理工作经验;
2、处理过跨部门、多角色合作的复杂事务,思路清晰,善于提炼、分析并解决问题;
3、具备极强的责任心、抗压能力、沟通理解能力及协同推动能力。

加分项

1、具备传统金融或互联网金融企业从业背景者优先;
2、有安全技术背景或数据安全管理经验者优先。

联系方式:xq.he2009#gmail.com(#替换@)

HDFS BlockToken机制解析

一、背景

敏感信息和隐私数据的安全保障是互联网公司非常关心的问题,尤其进入大数据时代,稍有不慎就会出现重大安全事故,所以数据安全问题就变得越来越重要。

Hadoop作为数据平台的基础设施,需要优先关注和解决好安全问题。虽然安全特性对Hadoop非常重要,不过社区直到2011年末随Hadoop-1.0.0才第一次正式发布Hadoop Security,在这之前Hadoop社区版存在较大的安全隐患,需要用户自行解决。

当然数据安全本身是一个复杂的系统工程,想要描述清楚和完美解决几乎不可能。尽管如此,合理有效的安全保障是必要的。本文就Hadoop中数据块安全问题,从设计权衡和实现原理进行简单分析和梳理,简要阐述当前方案在实践中可能遇到的问题,同时提供可借鉴的解决思路。

二、Hadoop安全概述

Hadoop安全需要解决两个问题:
(1)认证:解决用户身份合法性验证问题;
(2)授权:解决认证用户的操作范围问题;
其中认证问题通过Kerberos能够很好地解决,并通过HADOOP-4487在Hadoop内部设计了一套Token机制完美实现了安全认证问题,同时在性能上得到保证,图1为Hadoop安全认证体系概要图示。关于Hadoop Security特性的细节参考HADOOP-4487,这里不再展开。


社区针对这个问题在2008.10与Hadoop Security特性同步开始设计BlockToken方案HADOOP-4359,经过半年左右时间在2009.05完成并发布,BlockToken特性可以非常好地保护数据块安全。可以说HADOOP-4487和HADOOP-4359构建起整个Hadoop安全体系,本文重点关注HADOOP-4359。

社区针对这个问题在2008.10与Hadoop Security特性同步开始设计BlockToken方案HADOOP-4359,经过半年左右时间在2009.05完成并发布,通过BlockToken数据块安全问题也得到了很好的解决。可以说HADOOP-4487HADOOP-4359构建起了整个Hadoop安全体系。

三、安全基础简介

BlockToken方案使用HMAC(Hash Message Authentication Code)[1]技术实现对合法请求的访问认证检查。

HMAC是一种基于HASH函数和共享密钥的消息安全认证协议,它可以有效地防止数据在传输的过程中被截取和篡改,维护数据的安全性、完整性和可靠性。HMAC可以与任何迭代HASH函数结合使用,MD5和SHA-1就是这种HASH函数。实现原理是用公开函数和共享密钥对原始数据产生一个固定长度的值作为认证标识,用这个标识鉴别消息的完整性。使用密钥生成一个固定大小的消息摘要小数据块即HMAC,并加入到消息中一起传输。接收方利用与发送方共享的密钥对接收到的消息进行认证和合法性检查。这种算法不可逆,无法通过消息摘要反向推导出消息,因此又称为单向HASH函数。通过这种技术可以有效保证数据的安全性、完整性和可靠性。

HMAC算法流程: (1)消息传递前,Alice和Bob约定共享密钥和HASH函数; (2)Alice把要发送的消息使用共享密钥计算出HMAC值,然后将消息和HMAC发送给Bob; (3)Bob接收到消息和HMAC值后,使用共享密钥独立计算消息本身的HMAC值,与接收到的HMAC值对比; (4)如果二者的HMAC值相同,说明接收到的消息是完整的,且是Alice发送;

BlockToken方案默认使用了经典的HMAC-SHA1算法,对照前面的流程,Alice代表的是NameNode,Bob代表DataNode,客户端在整个过程中仅作为数据流转的节点。因为HMAC能够保证数据传输过程中不被截取和篡改,只要NameNode给客户端发放了BlockToken,即可认为该客户端申请对单个数据块的访问权限是可信赖的,DataNode只要对BlockToken检查通过就必须接受客户端表述的所有权限。

四、HDFS BlockToken机制

Token机制是整个Hadoop生态里安全协议的重要组成部分,在HDFS内部包括两个部分:
(1)客户端经过初始认证(Kerberos),从NameNode获取DelegationToken,作为后续访问HDFS的凭证;
(2)客户端读写数据前,请求NameNode获取对应数据块Block信息和BlockToken,根据结果向对应DataNode真正请求读写数据。请求到达DataNode端,根据客户端提供的BlockToken进行安全认证检查,通过后继续后续步骤,否则请求失败;

第二部分就是HADOOP-4359和本文主要关注的内容。

4.1 HDFS读写流程

开始详细梳理BlockToken原理之前,首先简单梳理下如图2所示的HDFS读写流程:


(1)客户端读写操作(open/create)需首先获取数据块Block分布,根据文件路径请求NameNode获取LocatedBlock;
(2)如果是读操作,根据返回LocatedBlock集合,从中选择合适的DataNode进行读数据请求,若需要读取的数据分布在多个Block,按顺序逐个切换到对应DataNode读取;
(3)如果是写操作,首先将返回的LocatedBlock中所有DataNode建立数据管道(Pipeline),然后开始向数据管道里写数据,若写出的数据不能在一个Block内完成,再次向NameNode申请LocatedBlock,直到所有数据成功写出;
(4)读写操作完成,关闭数据流;

LocatedBlock是衔接整个读写流程的关键数据结构:

LocatedBlock.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class LocatedBlock {
  private final ExtendedBlock b;
  private long offset;  // offset of the first byte of the block in the file
  private final DatanodeInfoWithStorage[] locs;
  /** Cached storage ID for each replica */
  private String[] storageIDs;
  /** Cached storage type for each replica, if reported. */
  private StorageType[] storageTypes;
  // corrupt flag is true if all of the replicas of a block are corrupt.
  // else false. If block has few corrupt replicas, they are filtered and 
  // their locations are not part of this object
  private boolean corrupt;
  private Token<BlockTokenIdentifier> blockToken = new Token<BlockTokenIdentifier>();
  ......
}

4.2 BlockToken数据结构

前一节提到的LocatedBlock除了标识数据块Block信息外,还包含了认证流程中的核心数据结构blockToken:

Token.java
1
2
3
4
5
6
7
public class Token<T extends TokenIdentifier> implements Writable {
  private byte[] identifier;
  private byte[] password;
  private Text kind;
  private Text service;
  private TokenRenewer renewer;
}

blockToken的主要属性如下:
(1)kind标识的是Token的类型,这里为常量“HDFS_BLOCK_TOKEN”;
(2)service用来描述请求的服务,一般由服务端的”host:port”组成,对blockToken一般置空;
(3)TokenRenewer在客户端生命周期内周期Renew,避免因为Token过期造成请求失败,对BlockToken未见Renew的显性实现,所以BlockToken只在有效期内生效;
(4)identifier是BlockTokenIdentifier的序列化结果:

BlockTokenIdentifier.java
1
2
3
4
5
6
7
8
public class BlockTokenIdentifier extends TokenIdentifier {
  private long expiryDate;
  private int keyId;
  private String userId;
  private String blockPoolId;
  private long blockId;
  private final EnumSet<AccessMode> modes;
}

包含了当前请求来源userId,数据块标识blockId,数据块所在的BlockPool(用于HDFS Federation架构),本次请求的权限标识modes(READ, WRITE, COPY, REPLACE),Token的过期时间及keyId;
(5)password即是使用共享密钥SecretKey应用HMAC算法对identifier计算得到的密码。
需要说明的是,keyId和SecretKey存在对应关系,通过keyId可以索引到SecretKey,后续详细介绍。

4.3 BlockToken流程

BlockToken体现在HDFS读写流程的以下几个步骤里:

1、客户端使用文件路径向NameNode发送读写请求,其中请求接口如下:

interface
1
2
public LocatedBlocks getBlockLocations(String clientName, String src, long offset, long length);
public LocatedBlock addBlock(String src, String clientName, ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId, String[] favoredNodes);

2、NameNode经过权限检查后,搜索到文件对应的数据块信息,结合激活的keyId组织出完整的BlockTokenIdentifier,使用keyId对应密钥SecretKey加密BlockTokenIdentifier得到密码,BlockToken数据就绪,加上已经获取到的数据块信息即是LocatedBlock返回给客户端;

3、客户端从NameNode获取到LocatedBlock后,带着BlockToken请求对应DataNode执行数据读写操作;

4、DataNode端接收到读写请求,首先进行BlockToken检查,目的是检查客户端的真实性和权限。主要有两个步骤:
(1)将BlockToken里的identifier反序列化,检查客户端请求的数据块、访问权限及用户名是否与BlockToken的表达一致,如果检查通过进入下一步,否则直接失败;
(2)从identifier反序列化结果里取出keyId,在本地索引对应的共享密钥SecretKey,使用与NameNode端相同的HMAC算法计算password,之后与BlockToken中的password进行比较,如果相等开始真正的数据读写流程,否则请求失败。

上述流程中,NameNode和DataNode计算密码时使用的密钥SecretKey均是以BlockTokenIdentifier.keyid作为索引在本地内存中获取。要想对相同的BlockTokenIdentifier使用同样的加密算法计算得到相同的结果,密钥SecretKey必须完全一致。所以核心问题是,NameNode和DataNode如何保证密钥SecretKey同步,使符合预期的请求通过验证。

最简单的办法就是NameNode和DataNode初始化固定的密钥,到期后NameNode重新生成并同步给DataNode问题解决。

但是事实并没有这么简单,我们知道DataNode与NameNode之间信息交互最频繁的渠道是Heartbeat(默认3s一次),如果NameNode更新了SecretKey,但是DataNode心跳3s后才上报,在这3s时间内,两端存在密钥不一致的问题,也就是在这个时段内即使合法请求也会检查失败,所以“最简单的办法”显然还不能完全解决问题。

虽然“最简单的办法”存在问题,但是提供了一种简单高效解决问题的思路,既然只维护一份共享密钥SecretKey会出现“黑障区”问题,那么同一时刻始终保持两份在线,这样就可以完全避免3s的黑障时间段。

事实上,HDFS更进一步同时维护三份共享密钥,NameNode一旦发现有SecretKey过期,马上生成新SecretKey补充进来并向前滚动当前激活SecretKey,DataNode心跳过来后及时下发更新后的SecretKey集合,如图3所示。维护三份密钥的代价是NameNode需要同时检查三份数据有效期,但是通常情况过期时间较大(默认是10h)且数据量极小,所以完全不会给NameNode或者DataNode带来负担。


4.4 BlockToken密钥HA

前面提到了NameNode和DataNode同步密钥的流程,在HDFS HA架构里通常还存在Active NameNode和Standby NameNode同步数据的问题。

事实上,Active与Standby之间不对SecretKey通过EditLog或其他方式同步。这样带来的新问题是:如何保证操作主从切换后,当前正常读写请求的Token验证通过。如前面提到,NameNode定期更新SecretKey后及时将更新后的SecretKey集合同步给DataNode,DataNode更新以保证正常读写请求通过验证,这种方式对Active和Standby同样适用。所以单从DataNode来看,同一个BlockPool实际上同一时间本地缓存至少6份共享密钥,其中3份来自Active NameNode,另外3份来自Standby NameNode。这样的话,不管客户端请求携带的keyId来自Active NameNode或者Standby NameNode,只要是正常请求均能验证通过,与是否操作主从切换或者从Standby NameNode请求无关。

接下来的问题,DataNode维护了多份<keyId,SecretKey>数据,如何避免来自Active和Standby之间的keyId冲突,以及HDFS Federation架构下,来自多个Namespace的keyId冲突。先来看HDFS Federation架构,与BlockPool类似,共享密钥相关信息也按照这个维度组织就不会相互干扰。来自同Namespace下Active和Standby的keyId确实存在冲突的可能,为了避免出现这种情况,实现时结合Active和Standby的nnId分配独立的keyId序号段即可解决。

除了以上问题,服务重启时还存在其他问题:
(1)NameNode重启:当NameNode重启会重置,由于NameNode重启后所有DataNode需要重新注册,注册完成后返回的CMD指令中包含了NameNode的集合,保证了DataNode与NameNode之间完成同步;
(2)DataNode重启:DataNode重启比较简单,向Active NameNode和Standby NameNode分别注册,成功后会收到Active和Standby的所有集合,更新内存状态即可。

为什么NameNode之间不像其他的WRITE操作,通过EditLog在Active与Standby之间保持同步?原因有两个:
1、SecretKey更新频率很低(10h);
2、数据量非常小(可忽略);
根据这两条不管是NameNode端还是DataNode端都完全可以承载,另外如果通过EditLog同步会增加复杂度,同时如果持久化SecretKey安全性上大打折扣,与Token设计的初衷相悖。

至此,BlockToken的整个流程简单梳理完成,可以看出BlockToken与Kerberos体系的架构和核心流程有很多相似的地方。

五、BlockToken的问题及解决思路

前面BlockToken流程分析可以看出,设计思路和实现方案都比较优雅,但是实践过程中还是可能会遇到一些问题:
(1)NameNode重启完成后DataNode没有成功更新SecretKey造成客户端读写失败;
(2)NameNode滚动SecretKey后DataNode没有及时同步造成后续读写失败;

datanode.log
1
2
3
4
5
6
7
8
9
10
11
12
2017-05-17 23:41:58,952 ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: hostname:50010:DataXceiver error processing WRITE_BLOCK operation  src: /ip:port
dst: /ip:port
org.apache.hadoop.security.token.SecretManager$InvalidToken: Can't re-compute password for block_token_identifier (expiryDate=*, keyId=*, userId=*, blockPoolId=*, blockId=*, access modes=[WRITE]), since the required block key (keyID=*) doesn't exist.
       at org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.retrievePassword(BlockTokenSecretManager.java:384)
       at org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.checkAccess(BlockTokenSecretManager.java:302)
       at org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager.checkAccess(BlockPoolTokenSecretManager.java:97)
       at org.apache.hadoop.hdfs.server.datanode.DataXceiver.checkAccess(DataXceiver.java:1271)
       at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:663)
       at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:137)
       at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:74)
       at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:251)
       at java.lang.Thread.run(Thread.java:745)

这两个问题的主要原因是社区实现中DataNode同步SecretKey采用的是从NameNode Push的方案,但是对是否Push成功没有感知,比如:

(1)NameNode重启后,会重新生成新SecretKey集合,DataNode注册时NameNode将所有新生成的SecretKey集合Push给DataNode。我们知道NameNode重启阶段负载非常高,尤其是大规模集群,存在一种情况是NameNode端成功处理了DataNode的注册请求,并将SecretKey集合返回给DataNode,但是DataNode端已经超时没有接收到NameNode的返回结果,这个时候NameNode和DataNode两端出现不一致:NameNode认为DataNode已经成功更新了SecretKey,之后不再下发更新SecretKey命令,但是DataNode端没有接收到新SecretKey集合,依然维护一批无效SecretKey。此后当客户端读写请求过来后,BlockToken验证永远失败;

(2)NameNode滚动SecretKey后,通过Heartbeat的返回值将新SecretKey集合Push给DataNode,同前述场景类似,返回值超时或者DataNode没有接收到心跳的返回值,同样造成NameNode和DataNode两端密钥不一致,默认最长10h后该keyId被激活时,客户端的请求因为BlockToken验证失败同样会读写失败;

前面的两类场景可以看出,问题实际上发生在NameNode向DataNode同步SecretKey,由于采用了Push的方案,但是对结果是否正常并没有感知,两端的数据不一致造成。对应解决方案其实也比较清晰,将NameNode向DataNode同步SecretKey的实现从Push改为Pull,该方案已在社区讨论详见HDFS-13473
(1)DataNode注册时通过NameNode发下命令更新SecretKey的处理流程保持现状;
(2)在DataNode的心跳中增加当前SecretKey的版本号,NameNode端如果发现与本地SecretKey版本号不匹配通过心跳返回最新SecretKey集合;

将SecretKey同步方式从Push更新到Pull之后,因为心跳间隔默认3s,即使存在单次甚至连续数次心跳处理失败,也可以在接下来成功的请求里及时更新,而不再是必须等默认10h之后才能再次发起同步,而且依然存在更新不成功的可能。可以有效避免NameNode和DataNode两端因为SecretKey不一致造成客户端读写请求失败的问题。

六、总结

本文以Hadoop Security特性背景入手,对HDFS BlockToken方案设计的考虑,社区实现原理,存在的问题和解决思路进行了简单分析和梳理。
通过对BlockToken机制原理和实现细节解析,期望对Hadoop安全窥一斑见全局,对其中可能存在的问题及优化思路提供参考价值。

七、参考

[1] https://en.wikipedia.org/wiki/HMAC
[2] https://issues.apache.org/jira/secure/attachment/12428537/security-design.pdf
[3] https://issues.apache.org/jira/secure/attachment/12409284/AccessTokenDesign1.pdf
[4] https://issues.apache.org/jira/browse/HADOOP-4487
[5] https://issues.apache.org/jira/browse/HADOOP-4359
[6] https://issues.apache.org/jira/browse/HDFS-13473
[7] http://hadoop.apache.org/releases.html

HDFS HA Using QJM原理解析

一、前言

Hadoop 1.0时代,Hadoop核心组件HDFS NameNode和MapReduce JobTracker都存在单点问题(SPOF),其中以NameNode SPOF尤为严重。HDFS作为整个Hadoop生态的基础组件,一旦NameNode发生故障,包括HDFS在内及其上层依赖组件(YARN/MapReduce/Spark/Hive/Pig/HBase等)都将无法正常工作,另外NameNode恢复过程非常耗时,尤其对大型集群,NameNode重启时间非常可观,给HDFS的可用性带来了极大的挑战,同时在使用场景上也带来明显的限制。

Hadoop 2.0时期,Hadoop的所有单点问题基本得到了解决。其中HDFS的高可用(High Availability,HA)在社区经过多种方案的讨论后,最终Cloudera的HA using Quorum Journal Manager(QJM)方案被社区接收,并在Hadoop 2.0版本中发布。

本文将从HDFS HA发展路径,架构原理,实现细节及使用过程中可能遇到的问题几个方面对HDFS NameNode高可用机制(主要是HDFS using QJM,不包含ZKFC)进行简单梳理和分析。

二、HDFS HA发展路径

在Hadoop 1.0时代,HDFS的架构相对简单,组件单一,主要包含NameNode,Seconday NameNode,DataNode和DFSClient四个核心组件(见图1 Hadoop 1.0中HDFS架构),其中NameNode提供元数据管理服务,Secondary NameNode以冷备的状态为NameNode分担Checkpoint工作(定期合并FsImage和Editlog),为避免出现歧义后来也称Secondary NameNode为Checkpoint Node。在这种架构下NameNode是整个系统的单点,一旦出现故障对可用性是致命的。


此后,社区讨论过多种HA方案来解决HDFS NameNode的单点问题,其中以2010年Facebook提出的AvatarNode方案(见图2 Facebook提出的HDFS AvatarNode架构图)较为典型。在AvatarNode方案中使用Avatar Primary NameNode,Avatar Standby NameNode及NFS配合通过共享的方式管理HDFS部分元数据EditLog。其中Avatar Primary NameNode提供读写服务,并将Editlog写入到共享存储NFS上,Avatar Standby NameNode从共享存储NFS上读取Editlog数据并回放,这样尽可能保持与Primary之间状态一致。

AvatarNode方案第一次使HDFS NameNode具备了热备能力,一旦Primary NameNode出现故障,Avatar Standby NameNode可以在极短时间内接管HDFS的读写请求,真正实现了HDFS的高可用。

但是AvatarNode这种方案并不是完美的。它的问题主要是共享存储NFS成为新的SPOF,必须保证其高可用;同时AvatarNode不具备自动Failover能力,一旦Avatar Primary NameNode出现故障,需要运维人员介入手动处理,当然Facebook这样设计有自己的考虑,这里就不再展开;另外一点,昂贵的NFS设备引入与HDFS最初构建在“inexpensive commodity hardware”设计初衷多少有些出入。

虽然存在问题,但是AvatarNode架构相比Hadoop 1.0已经有了本质的区别,也正是因为其在HA特性上的优秀表现,AvatarNode方案被国内外很多团队采纳和使用。


2012年初,Cloudera工程师Todd Lipcon主导设计的HA using QJM方案进入社区(见图3 HDFS HA using QJM架构图)。

QJM的思想最初来源于Paxos协议,摒弃了AvatarNode方案中的共享存储设备,改用多个JournalNode节点组成集群来管理和共享EditLog。与Paxos协议类似,当NameNode向JournalNode请求读写时,要求至少大多数(Majority)成功返回才认为本次请求成功。对于一个由2N+1台JournalNode组成的集群,可以容忍最多N台JournalNode节点挂掉。从这个角度来看,QJM相比AvatarNode方案具备了更强的HA能力。

同时QJM方案也继承了社区早前实施过的HA方案中优秀特性,通过引入Zookeeper实现的选主功能,使NameNode真正具备了自动Failover的能力。

对比AvatarNode,HA using QJM方案不再存在SPOF问题,High Availability特性明显提升,同时具备了自动Failover能力。正是其良好的设计和容错能力,HDFS HA using QJM方案在2012年随Hadoop 2.0正式发布,此后一直是Hadoop社区默认HDFS HA方案。


当然除了上面提到两种典型的HDFS HA方案外,社区其实在HA特性上进行过多次尝试。

开始于2008年,并随Hadoop 0.21发布的BackupNode方案(参考:HADOOP-4539);2011年通过单独分支HDFS HA Branch开发的功能非常丰富的HDFS HA方案,并在Hadoop 0.23发布(参考:HDFS-1623),其中LinuxHA和BookKeeper方案都出自这里;等等不一而足。

三、HA using QJM原理

3.1 HA using QJM架构

HA using Quorum Journal Manager (QJM)是当前主流HDFS HA方案,由Cloudera工程师Todd Lipcon在2012年发起,随Hadoop 2.0.3发布,此后一直被视为HDFS HA默认方案。

在HA using QJM方案中,涉及到的核心组件(见图3)包括:

Active NameNode(ANN):在HDFS集群中,对外提供读写服务的唯一Master节点。ANN将客户端请求过来的写操作通过EditLog写入共享存储系统(即JournalNode Cluster),为Standby NameNode及时同步数据提供支持;

Standby NameNode(SBN):与ANN相互形成热备,SBN及时从共享存储系统中读取EditLog数据并更新内存,以保证当前状态尽可能与ANN同步。当前在整个HDFS集群中最多一台处于Active状态,最多一台处于Standby状态;

JournalNode Cluster(JNs):ANN与SBN之间共享Editlog的一致性存储系统,是HDFS NameNode高可用的核心组件。借助JournalNode集群ANN可以尽可能及时同步元数据到SBN。其中ANN采用Push模式将EditLog写入JN,SBN通过Pull模式从JN拉取数据,整个过程中JN不主动进行数据交换;

ZKFailoverController(ZKFC):ZKFailoverController以独立进程运行,对NameNode主备切换进行控制,正常情况ANN和SBN分别对应各自ZKFC进程。ZKFC主要功能:NameNode健康状况检测;借助Zookeeper实现NameNode自动选主;操作NameNode进行主从切换;

Zookeeper(ZK):为ZKFC实现自动选主功能提供统一协调服务。

需要说明的是,在HA using QJM架构下,DataNode从仅向单个NameNode进行数据交互升级到同时向ANN和SBN进行数据交互,区别是仅执行ANN下发的指令,其他逻辑未发生大变化。

3.2 JournalNode Cluster实现

前面提到NameNode与JournalNode Cluster进行数据读写时核心点是需要大多数JN成功返回才可认为本次请求有效。所以同Zookeeper部署类似,实际部署JournalNode Cluster时也建议奇数节点(大多数场景选择3~5个)。当然奇数节点并非强制,事实上偶数节点组成的JournalNode Cluster也能工作,但是极端的情况会存在问题,比如活锁(原理和细节可参考Paxos made simple[2]),所以不建议这么做。

在JournalNode Cluster中所有JN之间完全对等,不存在Primary/Secondary区别。功能上看也极其简单,Hadoop-2.7.1分支中总共2478行代码实现了完整的JournalNode功能,详细模块见图4JournalNode功能模块。


JournalNode对外提供RPC和HTTP两类数据服务接口,其中JournalNodeHttpServer提供的HTTP服务,除暴露常规Metrics信息外,同时提供了被动Editlog数据同步功能(后面会详细介绍)。JournalNodeRpcServer提供的RPC Server,为NameNode向JournalNode的数据读写和状态获取请求准备了完备的RPC接口,详见QJournalProtocol.proto。Journal模块是JournalNodeRpcServer的具体实现,在Journal模块里,除了简单维护JournalNode状态信息,核心实现是抽象底层存储介质的读写操作。

整体上看,JournalNode模块清晰,实现简单,其中有几处非常讨巧的实现:

1、为了降低读写操作相互影响,Journal采用了DoubleBuffer技术管理实时过来的Editlog数据,通过DoubleBuffer可以为高速设备(内存)与低速设备(磁盘/SSD)之间建立缓存区和管道,避免数据写入被低速设备阻塞影响性能;

2、NameNode到JournalNode的所有数据写入请求都会直接落盘,当然写入请求的数据可以是批量,只有数据持久化完成才能认为本次请求有效和成功,这一点在数据恢复时非常关键;

3、与Pasox/Zookeeper类似,所有到达JournalNode的读写请求,第一件事情是合法性校验,包括EpochNum,CommitTxid等在内的状态信息,只有校验通过才能被处理,状态校验是强一致保证的基础;

一句话总结JournalNode是一套提供读写服务并实时持久化序列数据的有状态存储系统。

3.3 Active NameNode端实现

整个HA using QJM方案核心部分都集中在NameNode端,也可以认为是QJournal的Client端,这里集中了所有关于数据一致性保证和状态合理转换的主要内容。其中Active NameNode因为是写入端,所以实现逻辑也较复杂。

ANN按照响应客户端写请求的粒度实时顺序持久化到JournalNode,也是ANN请求JournalNode的最小粒度FSEditLogOp(HDFS写操作序列化数据)。这里首先需要权衡关于如何在JournalNode落地数据的问题:

1、将所有的FSEditLogOp数据落到同一个文件;
2、将每一条FSEditLogOp数据落到一个文件;
3、折中方案;

前面已经提到,QJournal必须保证完整事务和数据一致性,这就要求具备数据恢复的能力。如果将所有FSEditLogOp都落到一个文件,势必会带来管理成本上额外的开销,一旦出现数据不一致情况恢复大文件的代价非常高,同时同一文件累计了大量事务请求,写入失败风险非常高(后续会详细介绍);另一种极端情况是对每一条FSEditLogOp写入一个文件,这种方式在容错和数据异常恢复会非常方便,但显然读写效率极差。所以必须对两种极端情况做折中(计算机领域内大多问题都在tradeoff)。

按照前面的分析,折中的唯一办法就是对连续FSEditLogOp进行分段管理。事前给每一条FSEditLogOp分配唯一且连续的事务ID称为txid,连续多个txid组成Segment,Segment持久化后对应一个EditLog文件,这样一来,任何时间JournalNode上有且仅有一个Segment处于Inprogress状态,其他均为Finalized状态。即使存在数据不一致仅需恢复Inprogress状态的Segment,Finalized Segment一定是大多数JournalNode成功写入。这样权衡可以较好解决两种极端情况的问题。

在ANN进程的整个生命周期里,按照不同阶段,与JournalNode交互的逻辑可以简单划分成三个部分,如图5所示Active NameNode与JournalNode之间交互的状态转换图。


ANN与JournalNode在整个过程中RPC交互非常频繁,所有RPC请求均采用异步的方式,NameNode端只要收到大多数的JournalNode请求响应即认为本次请求成功,如果部分JournalNode请求失败或者超时该节点将在当前Segment内被置为异常节点。

Recovery

QJournal继承了Paxos一致性模中的EpochNum技术,每次做主从切换时,尝试切换为Active的NameNode从JournalNode端读取当前的EpochNum,自增1后尝试写入JournalNode。当出现多个NameNode均尝试切换为Active时会因为EpochNum问题最多只有一个成功,这从Qjournal的角度可以彻底杜绝出现Brain-Split问题。

NameNode操作主从切换后,首先需要确认所有JournalNode上的Editlog数据保持同步,前面提到只有最后一个Segment可能出现不一致情况,之后才能从JournalNode上拉取最新未同步的Editlog,更新内存以达到最新状态。其中保证所有JournalNode的Segment同步即是Recovery过程,整个过程如图6。

1、NameNode向JournalNode请求获取当前Epoch,从结果中选取出最大的Epoch;

2、将最大的Epoch自增1后尝试请求写入JournalNode;

如前述,以上两步可以在QJournal防止NameNode Brain-Split。这种竞争写入的方式可以保证任意时间最多只能存在一个NameNode有能力写入JournalNode,也是QJournal强一致的基础(关于该算法强一致性模型的形式化证明可参考Paxos made Simple[2])。

3、向JournalNode请求获取最新Segment的起始事务编号(txid),具体实现时这个步骤与前一步合并,可以减少一次RPC请求;

4、取前一步所有返回的txid最大值segmentTxId作为本次Recovery输入进入数据恢复阶段(特殊情况是集群刚初始化时,所有的JournalNode实际上并没有Segment文件,所以数据恢复可直接跳过,判断是否需要做数据恢复的条件是有没有取到txid):

(1)向JournalNode请求PrepareRecovery,参数是segmentTxId,JournalNode根据自己的实际情况返回满足起始txid为segmentTxId的Segment信息:{startTxId, endTxId, isInProgress},分别描述了起始事务编号,最后事务编号及当前Segment是否Inprogress;

(2)根据SegmentRecoveryComparator算法选择所有JournalNode中最优Segment(bestSegment)准备执行Recovery;(SegmentRecoveryComparator算法:优先选择Finalized Segment,再次选择JournalNode端可见最大Epoch的Segment,最后比较endTxId最大者)

(3)使用前一步选出的bestSegment组装出可定位到该Segment(hostname + NamespaceInfo + segmentTxId)的URL(具体JournalNode存储Segment的位置),使用URL作为输入向JournalNode请求AcceptRecovery,当JournalNode接收到AcceptRecovery请求后,经过合法性和URL简单检查,包括startTxId,endTxid是否与本地数据存在包含关系等,通过后拿URL通过HTTP请求将目标JournalNode的Segment拉到本地,并替换本地最新一个Segment,到这里所有JournalNode上的Editlog基本一致;之所以这么说是因为最后一个Segment的数据确实是完全同步的,但是已经Finalized的Segment只能保证大多数JournalNode之间完全一致。

(4)请求JournalNode将最新Segment操作Finalized,最后一步操作相当于跟之前写入的数据彻底“划清界限”;

到这里完成了第一阶段对JournalNode数据一致性检查和修复;

5、NameNode进入数据更新阶段,因为有前面一致性检查和修复,另外FsImage也记录过其对应到的txid,这个阶段只要从JournalNode取回该txid之后写入的数据回放一遍,内存状态即可达到最新,这个过程称为TailEdits;当然回放完成后需要记录nextTxId,为开启ANN写操作做准备;

6、到这里基本具备了开放读写服务的能力,因为前面已经将所有的Segment操作了Finalized,所以开启新的Segment,即请求JournalNode初始化新Segment,并建立到JournalNode的长连接管道startLogSegment,以便数据实时写入;

所有的准备工作完成,NameNode正式进入Active状态,开启HDFS读写服务。


Log Synchronization

ANN正式开启读写服务后,所有读请求在NameNode端即可完成,但是写请求需要实时同步给JournalNode,以便SBN能够及时读取并回放,以保持与Active几乎接近的状态。

ANN写FSEditLogOp的整体流程如图7所示:

1、首先在ANN处理客户端写请求的最后一个阶段,根据当前的请求类型和请求参数及当前的txid组成FSEditLogOp;

2、ANN端将FSEditLogOp写入到DoubleBuffer,这个阶段在NameNode整个FSNamesystem锁内,所以在任意时间仅有唯一的线程可以写入数据。可以看到DoubleBuffer技术在NameNode和JournalNode端均有使用,这里使用的目的与前面讲过的原因基本相当,一方面为低速设备(网络通道)和高速设备(内存)之间建立缓存区和管道,另一方面DoubleBuffer也可以降低锁的粒度,以到达更好的性能;

3、ANN处理客户端写请求的锁外,NameNode请求执行一次对FSEditLogOp的Sync操作,也就是说NameNode端写缓存是单线程操作,但是Sync可能是批量操作,但是从客户端角度来看写请求都会保证数据安全落到JouranlNode后才会返回,所以整体上看NameNode具备强一致性保证,当然整个文件系统强一致保证由多个部分共同支撑,这里不再展开。ANN端的Sync首先从DoubleBuffer里读出所有准备就绪的数据,通过RPC请求送达JournalNode,这类请求到达JournalNode端会第一时间持久化,保证数据可靠;


RollEditLog

前面也提到,单个Segment不可能无限大,是按照区间进行划分的,当然这个区间的划分一定不只有一条标准,默认情况下,ANN端内独立线程每间隔5min做一次检查,如果当前累计写入的FSEditLogOp超过2,000,000条操作Segment滚动,可以看到,事实上这个区间的大小可能会超出2,000,000(每间隔5min检查一次),ANN内的检查机制是为了防止SBN不工作时的补偿机制。

当然,请求执行RollEditLog不单单ANN端的线程,事实上SBN也会触发RollEditLog,SBN默认每1min操作执行一次EditLog回放,在回放EditLog前如果发现超过两分钟没有RollEditLog且期间有新增的FSEditLogOp,先请求ANN强制进行RollEditLog。在正常情况下,通过SBN请求执行RollEditLog是控制Segment规模的直接方法。

RollEditLog与前面的Recovery过程最后一步类似,首先将当前正在写入Segment操作Finalized,然后请求JournalNode初始化新Segment,建立到JournalNode的长连接管道并startLogSegment。

3.4 Standby NameNode端实现

SBN作为ANN的热备,需要尽可能保持与ANN状态一致,做好随时接管ANN任务的准备。当然在不损失一致性保证的前提下如果能分担ANN的部分请求处理会更好。

如前述,SBN为了保持与ANN状态接近甚至一致,默认每间隔1min回放一次EditLog,回放的第一步是从合适的JournalNode拉取Segment。

在Active NameNode端实现一节已经提过ANN写到JournalNode的Segment包含了startTxId和endTxId,所以SBN每回放完一个Segment会记录当前已经回放到的txid。这为接下来继续拉取Segment提供了起始位置,也是本次从JournalNode拉取数据的唯一输入。

首先,SBN根据当前的txid+1从JournalNode端获取所有包含了txid+1或者startTxId大于txid+1的Segment列表;

之后,根据取回Segment集合做简单过滤和排序,过滤是为了找到更合适的Segment,比如相同txid集合的Segment,回放Finalized更安全;保留多个Segment并排序是为了更好的容错;

最后,按照已排好序Segment,逐个FSEditLogOp尝试取Segment内容,按照FSEditLogOp粒度读取并回放,直到改Segment回放完成;

当前实现中,SBN仅回放Finalized状态的Segment。


3.5 数据一致性保证

从前面的分析来看,QJM本质上是一种极其简化版的Paxos协议,所以基本具备了Paxos优势。如果按照分布式系统经典理论CAP来评估,QJM是一种强一致性、高可用的去中心化分布式协议。

高可用:QJM的高可用特性其实是完全继承自Paxos,但在具体实现中为实现强一致性实际上牺牲了少部分高可用性。Paxos中Quorum理论在QJM依然稳定,也就是对于JournalNode Cluster,最多可以容忍小于一半的节点故障,系统仍可正常运行。但是如前面提到为了实现强一致性,QJM放大了故障范围(只要出现一次请求响应失败或者超时即标记该JournalNode在当前Segment范围内失效outOfSync),而且对于故障完全没有恢复能力,虽然通过限定Segment范围尽力补救故障影响范围,但是不可否认因为故障被放大,且没有恢复机制,不可用的风险同故障范围被线性放大。

高度去中心化:前面的流程分析过程不难看出,JournalNode整个生命周期,仅初始化阶段由ANN触发过一次选主以及JournalNode间的交互过程,其他阶段JournalNode之间完全对等,完全无中心化节点,这样也就不存在SPOF的问题,所以也具备了非常好Partition Tolerance特性。

强一致性:

1、强一致性是Paxos及各衍生系统最明显的优势特性,QJM的一致性思想主要来源于Paxos,另外为了达到更好的性能,在Pasox基础上又放松了很多限制条件。比如相比Paxos仅需要竞争一次,完成后继承结果,不需要为强一致每一轮数据读写都先竞选影响性能,从这个角度看,QJM与Raft也有相似的特征(是不是存在更本质一致性算法);当然,QJournal放松诸多限制条件跟HDFS的使用场景强相关,比如最多只存在两个QJournal Client,竞争发生的条件非常严苛,即使极端情况产生竞争也能够在一轮竞选完成;

2、延后读保证数据强一致,SBN当前仅读取和回放Finalized状态Segment,可以保证Finalized数据强一致,借鉴Paxos思想在数据恢复阶段可以做到Inprogress状态数据强一致,同时QJM采用了一种尽力而为的恢复机制,对不确定状态定义了统一恢复策略;

3、全局有序和递增的Epoch序号,任意时间仅一个竞争者(QJournal Client即NameNode)胜出,保证写入端的唯一性和合法性。

四、QJM的问题及解决思路

虽然HA using QJM方案作为HDFS默认HA方案已经在社区稳定运行了超过5年时间,但实际上还是存在一些问题:

1、Paxos算法在出现竞争时,收敛速度慢,甚至可能出现活锁的情况,QJM并没有针对这种问题进行过优化。极端情况下如果出现两个NameNode同时竞争写入,也可能陷入僵局,虽然不至于污染数据(Brain-Split),但是存在永远竞争陷入僵局的可能。

2、QJM放大了故障范围,且在Segment周期里没有任何恢复机制,虽然通过限制Segment大小进行了补偿,但是风险被线性放大,尤其对JournalNode小集群及配置Segment较大事务区间;

3、在线升级JournalNode让QJournal可用性风险成倍放大,原因同问题2;

4、QJM对JournalNode在单Segment故障没有恢复机制,ANN一旦遇到写入失败,只能操作主从切换甚至重启,对规模较大的Hadoop集群,重启NameNode成本非常高;

针对上述问题,实际上业界已有一些对应的解决办法,不过需要强调的是所有解决办法都在尽力权衡CAP:

1、对于Paxos在竞争情况下收敛慢和活锁问题,在HDFS场景里出现的概率极小,而且最多只有两个竞争者,且两个竞争者同时出现的唯一可能是误操作尝试将两个NameNode均切换成ANN,这种问题应该尽可能在运维中避免;从技术的角度看,虽然可以通过Leader选择,加快收敛速度和避免活锁,但是在HDFS场景下为解决极端情况牺牲可用性是否有必要值得商榷;

2、JournalNode故障快速恢复完全可以借鉴RollEditLog的办法,增加触发RollEditLog的条件,同时需要考虑Journal出现故障后频繁RollEditLog带来的性能损失;

3、实际场景里,最需要解决的还是NameNode因为Journal写入失败造成进程退出的问题,可借鉴的方案也很多,这里列出两种代价较小方案供参考: (1)所有Journal写入失败,强制NameNode退出竞争者,进入Standby状态; (2)延长请求JournalNode超时时间,用损失极端情况下性能损失换取NameNode重启成本;

五、总结

本文从HDFS HA的发展过程,各种方案设计背后的考虑,以及社区选择的默认HA using QJM方案原理和其中存在的问题及解决思路简单分析。
通过QJM原理梳理和实现细节分析,可以深入理解HDFS HA现状和存在问题,为后续运维甚至优化改进积累经验。

六、参考

[1] https://hadoop.apache.org
[2] https://www.microsoft.com/en-us/research/publication/paxos-made-simple/
[3] http://hadoop.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html
[4] https://issues.apache.org/jira/secure/attachment/12435811/AvatarNodeDescription.txt
[5] http://www.cloudera.com/blog/2009/07/hadoop-ha-configuration/
[6] http://www.cloudera.com/blog/2012/03/high-availability-for-the-hadoop-distributed-file-system-hdfs/
[7] http://zookeeper.apache.org/bookkeeper/
[8] https://github.com/apache/hadoop/tree/branch-2.7.1
[9] https://issues.apache.org/jira/browse/HDFS-3077
[10] https://issues.apache.org/jira/browse/HDFS-1623
[11] https://issues.apache.org/jira/browse/HADOOP-4539
[12] https://issues.apache.org/jira/browse/HDFS-976

HDFS NameNode重启优化

一、背景

在Hadoop集群整个生命周期里,由于调整参数、Patch、升级等多种场景需要频繁操作NameNode重启,不论采用何种架构,重启期间集群整体存在可用性和可靠性的风险,所以优化NameNode重启非常关键。

本文基于Hadoop-2.xHA with QJM社区架构和系统设计(如图1所示),通过梳理NameNode重启流程,并在此基础上,阐述对NameNode重启优化实践。


二、NameNode重启流程

在HDFS的整个运行期里,所有元数据均在NameNode的内存集中管理,但是由于内存易失特性,一旦出现进程退出、宕机等异常情况,所有元数据都会丢失,给整个系统的数据安全会造成不可恢复的灾难。为了更好的容错能力,NameNode会周期进行Checkpoint,将其中的一部分元数据(文件系统的目录树Namespace)刷到持久化设备上,即二进制文件FSImage,这样的话即使NameNode出现异常也能从持久化设备上恢复元数据,保证了数据的安全可靠。

但是仅周期进行Checkpoint仍然无法保证所有数据的可靠,如前次Checkpoint之后写入的数据依然存在丢失的问题,所以将两次Checkpoint之间对Namespace写操作实时写入EditLog文件,通过这种方式可以保证HDFS元数据的绝对安全可靠。

事实上,除Namespace外,NameNode还管理非常重要的元数据BlocksMap,描述数据块Block与DataNode节点之间的对应关系。NameNode并没有对这部分元数据同样操作持久化,原因是每个DataNode已经持有属于自己管理的Block集合,将所有DataNode的Block集合汇总后即可构造出完整BlocksMap。

HA with QJM架构下,NameNode的整个重启过程中始终以SBN(StandbyNameNode)角色完成。与前述流程对应,启动过程分以下几个阶段:
0、加载FSImage;
1、回放EditLog;
2、执行Checkpoint;(非必须步骤,结合实际情况和参数确定,后续详述)
3、收集所有DataNode的注册和数据块汇报;

默认情况下,NameNode会保存两个FSImage文件,与此对应,也会保存对应两次Checkpoint之后的所有EditLog文件。一般来说,NameNode重启后,通过对FSImage文件名称判断,选择加载最新的FSImage文件及回放该Checkpoint之后生成的所有EditLog,完成后根据加载的EditLog中操作条目数及距上次Checkpoint时间间隔(后续详述)确定是否需要执行Checkpoint,之后进入等待所有DataNode注册和元数据汇报阶段,当这部分数据收集完成后,NameNode的重启流程结束。

从线上NameNode历次重启时间数据看,各阶段耗时占比基本接近如图2所示。


经过优化,在元数据总量540M(目录树240M,数据块300M),超过4K规模的集群上重启NameNode总时间~35min,其中加载FSImage耗时~15min,秒级回放EditLog,数据块汇报耗时~20min,基本能够满足生产环境的需求。

2.1 加载FSImage

如前述,FSImage文件记录了HDFS整个目录树Namespace相关的元数据。从Hadoop-2.4.0起,FSImage开始采用Google Protobuf编码格式描述(HDFS-5698),详细描述文件见fsimage.proto。根据描述文件和实现逻辑,FSImage文件格式如图3所示。


从fsimage.proto和FSImage文件存储格式容易看到,除了必要的文件头部校验(MAGIC)和尾部文件索引(FILESUMMARY)外,主要包含以下核心数据:

(0)NS_INFO(NameSystemSection):记录HDFS文件系统的全局信息,包括NameSystem的ID,当前已经分配出去的最大BlockID以及TransactionId等信息;
(1)INODE(INodeSection):整个目录树所有节点数据,包括INodeFile/INodeDirectory/INodeSymlink等所有类型节点的属性数据,其中记录了如节点id,节点名称,访问权限,创建和访问时间等等信息;
(2)INODE_DIR(INodeDirectorySection):整个目录树中所有节点之间的父子关系,配合INODE可构建完整的目录树;
(3)FILES_UNDERCONSTRUCTION(FilesUnderConstructionSection):尚未完成写入的文件集合,主要为重启时重建Lease集合;
(4)SNAPSHOT(SnapshotSection):记录Snapshot数据,快照是Hadoop 2.1.0引入的新特性,用于数据备份、回滚,以防止因用户误操作导致集群出现数据问题;
(5)SNAPSHOT_DIFF(SnapshotDiffSection):执行快照操作的目录/文件的Diff集合数据,与SNAPSHOT一起构建较完整的快照管理能力;
(6)INODE_REFERENCE(INodeReferenceSection):当目录/文件被操作处于快照,且该目录/文件被重命名后,会存在多条访问路径,INodeReference就是为了解决该问题;
(7)SECRET_MANAGER(SecretManagerSection):记录DelegationKey和DelegationToken数据,根据DelegationKey及由DelegationToken构造出的DelegationTokenIdentifier方便进一步计算密码,以上数据可以完善所有合法Token集合;
(8)CACHE_MANAGER(CacheManagerSection):集中式缓存特性全局信息,集中式缓存特性是Hadoop-2.3.0为提升数据读性能引入的新特性;
(9)STRING_TABLE(StringTableSection):字符串到id的映射表,维护目录/文件的Permission字符到ID的映射,节省存储空间;

NameNode执行Checkpoint时,遵循Protobuf定义及上述文件格式描述,重启加载FSImage时,同样按照Protobuf定义的格式从文件流中读出相应数据构建整个目录树Namespace及其他元数据。将FSImage文件从持久化设备加载到内存并构建出目录树结构后,实际上并没有完全恢复元数据到最新状态,因为每次Checkpoint之后还可能存在大量HDFS写操作。

2.2 回放EditLog

NameNode在响应客户端的写请求前,会首先更新内存相关元数据,然后再把这些操作记录在EditLog文件中,可以看到内存状态实际上要比EditLog数据更及时。

记录在EditLog之中的每个操作又称为一个事务,对应一个整数形式的事务编号。在当前实现中多个事务组成一个Segment,生成独立的EditLog文件,其中文件名称标记了起止的事务编号,正在写入的EditLog文件仅标记起始事务编号。EditLog文件的格式非常简单,没再通过Google Protobuf描述,文件格式如图4所示。


一个完整的EditLog文件包括四个部分内容,分别是:
(0)LAYOUTVERSION:版本信息;
(1)OP_START_LOG_SEGMENT:标识文件开始;
(2)RECORD:顺序逐个记录HDFS写操作的事务内容;
(3)OP_END_LOG_SEGMENT:标记文件结束;

NameNode加载FSImage完成后,即开始对该FSImage文件之后(通过比较FSImage文件名称中包含的事务编号与EditLog文件名称的起始事务编号大小确定)生成的所有EditLog严格按照事务编号从小到大逐个遵循上述的格式进行每一个HDFS写操作事务回放。

NameNode加载完所有必需的EditLog文件数据后,内存中的目录树即恢复到了最新状态。

2.3 DataNode注册汇报

经过前面两个步骤,主要的元数据被构建,HDFS的整个目录树被完整建立,但是并没有掌握从数据块Block与DataNode之间的对应关系BlocksMap,甚至对DataNode的情况都不掌握,所以需要等待DataNode注册,并完成对从DataNode汇报上来的数据块汇总。待汇总的数据量达到预设比例(dfs.namenode.safemode.threshold-pct)后退出Safemode。

NameNode重启经过加载FSImage和回放EditLog后,所有DataNode不管进程是否发生过重启,都必须经过以下两个步骤:
(0)DataNode重新注册RegisterDataNode;
(1)DataNode汇报所有数据块BlockReport;

对于节点规模较大和元数据量较大的集群,这个阶段的耗时会非常可观。主要有三点原因:
(0)处理BlockReport的逻辑比较复杂,相对其他RPC操作耗时较长。图5对比了BlockReport和AddBlock两种不同RPC的处理时间,尽管AddBlock操作也相对复杂,但是对比来看,BlockReport的处理时间显著高于AddBlock处理时间;
(1)NameNode对每一个BlockReport的RPC请求处理都需要持有全局锁,也就是说对于BlockReport类型RPC请求实际上是串行处理;
(2)NameNode重启时所有DataNode集中在同一时间段进行BlockReport请求;


前文NameNode内存全景中详细描述过Block在NameNode元数据中的关键作用及与Namespace/DataNode/BlocksMap的复杂关系,从中也可以看出,每个新增Block需要维护多个关系,更何况重启过程中所有Block都需要建立同样复杂关系,所以耗时相对较高。

三、重启优化

根据前面对NameNode重启过程的简单梳理,在各个阶段可以适当的实施优化以加快NameNode重启过程。

0、HDFS-7097 解决重启过程中SBN执行Checkpoint时不能处理BlockReport请求的问题;

Fix:2.7.0
Hadoop-2.7.0版本前,SBN(StandbyNameNode)在执行Checkpoint操作前会先获得全局读写锁fsLock,在此期间,BlockReport请求由于不能获得全局写锁会持续处于等待状态,直到Checkpoint完成后释放了fsLock锁后才能继续。NameNode重启的第三个阶段,同样存在这种情况。而且对于规模较大的集群,每次Checkpoint时间在分钟级别,对整个重启过程影响非常大。实际上,Checkpoint是对目录树的持久化操作,并不涉及BlocksMap数据结构,所以Checkpoint期间是可以让BlockReport请求直接通过,这样可以节省期间BlockReport排队等待带来的时间开销,HDFS-7097正是将锁粒度放小解决了Checkpoint过程不能处理BlockReport类型RPC请求的问题。

HDFS-7097相对,另一种思路也值得借鉴,就是重启过程尽可能避免出现Checkpoint。触发Checkpoint有两种情况:时间周期或HDFS写操作事务数,分别通过参数dfs.namenode.checkpoint.period和dfs.namenode.checkpoint.txns控制,默认值分别是3600s和1,000,000,即默认情况下一个小时或者写操作的事务数超过1,000,000触发一次Checkpoint。为了避免在重启过程中频繁执行Checkpoint,可以适当调大dfs.namenode.checkpoint.txns,建议值10,000,000 ~ 20,000,000,带来的影响是EditLog文件累计的个数会稍有增加。从实践经验上看,对一个有亿级别元数据量的NameNode,回放一个EditLog文件(默认1,000,000写操作事务)时间在秒级,但是执行一次Checkpoint时间通常在分钟级别,综合权衡减少Checkpoint次数和增加EditLog文件数收益比较明显。

1、HDFS-6763 解决SBN每间隔1min全局计算和验证Quota值导致进程Hang住数秒的问题;

Fix:2.8.0
ANN(ActiveNameNode)将HDFS写操作实时写入JN的EditLog文件,为同步数据,SBN默认间隔1min从JN拉取一次EditLog文件并进行回放,完成后执行全局Quota检查和计算,当Namespace规模变大后,全局计算和检查Quota会非常耗时,在此期间,整个SBN的Namenode进程会被Hang住,以至于包括DN心跳和BlockReport在内的所有RPC请求都不能及时处理。NameNode重启过程中这个问题影响突出。
实际上,SBN在EditLog Tailer阶段计算和检查Quota完全没有必要,HDFS-6763将这段处理逻辑后移到主从切换时进行,解决SBN进程间隔1min被Hang住的问题。
从优化效果上看,对一个拥有接近五亿元数据量,其中两亿数据块的NameNode,优化前数据块汇报阶段耗时~30min,其中触发超过20次由于计算和检查Quota导致进程Hang住~20s的情况,整个BlockReport阶段存在超过5min无效时间开销,优化后可到~25min。

2、HDFS-7980 简化首次BlockReport处理逻辑优化重启时间;

Fix:2.7.1
NameNode加载完元数据后,所有DataNode尝试开始进行数据块汇报,如果汇报的数据块相关元数据还没有加载,先暂存消息队列,当NameNode完成加载相关元数据后,再处理该消息队列。对第一次块汇报的处理比较特别(NameNode重启后,所有DataNode的BlockReport都会被标记成首次数据块汇报),为提高处理速度,仅验证块是否损坏,之后判断块状态是否为FINALIZED,若是建立数据块与DataNode的映射关系,建立与目录树中文件的关联关系,其他信息一概暂不处理。对于非初次数据块汇报,处理逻辑要复杂很多,对报告的每个数据块,不仅检查是否损坏,是否为FINALIZED状态,还会检查是否无效,是否需要删除,是否为UC状态等等;验证通过后建立数据块与DataNode的映射关系,建立与目录树中文件的关联关系。

初次数据块汇报的处理逻辑独立出来,主要原因有两方面:
(0)加快NameNode的启动时间;测试数据显示含~500M元数据的NameNode在处理800K个数据块的初次块汇报的处理时间比正常块汇报的处理时间可降低一个数量级;
(1)启动过程中,不提供正常读写服务,所以只要确保正常数据(整个Namespace和所有FINALIZED状态Blocks)无误,无效和冗余数据处理完全可以延后到IBR(IncrementalBlockReport)或下次BR(BlockReport);

这本来是非常合理和正常的设计逻辑,但是实现时NameNode在判断是否为首次数据块块汇报的逻辑一直存在问题,导致这段非常好的改进点逻辑实际上长期并未真正执行到,直到HDFS-7980在Hadoop-2.7.1修复该问题。HDFS-7980的优化效果非常明显,测试显示,对含80K Blocks的BlockReport RPC请求的处理时间从~500ms可优化到~100ms,从重启期整个BlockReport阶段看,在超过600M元数据,其中300M数据块的NameNode显示该阶段从~50min优化到~25min。

3、HDFS-7503 解决重启前大删除操作会造成重启后锁内写日志降低处理能力;

Fix:2.7.0
若NameNode重启前产生过大删除操作,当NameNode加载完FSImage并回放了所有EditLog构建起最新目录树结构后,在处理DataNode的BlockReport时,会发现有大量Block不属于任何文件,Hadoop-2.7.0版本前,对于这类情况的输出日志逻辑在全局锁内,由于存在大量IO操作的耗时,会严重拉长处理BlockReport的处理时间,影响NameNode重启时间。HDFS-7503的解决办法非常简单,把日志输出逻辑移出全局锁外。线上效果上看对同类场景优化比较明显,不过如果重启前不触发大的删除操作影响不大。

4、防止热备节点SBN(StandbyNameNode)/冷备节点SNN(SecondaryNameNode)长时间未正常运行堆积大量Editlog拖慢NameNode重启时间;

不论选择HA热备方案SBN(StandbyNameNode)还是冷备方案SNN(SecondaryNameNode)架构,执行Checkpoint的逻辑几乎一致,如图6所示。如果SBN/SNN服务长时间未正常运行,Checkpoint不能按照预期执行,这样会积压大量EditLog。积压的EditLog文件越多,重启NameNode需要加载EditLog时间越长。所以尽可能避免出现SNN/SBN长时间未正常服务的状态。


在一个有500M元数据的NameNode上测试加载一个200K次HDFS事务操作的EditLog文件耗时~5s,按照默认2min的EditLog滚动周期,如果一周时间SBN/SNN未能正常工作,则会累积~5K个EditLog文件,此后一旦发生NameNode重启,仅加载EditLog文件的时间就需要~7h,也就是整个集群存在超过7h不可用风险,所以切记要保证SBN/SNN不能长时间故障。

5、HDFS-6425 HDFS-6772 NameNode重启后DataNode快速退出blockContentsStale状态防止PostponedMisreplicatedBlocks过大影响对其他RPC请求的处理能力;

Fix: 2.6.0, 2.7.0
当集群中大量数据块的实际存储副本个数超过副本数时(跨机房架构下这种情况比较常见),NameNode重启后会迅速填充到PostponedMisreplicatedBlocks,直到相关数据块所在的所有DataNode汇报完成且退出Stale状态后才能被清理。如果PostponedMisreplicatedBlocks数据量较大,每次全遍历需要消耗大量时间,且整个过程也要持有全局锁,严重影响处理BlockReport的性能,HDFS-6425HDFS-6772分别将可能在BlockReport逻辑内部遍历非常大的数据结构PostponedMisreplicatedBlocks优化到异步执行,并在NameNode重启后让DataNode快速退出blockContentsStale状态避免PostponedMisreplicatedBlocks过大入手优化重启效率。

6、降低BlockReport时数据规模;

NameNode处理BlockReport的效率低主要原因还是每次BlockReport所带的Block规模过大造成,所以可以通过调整Block数量阈值,将一次BlockReport分成多盘分别汇报,以提高NameNode对BlockReport的处理效率。可参考的参数为:dfs.blockreport.split.threshold,默认值1,000,000,即当DataNode本地的Block个数超过1,000,000时才会分盘进行汇报,建议将该参数适当调小,具体数值可结合NameNode的处理BlockReport时间及集群中所有DataNode管理的Block量分布确定。

7、重启完成后对比检查数据块上报情况;

前面提到NameNode汇总DataNode上报的数据块量达到预设比例(dfs.namenode.safemode.threshold-pct)后就会退出Safemode,一般情况下,当NameNode退出Safemode后,我们认为已经具备提供正常服务的条件。但是对规模较大的集群,按照这种默认策略及时执行主从切换后,容易出现短时间丢块的问题。考虑在200M数据块的集群,默认配置项dfs.namenode.safemode.threshold-pct=0.999,也就是当NameNode收集到200M*0.999=199.8M数据块后即可退出Safemode,此时实际上还有200K数据块没有上报,如果强行执行主从切换,会出现大量的丢块问题,直到数据块汇报完成。应对的办法比较简单,尝试调大dfs.namenode.safemode.threshold-pct到1,这样只有所有数据块上报后才会退出Safemode。但是这种办法一样不能保证万无一失,如果启动过程中有DataNode汇报完数据块后进程挂掉,同样存在短时间丢失数据的问题,因为NameNode汇总上报数据块时并不检查副本数,所以更稳妥的解决办法是利用主从NameNode的JMX数据对比所有DataNode当前汇报数据块量的差异,当差异都较小后再执行主从切换可以保证不发生上述问题。

8、其他;

除了优化NameNode重启时间,实际运维中还会遇到需要滚动重启集群所有节点或者一次性重启整集群的情况,不恰当的重启方式也会严重影响服务的恢复时间,所以合理控制重启的节奏或选择合适的重启方式尤为关键,HDFS集群启动方式分析一文对集群重启方式进行了详细的阐述,这里就不再展开。

经过多次优化调整,从线上NameNode历次的重启时间监控指标上看,收益非常明显,图7截取了其中几次NameNode重启时元数据量及重启时间开销对比,图中直观显示在500M元数据量级下,重启时间从~4000s优化到~2000s。


这里罗列了一小部分实践过程中可以有效优化重启NameNode时间或者重启全集群的点,其中包括了社区成熟Patch和相关参数优化,虽然实现逻辑都很小,但是实践收益非常明显。当然除了上述提到,NameNode重启还有很多可以优化的地方,比如优化FSImage格式,并行加载等等,社区也在持续关注和优化,部分讨论的思路也值得关注、借鉴和参考。

四、总结

NameNode重启甚至全集群重启在整个Hadoop集群的生命周期内是比较频繁的运维操作,优化重启时间可以极大提升运维效率,避免可能存在的风险。本文通过分析NameNode启动流程,并结合实践过程简单罗列了几个供参考的有效优化点,借此希望能给实践过程提供可优化的方向和思路。

五、参考

[1] NameNode内存全景. http://tech.meituan.com/namenode.html
[2] NameNode内存详解. http://tech.meituan.com/namenode-memory-detail.html
[3] Apache Hadoop. http://hadoop.apache.org/
[4] Hadoop Source. https://github.com/apache/hadoop
[5] HDFS Issues. https://issues.apache.org/jira/browse/HDFS
[6] Cloudera Blog. http://blog.cloudera.com/

Apache Kylin精确去重指标优化

前篇《Apache Kylin精确计数与全局字典揭秘》从精确计数使用场景到当前方案存在的问题及Apache Kylin在解决精确计数问题的思路进行了详细介绍。在此基础上,来自团队同学也是Apache Kylin社区Committer的@Kangkaisen更进一步,将Apache Kylin超高基数的精确去重指标查询提速数十倍。

一、问题背景

某业务方的Cube有12个维度,35个指标,其中13个是精确去重指标,并且有一半以上的精确去重指标单天基数在千万级别,Cube单天数据量1.5亿行左右。

但是一个结果仅有21行的精确去重查询竟然需要12秒多:

1
2
3
4
SELECT A, B, count(distinct uuid), 
FROM table
WHERE dt = 17150
GROUP BY A, B

跟踪整个查询执行过程发现,HBase端耗时~6s,Kylin的Query Server端耗时~5s。

精确去重指标已经在美团点评生产环境大规模使用,精确去重的查询的确比普通Sum指标慢,但是差异并不明显。但是这个查询的性能表现已超出预期,决定分析一下,到底慢在哪。

二、优化过程

2.1 将精确去重指标拆分HBase列族

首先确认了这个Cube的维度设计是合理的,这个查询也精准匹配了cuboid,并且在HBase端也只扫描了21行数据。

那么问题是,为什么在HBase端只扫描21行数据需要~6s?一个显而易见的原因是Kylin的精确去重指标是用bitmap存储的明细数据,而这个Cube有13个精确去重指标,并且基数都很大。

从两方面验证了这个猜想:
(1)同样SQL的查询Sum指标只需要120毫秒,并且HBase端Scan仅需2毫秒;
(2)用HBase HFile命令行工具查看并计算出HFile单个KeyValue的大小,发现普通的指标列族的每个KeyValue大小是29B,精确去重指标列族的每个KeyValue大小是37M;

所以第一个优化就是将精确去重指标拆分到多个HBase列族,优化后效果十分明显。查询时间从12s减少到5.7s,HBase端耗时从6s减少到1.3s,不过Query Server耗时依旧有~4.5s。

2.2 移除不必要的toString避免bitmap deserialize

Kylin的Query Server耗时依旧有4.5s,猜测还是和bitmap比较大有关,但是为什么bitmap大会导致如此耗时呢?

为了分析Query Server端查询处理的时间到底花在了哪,利用[Java Mission Control][1]进行了性能分析。

JMC分析很简单,在Kylin的启动进程中增加以下参数:

1
-XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints -XX:StartFlightRecording=delay=20s,duration=300s,name=kylin,filename=myrecording.jfr,settings=profile

获得myrecording.jfr文件后,在本机执行jmc,然后打开myrecording.jfr文件就可以进行性能分析。

热点代码的分析如图:


从图中我们可以发现,耗时最多的代码是一个毫无意义的toString。

1
Preconditions.checkState(comparator.compare(last, fetched) <= 0, "Not sorted! last: " + last + " fetched: " + fetched);

其中last和fetched就是一个bitamp。 去掉这个toString之后,Query Server的耗时减少超过1s。

2.3 获取bitmap的字节长度时避免deserialize

在去掉无意义的toString之后,热点代码已经变成了对bitmap的deserialize。

不过bitmap的deserialize共有两处,一处是bitmap本身的deserialize,一处是在获取bitmap的字节长度。

于是很自然的想法就是在获取bitmap的字节长度时避免deserialize bitmap,当时有两种思路: (1)在serialize bitmap时就写入bitmap的字节长度;
(2)在MutableRoaringBitmap序列化的头信息中获取bitmap的字节长度。(Kylin的精确去重使用的bitmap是[RoaringBitmap][2]);

思路1中一个显然问题是如何保证向前兼容,这里向前兼容的方法就是根据MutableRoaringBitmap deserialize时的cookie头信息来确认版本,并在新的serialize方式中写入了版本号,便于之后序列化方式的更新和向前兼容。

经过这个优化后,Kylin Query Server端耗时再次减少超过1s。

2.4 无需上卷聚合的精确去重查询优化

从精确去重指标在美团点评大规模使用以来,我们发现部分用户的应用场景并没有跨Segment上卷聚合的需求,即只需要查询单天的去重值,或是每次全量构建的Cube,也无需跨Segment上卷聚合。

所以我们希望对无需上卷聚合的精确去重查询进行优化,当时考虑了两种可行方案:

方案1:精确去重指标新增一种返回类型

一个极端的做法是对无需跨segment上卷聚合的精确去重查询,我们只存储最终的去重值。

优点:
(1)存储成本会极大降低;
(2)查询速度会明显提高;

缺点:
(1)无法支持上卷聚合,与Kylin指标的设计原则不符合;
(2)无法支持segment的merge,因为要进行merge必须要存储明细的bitmap;
(3)新增一种返回类型,对不清楚的用户可能会有误导;
(4)查询需要上卷聚合时直接报错,用户体验不好,尽管使用这种返回类型的前提是无需上聚合卷;

实现难点: 如果能够接受以上缺点,实现成本并不高,目前没有想到明显的难点。

方案2:serialize bitmap的同时写入distinct count值

优点:
(1)对用户无影响;
(2)符合现在Kylin指标和查询的设计;

缺点:
(1)存储依然需要存储明细的bitmap;
(2)查询速度提升有限,因为即使不进行任何bitmap serialize,bitmap本身太大也会导致HBase scan,网络传输等过程变慢;

实现难点: 如何根据是否需要上卷聚合来确定是否需要serialize bitmap?开始的思路是从查询过程入手,确认在整个查询过程中,哪些地方需要进行上卷聚合。

为此,仔细阅读了Kylin Query Server端的查询代码,HBase Coprocessor端的查询代码,看了Calcite的example例子。发现在HBase端和Kylin Query Server端,Cube build时都有可能需要指标的聚合。

此时又意识到另外一个问题:即使清晰的知道了何时需要聚合,我又该如何把是否聚合的标记传递到精确去重的反序列方法中呢?

现在精确去重的deserialize方法参数只有一个ByteBuffer,如果加参数,就要改变整个kylin指标deserialize的接口,这将会影响所有指标类型,并会造成大范围的改动。所以把这个思路放弃了。

后来想到既然目标是优化无需上卷的精确去重指标,那为什么还要费劲去deserialize出整个bitmap呢,只要个distinct count值就可以。

所以目标就集中在BitmapCounter本身的deserialize上,并联想到早前在Kylin前端加载速度提升十倍的核心思想:延迟加载,就改变了BitmapCounter的deserialize方法,默认只读出distinct count值,不进行bitmap的deserialize,并将那个buffer保留,等到的确需要上卷聚合的时候再根据buffer deserialize 出bitmap。

当然,这个思路可行有一个前提,就是buffer内存拷贝的开销是远小于bitmap deserialize的开销,庆幸的是事实的确如此。

最终经过这个优化,对于无需上卷聚合的精确去重查询,查询速度也有了较大提升。

显然,这个优化加速查询的同时加大了需要上卷聚合的精确去重查询的内存开销。解决的办法:
(1)对于超大数据集并且需要上卷的精确去重查询,用户在分析查询时返回的结果行数应该不会太多;
(2)我们需要做好Query Server端的内存控制;

三、总结

通过总共4个优化,在向前兼容的前提下,后端仅通过100多行的代码改动,对Kylin超高基数的精确去重指标查询有了明显提升,测试中最明显的查询超过50倍的性能提升。

四、反思

(1)善于利用各类命令和工具,快速分析和定位问题;
(2)重写toString,hashCode,equals等基础方法一定要轻量化,避免复杂操作;
(3)设计序列化,通信协议,存储格式时,一定要有版本信息,便于之后的更新和兼容;

五、参考

[1] http://blog.takipi.com/oracle-java-mission-control-the-ultimate-guide/
[2] https://github.com/RoaringBitmap/RoaringBitmap
[3] https://issues.apache.org/jira/browse/KYLIN-2308
[4] https://issues.apache.org/jira/browse/KYLIN-2337
[5] https://issues.apache.org/jira/browse/KYLIN-2349
[6] https://issues.apache.org/jira/browse/KYLIN-2353

Apache Kylin精确计数与全局字典揭秘

Apache Kylin是基于Hadoop之上的SQL多维分析引擎,具有支持海量数据、秒级响应、高并发等特点。Kylin中使用的精确去重计数和全局字典技术在相关领域里非常具有借鉴意义。本文作者来自Apache Kylin社区PMC成员@Sunyerui,谢绝转载。

一、问题背景

在OLAP数据分析中,去重计数(count distinct)是非常常见的需求,且通常希望统计的结果是精确值,也就是说需要精确去重计数。

但在超大的数据集规模上,实现精确去重计数是一件非常困难的事,一句话总结:超大规模数据集下精确去重与快速响应之间的矛盾:

(1)超大规模数据集:意味着需要处理的数据量很多;
(2)精确去重:意味着要保留所有数据细节,这样才能使结果可上卷;
(3)快速响应:需要通过预计算或者内存计算等手段,加快速度;

上述条件相互约束,很难处理。因此目前主流OLAP系统都是采用近似去重计数的方式。

二、现状分析

在Apache Kylin中,目前提供的也是近似去重计数,是基于HLL(HyperLogLog)实现的。

简单来说,每个需要被计数的值都会经过特定Hash函数的计算,将得到的哈希值放入到byte数组中,最后根据特定算法对byte数据的内容进行统计,就可以得到近似的去重结果。

这种方式的好处是,不论数据有多少,byte数组的大小是有理论上限的,通常不会超过128KB,存储压力非常小。也就是说能够满足超大数据集和快速响应的要求。

但最大的问题就是结果是非精确的,这是因为保存的都是经过hash计算的值,而一旦使用hash就一定会有冲突,这就是导致结果不精确的直接原因。此外在实践中发现,hash函数的计算是计算密集型的任务,需要大量的CPU资源。

目前Apache Kylin提供的近似去重计数最高精度误差为1.44%,这在很多场景下是不能满足需求的,这促使我们考虑是否能在现有框架下实现精确去重计数的功能。

三、具体实现

3.1 Int型数据精确去重计数

回顾文初提到三个问题,为了保证去重结果是精确,统计的结果必须要保留细节,而信息保存的最小单位是bit,也就是说在最理想情况下,每个用于统计的值都在结果中用一个bit来保存。这样的分析使我们很容易想到bitmap这种数据结构,它可以以bit为单位保存信息,且可以根据数据分布进行有效压缩,节省最后的存储量。但bitmap通常只能实现对int类型(包括byte和short类型)的处理,所以我们可以先尝试只针对Int型数据实现精确去重计数。

目前业界有很多成熟的bitmap库可用,比如RoaringBitmap,ConciseSet等,其中RoaringBitmap的读写速度最快,存储效率也很高,目前已经在Spark,Drill等开源项目中使用,因此我们也选择了RoaringBitmap。经过试验,证明了我们的想法是可行的,通过实现一种新的基于bitmap的去重计算指标,确实能够实现对Int型数据的精确去重计数。在百万量级的数据量下,存储的结果可以控制在几兆,同时查询能够在秒级内完成,且可以支持任意粒度的上卷聚合计算。

3.2 Trie树与AppendTrie树

上述的结果能够满足Int类型数据的需求,但更多的数据是其它类型的,比如String类型的uuid。那么是否能够采用类似方式支持其它类型的精确去重计数呢?答案是肯定的。只要能将所有数据都统一映射到int类型的数据,那么就可以同样采用bitmap的方式解决问题。这样的映射关系有很多种实现方式,比如基于格式的编码,hash编码等,其中空间和性能效率都比较高,且通用性更强的是基于Trie树的字典编码方式。目前Apache Kylin之前已经实现了TrieDictionary,用于维度值的编码。

Trie树又名前缀树,是是一种有序树,一个节点的所有子孙都有相同的前缀,也就是这个节点对应的字符串,根节点对应空字符串,而每个字符串对应的编码值由对应节点在树中的位置来决定。图1是一棵典型的Trie树示意图,注意并不是每个节点都有对应的字符串值。


在构造Trie树时,将每个值依次加入到树中,可以分为三种情况,如图2所示。


上述的是Trie树在构建过程中的内存模型,当所有数据加入之后,需要将整棵Trie树的数据序列化并持久化。具体的格式如图3所示。


从图3中可以按照,整棵树按照广度优先的顺序依次序列化,其中childOffset和nValuesBeneath的长度是可变的,这是为了根据整棵树的大小尽可能使用更短的数据,降低存储量。此外需要注意到,childOffset的最高两位是标志位,分别标识当前节点是否是最后一个child,以及当前节点是否对应了原始数据的一个值。

当需要从字典中检索某个值的映射id时,直接从序列化后的数据读取即可,不需要反序列化整棵Trie树,这样能在保证检索速度的同时保持较低的内存用量。整个过程如图4所示。


通过上述对Trie树的分析可以看到,Trie树的效率很高,但有一个问题,Trie树的内容是不可变的。也就是说,当一颗Trie树构建完成后,不能再追加新的数据进去,也不能删除现有数据,这是因为每个原始数据对应的映射id是由对应节点在树中的位置决定的,一旦树的结构发生变化,那么会导致部分原始数据的映射id发生变化,从而导致错误的结果。为此,我们需要对Trie树进行改造,使之能够持续追加数据,也就是AppendTrie树。

图5是AppendTrie的序列化格式,可以看到和传统Trie树相比,主要区别在于不保留每个节点的子value数,转而将节点对应的映射id保存到序列化数据中,这样虽然增大了存储空间,但使得整棵树是可以追加的。


经过改造,AppendTrie树已经满足了我们的需求,能够支持我们实现对所有数据的统一编码,进而支持精确去重计数。但从实际情况来看,当统计的数据量超过千万时,整颗树的内存占用和序列化数据都会变得很大,因此需要考虑分片,将整颗树拆成多颗子树,通过控制每颗子树的大小,来实现整棵树的容量扩展。为了实现这个目标,当一棵树的大小超过设定阈值后,需要通过分裂算法将一棵树分裂成两棵子树,图6展示了整个过程。


在一棵AppendTrie树是由多棵子树构成的基础上,我们很容易想到通过LRU之类的算法来控制所有子树的加载和淘汰行为。为此我们基于guava的LoadingCache实现了特定的数据结构CachedTreeMap。这种map继承于TreeMap,同时value通过LoadingCache管理,可以根据策略加载或换出,从而在保证功能的前提下降低了整体的内存占用。

此外,为了支持字典数据的读写并发,数据持久化采用mvcc的理念,每次构建持久化的结果作为一个版本,版本一旦生成就不可再更改,后续更新必须复制版本数据后进行,并持久化为更新的版本。每次读取时则选择当前最新的版本读取,这样就避免了读写冲突。同时设置了基于版本个数和存活时间的版本淘汰机制,保证不会占用过多存储。

经过上述的改进和优化后,AppendTrie树完全达到了我们的要求,可以对所有类型的数据统一做映射编码,支持的数据量可以到几十亿甚至更多,且保持内存占用量的可控,这就是我们的可追加通用字典AppendTrieDictionary。

3.3 全局字典与全类型精确去重计数

基于AppendTrieDictionary,我们可以将任意类型的数据放到一个字典里进行统一映射。在Apache Kylin的现有实现中,Cube的每个Segment都会创建独立的字典,这种方式会导致相同数据在不同Segment字典中被映射成不同的值,这会导致最终的去重结果出错。为此,我们需要在所有Segment中使用同一个字典实例,也就是全局字典GlobaDictionary。

具体来说,根据字典的资源路径(元数据名+库名+表名+列名)可以从元数据中获取同一个字典实例,后续的数据追加也是基于这个唯一的字典实例创建的builder进行的。

另外,经常出现一个cube中需要对多列计算精确去重,这些列的数据基于同一数据源,且所有列是其中某列数据的子集,比如uuid类型的数据,此时可以针对包含全集数据的列建一份全局字典,其它列复用这一个字典即可。

图7展示了整个字典构建到应用的全过程。


四、结论与展望

通过对Bitmap和Trie树的合理运用和针对性优化,以及cache置换策略的应用,我们在Apache Kylin中成功实现了亿级规模的精确去重计数功能,支持任意粒度的上卷聚合,且保证内存可控。这里并没有全新的技术,只是对现有相关技术的组合运用,但解决了业界顶尖的技术难题。这也启发我们除了需要关注新技术,更应该多思考如何挖掘现有技术的潜力。

当然,目前的全局字典还存在一些问题和可改进的点,也欢迎更多讨论和贡献,其中包括:
(1)目前在内存有限,且字典较大时,容易出现字典分片被频繁换入换成的抖动,导致整体效率不高;
(2)全局字典只支持进程内的并发构建,但还不支持跨机器的并发构建;
(3)实际场景中,很多列的数据有高度相似性或属于同一来源,有可能共享同一个全局字典;

Apache CarbonData初探

CarbonData是由华为开源并支持Hadoop的列式存储文件格式,支持索引、压缩以及解编码等。其目的是为了实现同一份数据达到多种需求,而且能够实现更快的交互查询,目前该项目正处于Apache孵化阶段。本文在简单介绍CarbonData基础上,利用SSB基准测试工具对CarbonData与其他多种列式文件存储格式进行简单测试。部分内容来自@大月同学。

一、简介

1.1 背景

针对数据的需求分析主要有以下5点要求:
(1)支持海量数据扫描提取其中某些列;
(2)支持根据主键进行查找的低于秒级响应;
(3)支持海量数据进行交互式查询的秒级响应;
(4)支持快速地抽取单独记录,并且从该记录中获取到所有列信息;
(5)支持HDFS,可以与Hadoop集群进行很好的无缝兼容。
现有的Hadoop生态系统中没有同时满足这五点要求文件格式。比如Parquet/ORC的文件能够满足第一和第五条要求,其他的要求无法满足,基于这些事实华为开发了CarbonData。

1.2 优势

CarbonData文件格式是基于列式存储的,并存储在HDFS之上;其包含了现有列式存储文件格式的许多优点,比如:可分割、可压缩、支持复杂数据类型等。

CarbonData为了解决前面提到的几点要求,加入了许多独特的特性,主要概括为以下四点:
(1)数据及索引:在有过滤的查询中,它可以显著地加速查询性能,减少I/O和CPU资源;CarbonData的索引由多级索引组成,计算引擎可以利用这些索引信息来减少调度和一些处理的开销;扫描数据的时候可以仅仅扫描更细粒度的单元(称为blocklet),而不再是扫描整个文件;
(2)可操作的编码数据:通过支持高效的压缩和全局编码模式,它可以直接在压缩或者编码的数据上查询,仅仅在需要返回结果的时候才进行转换,更好的查询下推;
(3)列组:支持列组,并且使用行格式进行存储,减少查询时行重建的开销;
(4)多种使用场景:顺序存取、随机访问、类OLAP交互式查询等。

1.3 文件格式

一个CarbonData文件是由一系列被称为blocklet组成的,除了blocklet,还有许多其他的元信息,比如模式、偏移量以及索引信息等,这些元信息是存储在CarbonData文件中的footer里。
当在内存中建立索引的时候都需要读取footer里面的信息,因为可以利用这些信息优化后续所有的查询。

每个blocklet又是由许多Data Chunks组成。Data Chunks里面的数据可以按列或者行的形式存储;数据既可以是单独的一列也可以是多列。文件中所有blocklets都包含相同数量和类型的Data Chunks。CarbonData文件格式如图1所示。

CarbonData文件格式

每个Data Chunk又是由许多被称为Pages的单元组成。总共有三种类型的pages:
(1)Data Page:包含一列或者列组的编码数据;
(2)Row ID Page:包含行id的映射,在Data Page以反向索引的形式存储时会被使用;
(3)RLE Page:包含一些额外的元信息,只有在Data Page使用RLE编码的时候会被使用。

二、SSB介绍

SSB全称Star Schema Benchmark,顾名思义,是一套用于测试数据库产品在星型模式下性能表现的基准测试规范,目前在学术界和工业界都得到了广泛的使用。提到数据仓库系统(更广义地说,决策支持系统)的基准测试规范,最权威的莫过于TPC-H和TPC-DS这两套规范,他们都由非营利组织TPC(事务处理性能理事会)发布。SSB实际上就是基于TPC-H修改而来的,将TPC-H的雪花模式简化为了星型模式,将基准查询由TPC-H的复杂Ad-Hoc查询改为了结构更固定的OLAP查询。

SSB的设定为零售业订单的产品、供应商分析场景,Schema包含一张事实表「订单lineorder」和四张维表:「消费者customer」, 「供应商supplier」, 「零件part」, 「日期date」,构成了一个典型的星型模式。图1中,表名下方的"SF * 30,000"代表各表的数据行数。例如,当SF=1时,事实表lineorder包含6,000,000行数据,维表customer包含30,000行数据,Date表的行数固定,不随SF变化。

SSB Schema

SSB的基准查询专注于星型模式下的一类典型查询:读取事实表一次,连接各个维表,对某些维度属性做过滤,最后对某些维度属性分组聚集。在此基础上,SSB重点关注以下方面:
(1)提升「功能覆盖率」:基准查询集合应当尽可能地覆盖对星型模式的各种查询类型。SSB的基准查询分为四组,分别测试带2、3、4个维度属性过滤的情况,基本覆盖了多数场景;
(2)提升「选择度覆盖率」:某个查询最终需要读取的事实表行数(即事实表的选择度)由各个维度过滤条件的FF(过滤因子)决定。基准查询应当覆盖不同的选择度。SSB的基准查询的选择度大到1.9%,小到百万分之1,覆盖了很宽的范围;
(3)减小缓存的影响:如果相邻的两个基准查询扫描的事实数据有很大的重合,后者很有可能直接从缓存中读取数据,这会影响最终的测试结果。因此应当尽量避免基准查询读取重合的事实数据。

SSB的完整规范见http://www.cs.umb.edu/~poneil/StarSchemaB.PDF%E3%80%82

简单说明:
SF(Scale Factor) :生成测试数据集时传入的数据量规模因子,决定了各表最终生成的行数。
FF(Filter Factor):每个where过滤条件筛选出一部分行,被筛选出的行数占过滤前行数的比例叫做FF。在过滤列彼此独立的条件下,表的FF为该表上各个过滤条件FF的乘积。

三、安装使用

SSB的安装是指测试数据集生成工具dbgen的安装,步骤如下:
(1)下载代码。官方提供的代码不支持在Mac上编译,这里我们使用Presto开发人员修改过的版本;

ssb-dbgen
1
2
$ git clone https://github.com/electrum/ssb-dbgen
$ cd ssb-dbgen

(2)如果在Mac上编译,直接运行make即可。如果在Linux上编译,需要修改makefile,将“MACHINE =MAC”改为“MACHINE =LINUX”;
(3)编译后的可执行程序为dbgen,它依赖一个数据分布文件dists.dss。dbgen默认将测试数据生成在当前目录,如果需要生成到其他目录,可以将dbgen和dists.dss拷贝到对应目录使用;
(4)使用dbgen生成测试数据。


四、基准测试用例

SSB提供了一套标准对各个数据仓库/OLAP系统进行性能测试和比较,其最大的特点:使用星型模式、基准查询代表性强、可以生成任意量级的测试数据。SSB的基准查询集分为4组,共13个查询。每组的查询结构类似,但「选择度」不同。这里列出每个查询的SQL和FF以及需要的事实表行数。

ssbquery-set
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
Q1.1
select sum(lo_extendedprice*lo_discount) as revenue
from lineorder, date
where lo_orderdate = d_datekey
and d_year = 1993
and lo_discount between 1 and 3
and lo_quantity < 25;
FF = (1/7)*0.5*(3/11) = 0.0194805
# of lineorder = 0.0194805*6,000,000  116,883

Q1.2
select sum(lo_extendedprice*lo_discount) as revenue
from lineorder, date
where lo_orderdate = d_datekey
and d_yearmonthnum = 199401
and lo_discount between 4 and 6
and lo_quantity between 26 and 35;
FF = (1/84)*(3/11)*0.2 = 0.00064935
# of lineorder = 0.00064935*6,000,000  3896

Q1.3
select sum(lo_extendedprice*lo_discount) as revenue
from lineorder, date
where lo_orderdate = d_datekey
and d_weeknuminyear = 6
and d_year = 1994
and lo_discount between 5 and 7
and lo_quantity between 26 and 35;
FF = (1/364)*(3/11)*0.1 = 0.000075
# of lineorder = 0.000075*6,000,000  450

Q2.1
select sum(lo_revenue), d_year, p_brand1
from lineorder, date, part, supplier
where lo_orderdate = d_datekey and lo_partkey = p_partkey and lo_suppkey = s_suppkey
and p_category = 'MFGR#12'
and s_region = 'AMERICA'
group by d_year, p_brand1
order by d_year, p_brand1;
pcategory = 'MFGR#12', FF = 1/25; sregion, FF=1/5.
FF = (1/25)*(1/5) = 1/125
# of lineorder = (1/125)*6,000,000  48,000

Q2.2
select sum(lo_revenue), d_year, p_brand1
from lineorder, date, part, supplier
where lo_orderdate = d_datekey and lo_partkey = p_partkey and lo_suppkey = s_suppkey
and p_brand1 between 'MFGR#2221' and 'MFGR#2228'
and s_region = 'ASIA'
group by d_year, p_brand1
order by d_year, p_brand1;
FF = (1/125)*(1/5) = 1/625
# of lineorder = (1/625)*6,000,000  9600

Q2.3
select sum(lo_revenue), d_year, p_brand1
from lineorder, date, part, supplier
where lo_orderdate = d_datekey and lo_partkey = p_partkey and lo_suppkey = s_suppkey
and p_brand1 = 'MFGR#2221'
and s_region = 'EUROPE'
group by d_year, p_brand1
order by d_year, p_brand1;
FF = (1/1000)*(1/5) = 1/5000
# of lineorder = (1/5000)*6,000,000  1200

Q3.1
select c_nation, s_nation, d_year, sum(lo_revenue) as revenue
from lineorder, customer, supplier, date
where lo_custkey = c_custkey and lo_suppkey = s_suppkey and lo_orderdate = d_datekey
and c_region = 'ASIA'
and s_region = 'ASIA'
and d_year >= 1992 and d_year <= 1997
group by c_nation, s_nation, d_year
order by d_year asc, revenue desc;
FF = (1/5)*(1/5)*(6/7) = 6/175
# of lineorder = (6/175)*6,000,000 ≈ 205,714

Q3.2
select c_city, s_city, d_year, sum(lo_revenue) as revenue
from lineorder, customer, supplier, date
where lo_custkey = c_custkey and lo_suppkey = s_suppkey and lo_orderdate = d_datekey
and c_nation = 'UNITED STATES'
and s_nation = 'UNITED STATES'
and d_year >= 1992 and d_year <= 1997
group by c_city, s_city, d_year
order by d_year asc, revenue desc;
FF = (1/25)*(1/25)*(6/7) = 6/4375
# of lineorder = (6/4375)*6,000,000 ≈ 8,228

Q3.3
select c_city, s_city, d_year, sum(lo_revenue) as revenue
from lineorder, customer, supplier, date
where lo_custkey = c_custkey and lo_suppkey = s_suppkey and lo_orderdate = d_datekey
and (c_city='UNITED KI1' or c_city='UNITED KI5')
and (s_city='UNITED KI1' or s_city='UNITED KI5')
and d_year >= 1992 and d_year <= 1997
group by c_city, s_city, d_year
order by d_year asc, revenue desc;
FF = (1/125)*(1/125)*(6/7) = 6/109375
# of lineorder = (6/109375)*6,000,000 ≈ 329

Q3.4 "needle-in-haystack"
select c_city, s_city, d_year, sum(lo_revenue) as revenue
from lineorder, customer, supplier, date
where lo_custkey = c_custkey and lo_suppkey = s_suppkey and lo_orderdate = d_datekey
and (c_city='UNITED KI1' or c_city='UNITED KI5')
and (s_city='UNITED KI1' or s_city='UNITED KI5')
and d_yearmonth = 'Dec1997'
group by c_city, s_city, d_year
order by d_year asc, revenue desc;
FF = (1/125)*(1/125)*(1/84) = 1/1,312,500
# of lineorder = (1/1,312,500)*6,000,000 ≈ 5

Q4.1
select d_year, c_nation, sum(lo_revenue - lo_supplycost) as profit
from lineorder, date, customer, supplier, part
where lo_custkey = c_custkey and lo_suppkey = s_suppkey and lo_partkey = p_partkey and lo_orderdate = d_datekey
and c_region = 'AMERICA'
and s_region = 'AMERICA'
and (p_mfgr = 'MFGR#1' or p_mfgr = 'MFGR#2')
group by d_year, c_nation
order by d_year, c_nation;
FF = (1/5)(1/5)*(2/5) = 2/125
# of lineorder = (2/125)*6,000,000 ≈ 96000

Q4.2 "Drill Down to Category in 2 Specific Years"
select d_year, s_nation, p_category, sum(lo_revenue - lo_supplycost) as profit
from lineorder, date, customer, supplier, part
where lo_custkey = c_custkey and lo_suppkey = s_suppkey and lo_partkey = p_partkey and lo_orderdate = d_datekey
and c_region = 'AMERICA'
and s_region = 'AMERICA'
and (d_year = 1997 or d_year = 1998)
and (p_mfgr = 'MFGR#1' or p_mfgr = 'MFGR#2')
group by d_year, s_nation, p_category
order by d_year, s_nation, p_category;
FF = (2/7)*(2/125) = 4/875
# of lineorder = (4/875)*6,000,000 ≈ 27,428

Q4.3 "Further Drill Down to cities in US"
select d_year, s_city, p_brand1, sum(lo_revenue - lo_supplycost) as profit
from lineorder, date, customer, supplier, part
where lo_custkey = c_custkey and lo_suppkey = s_suppkey and lo_partkey = p_partkey and lo_orderdate = d_datekey
and c_region = 'AMERICA'
and s_nation = 'UNITED STATES'
and (d_year = 1997 or d_year = 1998)
and p_category = 'MFGR#14'
group by d_year, s_city, p_brand1
order by d_year, s_city, p_brand1;
FF = (1/5)*(1/25)*(2/7)*(1/25) = 2/21875
# of lineorder = (2/21875)*6,000,000 ≈ 549

五、测试过程

5.1 测试范围

为了尽可能覆盖多种列式存储系统的性能表现,本次测试选择了ORC,Parquet和CarbonData。

5.2 测试数据规模

我们希望测到千万级、亿级和十亿级事实表的规模,因此分别选择了2、20、200的SF(Scale Factor),对应的各表行数如下:


5.3 测试数据准备

由于CarbonData的数据导入必须依赖csv文件,但是dbgen生成的是"|“分割的文本数据,所以利用dbgen生成数据后需要先对测试数据进行一次预处理,将其转换成各系统均可识别的格式。最后,为避免测试过程中,数据缓存对测试结果的影响,为不同的存储系统和SF建立不同的测试表{table}_{format}_{scale},如lineorder_orc_2。
综上考虑,以SF=2为例,数据准备分为三步:
(1)使用dbgen生成文本的测试数据;
(2)预处理生成的测试数据将其转换成csv格式;
(3)将csv格式的测试数据按照{table}_{format}_{scale}导入不同存储格式的表中; 最终在测试库下会生成包括文本存储格式数据在内共60张测试表{table}_{format}_{scale}; 测试数据在HDFS上副本因子统一为3;

附:建表语句

create ssb tables
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
USE ssb;

DROP TABLE IF EXISTS `LINEORDER`;
CREATE TABLE `LINEORDER` (
  LO_ORDERKEY       bigint,
  LO_LINENUMBER     int,
  LO_CUSTKEY        bigint,
  LO_PARTKEY        bigint,
  LO_SUPPKEY        bigint,
  LO_ORDERDATE      int,
  LO_ORDERPRIOTITY  string,
  LO_SHIPPRIOTITY   int,
  LO_QUANTITY       int,
  LO_EXTENDEDPRICE  int,
  LO_ORDTOTALPRICE  int,
  LO_DISCOUNT       int,
  LO_REVENUE        int,
  LO_SUPPLYCOST     int,
  LO_TAX            int,
  LO_COMMITDATE     int,
  LO_SHIPMODE       string
)
STORED BY 'carbondata';

DROP TABLE IF EXISTS `CUSTOMER`;
CREATE TABLE `CUSTOMER` (
  C_CUSTKEY     bigint,
  C_NAME        string,
  C_ADDRESS     string,
  C_CITY        string,
  C_NATION      string,
  C_REGION      string,
  C_PHONE       string,
  C_MKTSEGMENT  string
)
STORED BY 'carbondata';

DROP TABLE IF EXISTS `DATE`;
CREATE TABLE `DATE` (
  D_DATEKEY          int,
  D_DATE             string,
  D_DAYOFWEEK        string,
  D_MONTH            string,
  D_YEAR             int,
  D_YEARMONTHNUM     int,
  D_YEARMONTH        string,
  D_DAYNUMINWEEK     int,
  D_DAYNUMINMONTH    int,
  D_DAYNUMINYEAR     int,
  D_MONTHNUMINYEAR   int,
  D_WEEKNUMINYEAR    int,
  D_SELLINGSEASON    string,
  D_LASTDAYINWEEKFL  int,
  D_LASTDAYINMONTHFL int,
  D_HOLIDAYFL        int,
  D_WEEKDAYFL        int
)
STORED BY 'carbondata';

DROP TABLE IF EXISTS `PART`;
CREATE TABLE `PART`  (
  P_PARTKEY     bigint,
  P_NAME        string,
  P_MFGR        string,
  P_CATEGORY    string,
  P_BRAND1      string,
  P_COLOR       string,
  P_TYPE        string,
  P_SIZE        int,
  P_CONTAINER   string
)
STORED BY 'carbondata';

DROP TABLE IF EXISTS `SUPPLIER`;
CREATE TABLE `SUPPLIER` (
  S_SUPPKEY     bigint,
  S_NAME        string,
  S_ADDRESS     string,
  S_CITY        string,
  S_NATION      string,
  S_REGION      string,
  S_PHONE       string
)
STORED BY 'carbondata';

5.4 测试方法

使用13个基准查询对各系统进行测试,测试期间尽量避免队列资源存在竞争情况。每个基准查询跑三遍,结果取均值。
为减少不同计算引擎之间的差异,本次测试基于Spark 1.5.2进行,详细配置如下:

spark-1.5.2
1
$SPARK_HOME/bin/spark-submit --master yarn-client --queue root.hadoop-hdp.test --executor-cores 4 --executor-memory 8G --num-executors 8 --name "SSB Colume Test"

对于CarbonData选择相同的资源选项。

carbondata-0.1.0
1
$CARBON_HOME/bin/carbon-spark-sql --master yarn-client --queue root.hadoop-hdp.test --executor-cores 4 --executor-memory 8G --num-executors 8 --conf spark.carbon.storepath=hdfs:///user/hive/warehouse/ssb_compress.db --name "SSB Column Test"

附Hadoop集群环境:
(1)JDK:1.7.0_76 HotSpot™ 64-Bit Server
(2)Hadoop:2.7.1
(3)Spark:1.5.2
(4)HDFS Replica Ratio:3

5.5 测试结果

1、数据导入时间对比

数据导入时间可参考:Apache CarbonData Performance Benchmark,数据导入效率表现基本一致,orc与parquet无明显差别,这里不再详述。

2、存储资源占用对比




从数据压缩率的测试结果可以看出:
(1)orc和parquet的数据压缩率与数据本身的关系并不大,基本可以控制在0.3以内,但是CarbonData对数据的压缩能力并不好,对于SSB的测试数据集,压缩率主要集中在0.4上下;
(2)需要关注的的是CarbonData中部分数据甚至超过的原始文本数据大小;

3、查询效率对比

注:下面关于查询效率的测试数据中包含了作业提交时间、JVM启动时间等,数据本身存在少许误差。


从SF=2数据规模下查询效率对比数据来看:
(1)多种存储系统的查询性能差别并不大;
(2) 整体来看,CarbonData稍微优于其他存储系统;
如果考虑到作业提交时间及JVM启动时间的误差,在SF2数据规模下的结果基本没有明显差异;


注:在SF=20数据规模下的测试结果看,结论与SF=2保持一致;


在SF=200的数据规模查询效率对比数据来看:
(1)所有查询模式中,carbon明显优于其他存储系统;
(2)从整体上看,ORC/Parquet没有明显区别,基本可认为性能表现相似;
如果考虑到作业提交时间及JVM启动时间的误差,作业执行时间的差异表现差异非常明显,CarbonData具有非常好的优势;

六、结论

1、从查询性能上看,CarbonData具备非常好的优势,主要原因:
(1)MDK,Min-Max and Inverted Index等辅助信息可对结果数据进行更快速的查询,也能高效重建结果数据;
(2)列组(Column group)技术消除了行数据重建时隐式Join操作;
(3)使用全局字典编码加快计算速度,处理/查询引擎直接在编码数据上进行处理而不需要转换;
(4)延迟解码使得聚合操作更快,只有需要返回结果给用户时才进行解码转换。

2、由于CarbonData在加载数据时需要建立索引和全局字典编码,所以在数据加载和压缩率上比ORC和Parquet都要差,尤其在对一些特殊结构的数据表现较差,更适合一次写多次读且对存储资源使用不敏感的场景。
3、截止2016.10.01最新发布版本carbondata-0.1.0,在功能完备性、系统稳定性等方面还有提升空间,社区也在持续改进完善。
4、从整体上看,CarbonData优异的查询性能表现及社区的持续改进优化,未来非常值得期待。

七、参考

[1] https://github.com/apache/incubator-carbondata
[2] https://cwiki.apache.org/confluence/display/CARBONDATA/CarbonData+Home
[3] https://www.iteblog.com/archives/1806

NameNode RepicationMonitor异常追查

集群版本从2.4.1升级到2.7.1之后,出现了一个诡异的问题,虽然没有影响到线上正常读写服务,但是潜在的问题还是比较严重,经过追查彻底解决,这里简单整理追查过程。

一、问题描述

异常初次出现时收集到的集群异常表现信息有两条:

1、两个关键数据结构持续堆积,监控显示UnderReplicatedBlocks和PendingDeletionBlocks表现明显。

NameNode UnderReplicatedBlocks数据结构变化趋势

NameNode PendingBlocks数据结构变化趋势

说明:没有找到异常同一时间段的监控图,可将上图时间点简单匹配,基本不影响后续的分析。

2、从NameNode的jstack获得信息ReplicationMonitor线程在长期执行chooseRandom函数;

namenode.jstack
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
"org.apache.hadoop.hdfs.server.blockmanagement.BlockManager$ReplicationMonitor@254e0df1" daemon prio=10 tid=0x00007f59b4364800 nid=0xa7d9 runnable [0x00007f2baf40b000]
   java.lang.Thread.State: RUNNABLE
        at java.util.AbstractCollection.toArray(AbstractCollection.java:195)
        at java.lang.String.split(String.java:2311)
        at org.apache.hadoop.net.NetworkTopology$InnerNode.getLoc(NetworkTopology.java:282)
        at org.apache.hadoop.net.NetworkTopology$InnerNode.getLoc(NetworkTopology.java:292)
        at org.apache.hadoop.net.NetworkTopology$InnerNode.access$000(NetworkTopology.java:82)
        at org.apache.hadoop.net.NetworkTopology.getNode(NetworkTopology.java:539)
        at org.apache.hadoop.net.NetworkTopology.countNumOfAvailableNodes(NetworkTopology.java:775)
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault.chooseRandom(BlockPlacementPolicyDefault.java:707)
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault.chooseTarget(BlockPlacementPolicyDefault.java:383)
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault.chooseTarget(BlockPlacementPolicyDefault.java:432)
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault.chooseTarget(BlockPlacementPolicyDefault.java:225)
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault.chooseTarget(BlockPlacementPolicyDefault.java:120)
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager$ReplicationWork.chooseTargets(BlockManager.java:3783)
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager$ReplicationWork.access$200(BlockManager.java:3748)
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.computeReplicationWorkForBlocks(BlockManager.java:1408)
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.computeReplicationWork(BlockManager.java:1314)
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.computeDatanodeWork(BlockManager.java:3719)
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager$ReplicationMonitor.run(BlockManager.java:3671)
        at java.lang.Thread.run(Thread.java:745)

由于线上环境的日志级别为INFO,而ReplicationMonitor中INFO级别之上的日志非常少,从中几乎不能获取到任何有用信息;

异常出现场景:
1、坏盘、DataNode Decommision或进程异常退出,但不能稳定复现;
2、外部环境无任何变化和异常,正常读写服务期偶发。

二、追查过程

2.1 处理线上问题

ReplicationMonitor线程运行异常,造成数据块的副本不能及时补充,如果异常长期存在,极有可能出现丢数据的情况,在没有其他信息辅助解决的情况下,唯一的办法就是重启NameNode(传说中的“三大招”之一),好在HA架构的支持,不至于影响到正常数据生产。

2.2 日志

缺少日志,不能定位问题出现的场景,所以首先需要在关键路径上留下必要的信息,方便追查。由于ReplicationMonitor属于独立线程,合理的日志量输出不至于影响服务性能,经过多次调整基本确定需要收集的日志信息:

1、根据NameNode多次jstack信息,怀疑chooseRandom时不停计算countNumOfAvailableNodes,可能存在死循环,尝试输出两类信息:
(1)ReplicationMonitor当前处理的整体参数及正在处理的Block;

BlockManager.javagithub
1
2
3
4
5
LOG.info("numlive = " + numlive);
LOG.info("blockToProcess = " + blocksToProcess);
LOG.info("nodeToProcess = " + nodesToProcess);
LOG.info("blocksInvalidateWorkPct = " + this.blocksInvalidateWorkPct);
LOG.info("workFound = " + workFound);

(2)chooseRandom逻辑中循环体内(调用了countNumOfAvailableNodes)运行超过1min输出该函数入口的所有参数;问题复现后,日志并没有输出,说明异常并不在chooseRandom逻辑本身;

2、结合NameNode的jstack信息并跟进实现逻辑时发现NetworkTopology.InnerNode#getLoc(String loc)的实现存在性能问题:

NetworkTopology.javagithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/** Given a node's string representation, return a reference to the node
 * @param loc string location of the form /rack/node
 * @return null if the node is not found or the childnode is there but
 * not an instance of {@link InnerNode}
 */
private Node getLoc(String loc) {
  if (loc == null || loc.length() == 0) return this;
  String[] path = loc.split(PATH_SEPARATOR_STR, 2);
  Node childnode = null;
  for(int i=0; i<children.size(); i++) {
    if (children.get(i).getName().equals(path[0])) {
      childnode = children.get(i);
    }
  }
  if (childnode == null) return null; // non-existing node
  if (path.length == 1) return childnode;
  if (childnode instanceof InnerNode) {
    return ((InnerNode)childnode).getLoc(path[1]);
  } else {
    return null;
  }
}

这段逻辑的用意是通过集群网络拓扑结构中节点的字符串标识(如:/IDC/Rack/hostname)获取该节点的对象。实现方法是从拓扑结构中根节点开始逐层向下搜索,直到找到对应的目标节点,逻辑本身没有问题,但是在line286处应该正常break,实现时出现遗漏,其结果是多出一些不必要的时间开销,对于小集群可能影响不大,但是对于IO比较密集的大集群其实影响还是比较大,线下模拟~5K节点的集群拓扑结构,对于NetworkTopology.InnerNode#getLoc(String loc)本身,break可以提升一半的时间开销。

3、通过前面两个阶段仍然不能完全解决问题,只能继续追加日志,这里再次怀疑可能BlockManager.computeReplicationWorkForBlocks(List<List> blocksToReplicate)在调用chooseRandom方法时耗时严重,所以在chooseRandom结束后增加了关键的几条日志:

BlockManager.javagithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
LOG.info("ReplicationMonitor: block = " + rw.block);
LOG.info("ReplicationMonitor: priority = " + rw.priority);
LOG.info("ReplicationMonitor: srcNode = " + rw.srcNode);
LOG.info("ReplicationMonitor: storagepolicyid = " + rw.bc.getStoragePolicyID());
if (rw.targets == null || rw.targets.length == 0) {
  LOG.info("ReplicationMonitor: targets is empty");
} else {
  LOG.info("ReplicationMonitor: targets.length = " + rw.targets.length);
  for (int i = 0; i < rw.targets.length; i++) {
    LOG.info("ReplicationMonitor: target = " + rw.targets[i] + ", StorageType = " + rw.targets[i].getStorageType());
  }
}
for (Iterator<Node> iterator = excludedNodes.iterator(); iterator.hasNext();) {
  DatanodeDescriptor node = (DatanodeDescriptor) iterator.next();
  LOG.info("ReplicationMonitor: exclude = " + node);
}

包括当前正在处理的Block,优先级(标识缺块的严重程度),源和目标节点集合;(遗漏了关键的信息:Block的Numbytes,后面后详细解释。)

通过这一步基本上能够收集到异常现场信息,同时也可确定异常时ReplicationMonitor的运行情况,从后续的日志也能说明这一点:

namenode.log
1
2
3
4
5
6
7
8
9
10
11
12
2016-04-19 20:08:52,328 WARN org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy: Failed to place enough replicas, still in need of 7 to reach 10 (unavailableStorages=[], storagePolicy=BlockStoragePolicy{HOT:7, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]}, newBlock=false) For more information, please enable DEBUG log level on org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy
2016-04-19 20:08:52,328 WARN org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy: Failed to place enough replicas, still in need of 7 to reach 10 (unavailableStorages=[DISK], storagePolicy=BlockStoragePolicy{HOT:7, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]}, newBlock=false) For more information, please enable DEBUG log level on org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy
2016-04-19 20:08:52,328 WARN org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy: Failed to place enough replicas, still in need of 7 to reach 10 (unavailableStorages=[DISK, ARCHIVE], storagePolicy=BlockStoragePolicy{HOT:7, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]}, newBlock=false) All required storage types are unavailable: unavailableStorages=[DISK, ARCHIVE], storagePolicy=BlockStoragePolicy{HOT:7, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]}
2016-04-19 20:08:52,328 INFO org.apache.hadoop.hdfs.server.blockmanagement.BlockManager: ReplicationMonitor: block = blk_8206926206_7139007477
2016-04-19 20:08:52,328 INFO org.apache.hadoop.hdfs.server.blockmanagement.BlockManager: ReplicationMonitor: priority = 2
2016-04-19 20:08:52,329 INFO org.apache.hadoop.hdfs.server.blockmanagement.BlockManager: ReplicationMonitor: srcNode = 10.16.*.*:*
2016-04-19 20:08:52,329 INFO org.apache.hadoop.hdfs.server.blockmanagement.BlockManager: ReplicationMonitor: storagepolicyid = 0
2016-04-19 20:08:52,329 INFO org.apache.hadoop.hdfs.server.blockmanagement.BlockManager: ReplicationMonitor: targets is empty
2016-04-19 20:08:52,329 INFO org.apache.hadoop.hdfs.server.blockmanagement.BlockManager: ReplicationMonitor: exclude = 10.16.*.*:*
2016-04-19 20:08:52,329 INFO org.apache.hadoop.hdfs.server.blockmanagement.BlockManager: ReplicationMonitor: exclude = 10.16.*.*:*
2016-04-19 20:08:52,329 INFO org.apache.hadoop.hdfs.server.blockmanagement.BlockManager: ReplicationMonitor: exclude = 10.16.*.*:*
2016-04-19 20:08:52,329 INFO org.apache.hadoop.hdfs.server.blockmanagement.BlockManager: ReplicationMonitor: exclude = 10.16.*.*:**

日志中容易看到当前发生异常时的场景:

(1)Block的副本数尝试从3调整的10;(异常时还有其他副本增加的请求)
(2)ReplicationMonitor尝试进行副本调整时失败,原因是遍历全集群后并没有找到一个合适的节点给该Block提供副本存储,日志显示全集群的节点均被加入到了exclude;

基本能够确定由于chooseRandom函数遍历了全集群导致处理某(些)Block耗时严重,类似情况累积会恶化这种问题;

到这里基本可以解释为什么几个关键数据结构(UnderReplicatedBlocks和PendingDeletionBlocks)的量持续增加,根本原因在于ReplicationMonitor在尝试对某个Block进行副本调整时,遍历全集群不能选出合适的节点,导致处理一个Block都会耗时严重,如果多个类似Block累积会滚雪球式使情况恶化,而且更加糟糕的是UnderReplicatedBlocks本质是一个优先级队列,如果正好这些Block的优先级较高,处理失败发生超时后还会回到原来的优先队列里,导致后续正常Block也会被阻塞,即使在超时时间范围内ReplicationMonitor可以正常工作,限于其本身的限流及周期(3s)运行机制,实际上可处理的规模非常小,而UnderReplicatedBlocks及PendingDeletionBlocks的生产者丝毫没有变慢,所以造成了数据源源不断的进入队列,但是消费非常缓慢。线上监控数据看到某次极端情况一度累积到1000K规模的UnderReplicatedBlocks,其实风险已经非常高了。

虽然从日志能够解释通UnderReplicatedBlocks和PendingDeletionBlocks持续升高了,但是仍然遗留了一个关键问题:为什么在副本调整时全集群遍历都没有选出合适的节点?

2.3 暴力破解

此前已经在社区找到类似问题反馈: https://issues.apache.org/jira/browse/HDFS-8718 但是很遗憾没看到解决方案;

尝试从各种可能和怀疑中解释前面留下的问题并在线下进行各种场景复现:
(1)线下模拟了~5000节点集群规模遍历的时间开销,基本能够反映线上的情况;
(2)构造负载严重不均衡时节点选择的场景,不能复现;
(3)异构存储实现逻辑可能造成的chooseRandom遍历全集群,尝试构造各种异构存储组合并,不能复现;
(4)并发进行删除和副本调整,没有复现;(后面详细介绍)

其实(2)和(3)的验证必要性不是很大,负载问题通过源码简单分析即可,异构存储线上并没有开启。复现结果是:没有结果。

不得已选择临时解决方案:在BlockManager.computeReplicationWorkForBlocks(List<List> blocksToReplicate)的第二个阶段,针对需要调整副本的Block集合批量进行chooseTargets时加入时间判断,并设定了阈值,当超时发生时退出本轮目标选择逻辑,可以解决PendingDeletionBlocks长时间不能被处理到的问题,代价是牺牲少量处理UnderReplicatedBlocks的时间;上线后符合预期,PendingDeletionBlocks规模得到了有效控制,但是UnderReplicatedBlocks的问题依然存在。

2.4 调整参数

期间,我们从前面新增的日志里同时发现了一个有意思的现象,正常情况下workFound的值相对较高,但是一旦出现异常,开始严重下降。workFound标识的是ReplicationMonitor本轮可以调度出去的Block数,影响该值的三个关键参数如下:

hdfs-site.xmlapache
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<property>
  <name>dfs.namenode.replication.work.multiplier.per.iteration</name>
  <value>5</value>
</property>
<property>
  <name>dfs.namenode.replication.max-streams</name>
  <value>50</value>
  <description>
        The maximum number of outgoing replication streams a given node should have
        at one time considering all but the highest priority replications needed.
  </description>
</property>
<property>
  <name>dfs.namenode.replication.max-streams-hard-limit</name>
  <value>100</value>
  <description>
        The maximum number of outgoing replication streams a given node should have
        at one time.
  </description>
</property>

为控制单DN并发数默认值为<2,2,4>,此外可调度的Block数与集群规模正相关,正常情况其实完全满足运行需求,但是由于存在Block不符预期,所以造成workFound量会下降。
结合集群实际基础环境,尝试大幅提高并发度,设置为<5,50,100>,提高ReplicationMonitor每一轮的处理效率。参数调整后,情况得到了明显改善。

说明:结合实际情况谨慎调整该参数,可能会给集群内的网络带来压力。

虽然通过一系列调整能够暂缓和改善线上情况,但是依然没有回答前面留下的疑问,也没有彻底解决问题。只有从头再来梳理流程。

三、ReplicationMonitor工作流程

ReplicationMonitor是NameNode内部线程,负责维护数据块的副本数稳定,包括清理无效块和对不符预期副本数的Block进行增删工作。ReplicationMonitor是周期运行线程,默认每3s执行一次,主要由两个关键函数组成:computeDatanodeWork和processPendingReplications(rescanPostponedMisreplicatedBlocks在NameNode启动/主从切换被调用,不包括在本次异常分析范围内)。

BlockManager.javagithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
privateclass ReplicationMonitor implements Runnable {

  @Override
  publicvoid run() {
    while (namesystem.isRunning()) {
      try {
        // Process replication work only when active NN is out of safe mode.
        if (namesystem.isPopulatingReplQueues()) {
          computeDatanodeWork();
          processPendingReplications();
          rescanPostponedMisreplicatedBlocks();
        }
        Thread.sleep(replicationRecheckInterval);
      } catch (Throwable t) {
        ......
      }
    }
  }
}

computeDatanodeWork
(1)从UnderReplicatedBlocks中取出给定阈值(默认为集群节点总数的2倍)数量范围内需要进行复制的Block集合;由于UnderReplicatedBlocks是一个优先级队列,所以每次一定是按照优先级从高到低获取;
(2)遍历选出的Block集合,对于每一个Block,根据当前副本分布及chooseTarget策略,选择合适的DataNode集合作为目标节点,准备副本复制;
(3)将Block进行副本复制的指令分发到NameNode里对应DatanodeDescriptor数据结构中,待该DataNode下次heartbeat过来后及时下发,同时将该Block从UnderReplicatedBlocks拿出来暂存到pendingReplications;
(4)DataNode接收到指令后把对应Block复制到目标节点,复制结束后,目标节点向NameNode汇报RECEIVED_BLOCK,此后便可以从pendingReplications中删除对应的Block;这里引入pendingReplications的目的是防止Block在复制过程中出现异常后超时,当在给定时间内(默认为5min)仍没有完成复制,需要将其从pendingReplications转移到timedOutItems集合中;超时检查的工作由PendingReplicationBlocks#PendingReplicationMonitor负责。
(5)将InvalidateBlocks中待删除的Blocks按照DataNode分组后取出分发到NameNode里对应DatanodeDescriptor数据结构中,同样待该DataNode的heartbeat过来后及时下发删除指令;

processPendingReplications
computeDatanodeWork步骤4出现超时后,将对应的Block从pendingReplications转移到timedOutItems后并没有其他处理逻辑,但是Block复制的事情还得继续,所以还需要将Block再拿回到UnderReplicatedBlocks后重复前面的工作;从timedOutItems拿回到UnderReplicatedBlocks的工作即由processPendingReplications来负责;

可以看出computeDatanodeWork,processPendingReplications和PendingReplicationMonitor组成了一个生产者消费者的环,下图可以说明这个过程。

ReplicationMonitor相关数据流动图示

ReplicationMonitor涉及到两个关键的数据结构:UnderReplicatedBlocks和InvalidateBlocks;这两个数据结构到底是什么,数据哪里来。

(1)UnderReplicatedBlocks:副本数不足的Block集合;
* 写数据完成时进行副本检查,副本不足Block;
* 用户调用setReplication增加副本;
* DataNode节点异常,其上的所有Block;

(2)InvalidateBlocks:无效Block集合;
* 文件删除操作;
* 用户调用setReplication降低副本;

可以简单理解副本调整和数据删除本质上是一个异步操作,当NameNode接收到客户端的setReplication或delete请求后,简单处理后即可返回,实际的工作是由ReplicationMonitor周期异步进行处理。

四、根本原因

继续追踪日志时,针对触发问题的Block检索了所有日志,发现另外一个现象,每次chooseTarget目标节点选择失败后,总会紧跟一条删除操作的日志:

namenode.log
1
2016-04-19 09:23:08,453 INFO BlockStateChange: BLOCK* addToInvalidates: blk_8197702254_7129783384 10.16.*.*:* 10.16.*.*:* 10.16.*.*:**

审计日志中也能对照到同一时间点,对应的文件确实有删除操作。这个现象在多次异常复现时稳定发生。可以猜测与删除操作有关联。

删除操作的流程是先把目录树的节点删除,根据删除结果收集到的Block集合,删除每一个Block。

其中在逐个Block进行删除过程中,发现其逻辑有疑点,主要在block.setNumBytes(BlockCommand.NO_ACK),其中NO_ACK=Long.MAX_VALUE,也即先将该Block的numbytes设置为最大值,再后续的操作。

BlockManager.javagithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void removeBlock(Block block) {
  assert namesystem.hasWriteLock();
  // No need to ACK blocks that are being removed entirely
  // from the namespace, since the removal of the associated
  // file already removes them from the block map below.
  block.setNumBytes(BlockCommand.NO_ACK);
  addToInvalidates(block);
  removeBlockFromMap(block);
  // Remove the block from pendingReplications and neededReplications
  pendingReplications.remove(block);
  neededReplications.remove(block, UnderReplicatedBlocks.LEVEL);
  if (postponedMisreplicatedBlocks.remove(block)) {
    postponedMisreplicatedBlocksCount.decrementAndGet();
  }
}

虽然对目录树的操作及removeBlock的操作均会持有全局写锁,但是很自然将Block的NumBytes设置成Long.MAX_VALUE的逻辑与chooseTarget遍历全集群仍不能选出合适节点的事实结合起来。

接下来自然是验证chooseTarget的处理逻辑:

BlockManager.javagithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
......
} finally {
  namesystem.writeUnlock();
}
final Set<Node> excludedNodes = new HashSet<Node>();
for(ReplicationWork rw : work){
  // Exclude all of the containing nodes from being targets.
  // This list includes decommissioning or corrupt nodes.
  excludedNodes.clear();
  for (DatanodeDescriptor dn : rw.containingNodes) {
    excludedNodes.add(dn);
  }
  // choose replication targets: NOT HOLDING THE GLOBAL LOCK
  // It is costly to extract the filename for which chooseTargets is called,
  // so for now we pass in the block collection itself.
  rw.chooseTargets(blockplacement, storagePolicySuite, excludedNodes);
}
namesystem.writeLock();

上面的实现可以看出,chooseTargets之前释放了全局锁,chooseTargets后重新申请到全局锁,唯独中间的chooseTargets在锁之外。至此,问题触发条件、场景等基本清楚:

(1)setReplication将文件的副本调大,此时会有一批属于该文件的Block进入UnderReplicatedBlocks等待ReplicationMonitor处理;
(2)ReplicationMonitor从UnderReplicatedBlocks中取出部分Block,并在前期根据处理逻辑初始化相关参数,将每个Block打包成ReplicationWork,取出的所有Block完成打包后组成ReplicationWork集合,这个过程持有全局锁;
(3)当步骤2释放完全局锁后,被删除请求的RPC抢到全局锁,恰好这次删除操作对应文件即是步骤(1)中的文件,此时Block的NumBytes被设置成Long.MAX_VALUE,并被从BlocksMap,pendingReplications及UnderReplicatedBlocks中删除,但是该Block对象的引用还被步骤(2)中的ReplicationWork集合持有,不会被JVM回收,不同的是ReplicationWork集合中对应Block的NumBytes已经被修改成Long.MAX_VALUE;
(4)ReplicationMonitor中computeReplicationWorkForBlocks继续进行chooseTarget时显然已经不可能在集群中选出合适的节点,即使遍历完整个集群,本质上还是由于块大小已经是Long.MAX_VALUE,不可能有节点能满足需求。

通过单元测试对该场景能够稳定复现。

五、解决方式

问题分析完后,解决办法其实比较简单:

(1)如果ReplicationMonitor遇到了Block的NumBytes=BlockCommand.NO_ACK,直接将该Block从UnderReplicatedBlocks中删除;
(2)如果chooseTarget时遇到了Block的NumBytes=BlockCommand.NO_ACK,直接返回空,无需再遍历整个集群节点;

至此彻底解决了线上隐藏将近了一个月的Bug。线上再没有出现该异常。详细Patch见:https://issues.apache.org/jira/browse/HDFS-10453

六、经验

回头看追查的整个过程,有几点值得总结的经验:

1、日志经过多次才调整到位,中间遗漏了关键的信息(block.getNumBytes),如果开始及时收集到这个信息,可以省去很多时间,所以如果能够准确快速收集关键数据,问题已经解决一半;

2、场景复现时提高并发其实是可以复现的,当时仅利用小工具模拟简单的场景,没有在真实环境进行高并发复现,错过一次可以定位的机会,合理的假设怀疑和严谨的场景复现很重要;

3、虽然问题在线上存在了超过两周时间,但是并没有实际影响到集群正常服务,得益于中间合理可控的缓解手段。如果不能彻底解决可以尝试通过各种方法缓解或绕过问题值得借鉴,这种方法论随处可见,但是只有亲自趟过坑后才能印象深刻。

4、回过头再看整个问题,解决问题的思路没有问题,但追查过程其实存在一个严重Bug,不再展开详述。