Hadoop YARN MapReduce 作业生命周期核心源码行级解析

1. 作业提交(Job Submission)

1.1 关键源码与行级注释

入口类:org.apache.hadoop.mapreduce.Job

public void submit() throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE); // 1. 检查Job状态,必须为DEFINE,防止重复提交
    setUseNewAPI();               // 2. 检查是否为新API模式,兼容老代码
    connect();                    // 3. 连接YARN集群,创建Cluster对象
    final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
            // 4. 执行提交逻辑
            return submitter.submitJobInternal(Job.this, cluster);
        }
    });
}

核心方法:org.apache.hadoop.mapreduce.JobSubmitter#submitJobInternal

public JobStatus submitJobInternal(Job job, Cluster cluster) throws ... {
    checkSpecs(job); // 1. 校验输入输出路径、分片等配置
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(...); // 2. 获取staging目录
    copyAndConfigureFiles(job, jobStagingArea); // 3. 上传作业jar、依赖、配置到HDFS
    // 4. 构建ApplicationSubmissionContext
    ApplicationSubmissionContext appContext = createApplicationSubmissionContext(...);
    // 5. 通过YARNRunner提交作业
    JobID jobId = submitClient.submitJob(appContext);
    return getJobStatus(jobId);
}

1.2 设计思想

  • 提交前本地校验,防止无效作业消耗集群资源
  • staging目录隔离多用户,提升安全性
  • 作业所有依赖统一上传,支持异构环境运行

1.3 参数调优

  • mapreduce.job.jar:明确指定作业jar,避免依赖不全
  • mapreduce.job.ubertask.enable:小作业开启Uber,减少启动延迟
  • mapreduce.job.split.metainfo.maxsize:防止分片元数据过大导致OOM

1.4 实战案例

报错: Permission denied: user=xxx, access=WRITE, inode="/tmp/hadoop-yarn/staging"

排查: 检查staging目录权限与属主

解决: 赋予提交用户写权限或调整目录属主


2. 作业初始化(Job Initialization)

2.1 关键源码与行级注释

入口:org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService#submitApplication

public SubmitApplicationResponse submitApplication(SubmitApplicationRequest request) {
    ApplicationSubmissionContext appContext = request.getApplicationSubmissionContext();
    ApplicationId appId = appContext.getApplicationId();
    // 1. 校验appContext有效性,包括资源、队列、权限等
    // 2. 分配唯一ApplicationId
    // 3. 持久化应用信息,用于恢复和HA
    // 4. 通知Scheduler,调度AM Container
    // 5. 返回提交响应
}

2.2 设计思想

  • 作业与资源调度解耦,支持弹性伸缩和高可用
  • 持久化用于容错和作业恢复

2.3 参数调优

  • yarn.app.mapreduce.am.resource.mb:AM内存,内存溢出时适当提升
  • yarn.scheduler.capacity.maximum-am-resource-percent:AM总资源上限,防止AM挤占队列

2.4 实战案例

现象: 作业卡在ACCEPTED,AM无资源分配

排查: 检查队列资源和AM内存设置

解决: 降低AM内存或释放队列资源


3. ApplicationMaster 启动(AM Launch)

3.1 关键源码与行级注释

入口:org.apache.hadoop.mapreduce.v2.app.MRAppMaster#run

public void run() {
    amRmClient.registerApplicationMaster(host, port, trackingUrl); // 1. 注册AM,向RM报告自身信息
    job = createJob(...); // 2. 构建Job对象,加载配置和作业描述
    job.init(conf, ...);  // 3. 初始化Job,准备切分等
    job.start();          // 4. 启动Job进入调度
    // 5. 启动心跳/事件主循环,监听任务状态
}

3.2 设计思想

  • AM作为作业生命周期管理者,负责所有任务调度、监控、容错
  • Job对象采用状态机模式,易于扩展

