前言:过了好久终于把spark学完啦(好吧其实早就学完了但是鸽了快4个月hh),这次笔记很长(我看了下word字数统计总共10w左右),所以分成4期上传,这是第一期,主要是基础的内容,包括Spark概念介绍 + Spark部署 + Spark运行流程介绍,接下来三期分别是:Spark Core、Spark SQL和SparkStreaming(虽然老了点,但是用来理解Spark流处理思路还是够用的)~

Spark3.x指北全系列目录:

SparkCore内容请看:Spark3.x指北——2:Spark Core

SparkSQL内容请看:Spark3.x指北——3:SparkSQL

SparkStreaming内容请看:Spark3.x指北——4:SparkStreaming

目录

1 Spark概述

1.1 什么是Spark

1.2 Spark and Hadoop

首先从时间节点上来看:

Hadoop

Spark

然后我们再从功能上来看:

Hadoop

Spark

1.3 Spark or Hadoop

1.4 Spark核心模块

Spark Core

Spark SQL

Spark Streaming

Spark MLlib

Spark GraphX

2 快速上手,创建基于Scala的Spark程序

2.1 创建Maven项目 & 添加Scala框架

2.1.1 增加 Scala 插件

2.1.2 添加Spark相关maven依赖

2.1.3 测试环境搭建

2.2 基于spark的wordcount

2.3 日志相关配置

3 spark运行环境

3 spark运行环境

3.1 本地运行环境

3.1.1 解压缩文件

3.1.2 启动 Local 环境

1) 进入解压缩后的路径,执行如下指令 bin/spark-shell

2) 启动成功后,可以输入网址进行Web UI监控页面访问 http://虚拟机地址:4040

3.1.3 命令行工具

3.1.4 退出本地模式

3.1.5 提交应用

3.2 standalone模式

3.2.1 解压缩文件

3.2.2 修改配置文件

1) 进入解压缩后路径的conf目录,修改slaves.template文件名为slaves

2) 修改slaves文件,添加work节点

3) 修改spark-env.sh.template 文件名为 spark-env.sh

4) 修改spark-env.sh 文件,添加JAVA_HOME环境变量和集群对应的master节点

5) 分发spark-standalone 目录

3.2.3 启动集群

1) 执行脚本命令

2) 查看Master资源监控Web UI界面: http://node1:8080

3.2.4 提交应用

3.2.5 提交参数说明

3.2.6 配置历史服务

1) 修改spark-defaults.conf.template 文件名为 spark-defaults.conf

2) 修改spark-default.conf 文件,配置日志存储路径

3) 修改spark-env.sh 文件, 添加日志配置

4) 重新分发spark-standalone/conf配置文件

5) 重新启动集群和历史服务

6) 重新执行任务

7) 查看历史服务:http://node1:18080

3.3 YARN模式

3.3.1 解压缩文件

1) 修改hadoop配置文件$HADOOP_HOME/etc/hadoop/yarn-site.xml, 并分发

2) 修改conf/spark-env.sh,添加 JAVA_HOME 和YARN_CONF_DIR配置,并且分发到node2和node3

3.3.3 启动HDFS 以及YARN集群

3.3.4 提交应用

提交应用的报错以及配置完善

1)创建FairScheduler配置文件

2)为yarn-site添加补充内容(基于3.3.2 -1)中完整的yarn-site.xml进行添加!)

3)将这些内容分发到node2和node3上

3.3.5 配置历史服务器

1) 修改spark-defaults.conf.template文件名为spark-defaults.conf

2) 修改spark-default.conf文件,配置日志存储路径

3) 修改spark-env.sh文件, 添加日志配置

4) 修改spark-defaults.conf

5) 启动历史服务

6) 重新提交应用

3.4 windows模式

3.4.1 解压缩文件 将文件spark-3.0.0-bin-hadoop3.2.tgz 解压缩到无中文无空格的路径中

3.4.2 启动本地环境

1) 执行解压缩文件路径下bin目录中的spark-shell.cmd文件,启动Spark本地环境

2) 在bin目录中创建input目录,并添加word.txt文件,并执行wordcount的代码

3.4.3 命令行提交应用

3.5 Spark部署模式汇总 & 对比

3.6 常见端口号

4 Spark运行框架

4.1 运行框架

4.2 核心组件

4.2.1 Driver(Spark计算组件)

4.2.2 Executor(Spark计算组件)

4.2.3 Master & Worker(Standalone下的资源调度组件)

4.2.4 ApplicationMaster(计算组件 与 资源调度组件 之间的中间件)

4.3 核心概念

4.3.1 Executor 与 Core

4.3.2 并行度(Parallelism)

4.3.3 有向无环图(DAG)

4.4 提交流程


1 Spark概述

1.1 什么是Spark

Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。

1.2 Spark and Hadoop

在之前的学习中,Hadoop的MapReduce是大家广为熟知的计算框架,那为什么咱们还 要学习新的计算框架Spark呢,这里就不得不提到Spark和Hadoop的关系。

首先从时间节点上来看:

