Spark
type
Post
status
Published
date
Mar 22, 2026
slug
spark
summary
tags
category
Big Data
icon
password
Place
RDD
Resilient Distributed Dataset(弹性分布式数据集)
RDD指的是一个抽象的概念,用户通过操作RDD来不需要关心底层细节。
4大属性:
- partitions:数据分片,RDD的数据被切分为数据分片,散落在集群的不同节点上
- partitioner(分区器):分片切割规则,Partitioner 决定数据去哪个 分区号 (Partition ID)(逻辑层)(拥有 Partitioner 的通常是Shuffle 类转换算子)
- dependencies:RDD依赖,指的是上一个RDD(从哪个父RDD而来)
- compute:转换函数(父RDD到现在RDD的转换函数)
分布式计算过程
Spark 采用懒执行(Lazy Evaluation)机制。当遇到 Transformation 算子(如 map, filter)时,并不会立即计算,而是隐式构建 DAG(有向无环图)。
当触发action算子时,DAGShceduler会以宽依赖将DAG划分为stage,然后以从后向前,递归的形式执行。
一旦确定stage可以运行了,就会将stage转化为TaskSet(stage里由多少个partition就有多少个Task,打包为一个taskSet)。接下来将TaskSet扔给TaskScheduler。
TaskScheduler以任务的本地倾向性将任务分配到数据所在的节点去执行
什么是 Shuffle?
Shuffle指的是集群范围内跨节点、跨进程的数据分发。
之前学的几个算子比如map,filter,mapPartition,flatmap都是用于RDD内部的数据转换,不会引入Shuffle计算
而groupByKey,sortByKey,reduceByKey,aggregateByKey都会引入Shuffle计算,并且这些算子只可以作用在paired(KV)RDD上。
磁盘 I/O (Disk I/O): 这是最核心的瓶颈。Map 端必须将中间结果写入磁盘以保证容错,而 Reduce 端又必须从磁盘读取这些文件。大量的小文件读写会严重拖慢速度。
网络 I/O (Network I/O): 这是数据流动的瓶颈。下游 Reduce 任务需要跨节点去上游拉取属于自己的数据。如果数据量大或发生数据倾斜,会导致集群带宽被打满,引发网络拥堵。
CPU 开销 (CPU Overhead): 这是计算资源的瓶颈。
- 首先是序列化与反序列化,CPU 必须将对象在内存和字节流之间转换。
- 其次是排序与压缩,Shuffle 过程通常涉及 Key 的排序和数据的压缩解压,这都是 CPU 密集型操作。”
为什么 Spark SQL 比原生的 RDD 快?
DataFrame = RDD + Schema(数据格式) + 优化器
RDD相当于一个黑盒,不知道其内部细节,无法进行优化,与RDD相比,DataFrame的最大优势在于其结构化特性。DataFrame具有明确的schema信息,这使得Spark能够理解数据的结构,从而应用各种优化技术。
DataFrame会多走一层DataFrame API
DataFrame API里有两个优化器,分别是Catalyst优化器和Tungsten。
Catalyst优化器主要是逻辑优化和物理优化。逻辑优化和MySQL很像,索引下推,条件下推等
Tungsten优化有点难理解
排序的区别
1. ORDER BY:全局排序
- 业务场景:产品经理需要一份数据看板,每天统计出搜索次数最多的前 100 个热词(Top 100)。由于是全局维度的绝对排名,数据量经过聚合后已经很小,可以进行全局排序后写入结果表。
2. SORT BY:局部排序并写入
- 业务场景:日常的 ODS 到 DWD 层的清洗落盘。为了让下游在按时间段查询时能利用 Parquet 文件的“谓词下推”和“最小值/最大值”索引(直接跳过无关的文件块),要求写入的每个文件内部按时间戳
ts有序。
3. DISTRIBUTE BY:按键重分区并写入
- 业务场景:上游采集的日志极其零散,或者存在大量小文件。为了让下游以
uid为维度的聚合计算(比如统计每个人的日活)避免大范围的跨节点 Shuffle,我们在写这层表时,干脆把同一个uid的日志强行分发到同一个文件(节点)里。这里暂不需要排序。
4. CLUSTER BY:分发并局部排序(同字段)写入
- 业务场景:下游的推荐算法团队要求提供一份数据进行
Sort-Merge Join训练模型。他们要求:同一个uid的数据必须存在同一个文件里,并且在这个文件内部,还要按uid升序排好。
DISTRIBUTE BY + SORT BY
- 业务场景:做用户行为轨迹(Session)分析。不仅要把同一个用户(
uid)的所有行为聚合到一个文件里(方便单节点处理这个用户的所有逻辑),并且在文件内部,必须严格按照用户的动作发生时间(ts)先后排好序,方便用窗口函数(比如LAG,LEAD)计算他的操作间隔。
Spark执行计划分析

