HDFS 读文件源码学习

HDFS 读文件源码学习

通过 IDEA 创建一个 maven 项目,在 maven 中加入 hadoop 依赖, 通过 Java 写一个最简单的 Demo,作为 HDFS Client,以 open方法作为入口,学习 HDFS Client 如何与 NameNode 建立通信的源码,关注主流程即可。

本文代码基于 Hadoop 3.2.1,不同版本源码可能会有一点不同。

public class HdfsUtil {
    public static void main(String[] args) throws IOException {
        // 创建配置文件,识别 HDFS 文件系统
        Configuration conf = new Configuration();
        // 设置 HDFS 文件系统的主机和端口,此配置对应 core-site.xml 文件中的 fs.defaultFS
        conf.set("fs.defaultFS", "hdfs://sandbox:8020");
        // 获取 FileSystem 对象,这是抽象类,在这里实际上是 DistributedFileSystem
        FileSystem fs = FileSystem.get(conf);
        // 打开 HDFS 文件输入流
        FSDataInputStream inputStream = fs.open(new Path("/test.txt"));
    }
}

进入 open 方法

public FSDataInputStream open(Path f) throws IOException {
  // IO_FILE_BUFFER_SIZE_KEY = io.file.buffer.size  读写文件时缓存的大小
  // IO_FILE_BUFFER_SIZE_DEFAULT = 4096
  // 调用下面这个抽象方法
  return open(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
      IO_FILE_BUFFER_SIZE_DEFAULT));
}
// 抽象方法,HDFS 中对应的实现类是:DistributedFileSystem
public abstract FSDataInputStream open(Path f, int bufferSize)
  throws IOException;

DistributedFileSystem 中的 open 方法

public FSDataInputStream open(Path f, final int bufferSize)
    throws IOException {
  // 读操作的次数加一,statistics 中记录 HDFS 读写等数据信息
  statistics.incrementReadOps(1);
  // 和上面类似,更新一些 HDFS 相关数据
  storageStatistics.incrementOpCounter(OpType.OPEN);
  // 将相对路径转化为绝对路径
  Path absF = fixRelativePart(f);
  // 匿名内部类,实现了 doCall 和 next 方法,在最后 resolove 方法中调用了这两个方法
  return new FileSystemLinkResolver<FSDataInputStream>() {
    @Override
    public FSDataInputStream doCall(final Path p) throws IOException {
      final DFSInputStream dfsis =
          dfs.open(getPathName(p), bufferSize, verifyChecksum);
      return dfs.createWrappedInputStream(dfsis);
    }
    @Override
    public FSDataInputStream next(final FileSystem fs, final Path p)
        throws IOException {
      return fs.open(p, bufferSize);
    }
  }.resolve(this, absF);
}

在 resolve 方法中,主要是调用 doCall 方法,出现异常后才会调用 next 方法。

public FSDataInputStream doCall(final Path p) throws IOException {
  // 通过 dfs 获得 InputStream 流对象
  final DFSInputStream dfsis =
      dfs.open(getPathName(p), bufferSize, verifyChecksum);
  // 将上一个流对象包装成 HdfsDataInputStream 对象再返回
  return dfs.createWrappedInputStream(dfsis);
}

dfsDistributedFileSystem 中的一个属性,它对应的类是 DFSClient,通过DFSClient 与 NameNode 进行通信,与 DataNode 进行读写操作。我们来看看 DFSClient 类对自己的描述,这一段非常清晰的解释了它的作用。

DFSClient can connect to a Hadoop Filesystem and perform basic file tasks.  It uses the ClientProtocol to communicate with a NameNode daemon, and connects directly to DataNodes to read/write block data.
Hadoop DFS users should obtain an instance of DistributedFileSystem, which uses DFSClient to handle filesystem tasks.

进入 dfs.open() 方法,src 表示文件位置,buffersize 表示缓存大小,verifyChecksum 表示是否进行校验和检查。