Hadoop
  • 2006年1月,Doug Cutting加入Yahoo,领导Hadoop的开发
  • 2008年1月,Hadoop成为Apache顶级项目
  • 2011年1.0正式发布
  • 2012年3月稳定版发布
  • 2013年10月发布2.X (Yarn)版本
Spark
  • 2009年,Spark诞生于伯克利大学的AMPLab实验室
  • 2010年,伯克利大学正式开源了Spark项目
  • 2013年6月,Spark成为了Apache基金会下的项目
  • 2014年2月,Spark以飞快的速度成为了Apache的顶级项目
  • 2015年至今,Spark变得愈发火爆,大量的国内公司开始重点部署或者使用Spark

然后我们再从功能上来看:

Hadoop
  • Hadoop是由java语言编写的,在分布式服务器集群上存储海量数据并运行分布式 分析应用的开源框架
  • 作为Hadoop 分布式文件系统,HDFS处于Hadoop生态圈的最下层,存储着所有 的数据,支持着 Hadoop 的所有服务。它的理论基础源于 Google 的 TheGoogleFileSystem 这篇论文,它是GFS的开源实现。
  • MapReduce 是一种编程模型,Hadoop根据Google的MapReduce 论文将其实现, 作为Hadoop 的分布式计算模型,是Hadoop的核心。基于这个框架,分布式并行 程序的编写变得异常简单。综合了HDFS的分布式存储和MapReduce的分布式计 算,Hadoop在处理海量数据时,性能横向扩展变得非常容易。
  • HBase是对Google 的Bigtable 的开源实现,但又和Bigtable 存在许多不同之处。 HBase 是一个基于HDFS的分布式数据库,擅长实时地随机读/写超大规模数据集。 它也是Hadoop非常重要的组件。
Spark
  • Spark是一种由Scala语言开发的快速、通用、可扩展的大数据分析引擎
  • Spark Core 中提供了Spark最基础与最核心的功能
  • Spark SQL是Spark用来操作结构化数据的组件。通过Spark SQL,用户可以使用 SQL 或者Apache Hive 版本的SQL方言(HQL)来查询数据。
  • Spark Streaming 是 Spark 平台上针对实时数据进行流式计算的组件,提供了丰富的 处理数据流的API。 由上面的信息可以获知,Spark出现的时间相对较晚,并且主要功能主要是用于数据计算, 所以其实Spark一直被认为是Hadoop 框架的升级版。

1.3 Spark or Hadoop

Hadoop 的MR框架和Spark框架都是数据处理框架,那么我们在使用时如何选择呢?

  • Hadoop MapReduce 由于其设计初衷并不是为了满足循环迭代式数据流处理,因此在多 并行运行的数据可复用场景(如:机器学习、图挖掘算法、交互式数据挖掘算法)中存 在诸多计算效率等问题。所以Spark应运而生,Spark就是在传统的MapReduce 计算框 架的基础上,利用其计算过程的优化,从而大大加快了数据分析、挖掘的运行和读写速 度,并将计算单元缩小到更适合并行计算和重复使用的RDD计算模型。
  • 机器学习中 ALS、凸优化梯度下降等。这些都需要基于数据集或者数据集的衍生数据 反复查询反复操作。MR这种模式不太合适,即使多MR串行处理,性能和时间也是一 个问题。数据的共享依赖于磁盘。另外一种是交互式数据挖掘,MR 显然不擅长。而 Spark 所基于的scala语言恰恰擅长函数的处理。
  • Spark 是一个分布式数据快速分析项目。它的核心技术是弹性分布式数据集(Resilient Distributed Datasets),提供了比 MapReduce 丰富的模型,可以快速在内存中对数据集 进行多次迭代,来支持复杂的数据挖掘算法和图形计算算法。
  • Spark和Hadoop的根本差异是多个作业之间的数据通信问题 : Spark多个作业之间数据通信是基于内存,而Hadoop是基于磁盘。
  • Spark Task 的启动时间快。Spark采用fork线程的方式,而Hadoop采用创建新的进程 的方式。
  • Spark只有在shuffle的时候将数据写入磁盘,而Hadoop中多个MR作业之间的数据交 互都要依赖于磁盘交互
  • Spark的缓存机制比HDFS的缓存机制高效。

经过上面的比较,我们可以看出在绝大多数的数据计算场景中,Spark确实会比MapReduce 更有优势。但是Spark是基于内存的,所以在实际的生产环境中,由于内存的限制,可能会 由于内存资源不够导致Job执行失败,此时,MapReduce其实是一个更好的选择,所以Spark 并不能完全替代MR。

1.4 Spark核心模块

Spark Core

Spark Core 中提供了 Spark 最基础与最核心的功能,Spark其他的功能如:Spark SQL, Spark Streaming,GraphX, MLlib 都是在 Spark Core 的基础上进行扩展的

Spark SQL

Spark SQL 是 Spark 用来操作结构化数据的组件。通过Spark SQL,用户可以使用SQL 或者Apache Hive 版本的SQL方言(HQL)来查询数据。

Spark Streaming

Spark Streaming 是 Spark 平台上针对实时数据进行流式计算的组件,提供了丰富的处理 数据流的API。

Spark MLlib

MLlib 是 Spark 提供的一个机器学习算法库。MLlib不仅提供了模型评估、数据导入等 额外的功能,还提供了一些更底层的机器学习原语。