3.3 参数调优

  • mapreduce.am.max-attempts:提升关键作业容错性
  • mapreduce.jobhistory.address:单独部署JobHistoryServer,便于追踪

3.4 实战案例

现象: AM反复重启

排查: 检查AM日志、依赖、内存

解决: 修正依赖、提升AM内存、合理设置重试次数


4. 任务切分(Input Splits)

4.1 关键源码与行级注释

入口:org.apache.hadoop.mapreduce.lib.input.TextInputFormat#getSplits

public List<InputSplit> getSplits(JobContext job) throws IOException {
    List<InputSplit> splits = new ArrayList<>();
    for (FileStatus file: listStatus(job)) { // 1. 遍历输入目录所有文件
        long length = file.getLen();
        BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
        long splitSize = computeSplitSize(...); // 2. 计算单个split大小
        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            // 3. 按splitSize切分,生成InputSplit
            splits.add(makeSplit(...));
            bytesRemaining -= splitSize;
        }
        // 4. 剩余部分作为最后一个split
    }
    return splits;
}

4.2 设计思想

  • 按HDFS block本地性切分,减少跨节点IO
  • 支持自定义InputFormat扩展小文件合并等场景

4.3 参数调优

  • mapreduce.input.fileinputformat.split.maxsize:调大减少Map数量
  • mapreduce.input.fileinputformat.split.minsize:调小合并小文件

4.4 实战案例

现象: 1TB文件被切分成数千个Map任务

排查: 查看split参数与block size

解决: 调大maxsize,减少Map数量


5. 资源申请与任务调度

5.1 关键源码与行级注释

AM向RM申请资源:org.apache.hadoop.mapreduce.v2.app.MRAppMaster

amRmClient.addContainerRequest(containerRequest); // 1. 构造ContainerRequest,申请资源
// 回调onContainersAllocated
public void onContainersAllocated(List<Container> containers) {
    for (Container c : containers) {
        // 2. 为分配的Container分配Map/Reduce任务
        // 3. 通过NMClient启动任务进程
        nmClient.startContainer(taskLaunchContext, c);
    }
}

5.2 设计思想

  • 异步批量资源申请,提升调度效率
  • 支持优先级和本地性调度

5.3 参数调优

  • mapreduce.map.memory.mb/reduce.memory.mb:与数据量匹配
  • mapreduce.map.cpu.vcores:核数匹配CPU密集型任务
  • yarn.scheduler.maximum-allocation-mb:提升单任务最大资源

5.4 实战案例

现象: 任务长时间排队

排查: 检查YARN资源与任务内存参数

解决: 降低单任务内存,分批提交


6. 任务执行(Task Execution)

6.1 Map阶段行级解析

入口:org.apache.hadoop.mapred.MapTask#runNewMapper

public void runNewMapper(...) {
    RecordReader<K1, V1> input = ...; // 1. 创建RecordReader读取InputSplit
    Mapper<K1, V1, K2, V2>.Context context = ...; // 2. 创建MapContext
    mapper.run(context); // 3. 调用用户自定义map()方法
    // 4. Map输出写入环形缓冲区
    // 5. 缓冲区满时Spill到本地磁盘
}

溢写(Spill)逻辑:org.apache.hadoop.mapred.MapTask$MapOutputBuffer

if (buffer usage > spill threshold) {
    // 1. 对缓冲区数据按分区排序
    // 2. 写入本地磁盘临时文件
}
6.1.1 设计思想
  • 内存缓冲+溢写,保障大数据量下的稳定性
  • Map端预分区,降低Reduce端压力
6.1.2 参数调优
  • mapreduce.task.io.sort.mb:调大减少溢写次数
  • mapreduce.map.output.compress:提升IO和网络传输效率

6.2 Shuffle阶段行级解析

入口:org.apache.hadoop.mapreduce.task.reduce.Shuffle#fetchOutputs

