[TOC]
Atguigu (尚硅谷) Big Data Technology: Hadoop Source Code Analysis
(Author: Atguigu Big Data R&D Department)
Version: V3.3
Chapter 0: RPC Communication Fundamentals
0) Review
1) Requirement:
Simulate how the three parts of RPC (client, server, and communication protocol) work together.
2) Writing the code:
(1) On top of the HDFSClient project, create the package com.atguigu.rpc
(2) Create the RPC protocol
package com.atguigu.rpc;
(3) Create the RPC server
package com.atguigu.rpc;
(4) Create the RPC client
package com.atguigu.rpc;
3) Testing
(1) Start the server
Observe the console output: 服务器开始工作 ("the server has started working")
In the Terminal window, run jps and check that the NNServer process is listed.
(2) Start the client
Observe the client console output: 我是客户端 ("I am the client")
Observe the server console output: 服务端,创建路径/input ("server: creating path /input")
4) Summary
The RPC client invokes a method declared in the communication protocol, but the method body executes on the server;
the communication protocol is simply the interface contract shared by both sides.
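The mechanism above can be sketched without Hadoop at all: a minimal in-process model using java.lang.reflect.Proxy, where the interface plays the role of the communication protocol. The names RPCProtocol and NNServer mirror the tutorial's classes, but everything else here is illustrative, not Hadoop's real IPC stack.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// The shared communication protocol: just an interface contract.
interface RPCProtocol {
    long versionID = 666L;          // Hadoop protocols carry a versionID constant
    String mkdirs(String path);     // declared on the client, executed on the server
}

// The "server side": the real implementation of the protocol.
class NNServer implements RPCProtocol {
    public String mkdirs(String path) {
        return "server: creating path " + path;
    }
}

public class RpcSketch {
    // The "client side" only holds a proxy; every call is forwarded to the
    // server-side implementation, mimicking RPC dispatch.
    static RPCProtocol getProxy(RPCProtocol server) {
        InvocationHandler h = (Object p, Method m, Object[] args) ->
                m.invoke(server, args);   // in real RPC this hop crosses the network
        return (RPCProtocol) Proxy.newProxyInstance(
                RPCProtocol.class.getClassLoader(),
                new Class<?>[]{RPCProtocol.class}, h);
    }

    public static void main(String[] args) {
        RPCProtocol client = getProxy(new NNServer());
        System.out.println(client.mkdirs("/input"));
    }
}
```

The client never touches NNServer directly; it only knows the RPCProtocol interface, which is exactly the point of the summary above.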
Chapter 1: NameNode Startup Source Code Walkthrough
0) Add the following dependencies to pom.xml
<dependencies>
1) Press Ctrl + N, search globally for "namenode", and open NameNode.java
The official NameNode Javadoc:
1 | NameNode serves as both directory namespace manager and "inode table" for the Hadoop DFS. There is a single NameNode running in any DFS deployment. (Well, except when there is a second backup/failover NameNode, or when using federated NameNodes.) The NameNode controls two critical tables: 1) filename->blocksequence (namespace) 2) block->machinelist ("inodes") The first table is stored on disk and is very precious. The second table is rebuilt every time the NameNode comes up. 'NameNode' refers to both this class as well as the 'NameNode server'. The 'FSNamesystem' class actually performs most of the filesystem management. The majority of the 'NameNode' class itself is concerned with exposing the IPC interface and the HTTP server to the outside world, plus some configuration management. NameNode implements the ClientProtocol interface, which allows clients to ask for DFS services. ClientProtocol is not designed for direct use by authors of DFS client code. End-users should instead use the FileSystem class. NameNode also implements the DatanodeProtocol interface, used by DataNodes that actually store DFS data blocks. These methods are invoked repeatedly and automatically by all the DataNodes in a DFS deployment. NameNode also implements the NamenodeProtocol interface, used by secondary namenodes or rebalancing processes to get partial NameNode state, for example partial blocksMap etc. |
2) Press Ctrl + F and search for the main method
NameNode.java
public static void main(String argv[]) throws Exception {
Click into createNameNode
public static NameNode createNameNode(String argv[], Configuration conf)
Click into NameNode
public NameNode(Configuration conf) throws IOException {
Click into initialize
protected void initialize(Configuration conf) throws IOException {
1.1 Starting the HTTP server on port 9870
1) Click into startHttpServer
private void startHttpServer(final Configuration conf) throws IOException {
2) Click into httpServer.start(); inside the startHttpServer method
void start() throws IOException {
Click into setupServlets
private static void setupServlets(HttpServer2 httpServer, Configuration conf) {
1.2 Loading the fsimage and edit log
1) Click into loadNamesystem
NameNode.java
protected void loadNamesystem(Configuration conf) throws IOException {
1.3 Initializing the NameNode's RPC server
1) Click into createRpcServer
NameNode.java
protected NameNodeRpcServer createRpcServer(Configuration conf) throws IOException {
NameNodeRpcServer.java
public NameNodeRpcServer(Configuration conf, NameNode nn)
1.4 NameNode startup resource check
1) Click into startCommonServices
NameNode.java
private void startCommonServices(Configuration conf) throws IOException {
2) Click into startCommonServices
FSNamesystem.java
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
Click into NameNodeResourceChecker
NameNodeResourceChecker.java
public NameNodeResourceChecker(Configuration conf) throws IOException {
Click into checkAvailableResources
FSNamesystem.java
void checkAvailableResources() {
NameNodeResourceChecker.java
public boolean hasAvailableDiskSpace() {
NameNodeResourcePolicy.java
static boolean areResourcesAvailable(
Press Ctrl + H and find the implementation class CheckedVolume
NameNodeResourceChecker.java
public boolean isResourceAvailable() {
1.5 NameNode heartbeat-timeout check
Press Ctrl + N and search for namenode, then Ctrl + F and search for startCommonServices
Click into namesystem.startCommonServices(conf, haContext);
Click into blockManager.activate(conf, completeBlocksTotal);
Click into datanodeManager.activate(conf);
DatanodeManager.java
void activate(final Configuration conf) {
DatanodeManager.java
void activate() {
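The timeout the heartbeat monitor applies is derived in DatanodeManager from two configuration values: dfs.namenode.heartbeat.recheck-interval (default 5 minutes) and dfs.heartbeat.interval (default 3 seconds). A DataNode that stays silent longer than 2 * recheckInterval + 10 * 1000 * heartbeatInterval milliseconds is declared dead, which works out to 10 minutes 30 seconds with the defaults. A sketch of that arithmetic:

```java
public class HeartbeatExpiry {
    // Mirrors the formula in DatanodeManager:
    // heartbeatExpireInterval = 2 * heartbeatRecheckInterval
    //                         + 10 * 1000 * heartbeatIntervalSeconds
    static long expiryIntervalMs(long recheckIntervalMs, long heartbeatIntervalSec) {
        return 2 * recheckIntervalMs + 10 * 1000 * heartbeatIntervalSec;
    }

    public static void main(String[] args) {
        // defaults: dfs.namenode.heartbeat.recheck-interval = 5 min,
        //           dfs.heartbeat.interval = 3 s
        long ms = expiryIntervalMs(5 * 60 * 1000, 3);
        System.out.println(ms + " ms, i.e. 10 min 30 s"); // 630000 ms
    }
}
```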
1.6 Safe mode
FSNamesystem.java
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
Click into getCompleteBlocksTotal
public long getCompleteBlocksTotal() {
Click into activate
public void activate(Configuration conf, long blockTotal) {
Click into activate
void activate(long total) {
Click into setBlockTotal
void setBlockTotal(long total) {
Click into areThresholdsMet
private boolean areThresholdsMet() {
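The two methods above boil down to a simple rule: setBlockTotal derives the block threshold as blockTotal * dfs.namenode.safemode.threshold-pct (default 0.999f), and areThresholdsMet lets the NameNode leave safe mode once enough complete blocks have been reported and, if configured, enough DataNodes are alive. A simplified sketch (the DataNode-threshold handling is condensed here):

```java
public class SafeModeCheck {
    // dfs.namenode.safemode.threshold-pct, default 0.999f
    static long blockThreshold(long blockTotal, float thresholdPct) {
        return (long) (blockTotal * thresholdPct);
    }

    // Simplified areThresholdsMet(): both the reported-block threshold and
    // the live-DataNode threshold must be satisfied to leave safe mode.
    static boolean areThresholdsMet(long blockSafe, long blockThreshold,
                                    int numLiveDn, int dnThreshold) {
        return blockSafe >= blockThreshold && numLiveDn >= dnThreshold;
    }

    public static void main(String[] args) {
        long threshold = blockThreshold(1000, 0.999f);
        System.out.println("need " + threshold + " of 1000 blocks reported");
        System.out.println(areThresholdsMet(998, threshold, 3, 0)); // still in safe mode
        System.out.println(areThresholdsMet(999, threshold, 3, 0)); // may leave safe mode
    }
}
```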
Chapter 2: DataNode Startup Source Code Walkthrough
0) Add the following dependencies to pom.xml
<dependencies>
1) Press Ctrl + N, search globally for "datanode", and open DataNode.java
The official DataNode Javadoc:
1 | DataNode is a class (and program) that stores a set of blocks for a DFS deployment. A single deployment can have one or many DataNodes. Each DataNode communicates regularly with a single NameNode. It also communicates with client code and other DataNodes from time to time. DataNodes store a series of named blocks. The DataNode allows client code to read these blocks, or to write new block data. The DataNode may also, in response to instructions from its NameNode, delete blocks or copy blocks to/from other DataNodes. The DataNode maintains just one critical table: block-> stream of bytes (of BLOCK_SIZE or less) This info is stored on a local disk. The DataNode reports the table's contents to the NameNode upon startup and every so often afterwards. DataNodes spend their lives in an endless loop of asking the NameNode for something to do. A NameNode cannot connect to a DataNode directly; a NameNode simply returns values from functions invoked by a DataNode. DataNodes maintain an open server socket so that client code or other DataNodes can read/write data. The host/port for this server is reported to the NameNode, which then sends that information to clients or other DataNodes that might be interested. |
2) Press Ctrl + F and search for the main method
DataNode.java
public static void main(String args[]) {
2.1 Initializing DataXceiverServer
Click into initDataXceiver
private void initDataXceiver() throws IOException {
2.2 Initializing the HTTP server
Click into startInfoServer();
DataNode.java
private void startInfoServer()
DatanodeHttpServer.java
public DatanodeHttpServer(final Configuration conf,
2.3 Initializing the DataNode's RPC server
Click into initIpcServer
DataNode.java
private void initIpcServer() throws IOException {
2.4 DataNode registration with the NameNode
Click into refreshNamenodes
BlockPoolManager.java
void refreshNamenodes(Configuration conf)
Click into startAll()
synchronized void startAll() throws IOException {
Click into start()
BPOfferService.java
void start() {
Click into start()
BPServiceActor.java
void start() {
Press Ctrl + F and search for the run method
public void run() {
Click into register
void register(NamespaceInfo nsInfo) throws IOException {
Press Ctrl + N and search for NameNodeRpcServer
NameNodeRpcServer.java
Press Ctrl + F and search for registerDatanode inside NameNodeRpcServer.java
public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
FSNamesystem.java
void registerDatanode(DatanodeRegistration nodeReg) throws IOException {
BlockManager.java
public void registerDatanode(DatanodeRegistration nodeReg)
2.5 Sending heartbeats to the NameNode
Click into the offerService call inside the run method of BPServiceActor.java
BPServiceActor.java
private void offerService() throws Exception {
Press Ctrl + N and search for NameNodeRpcServer
NameNodeRpcServer.java
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
Click into handleHeartbeat
DatanodeManager.java
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
HeartbeatManager.java
synchronized void updateHeartbeat(final DatanodeDescriptor node,
BlockManager.java
void updateHeartbeat(DatanodeDescriptor node, StorageReport[] reports,
DatanodeDescriptor.java
void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
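The chain of updateHeartbeat calls ultimately refreshes the descriptor's last-update timestamp; the monitor thread from section 1.5 later compares that timestamp against the expiry interval. A simplified model of that pair of operations (the class and field names here are illustrative, not the actual Hadoop ones):

```java
public class DatanodeLiveness {
    static final long EXPIRE_MS = 630_000;   // 2 * 5 min + 10 * 3 s, see section 1.5

    long lastUpdateMs;                       // refreshed by each heartbeat

    // what updateHeartbeat ultimately does: remember when we last heard from the DN
    void updateHeartbeat(long nowMs) { lastUpdateMs = nowMs; }

    // what the heartbeat monitor checks: silent for more than 10.5 min means dead
    boolean isDead(long nowMs) { return nowMs - lastUpdateMs > EXPIRE_MS; }

    public static void main(String[] args) {
        DatanodeLiveness dn = new DatanodeLiveness();
        dn.updateHeartbeat(0);
        System.out.println(dn.isDead(600_000)); // false: within 10.5 min
        System.out.println(dn.isDead(700_000)); // true: declared dead
    }
}
```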
Chapter 3: HDFS Upload Source Code Walkthrough
3.1 The create call path
Add the dependencies
<dependencies>
3.1.1 The client sends a create request to the NameNode
The user's own client code
@Test
FileSystem.java
public FSDataOutputStream create(Path f) throws IOException {
Select create, press Ctrl + H, find the implementation class DistributedFileSystem.java, and search for the create method.
DistributedFileSystem.java
@Override
Click into create to enter DFSClient.java
public DFSOutputStream create(String src, FsPermission permission,
Click into newStreamForCreate to enter DFSOutputStream.java
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
3.1.2 The NameNode handles the client's create request
1) Click into create
ClientProtocol.java
HdfsFileStatus create(String src, FsPermission masked,
2) Press Ctrl + H to find the implementation classes of create, click NameNodeRpcServer, and search for create inside NameNodeRpcServer.java
NameNodeRpcServer.java
public HdfsFileStatus create(String src, FsPermission masked,
FSNamesystem.java
HdfsFileStatus startFile(String src, PermissionStatus permissions,
3.1.3 The DataStreamer startup flow
After the NameNode has handled the create request, control returns to the client side, which starts the corresponding thread
DFSOutputStream.java
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
Click into DFSOutputStream
protected DFSOutputStream(DFSClient dfsClient, String src,
1) Click into out.start() inside the newStreamForCreate method to enter DFSOutputStream.java
protected synchronized void start() {
Click into DataStreamer to enter DataStreamer.java
class DataStreamer extends Daemon {
Click into Daemon to enter Daemon.java
public class Daemon extends Thread {
Note: out.start(); actually starts a thread. Click into DataStreamer and search for the run method.
DataStreamer.java
@Override
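Daemon itself is tiny: a Thread subclass that marks itself as a daemon thread, so the JVM will not wait for DataStreamer when the client exits. A self-contained equivalent of that idea:

```java
public class DaemonSketch {
    // mirrors the idea of org.apache.hadoop.util.Daemon: a self-daemonizing Thread
    static class Daemon extends Thread {
        Daemon(Runnable r) {
            super(r);
            setDaemon(true);   // JVM may exit even if this thread is still running
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Daemon d = new Daemon(() -> System.out.println("streamer running"));
        System.out.println(d.isDaemon());  // true
        d.start();
        d.join();
    }
}
```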
3.2 The write upload path
3.2.1 Writing data into the DataStreamer's queue
1) The user's own code
@Test
2) Click into write
public void write(byte b[]) throws IOException {
3) Click into write
OutputStream.java
public abstract void write(int b) throws IOException;
Press Ctrl + H to find the implementation classes of write, choose FSOutputSummer.java, and search for write in that class
FSOutputSummer.java
public synchronized void write(int b) throws IOException {
Press Ctrl + H to find the writeChunk implementation class DFSOutputStream.java
DataStreamer.java
void queuePacket(DFSPacket packet) {
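Before a packet reaches queuePacket, writeChunk has assembled it from chunks: each chunk is 512 bytes of data (io.bytes.per.checksum) plus a 4-byte CRC checksum, and the packet body is capped at dfs.client-write-packet-size (64 KB by default) minus the packet-header length. The 33-byte header constant below is an assumption taken from PacketHeader.PKT_MAX_HEADER_LEN; under these assumptions the arithmetic gives 126 chunks per packet:

```java
public class PacketMath {
    static final int BYTES_PER_CHECKSUM = 512;  // io.bytes.per.checksum
    static final int CHECKSUM_SIZE = 4;         // CRC32 checksum is 4 bytes
    static final int WRITE_PACKET_SIZE = 65536; // dfs.client-write-packet-size
    static final int PKT_MAX_HEADER_LEN = 33;   // assumed packet-header length

    // how many (data + checksum) chunks fit into one packet body
    static int chunksPerPacket() {
        int chunkSize = BYTES_PER_CHECKSUM + CHECKSUM_SIZE;    // 516 bytes
        int bodySize = WRITE_PACKET_SIZE - PKT_MAX_HEADER_LEN; // 65503 bytes
        return Math.max(bodySize / chunkSize, 1);
    }

    public static void main(String[] args) {
        System.out.println(chunksPerPacket() + " chunks per packet"); // 126
    }
}
```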
3.2.2 Pipeline setup: rack awareness (choosing block storage locations)
Press Ctrl + N, search globally for DataStreamer, and search for the run method
DataStreamer.java
@Override
Click into nextBlockOutputStream
protected LocatedBlock nextBlockOutputStream() throws IOException {
Press Ctrl + H, click NameNodeRpcServer, and search for addBlock in that class
NameNodeRpcServer.java
public LocatedBlock addBlock(String src, String clientName,
FSNamesystem.java
LocatedBlock getAdditionalBlock(
Press Ctrl + H to find the chooseTarget implementation BlockPlacementPolicyDefault.java
DatanodeStorageInfo[] chooseTarget(
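For the default 3-replica case, BlockPlacementPolicyDefault places the first replica on the writer's node (when the writer is a DataNode), the second on a node in a different rack, and the third on a different node in the same rack as the second. A toy model of that selection, with made-up node and rack names (the real policy also weighs load, storage, and randomness):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RackPlacement {
    static class Node {
        final String name, rack;
        Node(String name, String rack) { this.name = name; this.rack = rack; }
    }

    // Toy version of the default 3-replica placement:
    // replica 1 on the writer's node, replica 2 on a node in a different rack,
    // replica 3 on another node in the same rack as replica 2.
    static List<String> chooseTargets(Node writer, List<Node> cluster) {
        List<String> chosen = new ArrayList<>();
        chosen.add(writer.name);                       // replica 1: local node
        Node second = null;
        for (Node n : cluster) {
            if (!n.rack.equals(writer.rack)) { second = n; break; }
        }
        chosen.add(second.name);                       // replica 2: remote rack
        for (Node n : cluster) {
            if (n != second && n.rack.equals(second.rack)) {
                chosen.add(n.name);                    // replica 3: same rack as replica 2
                break;
            }
        }
        return chosen;
    }

    public static void main(String[] args) {
        Node writer = new Node("dn1", "/rack1");
        List<Node> cluster = Arrays.asList(
                writer, new Node("dn2", "/rack1"),
                new Node("dn3", "/rack2"), new Node("dn4", "/rack2"));
        System.out.println(chooseTargets(writer, cluster)); // [dn1, dn3, dn4]
    }
}
```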
3.2.3 Pipeline setup: sending over the socket
Click into nextBlockOutputStream
protected LocatedBlock nextBlockOutputStream() throws IOException {
3.2.4 Pipeline setup: receiving on the socket
Press Ctrl + N, search globally for DataXceiverServer.java, and search for the run method in that class
public void run() {
Click into DataXceiver (a thread) and search for the run method
public void run() {
Press Ctrl + Alt + B to find the writeBlock implementation class DataXceiver.java
public void writeBlock(... ...) throws IOException {
3.2.5 The client receives the DataNodes' write acknowledgements (Response)
Press Ctrl + N, search globally for DataStreamer, and search for the run method
DataStreamer.java
@Override
Click into response, then into ResponseProcessor, and press Ctrl + F to search for the run method
public void run() {
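The interplay between DataStreamer.run and ResponseProcessor.run revolves around two queues: a packet moves from dataQueue to ackQueue when it is sent downstream, is dropped from ackQueue when the pipeline acks it, and is pushed back onto dataQueue for retransmission on failure. A single-threaded sketch of that bookkeeping (the real code uses synchronized queues shared between the two threads):

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class AckQueues {
    final Deque<String> dataQueue = new ArrayDeque<>(); // packets waiting to be sent
    final Deque<String> ackQueue = new ArrayDeque<>();  // sent, awaiting pipeline ack

    void queuePacket(String pkt) { dataQueue.addLast(pkt); }

    // DataStreamer.run(): send the next packet, then move it to ackQueue
    String sendNextPacket() {
        String pkt = dataQueue.removeFirst();
        ackQueue.addLast(pkt);
        return pkt;
    }

    // ResponseProcessor.run(): on success drop the packet,
    // on failure push it back onto dataQueue for retransmission
    void processAck(boolean success) {
        String pkt = ackQueue.removeFirst();
        if (!success) dataQueue.addFirst(pkt);
    }

    public static void main(String[] args) {
        AckQueues q = new AckQueues();
        q.queuePacket("packet-0");
        q.sendNextPacket();
        q.processAck(true);
        System.out.println(q.dataQueue.size() + " " + q.ackQueue.size()); // 0 0
    }
}
```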
Chapter 4: Yarn Source Code Walkthrough
4.1 The Yarn client submits a job to the ResourceManager
1) In the driver class of the wordcount program, click into
Job.java
boolean result = job.waitForCompletion(true);
Click into submitJobInternal()
JobSubmitter.java
JobStatus submitJobInternal(Job job, Cluster cluster)
2) Building the submission environment
Press Ctrl + Alt + B to find the submitJob implementation class YARNRunner.java
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
3) Submitting to Yarn
Click into submitApplication() inside the submitJob method
YARNRunner.java
ApplicationId applicationId =
Press Ctrl + Alt + B to find the submitApplication implementation class YarnClientImpl.java
public ApplicationId
Press Ctrl + Alt + B to find the submitApplication implementation class ClientRMService.java
public SubmitApplicationResponse submitApplication(
4.2 The ResourceManager starts MRAppMaster
0) Add the following dependency to pom.xml
<dependency>
Press Ctrl + N to search for MRAppMaster, then search for the main method
public static void main(String[] args) {
Press Ctrl + Alt + B to find the serviceInit implementation class MRAppMaster.java
protected void serviceInit(final Configuration conf) throws Exception {
Click into appMaster.start(); inside the initAndStartAppMaster method of MRAppMaster.java
public void start() {
Press Ctrl + Alt + B to find the handle implementation class GenericEventHandler.java
class GenericEventHandler implements EventHandler<Event> {
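GenericEventHandler does little more than put the event onto a queue; a dispatcher thread later drains the queue and routes each event to the handler registered for its type (this is Yarn's AsyncDispatcher pattern). A synchronous toy version, with all names illustrative rather than Hadoop's:

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

public class DispatcherSketch {
    interface EventHandler { void handle(String event); }

    final Queue<String> eventQueue = new ArrayDeque<>();
    final Map<String, EventHandler> handlers = new HashMap<>();

    // register a handler for an event type, like dispatcher.register(...)
    void register(String type, EventHandler h) { handlers.put(type, h); }

    // GenericEventHandler.handle(): just enqueue the event
    void handle(String type, String event) { eventQueue.add(type + ":" + event); }

    // the dispatcher thread's loop body, run synchronously here for simplicity
    void dispatchAll() {
        while (!eventQueue.isEmpty()) {
            String[] e = eventQueue.poll().split(":", 2);
            handlers.get(e[0]).handle(e[1]);   // route by event type
        }
    }

    public static void main(String[] args) {
        DispatcherSketch d = new DispatcherSketch();
        d.register("JOB", ev -> System.out.println("handling " + ev));
        d.handle("JOB", "JOB_INIT");
        d.dispatchAll();
    }
}
```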
4.3 Task execution by the scheduler (YarnChild)
1) Starting the MapTask
Press Ctrl + N to search for YarnChild, then search for the main method
public static void main(String[] args) throws Throwable {
Press Ctrl + Alt + B to find the run implementation class MapTask.java
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
Mapper.java (this is where the user's Mapper logic runs)
public void run(Context context) throws IOException, InterruptedException {
2) Starting the ReduceTask
In the main method of YarnChild.java, press Ctrl + Alt + B to find the run implementation class ReduceTask.java
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
Reducer.java
public void run(Context context) throws IOException, InterruptedException {
Chapter 5: MapReduce Source Code Walkthrough
Note: the MapReduce source code was already covered in the MapReduce course, so it is not repeated here.
5.1 Job submission flow and input-split source code
1) Job submission flow
waitForCompletion()
2) FileInputFormat split computation (input.getSplits(job))
5.2 MapTask & ReduceTask source code
1) MapTask source code flow
2) ReduceTask source code flow
Chapter 6: Compiling the Hadoop Source Code
6.1 Preparation
1) Download the source code from the official site
https://hadoop.apache.org/release/3.1.3.html
2) Modify the HDFS replica-count setting in the source code
3) Prepare a CentOS virtual machine
(1) Internet access for CentOS
Configure CentOS so it can reach the Internet; pinging www.baidu.com from the Linux VM should succeed.
Note: compile as the root user to avoid directory-permission problems.
(2) Prepare the packages (Hadoop source, JDK8, Maven, Ant, Protobuf)
hadoop-3.1.3-src.tar.gz
jdk-8u212-linux-x64.tar.gz
apache-maven-3.6.3-bin.tar.gz
protobuf-2.5.0.tar.gz (the serialization framework)
cmake-3.17.0.tar.gz
6.2 Installing the tools
Note: all of the following steps must be performed as the root user.
0) Create the directories /opt/software/hadoop_source and /opt/module/hadoop_source
1) Upload the packages to the target directory, for example:
[root@hadoop101 hadoop_source]$ pwd
2) Extract the packages into the target directory, for example:
[root@hadoop101 hadoop_source]$ tar -zxvf apache-maven-3.6.3-bin.tar.gz -C /opt/module/hadoop_source/
3) Install the JDK
(1) Extract the JDK
[root@hadoop101 hadoop_source]# tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/hadoop_source/
(2) Configure the environment variables
[root@hadoop101 jdk1.8.0_212]# vim /etc/profile.d/my_env.sh
Add the following content:
#JAVA_HOME
(3) Reload the environment variables
[root@hadoop101 jdk1.8.0_212]# source /etc/profile
(4) Verify that the JDK is installed correctly
[root@hadoop101 hadoop_source]$ java -version
4) Configure Maven's environment variables and mirror, then verify
(1) Configure Maven's environment variables
[root@hadoop101 hadoop_source]# vim /etc/profile.d/my_env.sh
(2) Change Maven's mirror
[root@hadoop101 apache-maven-3.6.3]# vi conf/settings.xml
(3) Verify that Maven is installed correctly
[root@hadoop101 hadoop_source]# mvn -version
5) Install the required dependencies (keep this installation order; otherwise some dependencies may not be found)
(1) Install gcc and make
[root@hadoop101 hadoop_source]# yum install -y gcc* make
(2) Install the compression tools
[root@hadoop101 hadoop_source]# yum -y install snappy* bzip2* lzo* zlib* lz4* gzip*
(3) Install some basic tools
[root@hadoop101 hadoop_source]# yum -y install openssl* svn ncurses* autoconf automake libtool
(4) Install the EPEL repository, which is required before zstd can be installed
[root@hadoop101 hadoop_source]# yum -y install epel-release
(5) Install zstd
[root@hadoop101 hadoop_source]# yum -y install *zstd*
6) Install cmake manually
(1) In the extracted cmake directory, run ./bootstrap to configure the build; this can take up to an hour, so be patient
[root@hadoop101 cmake-3.17.0]$ pwd
(2) Build and install
[root@hadoop101 cmake-3.17.0]$ make && make install
(3) Verify the installation
[root@hadoop101 cmake-3.17.0]$ cmake -version
7) Install protobuf: change into the extracted protobuf directory
[root@hadoop101 protobuf-2.5.0]$ pwd
(1) Run the following commands in order; --prefix installs into the given directory
[root@hadoop101 protobuf-2.5.0]$ ./configure --prefix=/opt/module/hadoop_source/protobuf-2.5.0
(2) Configure the environment variables
[root@hadoop101 protobuf-2.5.0]$ vim /etc/profile.d/my_env.sh
Add the following content:
PROTOC_HOME=/opt/module/hadoop_source/protobuf-2.5.0
(3) Verify
[root@hadoop101 protobuf-2.5.0]$ source /etc/profile
8) At this point, all packages are installed and configured.
6.3 Compiling the source code
1) Change into the extracted Hadoop source directory
[root@hadoop101 hadoop-3.1.3-src]$ pwd
Note: the first build downloads a large number of dependency jars, so it can easily take an hour or more; a successful build ends with SUCCESS for every module.
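The build command itself is not shown above; the invocation for a full binary distribution with native libraries, as documented in Hadoop's BUILDING.txt, is typically the following (run from the source root):

```shell
# Full distribution build with native libraries; skip the test suite.
# Run from /opt/module/hadoop_source/hadoop-3.1.3-src.
mvn clean package -Pdist,native -DskipTests -Dtar
```

The -Pdist profile is what produces the tarball under hadoop-dist/target, which is where the next step looks.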
2) The finished 64-bit Hadoop package is under /opt/module/hadoop_source/hadoop-3.1.3-src/hadoop-dist/target
[root@hadoop101 target]# pwd