Spark GraphX

GraphX 是Spark 面向图计算提供的框架与算法库。

2 快速上手,创建基于Scala的Spark程序

在大数据早期的课程中我们已经学习了MapReduce 框架的原理及基本使用,并了解了 其底层数据处理的实现方式。接下来,就让咱们走进Spark的世界,了解一下它是如何带领 我们完成数据处理的。

2.1 创建Maven项目 & 添加Scala框架

2.1.1 增加 Scala 插件

Spark 由 Scala 语言开发的,所以本课件接下来的开发所使用的语言也为Scala,咱们当 前使用的Spark版本为3.0.0,默认采用的Scala编译版本为2.12,所以后续开发时。我们依然采用这个版本。开发前请保证IDEA开发工具中含有Scala开发插件。

       同时,为这个新项目添加Scala框架:

选中最上层文件夹(项目文件夹)——双击shift——搜索add framework——添加Scala的环境。

2.1.2 添加Spark相关maven依赖

       由于Scala使用的是2.12的版本,所以我们使用3.0的spark。

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
</dependencies>

2.1.3 测试环境搭建

package main.spark_core

import org.apache.spark.{SparkConf, SparkContext}

object Spark01_WordCount {
  def main(args: Array[String]): Unit = {
    //创建spark框架和机器的连接
    //指定配置项
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    //使用上述配置项进行连接spark
    val sc = new SparkContext(sparkConf)



    //关闭连接
    sc.stop()
  }
}

运行代码,查看控制台运行日志输出:

       若出现maven导入成功但是找不到spark相关包,那么可以清理IDEA缓存并重启,或是clean maven & install。

2.2 基于spark的wordcount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_WordCount {
  def main(args: Array[String]): Unit = {
    //TODO 创建spark框架和机器的连接
    //指定配置项
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    //使用上述配置项进行连接spark
    val sc = new SparkContext(sparkConf)


    //TODO WordCount业务流程
    //读取文件,获取一行一行的数据
    val lines: RDD[String] = sc.textFile("datas")

    //将一行的数据进行拆分,形成分词,再扁平化
    //"hello word" => Array("hello", "word") => List(("hello", 1), ("word", 1))
    //  进行 str -> 1 这一步目的在于,便于后续使用reduce进行value的聚合
    val words: RDD[(String, Int)] = lines.flatMap(strings => strings.split(" ").map(str => str -> 1))
    words.foreach(println)
    //(Hello,1)
    //(World,1)
    //(Hello,1)
    //(Spark,1)
    //(darren,1)
    //(is,1)
    //(learning,1)
    //(spark,1)

    //通过key将value进行分组 & 聚合
    val results = words.reduceByKey(_ + _)
    results.foreach(println)
    //(learning,2)
    //(is,2)
    //(Hello,4)
    //(World,2)
    //(darren,2)
    //(Spark,2)
    //(spark,2)
   
    //TODO 关闭与spark连接
    //关闭连接
    sc.stop()
  }
}

业务处理流程其实与scala version是大差不差的。在spark中,提供了根据key直接聚合value的API。过去在scala中,我们可能需要先进行groupBy分组,再通过reduce进行聚合;但使用reduceByKey的API,可以直接根据key分组,然后对value聚合。

2.3 日志相关配置

       执行过程中,会产生大量的执行日志,如果为了能够更好的查看程序的执行结果,可以在项 目的resources目录中创建log4j.properties文件,并添加日志配置信息:

log4j.rootCategory=ERROR, console

log4j.appender.console=org.apache.log4j.ConsoleAppender

log4j.appender.console.target=System.err

log4j.appender.console.layout=org.apache.log4j.PatternLayout

log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n



# Set the default spark-shell log level to ERROR. When running the spark-shell, the

# log level for this class is used to overwrite the root logger's log level, so that

# the user can have different defaults for the shell and regular Spark apps.

log4j.logger.org.apache.spark.repl.Main=ERROR



# Settings to quiet third party logs that are too verbose

log4j.logger.org.spark_project.jetty=ERROR

log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR

log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR

log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR

log4j.logger.org.apache.parquet=ERROR

log4j.logger.parquet=ERROR



# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support

log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL

log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

关于该配置,需要确保其放置位置在spark工程的main/resource目录下,同时确保resource是资源文件夹(有一些maven项目可能没有这个文件夹,需要手动添加)。需要先将resource标记为resources文件夹,再将src标记为可执行文件:

3 spark运行环境

Spark 作为一个数据处理框架和计算引擎,被设计在所有常见的集群环境中运行, 在国 内工作中主流的环境为Yarn,不过逐渐容器式环境也慢慢流行起来。

3 spark运行环境

Spark 作为一个数据处理框架和计算引擎,被设计在所有常见的集群环境中运行, 在国 内工作中主流的环境为Yarn,不过逐渐容器式环境也慢慢流行起来。

3.1 本地运行环境

3.1.1 解压缩文件

将spark-3.0.0-bin-hadoop3.2.tgz 文件上传到 Linux 并解压缩,放置在指定位置,路径中 不要包含中文或空格,课件后续如果涉及到解压缩操作,不再强调。

tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module

cd /opt/module

mv spark-3.0.0-bin-hadoop3.2 spark-local

3.1.2 启动 Local 环境

1) 进入解压缩后的路径,执行如下指令 bin/spark-shell

       若启动成功,将可以直接在内部书写scala代码。可以看到,启动时,将spark context简称为sc,且配置信息与我们spark-wordcount入门案例差不多,同时将spark session简称为spark:

2) 启动成功后,可以输入网址进行Web UI监控页面访问 http://虚拟机地址:4040

       由于已经配置过主机名映射,所以主机名可以直接作为地址。

3.1.3 命令行工具

在解压缩文件夹下的data目录中,添加word.txt文件:

在命令行工具中执行如下代码指 令(和IDEA中代码简化版一致):

sc.textFile("data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

3.1.4 退出本地模式

按键Ctrl+C或输入Scala指令 :quit

3.1.5 提交应用

       我们可以通过将我们自己的程序打包成jar包,上传至文件系统,然后通过以下命令提交我们自己的程序。

       此处我们演示通过该命令,提交spark自带的案例程序SparkPi.Scala(注意,每行命令后不要加空格,否则无法识别路径!):

bin/spark-submit \

--class org.apache.spark.examples.SparkPi \

--master local[2] \

./examples/jars/spark-examples_2.12-3.0.0.jar \

10
  • --class 表示要执行程序的主类,此处可以更换为咱们自己写的应用程序
  • --master local[2] 部署模式,默认为本地模式,数字表示分配的虚拟CPU核数量
  • spark-examples_2.12-3.0.0.jar 运行的应用类所在的 jar 包,实际使用时,可以设定为咱 们自己打的jar包
  • 数字10表示程序的入口参数,用于设定当前应用的任务数量

3.2 standalone模式

local 本地模式毕竟只是用来进行练习演示的,真实工作中还是要将应用提交到对应的 集群中去执行,这里我们来看看只使用Spark自身节点运行的集群模式,也就是我们所谓的 独立部署(Standalone)模式。Spark的Standalone 模式体现了经典的master-slave模式。 集群规划:

Spark

node1

node2

node3

Master & Worker

Worker

Worker

3.2.1 解压缩文件

tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module

cd /opt/module

mv spark-3.0.0-bin-hadoop3.2 spark-standalone

3.2.2 修改配置文件

1) 进入解压缩后路径的conf目录,修改slaves.template文件名为slaves
cd spark-standalone

mv conf/slaves.template slaves
2) 修改slaves文件,添加work节点
node1

node2

node3
3) 修改spark-env.sh.template 文件名为 spark-env.sh
mv spark-env.sh.template spark-env.sh 
4) 修改spark-env.sh 文件,添加JAVA_HOME环境变量和集群对应的master节点
export JAVA_HOME=/export/server/jdk

SPARK_MASTER_HOST=node1

SPARK_MASTER_PORT=7077

注意:

  • 若不确定JAVA_HOME,可以通过echo $JAVA_HOME来查看你的JAVA_HOME路径。
  • 7077端口,相当于hadoop3内部通信的8020端口,此处的端口需要确认自己的Hadoop 配置

5) 分发spark-standalone 目录
# 递归复制整个目录到 node2

scp -r /opt/module/spark-standalone node2:/opt/module/



# 递归复制整个目录到 node3

scp -r /opt/module/spark-standalone node3:/opt/module/

3.2.3 启动集群

1) 执行脚本命令
sbin/start-all.sh

注意:

       确保你当前是位于spark-standalone文件夹下。

2) 查看Master资源监控Web UI界面: http://node1:8080

3.2.4 提交应用

bin/spark-submit \

--class org.apache.spark.examples.SparkPi \

--master spark://node1:7077 \

./examples/jars/spark-examples_2.12-3.0.0.jar \

10
  • --class 表示要执行程序的主类
  • --master spark://linux1:7077 独立部署模式,连接到 Spark 集群
  • spark-examples_2.12-3.0.0.jar 运行类所在的 jar 包
  • 数字10表示程序的入口参数,用于设定当前应用的任务数量

我们可以在WEB UI处查看sparkPi任务运行情况:

       共计3个节点参与该任务,每个节点使用了1024MB内存,耗时31s,由root用户发起该任务。

3.2.5 提交参数说明

在提交应用中,一般会同时一些提交参数。以下是一个通用的spark mission submit command:

bin/spark-submit \

--class <main class>

--master <master url> \

... # other options \

<application jars>

[application-arguments]

       对应的参数解释如下:

参数

解释

举例

--class

Spark 程序中包含主函数的类

--master

Spark 程序运行的模式(环境)

模式:

local[*]

spark://linux1:7077

Yarn

--executor-memory 1G

指定每个executor可用内存为1G

符合集群内存配置即可,具体情况具体分 析。

--total-executor-cores 2

指定所有executor使用的cpu核数 为2个

--executor-cores

指定每个executor使用的cpu核数

application-jar

打包好的应用jar,包含依赖。这 个 URL 在集群中全局可见。 比 如hdfs:// 共享存储系统,如果是file:// path,那么所有的节点的 path 都包含同样的jar

