Spark 基础

Spark 介绍

Apache Spark 是一种用于大数据工作负载的分布式开源处理系统。它使用内存中缓存和优化的查询执行方式,可针对任何规模的数据进行快速分析查询。

Spark 框架包括:

  • Spark Core 是该平台的基础
  • 用于交互式查询的 Spark SQL
  • 用于实时分析的 Spark Streaming
  • 用于机器学习的 Spark MLlib
  • 用于图形处理的 Spark GraphX

Image

Spark core

Spark Core 是该平台的基础。它要负责内存管理、故障恢复、计划安排、分配与监控作业,以及和存储系统进行交互。您可以通过为 Java、Scala、Python 和 R 而构建的应用程序编程接口 (API) 使用 Spark Core。这些 API 会将复杂的分布式处理隐藏在简单的高级操作符的背后。

Spark core

Spark 计算

暂时无法在飞书文档外展示此内容

RDD(Resilient Distributed Datasets)

弹性分布式数据集

弹性(Resilient)

Spark 的核心数据结构有弹性,能复原,说明spark在设计之初就考虑把spark应用在大规模的分布式集群中,因为这种大规模集群,任何一台服务器是随时都可能出故障的,如果正在进行计算的子任务(Task)所在的服务器出故障,那么这个子任务自然在这台服务器无法继续执行,这时RDD所具有的”弹性”就派上了用场,它可以使这个失败的子任务在集群内进行迁移,从而保证整体任务(Job)对故障机器的平滑过渡。

分布式(Distributed)

也就是数据的切分规则,根据一些特定的规则切分后的数据子集,就可以在独立的task中进行处理,而这些task又是分散在集群多个服务器上并行的同时的执行,这就是 spark 中 Distributed 的含义。spark源码中RDD是个表示数据的基类,在这个基类之上衍生了很多的子RDD,不同的子RDD具有不同的功能,但是他们都要具备的能力就是能够被切分(partition)

数据集(Datasets)

RDD 并非是 Spark 的数据存储结构,RDD 表示的是 Spark 中数据处理的逻辑。

算子

对数据的某种操作,分为两大类:Transformation 和 Action 算子

Transformation 算子

主要做的是就是将一个已有的 RDD 生成另外一个 RDD。Transformation 具有 lazy 特性(延迟加载)。Transformation 算子的代码不会真正被执行。只有当我们的程序里面遇到一个 Action 算子的时候,代码才会真正的被执行。

map(...)
filter(...)
flatMap(...)
...

Action 算子

触发代码的运行,我们一段 Spark 代码里面至少需要有一个 Action 操作。

reduce(...)
collect()
count()
...

Spark 的任务划分

Image

基本概念

Application:

用户编写的 Spark 应用程序,里面包括大量数据操作。

Job

根据 Action 算子将应用程序中的大量数据操作划分为一个个 Job,Job 之间按照串行方式执行。一个 Job 执行完成才会启动另一个 Job。

Stage

一个 Job 里的算子全部为 Transformation 算子,既描述 RDD 之间的转换关系。RDD 之间的转换关系又称为 RDD 依赖。RDD 依赖分为:窄依赖(narrow dependency)和宽依赖(wide dependency),宽依赖又称之为 shuffle 依赖。

Shuffle

有些运算需要将各节点上的同一类数据汇集到某一节点进行计算,把这些分布在不同节点的数据按照一定的规则汇集到一起的过程称为 Shuffle。

Image

首先,窄依赖允许在一个集群节点上以流水线的方式(pipeline)计算所有父分区。例如,逐个元素地执行map、然后filter操作;而宽依赖则需要首先计算好所有父分区数据,然后在节点之间进行Shuffle,这与MapReduce类似。第二,窄依赖能够更有效地进行失效节点的恢复,即只需重新计算丢失RDD分区的父分区,而且不同节点之间可以并行计算;而对于一个宽依赖关系的Lineage图,单个节点失效可能导致这个RDD的所有祖先丢失部分分区,因而需要整体重新计算。

  • 宽依赖往往对应着shuffle操作,需要在运行的过程中将同一个RDD分区传入到不同的RDD分区中,中间可能涉及到多个节点之间数据的传输,而窄依赖的每个父RDD分区通常只会传入到另一个子RDD分区,通常在一个节点内完成。
  • 当RDD分区丢失时,对于窄依赖来说,由于父RDD的一个分区只对应一个子RDD分区,这样只需要重新计算与子RDD分区对应的父RDD分区就行。这个计算对数据的利用是100%的
  • 当RDD分区丢失时,对于宽依赖来说,重算的父RDD分区只有一部分数据是对应丢失的子RDD分区的,另一部分就造成了多余的计算。宽依赖中的子RDD分区通常来自多个父RDD分区,极端情况下,所有父RDD都有可能重新计算。如下图,par4丢失,则需要重新计算par1,par2,par3,产生了冗余数据par5

Image

与 Job 的划分类似,根据宽依赖进行 Stage 划分。遇到宽依赖则划分为一个 Stage。相互依赖的 Stage 之间串行执行,没有相互依赖的 Stage 会并行执行。

