# Apache Spark批处理系统
# 什么是Apache Spark
# 简单理解
想象你要处理一个巨大的Excel表格(比如几百万行数据):
- 传统方式:一行一行处理,很慢
- Spark方式:把表格分成很多小块,多台电脑同时处理,速度快很多
# 官方定义
Apache Spark是一个快速、通用、可扩展的大数据处理引擎。
核心特点:
- 速度快:2014年用1/10的资源比Hadoop快3倍
- 易用性:支持Java、Scala、Python、R等多种语言
- 通用性:支持批处理、流处理、机器学习、图计算
- 容错性:自动恢复失败的任务
# Spark为什么诞生?MapReduce的局限性
MapReduce的三大局限性:
- 框架表达能力有限
- 只有Map和Reduce两个基础算子
- 复杂算法需要多轮MapReduce,编程复杂
- 单个作业Shuffle开销大
- Shuffle阶段数据以阻塞方式传输
- 大量磁盘IO,延迟高
- 多作业衔接效率低
- 作业间数据传递依赖HDFS
- 频繁的磁盘读写,应用程序延迟高
# Spark vs Hadoop详细对比
特性 | Hadoop MapReduce | Apache Spark |
---|---|---|
计算模式 | 磁盘计算 | 内存计算(优先) |
数据存储 | HDFS文件 | RDD内存数据集 |
容错机制 | 数据复制 | 血缘关系重算 |
迭代计算 | 每轮读写HDFS | 数据驻留内存 |
算子丰富度 | Map + Reduce | 80+丰富算子 |
编程复杂度 | 高(需要大量代码) | 低(简洁API) |
实时处理 | 不支持 | 支持(Spark Streaming) |
速度 | 基准 | 快10-100倍 |
# Spark核心概念
# 基础架构详解
graph TB
Client[客户端] -->|提交应用| Driver[驱动器Driver]
Driver -->|请求资源| CM[集群管理器ClusterManager]
CM -->|启动Executor| Worker1[工作节点Worker1]
CM -->|启动Executor| Worker2[工作节点Worker2]
CM -->|启动Executor| Worker3[工作节点Worker3]
Worker1 --> Executor1[执行器Executor1]
Worker2 --> Executor2[执行器Executor2]
Worker3 --> Executor3[执行器Executor3]
Driver <-->|任务分发/结果收集| Executor1
Driver <-->|任务分发/结果收集| Executor2
Driver <-->|任务分发/结果收集| Executor3
# 核心组件详解
# Driver(驱动器)
- 作用:应用程序的主控制器,就像项目经理
- 职责:
- 启动应用程序的主方法
- 创建SparkContext
- 构建DAG(有向无环图)
- 任务调度和监控
- 结果收集
Driver部署方式:
- Client模式:Driver运行在客户端,适合调试
- Cluster模式:Driver运行在集群中,适合生产环境
# Cluster Manager(集群管理器)
- 作用:资源管理者,负责整个集群的资源分配
- 类型:
- Standalone:Spark自带(Master + Worker)
- YARN:Hadoop生态(ResourceManager + NodeManager)
- Mesos:通用集群管理
- Kubernetes:容器化部署
# Executor(执行器)
- 作用:真正的工人,执行具体计算任务
- 特点:
- 运行在Worker节点上的进程
- 启动多个线程执行Task
- 管理内存和磁盘存储
- 向Driver汇报任务状态
# SparkContext
- 作用:Spark应用的入口点,就像总指挥部
- 职责:
- 连接集群
- 创建RDD
- 广播变量和累加器
- 任务调度
# 应用程序执行流程
sequenceDiagram
participant Client as 客户端
participant Driver as Driver程序
participant CM as 集群管理器
participant Executor as 执行器
Client->>Driver: 1. 启动Driver程序
Driver->>Driver: 2. 创建SparkContext
Driver->>CM: 3. 申请计算资源
CM->>Executor: 4. 启动Executor进程
Executor->>Driver: 5. 向Driver注册
Driver->>Driver: 6. 构建DAG图并划分Stage
Driver->>Executor: 7. 分发Task任务
Executor->>Executor: 8. 执行Task并返回结果
Executor->>Driver: 9. 返回执行结果
Driver->>Client: 10. 收集结果并输出
# DAG(有向无环图)
什么是DAG?
- Directed Acyclic Graph
- 描述RDD之间的依赖关系
- 没有环路的有向图
graph LR
A[原始数据] --> B[RDD1]
B -->|map| C[RDD2]
C -->|filter| D[RDD3]
D -->|reduce| E[结果]
DAG的作用:
- 优化执行计划:分析依赖关系,优化执行顺序
- 容错恢复:记录血缘关系,支持失败重算
- 任务调度:合理分配资源和任务
# Application、Job、Stage、Task层次关系
graph TB
App[Application应用程序]
App --> Job1[Job1作业1]
App --> Job2[Job2作业2]
Job1 --> Stage11[Stage1.1]
Job1 --> Stage12[Stage1.2]
Stage11 --> Task111[Task1.1.1]
Stage11 --> Task112[Task1.1.2]
Stage12 --> Task121[Task1.2.1]
Stage12 --> Task122[Task1.2.2]
Job2 --> Stage21[Stage2.1]
Stage21 --> Task211[Task2.1.1]
Stage21 --> Task212[Task2.1.2]
简单理解:
- Application:整个Spark程序
- Job:每个Action操作触发一个Job
- Stage:根据宽依赖划分的任务阶段
- Task:运行在Executor上的最小工作单元
# RDD详解
# 什么是RDD
类比理解:想象你有一本很厚的电话簿要查找信息:
- 传统方式:一页页翻,很慢
- RDD方式:把电话簿撕成很多页,分给不同的人同时查找
RDD特点:
- 分区:数据分散在多台机器上
- 只读:创建后不能修改,只能转换产生新RDD
- 弹性:自动容错,节点故障可以恢复
- 惰性求值:定义操作时不立即执行
# RDD操作分类
# 转换操作(Transformation)- 懒执行
不会立即计算,只是记录操作步骤
# 常见转换操作
rdd.map(func) # 对每个元素应用函数
rdd.filter(func) # 过滤满足条件的元素
rdd.flatMap(func) # 先map再flatten
rdd.distinct() # 去重
rdd.union(other_rdd) # 合并两个RDD
# 行动操作(Action)- 立即执行
触发实际计算并返回结果
# 常见行动操作
rdd.count() # 计算元素个数
rdd.collect() # 收集所有元素到Driver
rdd.first() # 获取第一个元素
rdd.take(n) # 获取前n个元素
rdd.saveAsTextFile() # 保存到文件
# RDD血缘关系(Lineage)
什么是血缘关系? 就像家族谱系一样,记录每个RDD是怎么从父RDD产生的。
graph LR
A[原始数据RDD] -->|map操作| B[RDD1]
B -->|filter操作| C[RDD2]
C -->|reduce操作| D[结果]
作用:
- 容错:如果某个分区丢失,可以根据血缘关系重新计算
- 优化:Spark可以分析血缘关系进行优化
# 依赖关系与Stage划分
# RDD依赖关系详解
# 窄依赖(Narrow Dependency)
定义:父RDD的每个分区最多只被子RDD的一个分区使用
graph TB
subgraph "窄依赖示例"
A1[父RDD分区A] --> B1[子RDD分区A']
A2[父RDD分区B] --> B2[子RDD分区B']
A3[父RDD分区C] --> B3[子RDD分区C']
A4[父RDD分区D] --> B4[子RDD分区D']
end
关系类型:
- 一对一:map、filter
- 多对一:union、coalesce
特点:
- ✅ 不需要shuffle
- ✅ 没有磁盘IO,效率高
- ✅ 可以流水线处理
- ✅ 支持本地故障恢复
# 宽依赖(Wide Dependency)
定义:父RDD的一个分区被子RDD的多个分区使用
graph TB
subgraph "宽依赖示例"
C1[父RDD分区A] --> D1[子RDD分区X]
C2[父RDD分区B] --> D1
C3[父RDD分区C] --> D2[子RDD分区Y]
C4[父RDD分区D] --> D2
C1 --> D2
C2 --> D3[子RDD分区Z]
C3 --> D3
C4 --> D3
end
特点:
- ❌ 需要shuffle操作
- ❌ 产生大量磁盘IO,效率较低
- ❌ 需要全量重算故障分区
- ⚠️ 无法流水线处理
常见操作:
groupByKey()
- 按key分组reduceByKey()
- 按key聚合join()
- 连接操作distinct()
- 去重repartition()
- 重新分区
# Stage划分机制
Stage划分原则:以宽依赖为边界进行划分
graph LR
A[textFile] --> B[flatMap]
B --> C[map]
C --> D[reduceByKey]
D --> E[collect]
subgraph Stage1
A --> B
B --> C
end
subgraph Stage2
D --> E
end
C -.->|宽依赖| D
划分方法:
- 反向解析DAG:从Action操作开始往前追溯
- 遇到宽依赖就断开:创建新的Stage
- 遇到窄依赖就合并:加入当前Stage
# Stage类型详解
# ShuffleMapStage(中间阶段)
- 输入:外部数据或其他Stage输出
- 输出:Shuffle数据,作为下一Stage输入
- 特点:
- 不是最终Stage
- 输出必须经过Shuffle
- 不一定在每个DAG中存在
- 任务类型:ShuffleMapTask
# ResultStage(结果阶段)
- 输入:外部数据或ShuffleMapStage输出
- 输出:最终结果或存储
- 特点:
- 最终Stage
- 每个DAG必有一个
- 产生应用程序结果
- 任务类型:ResultTask
# 数据传输机制
# Stage内部数据传输(Pipeline流水线)
- 条件:所有依赖都是窄依赖
- 方式:Pipeline(流水线)方式
- 特点:
- 非阻塞传输
- 内存中直接传递
- 高效率
graph LR
A[数据] -->|内存| B[Task1]
B -->|内存| C[Task2]
C -->|内存| D[Task3]
D --> E[结果]
# Stage之间数据传输(Shuffle)
- 条件:存在宽依赖
- 方式:Shuffle方式
- 特点:
- 阻塞传输
- 需要磁盘存储
- 网络传输开销
sequenceDiagram
participant SM as ShuffleMapTask
participant Disk as 本地磁盘
participant RT as 下游Task
SM->>SM: 按分区函数划分
SM->>Disk: 写入ShuffleBlockFile
RT->>Disk: 读取ShuffleBlockFile
RT->>RT: 存入Buffer继续计算
# Task类型与执行
# ShuffleMapTask
- 位置:ShuffleMapStage中
- 作用:处理数据并输出Shuffle文件
- 输出:按分区划分的磁盘文件
# ResultTask
- 位置:ResultStage中
- 作用:处理数据并产生最终结果
- 输出:应用程序结果
Task数量计算:
示例:3个分区的RDD经过2个Stage
ShuffleMapStage: 3个ShuffleMapTask
ResultStage: 3个ResultTask
总计:6个Task
# Shuffle机制深度解析
Shuffle的代价:
- 磁盘IO:写入和读取中间结果
- 网络IO:跨节点数据传输
- CPU开销:序列化/反序列化
- 内存占用:缓存shuffle数据
为什么Shuffle慢?
graph LR
A[正常处理] --> B[内存计算]
B --> C[结果]
D[Shuffle处理] --> E[内存计算]
E --> F[磁盘写入]
F --> G[网络传输]
G --> H[磁盘读取]
H --> I[内存计算]
I --> J[结果]
# Spark程序执行流程
# 整体流程
graph TB
A[创建SparkContext] --> B[创建RDD]
B --> C[RDD转换操作]
C --> D[行动操作触发]
D --> E[任务调度和执行]
E --> F[返回结果]
# 详细执行过程
# 步骤1:创建应用程序
from pyspark import SparkContext
sc = SparkContext("local", "MyApp")
# 步骤2:定义RDD转换
text_file = sc.textFile("data.txt")
words = text_file.flatMap(lambda line: line.split(" "))
word_pairs = words.map(lambda word: (word, 1))
# 步骤3:行动操作触发计算
word_counts = word_pairs.reduceByKey(lambda a, b: a + b)
result = word_counts.collect() # 这里才开始实际计算
# 任务调度
Stage划分:
- 以宽依赖为边界划分Stage
- 每个Stage内部是窄依赖,可以流水线执行
Task分配:
- 每个分区对应一个Task
- Executor并行执行多个Task
# API伪代码讲解
# SparkContext创建与配置
# SparkContext伪代码
class SparkContext:
def __init__(self, master, app_name, conf=None):
"""
master: 集群URL或"local"
app_name: 应用程序名称
conf: 配置对象
"""
self.master = master
self.app_name = app_name
self.conf = conf or SparkConf()
def parallelize(self, collection, num_partitions=None):
"""从集合创建RDD"""
return RDD(collection, num_partitions)
def textFile(self, path, min_partitions=None):
"""从文件创建RDD"""
return RDD(read_file(path), min_partitions)
# RDD核心API设计
# RDD抽象类伪代码
class RDD:
def __init__(self, data, partitions):
self.data = data
self.partitions = partitions
self.lineage = [] # 血缘关系
# 转换操作(Transformation)- 懒执行
def map(self, func):
"""对每个元素应用函数"""
new_rdd = RDD(None, self.partitions)
new_rdd.lineage = self.lineage + [('map', func)]
return new_rdd
def filter(self, func):
"""过滤满足条件的元素"""
new_rdd = RDD(None, self.partitions)
new_rdd.lineage = self.lineage + [('filter', func)]
return new_rdd
def flatMap(self, func):
"""先map再flatten"""
new_rdd = RDD(None, self.partitions)
new_rdd.lineage = self.lineage + [('flatMap', func)]
return new_rdd
# 宽依赖操作
def groupByKey(self):
"""按key分组 - 会产生shuffle"""
new_rdd = RDD(None, self.partitions)
new_rdd.lineage = self.lineage + [('groupByKey', None)]
new_rdd.dependency_type = 'wide'
return new_rdd
def reduceByKey(self, func):
"""按key聚合 - 会产生shuffle但有本地合并优化"""
new_rdd = RDD(None, self.partitions)
new_rdd.lineage = self.lineage + [('reduceByKey', func)]
new_rdd.dependency_type = 'wide'
return new_rdd
# 行动操作(Action)- 立即执行
def collect(self):
"""收集所有数据到Driver"""
return self._execute_lineage()
def count(self):
"""计算元素个数"""
return len(self._execute_lineage())
def first(self):
"""获取第一个元素"""
return self._execute_lineage()[0]
def take(self, num):
"""获取前num个元素"""
return self._execute_lineage()[:num]
# 持久化操作
def cache(self):
"""缓存到内存"""
self.storage_level = 'MEMORY_ONLY'
return self
def persist(self, storage_level):
"""持久化到指定存储级别"""
self.storage_level = storage_level
return self
def checkpoint(self):
"""设置检查点"""
self.checkpointed = True
return self
# 内部执行方法
def _execute_lineage(self):
"""执行血缘关系中的所有操作"""
result = self.data
for operation, func in self.lineage:
if operation == 'map':
result = [func(x) for x in result]
elif operation == 'filter':
result = [x for x in result if func(x)]
elif operation == 'flatMap':
result = [item for x in result for item in func(x)]
# ... 其他操作
return result
# 词频统计完整API示例
# 词频统计API使用伪代码
def word_count_example():
# 1. 创建SparkContext
sc = SparkContext("local[*]", "WordCount")
# 2. 创建输入RDD
text_rdd = sc.textFile("input.txt")
# 3. 数据转换链(懒执行)
words_rdd = text_rdd.flatMap(lambda line: line.split())
pairs_rdd = words_rdd.map(lambda word: (word, 1))
counts_rdd = pairs_rdd.reduceByKey(lambda a, b: a + b)
# 4. 触发执行(Action操作)
results = counts_rdd.collect()
# 5. 处理结果
for word, count in results:
print(f"{word}: {count}")
# 6. 关闭SparkContext
sc.stop()
# 广播变量和累加器API
# 广播变量伪代码
class Broadcast:
def __init__(self, value):
self.value = value
def value(self):
"""获取广播的值"""
return self._value
# 累加器伪代码
class Accumulator:
def __init__(self, init_value):
self._value = init_value
def add(self, term):
"""累加值"""
self._value += term
def value(self):
"""获取累加结果"""
return self._value
# 使用示例
def broadcast_accumulator_example():
sc = SparkContext("local[*]", "BroadcastExample")
# 广播小表数据
small_table = {"A": 1, "B": 2, "C": 3}
broadcast_table = sc.broadcast(small_table)
# 创建累加器
error_count = sc.accumulator(0)
def process_data(item):
lookup_value = broadcast_table.value.get(item)
if lookup_value is None:
error_count.add(1)
return (item, lookup_value)
# 处理数据
data_rdd = sc.parallelize(["A", "B", "D", "E"])
result_rdd = data_rdd.map(process_data)
# 收集结果
results = result_rdd.collect()
print(f"错误计数: {error_count.value}")
# 缓存和检查点API
# 缓存和检查点使用伪代码
def caching_checkpoint_example():
sc = SparkContext("local[*]", "CacheCheckpointExample")
# 设置检查点目录
sc.setCheckpointDir("hdfs://checkpoint/")
# 创建复杂计算链
data_rdd = sc.parallelize(range(1000000))
# 复杂转换操作
processed_rdd = data_rdd.map(lambda x: x * 2) \
.filter(lambda x: x % 3 == 0) \
.map(lambda x: x + 1)
# 缓存频繁使用的RDD
processed_rdd.cache()
# 设置检查点截断长血缘链
processed_rdd.checkpoint()
# 触发执行和检查点
count = processed_rdd.count()
# 基于缓存/检查点的后续操作
final_result = processed_rdd.map(lambda x: x ** 2).reduce(lambda a, b: a + b)
print(f"最终结果: {final_result}")
# Shuffle优化API示例
# Shuffle优化API对比
def shuffle_optimization_examples():
sc = SparkContext("local[*]", "ShuffleOptimization")
# 创建测试数据
data_rdd = sc.parallelize([("key1", 1), ("key2", 2), ("key1", 3), ("key2", 4)])
# ❌ 低效方式:groupByKey后再聚合
def inefficient_groupby():
grouped = data_rdd.groupByKey() # 产生大量shuffle
result = grouped.map(lambda x: (x[0], sum(x[1])))
return result.collect()
# ✅ 高效方式:直接使用reduceByKey
def efficient_reduceby():
result = data_rdd.reduceByKey(lambda a, b: a + b) # 本地预聚合
return result.collect()
# ❌ 低效的Join操作
def inefficient_join():
large_rdd = sc.parallelize([("key1", "large_data1"), ("key2", "large_data2")])
small_rdd = sc.parallelize([("key1", "small1"), ("key2", "small2")])
result = large_rdd.join(small_rdd) # 两个RDD都要shuffle
return result.collect()
# ✅ 使用广播变量优化Join
def optimized_broadcast_join():
large_rdd = sc.parallelize([("key1", "large_data1"), ("key2", "large_data2")])
small_dict = {"key1": "small1", "key2": "small2"}
broadcast_small = sc.broadcast(small_dict)
def join_with_broadcast(item):
key, value = item
small_value = broadcast_small.value.get(key)
return (key, (value, small_value))
result = large_rdd.map(join_with_broadcast)
return result.collect()
# 关键概念深度剖析
# Merge vs Combine区别
graph TB
subgraph "Merge(归并)"
A1["文件1: (hello,1)(world,1)"] --> C1[归并排序]
A2["文件2: (hello,1)(bye,1)"] --> C1
C1 --> A3["结果: (bye,1)(hello,1)(hello,1)(world,1)"]
end
subgraph "Combine(合并)"
B1["输入: (hello,1)(hello,1)(world,1)"] --> C2[预聚合]
C2 --> B2["结果: (hello,2)(world,1)"]
end
Merge(归并):
- 目的:文件整理,多个有序文件合并为一个
- 操作:归并排序算法,保持数据有序性
- 数据变化:重新组织,不改变数据内容
- 时机:spill文件过多时,或Reduce端数据整理
Combine(合并):
- 目的:数据压缩,减少I/O和网络传输
- 操作:预聚合计算(mini-reduce)
- 数据变化:减少数据量,改变数据内容
- 时机:spill前、merge中都可应用
# 分布式缓存机制
graph TB
subgraph "传统Reduce-side Join"
A1[大表100GB] --> C1[Shuffle阶段]
A2[小表100MB] --> C1
C1 --> D1[网络开销巨大]
end
subgraph "Map-side Join with 分布式缓存"
B1[大表100GB] --> E1[Map阶段]
B2[小表100MB] --> F1[分布式缓存]
F1 --> E1
E1 --> D2[避免Shuffle,性能提升]
end
核心应用:Map-side Join优化
容错机制:
- 副本分发:利用HDFS多副本机制
- 故障恢复:节点故障时从其他副本重新下载
- 自动重试:缓存文件损坏时自动重新获取
# 缓冲区溢写必要性分析
Map任务:即使缓冲区无限大,仍需溢写磁盘
- 数据持久化需求:Map输出需要跨进程、跨时间、跨节点访问
- 容错要求:Reduce故障时需要重新读取Map输出
- 架构必然性:分布式计算的数据传递要求
Reduce任务:缓冲区足够大时可不溢写
- 数据来源稳定:输入来自Map的持久化输出
- 故障恢复简单:可重新从Map输出获取数据
- 调度灵活性:任务可在任意节点重新执行
# Spark与HDFS关系
# 架构关系
graph TB
subgraph "Hadoop生态系统"
subgraph "计算层"
MR[Spark分布式计算]
end
subgraph "存储层"
HDFS[HDFS分布式存储]
end
MR <--> HDFS
end
subgraph "设计理念"
A[计算向数据靠拢] --> B[数据本地化优化]
B --> C[减少网络传输]
C --> D[提高处理效率]
end
# 数据本地化层次
graph TD
A[任务调度] --> B{数据在本节点?}
B -->|是| C[Data Local最优]
B -->|否| D{数据在本机架?}
D -->|是| E[Rack Local次优]
D -->|否| F[Off-rack最后选择]
- Data Local:任务与数据在同一节点(最优)
- Rack Local:任务与数据在同一机架
- Off-rack:跨机架执行(最后选择)
# 容错机制
# 故障类型与处理
graph TB
subgraph "故障处理策略"
A[Driver故障] --> A1[程序重启]
B[Executor故障] --> B1[任务重新调度]
C[Task故障] --> C1[基于血缘关系重算]
end
Driver故障:
- 问题:整个应用程序中断
- 处理:需要重新启动应用程序
- 影响:所有计算重新开始
Executor故障:
- 检测:心跳机制超时检测
- 处理:重新调度失败任务到其他节点
- 透明性:对用户完全透明
Task故障:
- 处理方式:根据血缘关系重新计算
- 重试机制:达到最大尝试次数后标记作业失败
- 优化:检查点机制减少重算成本
# 数据恢复策略
sequenceDiagram
participant Task as 失败Task
participant Driver as Driver
participant Executor as 新Executor
Task->>Driver: 任务失败通知
Driver->>Driver: 分析血缘关系
Driver->>Executor: 重新调度Task
Executor->>Executor: 基于血缘关系重算
Executor->>Driver: 返回计算结果
重新计算vs数据冗余的权衡:
- Spark选择重新计算策略
- 简化系统设计,降低存储开销
- 利用血缘关系保证正确性
# 常见问题与误区
# 常见误区澄清
❌ 误区1:Spark完全基于内存计算
- ✅ 实际:Spark优先使用内存,内存不足时会spill到磁盘
❌ 误区2:Spark总是比MapReduce快
- ✅ 实际:对于简单的一次性ETL任务,性能提升有限
❌ 误区3:cache()立即将数据加载到内存
- ✅ 实际:cache()是懒执行的,需要Action操作触发
❌ 误区4:宽依赖总是不好的
- ✅ 实际:某些场景下宽依赖是必需的,关键是合理使用
❌ 误区5:RDD设计为不可变是缺陷
- ✅ 实际:不可变设计是以空间换时间,提供容错性和并发安全
# 为什么RDD设计为不可变?
设计原因:
- 容错性:可以根据血缘关系重新计算
- 并发安全:多线程访问无需加锁
- 缓存安全:缓存的数据不会被意外修改
- 函数式编程:符合函数式编程思想
以空间换时间的体现:
- 每次转换都创建新RDD(增加内存使用)
- 避免了数据同步和锁的开销(提高性能)
# Client vs Cluster部署模式
为什么需要两种模式?
# Client模式
graph TB
subgraph "客户端机器"
Client[SparkSubmit进程]
Driver[Driver程序]
end
subgraph "集群"
Executor1[Executor1]
Executor2[Executor2]
end
Client --> Driver
Driver <--> Executor1
Driver <--> Executor2
特点:
- Driver运行在客户端
- 适合调试和开发
- 客户端需要保持连接
- 结果直接返回客户端
使用场景:
- 交互式分析(spark-shell)
- 开发调试
- 数据探索
# Cluster模式
graph TB
subgraph "客户端机器"
Client[SparkSubmit进程]
end
subgraph "集群"
Driver[Driver程序]
Executor1[Executor1]
Executor2[Executor2]
end
Client -.->|提交后断开| Driver
Driver <--> Executor1
Driver <--> Executor2
特点:
- Driver运行在集群中
- 适合生产环境
- 客户端提交后可以断开
- 更好的资源利用
使用场景:
- 生产环境批处理
- 长时间运行的作业
- 自动化任务
# 习题讲解
# MapReduce局限性与Spark优势
Q1: 与Spark相比,MapReduce存在哪些局限性?
答案解析:
MapReduce存在三个主要局限性:
- 编程框架表达能力有限
- 仅提供Map和Reduce两个基础算子
- 复杂操作(如join、sort)需要用户自己实现
- 增加了编程难度和代码复杂度
- 单个作业Shuffle开销大
- Map端必须将结果写入本地磁盘
- Reduce端阻塞等待Map完成后才能读取
- 大量磁盘IO造成高延迟
- 多作业衔接效率低
- 作业间通过HDFS传递数据
- 迭代计算需要反复读写磁盘
- 整体应用延迟非常高
对比示例:
迭代计算场景:
MapReduce: 数据 → HDFS → 作业1 → HDFS → 作业2 → HDFS → ... (每次都要磁盘IO)
Spark: 数据 → 内存 → 转换1 → 转换2 → 转换3 → ... (数据驻留内存)
# 计算模型对比
Q2: 请说明Spark的逻辑计算模型和物理计算模型之间的区别与联系。
graph TB
subgraph "逻辑计算模型"
A[Operator DAG] --> B[算子级别描述]
C[RDD Lineage] --> D[数据转换描述]
end
subgraph "物理计算模型"
E[多个并行实例] --> F[实际执行]
G[Task分布执行] --> H[资源调度]
end
B -.->|映射| E
D -.->|映射| G
答案解析:
逻辑计算模型:
- Operator DAG:从算子角度描述计算过程
- RDD Lineage:从数据变换角度描述计算过程
- 描述了算子之间的数据流动关系
物理计算模型:
- 逻辑执行图上的一个算子在物理执行图上有多个并行实例
- 具体的Task分布在不同的Executor上执行
- 涉及实际的资源分配和任务调度
联系:物理执行图是逻辑执行图的具体实现,一对多的映射关系。
# RDD血缘关系与故障恢复
Q3: 什么是RDD Lineage?当Lineage较长时,如何加快故障恢复?
graph LR
A[外部数据源] --> B[RDD1]
B -->|map| C[RDD2]
C -->|filter| D[RDD3]
D -->|groupByKey| E[RDD4]
E -->|map| F[RDD5]
F -->|Action| G[输出]
H[检查点] -.->|截断血缘| D
答案解析:
RDD Lineage定义:
- Driver中SparkContext维护的记录RDD转换的DAG
- 记录了从外部数据源到最终输出的完整转换过程
- 支持基于血缘关系的故障恢复
故障恢复机制:
- 当某个RDD分区丢失时,可通过Lineage重新计算
- 只需重算丢失分区的依赖链,不需要全量重算
长血缘链优化:
- 设置检查点:在适当位置设置checkpoint
- 写入可靠存储:如HDFS等分布式文件系统
- 截断血缘链:检查点后的计算不依赖之前的血缘
- 加速恢复:基于检查点重算,减少计算量
# 部署模式架构区别
Q4: 请简述Standalone Client与Standalone Cluster两种模式下Spark架构的区别。
# Client模式架构
graph TB
subgraph "客户端节点"
SparkSubmit[SparkSubmit]
Driver[Driver程序]
end
subgraph "Master节点"
Master[Master]
end
subgraph "Worker节点"
Worker1[Worker1] --> Executor1[Executor1]
Worker2[Worker2] --> Executor2[Executor2]
end
SparkSubmit --> Driver
Driver --> Master
Master --> Worker1
Master --> Worker2
Driver <--> Executor1
Driver <--> Executor2
# Cluster模式架构
graph TB
subgraph "客户端节点"
SparkSubmit[SparkSubmit]
end
subgraph "Master节点"
Master[Master]
end
subgraph "Worker节点"
Worker1[Worker1] --> Executor1[Executor1]
Worker2[Worker2] --> DriverWrapper[DriverWrapper]
Worker3[Worker3] --> Executor2[Executor2]
end
SparkSubmit -.->|提交后断开| Master
Master --> Worker1
Master --> Worker2
Master --> Worker3
DriverWrapper <--> Executor1
DriverWrapper <--> Executor2
关键区别:
- Client模式:Driver和Client在同一进程中运行
- Cluster模式:Driver作为Worker启动的DriverWrapper进程运行
# Stage划分机制
Q5: Spark如何划分DAG中的Stage?
graph LR
A[textFile] -->|窄依赖| B[flatMap]
B -->|窄依赖| C[map]
C -->|宽依赖| D[groupByKey]
D -->|窄依赖| E[map]
E -->|Action| F[collect]
subgraph Stage1
A --> B
B --> C
end
subgraph Stage2
D --> E
E --> F
end
答案解析:
划分原则:
- 反向解析DAG:从Action操作开始向前追溯
- 宽依赖边界:遇到宽依赖就创建新Stage
- 窄依赖合并:窄依赖操作合并到同一Stage
划分结果:
- Stage内部都是窄依赖关系(支持流水线)
- Stage之间是宽依赖关系(需要Shuffle)
- 只有Stage之间的数据传输需要Shuffle
# Application与Job关系
Q6: Spark中的应用和作业是何种关系?
graph TB
Application[Application应用程序]
Application --> Job1[Job1]
Application --> Job2[Job2]
Application --> Job3[Job3]
subgraph "触发条件"
Action1[collect] --> Job1
Action2[count] --> Job2
Action3[saveAsTextFile] --> Job3
end
答案解析:
关系定义:
- 从逻辑角度:一个Application由一个或多个DAG组成,一个DAG对应一个Job
- 从物理执行角度:一个Application等于一个或多个Job
触发机制:
- 每个Action操作触发一个Job
- 一个Application可以包含多个Action,因此有多个Job
- Job是Spark调度和执行的基本单位
# Stage与Task联系
Q7: Spark中Stage与Task存在怎样的联系?
graph TB
Job[Job作业] --> Stage1[Stage1]
Job --> Stage2[Stage2]
Stage1 --> Task11[Task1.1]
Stage1 --> Task12[Task1.2]
Stage1 --> Task13[Task1.3]
Stage2 --> Task21[Task2.1]
Stage2 --> Task22[Task2.2]
Stage2 --> Task23[Task2.3]
答案解析:
基本定义:
- Task:运行在Executor上的最小工作单元
- Stage:一组相关的、相互之间没有Shuffle依赖的Task集合(TaskSet)
数量关系:
- 一个Stage包含多个Task
- Task数量 = Stage内RDD的分区数
- 每个分区对应一个Task
调度关系:
- Stage是Job的基本调度单位
- Task是具体的执行单位
- 同一Stage内的Task可以并行执行
# 数据交换方式
Q8: Spark中Stage内部如何进行数据交换?Stage之间如何进行数据交换?
# Stage内部数据交换
graph LR
A[数据] -->|Pipeline| B[Task1]
B -->|内存传递| C[Task2]
C -->|内存传递| D[Task3]
D --> E[结果]
- 方式:Pipeline(流水线)方式
- 特点:内存直接传递,非阻塞,高效
# Stage之间数据交换
sequenceDiagram
participant SM as ShuffleMapTask
participant Disk as 本地磁盘
participant RT as ResultTask
SM->>SM: 按partition函数划分
SM->>Disk: 写入ShuffleBlockFile
Note over Disk: Shuffle Write阶段
RT->>Disk: 读取ShuffleBlockFile
RT->>RT: 缓存并继续计算
Note over RT: Shuffle Read阶段
答案解析:
Stage之间需要Shuffle:
- Shuffle Write:ShuffleMapTask按分区函数写入本地磁盘
- Shuffle Read:下游Task读取相应的ShuffleBlockFile
为什么不同:
- Stage内部是窄依赖,数据流向确定
- Stage之间是宽依赖,需要重新分布数据
# 持久化与检查点对比
Q9: Spark的RDD持久化和检查点机制存在哪些异同点?
graph TB
subgraph "持久化机制"
A[RDD] --> B[内存/磁盘存储]
B --> C[保留血缘关系]
C --> D[程序结束时清除]
end
subgraph "检查点机制"
E[RDD] --> F[可靠外部存储HDFS]
F --> G[截断血缘关系]
G --> H[永久保存]
end
答案解析:
相同之处:
- 都可以为Spark提供容错机制
不同之处:
方面 | RDD持久化 | 检查点机制 |
---|---|---|
存储位置 | Spark内部节点 | 外部可靠存储(HDFS) |
主要目的 | 加速计算+容错 | 纯粹的故障恢复 |
执行时机 | 计算过程中 | 作业结束后独立作业 |
生命周期 | 程序结束后清除 | 永久保存 |
血缘关系 | 保留RDD Lineage | 截断血缘链 |
# 广播变量应用场景
Q10: Spark的广播变量机制通常用于什么场景?
graph TB
subgraph "传统Join问题"
A[大表雇员1TB] --> C[Shuffle]
B[小表部门1MB] --> C
C --> D[网络开销巨大]
end
subgraph "广播变量优化"
E[大表雇员1TB] --> G[Map端Join]
F[小表部门1MB] --> H[广播到各节点]
H --> G
G --> I[避免Shuffle]
end
答案解析:
主要场景:小表和大表自然连接
- 问题:传统join会导致大表产生大量Shuffle开销
- 解决方案:将小表广播到所有节点,在Map端完成join
- 效果:避免大表Shuffle,显著提升性能
适用条件:
- 其中一个表足够小,能完全装入内存
- 连接操作频繁
- 对性能要求较高
典型例子:
部门表(小)join 雇员表(大)
产品表(小)join 订单表(大)
地区表(小)join 销售表(大)
# 循环迭代API实现
Q11: 如何使用Spark API对有界数据实现循环迭代?请给出伪代码并解释为什么这么做。
# 迭代算法伪代码
def iterative_algorithm_example():
"""
以PageRank算法为例展示循环迭代API使用
"""
sc = SparkContext("local[*]", "IterativeAlgorithm")
# 1. 初始化数据
# 构建链接关系图: (页面, [链接到的页面列表])
links = sc.parallelize([
("A", ["B", "C"]),
("B", ["C"]),
("C", ["A"]),
("D", ["A", "B"])
]).partitionBy(4).cache() # 关键:预分区并缓存
# 2. 初始化排名: 每个页面初始排名为1.0
ranks = links.map(lambda page_links: (page_links[0], 1.0))
# 3. 迭代计算
for iteration in range(10): # 迭代10轮
# 计算每个页面对链接页面的贡献
contributions = links.join(ranks).flatMap(
lambda page_data: [
(link, page_data[1][1] / len(page_data[1][0]))
for link in page_data[1][0]
]
)
# 更新排名: 0.15是阻尼因子
ranks = contributions.reduceByKey(lambda x, y: x + y).map(
lambda page_rank: (page_rank[0], page_rank[1] * 0.85 + 0.15)
)
# 关键优化:周期性缓存中间结果
if iteration % 5 == 0:
ranks.cache()
ranks.count() # 触发缓存
return ranks.collect()
def kmeans_iterative_example():
"""
K-Means聚类算法迭代示例
"""
sc = SparkContext("local[*]", "KMeans")
# 数据点
points = sc.parallelize([
[1.0, 1.0], [1.5, 2.0], [3.0, 4.0],
[5.0, 7.0], [3.5, 5.0], [4.5, 5.0]
]).cache() # 缓存原始数据
# 初始化聚类中心
centers = [[2.0, 2.0], [4.0, 6.0]]
broadcast_centers = sc.broadcast(centers)
for iteration in range(20):
# 分配每个点到最近的聚类中心
closest_points = points.map(
lambda point: (
closest_center(point, broadcast_centers.value),
(point, 1)
)
)
# 重新计算聚类中心
new_centers = closest_points.reduceByKey(
lambda p1, p2: (
[p1[0][i] + p2[0][i] for i in range(len(p1[0]))],
p1[1] + p2[1]
)
).map(
lambda center_data: [
center_data[1][0][i] / center_data[1][1]
for i in range(len(center_data[1][0]))
]
).collect()
# 检查收敛性
if converged(centers, new_centers):
break
# 更新聚类中心
centers = new_centers
broadcast_centers = sc.broadcast(centers)
return centers
def gradient_descent_example():
"""
梯度下降算法迭代示例
"""
sc = SparkContext("local[*]", "GradientDescent")
# 训练数据 (特征, 标签)
training_data = sc.parallelize([
([1.0, 2.0], 1.0),
([2.0, 3.0], 1.0),
([3.0, 4.0], 0.0),
([4.0, 5.0], 0.0)
]).cache()
# 初始化权重
weights = [0.0, 0.0]
learning_rate = 0.1
for iteration in range(100):
# 广播当前权重
broadcast_weights = sc.broadcast(weights)
# 计算梯度
gradients = training_data.map(
lambda point: compute_gradient(point, broadcast_weights.value)
).reduce(lambda g1, g2: [g1[i] + g2[i] for i in range(len(g1))])
# 更新权重
weights = [
weights[i] - learning_rate * gradients[i] / training_data.count()
for i in range(len(weights))
]
# 周期性检查点
if iteration % 20 == 0:
# 可以设置检查点保存权重
print(f"Iteration {iteration}, weights: {weights}")
return weights
# 为什么这么做?核心原理解析
graph TB
subgraph "MapReduce迭代问题"
A[数据] --> B[HDFS读取]
B --> C[MapReduce作业1]
C --> D[HDFS写入]
D --> E[HDFS读取]
E --> F[MapReduce作业2]
F --> G[HDFS写入]
G --> H[...]
end
subgraph "Spark迭代优化"
I[数据] --> J[加载到内存]
J --> K[迭代计算1]
K --> L[迭代计算2]
L --> M[迭代计算3]
M --> N[内存中保持]
end
1. 内存数据复用
# 关键优化:缓存频繁访问的数据
links.cache() # 链接关系每轮都要用
points.cache() # 数据点每轮都要用
为什么:
- 迭代算法需要多轮访问相同数据集
- 内存缓存避免每轮重新读取磁盘
- MapReduce每轮都要读写HDFS,开销巨大
2. 广播小数据
# 将小的聚类中心广播到所有节点
broadcast_centers = sc.broadcast(centers)
为什么:
- 避免每次迭代都要Shuffle分发参数
- 减少网络传输开销
- 提高算法收敛速度
3. 周期性检查点
if iteration % 5 == 0:
ranks.cache() # 周期性缓存中间结果
ranks.count() # 触发实际缓存
为什么:
- 截断过长的血缘链
- 防止故障时需要从头重算
- 在迭代次数很多时特别重要
4. 预分区优化
links.partitionBy(4).cache() # 预分区避免后续shuffle
为什么:
- join操作如果两个RDD使用相同分区器,可以避免shuffle
- 减少每轮迭代的网络传输
- 特别适用于图算法
# Spark vs MapReduce迭代性能对比
算法类型 | MapReduce性能 | Spark性能 | 提升倍数 |
---|---|---|---|
PageRank | 每轮磁盘IO | 内存迭代 | 10-100x |
K-Means | 每轮HDFS读写 | 数据缓存 | 5-20x |
梯度下降 | 频繁磁盘访问 | 内存计算 | 20-100x |
核心优势总结:
- 数据驻留:中间结果保持在内存中,避免反复I/O
- 血缘优化:RDD血缘关系支持增量计算和容错
- 广播机制:小数据广播避免重复传输
- 缓存策略:智能缓存频繁访问的数据集
- 分区优化:合理分区减少shuffle开销
这些特性使得Spark特别适合机器学习、图计算等需要大量迭代的算法,相比MapReduce有显著的性能优势。