while (remainingOutputs) {
    fetcher.fetch(); // 1. 多线程HTTP拉取各Map输出
    // 2. 边拉边归并到内存或磁盘
    mergeManager.closeInMemoryFile();
}
6.2.1 设计思想
  • 多线程拉取,充分利用带宽
  • 内存+磁盘归并,适应不同数据量
6.2.2 参数调优
  • mapreduce.reduce.shuffle.parallelcopies:调高提升拉取速度
  • mapreduce.reduce.shuffle.input.buffer.percent:加大内存归并比例

6.3 Reduce阶段行级解析

入口:org.apache.hadoop.mapred.ReduceTask#runNewReducer

public void runNewReducer(...) {
    RawKeyValueIterator rIter = ...; // 1. 归并排序所有拉取到的数据
    Reducer<K2, V2, K3, V3>.Context context = ...;
    reducer.run(context);  // 2. 调用用户自定义reduce()方法
    // 3. 输出写入HDFS
}
6.3.1 设计思想
  • 归并分组,支持大规模分布式聚合
  • 用户自定义逻辑与系统归并解耦
6.3.2 参数调优
  • mapreduce.reduce.memory.mb:归并数据量大时调高
  • mapreduce.reduce.shuffle.merge.percent:合理归并阈值

7. 任务监控与容错

7.1 关键源码与行级注释

心跳机制:org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl

public void progressing(TaskAttemptID attemptID) {
    // 1. Task定期向AM汇报进度
    // 2. AM更新Task状态
    // 3. 超时未上报则判定为失败,重新调度
}

7.2 设计思想

  • 心跳+超时重试,保证作业高可用
  • 最大重试次数防止死循环

7.3 参数调优

  • mapreduce.task.timeout:大作业适当调大
  • mapreduce.map.maxattempts/reduce.maxattempts:容忍偶发异常

8. 作业完成与资源释放

8.1 关键源码与行级注释

AM作业完成:org.apache.hadoop.mapreduce.v2.app.MRAppMaster

amRmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, ...); // 1. 向RM汇报作业完成
// 2. 清理staging目录、释放Container等资源

8.2 设计思想

  • 主动释放资源,提升集群可用性
  • 延迟删除便于问题追踪

8.3 参数调优

  • yarn.nodemanager.delete.debug-delay-sec:生产环境调大便于排查

9. 结果输出与作业日志

9.1 关键源码与行级注释

写结果:org.apache.hadoop.mapreduce.lib.output.TextOutputFormat$LineRecordWriter#write

public void write(K key, V value) throws IOException {
    // 1. 格式化输出为文本行
    // 2. 写入HDFS文件
}

日志收集:org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer

  • 负责汇总作业历史,提供Web UI查询

9.2 参数调优

  • mapreduce.output.fileoutputformat.compress:大结果集建议开启压缩
  • mapreduce.jobhistory.intermediate-done-dir:定期清理日志目录

方法论总结与设计技巧

  1. 分层解耦:YARN调度与作业生命周期独立,易于弹性扩展与容错。
  2. 数据本地性:任务调度优先本地,减少网络IO。
  3. 内存与IO优化:Map/Reduce溢写与归并参数需与数据量动态匹配。
  4. 监控与追踪:利用YARN UI、JobHistoryServer、AM/Task日志全链路排查。
  5. 容错与资源平衡:合理设置重试和超时,防止异常任务拖垮集群。

参考资料


高阶扩展

  • 自定义InputFormat/OutputFormat/Partitioner,应对复杂数据源和分区场景
  • MapReduce on YARN与Spark on YARN调度对比
  • 自动参数调优与运维脚本集成
  • 与HDFS、Hive、HBase等生态协同优化

如需某个阶段的详细源码逐行注释参数调优实战脚本,可随时留言提问!

Logo

开源鸿蒙跨平台开发社区汇聚开发者与厂商,共建“一次开发,多端部署”的开源生态,致力于降低跨端开发门槛,推动万物智联创新。

更多推荐