Spark

type
Post
status
Published
date
Mar 22, 2026
slug
spark
summary
tags
category
Big Data
icon
password
Place

RDD

Resilient Distributed Dataset(弹性分布式数据集)
RDD指的是一个抽象的概念,用户通过操作RDD来不需要关心底层细节。
4大属性
  • partitions:数据分片,RDD的数据被切分为数据分片,散落在集群的不同节点上
  • partitioner(分区器):分片切割规则,Partitioner 决定数据去哪个 分区号 (Partition ID)(逻辑层)(拥有 Partitioner 的通常是Shuffle 类转换算子
  • dependencies:RDD依赖,指的是上一个RDD(从哪个父RDD而来)
  • compute:转换函数(父RDD到现在RDD的转换函数)

分布式计算过程

Spark 采用懒执行(Lazy Evaluation)机制。当遇到 Transformation 算子(如 map, filter)时,并不会立即计算,而是隐式构建 DAG(有向无环图)。
当触发action算子时,DAGShceduler会以宽依赖将DAG划分为stage,然后以从后向前,递归的形式执行。
一旦确定stage可以运行了,就会将stage转化为TaskSet(stage里由多少个partition就有多少个Task,打包为一个taskSet)。接下来将TaskSet扔给TaskScheduler。
TaskScheduler以任务的本地倾向性将任务分配到数据所在的节点去执行

什么是 Shuffle?

Shuffle指的是集群范围内跨节点、跨进程的数据分发
之前学的几个算子比如map,filter,mapPartition,flatmap都是用于RDD内部的数据转换,不会引入Shuffle计算
而groupByKey,sortByKey,reduceByKey,aggregateByKey都会引入Shuffle计算,并且这些算子只可以作用在paired(KV)RDD上。
磁盘 I/O (Disk I/O): 这是最核心的瓶颈。Map 端必须将中间结果写入磁盘以保证容错,而 Reduce 端又必须从磁盘读取这些文件。大量的小文件读写会严重拖慢速度。
网络 I/O (Network I/O): 这是数据流动的瓶颈。下游 Reduce 任务需要跨节点去上游拉取属于自己的数据。如果数据量大或发生数据倾斜,会导致集群带宽被打满,引发网络拥堵。
CPU 开销 (CPU Overhead): 这是计算资源的瓶颈。
  • 首先是序列化与反序列化,CPU 必须将对象在内存和字节流之间转换。
  • 其次是排序与压缩,Shuffle 过程通常涉及 Key 的排序和数据的压缩解压,这都是 CPU 密集型操作。”

为什么 Spark SQL 比原生的 RDD 快?

DataFrame = RDD + Schema(数据格式) + 优化器
RDD相当于一个黑盒,不知道其内部细节,无法进行优化,与RDD相比,DataFrame的最大优势在于其结构化特性。DataFrame具有明确的schema信息,这使得Spark能够理解数据的结构,从而应用各种优化技术。
DataFrame会多走一层DataFrame API
DataFrame API里有两个优化器,分别是Catalyst优化器和Tungsten。
Catalyst优化器主要是逻辑优化和物理优化。逻辑优化和MySQL很像,索引下推,条件下推等
Tungsten优化有点难理解

排序的区别

 

1. ORDER BY:全局排序

  • 业务场景:产品经理需要一份数据看板,每天统计出搜索次数最多的前 100 个热词(Top 100)。由于是全局维度的绝对排名,数据量经过聚合后已经很小,可以进行全局排序后写入结果表。

2. SORT BY:局部排序并写入

  • 业务场景:日常的 ODS 到 DWD 层的清洗落盘。为了让下游在按时间段查询时能利用 Parquet 文件的“谓词下推”和“最小值/最大值”索引(直接跳过无关的文件块),要求写入的每个文件内部按时间戳 ts 有序。

3. DISTRIBUTE BY:按键重分区并写入

  • 业务场景:上游采集的日志极其零散,或者存在大量小文件。为了让下游以 uid 为维度的聚合计算(比如统计每个人的日活)避免大范围的跨节点 Shuffle,我们在写这层表时,干脆把同一个 uid 的日志强行分发到同一个文件(节点)里。这里暂不需要排序。

4. CLUSTER BY:分发并局部排序(同字段)写入

  • 业务场景:下游的推荐算法团队要求提供一份数据进行 Sort-Merge Join 训练模型。他们要求:同一个 uid 的数据必须存在同一个文件里,并且在这个文件内部,还要按 uid 升序排好。

DISTRIBUTE BY + SORT BY

  • 业务场景:做用户行为轨迹(Session)分析。不仅要把同一个用户(uid)的所有行为聚合到一个文件里(方便单节点处理这个用户的所有逻辑),并且在文件内部,必须严格按照用户的动作发生时间(ts)先后排好序,方便用窗口函数(比如 LAG, LEAD)计算他的操作间隔。

Spark执行计划分析

notion image

DRIVER LOG

它记录的是 Driver进程 的标准输出和错误日志,这个一般是程序程序报错后进行失败排查

Spark UI(HISTORY URL)

最常用的,任务结束后的完整分析
  • Jobs:所有Job的执行情况
  • Stages:每个Stage的耗时、Task分布
  • Storage:缓存的RDD信息
  • Environment:所有Spark配置参数
  • Executors:每个Executor的内存、GC时间、Shuffle读写量
  • SQL:SQL执行计划(如果用了Spark SQL)

集群所有执行计划

因为History Server是整个集群共享的,我们要选择自己执行的application ID
notion image

Job

你的Spark代码每触发一次"行动操作(Action)"就产生一个Job
 
notion image

Stage

一个Job可能有多个Stage,这里Stage按Shuffle边界划分
notion image

Task

Metric
Min
25th percentile
Median
75th percentile
Max
Duration
最快 Task 耗时
25% Task 在此耗时以下
一半 Task 在此耗时以下
75% Task 在此耗时以下
最慢 Task 耗时
GC Time
最少 GC 耗时
25% Task 在此以下
GC 耗时中位数
75% Task 在此以下
最长 GC 耗时
Input Size / Records
最小读入量/条数
25% Task 在此以下
读入量中位数
75% Task 在此以下
最大读入量/条数
Shuffle Write Size / Records
最小写出量/条数
25% Task 在此以下
写出量中位数
75% Task 在此以下
最大写出量/条数
notion image

Executors

RDD Blocks
Storage Memory
Disk Used
Cores
Active Tasks
Failed Tasks
Complete Tasks
Total Tasks
Task Time (GC Time)
Input
Shuffle Read
Shuffle Write
Excluded
Active(N)
缓存的 RDD 分区数
已用/总可用存储内存
溢写到磁盘的数据量
该组 Executor 的总核数
当前正在运行的 Task 数
执行失败的 Task 数
已完成的 Task 数
分配到的 Task 总数
Task 执行总时间(其中 GC 总时间)
读取外部数据源的总量
Shuffle 读入总量
Shuffle 写出总量
被黑名单排除的 Executor 数
Dead(N)
同上,已死亡的 Executor
同上
同上
同上
同上
同上
同上
同上
同上
同上
同上
同上
同上
Total(N)
所有 Executor 汇总
汇总
汇总
汇总
汇总
汇总
汇总
汇总
汇总
汇总
汇总
汇总
汇总
notion image

TRACKING URL

这里就是任务没结束的时候的Spark UI

AM URL

这里只有在任务提交都失败的时候就去看,任务成功的话没必要看
notion image

日志怎么打

  • Driver 代码main 函数里,算子外部的代码。
    • 能打日志吗? 能,随便打。
  • Executor 代码map, filter, foreach 等算子内部的 Lambda 表达式。
    • 能打日志吗?
      • 业务流水日志绝对禁止(如“正在处理第x条数据”)。
      • 异常错误日志必须打(在 catch 块里,记录导致崩溃的那条脏数据)。
但是不用刻意练习,因为所以实际开发中,Executor 日志的问题基本不用刻意去想,你按正常习惯写,日志就已经在 Driver 里了。 算子大部分都是Lambda紧凑型的,本身就没有给我们写日志的机会
上一篇
Hadoop
下一篇
Prompt-Engineering
Loading...