public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
    throws IOException {
  // 检查文件系统是否打开处于可用状态
  checkOpen();
  try (TraceScope ignored = newPathTraceScope("newDFSInputStream", src)) {
    // 从 NameNode 获取 block 信息
    LocatedBlocks locatedBlocks = getLocatedBlocks(src, 0);
    // 构造 DFSInputStream 对象
    return openInternal(locatedBlocks, src, verifyChecksum);
  }
}

进入 getLocatedBlocks ,最后会通过 DFSClient 调用以下方法,这里已经出现了 NameNode,

static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
    String src, long start, long length)
    throws IOException {
  try {
    // 通过 client 请求 NameNode
    return namenode.getBlockLocations(src, start, length);
  } catch(RemoteException re) {
    throw re.unwrapRemoteException(AccessControlException.class,
        FileNotFoundException.class,
        UnresolvedPathException.class);
  }
}

最终通过 RPC 远程调用的方式获取 NameNode block 信息,返回给客户端。

public LocatedBlocks getBlockLocations(String src, long offset, long length)
    throws IOException {
  GetBlockLocationsRequestProto req = GetBlockLocationsRequestProto
      .newBuilder()
      .setSrc(src)
      .setOffset(offset)
      .setLength(length)
      .build();
  try {
    // rpc 代理对象,通过 rpc 远程过程调用获取 block 信息
    GetBlockLocationsResponseProto resp = rpcProxy.getBlockLocations(null,
        req);
    return resp.hasLocations() ?
        PBHelperClient.convert(resp.getLocations()) : null;
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}

这一步是获取 LocatedBlocks,完成这一步仅仅是获取到了 block 信息,需要返回到 构造 DFSInputStream 对象 这一步,将 LocatedBlocks 作为参数,调用 openInternal 方法,接着会通过构造函数创建 DFSInputStream 对象,以下是它的构造方法:

DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
    LocatedBlocks locatedBlocks) throws IOException {
  this.dfsClient = dfsClient;
  this.verifyChecksum = verifyChecksum;
  this.src = src;
  synchronized (infoLock) {
    this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
  }
  this.locatedBlocks = locatedBlocks;
  openInfo(false);
}

主要是设置 DFSInputStream 对象的参数值,接着查看 openInfo 方法,这里主要是获取最后一块 block 的信息,因为前面的 block 都是 128M(根据配置可以更改),最后一块可能小于 128M,比较特殊。

fetchLocatedBlocksAndGetLastBlockLength该方法最终也会通过 DFSClient,通过 RPC 远程从 NameNode 获取 block 信息。

void openInfo(boolean refreshLocatedBlocks) throws IOException {
  final DfsClientConf conf = dfsClient.getConf();
  synchronized(infoLock) {
    lastBlockBeingWrittenLength =
        fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
    int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength();
    while (retriesForLastBlockLength > 0) {
      // Getting last block length as -1 is a special case. When cluster
      // restarts, DNs may not report immediately. At this time partial block
      // locations will not be available with NN for getting the length. Lets
      // retry for 3 times to get the length.
      if (lastBlockBeingWrittenLength == -1) {
        DFSClient.LOG.warn("Last block locations not available. "
            + "Datanodes might not have reported blocks completely."
            + " Will retry for " + retriesForLastBlockLength + " times");
        waitFor(conf.getRetryIntervalForGetLastBlockLength());
        lastBlockBeingWrittenLength =
            fetchLocatedBlocksAndGetLastBlockLength(true);
      } else {
        break;
      }
      retriesForLastBlockLength--;
    }
    if (lastBlockBeingWrittenLength == -1
        && retriesForLastBlockLength == 0) {
      throw new IOException("Could not obtain the last block locations.");
    }
  }
}

总结

HDFS 读文件时与读本地文件类似,通过打开文件流的方式进行读取,主要关注 DistributedFileSystem 这个实现类,使用 DFSClient 与 HDFS 通信,从 NameNode 获取 block 信息列表。

i

发表评论

电子邮件地址不会被公开。 必填项已用*标注