Hadoop YARN MapReduce 作业生命周期核心源码行级解析
分层解耦:YARN调度与作业生命周期独立,易于弹性扩展与容错。数据本地性:任务调度优先本地,减少网络IO。内存与IO优化:Map/Reduce溢写与归并参数需与数据量动态匹配。监控与追踪:利用YARN UI、JobHistoryServer、AM/Task日志全链路排查。容错与资源平衡:合理设置重试和超时,防止异常任务拖垮集群。
Hadoop YARN MapReduce 作业生命周期核心源码行级解析
1. 作业提交(Job Submission)
1.1 关键源码与行级注释
入口类:org.apache.hadoop.mapreduce.Job
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE); // 1. 检查Job状态,必须为DEFINE,防止重复提交
setUseNewAPI(); // 2. 检查是否为新API模式,兼容老代码
connect(); // 3. 连接YARN集群,创建Cluster对象
final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
// 4. 执行提交逻辑
return submitter.submitJobInternal(Job.this, cluster);
}
});
}
核心方法:org.apache.hadoop.mapreduce.JobSubmitter#submitJobInternal
public JobStatus submitJobInternal(Job job, Cluster cluster) throws ... {
checkSpecs(job); // 1. 校验输入输出路径、分片等配置
Path jobStagingArea = JobSubmissionFiles.getStagingDir(...); // 2. 获取staging目录
copyAndConfigureFiles(job, jobStagingArea); // 3. 上传作业jar、依赖、配置到HDFS
// 4. 构建ApplicationSubmissionContext
ApplicationSubmissionContext appContext = createApplicationSubmissionContext(...);
// 5. 通过YARNRunner提交作业
JobID jobId = submitClient.submitJob(appContext);
return getJobStatus(jobId);
}
1.2 设计思想
- 提交前本地校验,防止无效作业消耗集群资源
- staging目录隔离多用户,提升安全性
- 作业所有依赖统一上传,支持异构环境运行
1.3 参数调优
mapreduce.job.jar:明确指定作业jar,避免依赖不全mapreduce.job.ubertask.enable:小作业开启Uber,减少启动延迟mapreduce.job.split.metainfo.maxsize:防止分片元数据过大导致OOM
1.4 实战案例
报错:
Permission denied: user=xxx, access=WRITE, inode="/tmp/hadoop-yarn/staging"排查: 检查staging目录权限与属主
解决: 赋予提交用户写权限或调整目录属主
2. 作业初始化(Job Initialization)
2.1 关键源码与行级注释
入口:org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService#submitApplication
public SubmitApplicationResponse submitApplication(SubmitApplicationRequest request) {
ApplicationSubmissionContext appContext = request.getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();
// 1. 校验appContext有效性,包括资源、队列、权限等
// 2. 分配唯一ApplicationId
// 3. 持久化应用信息,用于恢复和HA
// 4. 通知Scheduler,调度AM Container
// 5. 返回提交响应
}
2.2 设计思想
- 作业与资源调度解耦,支持弹性伸缩和高可用
- 持久化用于容错和作业恢复
2.3 参数调优
yarn.app.mapreduce.am.resource.mb:AM内存,内存溢出时适当提升yarn.scheduler.capacity.maximum-am-resource-percent:AM总资源上限,防止AM挤占队列
2.4 实战案例
现象: 作业卡在
ACCEPTED,AM无资源分配排查: 检查队列资源和AM内存设置
解决: 降低AM内存或释放队列资源
3. ApplicationMaster 启动(AM Launch)
3.1 关键源码与行级注释
入口:org.apache.hadoop.mapreduce.v2.app.MRAppMaster#run
public void run() {
amRmClient.registerApplicationMaster(host, port, trackingUrl); // 1. 注册AM,向RM报告自身信息
job = createJob(...); // 2. 构建Job对象,加载配置和作业描述
job.init(conf, ...); // 3. 初始化Job,准备切分等
job.start(); // 4. 启动Job进入调度
// 5. 启动心跳/事件主循环,监听任务状态
}
3.2 设计思想
- AM作为作业生命周期管理者,负责所有任务调度、监控、容错
- Job对象采用状态机模式,易于扩展
3.3 参数调优
mapreduce.am.max-attempts:提升关键作业容错性mapreduce.jobhistory.address:单独部署JobHistoryServer,便于追踪
3.4 实战案例
现象: AM反复重启
排查: 检查AM日志、依赖、内存
解决: 修正依赖、提升AM内存、合理设置重试次数
4. 任务切分(Input Splits)
4.1 关键源码与行级注释
入口:org.apache.hadoop.mapreduce.lib.input.TextInputFormat#getSplits
public List<InputSplit> getSplits(JobContext job) throws IOException {
List<InputSplit> splits = new ArrayList<>();
for (FileStatus file: listStatus(job)) { // 1. 遍历输入目录所有文件
long length = file.getLen();
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
long splitSize = computeSplitSize(...); // 2. 计算单个split大小
long bytesRemaining = length;
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
// 3. 按splitSize切分,生成InputSplit
splits.add(makeSplit(...));
bytesRemaining -= splitSize;
}
// 4. 剩余部分作为最后一个split
}
return splits;
}
4.2 设计思想
- 按HDFS block本地性切分,减少跨节点IO
- 支持自定义InputFormat扩展小文件合并等场景
4.3 参数调优
mapreduce.input.fileinputformat.split.maxsize:调大减少Map数量mapreduce.input.fileinputformat.split.minsize:调小合并小文件
4.4 实战案例
现象: 1TB文件被切分成数千个Map任务
排查: 查看split参数与block size
解决: 调大maxsize,减少Map数量
5. 资源申请与任务调度
5.1 关键源码与行级注释
AM向RM申请资源:org.apache.hadoop.mapreduce.v2.app.MRAppMaster
amRmClient.addContainerRequest(containerRequest); // 1. 构造ContainerRequest,申请资源
// 回调onContainersAllocated
public void onContainersAllocated(List<Container> containers) {
for (Container c : containers) {
// 2. 为分配的Container分配Map/Reduce任务
// 3. 通过NMClient启动任务进程
nmClient.startContainer(taskLaunchContext, c);
}
}
5.2 设计思想
- 异步批量资源申请,提升调度效率
- 支持优先级和本地性调度
5.3 参数调优
mapreduce.map.memory.mb/reduce.memory.mb:与数据量匹配mapreduce.map.cpu.vcores:核数匹配CPU密集型任务yarn.scheduler.maximum-allocation-mb:提升单任务最大资源
5.4 实战案例
现象: 任务长时间排队
排查: 检查YARN资源与任务内存参数
解决: 降低单任务内存,分批提交
6. 任务执行(Task Execution)
6.1 Map阶段行级解析
入口:org.apache.hadoop.mapred.MapTask#runNewMapper
public void runNewMapper(...) {
RecordReader<K1, V1> input = ...; // 1. 创建RecordReader读取InputSplit
Mapper<K1, V1, K2, V2>.Context context = ...; // 2. 创建MapContext
mapper.run(context); // 3. 调用用户自定义map()方法
// 4. Map输出写入环形缓冲区
// 5. 缓冲区满时Spill到本地磁盘
}
溢写(Spill)逻辑:org.apache.hadoop.mapred.MapTask$MapOutputBuffer
if (buffer usage > spill threshold) {
// 1. 对缓冲区数据按分区排序
// 2. 写入本地磁盘临时文件
}
6.1.1 设计思想
- 内存缓冲+溢写,保障大数据量下的稳定性
- Map端预分区,降低Reduce端压力
6.1.2 参数调优
mapreduce.task.io.sort.mb:调大减少溢写次数mapreduce.map.output.compress:提升IO和网络传输效率
6.2 Shuffle阶段行级解析
入口:org.apache.hadoop.mapreduce.task.reduce.Shuffle#fetchOutputs
while (remainingOutputs) {
fetcher.fetch(); // 1. 多线程HTTP拉取各Map输出
// 2. 边拉边归并到内存或磁盘
mergeManager.closeInMemoryFile();
}
6.2.1 设计思想
- 多线程拉取,充分利用带宽
- 内存+磁盘归并,适应不同数据量
6.2.2 参数调优
mapreduce.reduce.shuffle.parallelcopies:调高提升拉取速度mapreduce.reduce.shuffle.input.buffer.percent:加大内存归并比例
6.3 Reduce阶段行级解析
入口:org.apache.hadoop.mapred.ReduceTask#runNewReducer
public void runNewReducer(...) {
RawKeyValueIterator rIter = ...; // 1. 归并排序所有拉取到的数据
Reducer<K2, V2, K3, V3>.Context context = ...;
reducer.run(context); // 2. 调用用户自定义reduce()方法
// 3. 输出写入HDFS
}
6.3.1 设计思想
- 归并分组,支持大规模分布式聚合
- 用户自定义逻辑与系统归并解耦
6.3.2 参数调优
mapreduce.reduce.memory.mb:归并数据量大时调高mapreduce.reduce.shuffle.merge.percent:合理归并阈值
7. 任务监控与容错
7.1 关键源码与行级注释
心跳机制:org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl
public void progressing(TaskAttemptID attemptID) {
// 1. Task定期向AM汇报进度
// 2. AM更新Task状态
// 3. 超时未上报则判定为失败,重新调度
}
7.2 设计思想
- 心跳+超时重试,保证作业高可用
- 最大重试次数防止死循环
7.3 参数调优
mapreduce.task.timeout:大作业适当调大mapreduce.map.maxattempts/reduce.maxattempts:容忍偶发异常
8. 作业完成与资源释放
8.1 关键源码与行级注释
AM作业完成:org.apache.hadoop.mapreduce.v2.app.MRAppMaster
amRmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, ...); // 1. 向RM汇报作业完成
// 2. 清理staging目录、释放Container等资源
8.2 设计思想
- 主动释放资源,提升集群可用性
- 延迟删除便于问题追踪
8.3 参数调优
yarn.nodemanager.delete.debug-delay-sec:生产环境调大便于排查
9. 结果输出与作业日志
9.1 关键源码与行级注释
写结果:org.apache.hadoop.mapreduce.lib.output.TextOutputFormat$LineRecordWriter#write
public void write(K key, V value) throws IOException {
// 1. 格式化输出为文本行
// 2. 写入HDFS文件
}
日志收集:org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer
- 负责汇总作业历史,提供Web UI查询
9.2 参数调优
mapreduce.output.fileoutputformat.compress:大结果集建议开启压缩mapreduce.jobhistory.intermediate-done-dir:定期清理日志目录
方法论总结与设计技巧
- 分层解耦:YARN调度与作业生命周期独立,易于弹性扩展与容错。
- 数据本地性:任务调度优先本地,减少网络IO。
- 内存与IO优化:Map/Reduce溢写与归并参数需与数据量动态匹配。
- 监控与追踪:利用YARN UI、JobHistoryServer、AM/Task日志全链路排查。
- 容错与资源平衡:合理设置重试和超时,防止异常任务拖垮集群。
参考资料
高阶扩展
- 自定义InputFormat/OutputFormat/Partitioner,应对复杂数据源和分区场景
- MapReduce on YARN与Spark on YARN调度对比
- 自动参数调优与运维脚本集成
- 与HDFS、Hive、HBase等生态协同优化
如需某个阶段的详细源码逐行注释或参数调优实战脚本,可随时留言提问!
更多推荐


所有评论(0)