application-arguments

传给main()方法的参数

3.2.6 配置历史服务

由于spark-shell 停止掉后,集群监控linux1:4040页面就看不到历史任务的运行情况,所以 开发时都配置历史服务器记录任务运行情况。

1) 修改spark-defaults.conf.template 文件名为 spark-defaults.conf
#cd spark-standalone/conf

mv spark-defaults.conf.template spark-defaults.conf
2) 修改spark-default.conf 文件,配置日志存储路径
#vim spark-default.conf

spark.eventLog.enabled true

spark.eventLog.dir hdfs://node1:8020/directory

注意,需要启动hadoop集群,HDFS上的directory目录需要提前存在,若不存在,可通过以下命令添加:

sbin/start-dfs.sh

hadoop fs -mkdir /directory
3) 修改spark-env.sh 文件, 添加日志配置
#vim spark-env.sh

export SPARK_HISTORY_OPTS="

-Dspark.history.ui.port=18080

-Dspark.history.fs.logDirectory=hdfs://node1:8020/directory

-Dspark.history.retainedApplications=30"
  • 参数1含义:WEB UI访问的端口号为18080
  • 参数2含义:指定历史服务器日志存储路径
  • 参数3含义:指定保存Application历史记录的个数,如果超过这个值,旧的应用程序 信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。
4) 重新分发spark-standalone/conf配置文件
# 递归复制整个目录到 node2

scp -r /opt/module/spark-standalone/conf node2:/opt/module/spark-standalone



# 递归复制整个目录到 node3

scp -r /opt/module/spark-standalone/conf node3:/opt/module/spark-standalone
5) 重新启动集群和历史服务

       启动集群:

#启动 & 停止集群

sbin/stop-all.sh

sbin/start-all.sh

       启动历史服务:

#启动历史服务

sbin/start-history-server.sh

6) 重新执行任务
bin/spark-submit \

--class org.apache.spark.examples.SparkPi \

--master spark://node1:7077 \

./examples/jars/spark-examples_2.12-3.0.0.jar \

10
7) 查看历史服务:http://node1:18080

       里面是所有spark-standalone执行过的程序。

3.3 YARN模式

独立部署(Standalone)模式由Spark自身提供计算资源,无需其他框架提供资源。这 种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是你也要记住,Spark主 要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是 和其他专业的资源调度框架集成会更靠谱一些。所以接下来我们来学习在强大的Yarn环境 下Spark 是如何工作的(其实是因为在国内工作中,Yarn使用的非常多)。

在这种情况下,spark on yarn的spark文件夹只需要在一台机器上有即可,由yarn根据spark框架分配任务给其余机器,其余机器不需要有spark文件夹。

3.3.1 解压缩文件

将spark-3.0.0-bin-hadoop3.2.tgz 文件上传到 linux 并解压缩,放置在指定位置。

tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module/

cd /opt/module/

mv spark-3.0.0-bin-hadoop3.2 spark-yarn

 3.3.2 修改配置文件

1) 修改hadoop配置文件$HADOOP_HOME/etc/hadoop/yarn-site.xml, 并分发

注意:该配置是配置HADOOP的YARN相关配置项,不是在spark-yarn中操作!可使用echo $HADOOP_HOME查看Hadoop的路径。

<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀>掉,默认是true -->

<property>

     <name>yarn.nodemanager.pmem-check-enabled</name>

     <value>false</value>

</property>



<!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀>掉,默认是true -->

<property>

     <name>yarn.nodemanager.vmem-check-enabled</name>

     <value>false</value>

</property>



#分发到node2 & node3

scp /export/server/hadoop/etc/hadoop/yarn-site.xml node2:/export/server/hadoop/etc/hadoop/yarn-site.xml



scp /export/server/hadoop/etc/hadoop/yarn-site.xml node3:/export/server/hadoop/etc/hadoop/yarn-site.xml

补充:之前的hadoop-yarn配置文件可能存在问题,会导致yarn-node list无法被连接,因此无法进行计算任务。具体yarn-site.xml请参考以下完整配置:

<?xml version="1.0"?>

<!--

  Licensed under the Apache License, Version 2.0 (the "License");

  you may not use this file except in compliance with the License.

  You may obtain a copy of the License at



    http://www.apache.org/licenses/LICENSE-2.0



  Unless required by applicable law or agreed to in writing, software

  distributed under the License is distributed on an "AS IS" BASIS,

  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

  See the License for the specific language governing permissions and

  limitations under the License. See accompanying LICENSE file.

-->

<configuration>



<!-- Site specific YARN configuration properties -->



<property>

    <name>yarn.log.server.url</name>

    <value>http://node1:19888/jobhistory/logs</value>

    <description></description>

</property>



  <property>

    <name>yarn.web-proxy.address</name>

    <value>node1:8089</value>

    <description>proxy server hostname and port</description>

  </property>





  <property>

    <name>yarn.log-aggregation-enable</name>

    <value>true</value>

    <description>Configuration to enable or disable log aggregation</description>

  </property>



  <property>

    <name>yarn.nodemanager.remote-app-log-dir</name>

    <value>/tmp/logs</value>

    <description>Configuration to enable or disable log aggregation</description>

  </property>