DRIVER LOG
它记录的是 Driver进程 的标准输出和错误日志,这个一般是程序程序报错后进行失败排查
Spark UI(HISTORY URL)
最常用的,任务结束后的完整分析
- Jobs:所有Job的执行情况
- Stages:每个Stage的耗时、Task分布
- Storage:缓存的RDD信息
- Environment:所有Spark配置参数
- Executors:每个Executor的内存、GC时间、Shuffle读写量
- SQL:SQL执行计划(如果用了Spark SQL)
集群所有执行计划
因为History Server是整个集群共享的,我们要选择自己执行的application ID

Job
你的Spark代码每触发一次"行动操作(Action)"就产生一个Job

Stage
一个Job可能有多个Stage,这里Stage按Shuffle边界划分

Task
Metric | Min | 25th percentile | Median | 75th percentile | Max |
Duration | 最快 Task 耗时 | 25% Task 在此耗时以下 | 一半 Task 在此耗时以下 | 75% Task 在此耗时以下 | 最慢 Task 耗时 |
GC Time | 最少 GC 耗时 | 25% Task 在此以下 | GC 耗时中位数 | 75% Task 在此以下 | 最长 GC 耗时 |
Input Size / Records | 最小读入量/条数 | 25% Task 在此以下 | 读入量中位数 | 75% Task 在此以下 | 最大读入量/条数 |
Shuffle Write Size / Records | 最小写出量/条数 | 25% Task 在此以下 | 写出量中位数 | 75% Task 在此以下 | 最大写出量/条数 |

Executors
ㅤ | RDD Blocks | Storage Memory | Disk Used | Cores | Active Tasks | Failed Tasks | Complete Tasks | Total Tasks | Task Time (GC Time) | Input | Shuffle Read | Shuffle Write | Excluded |
Active(N) | 缓存的 RDD 分区数 | 已用/总可用存储内存 | 溢写到磁盘的数据量 | 该组 Executor 的总核数 | 当前正在运行的 Task 数 | 执行失败的 Task 数 | 已完成的 Task 数 | 分配到的 Task 总数 | Task 执行总时间(其中 GC 总时间) | 读取外部数据源的总量 | Shuffle 读入总量 | Shuffle 写出总量 | 被黑名单排除的 Executor 数 |
Dead(N) | 同上,已死亡的 Executor | 同上 | 同上 | 同上 | 同上 | 同上 | 同上 | 同上 | 同上 | 同上 | 同上 | 同上 | 同上 |
Total(N) | 所有 Executor 汇总 | 汇总 | 汇总 | 汇总 | 汇总 | 汇总 | 汇总 | 汇总 | 汇总 | 汇总 | 汇总 | 汇总 | 汇总 |

TRACKING URL
这里就是任务没结束的时候的Spark UI
AM URL
这里只有在任务提交都失败的时候就去看,任务成功的话没必要看

日志怎么打
- Driver 代码:
main函数里,算子外部的代码。 - 能打日志吗? 能,随便打。
- Executor 代码:
map,filter,foreach等算子内部的 Lambda 表达式。 - 能打日志吗?
- 业务流水日志:绝对禁止(如“正在处理第x条数据”)。
- 异常错误日志:必须打(在
catch块里,记录导致崩溃的那条脏数据)。
但是不用刻意练习,因为所以实际开发中,Executor 日志的问题基本不用刻意去想,你按正常习惯写,日志就已经在 Driver 里了。 算子大部分都是Lambda紧凑型的,本身就没有给我们写日志的机会
上一篇
Hadoop
下一篇
Prompt-Engineering
Loading...