
HDFS 读文件源码学习
通过 IDEA 创建一个 maven 项目,在 maven 中加入 hadoop 依赖, 通过 Java 写一个最简单的 Demo,作为 HDFS Client,以 open
方法作为入口,学习 HDFS Client 如何与 NameNode 建立通信的源码,关注主流程即可。
本文代码基于 Hadoop 3.2.1,不同版本源码可能会有一点不同。
public class HdfsUtil {
public static void main(String[] args) throws IOException {
// 创建配置文件,识别 HDFS 文件系统
Configuration conf = new Configuration();
// 设置 HDFS 文件系统的主机和端口,此配置对应 core-site.xml 文件中的 fs.defaultFS
conf.set("fs.defaultFS", "hdfs://sandbox:8020");
// 获取 FileSystem 对象,这是抽象类,在这里实际上是 DistributedFileSystem
FileSystem fs = FileSystem.get(conf);
// 打开 HDFS 文件输入流
FSDataInputStream inputStream = fs.open(new Path("/test.txt"));
}
}
进入 open
方法
public FSDataInputStream open(Path f) throws IOException {
// IO_FILE_BUFFER_SIZE_KEY = io.file.buffer.size 读写文件时缓存的大小
// IO_FILE_BUFFER_SIZE_DEFAULT = 4096
// 调用下面这个抽象方法
return open(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT));
}
// 抽象方法,HDFS 中对应的实现类是:DistributedFileSystem
public abstract FSDataInputStream open(Path f, int bufferSize)
throws IOException;
DistributedFileSystem
中的 open
方法
public FSDataInputStream open(Path f, final int bufferSize)
throws IOException {
// 读操作的次数加一,statistics 中记录 HDFS 读写等数据信息
statistics.incrementReadOps(1);
// 和上面类似,更新一些 HDFS 相关数据
storageStatistics.incrementOpCounter(OpType.OPEN);
// 将相对路径转化为绝对路径
Path absF = fixRelativePart(f);
// 匿名内部类,实现了 doCall 和 next 方法,在最后 resolove 方法中调用了这两个方法
return new FileSystemLinkResolver<FSDataInputStream>() {
@Override
public FSDataInputStream doCall(final Path p) throws IOException {
final DFSInputStream dfsis =
dfs.open(getPathName(p), bufferSize, verifyChecksum);
return dfs.createWrappedInputStream(dfsis);
}
@Override
public FSDataInputStream next(final FileSystem fs, final Path p)
throws IOException {
return fs.open(p, bufferSize);
}
}.resolve(this, absF);
}
在 resolve 方法中,主要是调用 doCall 方法,出现异常后才会调用 next 方法。
public FSDataInputStream doCall(final Path p) throws IOException {
// 通过 dfs 获得 InputStream 流对象
final DFSInputStream dfsis =
dfs.open(getPathName(p), bufferSize, verifyChecksum);
// 将上一个流对象包装成 HdfsDataInputStream 对象再返回
return dfs.createWrappedInputStream(dfsis);
}
dfs
是 DistributedFileSystem
中的一个属性,它对应的类是 DFSClient
,通过DFSClient
与 NameNode 进行通信,与 DataNode 进行读写操作。我们来看看 DFSClient
类对自己的描述,这一段非常清晰的解释了它的作用。
DFSClient can connect to a Hadoop Filesystem and perform basic file tasks. It uses the ClientProtocol to communicate with a NameNode daemon, and connects directly to DataNodes to read/write block data.
Hadoop DFS users should obtain an instance of DistributedFileSystem, which uses DFSClient to handle filesystem tasks.
进入 dfs.open()
方法,src
表示文件位置,buffersize
表示缓存大小,verifyChecksum
表示是否进行校验和检查。
public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
throws IOException {
// 检查文件系统是否打开处于可用状态
checkOpen();
try (TraceScope ignored = newPathTraceScope("newDFSInputStream", src)) {
// 从 NameNode 获取 block 信息
LocatedBlocks locatedBlocks = getLocatedBlocks(src, 0);
// 构造 DFSInputStream 对象
return openInternal(locatedBlocks, src, verifyChecksum);
}
}
进入 getLocatedBlocks
,最后会通过 DFSClient 调用以下方法,这里已经出现了 NameNode,
static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
String src, long start, long length)
throws IOException {
try {
// 通过 client 请求 NameNode
return namenode.getBlockLocations(src, start, length);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
FileNotFoundException.class,
UnresolvedPathException.class);
}
}
最终通过 RPC 远程调用的方式获取 NameNode block 信息,返回给客户端。
public LocatedBlocks getBlockLocations(String src, long offset, long length)
throws IOException {
GetBlockLocationsRequestProto req = GetBlockLocationsRequestProto
.newBuilder()
.setSrc(src)
.setOffset(offset)
.setLength(length)
.build();
try {
// rpc 代理对象,通过 rpc 远程过程调用获取 block 信息
GetBlockLocationsResponseProto resp = rpcProxy.getBlockLocations(null,
req);
return resp.hasLocations() ?
PBHelperClient.convert(resp.getLocations()) : null;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
这一步是获取 LocatedBlocks
,完成这一步仅仅是获取到了 block 信息,需要返回到 构造 DFSInputStream 对象 这一步,将 LocatedBlocks
作为参数,调用 openInternal
方法,接着会通过构造函数创建 DFSInputStream
对象,以下是它的构造方法:
DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
LocatedBlocks locatedBlocks) throws IOException {
this.dfsClient = dfsClient;
this.verifyChecksum = verifyChecksum;
this.src = src;
synchronized (infoLock) {
this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
}
this.locatedBlocks = locatedBlocks;
openInfo(false);
}
主要是设置 DFSInputStream
对象的参数值,接着查看 openInfo
方法,这里主要是获取最后一块 block 的信息,因为前面的 block 都是 128M(根据配置可以更改),最后一块可能小于 128M,比较特殊。
fetchLocatedBlocksAndGetLastBlockLength
该方法最终也会通过 DFSClient,通过 RPC 远程从 NameNode 获取 block 信息。
void openInfo(boolean refreshLocatedBlocks) throws IOException {
final DfsClientConf conf = dfsClient.getConf();
synchronized(infoLock) {
lastBlockBeingWrittenLength =
fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength();
while (retriesForLastBlockLength > 0) {
// Getting last block length as -1 is a special case. When cluster
// restarts, DNs may not report immediately. At this time partial block
// locations will not be available with NN for getting the length. Lets
// retry for 3 times to get the length.
if (lastBlockBeingWrittenLength == -1) {
DFSClient.LOG.warn("Last block locations not available. "
+ "Datanodes might not have reported blocks completely."
+ " Will retry for " + retriesForLastBlockLength + " times");
waitFor(conf.getRetryIntervalForGetLastBlockLength());
lastBlockBeingWrittenLength =
fetchLocatedBlocksAndGetLastBlockLength(true);
} else {
break;
}
retriesForLastBlockLength--;
}
if (lastBlockBeingWrittenLength == -1
&& retriesForLastBlockLength == 0) {
throw new IOException("Could not obtain the last block locations.");
}
}
}
总结
HDFS 读文件时与读本地文件类似,通过打开文件流的方式进行读取,主要关注 DistributedFileSystem
这个实现类,使用 DFSClient 与 HDFS 通信,从 NameNode 获取 block 信息列表。