<!-- Site specific YARN configuration properties -->

  <property>

    <name>yarn.resourcemanager.hostname</name>

    <value>node1</value>

    <description></description>

  </property>



  <property>

    <name>yarn.resourcemanager.scheduler.class</name>

    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>

    <description></description>

  </property>



  <property>

    <name>yarn.nodemanager.local-dirs</name>

    <value>/data/nm-local</value>

    <description>Comma-separated list of paths on the local filesystem where intermediate data is written.</description>

  </property>





  <property>

    <name>yarn.nodemanager.log-dirs</name>

    <value>/data/nm-log</value>

    <description>Comma-separated list of paths on the local filesystem where logs are written.</description>

  </property>





  <property>

    <name>yarn.nodemanager.log.retain-seconds</name>

    <value>10800</value>

    <description>Default time (in seconds) to retain log files on the NodeManager Only applicable if log-aggregation is disabled.</description>

  </property>







  <property>

    <name>yarn.nodemanager.aux-services</name>

    <value>mapreduce_shuffle</value>

    <description>Shuffle service that needs to be set for Map Reduce applications.</description>

  </property>



<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->

<property>

     <name>yarn.nodemanager.pmem-check-enabled</name>

     <value>false</value>

</property>



<!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->

<property>

     <name>yarn.nodemanager.vmem-check-enabled</name>

     <value>false</value>

</property>





<!-- 添加以下配置 -->

<property>

    <name>yarn.resourcemanager.address</name>

    <value>node1:8032</value>

    <description>The address of the applications manager interface in the RM.</description>

</property>



<property>

    <name>yarn.resourcemanager.scheduler.address</name>

    <value>node1:8030</value>

    <description>The address of the scheduler interface.</description>

</property>



<property>

    <name>yarn.resourcemanager.resource-tracker.address</name>

    <value>node1:8031</value>

    <description>The address of the resource tracker interface.</description>

</property>



<property>

    <name>yarn.resourcemanager.webapp.address</name>

    <value>node1:8088</value>

    <description>The http address of the RM web application.</description>

</property>



<property>

    <name>yarn.resourcemanager.webapp.https.address</name>

    <value>node1:8090</value>

    <description>The https address of the RM web application.</description>

</property>



</configuration>
2) 修改conf/spark-env.sh,添加 JAVA_HOME 和YARN_CONF_DIR配置,并且分发到node2和node3

注意:请使用自己的JAVA_HOME以及$HADOOP_HOME/etc/hadoop地址!YARN_CONF_DIR关联的配置文件就是我们在1)中配置的文件!

export JAVA_HOME=/export/server/jdk

YARN_CONF_DIR=/export/server/hadoop/etc/hadoop

3.3.3 启动HDFS 以及YARN集群

注意:若之前在hadoop中配置了使用权限,使得hadoop只能被hadoop用户使用,则需要通过hadoop用户进行这些集群的启动。

#一键启动hdfs

$HADOOP_HOME/sbin/start-dfs.sh

#一键停止hdfs

$HADOOP_HOME/sbin/stop-dfs.sh



#一键启动yarn

$HADOOP_HOME/sbin/start-yarn.sh

#一键停止yarn

$HADOOP_HOME/sbin/stop-yarn.sh

3.3.4 提交应用

#cd /opt/module/spark-yarn

bin/spark-submit \

--class org.apache.spark.examples.SparkPi \

--master yarn \

--deploy-mode cluster \

./examples/jars/spark-examples_2.12-3.0.0.jar \

10

查看http://node1:8088 页面,点击 History,查看历史页面

提交应用的报错以及配置完善

       在提交上述任务时出现的报错如下:

问题在于配置了使用 FairScheduler,但是没有配置相应的 fair-scheduler.xml 文件。

1)创建FairScheduler配置文件
# 创建或编辑fair-scheduler.xml

vi /export/server/hadoop/etc/hadoop/fair-scheduler.xml



<?xml version="1.0"?>

<allocations>

  <defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>



  <queue name="default">

    <minResources>1024 mb,1vcores</minResources>

    <maxResources>10240 mb,10vcores</maxResources>

    <maxRunningApps>50</maxRunningApps>

    <weight>1.0</weight>

    <schedulingMode>fair</schedulingMode>

    <aclSubmitApps>*</aclSubmitApps>

    <aclAdministerApps>*</aclAdministerApps>

  </queue>



  <queuePlacementPolicy>

    <rule name="specified" create="false"/>

    <rule name="default" queue="default"/>

  </queuePlacementPolicy>

</allocations>
2)为yarn-site添加补充内容(基于3.3.2 -1)中完整的yarn-site.xml进行添加!)
<property>

    <name>yarn.scheduler.fair.allocation.file</name>

    <value>/export/server/hadoop/etc/hadoop/fair-scheduler.xml</value>

    <description>Path to fair scheduler allocation file</description>

</property>
3)将这些内容分发到node2和node3上
#同步yarn-site.xml

# 同步到node2

scp /export/server/hadoop/etc/hadoop/yarn-site.xml node2:/export/server/hadoop/etc/hadoop/



# 同步到node3 

