HDFS数据流–在HDFS中读写文件

时间:2020-01-09 10:34:31  来源:igfitidea点击:

在这篇文章中,我们将看到Hadoop中的HDFS数据流。在HDFS中读取文件时内部会发生什么,而在HDFS中写入文件时会内部发生什么。

查询配置

在HDFS中读取或者写入文件时,Hadoop框架要做的第一件事是查阅配置文件(core-site.xml和core-default.xml)以获取使用的FileSystem。查找的属性是fs.defaultFS,它具有URI作为值(hdfs:// hostname:port)。在URI方案中使用HDFS。

它必须寻找的另一个属性是fs.SCHEME.impl形式,该名称命名为FileSystem实现类。由于方案是HDFS,因此查找的配置属性是fs.hdfs.impl,值是DistributedFileSystem(实现类)。请注意,在最新版本中,此属性fs.hdfs.impl已被fs.AbstractFileSystem.hdfs.impl取代,其值为Hdfs。因此,默认情况下,HDFS文件系统的实现类不是org.apache.hadoop.hdfs.DistributedFileSystem.java,而是org.apache.hadoop.fs.Hdfs.java。在本文中,我们将采用DistributedFileSystem类作为HDFS方案的实现类。

获取DFSClient的实例

确定实现类(即DistributedFileSystem.java)并对其进行初始化后,此类DistributedFileSystem会依次创建DFSClient的实例。

DFSClient可以连接到Hadoop Filesystem并执行基本文件任务。 DFSClient读取与HDFS相关的配置,其中包括块大小(dfs.blocksize)和块复制因子(dfs.replication)的配置。

无论我们是从HDFS读取文件还是将文件写入HDFS,此阶段过程都很普遍。现在,让我们看看将文件写入HDFS时会发生什么。

将文件写入HDFS –内部步骤

一旦通过任何客户端应用程序发出将文件写入HDFS的请求,执行上述步骤(初始化DistributedFileSystem类并创建DFSClient的实例)之后,将调用DistributedFileSystem的create()方法。

请参阅Java程序在HDFS中写入文件,以了解如何使用Hadoop Java API在HDFS中写入文件。

DistributedFileSystem还将连接到Namenode来暗示它有关创建有关新文件的元数据。 Namenode执行与新文件相关的各种检查。如果验证失败,则不会创建文件,并且IOExcpetion会返回给客户端。

如果验证通过,则Namenode将存储有关文件的元数据。从DistributedFileSystem的create()方法中,将依次调用DFSClient的create()方法,该方法返回DFSOutputStream,通过该流传输数据。

客户端写入数据时,它由DFSOutputStream在内部缓存。数据也分解为数据包,其中每个数据包的大小通常为64K。这些数据包入队到dataQueue中。

还有另一个DataStreamer类,负责将这些数据包发送到管道中的Datanodes。 DataStreamer类从Namenode检索具有块位置(必须在其中写入文件块)的Datanode列表。如果我们采用默认的复制因子3,则管道中将有三个Datanode。

DataStreamer线程从dataQueue中拾取数据包,将其发送到存储它的管道中的第一个数据节点,然后Datanode将那些数据包转发到存储它们的第二个Datanode,然后将数据包转发到管道中的第三个Datanode。

除了dataQueue之外,DFSOutputStream还维护另一个称为ackQueue的队列。当DataStreamer线程将数据包发送到管道中的第一个Datanode时,它将数据包从dataQueue移动到ackQueue。仅当从管道中的所有Datanode收到成功的数据包确认时,才会从ackQueue中删除相应的数据包。请注意,来自Datanode的确认也以相反的顺序进行流水线处理。

当管道中的每个DataNode已完成本地写入块时,DataNode也会将其块存储通知给NameNode。

如果发生任何错误(例如,写入块的Datanode失败),则关闭管道,并将所有未完成的数据包从ackQueue中移出并添加到dataQueue的前端。通过从原始管道中消除错误的datanode来建立新的管道。现在,DataStreamer开始从dataQueue发送数据包。

将所有文件数据写入流后,客户端将在流上调用close()。在关闭流之前,队列中剩余的数据包会被刷新到数据节点并收到确认。然后,仅通知Namenode信号完成。

下图表示在HDFS中写入文件的情况下的HDFS数据流。

HDFS文件写入数据流

从HDFS读取文件–内部步骤

现在,让我们看看在HDFS中读取文件的内部流程。

一旦通过任何客户端应用程序请求从HDFS读取文件,则在执行上述常见步骤(初始化DistributedFileSystem类并创建DFSClient实例)之后,将调用DistributedFileSystem的open()方法,该方法又调用open ()方法在DFSClient上创建DFSInputStream的实例。

请参阅Java程序以从HDFS读取文件,以了解如何使用Hadoop Java API来读取HDFS中的文件。

DFSInputStream连接到Namenode以获取具有该文件的前几个块的文件块的Datanode列表。在Namenode返回的列表中,Datanode也按它们与客户端的接近程度进行排序。如果客户端应用程序恰好在存储文件块的同一Datanode上运行,则该Datanode优先于任何远程节点。

然后客户端在流上调用read(),DFSInputStream已经具有一个Datanode的列表,并连接到具有文件的第一个块的Datanode,并继续流式传输该块直到到达块的末尾。然后,关闭与该Datanode的连接,并对具有下一个块的Datanode重复相同的过程。

并行地,如果需要,DFSInputStream还将与Namenode通信以获取文件更多块的datanode位置。

读取文件的所有块后,客户端将在FSDataInputStream上调用close()。

如果从Datanode读取块数据时发生任何错误,DFSINputStream将连接到该块的下一个最近的Datanode。请注意,如果我们采用默认的复制因子3,则每个块都存储在三个Datanodes中。