Image

Task

Task 是 Spark 最细的执行单元。Task 的数量其实就是 Stage 的并行度。Task 的数量对应着 RDD Partition 的数量, 即每个 Partition 都被分配一个 Task。

Partition

Partition 是RDD内部并行计算的一个计算单元,RDD 的数据集在逻辑上被划分为多个 Partition ,每一个 Partition 称为分区,分区的格式决定了并行计算的粒度,而每个分区的数值计算都是在一个 Task 中进行的,因此 Task 的个数,也是由 RDD (准确来说是作业最后一个RDD) 的分区数决定。

Spark 分区数量

RDD分区的一个分区原则:尽可能让分区的个数等于集群核心数目

  • 通过配置自定义分区个数
  • 根据 Spark 集群环境自动确定

Spark 分区方式

  • 文件:将文件按照分区数量拆成一个个块。
  • kv数据:根据具体分区器进行分区
    • HashPartitioner: hash 分区,根据 key 的 hashcode 对分区数量进去取余,如果余数小于0,则用余数+分区的个数将数据进行划分。
    • RangePartitioner:范围分区,对数据进行采样(水塘抽样)确认每个分区的边界。
    • 自定义分区器。

Spark 的任务运行过程

基本概念

Driver表示 Application 中 的 main 函数,创建 SparkContext。由 SparkContext 负责与集群中的Cluster Manager 进行通信,进行资源的申请,任务的分配和监控等。程序执行完毕后关闭 SparkContext。
Cluster Manager指的是在集群上获取资源(Worker)的外部服务。不同的集群方式具有不同的 Cluster Manager。
Worker集群中可以运行 Application 代码的节点。
ExecutorWorker 节点上的一个进程,该进程负责运行某些 Task,并且负责将数据存在内存或者磁盘上。它负责将 Task 包装成 taskRunner,并从线程池中抽取出一个空闲线程运行 Task,每个 Executor 能并行运行 Task 的数量取决于设置的 core 的个数,既各个 Executor 使用的并发线程数目(Task 被执行的并发度 = Executor 数目 * 每个 Executor 的 core 数)。
DAGScheduler根据 Job 构建基于 Stage 的 DAG 图,既划分 Stage,并提交 Stage 给 TaskScheduler,其划分 Stage 的依据是RDD之间的依赖关系(每个 Stage 封装一个 TaskSet)。
TaskScheduler为每个 Executor 分配要运行的 Task,并监控 Task 的运行情况。
SchedulerBackendSchedulerBackend 是 TaskScheduler 的调度后端接口。TaskScheduler 给 Task 分配 Executor 实际是通过 SchedulerBackend 来完成的,SchedulerBackend 给 Task 分配完资源后将与分配给 Task 的 Executor 通信,并要求后者运行 Task。

Spark 节点之间的关系图

Image

Spark 任务分配图

Image

Spark 的基本运行流程

Image

  1. 构建 Spark Application 的运行环境(启动 SparkContext,初始化 DAGScheduler、TaskScheduler、SchedulerBackend),SparkContext 向 Cluster Manager 注册并申请运行 Executor 资源。
  2. Cluster Manager 分配 Executor 资源并启动 ExecutorBackend,Executor 运行情况将随着心跳发送到 Cluster Manager 上。
  3. Executor 向 Driver 的 SchedulerBackend 注册。
  4. 运行 Application 代码,遇到 Action 算子拆分 Job 形成 DAG 图,提交给 DAGScheduler。
  5. DAGScheduler 根据窄依赖将 Job 拆分为多个 Stage,Stage 中一个 RDD 分区对应一个 Task,形成可并行处理的 TaskSet。之后提交 TaskSet 到 TaskScheduler。
  6. TaskScheduler 管理 Task 的状态,并通过 SchedulerBackend 分配 Task 到具体 Executor 中执行。
  7. SchedulerBackend 将 Task 序列化并发送到具体 Executor 中执行。
  8. Spark 作业执行完毕,Driver 向 Cluster Manager 注销资源,Cluster Manager 释放所有资源。

Spark 分布式运行方式

使用不同的 Cluster Manager,Spark 支持多种分布式运行方式

Spark 支持多种资源调度器:

  • Standalone:独立模式,Spark 原生的简单集群管理器,自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统,使用 Standalone 可以很方便地搭建一个集群;
  • Hadoop Yarn
  • Apache Mesos
  • Kubernetes

根据 Drive 端所处位置有不同的运行模式

  • Client:Drive 端运行在本地 Client 中,在 Spark 任务执行过程中 Client 不能离开。
  • Cluster:Drive 端运行在 Cluster Manager 提供的资源中,Spark 执行过程 Client 可以离开。

Spark on Standalone

在 standalone 下,主要的节点有 Client 节点、Master 节点、Worker 节点。Master 节点充当 Cluster Manager,Driver 可以运行在本地的 Client 节点(Client 模式)或者 Worker 节点(Cluster 模式)中。

Client 模式

Image

Cluster 模式

Image

Spark on YARN

Spark 运行模式

Client 模式

Image

Cluster 模式

Image