scp /export/server/hadoop/etc/hadoop/yarn-site.xml node3:/export/server/hadoop/etc/hadoop/





#同步fair.schuduler

scp /export/server/hadoop/etc/hadoop/fair-scheduler.xml node2:/export/server/hadoop/etc/hadoop/

scp /export/server/hadoop/etc/hadoop/fair-scheduler.xml node3:/export/server/hadoop/etc/hadoop/

3.3.5 配置历史服务器

       在刚刚的提交中,我们是看不到程序结果的,只能看到是否运行成功。因此需要配置历史服务器,来使我们能够查看程序运行结果。我们需要在node1的/opt/module/spark-yarn/conf中对下列配置文件进行修改(主机名都填自己的主机名!):

1) 修改spark-defaults.conf.template文件名为spark-defaults.conf
mv spark-defaults.conf.template spark-defaults.conf
2) 修改spark-default.conf文件,配置日志存储路径
spark.eventLog.enabled true

spark.eventLog.dir hdfs://node1:8020/directory

注意:需要启动hadoop集群,HDFS上的目录需要提前存在:

[hadoop@node1 hadoop]# sbin/start-dfs.sh

[hadoop@node1 hadoop]# hadoop fs -mkdir /directory
3) 修改spark-env.sh文件, 添加日志配置
export SPARK_HISTORY_OPTS="

-Dspark.history.ui.port=18080

-Dspark.history.fs.logDirectory=hdfs://node1:8020/directory

-Dspark.history.retainedApplications=30"
  • 参数1含义:WEB UI访问的端口号为18080
  • 参数2含义:指定历史服务器日志存储路径
  • 参数3含义:指定保存Application历史记录的个数,如果超过这个值,旧的应用程序 信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。
4) 修改spark-defaults.conf
spark.yarn.historyServer.address=node1:18080

spark.history.ui.port=18080
5) 启动历史服务
sbin/start-history-server.sh 
6) 重新提交应用
bin/spark-submit \

--class org.apache.spark.examples.SparkPi \

--master yarn \

--deploy-mode client \

./examples/jars/spark-examples_2.12-3.0.0.jar \

10

3.4 windows模式

3.4.1 解压缩文件 将文件spark-3.0.0-bin-hadoop3.2.tgz 解压缩到无中文无空格的路径中

3.4.2 启动本地环境

1) 执行解压缩文件路径下bin目录中的spark-shell.cmd文件,启动Spark本地环境

       同样的,spark context被叫做了sc,我们可以通过sc去执行spark相关操作。

2) 在bin目录中创建input目录,并添加word.txt文件,并执行wordcount的代码
sc.textFile("input/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

3.4.3 命令行提交应用

在DOS命令行窗口中执行提交指令

spark-submit --class org.apache.spark.examples.SparkPi --master local[2] ../examples/jars/spark-examples_2.12-3.0.0.jar 10

       任务仅仅花费了1s,明显比在虚拟机中快得多:

3.5 Spark部署模式汇总 & 对比

模式

Spark安装机器数

需启动的进程

所属者

应用场景

Local

1

Spark

测试

Standalone

3

Master

Worker

Spark

单独部署

Yarn

1

Yarn

HDFS

Yarn

混合部署

  • 由于spark-standalone是由spark自己进行资源管理,所以spark框架的每个节点都需要安装spark。
  • 但是Yarn是由yarn去分发计算任务,因此只需要master确认spark计算任务,然后由yarn进行分配即可,只需要在master上有spark,其余节点无需安装spark。

3.6 常见端口号

  • Spark查看当前Spark-shell运行任务情况端口号:4040(计算)
  • Spark Master 内部通信服务端口号:7077
  • Standalone 模式下,Spark Master Web 端口号:8080(资源)
  • Spark历史服务器端口号:18080
  • Hadoop YARN任务运行情况查看端口号:8088

4 Spark运行框架

4.1 运行框架

Spark 框架的核心是一个计算引擎,整体来说,它采用了标准 master-slave 的结构。 如下图所示,它展示了一个 Spark执行时的基本结构。图形中的Driver表示master, 负责管理整个集群中的作业任务调度。图形中的Executor 则是 slave,负责实际执行任务。

4.2 核心组件

4.2.1 Driver(Spark计算组件)

Spark 驱动器节点,用于执行Spark任务中的main方法,负责实际代码的执行工作。 Driver 在 Spark 作业执行时主要负责:

  • 将用户程序转化为作业(job)
  • 在Executor之间调度任务(task)
  • 跟踪Executor的执行情况
  • 通过UI展示查询运行情况

实际上,我们无法准确地描述Driver的定义,因为在整个的编程过程中没有看到任何有关 Driver 的字眼。所以简单理解,所谓的Driver就是驱使整个应用运行起来的程序,也称之为 Driver 类。

4.2.2 Executor(Spark计算组件)

Spark Executor 是集群中工作节点(Worker)中的一个JVM进程,负责在 Spark 作业 中运行具体任务(Task),任务彼此之间相互独立。Spark 应用启动时,Executor节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有Executor节点发生了 故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他Executor节点 上继续运行。 Executor 有两个核心功能:

  • 负责运行组成Spark应用的任务,并将结果返回给驱动器进程
  • 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存 式存储。RDD 是直接缓存在Executor进程内的,因此任务可以在运行时充分利用缓存 数据加速运算。

4.2.3 Master & Worker(Standalone下的资源调度组件)

       与上面的 Driver 和 Excutor 不同,Master & Worker并不是Spark计算框架,而是Spark独立部署下的资源调度框架。

Spark 集群的独立部署环境中,不需要依赖其他的资源调度框架,自身就实现了资源调 度的功能,所以环境中还有其他两个核心组件:Master和Worker,这里的Master是一个进 程,主要负责资源的调度和分配,并进行集群的监控等职责,类似于Yarn环境中的RM, 而 Worker 呢,也是进程,一个Worker运行在集群中的一台服务器上,由Master分配资源对 数据进行并行的处理和计算,类似于Yarn环境中NM。

4.2.4 ApplicationMaster(计算组件 与 资源调度组件 之间的中间件)

Hadoop 用户向YARN集群提交应用程序时,提交程序中应该包含ApplicationMaster,用于向资源调度器申请执行任务的资源容器Container,运行用户自己的程序任务job,监控整 个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况。

说的简单点就是,ResourceManager(资源)和Driver(计算)之间的解耦合靠的就是 ApplicationMaster。通过ApplicationMaster,将资源的申请与最终的计算分离,防止二者耦合度过高,导致框架灵活性下降。

4.3 核心概念

4.3.1 Executor 与 Core

Spark Executor 是集群中运行在工作节点(Worker)中的一个JVM进程,是整个集群中的专门用于计算的节点。在提交应用中,可以提供参数指定计算节点的个数,以及对应的资源。这里的资源一般指的是工作节点Executor的内存大小和使用的虚拟CPU核(Core)数量。

应用程序相关启动参数如下:

名称

说明

--num-executors

配置Executor 的数量

--executor-memory

为每个Excutor分配的内存大小

--executor-cores

为每个Excutor分配的CPU核数

4.3.2 并行度(Parallelism)

在分布式计算框架中一般都是多个任务同时执行,由于任务分布在不同的计算节点进行 计算,所以能够真正地实现多任务并行执行,记住,这里是并行,而不是并发。这里我们将 整个集群并行执行任务的数量称之为并行度。那么一个作业到底并行度是多少呢?这个取决 于框架的默认配置。应用程序也可以在运行过程中动态修改。

并行与并发的区分如下:

  • 并行是同时执行任务。假设CPU有8个core,那么8个Excutor都能够使用1个core,就称作并行。
  • 并发是想要同时执行任务,但由于资源限制没法同时执行。假设CPU有4个core,8个Excutor都想要执行任务,就会由于core不够陷入并发。

4.3.3 有向无环图(DAG)

       在Hadoop中,每一个任务都将被强制分配为 Map & Reduce 两部分执行,所以在多个复杂任务的情况下,将会出现串行化计算,即job1进行MR后的结果作为job2的MR输入…这毫无疑问会导致复杂任务运行效率降低。

       而在spark中,其将复杂任务翻译为了有向无环图,可以有效避免复杂任务被强制串行化为多个MR。这个复杂任务可以被转换为多个MR计算的 “并行” ,不用像hadoop一样被转换为串行执行。通过有向图,同时确保了多个MR计算之间的先后顺序,确保计算逻辑的正确性。

       此处为spark的“并行”加上了引号,因为在有依赖的复杂任务执行场景下,实际上在数据流的处理上仍然需要串行来保证逻辑正确,但是基于DAG的优化,可以使其在计算上能够实现类似并行执行的效果,大大优化了执行效率。

       但需要注意,DAG一定确保无环,否则将会出现循环依赖的情况导致任务执行出现异常!

4.4 提交流程

所谓的提交流程,其实就是我们开发人员根据需求写的应用程序通过Spark客户端提交 给Spark 运行环境执行计算的流程。在不同的部署环境中,这个提交过程基本相同,但是又 有细微的区别,我们这里不进行详细的比较,但是因为国内工作中,将Spark引用部署到 Yarn 环境中会更多一些,所以本课程中的提交流程是基于Yarn环境的。

Spark 应用程序提交到Yarn环境中执行的时候,一般会有两种部署执行的方式:Client 和Cluster。

两种模式主要区别在于Driver程序的运行节点位置:

  • Yarn Client模式Driver位于Client客户端,在本地机器的Client上运行,而不在Yarn内部运行,此时由Driver去唤醒ApplicationMaster。客户端模式下,由于Driver位于客户端,所以客户端在执行任务时必须一直链接不能断开,否则将出现错误。
  • Yarn Cluster模式Driver位于Yarn内部的节点上,在集群模式中运行,此时Driver与ApplicationMaster位于同一个进程上。由于Driver位于Yarn内部,与客户端并无关联,因此当Cluster模式客户端提交任务后,是否断开都不会影响任务执行。 fle参数 = true:

     

Logo

开源鸿蒙跨平台开发社区汇聚开发者与厂商,共建“一次开发,多端部署”的开源生态,致力于降低跨端开发门槛,推动万物智联创新。

更多推荐