深入理解spark核心思想及源码分析.pdf
http://www.100md.com
2020年11月16日
![]() |
| 第1页 |
![]() |
| 第6页 |
![]() |
| 第12页 |
![]() |
| 第22页 |
![]() |
| 第42页 |
![]() |
| 第82页 |
参见附件(6684KB,91页)。
深入理解spark核心思想及源码分析
本书对Spark源代码进行了全面而深入的分析,旨在为Spark的优化、定制和扩展提供原理性的指导。阿里巴巴集团专家鼎力推荐,阿里巴巴资深Java开发和大数据专家撰写,Spark以其先进的设计理念,迅速成为社区的热门项目

相关内容部分预览






图书简介
本书对Spark源代码进行了全面而深入的分析,旨在为Spark的优化、定制和扩展提供原理性的指导。阿里巴巴集团专家鼎力推荐,阿里巴巴资深Java开发和大数据专家撰写。
本书对Spark的核心模块、部署和协作模块的实现原理与使用技巧进行了深入的剖析与解读。
本书分为三篇:
准备篇(第1~2章),介绍了Spark的环境搭建、设计理念与基本架构,帮助读者了解一些背景知识。
核心设计篇(第3~7章),着重讲解SparkContext的初始化、存储体系、任务提交与执行、计算引擎及部署模式的原理和源码分析。通过这部分的内容,读者可以通过源码剖析更加深入理解Spark的核心设计与实现,以便在实际使用中能够快速解决线上问题并对性能进行调优。
扩展篇(第8~11章),主要讲解基于Spark核心的各种扩展及应用,包括SQL处理引擎、Hive处理、流式计算框架Spark Streaming、图计算框架GraphX、机器学习库MLlib等内容。通过阅读这部分内容,读者可以扩展实际项目中对Spark的应用场景,让Spark焕发活力。
图书目录
Contents 目录
前言
准 备 篇
第1章 环境准备2
1.1 运行环境准备2
1.1.1 安装JDK3
1.1.2 安装Scala3
1.1.3 安装Spark4
1.2 Spark初体验4
1.2.1 运行spark-shell4
1.2.2 执行word count5
1.2.3 剖析spark-shell7
1.3 阅读环境准备11
1.4 Spark源码编译与调试13
1.5 小结17
第2章 Spark设计理念与基本架构18
2.1 初识Spark18
2.1.1 Hadoop MRv1的局限18
2.1.2 Spark使用场景20
2.1.3 Spark的特点20
2.2 Spark基础知识20
2.3 Spark基本设计思想22
2.3.1 Spark模块设计22
2.3.2 Spark模型设计24
2.4 Spark基本架构25
2.5 小结26
核心设计篇
第3章 SparkContext的初始化28
3.1 SparkContext概述28
3.2 创建执行环境SparkEnv30
3.2.1 安全管理器SecurityManager31
3.2.2 基于Akka的分布式消息系统ActorSystem31
3.2.3 map任务输出跟踪器mapOutputTracker32
3.2.4 实例化ShuffleManager34
3.2.5 shuffle线程内存管理器ShuffleMemoryManager34
3 ......
大数据技术丛书
深入理解Spark:
核心思想与源码分析
耿嘉安 著图书在版编目(CIP)数据
深入理解 Spark:核心思想与源码分析 耿嘉安著 . —北京:机械工业出版社,2015.12
(大数据技术丛书)
ISBN 978-7-111-52234-8
I. 深… II.耿… III.数据处理软件 IV. TP274
中国版本图书馆 CIP 数据核字(2015)第 280808 号
深入理解Spark:核心思想与源码分析
出版发行:机械工业出版社(北京市西城区百万庄大街 22 号 邮政编码:100037)
责任编辑:高婧雅 责任校对:董纪丽
印 刷: 版 次:2016 年 1 月第 1 版第 1 次印刷
开 本:186mm×240mm 116 印 张:30.25
书 号: ISBN 978-7-111-52234-8 定 价:99.00 元
凡购本书,如有缺页、倒页、脱页,由本社发行部调换
客服热线:(010)88379426 88361066 投稿热线:(010)88379604
购书热线:(010)68326294 88379649 68995259 读者信箱:hzit@hzbook.com
版权所有·侵权必究
封底无防伪标均为盗版
本书法律顾问:北京大成律师事务所 韩光 邹晓东
Preface 前?言
为什么写这本书
要回答这个问题,需要从我个人的经历说起。说来惭愧,我第一次接触计算机是在高三。
当时跟大家一起去网吧玩 CS,跟身边的同学学怎么“玩”。正是通过这种“玩”的过程,让
我了解到计算机并没有那么神秘,它也只是台机器,用起来似乎并不比打开电视机费劲多少。
高考填志愿的时候,凭着直觉“糊里糊涂”就选择了计算机专业。等到真正学习计算机课程
的时候却又发现,它其实很难!
早在 2004 年,还在学校的我跟很多同学一样,喜欢看 Flash,也喜欢谈论 Flash 甚至做
Flash。感觉Flash 正如它的名字那样“闪光”。那些年,在学校里,知道 Flash 的人可要比知
道Java 的人多得多,这说明当时的 Flash十分火热。此外,Oracle也成为关系型数据库里的领
军人物,很多人甚至觉得懂Oracle 要比懂 Flash、Java 及其他数据库要厉害得多!
2007 年,我刚刚参加工作不久。那时 Struts1、Spring、Hibernate几乎可以称为那些
用Java 作为开发语言的软件公司的三驾马车。很快,Struts2替代了Struts1 的地位,让我第
一次意识到 IT 领域的技术更新竟然如此之快!随着很多传统软件公司向互联网公司转型,Hibernate也难以确保其地位,iBATIS诞生了!
2010 年,有关Hadoop 的技术图书涌入中国,当时很多公司用它只是为了数据统计、数
据挖掘或者搜索。一开始,人们对于Hadoop 的认识和使用可能相对有限。大约2011 年的时
候,关于云计算的概念在网上炒得火热,当时依然在做互联网开发的我,对其只是“道听途
说”。后来跟同事借了一本有关云计算的书,回家挑着看了一些内容,也没什么收获,怅然若
失! 20 世纪60年代,美国的军用网络作为互联网的雏形,很多内容已经与云计算中的某些
说法类似。到 20 世纪80 年代,互联网就已经启用了云计算,如今为什么又要重提这样的概
念?这个问题我可能回答不了,还是交给历史吧。
2012年,国内又呈现出大数据热的态势。从国家到媒体、教育、IT等几乎所有领域,人
人都在谈大数据。我的亲戚朋友中,无论老师、销售人员,还是工程师们都可以针对大数据谈
谈自己的看法。我也找来一些 Hadoop 的书籍进行学习,希望能在其中探索到大数据的奥妙。有幸在工作过程中接触到阿里的开放数据处理服务(open data processing service,ODPS),并且基于 ODPS 与其他小伙伴一起构建阿里的大数据商业解决方案—御膳房。去
杭州出差的过程中,有幸认识和仲,跟他学习了阿里的实时多维分析平台—Garuda 和实
时计算平台—Galaxy 的部分知识。和仲推荐我阅读 Spark 的源码,这样会对实时计算及流
式计算有更深入的了解。2015 年春节期间,自己初次上网查阅 Spark 的相关资料学习,开始
研究 Spark 源码。还记得那时只是出于对大数据的热爱,想使自己在这方面的技术能力有所
提升。
从阅读 Hibernate源码开始,到后来阅读Tomcat、Spring 的源码,我也在从学习源码的
过程中成长,我对源码阅读也越来越感兴趣。随着对Spark源码阅读的深入,发现很多内容
从网上找不到答案,只能自己“硬啃”了。随着自己的积累越来越多,突然有一天发现,我
所总结的这些内容好像可以写成一本书了!从闪光(Flash)到火花(Spark),足足有 11 个年
头了。无论是 Flash、Java,还是 Spring、iBATIS,我一直扮演着一个追随者,我接受这些书
籍的洗礼,从未给予。如今我也是Spark 的追随者,不同的是,我不再只想简单攫取,还要
给予。
最后还想说一下,2016 年是我从事 IT工作的第 10 个年头,此书特别作为送给自己的 10
周年礼物。
本书特色
T 按照源码分析的习惯设计,从脚本分析到初始化再到核心内容,最后介绍Spark 的扩
展内容。整个过程遵循由浅入深、由深到广的基本思路。
T 本书涉及的所有内容都有相应的例子,以便于读者对源码的深入研究。
T 本书尽可能用图来展示原理,加速读者对内容的掌握。
T 本书讲解的很多实现及原理都值得借鉴,能帮助读者提升架构设计、程序设计等方面
的能力。
T 本书尽可能保留较多的源码,以便于初学者能够在像地铁、公交这样的地方,也能轻
松阅读。
读者对象
源码阅读是一项苦差事,人力和时间成本都很高,尤其是对于 Spark 陌生或者刚刚开始
学习的人来说,难度可想而知。本书尽可能保留源码,使得分析过程不至于产生跳跃感,目
的是降低大多数人的学习门槛。如果你是从事IT 工作 1 ~ 3 年的新人或者是希望学习Spark
核心知识的人,本书非常适合你。如果你已经对Spark有所了解或者已经在使用它,还想进
一步提高自己,那么本书更适合你。
如果你是一个开发新手,对 Java、Linux等基础知识不是很了解,那么本书可能不太适合
你。如果你已经对Spark 有深入的研究,本书也许可以作为你的参考资料。
IVV
总体说来,本书适合以下人群:
T 想要使用Spark,但对Spark实现原理不了解,不知道怎么学习的人;
T 大数据技术爱好者,以及想深入了解Spark 技术内部实现细节的人;
T 有一定Spark使用基础,但是不了解Spark 技术内部实现细节的人;
T 对性能优化和部署方案感兴趣的大型互联网工程师和架构师;
T 开源代码爱好者。喜欢研究源码的同学可以从本书学到一些阅读源码的方式与方法。
本书不会教你如何开发 Spark应用程序,只是用一些经典例子演示。本书简单介绍
Hadoop MapReduce、Hadoop YARN、Mesos、Tachyon、ZooKeeper、HDFS、Amazon S3,但
不会过多介绍这些框架的使用,因为市场上已经有丰富的这类书籍供读者挑选。本书也不会
过多介绍 Scala、Java、Shell的语法,读者可以在市场上选择适合自己的书籍阅读。
如何阅读本书
本书分为三大部分(不包括附录):
准备篇(第 1 ~ 2 章),简单介绍了 Spark 的环境搭建和基本原理,帮助读者了解一些背
景知识。
核心设计篇(第 3 ~ 7 章),着重讲解SparkContext的初始化、存储体系、任务提交与执
行、计算引擎及部署模式的原理和源码分析。
扩展篇(第8 ~ 11 章),主要讲解基于 Spark核心的各种扩展及应用,包括:SQL 处理
引擎、Hive 处理、流式计算框架Spark Streaming、图计算框架GraphX、机器学习库MLlib等
内容。
本书最后还添加了几个附录,包括:附录 A 介绍的 Spark 中最常用的工具类 Utils ;附录
B是 Akka 的简介与工具类 AkkaUtils的介绍;附录 C为Jetty 的简介和工具类 JettyUtils的介
绍;附录 D为 Metrics库的简介和测量容器 MetricRegistry的介绍;附录 E 演示了 Hadoop1.0
版本中的 word count例子;附录F 介绍了工具类 CommandUtils的常用方法;附录 G是关于
Netty的简介和工具类 NettyUtils的介绍;附录 H 列举了笔者编译 Spark源码时遇到的问题及
解决办法。
为了降低读者阅读理解 Spark 源码的门槛,本书尽可能保留源码实现,希望读者能够怀
着一颗好奇的心,Spark 当前很火热,其版本更新也很快,本书以Spark 1.2.3 版本为主,有兴
趣的读者也可按照本书的方式,阅读Spark的最新源码。
勘误和支持
本书内容很多,限于笔者水平有限,书中内容难免有错误之处。在本书出版后的任何
时间,如果你对本书有任何问题或者意见,都可以通过邮箱 beliefer@163.com或博客 http:
www.cnblogs.comjiaan-geng联系我,说出你的建议或者想法,希望与大家共同进步。VI
致谢
感谢苍天,让我生活在这样一个时代,能接触互联网和大数据;感谢父母,这么多年来,在学习、工作及生活上的帮助与支持;感谢妻子在生活中的照顾和谦让。
感谢杨福川和高婧雅给予本书出版的大力支持与帮助。
感谢冰夷老大和王贲老大让我有幸加入阿里,接触大数据应用;感谢和仲对 Galaxy 和
Garuda耐心细致的讲解以及对Spark 的推荐;感谢张中在百忙之中给本书写评语;感谢周亮、澄苍、民瞻、石申、清无、少侠、征宇、三步、谢衣、晓五、法星、曦轩、九翎、峰阅、丁
卯、阿末、紫丞、海炎、涵康、云飏、孟天、零一、六仙、大知、井凡、隆君、太奇、晨炫、既望、宝升、都灵、鬼厉、归钟、梓撤、昊苍、水村、惜冰、惜陌、元乾等同仁在工作上的
支持和帮助。
耿嘉安 于北京
Contents 目?录
前言
准 备 篇
第 1 章 环境准备 ············································· 2
1.1 运行环境准备 ··········································· 2
1.1.1 安装 JDK ········································· 3
1.1.2 安装 Scala ········································ 3
1.1.3 安装 Spark ······································· 4
1.2 Spark 初体验 ············································· 4
1.2.1 运行spark-shell ······························· 4
1.2.2 执行word count ······························ 5
1.2.3 剖析spark-shell ······························· 7
1.3 阅读环境准备 ········································· 11
1.4 Spark 源码编译与调试 ························· 13
1.5 小结 ··························································· 17
第2 章 Spark 设计理念与基本架构 ····· 18
2.1 初识 Spark ··············································· 18
2.1.1 Hadoop MRv1 的局限··················· 18
2.1.2 Spark 使用场景 ····························· 20
2.1.3 Spark 的特点 ································· 20
2.2 Spark基础知识 ······································ 20
2.3 Spark基本设计思想 ····························· 22
2.3.1 Spark模块设计 ····························· 22
2.3.2 Spark模型设计 ····························· 24
2.4 Spark基本架构 ······································ 25
2.5 小结 ··························································· 26
核心设计篇
第 3 章 SparkContext 的初始化 ············· 28
3.1 SparkContext概述 ································· 28
3.2 创建执行环境SparkEnv ······················ 30
3.2.1 安全管理器 SecurityManager ······· 31
3.2.2 基于 Akka 的分布式消息
系统 ActorSystem·························· 31
3.2.3 map 任务输出跟踪器
mapOutputTracker ························· 32
3.2.4 实例化 Shuf?eManager ················· 34
3.2.5 shuf?e 线程内存管理器
Shuf?eMemoryManager ················ 34
3.2.6 块传输服务BlockTransferService ··· 35
3.2.7 BlockManagerMaster介绍 ············ 35VIII
3.2.8 创建块管理器BlockManager ······· 36
3.2.9 创建广播管理器Broadcast-
Manager ········································· 36
3.2.10 创建缓存管理器CacheManager ··· 37
3.2.11 HTTP文件服务器 HttpFile-
Server ··········································· 37
3.2.12 创建测量系统 MetricsSystem ···· 39
3.2.13 创建 SparkEnv ····························· 40
3.3 创建 metadataCleaner ···························· 41
3.4 SparkUI 详解 ·········································· 42
3.4.1 listenerBus 详解 ···························· 43
3.4.2 构造 JobProgressListener ·············· 46
3.4.3 SparkUI的创建与初始化 ············· 47
3.4.4 Spark UI的页面布局与展示 ········ 49
3.4.5 SparkUI的启动 ····························· 54
3.5 Hadoop 相关配置及 Executor环境
变量 ··························································· 54
3.5.1 Hadoop 相关配置信息 ·················· 54
3.5.2 Executor环境变量 ························ 54
3.6 创建任务调度器 TaskScheduler ········· 55
3.6.1 创建TaskSchedulerImpl ··············· 55
3.6.2 TaskSchedulerImpl的初始化 ······· 57
3.7 创建和启动 DAGScheduler ················· 57
3.8 TaskScheduler的启动 ··························· 60
3.8.1 创建LocalActor ···························· 60
3.8.2 ExecutorSource的创建与注册 ····· 62
3.8.3 ExecutorActor的构建与注册 ······· 64
3.8.4 Spark 自身ClassLoader的创建 ··· 64
3.8.5 启动 Executor的心跳线程 ··········· 66
3.9 启动测量系统 MetricsSystem ············· 69
3.9.1 注册 Sources ·································· 70
3.9.2 注册 Sinks······································ 70
3.9.3 给Sinks 增加 Jetty的Servlet-
ContextHandler ······························ 71
3.10 创建和启动 ExecutorAllocation-
Manager ················································· 72
3.11 ContextCleaner的创建与启动 ·········· 73
3.12 Spark 环境更新 ···································· 74
3.13 创建 DAGSchedulerSource 和
BlockManagerSource ··························· 76
3.14 将 SparkContext 标记为激活 ············ 77
3.15 小结 ························································ 78
第 4 章 存储体系 ··········································· 79
4.1 存储体系概述 ········································· 79
4.1.1 块管理器BlockManager的实现 ··· 79
4.1.2 Spark 存储体系架构 ····················· 81
4.2 shuf?e 服务与客户端 ···························· 83
4.2.1 Block的RPC 服务 ······················· 84
4.2.2 构造传输上下文Transpor-
tContext ·········································· 85
4.2.3 RPC客户端工厂 Transport-
ClientFactory ································· 86
4.2.4 Netty 服务器 TransportServer ······· 87
4.2.5 获取远程shuf?e 文件··················· 88
4.2.6 上传shuf?e 文件··························· 89
4.3 BlockManagerMaster对Block-
Manager 的管理 ····································· 90
4.3.1 BlockManagerMasterActor ··········· 90
4.3.2 询问 Driver 并获取回复方法 ······· 92
4.3.3 向 BlockManagerMaster注册
BlockManagerId ···························· 93
4.4 磁盘块管理器 DiskBlockManager ····· 94
4.4.1 DiskBlockManager的构造过程 ··· 94IX
4.4.2 获取磁盘文件方法getFile ··········· 96
4.4.3 创建临时Block 方法create-
TempShuf?eBlock ························· 96
4.5 磁盘存储 DiskStore ······························· 97
4.5.1 NIO读取方法 getBytes ················ 97
4.5.2 NIO写入方法 putBytes ················ 98
4.5.3 数组写入方法 putArray ················ 98
4.5.4 Iterator写入方法 putIterator ········· 98
4.6 内存存储 MemoryStore ························ 99
4.6.1 数据存储方法putBytes ·············· 101
4.6.2 Iterator写入方法putIterator
详解 ············································· 101
4.6.3 安全展开方法unrollSafely ········· 102
4.6.4 确认空闲内存方法ensureFree-
Space ············································ 105
4.6.5 内存写入方法putArray ·············· 107
4.6.6 尝试写入内存方法tryToPut ······ 108
4.6.7 获取内存数据方法 getBytes ······ 109
4.6.8 获取数据方法 getValues ············· 110
4.7 Tachyon存储TachyonStore··············· 110
4.7.1 Tachyon 简介 ······························· 111
4.7.2 TachyonStore 的使用 ·················· 112
4.7.3 写入 Tachyon内存的方法
putIntoTachyonStore ···················· 113
4.7.4 获取序列化数据方法getBytes ···· 113
4.8 块管理器 BlockManager ···················· 114
4.8.1 移出内存方法dropFrom-
Memory ······································· 114
4.8.2 状态报告方法reportBlockStatus··· 116
4.8.3 单对象块写入方法 putSingle ····· 117
4.8.4 序列化字节块写入方法
putBytes ······································· 118
4.8.5 数据写入方法 doPut ··················· 118
4.8.6 数据块备份方法 replicate ··········· 121
4.8.7 创建DiskBlockObjectWriter
的方法getDiskWriter ·················· 125
4.8.8 获取本地Block数据方法
getBlockData ······························· 125
4.8.9 获取本地shuf?e数据方法
doGetLocal ·································· 126
4.8.10 获取远程Block数据方法
doGetRemote ····························· 127
4.8.11 获取 Block 数据方法 get ·········· 128
4.8.12 数据流序列化方法
dataSerializeStream ···················· 129
4.9 metadataCleaner和 broadcast-
Cleaner ···················································· 129
4.10 缓存管理器CacheManager ············· 130
4.11 压缩算法 ·············································· 133
4.12 磁盘写入实现DiskBlockObject-
Writer ···················································· 133
4.13 块索引shuf?e管理器Index-
Shuf?eBlockManager ························ 135
4.14 shuf?e内存管理器Shuf?e-
MemoryManager ································ 137
4.15 小结 ······················································ 138
第 5 章 任务提交与执行 ··························· 139
5.1 任务概述 ················································ 139
5.2 广播Hadoop的配置信息 ··················· 142
5.3 RDD转换及 DAG 构建 ····················· 144
5.3.1 为什么需要 RDD ························ 144
5.3.2 RDD实现分析 ···························· 146
5.4 任务提交 ················································ 152X
5.4.1 任务提交的准备 ························· 152
5.4.2 finalStage的创建与Stage的
划分 ·············································· 157
5.4.3 创建Job ······································· 163
5.4.4 提交Stage···································· 164
5.4.5 提交Task ····································· 165
5.5 执行任务 ················································ 176
5.5.1 状态更新 ····································· 176
5.5.2 任务还原 ····································· 177
5.5.3 任务运行 ····································· 178
5.6 任务执行后续处理 ······························ 179
5.6.1 计量统计与执行结果序列化 ····· 179
5.6.2 内存回收 ····································· 180
5.6.3 执行结果处理 ····························· 181
5.7 小结 ························································· 187
第 6 章 计算引擎 ········································· 188
6.1 迭代计算 ················································ 188
6.2 什么是shuf?e ······································· 192
6.3 map端计算结果缓存处理 ················· 194
6.3.1 map 端计算结果缓存聚合·········· 195
6.3.2 map端计算结果简单缓存·········· 200
6.3.3 容量限制 ····································· 201
6.4 map端计算结果持久化 ····················· 204
6.4.1 溢出分区文件 ····························· 205
6.4.2 排序与分区分组 ························· 207
6.4.3 分区索引文件 ····························· 209
6.5 reduce端读取中间计算结果 ············· 210
6.5.1 获取map 任务状态····················· 213
6.5.2 划分本地与远程 Block ··············· 215
6.5.3 获取远程Block ··························· 217
6.5.4 获取本地 Block ··························· 218
6.6 reduce端计算 ······································· 219
6.6.1 如何同时处理多个map 任务的
中间结果 ······································ 219
6.6.2 reduce端在缓存中对中间计算
结果执行聚合和排序 ················· 220
6.7 map 端与 reduce 端组合分析 ············ 221
6.7.1 在 map端溢出分区文件,在reduce 端合并组合 ················· 221
6.7.2 在map端简单缓存、排序分组,在reduce 端合并组合 ················· 222
6.7.3 在map端缓存中聚合、排序分组,在reduce 端组合 ························· 222
6.8 小结 ························································· 223
第7章 部署模式 ········································· 224
7.1 local部署模式 ······································ 225
7.2 local-cluster部署模式 ························· 225
7.2.1 LocalSparkCluster的启动 ·········· 226
7.2.2 CoarseGrainedSchedulerBackend
的启动 ········································· 236
7.2.3 启动AppClient ···························· 237
7.2.4 资源调度 ····································· 242
7.2.5 local-cluster模式的任务执行 ····· 253
7.3 Standalone部署模式 ··························· 255
7.3.1 启动 Standalone模式 ·················· 255
7.3.2 启动 Master 分析 ························ 257
7.3.3 启动 Worker分析························ 259
7.3.4 启动 Driver Application分析 ····· 261
7.3.5 Standalone模式的任务执行 ······· 263
7.3.6 资源回收 ····································· 263
7.4 容错机制 ················································ 266
7.4.1 Executor异常退出 ······················ 266XI
7.4.2 Worker 异常退出························· 268
7.4.3 Master异常退出 ························· 269
7.5 其他部署方案 ······································· 276
7.5.1 YARN··········································· 277
7.5.2 Mesos ··········································· 280
7.6 小结 ························································· 282
扩 展 篇
第 8 章 Spark SQL ····································· 284
8.1 Spark SQL 总体设计 ··························· 284
8.1.1 传统关系型数据库 SQL运行
原理 ············································· 285
8.1.2 Spark SQL运行架构··················· 286
8.2 字典表 Catalog ····································· 288
8.3 Tree和 TreeNode ································· 289
8.4 词法解析器 Parser 的设计与实现 ··· 293
8.4.1 SQL语句解析的入口 ················· 294
8.4.2 建表语句解析器 DDLParser ······ 295
8.4.3 SQL语句解析器SqlParser········· 296
8.4.4 Spark 代理解析器SparkSQL-
Parser ··········································· 299
8.5 Rule 和 RuleExecutor ·························· 300
8.6 Analyzer与Optimizer的设计与
实现 ························································· 302
8.6.1 语法分析器Analyzer ·················· 304
8.6.2 优化器 Optimizer ························ 305
8.7 生成物理执行计划 ······························ 306
8.8 执行物理执行计划 ······························ 308
8.9 Hive ························································· 311
8.9.1 Hive SQL 语法解析器 ················ 311
8.9.2 Hive SQL元数据分析 ················ 313
8.9.3 Hive SQL物理执行计划 ············ 314
8.10 应用举例:JavaSparkSQL ·············· 314
8.11 小结 ······················································· 320
第 9 章 流式计算 ········································· 321
9.1 Spark Streaming总体设计 ················· 321
9.2 StreamingContext初始化 ··················· 323
9.3 输入流接收器规范Receiver ············· 324
9.4 数据流抽象DStream ·························· 325
9.4.1 Dstream 的离散化 ······················· 326
9.4.2 数据源输入流 InputDStream ······ 327
9.4.3 Dstream 转换及构建 DStream
Graph ··········································· 329
9.5 流式计算执行过程分析 ····················· 330
9.5.1 流式计算例子CustomReceiver ···· 331
9.5.2 Spark Streaming执行环境构建 ··· 335
9.5.3 任务生成过程 ····························· 347
9.6 窗口操作 ················································ 355
9.7 应用举例 ················································ 357
9.7.1 安装mosquitto ···························· 358
9.7.2 启动mosquitto ···························· 358
9.7.3 MQTTWordCount························ 359
9.8 小结 ························································· 361
第 10 章 图计算 ············································ 362
10.1 Spark GraphX 总体设计 ··················· 362
10.1.1 图计算模型 ····························· 363
10.1.2 属性图 ····································· 365
10.1.3 GraphX的类继承体系 ··········· 367
10.2 图操作 ·················································· 368
10.2.1 属性操作 ································· 368XII
10.2.2 结构操作 ································· 368
10.2.3 连接操作 ································· 369
10.2.4 聚合操作 ································· 370
10.3 Pregel API ············································ 371
10.3.1 Dijkstra算法 ··························· 373
10.3.2 Dijkstra的实现 ······················· 376
10.4 Graph的构建 ······································ 377
10.4.1 从边的列表加载Graph ·········· 377
10.4.2 在Graph中创建图的方法 ····· 377
10.5 顶点集合抽象VertexRDD··············· 378
10.6 边集合抽象EdgeRDD ····················· 379
10.7 图分割 ·················································· 380
10.8 常用算法·············································· 382
10.8.1 网页排名 ································· 382
10.8.2 Connected Components的
应用 ········································· 386
10.8.3 三角关系统计 ························· 388
10.9 应用举例·············································· 390
10.10 小结 ···················································· 391
第 11 章 机器学习 ······································· 392
11.1 机器学习概论 ····································· 392
11.2 Spark MLlib 总体设计 ······················ 394
11.3 数据类型 ·············································· 394
11.3.1 局部向量 ································· 394
11.3.2 标记点 ····································· 395
11.3.3 局部矩阵 ································· 396
11.3.4 分布式矩阵 ····························· 396
11.4 基础统计 ·············································· 398
11.4.1 摘要统计 ································· 398
11.4.2 相关统计 ································· 399
11.4.3 分层抽样 ································· 401
11.4.4 假设检验 ································· 401
11.4.5 随机数生成 ····························· 402
11.5 分类和回归 ········································· 405
11.5.1 数学公式 ································· 405
11.5.2 线性回归 ································· 407
11.5.3 分类 ········································· 407
11.5.4 回归 ········································· 410
11.6 决策树 ·················································· 411
11.6.1 基本算法 ································· 411
11.6.2 使用例子 ································· 412
11.7 随机森林 ·············································· 413
11.7.1 基本算法 ································· 414
11.7.2 使用例子 ································· 414
11.8 梯度提升决策树 ································ 415
11.8.1 基本算法 ································· 415
11.8.2 使用例子 ································· 416
11.9 朴素贝叶斯 ········································· 416
11.9.1 算法原理 ································· 416
11.9.2 使用例子 ································· 418
11.10 保序回归 ··········································· 418
11.10.1 算法原理 ······························ 418
11.10.2 使用例子 ······························ 419
11.11 协同过滤 ············································ 419
11.12 聚类 ···················································· 420
11.12.1 K-means ······························· 420
11.12.2 高斯混合 ······························ 422
11.12.3 快速迭代聚类 ······················ 422
11.12.4 latent Dirichlet allocation ····· 422
11.12.5 流式 K-means ······················ 423
11.13 维数减缩 ··········································· 424
11.13.1 奇异值分解 ·························· 424
11.13.2 主成分分析 ·························· 425XIII
11.14 特征提取与转型 ······························ 425
11.14.1 术语频率反转 ······················ 425
11.14.2 单词向量转换 ······················ 426
11.14.3 标准尺度 ······························ 427
11.14.4 正规化尺度 ·························· 428
11.14.5 卡方特征选择器 ·················· 428
11.14.6 Hadamard积 ························ 429
11.15 频繁模式挖掘 ··································· 429
11.16 预言模型标记语言 ·························· 430
11.17 管道 ···················································· 431
11.17.1 管道工作原理 ······················ 432
11.17.2 管道API介绍 ····················· 433
11.17.3 交叉验证 ······························ 435
11.18 小结 ···················································· 436
附录 A Utils ··················································· 437
附录 B Akka ·················································· 446
附录 C Jetty ··················································· 450
附录 D Metrics ············································· 453
附录 E Hadoop word count ···················· 456
附录 F CommandUtils ······························ 458
附录 G Netty ················································· 461
附录 H 源码编译错误································ 465准 备 篇
■ 第1章 环境准备
■ 第2章 Spark设计理念与基本架构第1章
环 境 准 备
凡事豫则立,不豫则废;言前定,则不跲;事前定,则不困。
—《礼记·中庸》
本章导读
在深入了解一个系统的原理、实现细节之前,应当先准备好它的源码编译环境、运行环
境。如果能在实际环境安装和运行 Spark,显然能够提升读者对于 Spark的一些感受,对系统
能有个大体的印象,有经验的技术人员甚至能够猜出一些Spark采用的编程模型、部署模式
等。当你通过一些途径知道了系统的原理之后,难道不会问问自己:“这是怎么做到的?”如
果只是游走于系统使用、原理了解的层面,是永远不可能真正理解整个系统的。很多IDE本
身带有调试的功能,每当你阅读源码,陷入重围时,调试能让我们更加理解运行期的系统。
如果没有调试功能,不敢想象阅读源码会怎样困难。
本章的主要目的是帮助读者构建源码学习环境,主要包括以下内容:
T 在 Windows 环境下搭建源码阅读环境;
T 在 Linux 环境下搭建基本的执行环境;
T Spark 的基本使用,如 spark-shell。
1.1 运行环境准备
考虑到大部分公司的开发和生成环境都采用Linux 操作系统,所以笔者选用了64 位的
Linux。在正式安装 Spark之前,先要找台好机器。为什么?因为笔者在安装、编译、调试的
过程中发现 Spark非常耗费内存,如果机器配置太低,恐怕会跑不起来。Spark 的开发语言是
Chapter 1第 1 章 环 境 准 备 3
Scala,而Scala需要运行在JVM之上,因而搭建Spark 的运行环境应该包括JDK和 Scala。
1.1.1 安装 JDK
使用命令 getconf LONG_BIT 查看Linux 机器是32 位还是64 位,然后下载相应版本的
JDK 并安装。
下载地址:
http:www.oracle.comtechnetworkjavajavasedownloadsindex.html
配置环境:
cd ~
vim .bash_pro?le
添加如下配置:
export JAVA_HOME=optjava
export PATH=PATH:JAVA_HOMEbin
export CLASSPATH=.:JAVA_HOMElibdt.jar:JAVA_HOMElibtools.jar
由于笔者的机器上已经安装过 openjdk,所以未使用以上方式,openjdk的安装命令如下:
su -c yum install java-1.7.0-openjdk
安装完毕后,使用 java –version命令查看,确认安装正常,如图 1-1所示。
图1-1 查看安装是否正常
1.1.2 安装 Scala
下载地址:http:www.scala-lang.orgdownload
选择最新的 Scala 版本下载,下载方法如下:
wget http:downloads.typesafe.comscala2.11.5scala-2.11.5.tgz
移动到选好的安装目录,例如:
mv scala-2.11.5.tgz ~install
进入安装目录,执行以下命令:
chmod 755 scala-2.11.5.tgz
tar -xzvf scala-2.11.5.tgz
配置环境:
cd ~
vim .bash_pro?le
添加如下配置:4 准 备 篇
export SCALA_HOME=HOMEinstallscala-2.11.5
export PATH=PATH:SCALA_HOMEbin:HOMEbin
安装完毕后输入 scala,进入 scala命令行说明 scala 安装正确,如图 1-2所示。
图 1-2 进入scala 命令行
1.1.3 安装 Spark
下载地址:http:spark.apache.orgdownloads.html
选择最新的 Spark版本下载,下载方法如下:
wget http:archive.apache.orgdistsparkspark-1.2.0spark-1.2.0-bin-hadoop1.tgz
移动到选好的安装目录,如:
mv spark-1.2.0-bin-hadoop1.tgz~install
进入安装目录,执行以下命令:
chmod 755 spark-1.2.0-bin-hadoop1.tgz
tar -xzvf spark-1.2.0-bin-hadoop1.tgz
配置环境:
cd ~
vim .bash_pro?le
添加如下配置:
export SPARK_HOME=HOMEinstallspark-1.2.0-bin-hadoop1
1.2 Spark初体验
本节通过Spark的基本使用,让读者对Spark能有初步的认识,便于引导读者逐步深入学习。
1.2.1 运行 spark-shell
要运行spark-shell,需要先对 Spark 进行配置。
1)进入Spark的 conf 文件夹:
cd ~installspark-1.2.0-bin-hadoop1conf
2)复制一份 spark-env.sh.template,命名为spark-env.sh,对它进行编辑,命令如下:
cp spark-env.sh.template spark-env.sh
vim spark-env.sh第 1 章 环 境 准 备 5
3)添加如下配置:
export SPARK_MASTER_IP=127.0.0.1
export SPARK_LOCAL_IP=127.0.0.1
4)启动 spark-shell:
cd ~installspark-1.2.0-bin-hadoop1bin
.spark-shell
最后我们会看到 spark 启动的过程,如图1-3 所示。
图1-3 Spark启动过程
从以上启动日志中我们可以看到 SparkEnv、MapOutputTracker、BlockManagerMaster、DiskBlockManager、MemoryStore、HttpFileServer、SparkUI等信息。它们是做什么的?此处
望文生义即可,具体内容将在后边的章节详细讲解。
1.2.2 执行 word count
这一节,我们通过word count这个耳熟能详的例子来感受下Spark 任务的执行过程。启
动 spark-shell 后,会打开 scala 命令行,然后按照以下步骤输入脚本。
1)输入 val lines = sc.textFile(..README.md, 2),执行结果如图1-4所示。
图1-4 步骤 1 执行结果6 准 备 篇
2)输入val words = lines.?atMap(line => line.split( )),执行结果如图 1-5 所示。
图 1-5 步骤2执行结果
3)输入val ones = words.map(w => (w,1)),执行结果如图 1-6 所示。
图 1-6 步骤3执行结果
4)输入val counts = ones.reduceByKey(_ + _),执行结果如图1-7 所示。
图 1-7 步骤4执行结果
5)输入counts.foreach(println),任务执行过程如图1-8和图1-9 所示。输出结果如图1-10
所示。1
图1-8 步骤 5 执行过程部分(一)
图1-9 步骤 5 执行过程部分(二)
因截图时,一屏放不下,故分为两图。第 1 章 环 境 准 备 7
图1-10 步骤 5 输出结果
在这些输出日志中,我们先是看到 Spark 中任务的提交与执行过程,然后看到单词计数
的输出结果,最后打印一些任务结束的日志信息。有关任务的执行分析,笔者将在第 5 章中
展开。
1.2.3 剖析 spark-shell
通过 word count在 spark-shell 中执行的过程,我们想看看spark-shell 做了什么。spark-
shell中有以下一段脚本,见代码清单1-1。
代码清单1-1 spark-shell中的一段脚本
function main {
if cygwin; then
stty -icanonmin 1 -echo > devnull 2>1
export SPARK_SUBMIT_OPTS=SPARK_SUBMIT_OPTS -Djline.terminal=unix
FW DIRbinspark-submit --class org.apache.spark.repl.Main {SUBMISSION_
OPTS[@]} spark-shell {APPLICATION_OPTS[@]}
sttyicanon echo > devnull 2>1
else
export SPARK_SUBMIT_OPTS
FW DIRbinspark-submit --class org.apache.spark.repl.Main {SUBMISSION_
OPTS[@]} spark-shell {APPLICATION_OPTS[@]}
}
我们看到脚本 spark-shell 里执行了 spark-submit脚本,打开 spark-submit 脚本,发现其中
包含以下脚本。8 准 备 篇
exec SPARK_HOMEbinspark-class org.apache.spark.deploy.SparkSubmit {ORIG_
ARGS[@]}
脚本spark-submit 在执行 spark-class脚本时,给它增加了参数 SparkSubmit。打开 spark-
class 脚本,其中包含以下脚本,见代码清单1-2。
代码清单1-2 spark-class
if [ -n {JAVA_HOME} ]; then
RUNNER={JAVA_HOME}binjava
else
if [ `command -v java` ]; then
RUNNER=java
else
echo JAVA_HOME is not set >2
exit 1
exec RUNNER -cp CLASSPATH JAVA_OPTS @
读到这里,应该知道 Spark启动了以 SparkSubmit 为主类的jvm 进程。
为便于在本地对Spark 进程使用远程监控,给spark-class脚本追加以下 jmx 配置:
JAVA _OPTS=-XX:MaxPermSize=128m OUR_JAVA_OPTS -Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.port=10207 -Dcom.sun.management.jmxremote.
authenticate=false -Dcom.sun.management.jmxremote.ssl=false
在本地打开 jvisualvm,添加远程主机,如图1-11所示。
右击已添加的远程主机,添加 JMX连接,如图1-12所示。
图 1-11 添加远程主机 图1-12 添加JMX连接
单击右侧的“线程”选项卡,选择 main线程,然后单击“线程 Dump”按钮,如图 1-13
所示。
从 dump 的内容中找到线程 main 的信息,如代码清单 1-3所示。第 1 章 环 境 准 备 9
图1-13 查看 Spark线程
代码清单1-3 main线程dump信息
main - Thread t@1
java.lang.Thread.State: RUNNABLE
at java.io.FileInputStream.read0(Native Method)
at java.io.FileInputStream.read(FileInputStream.java:210)
at scala.tools.jline.TerminalSupport.readCharacter(TerminalSupport.java:152)
at scala.tools.jline.UnixTerminal.readVirtualKey(UnixTerminal.java:125)
at scala.tools.jline.console.ConsoleReader.readVirtualKey(ConsoleReader.
java:933)
at scala.tools.jline.console.ConsoleReader.readBinding(ConsoleReader.java:1136)
at scala.tools.jline.console.ConsoleReader.readLine(ConsoleReader.java:1218)
at scala.tools.jline.console.ConsoleReader.readLine(ConsoleReader.java:1170)
at org.apache.spark.repl.SparkJLineReader.readOneLine(SparkJLineReader.
scala:80)
at scala.tools.nsc.interpreter.InteractiveReaderclass.readLine(Interactive-
Reader.scala:43)
at org.apache.spark.repl.SparkJLineReader.readLine(SparkJLineReader.scala:25)
at org.apache.spark.repl.SparkILoop.readOneLine1(SparkILoop.scala:619)
at org.apache.spark.repl.SparkILoop.innerLoop1(SparkILoop.scala:636)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
at org.apache.spark.repl.SparkILoopanonfunprocess1.applymcZsp
(SparkI-Loop.scala:968)
at org.apache.spark.repl.SparkILoopanonfunprocess1.apply(SparkILoop.
scala:916)
at org.apache.spark.repl.SparkILoopanonfunprocess1.apply(SparkILoop.
scala:916)
at scala.tools.nsc.util.ScalaClassLoader.savingContextLoader(ScalaClass
Loader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)10 准 备 篇
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
at org.apache.spark.repl.Main.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.re?ect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.re?ect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.
java:57)
at sun.re?ect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces-
sorImpl.java:43)
at java.lang.re?ect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
从main线程的栈信息中可看出程序的调用顺序:SparkSubmit.main→repl.Main →SparkI-
Loop.process。SparkILoop.process方法中会调用initializeSpark方法,initializeSpark的实现见
代码清单1-4。
代码清单1-4 initializeSpark的实现
def initializeSpark {
intp.beQuietDuring {
command(
@transient val sc = {
val _sc = org.apache.spark.repl.Main.interp.createSparkContext
println(Spark context available as sc.)
_sc
}
)
command(import org.apache.spark.SparkContext._)
}
}
我们看到 initializeSpark调用了 createSparkContext方法,createSparkContext的实现见代
码清单1-5。
代码清单1-5 createSparkContext的实现
def createSparkContext: SparkContext = {
valexecUri = System.getenv(SPARK_EXECUTOR_URI)
valjars = SparkILoop.getAddedJars
valconf = new SparkConf
.setMaster(getMaster)
.setAppName(Spark shell)
.setJars(jars)
.set(spark.repl.class.uri, intp.classServer.uri)
if (execUri != null) {
conf.set(spark.executor.uri, execUri)
}
sparkContext = new SparkContext(conf)
logInfo(Created spark context..)
sparkContext
}第 1 章 环 境 准 备 11
这里最终使用SparkConf 和SparkContext来完成初始化,具体内容将在第3章讲解。代
码分析中涉及的repl 主要用于与 Spark 实时交互。
1.3 阅读环境准备
准备 Spark阅读环境,同样需要一台好机器。笔者调试源码的机器的内存是 8 GB。源
码阅读的前提是在IDE 环境中打包、编译通过。常用的IDE有IntelliJ IDEA、Eclipse。笔者
选择用Eclipse编译 Spark,原因有二:一是由于使用多年对它比较熟悉,二是社区中使用
Eclipse编译Spark的资料太少,在这里可以做个补充。在 Windows系统编译 Spark源码,除
了安装JDK 外,还需要安装以下工具。
(1)安装Scala
由于 Spark 1.20版本的 sbt 里指定的 Scala 版本是 2.10.4,具体见Spark 源码目录下的文件
\project\plugins.sbt,其中有一行:scalaVersion := 2.10.4。所以选择下载 scala-2.10.4.msi,下
载地址:http:www.scala-lang.orgdownload。
下载完毕,安装 scala-2.10.4.msi。
(2)安装 SBT
由于 Scala 使用SBT作为构建工具,所以需要下载 SBT。下载地址:http:www.scala-sbt.
org,下载最新的安装包 sbt-0.13.8.msi 并安装。
(3)安装 Git Bash
由于 Spark源码使用Git 作为版本控制工具,所以需要下载Git的客户端工具,推荐使用
Git Bash,因为它更符合 Linux 下的操作习惯。下载地址:http:msysgit.github.io,下载最新
的版本并安装。
(4)安装Eclipse Scala IDE插件
Eclipse通过强大的插件方式支持各种 IDE 工具的集成,要在Eclipse中编译、调试、运
行Scala程序,就需要安装Eclipse Scala IDE 插件。下载地址:http:scala-ide.orgdownload
current.html。
由于笔者本地的Eclipse版本是Eclipse 4.4 (Luna),所以选择安装插件http:download.
scala-ide.orgsdklithiume44scala211stablesite,如图 1-14所示。
图 1-14 Eclipse Scala IDE插件安装地址
在 Eclipse中选择 Help 菜单,然后选择 Install New Software…选项,打开Install 对话框,如图1-15 所示。12 准 备 篇
图 1-15 Install对话框
单击Add按钮,打开 Add Repository对话框,输入插件地址,如图 1-16 所示。
图 1-16 添加 Scala IDE插件地址
全选插件的内容,完成安装,如图 1-17所示。
图 1-17 安装 Scala IDE插件第 1 章 环 境 准 备 13
1.4 Spark 源码编译与调试
1. 下载 Spark 源码
首先,访问 Spark 官网 http:spark.apache.org,如图1-18所示。
图1-18 Spark官网
单击 Download Spark按钮,在下一个页面找到 git 地址,如图 1-19 所示。
图1-19 Spark官方git 地址
打开 Git Bash工具,输入git clone git:github.comapachespark.git命令将源码下载到本
地,如图1-20 所示。
图1-20 下载 Spark源码
2. 构建 Scala 应用
使用 cmd 命令行进到 Spark 根目录,执行 sbt 命令。会下载和解析很多 jar 包,要等很长
时间,笔者大概花了一个多小时才执行完。14 准 备 篇
3. 使用 sbt生成 Eclipse 工程文件
等sbt提示符(>)出现后,输入Eclipse命令,开始生成Eclipse工程文件,也需要花费
很长时间,笔者本地大致花了40 分钟。完成时的状况如图1-21 所示。
图1-21 sbt编译过程
现在我们查看 Spark 下的子文件夹,发现其中都生成了 .project和 .classpath 文件。比如
mllib项目下就生成了.project和.classpath文件,如图1-22 所示。
图1-22 sbt生成的项目文件
4. 编译 Spark 源码
由于Spark 使用 Maven作为项目管理工具,所以需要将 Spark 项目作为 Maven项目导入
Eclipse中,如图1-23 所示。
单击 Next按钮进入下一个对话框,如图1-24 所示。第 1 章 环 境 准 备 15
图 1-23 导入Maven 项目
全选所有项目,单击 Finish按钮,这样就完成了导入,如图 1-25所示。
图1-24 选择 Maven 项目 图1-25 导入完成的项目16 准 备 篇
导入完成后,需要设置每个子项目的build path。右击每个项目,选择“Build Path”→
“Con?gure Build Path…”,打开 Java Build Path界面,如图 1-26所示。
图 1-26 Java编译目录
单击Add External JARs 按钮,将 Spark 项目下的lib_managed 文件夹的子文件夹 bundles
和jars 内的jar 包添加进来。
lib_managedjars文件夹下有很多打好的 spark的包,比如:spark-catalyst_2.10-1.3.2-
SNAPSHOT.jar。这些jar包有可能与你下载的Spark源码的版本不一致,导致你在调
试源码时,发生 jar 包冲突。所以请将它们排除出去。
Eclipse在对项目编译时,笔者本地出现了很多错误,有关这些错误的解决建议参见附录
H。所有错误解决后运行mvn clean install,如图 1-27 所示。
5. 调试 Spark 源码
以 Spark 源码自带的 JavaWordCount为例,介绍如何调试 Spark 源码。右击 JavaWord-
Count.java,选择“ Debug As”→“ Java Application”即可。如果想修改配置参数,右击
JavaWordCount.java,选择“Debug As”→“ Debug Con?gurations…”,从打开的对话框中选
择JavaWordCount,在右侧标签可以修改Java 执行参数、JRE、classpath、环境变量等配置,如图1-28 所示。
读者也可以在 Spark 源码中设置断点,进行跟踪调试。
注 意第 1 章 环 境 准 备 17
图1-27 编译成功
图1-28 源码调试
1.5 小结
本章通过引导大家在Linux 操作系统下搭建基本的执行环境,并且介绍spark-shell 等
脚本的执行,来帮助读者由浅入深地进行 Spark 源码的学习。由于目前多数开发工作都在
Windows系统下进行,并且 Eclipse有最广大的用户群,即便是一些开始使用 IntelliJ的用户对
Eclipse也不陌生,所以在Windows 环境下搭建源码阅读环境时,选择这些最常用的工具,能
降低读者的学习门槛,并且替大家节省时间。第2章
Spark 设计理念与基本架构
若夫乘天地之正,而御六气之辩,以游无穷者,彼且恶乎待哉?
—《庄子·逍遥游》
本章导读
上一章,介绍了Spark 环境的搭建,为方便读者学习Spark 做好准备。本章首先从Spark
产生的背景开始,介绍 Spark 的主要特点、基本概念、版本变迁。然后简要说明 Spark 的主要
模块和编程模型。最后从 Spark 的设计理念和基本架构入手,使读者能够对 Spark 有宏观的认
识,为之后的内容做一些准备工作。
Spark是一个通用的并行计算框架,由加州伯克利大学(UCBerkeley)的 AMP 实验室开
发于2009 年,并于2010 年开源,2013 年成长为Apache 旗下大数据领域最活跃的开源项目之
一。Spark 也是基于 map reduce 算法模式实现的分布式计算框架,拥有 Hadoop MapReduce 所
具有的优点,并且解决了Hadoop MapReduce中的诸多缺陷。
2.1 初识 Spark
2.1.1 Hadoop MRv1 的局限
Hadoop1.0版本采用的是MRv1版本的MapReduce编程模型。MRv1版本的实现都封装在
org.apache.hadoop.mapred包中, MRv1的Map和Reduce是通过接口实现的。MRv1包括三个部分:
T 运行时环境(JobTracker和 TaskTracker);
T 编程模型(MapReduce);
Chapter 2第 2 章 Spark 设计理念与基本架构 19
T 数据处理引擎(Map 任务和Reduce 任 务)。
MRv1存在以下不足:
T 可扩展性差:在运行时,JobTracker 既负责资源管理又负责任务调度,当集群繁忙时,JobTracker很容易成为瓶颈,最终导致它的可扩展性问题。
T 可用性差:采用了单节点的 Master,没有备用 Master 及选举操作,这导致一旦 Master
出现故障,整个集群将不可用。
T 资源利用率低:TaskTracker 使用 slot 等量划分本节点上的资源量。slot 代表计算资源
(CPU、内存等)。一个 Task 获取到一个 slot 后才有机会运行,Hadoop 调度器负责将
各个 TaskTracker 上的空闲slot 分配给Task 使用。一些Task并不能充分利用slot,而
其他Task 也无法使用这些空闲的资源。slot 分为 Map slot 和Reduce slot 两种,分别
供 MapTask 和 Reduce Task 使用。有时会因为作业刚刚启动等原因导致MapTask 很
多,而Reduce Task 任务还没有调度的情况,这时Reduce slot也会被闲置。
T 不能支持多种 MapReduce 框架:无法通过可插拔方式将自身的MapReduce框架替换
为其他实现,如 Spark、Storm 等。
MRv1的示意如图 2-1所示。
Apache 为了解决以上问题,对Hadoop进行升级改造,MRv2 最终诞生了。MRv2重
用了 MRv1中的编程模型和数据处理引擎,但是运行时环境被重构了。JobTracker被拆
分成了通用的资源调度平台(ResourceManager,RM)和负责各个计算框架的任务调度模
型(ApplicationMaster,AM)。MRv2 中 MapReduce 的核心不再是MapReduce框架,而是
YARN。在以 YARN 为核心的MRv2 中,MapReduce 框架是可插拔的,完全可以替换为其他
MapReduce 实现,比如 Spark、Storm等。MRv2的示意如图2-2所示。1
图2-1 MRv1示意图 图 2-2 MRv2示意图
Hadoop MRv2 虽然解决了MRv1 中的一些问题,但是由于对HDFS 的频繁操作(包括计
算结果持久化、数据备份及 shuf?e等)导致磁盘 IO成为系统性能的瓶颈,因此只适用于离
图2-1和图2-2 都来源自http:blog.chinaunix.netuid-28311809-ud-4383551.html。20 准 备 篇
线数据处理,而不能提供实时数据处理能力。
2.1.2 Spark 使用场景
Hadoop常用于解决高吞吐、批量处理的业务场景,例如离线计算结果用于浏览量统计。
如果需要实时查看浏览量统计信息,Hadoop 显然不符合这样的要求。Spark 通过内存计算能
力极大地提高了大数据处理速度,满足了以上场景的需要。此外,Spark 还支持 SQL 查询、流式计算、图计算、机器学习等。通过对 Java、Python、Scala、R等语言的支持,极大地方
便了用户的使用。
2.1.3 Spark 的特点
Spark看到MRv1的问题,对 MapReduce 做了大量优化,总结如下:
T 快速处理能力。随着实时大数据应用越来越多,Hadoop作为离线的高吞吐、低响应
框架已不能满足这类需求。Hadoop MapReduce的Job将中间输出和结果存储在HDFS
中,读写HDFS造成磁盘IO成为瓶颈。Spark允许将中间输出和结果存储在内存中,避免了大量的磁盘IO。同时Spark自身的DAG执行引擎也支持数据在内存中的计算。
Spark官网声称性能比Hadoop快100倍,如图2-3所示。即便是内存不足,需要磁盘
IO,其速度也是Hadoop的10倍以上。
T 易于使用。Spark现在支持Java、Scala、Python和 R等
语言编写应用程序,大大降低了使用者的门槛。自带了
80多个高等级操作符,允许在 Scala、Python、R的 shell
中进行交互式查询。
T 支持查询。Spark 支持SQL及 Hive SQL 对数据查询。
T 支持流式计算。与MapReduce只能处理离线数据相比,Spark 还支持实时的流计算。Spark依赖 Spark Streaming
对数据进行实时的处理,其流式处理能力还要强于Storm。
T 可用性高。Spark 自身实现了 Standalone部署模式,此模式下的 Master可以有多
个,解决了单点故障问题。此模式完全可以使用其他集群管理器替换,比如YARN、Mesos、EC2等。
T 丰富的数据源支持。Spark 除了可以访问操作系统自身的文件系统和HDFS,还可以
访问Cassandra、HBase、Hive、Tachyon以及任何Hadoop 的数据源。这极大地方便
了已经使用HDFS、Hbase的用户顺利迁移到 Spark。
2.2 Spark基础知识
1. 版本变迁
经过4年多的发展,Spark目前的版本是 1.4.1。我们简单看看它的版本发展过程。
图2-3 Hadoop与Spark执行
逻辑回归时间比较第 2 章 Spark 设计理念与基本架构 21
1)Spark 诞生于 UCBerkeley的AMP实验室(2009)。
2)Spark 正式对外开源(2010年)。
3)Spark 0.6.0 版本发布(2012-10-15),进行了大范围的性能改进,增加了一些新特性,并对 Standalone部署模式进行了简化。
4)Spark 0.6.2 版本发布(2013-02-07),解决了一些 bug,并增强了系统的可用性。
5)Spark 0.7.0 版本发布(2013-02-27),增加了更多关键特性,例如,Python API、Spark
Streaming的 alpha 版本等。
6)Spark 0.7.2版本发布(2013-06-02),性能改进并解决了一些bug,新增API使用的例子。
7)Spark 接受进入 Apache 孵化器(2013-06-21)。
8)Spark 0.7.3 版本发布(2013-07-16),解决一些 bug,更新 Spark Streaming API 等。
9)Spark 0.8.0 版本发布(2013-09-25),一些新功能及可用性改进。
10)Spark 0.8.1 版本发布(2013-12-19),支持Scala 2.9、YARN 2.2、Standalone部署模
式下调度的高可用性、shuf?e的优化等。
11)Spark 0.9.0 版本发布(2014-02-02),增加了GraphX,机器学习新特性,流式计算新
特性,核心引擎优化(外部聚合、加强对YARN 的支持)等。
12)Spark 0.9.1 版本发布(2014-04-09),增强使用 YARN 的稳定性,改进 Scala 和
Python API的奇偶性。
13)Spark 1.0.0 版本发布(2014-05-30),Spark SQL、MLlib、GraphX 和 Spark Streaming
都增加了新特性并进行了优化。Spark核心引擎还增加了对安全 YARN 集群的支持。
14)Spark 1.0.1 版本发布(2014-07-11),增加了 Spark SQL 的新特性和对 JSON 数据的
支持等。
15)Spark 1.0.2 版本发布(2014-08-05),Spark核心 API 及 Streaming、Python、MLlib 的
bug 修复。
16)Spark 1.1.0 版本发布(2014-09-11)。
17)Spark 1.1.1版本发布(2014-11-26),Spark核心API及Streaming、Python、SQL、GraphX
和 MLlib的 bug 修复。
18)Spark 1.2.0 版本发布(2014-12-18)。
19)Spark 1.2.1版本发布(2015-02-09),Spark核心API及Streaming、Python、SQL、GraphX
和 MLlib的 bug 修复。
20)Spark 1.3.0 版本发布(2015-03-13)。
21)Spark 1.4.0 版本发布(2015-06-11)。
22)Spark 1.4.1 版本发布(2015-07-15),DataFrame API及 Streaming、Python、SQL 和
MLlib的 bug 修复。
2. 基本概念
要想对 Spark 有整体性的了解,推荐读者阅读 Matei Zaharia 的Spark 论文。此处笔者先介22 准 备 篇
绍Spark中的一些概念:
T RDD(resillient distributed dataset):弹性分布式数据集。
T Task :具体执行任务。Task 分为 Shuf?eMapTask 和 ResultTask 两种。Shuf?eMapTask
和 ResultTask 分别类似于 Hadoop 中的 Map 和 Reduce。
T Job:用户提交的作业。一个 Job可能由一到多个Task 组成。
T Stage:Job分成的阶段。一个Job 可能被划分为一到多个Stage。
T Partition:数据分区。即一个RDD 的数据可以划分为多少个分区。
T NarrowDependency:窄依赖,即子RDD依赖于父RDD 中固定的Partition。Narrow-
Dependency分为OneToOneDependency和RangeDependency两种。
T Shuf?eDependency:shuf?e 依赖,也称为宽依赖,即子 RDD 对父RDD 中的所有
Partition都有依赖。
T DAG(directed acycle graph):有向无环图。用于反映各 RDD 之间的依赖关系。
3. Scala 与 Java 的比较
Spark 为什么要选择 Java 作为开发语言?笔者不得而知。如果能对二者进行比较,也许
能看出一些端倪。表2-1 列出了 Scala 与 Java 的比较。
表 2-1 Scala 与Java的比较
比项项 Scala Java
语言类型 面向函数为主,兼有面向对象 面向对象(Java8 也增加了lambda函数编程)
简洁性 非常简洁 不简洁
类型推断
丰富的类型推断,例如深度和链式的类型推断、duck type、隐式类型转换等,但也因此增加了编译
时长
少量的类型推断
可读性 一般,丰富的语法糖导致的各种奇幻用法,例如
方法签名
好
学习成本 较高 一般
语言特性 非常丰富的语法糖和更现代的语言特性,例如
Option、模式匹配、使用空格的方法调用
丰富
并发编程 使用Actor 的消息模型 使用阻塞、锁、阻塞队列等
通过以上比较似乎仍然无法判断Spark选择 Java作为开发语言的原因。由于函数式编程
更接近计算机思维,因此便于通过算法从大数据中建模,这应该更符合Spark 作为大数据框
架的理念吧!
2.3 Spark 基本设计思想
2.3.1 Spark 模块设计
整个Spark主要由以下模块组成:第 2 章 Spark 设计理念与基本架构 23
T Spark Core:Spark的核心功能实现,包括:SparkContext的初始化(Driver Application
通过 SparkContext提交)、部署模式、存储体系、任务提交与执行、计算引擎等。
T Spark SQL:提供 SQL 处理能力,便于熟悉关系型数据库操作的工程师进行交互查询。
此外,还为熟悉 Hadoop 的用户提供 Hive SQL处理能力。
T Spark Streaming:提供流式计算处理能力,目前支持Kafka、Flume、Twitter、MQTT、ZeroMQ、Kinesis 和简单的TCP套接字等数据源。此外,还提供窗口操作。
T GraphX:提供图计算处理能力,支持分布式,Pregel提供的 API 可以解决图计算中的
常见问题。
T MLlib:提供机器学习相关的统计、分类、回归等领域的多种算法实现。其一致的
API 接口大大降低了用户的学习成本。
Spark SQL、Spark Streaming、GraphX、MLlib的能
力都是建立在核心引擎之上,如图 2-4所示。
1. Spark 核心功能
Spark Core 提供 Spark 最基础与最核心的功能,主
要包括以下功能。
T SparkContext:通常而言,Driver Application的执行与输出都是通过 SparkContext来完
成的,在正式提交Application之前,首先需要初始化SparkContext。SparkContext隐
藏了网络通信、分布式部署、消息通信、存储能力、计算能力、缓存、测量系统、文
件服务、Web 服务等内容,应用程序开发者只需要使用 SparkContext 提供的 API完成
功能开发。SparkContext内置的DAGScheduler 负责创建Job,将 DAG 中的RDD 划
分到不同的 Stage,提交Stage 等功能。内置的 TaskScheduler 负责资源的申请、任务
的提交及请求集群对任务的调度等工作。
T 存储体系:Spark优先考虑使用各节点的内存作为存储,当内存不足时才会考虑使用
磁盘,这极大地减少了磁盘IO,提升了任务执行的效率,使得Spark适用于实时计
算、流式计算等场景。此外,Spark还提供了以内存为中心的高容错的分布式文件系统
Tachyon供用户进行选择。Tachyon能够为Spark提供可靠的内存级的文件共享服务。
T 计算引擎:计算引擎由SparkContext中的 DAGScheduler、RDD 以及具体节点上
的 Executor负责执行的 Map 和Reduce任务组成。DAGScheduler 和 RDD 虽然位于
SparkContext内部,但是在任务正式提交与执行之前会将 Job中的 RDD组织成有向
无关图(简称 DAG),并对 Stage 进行划分,决定了任务执行阶段任务的数量、迭代
计算、shuf?e 等过程。
T 部署模式:由于单节点不足以提供足够的存储及计算能力,所以作为大数据处理的
Spark 在 SparkContext的TaskScheduler 组件中提供了对Standalone部署模式的实现和
Yarn、Mesos 等分布式资源管理系统的支持。通过使用 Standalone、Yarn、Mesos等部
署模式为Task 分配计算资源,提高任务的并发执行效率。除了可用于实际生产环境
图 2-4 Spark各模块依赖关系24 准 备 篇
的 Standalone、Yarn、Mesos 等部署模式外,Spark还提供了 Local模式和 local-cluster
模式便于开发和调试。
2. Spark 扩展功能
为了扩大应用范围,Spark 陆续增加了一些扩展功能,主要包括:
T Spark SQL :SQL 具有普及率高、学习成本低等特点,为了扩大 Spark的应用面,增
加了对SQL及 Hive的支持。Spark SQL 的过程可以总结为:首先使用SQL语句解析
器(SqlParser)将 SQL 转换为语法树(Tree),并且使用规则执行器(RuleExecutor)将
一系列规则(Rule)应用到语法树,最终生成物理执行计划并执行。其中,规则执行
器包括语法分析器(Analyzer)和优化器(Optimizer)。Hive的执行过程与SQL 类似。
T Spark Streaming:Spark Streaming 与Apache Storm类似,也用于流式计算。Spark
Streaming支持 Kafka、Flume、Twitter、MQTT、ZeroMQ、Kinesis和简单的 TCP套
接字等多种数据输入源。输入流接收器(Receiver)负责接入数据,是接入数据流的
接口规范。Dstream 是Spark Streaming中所有数据流的抽象,Dstream 可以被组织为
DStream Graph。Dstream本质上由一系列连续的RDD组成。
T GraphX:Spark提供的分布式图计算框架。GraphX主要遵循整体同步并行(bulk
synchronous parallell,BSP)计算模式下的Pregel模型实现。GraphX提供了对图的抽象
Graph,Graph由顶点(Vertex)、边(Edge)及继承了Edge的EdgeTriplet(添加了srcAttr
和dstAttr用来保存源顶点和目的顶点的属性)三种结构组成。GraphX目前已经封装了
最短路径、网页排名、连接组件、三角关系统计等算法的实现,用户可以选择使用。
T MLlib :Spark 提供的机器学习框架。机器学习是一门涉及概率论、统计学、逼近论、凸分析、算法复杂度理论等多领域的交叉学科。MLlib目前已经提供了基础统计、分
类、回归、决策树、随机森林、朴素贝叶斯、保序回归、协同过滤、聚类、维数缩
减、特征提取与转型、频繁模式挖掘、预言模型标记语言、管道等多种数理统计、概
率论、数据挖掘方面的数学算法。
2.3.2 Spark 模型设计
1. Spark 编程模型
Spark 应用程序从编写到提交、执行、输出的整个过程如图2-5所示,图中描述的步骤如下。
1)用户使用 SparkContext提供的 API(常用的有 textFile、sequenceFile、runJob、stop 等)
编写 Driver application程序。此外SQLContext、HiveContext及StreamingContext对 Spark-
Context进行封装,并提供了SQL、Hive 及流式计算相关的 API。
2)使用SparkContext提交的用户应用程序,首先会使用BlockManager和 Broadcast-
Manager将任务的Hadoop 配置进行广播。然后由DAGScheduler 将任务转换为RDD 并组织成
DAG,DAG 还将被划分为不同的Stage。最后由TaskScheduler 借助 ActorSystem 将任务提交
给集群管理器(Cluster Manager)。第 2 章 Spark 设计理念与基本架构 25
3)集群管理器(Cluster Manager)给任务分配资源,即将具体任务分配到Worker上,Worker创建 Executor来处理任务的运行。Standalone、YARN、Mesos、EC2 等都可以作为
Spark 的集群管理器。
2. RDD计算模型
RDD 可以看做是对各种数据计算模型的统一抽象,Spark 的计算过程主要是RDD 的迭代
计算过程,如图2-6 所示。RDD 的迭代计算过程非常类似于管道。分区数量取决于partition
数量的设定,每个分区的数据只会在一个 Task 中计算。所有分区可以在多个机器节点的
Executor上并行执行。
图2-5 代码执行过程
图2-6 RDD 计算模型
2.4 Spark 基本架构
从集群部署的角度来看,Spark 集群由以下部分组成:
T Cluster Manager:Spark 的集群管理器,主要负责资源的分配与管理。集群管理器分配
的资源属于一级分配,它将各个Worker上的内存、CPU 等资源分配给应用程序,但
是并不负责对 Executor的资源分配。目前,Standalone、YARN、Mesos、EC2 等都可
以作为 Spark 的集群管理器。26 准 备 篇
T Worker :Spark 的工作节点。对Spark 应用程序来说,由集群管理器分配得到资
源的 Worker节点主要负责以下工作:创建Executor,将资源和任务进一步分配给
Executor,同步资源信息给Cluster Manager。
T Executor:执行计算任务的一线进程。主要负责任务的执行以及与 Worker、Driver
App 的信息同步。
T Driver App :客户端驱动程序,也可以理解为客户端应用程序,用于将任务程序转换
为 RDD 和 DAG,并与 Cluster Manager进行通信与调度。
这些组成部分之间的整体关系如图2-7 所示。
图 2-7 Spark 基本架构图
2.5 小结
每项技术的诞生都会由某种社会需求所驱动, Spark 正是在实时计算的大量需求下诞生的。
Spark借助其优秀的处理能力、可用性高、丰富的数据源支持等特点,在当前大数据领域变得
火热,参与的开发者也越来越多。Spark经过几年的迭代发展,如今已经提供了丰富的功能。
笔者相信,Spark在未来必将产生更耀眼的火花。核心设计篇
■ 第3章 SparkContext 的初始化
■ 第4章 存储体系
■ 第5章 任务提交与执行
■ 第6章 计算引擎
■ 第7章 部署模式第3章
SparkContext 的初始化
道生一, 一生二, 二生三, 三生万物。
—《道德经》
本章导读
SparkContext 的初始化是Driver 应用程序提交执行的前提,本章内容以local模式为主,并按照代码执行顺序讲解,这将有助于首次接触Spark 的读者理解源码。读者朋友如果能边
跟踪代码,边学习本章内容,也许是快速理解 SparkContext初始化过程的便捷途径。已经熟
练使用Spark 的开发人员可以选择跳过本章内容。
本章将在介绍 SparkContext初始化过程的同时,向读者介绍各个组件的作用,为阅读后
面的章节打好基础。 Spark 中的组件很多,就其功能而言涉及网络通信、分布式、消息、存储、计算、缓存、测量、清理、文件服务、Web UI 的方方面面。
3.1 SparkContext概述
Spark Driver用于提交用户应用程序,实际可以看作 Spark的客户端。了解 Spark Driver
的初始化,有助于读者理解用户应用程序在客户端的处理过程。
Spark Driver 的初始化始终围绕着SparkContext 的初始化。SparkContext 可以算得上是所
有Spark应用程序的发动机引擎,轿车要想跑起来,发动机首先要启动。SparkContext 初始
化完毕,才能向Spark 集群提交任务。在平坦的公路上,发动机只需以较低的转速、较低的
功率就可以游刃有余;在山区,你可能需要一台能够提供大功率的发动机才能满足你的需求。
这些参数都是通过驾驶员操作油门、档位等传送给发动机的,而 SparkContext 的配置参数则
Chapter 3第 3 章 SparkContext 的初始化 29
由SparkConf 负责,SparkConf就是你的操作面板。
SparkConf 的构造很简单,主要是通过 ConcurrentHashMap 来维护各种 Spark 的配置属
性。SparkConf代码结构见代码清单3-1。Spark 的配置属性都是以“spark.”开头的字符串。
代码清单3-1 SparkConf代码结构
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
import SparkConf._
def this = this(true)
private val settings = new ConcurrentHashMap[String, String]
if (loadDefaults) {
加载任何以spark.开头的系统属性
for ((key, value) <- Utils.getSystemProperties if key.startsWith(spark.)) {
set(key, value)
}
}
其余代码省略
现在开始介绍 SparkContext。SparkContext 的初始化步骤如下:
1)创建 Spark 执行环境 SparkEnv;
2)创建 RDD 清理器 metadataCleaner;
3)创建并初始化 Spark UI;
4)Hadoop 相关配置及 Executor环境变量的设置;
5)创建任务调度 TaskScheduler;
6)创建和启动 DAGScheduler;
7)TaskScheduler 的启动;
8)初始化块管理器 BlockManager(BlockManager是存储体系的主要组件之一,将在第 4
章介绍);
9)启动测量系统MetricsSystem;
10)创建和启动 Executor分配管理器ExecutorAllocationManager;
11)ContextCleaner的创建与启动;
12)Spark 环境更新;
13)创建DAGSchedulerSource 和BlockManagerSource;
14)将 SparkContext标记为激活。
SparkContext的主构造器参数为SparkConf,其实现如下。
clas s SparkContext(con?g: SparkConf) extends Logging with ExecutorAllocationClient {
private val creationSite: CallSite = Utils.getCallSite
private val allowMultipleContexts: Boolean =
con?g.getBoolean(spark.driver.allowMultipleContexts, false)
SparkContext.markPartiallyConstructed(this, allowMultipleContexts)
上面代码中的 CallSite存储了线程栈中最靠近栈顶的用户类及最靠近栈底的 Scala或者
Spark核心类信息。Utils.getCallSite的详细信息见附录A。SparkContext 默认只有一个实例(由30 核心设计篇
属性 spark.driver.allowMultipleContexts来控制,用户需要多个 SparkContext 实例时,可以将
其设置为true),方法markPartiallyConstructed用来确保实例的唯一性,并将当前SparkContext
标记为正在构建中。
接下来会对SparkConf 进行复制,然后对各种配置信息进行校验,代码如下。
private[spark] val conf = con?g.clone
conf.validateSettings
if (!conf.contains(spark.master)) {
throw new SparkException(A master URL must be set in your con?guration)
}
if (!conf.contains(spark.app.name)) {
thro w new SparkException(An application name must be set in your con?guration)
}
从上面校验的代码看到必须指定属性spark.master 和 spark.app.name,否则会抛出异常,结束初始化过程。spark.master用于设置部署模式,spark.app.name用于指定应用程序名称。
3.2 创建执行环境 SparkEnv
SparkEnv是 Spark 的执行环境对象,其中包括众多与Executor执行相关的对象。由于
在local模式下Driver会创建Executor,local-cluster部署模式或者Standalone部署模式下
Worker另起的 CoarseGrainedExecutorBackend进程中也会创建 Executor,所以 SparkEnv 存在
于Driver或者 CoarseGrainedExecutorBackend进程中。创建SparkEnv 主要使用SparkEnv 的
createDriverEnv,SparkEnv.createDriverEnv方法有三个参数:conf、isLocal和 listenerBus。
val isLocal = (master == local || master.startsWith(local[))
private[spark] val listenerBus = new LiveListenerBus
conf.set(spark.executor.id, driver)
private[spark] val env = SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
SparkEnv.set(env)
上面代码中的conf是对 SparkConf的复制,isLocal标识是否是单机模式,listenerBus 采
用监听器模式维护各类事件的处理,在3.4.1 节会详细介绍。
SparkEnv的方法createDriverEnv最终调用create创建SparkEnv。SparkEnv的构造步骤如下:
1)创建安全管理器SecurityManager;
2)创建基于 Akka 的分布式消息系统ActorSystem;
3)创建 Map 任务输出跟踪器 mapOutputTracker;
4)实例化 Shuf?eManager;
5)创建 Shuf?eMemoryManager;
6)创建块传输服务BlockTransferService;
7)创建 BlockManagerMaster;第 3 章 SparkContext 的初始化 31
8)创建块管理器 BlockManager;
9)创建广播管理器BroadcastManager;
10)创建缓存管理器CacheManager;
11)创建HTTP 文件服务器 HttpFileServer;
12)创建测量系统MetricsSystem;
13)创建SparkEnv。
3.2.1 安全管理器 SecurityManager
SecurityManager主要对权限、账号进行设置,如果使用 Hadoop YARN 作为集群管理器,则需要使用证书生成 secret key 登录,最后给当前系统设置默认的口令认证实例,此实例采用
匿名内部类实现,参见代码清单3-2。
代码清单3-2 SecurityManager的实现
private val secretKey = generateSecretKey
使用HTTP连接设置口令认证
if (authOn) {
Authenticator.setDefault(
new Authenticator {
override def getPasswordAuthentication: PasswordAuthentication = {
var passAuth: PasswordAuthentication = null
val userInfo = getRequestingURL.getUserInfo
if (userInfo != null) {
val parts = userInfo.split(:, 2)
pass Auth = new PasswordAuthentication(parts(0), parts(1).
toCharArray)
}
return passAuth
}
})
}
3.2.2 基于 Akka 的分布式消息系统 ActorSystem
ActorSystem 是 Spark 中最基础的设施,Spark 既使用它发送分布式消息,又用它实现并
发编程。消息系统可以实现并发?要解释清楚这个问题,首先应该简单介绍下 Scala语言的
Actor并发编程模型:Scala认为 Java 线程通过共享数据以及通过锁来维护共享数据的一致性
是糟糕的做法,容易引起锁的争用,降低并发程序的性能,甚至会引入死锁的问题。在 Scala
中只需要自定义类型继承Actor,并且提供act 方法,就如同Java里实现Runnable 接口,需要
实现 run方法一样。但是不能直接调用 act 方法,而是通过发送消息的方式 (Scala 发送消息是
异步的) 传递数据。如:32 核心设计篇
Actor ! message
Akka是 Actor 编程模型的高级类库,类似于 JDK 1.5 之后越来越丰富的并发工具包,简
化了程序员并发编程的难度。ActorSystem 便是 Akka 提供的用于创建分布式消息通信系统的
基础类。Akka的具体信息见附录 B。
正是因为 Actor轻量级的并发编程、消息发送以及 ActorSystem 支持分布式消息发送等特
点,Spark选择了ActorSystem。
SparkEnv中创建 ActorSystem 时用到了 AkkaUtils工具类,见代码清单 3-3。AkkaUtils.
createActorSystem方法用于启动ActorSystem,见代码清单3-4。AkkaUtils 使用了Utils 的静
态方法startServiceOnPort, startServiceOnPort 最终会回调方法startService: Int => (T, Int),此
处的startService实际是方法 doCreateActorSystem。真正启动 ActorSystem 是由doCreate-
ActorSystem方法完成的,doCreateActorSystem的具体实现细节请见附录B。Spark 的 Driver
中Akka的默认访问地址是akka:sparkDriver,Spark 的 Executor中 Akka 的默认访问地址是
akka: sparkExecutor。如果不指定ActorSystem 的端口,那么所有节点的 ActorSystem 端口在
每次启动时随机产生。关于startServiceOnPort的实现,请见附录A。
代码清单3-3 AkkaUtils工具类创建和启动ActorSystem
val (actorSystem, boundPort) =
Option(defaultActorSystem) match {
case Some(as) => (as, port)
case None =>
val a ctorSystemName = if (isDriver) driverActorSystemName else
executorActorSystemName
Akka Utils.createActorSystem(actorSystemName, hostname, port, conf,securityManager)
}
代码清单3-4 ActorSystem的创建和启动
def createActorSystem(
name: String,host: String,port: Int,conf: SparkConf,securityManager: SecurityManager): (ActorSystem, Int) = {
val startService: Int => (ActorSystem, Int) = { actualPort =>
doCreateActorSystem(name, host, actualPort, conf, securityManager)
}
Utils.startServiceOnPort(port, startService, conf, name)
}
3.2.3 map 任务输出跟踪器 mapOutputTracker
mapOutputTracker用于跟踪map阶段任务的输出状态,此状态便于reduce 阶段任务获取第 3 章 SparkContext 的初始化 33
地址及中间输出结果。每个 map 任务或者reduce 任务都会有其唯一标识,分别为 mapId 和
reduceId。每个reduce 任务的输入可能是多个map 任务的输出,reduce 会到各个 map 任务的
所在节点上拉取Block,这一过程叫做 shuf?e。每批 shuf?e过程都有唯一的标识shuf?eId。
这里先介绍下 MapOutputTrackerMaster。MapOutputTrackerMaster内部使用 mapStatuses:
TimeStampedHashMap[Int, Array[MapStatus]]来维护跟踪各个map 任务的输出状态。其中key
对应 shuf?eId,Array存储各个 map任务对应的状态信息MapStatus。由于 MapStatus维护
了map输出 Block的地址BlockManagerId,所以 reduce 任务知道从何处获取map 任务的中
间输出。MapOutputTrackerMaster还使用cachedSerializedStatuses:TimeStampedHashMap[Int,Array[Byte]]维护序列化后的各个 map 任务的输出状态。其中 key 对应 shuf?eId,Array 存储
各个序列化MapStatus生成的字节数组。
Driver和Executor处理MapOutputTrackerMaster的方式有所不同。
T 如果当前应用程序是 Driver,则创建 MapOutputTrackerMaster,然后创建
MapOutputTrackerMasterActor,并且注册到 ActorSystem 中。
T 如果当前应用程序是 Executor,则创建 MapOutputTrackerWorker,并从 ActorSystem
中找到 MapOutputTrackerMasterActor。
无论是 Driver还是 Executor,最后都由 mapOutputTracker的属性 trackerActor持有
MapOutputTrackerMasterActor的引用,参见代码清单3-5。
代码清单3-5 registerOrLookup方法用于查找或者注册Actor的实现
def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
if (isDriver) {
logInfo(Registering + name)
actorSystem.actorOf(Props(newActor), name = name)
} else {
AkkaUtils.makeDriverRef(name, conf, actorSystem)
}
}
val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf)
} else {
new MapOutputTrackerWorker(conf)
}
mapOutputTracker.trackerActor = registerOrLookup(
MapOutputTracker,new M apOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrack
erMaster], conf))
在后面章节大家会知道map任务的状态正是由Executor向持有的MapOutputTracker-
MasterActor发送消息,将map任务状态同步到mapOutputTracker的mapStatuses和cached-
SerializedStatuses的。Executor究竟是如何找到MapOutputTrackerMasterActor的?registerOrLookup34 核心设计篇
方法通过调用 AkkaUtils.makeDriverRef找到 MapOutputTrackerMasterActor,实际正是利用
ActorSystem提供的分布式消息机制实现的,具体细节参见附录 B。这里第一次使用到了 Akka
提供的功能,以后大家会渐渐感觉到使用Akka的便捷。
3.2.4 实例化 ShuffleManager
Shuf?eManager 负责管理本地及远程的 block 数据的 shuf?e 操作。Shuf?eManager
默认为通过反射方式生成的 SortShuf?eManager 的实例,可以修改属性 spark.shuf?e.
manager 为 hash 来显式控制使用 HashShuf?eManager。SortShuf?eManager 通过持有的
IndexShuf?eBlockManager 间接操作 BlockManager 中的 DiskBlockManager 将 map 结果写入
本地,并根据 shuf?eId、mapId 写入索引文件,也能通过 MapOutputTrackerMaster 中维护
的 mapStatuses 从本地或者其他远程节点读取文件。有读者可能会问,为什么需要 shuf?e ?
Spark 作为并行计算框架,同一个作业会被划分为多个任务在多个节点上并行执行,reduce
的输入可能存在于多个节点上,因此需要通过“洗牌”将所有 reduce 的输入汇总起来,这
个过程就是 shuf?e。这个问题以及对 Shuf?eManager 的具体使用会在第 5 章和第 6 章详述。
Shuf?eManager 的实例化见代码清单 3-6。代码清单 3-6 最后创建的 Shuf?eMemoryManager
将在 3.2.5 节介绍。
代码清单3-6 Shuf?eManager的实例化及Shuf?eMemoryManager的创建
val shortShuf?eMgrNames = Map(
hash -> org.apache.spark.shuf?e.hash.HashShuf?eManager,sort -> org.apache.spark.shuf?e.sort.SortShuf?eManager)
val shuf?eMgrName = conf.get(spark.shuf?e.manager, sort)
val shuf?eMgrClass = shortShuf?eMgrNames.get
OrElse(shuf?eMgrName.toLowerCase, shuf?eMgrName)
val shuf?eManager = instantiateClass[Shuf?eManager](shuf?eMgrClass)
val shuf?eMemoryManager = new Shuf?eMemoryManager(conf)
3.2.5 shuffle 线程内存管理器 ShuffleMemoryManager
Shuf?eMemoryManager负责管理shuf?e 线程占有内存的分配与释放,并通过thread-
Memory:mutable.HashMap[Long, Long]缓存每个线程的内存字节数,见代码清单3-7。
代码清单3-7 Shuf?eMemoryManager的数据结构
private[spark] class Shuf?eMemoryManager(maxMemory: Long) extends Logging {
priv ate val threadMemory = new mutable.HashMap[Long, Long] threadId ->
memory bytes
def this(conf: SparkConf) = this(Shuf?eMemoryManager.getMaxMemory(conf))
getMaxMemory方法用于获取 shuf?e 所有线程占用的最大内存,实现如下。
def getMaxMemory(conf: SparkConf): Long = {第 3 章 SparkContext 的初始化 35
val memoryFraction = conf.getDouble(spark.shuf?e.memoryFraction, 0.2)
val safetyFraction = conf.getDouble(spark.shuf?e.safetyFraction, 0.8)
(Runtime.getRuntime.maxMemory memoryFraction safetyFraction).toLong
}
从上面代码可以看出,shuf?e 所有线程占用的最大内存的计算公式为:
Java 运行时最大内存 Spark 的 shuf?e 最大内存占比 Spark的安全内存占比
可以配置属性 spark.shuf?e.memoryFraction修改Spark 的 shuf?e最大内存占比,配置属性
spark.shuf?e.safetyFraction修改 Spark 的安全内存占比。
Shuf?eMemoryManager通常运行在 Executor中,Driver 中的 Shuf?eMemoryManager
只有在 local模式下才起作用。
3.2.6 块传输服务 BlockTransferService
BlockTransferService默 认为 NettyBlockTransferService(可以配置属性 spark.shuf?e.
blockTransferService使用 NioBlockTransferService),它使用Netty提供的异步事件驱动的网络
应用框架,提供 web服务及客户端,获取远程节点上 Block的集合。
val blockTransferService =
conf.get(spark.shuf?e.blockTransferService, netty).toLowerCase match {
case netty =>
new NettyBlockTransferService(conf, securityManager, numUsableCores)
case nio =>
new NioBlockTransferService(conf, securityManager)
}
NettyBlockTransferService 的具体实现将在第 4 章详细介绍。这里大家可能觉得奇怪,这
样的网络应用为何也要放在存储体系?大家不妨先带着疑问,直到你真正了解了存储体系。
3.2.7 BlockManagerMaster 介绍
BlockManagerMaster负责对 Block 的管理和协调,具体操作依赖于 BlockManager-
MasterActor。Driver和 Executor处理 BlockManagerMaster的方式不同:
T 如果当前应用程序是Driver,则创建BlockManagerMasterActor,并且注册到 Actor-
System 中。
T 如果当前应用程序是Executor,则从 ActorSystem中找到BlockManagerMasterActor。
无论是 Driver还是 Executor,最后 BlockManagerMaster的属性 driverActor将持有对
BlockManagerMasterActor的引用。BlockManagerMaster的创建代码如下。
val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
BlockManagerMaster,new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)
注 意36 核心设计篇
registerOrLookup已在3.2.3节介绍过了,不再赘述。BlockManagerMaster及BlockManager-
MasterActor的具体实现将在第4章详细介绍。
3.2.8 创建块管理器 BlockManager
BlockManager负责对 Block 的管理,只有在 BlockManager的初始化方法 initialize
被调用后,它才是有效的。BlockManager作为存储系统的一部分,具体实现见第 4 章。
BlockManager的创建代码如下。
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,seri alizer, conf, mapOutputTracker, shuffleManager, blockTransferService,securityManager,numUsableCores)
3.2.9 创建广播管理器 BroadcastManager
BroadcastManager用于将配置信息和序列化后的RDD、Job以及Shuf?eDependency等信息
在本地存储。如果为了容灾,也会复制到其他节点上。创建BroadcastManager的代码实现如下。
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
BroadcastManager必须在其初始化方法initialize被调用后,才能生效。initialize方法
实际利用反射生成广播工厂实例broadcastFactory(可以配置属性spark.broadcast.factory指
定,默认为 org.apache.spark.broadcast.TorrentBroadcastFactory)。BroadcastManager的广播
方法 newBroadcast实际代理了工厂broadcastFactory的 newBroadcast方法来生成广播对
象。unbroadcast 方法实际代理了工厂broadcastFactory的unbroadcast方法生成非广播对象。
BroadcastManager的initialize、unbroadcast 及 newBroadcast 方法见代码清单3-8。
代码清单3-8 BroadcastManager的实现
private def initialize {
synchronized {
if (!initialized) {
val b roadcastFactoryClass = conf.get(spark.broadcast.factory, org.
apache.spark.broadcast.TorrentBroadcastFactory)
broadcastFactory =
Clas s.forName(broadcastFactoryClass).newInstance.asInstanceOf
[BroadcastFactory]
broadcastFactory.initialize(isDriver, conf, securityManager)
initialized = true
}
}
}
private val nextBroadcastId = new AtomicLong(0)
def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean) = {
broa dcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.
getAndIncrement)
}第 3 章 SparkContext 的初始化 37
def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
}
}
3.2.10 创建缓存管理器 CacheManager
CacheManager用于缓存RDD 某个分区计算后的中间结果,缓存计算结果发生在迭代计
算的时候,将在6.1 节讲到。而CacheManager将在 4.10 节详细描述。创建CacheManager的
代码如下。
val cacheManager = new CacheManager(blockManager)
3.2.11 HTTP 文件服务器 HttpFileServer
HttpFileServer的创建参见代码清单3-9。HttpFileServer主要提供对jar 及其他文件的http
访问,这些jar包包括用户上传的jar 包。端口由属性spark.?leserver.port配置,默认为0,表
示随机生成端口号。
代码清单3-9 HttpFileServer的创建
val httpFileServer =
if (isDriver) {
val ?leServerPort = conf.getInt(spark.?leserver.port, 0)
val server = new HttpFileServer(conf, securityManager, ?leServerPort)
server.initialize
conf.set(spark.?leserver.uri, server.serverUri)
server
} else {
null
}
HttpFileServer的初始化过程见代码清单 3-10,主要包括以下步骤:
1)使用 Utils 工具类创建文件服务器的根目录及临时目录(临时目录在运行时环境关闭时
会删除)。Utils 工具的详细介绍见附录A。
2)创建存放 jar 包及其他文件的文件目录。
3)创建并启动 HTTP 服务。
代码清单3-10 HttpFileServer的初始化
def initialize {
baseDir = Utils.createTempDir(Utils.getLocalDir(conf), httpd)
leDir = new File(baseDir, ?les)
jarDir = new File(baseDir, jars)
leDir.mkdir
jarDir.mkdir38 核心设计篇
logInfo(HTTP File server directory is + baseDir)
http Server = new HttpServer(conf, baseDir, securityManager, requestedPort,HTTP ?le server)
httpServer.start
serverUri = httpServer.uri
logDebug(HTTP ?le server started at: + serverUri)
}
HttpServer的构造和 start方法的实现中,再次使用了 Utils的静态方法 startServiceOnPort,因此会回调doStart 方法,见代码清单3-11。有关Jetty的 API 使用参见附录C。
代码清单3-11 HttpServer的启动
def start {
if (server != null) {
throw new ServerStateException(Server is already started)
} else {
logInfo(Starting HTTP Server)
val (actualServer, actualPort) =
Util s.startServiceOnPort[Server](requestedPort, doStart, conf,serverName)
server = actualServer
port = actualPort
}
}
doStart方法中启动内嵌的Jetty 所提供的HTTP服务,见代码清单3-12。
代码清单3-12 HttpServer的启动功能实现
private def doStart(startPort: Int): (Server, Int) = {
val server = new Server
val connector = new SocketConnector
connector.setMaxIdleTime(60 1000)
connector.setSoLingerTime(-1)
connector.setPort(startPort)
server.addConnector(connector)
val threadPool = new QueuedThreadPool
threadPool.setDaemon(true)
server.setThreadPool(threadPool)
val resHandler = new ResourceHandler
resHandler.setResourceBase(resourceBase.getAbsolutePath)
val handlerList = new HandlerList
handlerList.setHandlers(Array(resHandler, new DefaultHandler))
if (securityManager.isAuthenticationEnabled{
logDebug(HttpServer is using security)
val sh = setupSecurityHandler(securityManager)
make sure we go through security handler to get resources
sh.setHandler(handlerList)第 3 章 SparkContext 的初始化 39
server.setHandler(sh)
} else {
logDebug(HttpServer is not using security)
server.setHandler(handlerList)
}
server.start
val actualPort = server.getConnectors(0).getLocalPort
(server, actualPort)
}
3.2.12 创建测量系统 MetricsSystem
MetricsSystem 是 Spark 的测量系统,创建 MetricsSystem 的代码如下。
val metricsSystem = if (isDriver) {
MetricsSystem.createMe ......
深入理解Spark:
核心思想与源码分析
耿嘉安 著图书在版编目(CIP)数据
深入理解 Spark:核心思想与源码分析 耿嘉安著 . —北京:机械工业出版社,2015.12
(大数据技术丛书)
ISBN 978-7-111-52234-8
I. 深… II.耿… III.数据处理软件 IV. TP274
中国版本图书馆 CIP 数据核字(2015)第 280808 号
深入理解Spark:核心思想与源码分析
出版发行:机械工业出版社(北京市西城区百万庄大街 22 号 邮政编码:100037)
责任编辑:高婧雅 责任校对:董纪丽
印 刷: 版 次:2016 年 1 月第 1 版第 1 次印刷
开 本:186mm×240mm 116 印 张:30.25
书 号: ISBN 978-7-111-52234-8 定 价:99.00 元
凡购本书,如有缺页、倒页、脱页,由本社发行部调换
客服热线:(010)88379426 88361066 投稿热线:(010)88379604
购书热线:(010)68326294 88379649 68995259 读者信箱:hzit@hzbook.com
版权所有·侵权必究
封底无防伪标均为盗版
本书法律顾问:北京大成律师事务所 韩光 邹晓东
Preface 前?言
为什么写这本书
要回答这个问题,需要从我个人的经历说起。说来惭愧,我第一次接触计算机是在高三。
当时跟大家一起去网吧玩 CS,跟身边的同学学怎么“玩”。正是通过这种“玩”的过程,让
我了解到计算机并没有那么神秘,它也只是台机器,用起来似乎并不比打开电视机费劲多少。
高考填志愿的时候,凭着直觉“糊里糊涂”就选择了计算机专业。等到真正学习计算机课程
的时候却又发现,它其实很难!
早在 2004 年,还在学校的我跟很多同学一样,喜欢看 Flash,也喜欢谈论 Flash 甚至做
Flash。感觉Flash 正如它的名字那样“闪光”。那些年,在学校里,知道 Flash 的人可要比知
道Java 的人多得多,这说明当时的 Flash十分火热。此外,Oracle也成为关系型数据库里的领
军人物,很多人甚至觉得懂Oracle 要比懂 Flash、Java 及其他数据库要厉害得多!
2007 年,我刚刚参加工作不久。那时 Struts1、Spring、Hibernate几乎可以称为那些
用Java 作为开发语言的软件公司的三驾马车。很快,Struts2替代了Struts1 的地位,让我第
一次意识到 IT 领域的技术更新竟然如此之快!随着很多传统软件公司向互联网公司转型,Hibernate也难以确保其地位,iBATIS诞生了!
2010 年,有关Hadoop 的技术图书涌入中国,当时很多公司用它只是为了数据统计、数
据挖掘或者搜索。一开始,人们对于Hadoop 的认识和使用可能相对有限。大约2011 年的时
候,关于云计算的概念在网上炒得火热,当时依然在做互联网开发的我,对其只是“道听途
说”。后来跟同事借了一本有关云计算的书,回家挑着看了一些内容,也没什么收获,怅然若
失! 20 世纪60年代,美国的军用网络作为互联网的雏形,很多内容已经与云计算中的某些
说法类似。到 20 世纪80 年代,互联网就已经启用了云计算,如今为什么又要重提这样的概
念?这个问题我可能回答不了,还是交给历史吧。
2012年,国内又呈现出大数据热的态势。从国家到媒体、教育、IT等几乎所有领域,人
人都在谈大数据。我的亲戚朋友中,无论老师、销售人员,还是工程师们都可以针对大数据谈
谈自己的看法。我也找来一些 Hadoop 的书籍进行学习,希望能在其中探索到大数据的奥妙。有幸在工作过程中接触到阿里的开放数据处理服务(open data processing service,ODPS),并且基于 ODPS 与其他小伙伴一起构建阿里的大数据商业解决方案—御膳房。去
杭州出差的过程中,有幸认识和仲,跟他学习了阿里的实时多维分析平台—Garuda 和实
时计算平台—Galaxy 的部分知识。和仲推荐我阅读 Spark 的源码,这样会对实时计算及流
式计算有更深入的了解。2015 年春节期间,自己初次上网查阅 Spark 的相关资料学习,开始
研究 Spark 源码。还记得那时只是出于对大数据的热爱,想使自己在这方面的技术能力有所
提升。
从阅读 Hibernate源码开始,到后来阅读Tomcat、Spring 的源码,我也在从学习源码的
过程中成长,我对源码阅读也越来越感兴趣。随着对Spark源码阅读的深入,发现很多内容
从网上找不到答案,只能自己“硬啃”了。随着自己的积累越来越多,突然有一天发现,我
所总结的这些内容好像可以写成一本书了!从闪光(Flash)到火花(Spark),足足有 11 个年
头了。无论是 Flash、Java,还是 Spring、iBATIS,我一直扮演着一个追随者,我接受这些书
籍的洗礼,从未给予。如今我也是Spark 的追随者,不同的是,我不再只想简单攫取,还要
给予。
最后还想说一下,2016 年是我从事 IT工作的第 10 个年头,此书特别作为送给自己的 10
周年礼物。
本书特色
T 按照源码分析的习惯设计,从脚本分析到初始化再到核心内容,最后介绍Spark 的扩
展内容。整个过程遵循由浅入深、由深到广的基本思路。
T 本书涉及的所有内容都有相应的例子,以便于读者对源码的深入研究。
T 本书尽可能用图来展示原理,加速读者对内容的掌握。
T 本书讲解的很多实现及原理都值得借鉴,能帮助读者提升架构设计、程序设计等方面
的能力。
T 本书尽可能保留较多的源码,以便于初学者能够在像地铁、公交这样的地方,也能轻
松阅读。
读者对象
源码阅读是一项苦差事,人力和时间成本都很高,尤其是对于 Spark 陌生或者刚刚开始
学习的人来说,难度可想而知。本书尽可能保留源码,使得分析过程不至于产生跳跃感,目
的是降低大多数人的学习门槛。如果你是从事IT 工作 1 ~ 3 年的新人或者是希望学习Spark
核心知识的人,本书非常适合你。如果你已经对Spark有所了解或者已经在使用它,还想进
一步提高自己,那么本书更适合你。
如果你是一个开发新手,对 Java、Linux等基础知识不是很了解,那么本书可能不太适合
你。如果你已经对Spark 有深入的研究,本书也许可以作为你的参考资料。
IVV
总体说来,本书适合以下人群:
T 想要使用Spark,但对Spark实现原理不了解,不知道怎么学习的人;
T 大数据技术爱好者,以及想深入了解Spark 技术内部实现细节的人;
T 有一定Spark使用基础,但是不了解Spark 技术内部实现细节的人;
T 对性能优化和部署方案感兴趣的大型互联网工程师和架构师;
T 开源代码爱好者。喜欢研究源码的同学可以从本书学到一些阅读源码的方式与方法。
本书不会教你如何开发 Spark应用程序,只是用一些经典例子演示。本书简单介绍
Hadoop MapReduce、Hadoop YARN、Mesos、Tachyon、ZooKeeper、HDFS、Amazon S3,但
不会过多介绍这些框架的使用,因为市场上已经有丰富的这类书籍供读者挑选。本书也不会
过多介绍 Scala、Java、Shell的语法,读者可以在市场上选择适合自己的书籍阅读。
如何阅读本书
本书分为三大部分(不包括附录):
准备篇(第 1 ~ 2 章),简单介绍了 Spark 的环境搭建和基本原理,帮助读者了解一些背
景知识。
核心设计篇(第 3 ~ 7 章),着重讲解SparkContext的初始化、存储体系、任务提交与执
行、计算引擎及部署模式的原理和源码分析。
扩展篇(第8 ~ 11 章),主要讲解基于 Spark核心的各种扩展及应用,包括:SQL 处理
引擎、Hive 处理、流式计算框架Spark Streaming、图计算框架GraphX、机器学习库MLlib等
内容。
本书最后还添加了几个附录,包括:附录 A 介绍的 Spark 中最常用的工具类 Utils ;附录
B是 Akka 的简介与工具类 AkkaUtils的介绍;附录 C为Jetty 的简介和工具类 JettyUtils的介
绍;附录 D为 Metrics库的简介和测量容器 MetricRegistry的介绍;附录 E 演示了 Hadoop1.0
版本中的 word count例子;附录F 介绍了工具类 CommandUtils的常用方法;附录 G是关于
Netty的简介和工具类 NettyUtils的介绍;附录 H 列举了笔者编译 Spark源码时遇到的问题及
解决办法。
为了降低读者阅读理解 Spark 源码的门槛,本书尽可能保留源码实现,希望读者能够怀
着一颗好奇的心,Spark 当前很火热,其版本更新也很快,本书以Spark 1.2.3 版本为主,有兴
趣的读者也可按照本书的方式,阅读Spark的最新源码。
勘误和支持
本书内容很多,限于笔者水平有限,书中内容难免有错误之处。在本书出版后的任何
时间,如果你对本书有任何问题或者意见,都可以通过邮箱 beliefer@163.com或博客 http:
www.cnblogs.comjiaan-geng联系我,说出你的建议或者想法,希望与大家共同进步。VI
致谢
感谢苍天,让我生活在这样一个时代,能接触互联网和大数据;感谢父母,这么多年来,在学习、工作及生活上的帮助与支持;感谢妻子在生活中的照顾和谦让。
感谢杨福川和高婧雅给予本书出版的大力支持与帮助。
感谢冰夷老大和王贲老大让我有幸加入阿里,接触大数据应用;感谢和仲对 Galaxy 和
Garuda耐心细致的讲解以及对Spark 的推荐;感谢张中在百忙之中给本书写评语;感谢周亮、澄苍、民瞻、石申、清无、少侠、征宇、三步、谢衣、晓五、法星、曦轩、九翎、峰阅、丁
卯、阿末、紫丞、海炎、涵康、云飏、孟天、零一、六仙、大知、井凡、隆君、太奇、晨炫、既望、宝升、都灵、鬼厉、归钟、梓撤、昊苍、水村、惜冰、惜陌、元乾等同仁在工作上的
支持和帮助。
耿嘉安 于北京
Contents 目?录
前言
准 备 篇
第 1 章 环境准备 ············································· 2
1.1 运行环境准备 ··········································· 2
1.1.1 安装 JDK ········································· 3
1.1.2 安装 Scala ········································ 3
1.1.3 安装 Spark ······································· 4
1.2 Spark 初体验 ············································· 4
1.2.1 运行spark-shell ······························· 4
1.2.2 执行word count ······························ 5
1.2.3 剖析spark-shell ······························· 7
1.3 阅读环境准备 ········································· 11
1.4 Spark 源码编译与调试 ························· 13
1.5 小结 ··························································· 17
第2 章 Spark 设计理念与基本架构 ····· 18
2.1 初识 Spark ··············································· 18
2.1.1 Hadoop MRv1 的局限··················· 18
2.1.2 Spark 使用场景 ····························· 20
2.1.3 Spark 的特点 ································· 20
2.2 Spark基础知识 ······································ 20
2.3 Spark基本设计思想 ····························· 22
2.3.1 Spark模块设计 ····························· 22
2.3.2 Spark模型设计 ····························· 24
2.4 Spark基本架构 ······································ 25
2.5 小结 ··························································· 26
核心设计篇
第 3 章 SparkContext 的初始化 ············· 28
3.1 SparkContext概述 ································· 28
3.2 创建执行环境SparkEnv ······················ 30
3.2.1 安全管理器 SecurityManager ······· 31
3.2.2 基于 Akka 的分布式消息
系统 ActorSystem·························· 31
3.2.3 map 任务输出跟踪器
mapOutputTracker ························· 32
3.2.4 实例化 Shuf?eManager ················· 34
3.2.5 shuf?e 线程内存管理器
Shuf?eMemoryManager ················ 34
3.2.6 块传输服务BlockTransferService ··· 35
3.2.7 BlockManagerMaster介绍 ············ 35VIII
3.2.8 创建块管理器BlockManager ······· 36
3.2.9 创建广播管理器Broadcast-
Manager ········································· 36
3.2.10 创建缓存管理器CacheManager ··· 37
3.2.11 HTTP文件服务器 HttpFile-
Server ··········································· 37
3.2.12 创建测量系统 MetricsSystem ···· 39
3.2.13 创建 SparkEnv ····························· 40
3.3 创建 metadataCleaner ···························· 41
3.4 SparkUI 详解 ·········································· 42
3.4.1 listenerBus 详解 ···························· 43
3.4.2 构造 JobProgressListener ·············· 46
3.4.3 SparkUI的创建与初始化 ············· 47
3.4.4 Spark UI的页面布局与展示 ········ 49
3.4.5 SparkUI的启动 ····························· 54
3.5 Hadoop 相关配置及 Executor环境
变量 ··························································· 54
3.5.1 Hadoop 相关配置信息 ·················· 54
3.5.2 Executor环境变量 ························ 54
3.6 创建任务调度器 TaskScheduler ········· 55
3.6.1 创建TaskSchedulerImpl ··············· 55
3.6.2 TaskSchedulerImpl的初始化 ······· 57
3.7 创建和启动 DAGScheduler ················· 57
3.8 TaskScheduler的启动 ··························· 60
3.8.1 创建LocalActor ···························· 60
3.8.2 ExecutorSource的创建与注册 ····· 62
3.8.3 ExecutorActor的构建与注册 ······· 64
3.8.4 Spark 自身ClassLoader的创建 ··· 64
3.8.5 启动 Executor的心跳线程 ··········· 66
3.9 启动测量系统 MetricsSystem ············· 69
3.9.1 注册 Sources ·································· 70
3.9.2 注册 Sinks······································ 70
3.9.3 给Sinks 增加 Jetty的Servlet-
ContextHandler ······························ 71
3.10 创建和启动 ExecutorAllocation-
Manager ················································· 72
3.11 ContextCleaner的创建与启动 ·········· 73
3.12 Spark 环境更新 ···································· 74
3.13 创建 DAGSchedulerSource 和
BlockManagerSource ··························· 76
3.14 将 SparkContext 标记为激活 ············ 77
3.15 小结 ························································ 78
第 4 章 存储体系 ··········································· 79
4.1 存储体系概述 ········································· 79
4.1.1 块管理器BlockManager的实现 ··· 79
4.1.2 Spark 存储体系架构 ····················· 81
4.2 shuf?e 服务与客户端 ···························· 83
4.2.1 Block的RPC 服务 ······················· 84
4.2.2 构造传输上下文Transpor-
tContext ·········································· 85
4.2.3 RPC客户端工厂 Transport-
ClientFactory ································· 86
4.2.4 Netty 服务器 TransportServer ······· 87
4.2.5 获取远程shuf?e 文件··················· 88
4.2.6 上传shuf?e 文件··························· 89
4.3 BlockManagerMaster对Block-
Manager 的管理 ····································· 90
4.3.1 BlockManagerMasterActor ··········· 90
4.3.2 询问 Driver 并获取回复方法 ······· 92
4.3.3 向 BlockManagerMaster注册
BlockManagerId ···························· 93
4.4 磁盘块管理器 DiskBlockManager ····· 94
4.4.1 DiskBlockManager的构造过程 ··· 94IX
4.4.2 获取磁盘文件方法getFile ··········· 96
4.4.3 创建临时Block 方法create-
TempShuf?eBlock ························· 96
4.5 磁盘存储 DiskStore ······························· 97
4.5.1 NIO读取方法 getBytes ················ 97
4.5.2 NIO写入方法 putBytes ················ 98
4.5.3 数组写入方法 putArray ················ 98
4.5.4 Iterator写入方法 putIterator ········· 98
4.6 内存存储 MemoryStore ························ 99
4.6.1 数据存储方法putBytes ·············· 101
4.6.2 Iterator写入方法putIterator
详解 ············································· 101
4.6.3 安全展开方法unrollSafely ········· 102
4.6.4 确认空闲内存方法ensureFree-
Space ············································ 105
4.6.5 内存写入方法putArray ·············· 107
4.6.6 尝试写入内存方法tryToPut ······ 108
4.6.7 获取内存数据方法 getBytes ······ 109
4.6.8 获取数据方法 getValues ············· 110
4.7 Tachyon存储TachyonStore··············· 110
4.7.1 Tachyon 简介 ······························· 111
4.7.2 TachyonStore 的使用 ·················· 112
4.7.3 写入 Tachyon内存的方法
putIntoTachyonStore ···················· 113
4.7.4 获取序列化数据方法getBytes ···· 113
4.8 块管理器 BlockManager ···················· 114
4.8.1 移出内存方法dropFrom-
Memory ······································· 114
4.8.2 状态报告方法reportBlockStatus··· 116
4.8.3 单对象块写入方法 putSingle ····· 117
4.8.4 序列化字节块写入方法
putBytes ······································· 118
4.8.5 数据写入方法 doPut ··················· 118
4.8.6 数据块备份方法 replicate ··········· 121
4.8.7 创建DiskBlockObjectWriter
的方法getDiskWriter ·················· 125
4.8.8 获取本地Block数据方法
getBlockData ······························· 125
4.8.9 获取本地shuf?e数据方法
doGetLocal ·································· 126
4.8.10 获取远程Block数据方法
doGetRemote ····························· 127
4.8.11 获取 Block 数据方法 get ·········· 128
4.8.12 数据流序列化方法
dataSerializeStream ···················· 129
4.9 metadataCleaner和 broadcast-
Cleaner ···················································· 129
4.10 缓存管理器CacheManager ············· 130
4.11 压缩算法 ·············································· 133
4.12 磁盘写入实现DiskBlockObject-
Writer ···················································· 133
4.13 块索引shuf?e管理器Index-
Shuf?eBlockManager ························ 135
4.14 shuf?e内存管理器Shuf?e-
MemoryManager ································ 137
4.15 小结 ······················································ 138
第 5 章 任务提交与执行 ··························· 139
5.1 任务概述 ················································ 139
5.2 广播Hadoop的配置信息 ··················· 142
5.3 RDD转换及 DAG 构建 ····················· 144
5.3.1 为什么需要 RDD ························ 144
5.3.2 RDD实现分析 ···························· 146
5.4 任务提交 ················································ 152X
5.4.1 任务提交的准备 ························· 152
5.4.2 finalStage的创建与Stage的
划分 ·············································· 157
5.4.3 创建Job ······································· 163
5.4.4 提交Stage···································· 164
5.4.5 提交Task ····································· 165
5.5 执行任务 ················································ 176
5.5.1 状态更新 ····································· 176
5.5.2 任务还原 ····································· 177
5.5.3 任务运行 ····································· 178
5.6 任务执行后续处理 ······························ 179
5.6.1 计量统计与执行结果序列化 ····· 179
5.6.2 内存回收 ····································· 180
5.6.3 执行结果处理 ····························· 181
5.7 小结 ························································· 187
第 6 章 计算引擎 ········································· 188
6.1 迭代计算 ················································ 188
6.2 什么是shuf?e ······································· 192
6.3 map端计算结果缓存处理 ················· 194
6.3.1 map 端计算结果缓存聚合·········· 195
6.3.2 map端计算结果简单缓存·········· 200
6.3.3 容量限制 ····································· 201
6.4 map端计算结果持久化 ····················· 204
6.4.1 溢出分区文件 ····························· 205
6.4.2 排序与分区分组 ························· 207
6.4.3 分区索引文件 ····························· 209
6.5 reduce端读取中间计算结果 ············· 210
6.5.1 获取map 任务状态····················· 213
6.5.2 划分本地与远程 Block ··············· 215
6.5.3 获取远程Block ··························· 217
6.5.4 获取本地 Block ··························· 218
6.6 reduce端计算 ······································· 219
6.6.1 如何同时处理多个map 任务的
中间结果 ······································ 219
6.6.2 reduce端在缓存中对中间计算
结果执行聚合和排序 ················· 220
6.7 map 端与 reduce 端组合分析 ············ 221
6.7.1 在 map端溢出分区文件,在reduce 端合并组合 ················· 221
6.7.2 在map端简单缓存、排序分组,在reduce 端合并组合 ················· 222
6.7.3 在map端缓存中聚合、排序分组,在reduce 端组合 ························· 222
6.8 小结 ························································· 223
第7章 部署模式 ········································· 224
7.1 local部署模式 ······································ 225
7.2 local-cluster部署模式 ························· 225
7.2.1 LocalSparkCluster的启动 ·········· 226
7.2.2 CoarseGrainedSchedulerBackend
的启动 ········································· 236
7.2.3 启动AppClient ···························· 237
7.2.4 资源调度 ····································· 242
7.2.5 local-cluster模式的任务执行 ····· 253
7.3 Standalone部署模式 ··························· 255
7.3.1 启动 Standalone模式 ·················· 255
7.3.2 启动 Master 分析 ························ 257
7.3.3 启动 Worker分析························ 259
7.3.4 启动 Driver Application分析 ····· 261
7.3.5 Standalone模式的任务执行 ······· 263
7.3.6 资源回收 ····································· 263
7.4 容错机制 ················································ 266
7.4.1 Executor异常退出 ······················ 266XI
7.4.2 Worker 异常退出························· 268
7.4.3 Master异常退出 ························· 269
7.5 其他部署方案 ······································· 276
7.5.1 YARN··········································· 277
7.5.2 Mesos ··········································· 280
7.6 小结 ························································· 282
扩 展 篇
第 8 章 Spark SQL ····································· 284
8.1 Spark SQL 总体设计 ··························· 284
8.1.1 传统关系型数据库 SQL运行
原理 ············································· 285
8.1.2 Spark SQL运行架构··················· 286
8.2 字典表 Catalog ····································· 288
8.3 Tree和 TreeNode ································· 289
8.4 词法解析器 Parser 的设计与实现 ··· 293
8.4.1 SQL语句解析的入口 ················· 294
8.4.2 建表语句解析器 DDLParser ······ 295
8.4.3 SQL语句解析器SqlParser········· 296
8.4.4 Spark 代理解析器SparkSQL-
Parser ··········································· 299
8.5 Rule 和 RuleExecutor ·························· 300
8.6 Analyzer与Optimizer的设计与
实现 ························································· 302
8.6.1 语法分析器Analyzer ·················· 304
8.6.2 优化器 Optimizer ························ 305
8.7 生成物理执行计划 ······························ 306
8.8 执行物理执行计划 ······························ 308
8.9 Hive ························································· 311
8.9.1 Hive SQL 语法解析器 ················ 311
8.9.2 Hive SQL元数据分析 ················ 313
8.9.3 Hive SQL物理执行计划 ············ 314
8.10 应用举例:JavaSparkSQL ·············· 314
8.11 小结 ······················································· 320
第 9 章 流式计算 ········································· 321
9.1 Spark Streaming总体设计 ················· 321
9.2 StreamingContext初始化 ··················· 323
9.3 输入流接收器规范Receiver ············· 324
9.4 数据流抽象DStream ·························· 325
9.4.1 Dstream 的离散化 ······················· 326
9.4.2 数据源输入流 InputDStream ······ 327
9.4.3 Dstream 转换及构建 DStream
Graph ··········································· 329
9.5 流式计算执行过程分析 ····················· 330
9.5.1 流式计算例子CustomReceiver ···· 331
9.5.2 Spark Streaming执行环境构建 ··· 335
9.5.3 任务生成过程 ····························· 347
9.6 窗口操作 ················································ 355
9.7 应用举例 ················································ 357
9.7.1 安装mosquitto ···························· 358
9.7.2 启动mosquitto ···························· 358
9.7.3 MQTTWordCount························ 359
9.8 小结 ························································· 361
第 10 章 图计算 ············································ 362
10.1 Spark GraphX 总体设计 ··················· 362
10.1.1 图计算模型 ····························· 363
10.1.2 属性图 ····································· 365
10.1.3 GraphX的类继承体系 ··········· 367
10.2 图操作 ·················································· 368
10.2.1 属性操作 ································· 368XII
10.2.2 结构操作 ································· 368
10.2.3 连接操作 ································· 369
10.2.4 聚合操作 ································· 370
10.3 Pregel API ············································ 371
10.3.1 Dijkstra算法 ··························· 373
10.3.2 Dijkstra的实现 ······················· 376
10.4 Graph的构建 ······································ 377
10.4.1 从边的列表加载Graph ·········· 377
10.4.2 在Graph中创建图的方法 ····· 377
10.5 顶点集合抽象VertexRDD··············· 378
10.6 边集合抽象EdgeRDD ····················· 379
10.7 图分割 ·················································· 380
10.8 常用算法·············································· 382
10.8.1 网页排名 ································· 382
10.8.2 Connected Components的
应用 ········································· 386
10.8.3 三角关系统计 ························· 388
10.9 应用举例·············································· 390
10.10 小结 ···················································· 391
第 11 章 机器学习 ······································· 392
11.1 机器学习概论 ····································· 392
11.2 Spark MLlib 总体设计 ······················ 394
11.3 数据类型 ·············································· 394
11.3.1 局部向量 ································· 394
11.3.2 标记点 ····································· 395
11.3.3 局部矩阵 ································· 396
11.3.4 分布式矩阵 ····························· 396
11.4 基础统计 ·············································· 398
11.4.1 摘要统计 ································· 398
11.4.2 相关统计 ································· 399
11.4.3 分层抽样 ································· 401
11.4.4 假设检验 ································· 401
11.4.5 随机数生成 ····························· 402
11.5 分类和回归 ········································· 405
11.5.1 数学公式 ································· 405
11.5.2 线性回归 ································· 407
11.5.3 分类 ········································· 407
11.5.4 回归 ········································· 410
11.6 决策树 ·················································· 411
11.6.1 基本算法 ································· 411
11.6.2 使用例子 ································· 412
11.7 随机森林 ·············································· 413
11.7.1 基本算法 ································· 414
11.7.2 使用例子 ································· 414
11.8 梯度提升决策树 ································ 415
11.8.1 基本算法 ································· 415
11.8.2 使用例子 ································· 416
11.9 朴素贝叶斯 ········································· 416
11.9.1 算法原理 ································· 416
11.9.2 使用例子 ································· 418
11.10 保序回归 ··········································· 418
11.10.1 算法原理 ······························ 418
11.10.2 使用例子 ······························ 419
11.11 协同过滤 ············································ 419
11.12 聚类 ···················································· 420
11.12.1 K-means ······························· 420
11.12.2 高斯混合 ······························ 422
11.12.3 快速迭代聚类 ······················ 422
11.12.4 latent Dirichlet allocation ····· 422
11.12.5 流式 K-means ······················ 423
11.13 维数减缩 ··········································· 424
11.13.1 奇异值分解 ·························· 424
11.13.2 主成分分析 ·························· 425XIII
11.14 特征提取与转型 ······························ 425
11.14.1 术语频率反转 ······················ 425
11.14.2 单词向量转换 ······················ 426
11.14.3 标准尺度 ······························ 427
11.14.4 正规化尺度 ·························· 428
11.14.5 卡方特征选择器 ·················· 428
11.14.6 Hadamard积 ························ 429
11.15 频繁模式挖掘 ··································· 429
11.16 预言模型标记语言 ·························· 430
11.17 管道 ···················································· 431
11.17.1 管道工作原理 ······················ 432
11.17.2 管道API介绍 ····················· 433
11.17.3 交叉验证 ······························ 435
11.18 小结 ···················································· 436
附录 A Utils ··················································· 437
附录 B Akka ·················································· 446
附录 C Jetty ··················································· 450
附录 D Metrics ············································· 453
附录 E Hadoop word count ···················· 456
附录 F CommandUtils ······························ 458
附录 G Netty ················································· 461
附录 H 源码编译错误································ 465准 备 篇
■ 第1章 环境准备
■ 第2章 Spark设计理念与基本架构第1章
环 境 准 备
凡事豫则立,不豫则废;言前定,则不跲;事前定,则不困。
—《礼记·中庸》
本章导读
在深入了解一个系统的原理、实现细节之前,应当先准备好它的源码编译环境、运行环
境。如果能在实际环境安装和运行 Spark,显然能够提升读者对于 Spark的一些感受,对系统
能有个大体的印象,有经验的技术人员甚至能够猜出一些Spark采用的编程模型、部署模式
等。当你通过一些途径知道了系统的原理之后,难道不会问问自己:“这是怎么做到的?”如
果只是游走于系统使用、原理了解的层面,是永远不可能真正理解整个系统的。很多IDE本
身带有调试的功能,每当你阅读源码,陷入重围时,调试能让我们更加理解运行期的系统。
如果没有调试功能,不敢想象阅读源码会怎样困难。
本章的主要目的是帮助读者构建源码学习环境,主要包括以下内容:
T 在 Windows 环境下搭建源码阅读环境;
T 在 Linux 环境下搭建基本的执行环境;
T Spark 的基本使用,如 spark-shell。
1.1 运行环境准备
考虑到大部分公司的开发和生成环境都采用Linux 操作系统,所以笔者选用了64 位的
Linux。在正式安装 Spark之前,先要找台好机器。为什么?因为笔者在安装、编译、调试的
过程中发现 Spark非常耗费内存,如果机器配置太低,恐怕会跑不起来。Spark 的开发语言是
Chapter 1第 1 章 环 境 准 备 3
Scala,而Scala需要运行在JVM之上,因而搭建Spark 的运行环境应该包括JDK和 Scala。
1.1.1 安装 JDK
使用命令 getconf LONG_BIT 查看Linux 机器是32 位还是64 位,然后下载相应版本的
JDK 并安装。
下载地址:
http:www.oracle.comtechnetworkjavajavasedownloadsindex.html
配置环境:
cd ~
vim .bash_pro?le
添加如下配置:
export JAVA_HOME=optjava
export PATH=PATH:JAVA_HOMEbin
export CLASSPATH=.:JAVA_HOMElibdt.jar:JAVA_HOMElibtools.jar
由于笔者的机器上已经安装过 openjdk,所以未使用以上方式,openjdk的安装命令如下:
su -c yum install java-1.7.0-openjdk
安装完毕后,使用 java –version命令查看,确认安装正常,如图 1-1所示。
图1-1 查看安装是否正常
1.1.2 安装 Scala
下载地址:http:www.scala-lang.orgdownload
选择最新的 Scala 版本下载,下载方法如下:
wget http:downloads.typesafe.comscala2.11.5scala-2.11.5.tgz
移动到选好的安装目录,例如:
mv scala-2.11.5.tgz ~install
进入安装目录,执行以下命令:
chmod 755 scala-2.11.5.tgz
tar -xzvf scala-2.11.5.tgz
配置环境:
cd ~
vim .bash_pro?le
添加如下配置:4 准 备 篇
export SCALA_HOME=HOMEinstallscala-2.11.5
export PATH=PATH:SCALA_HOMEbin:HOMEbin
安装完毕后输入 scala,进入 scala命令行说明 scala 安装正确,如图 1-2所示。
图 1-2 进入scala 命令行
1.1.3 安装 Spark
下载地址:http:spark.apache.orgdownloads.html
选择最新的 Spark版本下载,下载方法如下:
wget http:archive.apache.orgdistsparkspark-1.2.0spark-1.2.0-bin-hadoop1.tgz
移动到选好的安装目录,如:
mv spark-1.2.0-bin-hadoop1.tgz~install
进入安装目录,执行以下命令:
chmod 755 spark-1.2.0-bin-hadoop1.tgz
tar -xzvf spark-1.2.0-bin-hadoop1.tgz
配置环境:
cd ~
vim .bash_pro?le
添加如下配置:
export SPARK_HOME=HOMEinstallspark-1.2.0-bin-hadoop1
1.2 Spark初体验
本节通过Spark的基本使用,让读者对Spark能有初步的认识,便于引导读者逐步深入学习。
1.2.1 运行 spark-shell
要运行spark-shell,需要先对 Spark 进行配置。
1)进入Spark的 conf 文件夹:
cd ~installspark-1.2.0-bin-hadoop1conf
2)复制一份 spark-env.sh.template,命名为spark-env.sh,对它进行编辑,命令如下:
cp spark-env.sh.template spark-env.sh
vim spark-env.sh第 1 章 环 境 准 备 5
3)添加如下配置:
export SPARK_MASTER_IP=127.0.0.1
export SPARK_LOCAL_IP=127.0.0.1
4)启动 spark-shell:
cd ~installspark-1.2.0-bin-hadoop1bin
.spark-shell
最后我们会看到 spark 启动的过程,如图1-3 所示。
图1-3 Spark启动过程
从以上启动日志中我们可以看到 SparkEnv、MapOutputTracker、BlockManagerMaster、DiskBlockManager、MemoryStore、HttpFileServer、SparkUI等信息。它们是做什么的?此处
望文生义即可,具体内容将在后边的章节详细讲解。
1.2.2 执行 word count
这一节,我们通过word count这个耳熟能详的例子来感受下Spark 任务的执行过程。启
动 spark-shell 后,会打开 scala 命令行,然后按照以下步骤输入脚本。
1)输入 val lines = sc.textFile(..README.md, 2),执行结果如图1-4所示。
图1-4 步骤 1 执行结果6 准 备 篇
2)输入val words = lines.?atMap(line => line.split( )),执行结果如图 1-5 所示。
图 1-5 步骤2执行结果
3)输入val ones = words.map(w => (w,1)),执行结果如图 1-6 所示。
图 1-6 步骤3执行结果
4)输入val counts = ones.reduceByKey(_ + _),执行结果如图1-7 所示。
图 1-7 步骤4执行结果
5)输入counts.foreach(println),任务执行过程如图1-8和图1-9 所示。输出结果如图1-10
所示。1
图1-8 步骤 5 执行过程部分(一)
图1-9 步骤 5 执行过程部分(二)
因截图时,一屏放不下,故分为两图。第 1 章 环 境 准 备 7
图1-10 步骤 5 输出结果
在这些输出日志中,我们先是看到 Spark 中任务的提交与执行过程,然后看到单词计数
的输出结果,最后打印一些任务结束的日志信息。有关任务的执行分析,笔者将在第 5 章中
展开。
1.2.3 剖析 spark-shell
通过 word count在 spark-shell 中执行的过程,我们想看看spark-shell 做了什么。spark-
shell中有以下一段脚本,见代码清单1-1。
代码清单1-1 spark-shell中的一段脚本
function main {
if cygwin; then
stty -icanonmin 1 -echo > devnull 2>1
export SPARK_SUBMIT_OPTS=SPARK_SUBMIT_OPTS -Djline.terminal=unix
FW DIRbinspark-submit --class org.apache.spark.repl.Main {SUBMISSION_
OPTS[@]} spark-shell {APPLICATION_OPTS[@]}
sttyicanon echo > devnull 2>1
else
export SPARK_SUBMIT_OPTS
FW DIRbinspark-submit --class org.apache.spark.repl.Main {SUBMISSION_
OPTS[@]} spark-shell {APPLICATION_OPTS[@]}
}
我们看到脚本 spark-shell 里执行了 spark-submit脚本,打开 spark-submit 脚本,发现其中
包含以下脚本。8 准 备 篇
exec SPARK_HOMEbinspark-class org.apache.spark.deploy.SparkSubmit {ORIG_
ARGS[@]}
脚本spark-submit 在执行 spark-class脚本时,给它增加了参数 SparkSubmit。打开 spark-
class 脚本,其中包含以下脚本,见代码清单1-2。
代码清单1-2 spark-class
if [ -n {JAVA_HOME} ]; then
RUNNER={JAVA_HOME}binjava
else
if [ `command -v java` ]; then
RUNNER=java
else
echo JAVA_HOME is not set >2
exit 1
exec RUNNER -cp CLASSPATH JAVA_OPTS @
读到这里,应该知道 Spark启动了以 SparkSubmit 为主类的jvm 进程。
为便于在本地对Spark 进程使用远程监控,给spark-class脚本追加以下 jmx 配置:
JAVA _OPTS=-XX:MaxPermSize=128m OUR_JAVA_OPTS -Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.port=10207 -Dcom.sun.management.jmxremote.
authenticate=false -Dcom.sun.management.jmxremote.ssl=false
在本地打开 jvisualvm,添加远程主机,如图1-11所示。
右击已添加的远程主机,添加 JMX连接,如图1-12所示。
图 1-11 添加远程主机 图1-12 添加JMX连接
单击右侧的“线程”选项卡,选择 main线程,然后单击“线程 Dump”按钮,如图 1-13
所示。
从 dump 的内容中找到线程 main 的信息,如代码清单 1-3所示。第 1 章 环 境 准 备 9
图1-13 查看 Spark线程
代码清单1-3 main线程dump信息
main - Thread t@1
java.lang.Thread.State: RUNNABLE
at java.io.FileInputStream.read0(Native Method)
at java.io.FileInputStream.read(FileInputStream.java:210)
at scala.tools.jline.TerminalSupport.readCharacter(TerminalSupport.java:152)
at scala.tools.jline.UnixTerminal.readVirtualKey(UnixTerminal.java:125)
at scala.tools.jline.console.ConsoleReader.readVirtualKey(ConsoleReader.
java:933)
at scala.tools.jline.console.ConsoleReader.readBinding(ConsoleReader.java:1136)
at scala.tools.jline.console.ConsoleReader.readLine(ConsoleReader.java:1218)
at scala.tools.jline.console.ConsoleReader.readLine(ConsoleReader.java:1170)
at org.apache.spark.repl.SparkJLineReader.readOneLine(SparkJLineReader.
scala:80)
at scala.tools.nsc.interpreter.InteractiveReaderclass.readLine(Interactive-
Reader.scala:43)
at org.apache.spark.repl.SparkJLineReader.readLine(SparkJLineReader.scala:25)
at org.apache.spark.repl.SparkILoop.readOneLine1(SparkILoop.scala:619)
at org.apache.spark.repl.SparkILoop.innerLoop1(SparkILoop.scala:636)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
at org.apache.spark.repl.SparkILoopanonfunprocess1.applymcZsp
(SparkI-Loop.scala:968)
at org.apache.spark.repl.SparkILoopanonfunprocess1.apply(SparkILoop.
scala:916)
at org.apache.spark.repl.SparkILoopanonfunprocess1.apply(SparkILoop.
scala:916)
at scala.tools.nsc.util.ScalaClassLoader.savingContextLoader(ScalaClass
Loader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)10 准 备 篇
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
at org.apache.spark.repl.Main.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.re?ect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.re?ect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.
java:57)
at sun.re?ect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces-
sorImpl.java:43)
at java.lang.re?ect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
从main线程的栈信息中可看出程序的调用顺序:SparkSubmit.main→repl.Main →SparkI-
Loop.process。SparkILoop.process方法中会调用initializeSpark方法,initializeSpark的实现见
代码清单1-4。
代码清单1-4 initializeSpark的实现
def initializeSpark {
intp.beQuietDuring {
command(
@transient val sc = {
val _sc = org.apache.spark.repl.Main.interp.createSparkContext
println(Spark context available as sc.)
_sc
}
)
command(import org.apache.spark.SparkContext._)
}
}
我们看到 initializeSpark调用了 createSparkContext方法,createSparkContext的实现见代
码清单1-5。
代码清单1-5 createSparkContext的实现
def createSparkContext: SparkContext = {
valexecUri = System.getenv(SPARK_EXECUTOR_URI)
valjars = SparkILoop.getAddedJars
valconf = new SparkConf
.setMaster(getMaster)
.setAppName(Spark shell)
.setJars(jars)
.set(spark.repl.class.uri, intp.classServer.uri)
if (execUri != null) {
conf.set(spark.executor.uri, execUri)
}
sparkContext = new SparkContext(conf)
logInfo(Created spark context..)
sparkContext
}第 1 章 环 境 准 备 11
这里最终使用SparkConf 和SparkContext来完成初始化,具体内容将在第3章讲解。代
码分析中涉及的repl 主要用于与 Spark 实时交互。
1.3 阅读环境准备
准备 Spark阅读环境,同样需要一台好机器。笔者调试源码的机器的内存是 8 GB。源
码阅读的前提是在IDE 环境中打包、编译通过。常用的IDE有IntelliJ IDEA、Eclipse。笔者
选择用Eclipse编译 Spark,原因有二:一是由于使用多年对它比较熟悉,二是社区中使用
Eclipse编译Spark的资料太少,在这里可以做个补充。在 Windows系统编译 Spark源码,除
了安装JDK 外,还需要安装以下工具。
(1)安装Scala
由于 Spark 1.20版本的 sbt 里指定的 Scala 版本是 2.10.4,具体见Spark 源码目录下的文件
\project\plugins.sbt,其中有一行:scalaVersion := 2.10.4。所以选择下载 scala-2.10.4.msi,下
载地址:http:www.scala-lang.orgdownload。
下载完毕,安装 scala-2.10.4.msi。
(2)安装 SBT
由于 Scala 使用SBT作为构建工具,所以需要下载 SBT。下载地址:http:www.scala-sbt.
org,下载最新的安装包 sbt-0.13.8.msi 并安装。
(3)安装 Git Bash
由于 Spark源码使用Git 作为版本控制工具,所以需要下载Git的客户端工具,推荐使用
Git Bash,因为它更符合 Linux 下的操作习惯。下载地址:http:msysgit.github.io,下载最新
的版本并安装。
(4)安装Eclipse Scala IDE插件
Eclipse通过强大的插件方式支持各种 IDE 工具的集成,要在Eclipse中编译、调试、运
行Scala程序,就需要安装Eclipse Scala IDE 插件。下载地址:http:scala-ide.orgdownload
current.html。
由于笔者本地的Eclipse版本是Eclipse 4.4 (Luna),所以选择安装插件http:download.
scala-ide.orgsdklithiume44scala211stablesite,如图 1-14所示。
图 1-14 Eclipse Scala IDE插件安装地址
在 Eclipse中选择 Help 菜单,然后选择 Install New Software…选项,打开Install 对话框,如图1-15 所示。12 准 备 篇
图 1-15 Install对话框
单击Add按钮,打开 Add Repository对话框,输入插件地址,如图 1-16 所示。
图 1-16 添加 Scala IDE插件地址
全选插件的内容,完成安装,如图 1-17所示。
图 1-17 安装 Scala IDE插件第 1 章 环 境 准 备 13
1.4 Spark 源码编译与调试
1. 下载 Spark 源码
首先,访问 Spark 官网 http:spark.apache.org,如图1-18所示。
图1-18 Spark官网
单击 Download Spark按钮,在下一个页面找到 git 地址,如图 1-19 所示。
图1-19 Spark官方git 地址
打开 Git Bash工具,输入git clone git:github.comapachespark.git命令将源码下载到本
地,如图1-20 所示。
图1-20 下载 Spark源码
2. 构建 Scala 应用
使用 cmd 命令行进到 Spark 根目录,执行 sbt 命令。会下载和解析很多 jar 包,要等很长
时间,笔者大概花了一个多小时才执行完。14 准 备 篇
3. 使用 sbt生成 Eclipse 工程文件
等sbt提示符(>)出现后,输入Eclipse命令,开始生成Eclipse工程文件,也需要花费
很长时间,笔者本地大致花了40 分钟。完成时的状况如图1-21 所示。
图1-21 sbt编译过程
现在我们查看 Spark 下的子文件夹,发现其中都生成了 .project和 .classpath 文件。比如
mllib项目下就生成了.project和.classpath文件,如图1-22 所示。
图1-22 sbt生成的项目文件
4. 编译 Spark 源码
由于Spark 使用 Maven作为项目管理工具,所以需要将 Spark 项目作为 Maven项目导入
Eclipse中,如图1-23 所示。
单击 Next按钮进入下一个对话框,如图1-24 所示。第 1 章 环 境 准 备 15
图 1-23 导入Maven 项目
全选所有项目,单击 Finish按钮,这样就完成了导入,如图 1-25所示。
图1-24 选择 Maven 项目 图1-25 导入完成的项目16 准 备 篇
导入完成后,需要设置每个子项目的build path。右击每个项目,选择“Build Path”→
“Con?gure Build Path…”,打开 Java Build Path界面,如图 1-26所示。
图 1-26 Java编译目录
单击Add External JARs 按钮,将 Spark 项目下的lib_managed 文件夹的子文件夹 bundles
和jars 内的jar 包添加进来。
lib_managedjars文件夹下有很多打好的 spark的包,比如:spark-catalyst_2.10-1.3.2-
SNAPSHOT.jar。这些jar包有可能与你下载的Spark源码的版本不一致,导致你在调
试源码时,发生 jar 包冲突。所以请将它们排除出去。
Eclipse在对项目编译时,笔者本地出现了很多错误,有关这些错误的解决建议参见附录
H。所有错误解决后运行mvn clean install,如图 1-27 所示。
5. 调试 Spark 源码
以 Spark 源码自带的 JavaWordCount为例,介绍如何调试 Spark 源码。右击 JavaWord-
Count.java,选择“ Debug As”→“ Java Application”即可。如果想修改配置参数,右击
JavaWordCount.java,选择“Debug As”→“ Debug Con?gurations…”,从打开的对话框中选
择JavaWordCount,在右侧标签可以修改Java 执行参数、JRE、classpath、环境变量等配置,如图1-28 所示。
读者也可以在 Spark 源码中设置断点,进行跟踪调试。
注 意第 1 章 环 境 准 备 17
图1-27 编译成功
图1-28 源码调试
1.5 小结
本章通过引导大家在Linux 操作系统下搭建基本的执行环境,并且介绍spark-shell 等
脚本的执行,来帮助读者由浅入深地进行 Spark 源码的学习。由于目前多数开发工作都在
Windows系统下进行,并且 Eclipse有最广大的用户群,即便是一些开始使用 IntelliJ的用户对
Eclipse也不陌生,所以在Windows 环境下搭建源码阅读环境时,选择这些最常用的工具,能
降低读者的学习门槛,并且替大家节省时间。第2章
Spark 设计理念与基本架构
若夫乘天地之正,而御六气之辩,以游无穷者,彼且恶乎待哉?
—《庄子·逍遥游》
本章导读
上一章,介绍了Spark 环境的搭建,为方便读者学习Spark 做好准备。本章首先从Spark
产生的背景开始,介绍 Spark 的主要特点、基本概念、版本变迁。然后简要说明 Spark 的主要
模块和编程模型。最后从 Spark 的设计理念和基本架构入手,使读者能够对 Spark 有宏观的认
识,为之后的内容做一些准备工作。
Spark是一个通用的并行计算框架,由加州伯克利大学(UCBerkeley)的 AMP 实验室开
发于2009 年,并于2010 年开源,2013 年成长为Apache 旗下大数据领域最活跃的开源项目之
一。Spark 也是基于 map reduce 算法模式实现的分布式计算框架,拥有 Hadoop MapReduce 所
具有的优点,并且解决了Hadoop MapReduce中的诸多缺陷。
2.1 初识 Spark
2.1.1 Hadoop MRv1 的局限
Hadoop1.0版本采用的是MRv1版本的MapReduce编程模型。MRv1版本的实现都封装在
org.apache.hadoop.mapred包中, MRv1的Map和Reduce是通过接口实现的。MRv1包括三个部分:
T 运行时环境(JobTracker和 TaskTracker);
T 编程模型(MapReduce);
Chapter 2第 2 章 Spark 设计理念与基本架构 19
T 数据处理引擎(Map 任务和Reduce 任 务)。
MRv1存在以下不足:
T 可扩展性差:在运行时,JobTracker 既负责资源管理又负责任务调度,当集群繁忙时,JobTracker很容易成为瓶颈,最终导致它的可扩展性问题。
T 可用性差:采用了单节点的 Master,没有备用 Master 及选举操作,这导致一旦 Master
出现故障,整个集群将不可用。
T 资源利用率低:TaskTracker 使用 slot 等量划分本节点上的资源量。slot 代表计算资源
(CPU、内存等)。一个 Task 获取到一个 slot 后才有机会运行,Hadoop 调度器负责将
各个 TaskTracker 上的空闲slot 分配给Task 使用。一些Task并不能充分利用slot,而
其他Task 也无法使用这些空闲的资源。slot 分为 Map slot 和Reduce slot 两种,分别
供 MapTask 和 Reduce Task 使用。有时会因为作业刚刚启动等原因导致MapTask 很
多,而Reduce Task 任务还没有调度的情况,这时Reduce slot也会被闲置。
T 不能支持多种 MapReduce 框架:无法通过可插拔方式将自身的MapReduce框架替换
为其他实现,如 Spark、Storm 等。
MRv1的示意如图 2-1所示。
Apache 为了解决以上问题,对Hadoop进行升级改造,MRv2 最终诞生了。MRv2重
用了 MRv1中的编程模型和数据处理引擎,但是运行时环境被重构了。JobTracker被拆
分成了通用的资源调度平台(ResourceManager,RM)和负责各个计算框架的任务调度模
型(ApplicationMaster,AM)。MRv2 中 MapReduce 的核心不再是MapReduce框架,而是
YARN。在以 YARN 为核心的MRv2 中,MapReduce 框架是可插拔的,完全可以替换为其他
MapReduce 实现,比如 Spark、Storm等。MRv2的示意如图2-2所示。1
图2-1 MRv1示意图 图 2-2 MRv2示意图
Hadoop MRv2 虽然解决了MRv1 中的一些问题,但是由于对HDFS 的频繁操作(包括计
算结果持久化、数据备份及 shuf?e等)导致磁盘 IO成为系统性能的瓶颈,因此只适用于离
图2-1和图2-2 都来源自http:blog.chinaunix.netuid-28311809-ud-4383551.html。20 准 备 篇
线数据处理,而不能提供实时数据处理能力。
2.1.2 Spark 使用场景
Hadoop常用于解决高吞吐、批量处理的业务场景,例如离线计算结果用于浏览量统计。
如果需要实时查看浏览量统计信息,Hadoop 显然不符合这样的要求。Spark 通过内存计算能
力极大地提高了大数据处理速度,满足了以上场景的需要。此外,Spark 还支持 SQL 查询、流式计算、图计算、机器学习等。通过对 Java、Python、Scala、R等语言的支持,极大地方
便了用户的使用。
2.1.3 Spark 的特点
Spark看到MRv1的问题,对 MapReduce 做了大量优化,总结如下:
T 快速处理能力。随着实时大数据应用越来越多,Hadoop作为离线的高吞吐、低响应
框架已不能满足这类需求。Hadoop MapReduce的Job将中间输出和结果存储在HDFS
中,读写HDFS造成磁盘IO成为瓶颈。Spark允许将中间输出和结果存储在内存中,避免了大量的磁盘IO。同时Spark自身的DAG执行引擎也支持数据在内存中的计算。
Spark官网声称性能比Hadoop快100倍,如图2-3所示。即便是内存不足,需要磁盘
IO,其速度也是Hadoop的10倍以上。
T 易于使用。Spark现在支持Java、Scala、Python和 R等
语言编写应用程序,大大降低了使用者的门槛。自带了
80多个高等级操作符,允许在 Scala、Python、R的 shell
中进行交互式查询。
T 支持查询。Spark 支持SQL及 Hive SQL 对数据查询。
T 支持流式计算。与MapReduce只能处理离线数据相比,Spark 还支持实时的流计算。Spark依赖 Spark Streaming
对数据进行实时的处理,其流式处理能力还要强于Storm。
T 可用性高。Spark 自身实现了 Standalone部署模式,此模式下的 Master可以有多
个,解决了单点故障问题。此模式完全可以使用其他集群管理器替换,比如YARN、Mesos、EC2等。
T 丰富的数据源支持。Spark 除了可以访问操作系统自身的文件系统和HDFS,还可以
访问Cassandra、HBase、Hive、Tachyon以及任何Hadoop 的数据源。这极大地方便
了已经使用HDFS、Hbase的用户顺利迁移到 Spark。
2.2 Spark基础知识
1. 版本变迁
经过4年多的发展,Spark目前的版本是 1.4.1。我们简单看看它的版本发展过程。
图2-3 Hadoop与Spark执行
逻辑回归时间比较第 2 章 Spark 设计理念与基本架构 21
1)Spark 诞生于 UCBerkeley的AMP实验室(2009)。
2)Spark 正式对外开源(2010年)。
3)Spark 0.6.0 版本发布(2012-10-15),进行了大范围的性能改进,增加了一些新特性,并对 Standalone部署模式进行了简化。
4)Spark 0.6.2 版本发布(2013-02-07),解决了一些 bug,并增强了系统的可用性。
5)Spark 0.7.0 版本发布(2013-02-27),增加了更多关键特性,例如,Python API、Spark
Streaming的 alpha 版本等。
6)Spark 0.7.2版本发布(2013-06-02),性能改进并解决了一些bug,新增API使用的例子。
7)Spark 接受进入 Apache 孵化器(2013-06-21)。
8)Spark 0.7.3 版本发布(2013-07-16),解决一些 bug,更新 Spark Streaming API 等。
9)Spark 0.8.0 版本发布(2013-09-25),一些新功能及可用性改进。
10)Spark 0.8.1 版本发布(2013-12-19),支持Scala 2.9、YARN 2.2、Standalone部署模
式下调度的高可用性、shuf?e的优化等。
11)Spark 0.9.0 版本发布(2014-02-02),增加了GraphX,机器学习新特性,流式计算新
特性,核心引擎优化(外部聚合、加强对YARN 的支持)等。
12)Spark 0.9.1 版本发布(2014-04-09),增强使用 YARN 的稳定性,改进 Scala 和
Python API的奇偶性。
13)Spark 1.0.0 版本发布(2014-05-30),Spark SQL、MLlib、GraphX 和 Spark Streaming
都增加了新特性并进行了优化。Spark核心引擎还增加了对安全 YARN 集群的支持。
14)Spark 1.0.1 版本发布(2014-07-11),增加了 Spark SQL 的新特性和对 JSON 数据的
支持等。
15)Spark 1.0.2 版本发布(2014-08-05),Spark核心 API 及 Streaming、Python、MLlib 的
bug 修复。
16)Spark 1.1.0 版本发布(2014-09-11)。
17)Spark 1.1.1版本发布(2014-11-26),Spark核心API及Streaming、Python、SQL、GraphX
和 MLlib的 bug 修复。
18)Spark 1.2.0 版本发布(2014-12-18)。
19)Spark 1.2.1版本发布(2015-02-09),Spark核心API及Streaming、Python、SQL、GraphX
和 MLlib的 bug 修复。
20)Spark 1.3.0 版本发布(2015-03-13)。
21)Spark 1.4.0 版本发布(2015-06-11)。
22)Spark 1.4.1 版本发布(2015-07-15),DataFrame API及 Streaming、Python、SQL 和
MLlib的 bug 修复。
2. 基本概念
要想对 Spark 有整体性的了解,推荐读者阅读 Matei Zaharia 的Spark 论文。此处笔者先介22 准 备 篇
绍Spark中的一些概念:
T RDD(resillient distributed dataset):弹性分布式数据集。
T Task :具体执行任务。Task 分为 Shuf?eMapTask 和 ResultTask 两种。Shuf?eMapTask
和 ResultTask 分别类似于 Hadoop 中的 Map 和 Reduce。
T Job:用户提交的作业。一个 Job可能由一到多个Task 组成。
T Stage:Job分成的阶段。一个Job 可能被划分为一到多个Stage。
T Partition:数据分区。即一个RDD 的数据可以划分为多少个分区。
T NarrowDependency:窄依赖,即子RDD依赖于父RDD 中固定的Partition。Narrow-
Dependency分为OneToOneDependency和RangeDependency两种。
T Shuf?eDependency:shuf?e 依赖,也称为宽依赖,即子 RDD 对父RDD 中的所有
Partition都有依赖。
T DAG(directed acycle graph):有向无环图。用于反映各 RDD 之间的依赖关系。
3. Scala 与 Java 的比较
Spark 为什么要选择 Java 作为开发语言?笔者不得而知。如果能对二者进行比较,也许
能看出一些端倪。表2-1 列出了 Scala 与 Java 的比较。
表 2-1 Scala 与Java的比较
比项项 Scala Java
语言类型 面向函数为主,兼有面向对象 面向对象(Java8 也增加了lambda函数编程)
简洁性 非常简洁 不简洁
类型推断
丰富的类型推断,例如深度和链式的类型推断、duck type、隐式类型转换等,但也因此增加了编译
时长
少量的类型推断
可读性 一般,丰富的语法糖导致的各种奇幻用法,例如
方法签名
好
学习成本 较高 一般
语言特性 非常丰富的语法糖和更现代的语言特性,例如
Option、模式匹配、使用空格的方法调用
丰富
并发编程 使用Actor 的消息模型 使用阻塞、锁、阻塞队列等
通过以上比较似乎仍然无法判断Spark选择 Java作为开发语言的原因。由于函数式编程
更接近计算机思维,因此便于通过算法从大数据中建模,这应该更符合Spark 作为大数据框
架的理念吧!
2.3 Spark 基本设计思想
2.3.1 Spark 模块设计
整个Spark主要由以下模块组成:第 2 章 Spark 设计理念与基本架构 23
T Spark Core:Spark的核心功能实现,包括:SparkContext的初始化(Driver Application
通过 SparkContext提交)、部署模式、存储体系、任务提交与执行、计算引擎等。
T Spark SQL:提供 SQL 处理能力,便于熟悉关系型数据库操作的工程师进行交互查询。
此外,还为熟悉 Hadoop 的用户提供 Hive SQL处理能力。
T Spark Streaming:提供流式计算处理能力,目前支持Kafka、Flume、Twitter、MQTT、ZeroMQ、Kinesis 和简单的TCP套接字等数据源。此外,还提供窗口操作。
T GraphX:提供图计算处理能力,支持分布式,Pregel提供的 API 可以解决图计算中的
常见问题。
T MLlib:提供机器学习相关的统计、分类、回归等领域的多种算法实现。其一致的
API 接口大大降低了用户的学习成本。
Spark SQL、Spark Streaming、GraphX、MLlib的能
力都是建立在核心引擎之上,如图 2-4所示。
1. Spark 核心功能
Spark Core 提供 Spark 最基础与最核心的功能,主
要包括以下功能。
T SparkContext:通常而言,Driver Application的执行与输出都是通过 SparkContext来完
成的,在正式提交Application之前,首先需要初始化SparkContext。SparkContext隐
藏了网络通信、分布式部署、消息通信、存储能力、计算能力、缓存、测量系统、文
件服务、Web 服务等内容,应用程序开发者只需要使用 SparkContext 提供的 API完成
功能开发。SparkContext内置的DAGScheduler 负责创建Job,将 DAG 中的RDD 划
分到不同的 Stage,提交Stage 等功能。内置的 TaskScheduler 负责资源的申请、任务
的提交及请求集群对任务的调度等工作。
T 存储体系:Spark优先考虑使用各节点的内存作为存储,当内存不足时才会考虑使用
磁盘,这极大地减少了磁盘IO,提升了任务执行的效率,使得Spark适用于实时计
算、流式计算等场景。此外,Spark还提供了以内存为中心的高容错的分布式文件系统
Tachyon供用户进行选择。Tachyon能够为Spark提供可靠的内存级的文件共享服务。
T 计算引擎:计算引擎由SparkContext中的 DAGScheduler、RDD 以及具体节点上
的 Executor负责执行的 Map 和Reduce任务组成。DAGScheduler 和 RDD 虽然位于
SparkContext内部,但是在任务正式提交与执行之前会将 Job中的 RDD组织成有向
无关图(简称 DAG),并对 Stage 进行划分,决定了任务执行阶段任务的数量、迭代
计算、shuf?e 等过程。
T 部署模式:由于单节点不足以提供足够的存储及计算能力,所以作为大数据处理的
Spark 在 SparkContext的TaskScheduler 组件中提供了对Standalone部署模式的实现和
Yarn、Mesos 等分布式资源管理系统的支持。通过使用 Standalone、Yarn、Mesos等部
署模式为Task 分配计算资源,提高任务的并发执行效率。除了可用于实际生产环境
图 2-4 Spark各模块依赖关系24 准 备 篇
的 Standalone、Yarn、Mesos 等部署模式外,Spark还提供了 Local模式和 local-cluster
模式便于开发和调试。
2. Spark 扩展功能
为了扩大应用范围,Spark 陆续增加了一些扩展功能,主要包括:
T Spark SQL :SQL 具有普及率高、学习成本低等特点,为了扩大 Spark的应用面,增
加了对SQL及 Hive的支持。Spark SQL 的过程可以总结为:首先使用SQL语句解析
器(SqlParser)将 SQL 转换为语法树(Tree),并且使用规则执行器(RuleExecutor)将
一系列规则(Rule)应用到语法树,最终生成物理执行计划并执行。其中,规则执行
器包括语法分析器(Analyzer)和优化器(Optimizer)。Hive的执行过程与SQL 类似。
T Spark Streaming:Spark Streaming 与Apache Storm类似,也用于流式计算。Spark
Streaming支持 Kafka、Flume、Twitter、MQTT、ZeroMQ、Kinesis和简单的 TCP套
接字等多种数据输入源。输入流接收器(Receiver)负责接入数据,是接入数据流的
接口规范。Dstream 是Spark Streaming中所有数据流的抽象,Dstream 可以被组织为
DStream Graph。Dstream本质上由一系列连续的RDD组成。
T GraphX:Spark提供的分布式图计算框架。GraphX主要遵循整体同步并行(bulk
synchronous parallell,BSP)计算模式下的Pregel模型实现。GraphX提供了对图的抽象
Graph,Graph由顶点(Vertex)、边(Edge)及继承了Edge的EdgeTriplet(添加了srcAttr
和dstAttr用来保存源顶点和目的顶点的属性)三种结构组成。GraphX目前已经封装了
最短路径、网页排名、连接组件、三角关系统计等算法的实现,用户可以选择使用。
T MLlib :Spark 提供的机器学习框架。机器学习是一门涉及概率论、统计学、逼近论、凸分析、算法复杂度理论等多领域的交叉学科。MLlib目前已经提供了基础统计、分
类、回归、决策树、随机森林、朴素贝叶斯、保序回归、协同过滤、聚类、维数缩
减、特征提取与转型、频繁模式挖掘、预言模型标记语言、管道等多种数理统计、概
率论、数据挖掘方面的数学算法。
2.3.2 Spark 模型设计
1. Spark 编程模型
Spark 应用程序从编写到提交、执行、输出的整个过程如图2-5所示,图中描述的步骤如下。
1)用户使用 SparkContext提供的 API(常用的有 textFile、sequenceFile、runJob、stop 等)
编写 Driver application程序。此外SQLContext、HiveContext及StreamingContext对 Spark-
Context进行封装,并提供了SQL、Hive 及流式计算相关的 API。
2)使用SparkContext提交的用户应用程序,首先会使用BlockManager和 Broadcast-
Manager将任务的Hadoop 配置进行广播。然后由DAGScheduler 将任务转换为RDD 并组织成
DAG,DAG 还将被划分为不同的Stage。最后由TaskScheduler 借助 ActorSystem 将任务提交
给集群管理器(Cluster Manager)。第 2 章 Spark 设计理念与基本架构 25
3)集群管理器(Cluster Manager)给任务分配资源,即将具体任务分配到Worker上,Worker创建 Executor来处理任务的运行。Standalone、YARN、Mesos、EC2 等都可以作为
Spark 的集群管理器。
2. RDD计算模型
RDD 可以看做是对各种数据计算模型的统一抽象,Spark 的计算过程主要是RDD 的迭代
计算过程,如图2-6 所示。RDD 的迭代计算过程非常类似于管道。分区数量取决于partition
数量的设定,每个分区的数据只会在一个 Task 中计算。所有分区可以在多个机器节点的
Executor上并行执行。
图2-5 代码执行过程
图2-6 RDD 计算模型
2.4 Spark 基本架构
从集群部署的角度来看,Spark 集群由以下部分组成:
T Cluster Manager:Spark 的集群管理器,主要负责资源的分配与管理。集群管理器分配
的资源属于一级分配,它将各个Worker上的内存、CPU 等资源分配给应用程序,但
是并不负责对 Executor的资源分配。目前,Standalone、YARN、Mesos、EC2 等都可
以作为 Spark 的集群管理器。26 准 备 篇
T Worker :Spark 的工作节点。对Spark 应用程序来说,由集群管理器分配得到资
源的 Worker节点主要负责以下工作:创建Executor,将资源和任务进一步分配给
Executor,同步资源信息给Cluster Manager。
T Executor:执行计算任务的一线进程。主要负责任务的执行以及与 Worker、Driver
App 的信息同步。
T Driver App :客户端驱动程序,也可以理解为客户端应用程序,用于将任务程序转换
为 RDD 和 DAG,并与 Cluster Manager进行通信与调度。
这些组成部分之间的整体关系如图2-7 所示。
图 2-7 Spark 基本架构图
2.5 小结
每项技术的诞生都会由某种社会需求所驱动, Spark 正是在实时计算的大量需求下诞生的。
Spark借助其优秀的处理能力、可用性高、丰富的数据源支持等特点,在当前大数据领域变得
火热,参与的开发者也越来越多。Spark经过几年的迭代发展,如今已经提供了丰富的功能。
笔者相信,Spark在未来必将产生更耀眼的火花。核心设计篇
■ 第3章 SparkContext 的初始化
■ 第4章 存储体系
■ 第5章 任务提交与执行
■ 第6章 计算引擎
■ 第7章 部署模式第3章
SparkContext 的初始化
道生一, 一生二, 二生三, 三生万物。
—《道德经》
本章导读
SparkContext 的初始化是Driver 应用程序提交执行的前提,本章内容以local模式为主,并按照代码执行顺序讲解,这将有助于首次接触Spark 的读者理解源码。读者朋友如果能边
跟踪代码,边学习本章内容,也许是快速理解 SparkContext初始化过程的便捷途径。已经熟
练使用Spark 的开发人员可以选择跳过本章内容。
本章将在介绍 SparkContext初始化过程的同时,向读者介绍各个组件的作用,为阅读后
面的章节打好基础。 Spark 中的组件很多,就其功能而言涉及网络通信、分布式、消息、存储、计算、缓存、测量、清理、文件服务、Web UI 的方方面面。
3.1 SparkContext概述
Spark Driver用于提交用户应用程序,实际可以看作 Spark的客户端。了解 Spark Driver
的初始化,有助于读者理解用户应用程序在客户端的处理过程。
Spark Driver 的初始化始终围绕着SparkContext 的初始化。SparkContext 可以算得上是所
有Spark应用程序的发动机引擎,轿车要想跑起来,发动机首先要启动。SparkContext 初始
化完毕,才能向Spark 集群提交任务。在平坦的公路上,发动机只需以较低的转速、较低的
功率就可以游刃有余;在山区,你可能需要一台能够提供大功率的发动机才能满足你的需求。
这些参数都是通过驾驶员操作油门、档位等传送给发动机的,而 SparkContext 的配置参数则
Chapter 3第 3 章 SparkContext 的初始化 29
由SparkConf 负责,SparkConf就是你的操作面板。
SparkConf 的构造很简单,主要是通过 ConcurrentHashMap 来维护各种 Spark 的配置属
性。SparkConf代码结构见代码清单3-1。Spark 的配置属性都是以“spark.”开头的字符串。
代码清单3-1 SparkConf代码结构
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
import SparkConf._
def this = this(true)
private val settings = new ConcurrentHashMap[String, String]
if (loadDefaults) {
加载任何以spark.开头的系统属性
for ((key, value) <- Utils.getSystemProperties if key.startsWith(spark.)) {
set(key, value)
}
}
其余代码省略
现在开始介绍 SparkContext。SparkContext 的初始化步骤如下:
1)创建 Spark 执行环境 SparkEnv;
2)创建 RDD 清理器 metadataCleaner;
3)创建并初始化 Spark UI;
4)Hadoop 相关配置及 Executor环境变量的设置;
5)创建任务调度 TaskScheduler;
6)创建和启动 DAGScheduler;
7)TaskScheduler 的启动;
8)初始化块管理器 BlockManager(BlockManager是存储体系的主要组件之一,将在第 4
章介绍);
9)启动测量系统MetricsSystem;
10)创建和启动 Executor分配管理器ExecutorAllocationManager;
11)ContextCleaner的创建与启动;
12)Spark 环境更新;
13)创建DAGSchedulerSource 和BlockManagerSource;
14)将 SparkContext标记为激活。
SparkContext的主构造器参数为SparkConf,其实现如下。
clas s SparkContext(con?g: SparkConf) extends Logging with ExecutorAllocationClient {
private val creationSite: CallSite = Utils.getCallSite
private val allowMultipleContexts: Boolean =
con?g.getBoolean(spark.driver.allowMultipleContexts, false)
SparkContext.markPartiallyConstructed(this, allowMultipleContexts)
上面代码中的 CallSite存储了线程栈中最靠近栈顶的用户类及最靠近栈底的 Scala或者
Spark核心类信息。Utils.getCallSite的详细信息见附录A。SparkContext 默认只有一个实例(由30 核心设计篇
属性 spark.driver.allowMultipleContexts来控制,用户需要多个 SparkContext 实例时,可以将
其设置为true),方法markPartiallyConstructed用来确保实例的唯一性,并将当前SparkContext
标记为正在构建中。
接下来会对SparkConf 进行复制,然后对各种配置信息进行校验,代码如下。
private[spark] val conf = con?g.clone
conf.validateSettings
if (!conf.contains(spark.master)) {
throw new SparkException(A master URL must be set in your con?guration)
}
if (!conf.contains(spark.app.name)) {
thro w new SparkException(An application name must be set in your con?guration)
}
从上面校验的代码看到必须指定属性spark.master 和 spark.app.name,否则会抛出异常,结束初始化过程。spark.master用于设置部署模式,spark.app.name用于指定应用程序名称。
3.2 创建执行环境 SparkEnv
SparkEnv是 Spark 的执行环境对象,其中包括众多与Executor执行相关的对象。由于
在local模式下Driver会创建Executor,local-cluster部署模式或者Standalone部署模式下
Worker另起的 CoarseGrainedExecutorBackend进程中也会创建 Executor,所以 SparkEnv 存在
于Driver或者 CoarseGrainedExecutorBackend进程中。创建SparkEnv 主要使用SparkEnv 的
createDriverEnv,SparkEnv.createDriverEnv方法有三个参数:conf、isLocal和 listenerBus。
val isLocal = (master == local || master.startsWith(local[))
private[spark] val listenerBus = new LiveListenerBus
conf.set(spark.executor.id, driver)
private[spark] val env = SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
SparkEnv.set(env)
上面代码中的conf是对 SparkConf的复制,isLocal标识是否是单机模式,listenerBus 采
用监听器模式维护各类事件的处理,在3.4.1 节会详细介绍。
SparkEnv的方法createDriverEnv最终调用create创建SparkEnv。SparkEnv的构造步骤如下:
1)创建安全管理器SecurityManager;
2)创建基于 Akka 的分布式消息系统ActorSystem;
3)创建 Map 任务输出跟踪器 mapOutputTracker;
4)实例化 Shuf?eManager;
5)创建 Shuf?eMemoryManager;
6)创建块传输服务BlockTransferService;
7)创建 BlockManagerMaster;第 3 章 SparkContext 的初始化 31
8)创建块管理器 BlockManager;
9)创建广播管理器BroadcastManager;
10)创建缓存管理器CacheManager;
11)创建HTTP 文件服务器 HttpFileServer;
12)创建测量系统MetricsSystem;
13)创建SparkEnv。
3.2.1 安全管理器 SecurityManager
SecurityManager主要对权限、账号进行设置,如果使用 Hadoop YARN 作为集群管理器,则需要使用证书生成 secret key 登录,最后给当前系统设置默认的口令认证实例,此实例采用
匿名内部类实现,参见代码清单3-2。
代码清单3-2 SecurityManager的实现
private val secretKey = generateSecretKey
使用HTTP连接设置口令认证
if (authOn) {
Authenticator.setDefault(
new Authenticator {
override def getPasswordAuthentication: PasswordAuthentication = {
var passAuth: PasswordAuthentication = null
val userInfo = getRequestingURL.getUserInfo
if (userInfo != null) {
val parts = userInfo.split(:, 2)
pass Auth = new PasswordAuthentication(parts(0), parts(1).
toCharArray)
}
return passAuth
}
})
}
3.2.2 基于 Akka 的分布式消息系统 ActorSystem
ActorSystem 是 Spark 中最基础的设施,Spark 既使用它发送分布式消息,又用它实现并
发编程。消息系统可以实现并发?要解释清楚这个问题,首先应该简单介绍下 Scala语言的
Actor并发编程模型:Scala认为 Java 线程通过共享数据以及通过锁来维护共享数据的一致性
是糟糕的做法,容易引起锁的争用,降低并发程序的性能,甚至会引入死锁的问题。在 Scala
中只需要自定义类型继承Actor,并且提供act 方法,就如同Java里实现Runnable 接口,需要
实现 run方法一样。但是不能直接调用 act 方法,而是通过发送消息的方式 (Scala 发送消息是
异步的) 传递数据。如:32 核心设计篇
Actor ! message
Akka是 Actor 编程模型的高级类库,类似于 JDK 1.5 之后越来越丰富的并发工具包,简
化了程序员并发编程的难度。ActorSystem 便是 Akka 提供的用于创建分布式消息通信系统的
基础类。Akka的具体信息见附录 B。
正是因为 Actor轻量级的并发编程、消息发送以及 ActorSystem 支持分布式消息发送等特
点,Spark选择了ActorSystem。
SparkEnv中创建 ActorSystem 时用到了 AkkaUtils工具类,见代码清单 3-3。AkkaUtils.
createActorSystem方法用于启动ActorSystem,见代码清单3-4。AkkaUtils 使用了Utils 的静
态方法startServiceOnPort, startServiceOnPort 最终会回调方法startService: Int => (T, Int),此
处的startService实际是方法 doCreateActorSystem。真正启动 ActorSystem 是由doCreate-
ActorSystem方法完成的,doCreateActorSystem的具体实现细节请见附录B。Spark 的 Driver
中Akka的默认访问地址是akka:sparkDriver,Spark 的 Executor中 Akka 的默认访问地址是
akka: sparkExecutor。如果不指定ActorSystem 的端口,那么所有节点的 ActorSystem 端口在
每次启动时随机产生。关于startServiceOnPort的实现,请见附录A。
代码清单3-3 AkkaUtils工具类创建和启动ActorSystem
val (actorSystem, boundPort) =
Option(defaultActorSystem) match {
case Some(as) => (as, port)
case None =>
val a ctorSystemName = if (isDriver) driverActorSystemName else
executorActorSystemName
Akka Utils.createActorSystem(actorSystemName, hostname, port, conf,securityManager)
}
代码清单3-4 ActorSystem的创建和启动
def createActorSystem(
name: String,host: String,port: Int,conf: SparkConf,securityManager: SecurityManager): (ActorSystem, Int) = {
val startService: Int => (ActorSystem, Int) = { actualPort =>
doCreateActorSystem(name, host, actualPort, conf, securityManager)
}
Utils.startServiceOnPort(port, startService, conf, name)
}
3.2.3 map 任务输出跟踪器 mapOutputTracker
mapOutputTracker用于跟踪map阶段任务的输出状态,此状态便于reduce 阶段任务获取第 3 章 SparkContext 的初始化 33
地址及中间输出结果。每个 map 任务或者reduce 任务都会有其唯一标识,分别为 mapId 和
reduceId。每个reduce 任务的输入可能是多个map 任务的输出,reduce 会到各个 map 任务的
所在节点上拉取Block,这一过程叫做 shuf?e。每批 shuf?e过程都有唯一的标识shuf?eId。
这里先介绍下 MapOutputTrackerMaster。MapOutputTrackerMaster内部使用 mapStatuses:
TimeStampedHashMap[Int, Array[MapStatus]]来维护跟踪各个map 任务的输出状态。其中key
对应 shuf?eId,Array存储各个 map任务对应的状态信息MapStatus。由于 MapStatus维护
了map输出 Block的地址BlockManagerId,所以 reduce 任务知道从何处获取map 任务的中
间输出。MapOutputTrackerMaster还使用cachedSerializedStatuses:TimeStampedHashMap[Int,Array[Byte]]维护序列化后的各个 map 任务的输出状态。其中 key 对应 shuf?eId,Array 存储
各个序列化MapStatus生成的字节数组。
Driver和Executor处理MapOutputTrackerMaster的方式有所不同。
T 如果当前应用程序是 Driver,则创建 MapOutputTrackerMaster,然后创建
MapOutputTrackerMasterActor,并且注册到 ActorSystem 中。
T 如果当前应用程序是 Executor,则创建 MapOutputTrackerWorker,并从 ActorSystem
中找到 MapOutputTrackerMasterActor。
无论是 Driver还是 Executor,最后都由 mapOutputTracker的属性 trackerActor持有
MapOutputTrackerMasterActor的引用,参见代码清单3-5。
代码清单3-5 registerOrLookup方法用于查找或者注册Actor的实现
def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
if (isDriver) {
logInfo(Registering + name)
actorSystem.actorOf(Props(newActor), name = name)
} else {
AkkaUtils.makeDriverRef(name, conf, actorSystem)
}
}
val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf)
} else {
new MapOutputTrackerWorker(conf)
}
mapOutputTracker.trackerActor = registerOrLookup(
MapOutputTracker,new M apOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrack
erMaster], conf))
在后面章节大家会知道map任务的状态正是由Executor向持有的MapOutputTracker-
MasterActor发送消息,将map任务状态同步到mapOutputTracker的mapStatuses和cached-
SerializedStatuses的。Executor究竟是如何找到MapOutputTrackerMasterActor的?registerOrLookup34 核心设计篇
方法通过调用 AkkaUtils.makeDriverRef找到 MapOutputTrackerMasterActor,实际正是利用
ActorSystem提供的分布式消息机制实现的,具体细节参见附录 B。这里第一次使用到了 Akka
提供的功能,以后大家会渐渐感觉到使用Akka的便捷。
3.2.4 实例化 ShuffleManager
Shuf?eManager 负责管理本地及远程的 block 数据的 shuf?e 操作。Shuf?eManager
默认为通过反射方式生成的 SortShuf?eManager 的实例,可以修改属性 spark.shuf?e.
manager 为 hash 来显式控制使用 HashShuf?eManager。SortShuf?eManager 通过持有的
IndexShuf?eBlockManager 间接操作 BlockManager 中的 DiskBlockManager 将 map 结果写入
本地,并根据 shuf?eId、mapId 写入索引文件,也能通过 MapOutputTrackerMaster 中维护
的 mapStatuses 从本地或者其他远程节点读取文件。有读者可能会问,为什么需要 shuf?e ?
Spark 作为并行计算框架,同一个作业会被划分为多个任务在多个节点上并行执行,reduce
的输入可能存在于多个节点上,因此需要通过“洗牌”将所有 reduce 的输入汇总起来,这
个过程就是 shuf?e。这个问题以及对 Shuf?eManager 的具体使用会在第 5 章和第 6 章详述。
Shuf?eManager 的实例化见代码清单 3-6。代码清单 3-6 最后创建的 Shuf?eMemoryManager
将在 3.2.5 节介绍。
代码清单3-6 Shuf?eManager的实例化及Shuf?eMemoryManager的创建
val shortShuf?eMgrNames = Map(
hash -> org.apache.spark.shuf?e.hash.HashShuf?eManager,sort -> org.apache.spark.shuf?e.sort.SortShuf?eManager)
val shuf?eMgrName = conf.get(spark.shuf?e.manager, sort)
val shuf?eMgrClass = shortShuf?eMgrNames.get
OrElse(shuf?eMgrName.toLowerCase, shuf?eMgrName)
val shuf?eManager = instantiateClass[Shuf?eManager](shuf?eMgrClass)
val shuf?eMemoryManager = new Shuf?eMemoryManager(conf)
3.2.5 shuffle 线程内存管理器 ShuffleMemoryManager
Shuf?eMemoryManager负责管理shuf?e 线程占有内存的分配与释放,并通过thread-
Memory:mutable.HashMap[Long, Long]缓存每个线程的内存字节数,见代码清单3-7。
代码清单3-7 Shuf?eMemoryManager的数据结构
private[spark] class Shuf?eMemoryManager(maxMemory: Long) extends Logging {
priv ate val threadMemory = new mutable.HashMap[Long, Long] threadId ->
memory bytes
def this(conf: SparkConf) = this(Shuf?eMemoryManager.getMaxMemory(conf))
getMaxMemory方法用于获取 shuf?e 所有线程占用的最大内存,实现如下。
def getMaxMemory(conf: SparkConf): Long = {第 3 章 SparkContext 的初始化 35
val memoryFraction = conf.getDouble(spark.shuf?e.memoryFraction, 0.2)
val safetyFraction = conf.getDouble(spark.shuf?e.safetyFraction, 0.8)
(Runtime.getRuntime.maxMemory memoryFraction safetyFraction).toLong
}
从上面代码可以看出,shuf?e 所有线程占用的最大内存的计算公式为:
Java 运行时最大内存 Spark 的 shuf?e 最大内存占比 Spark的安全内存占比
可以配置属性 spark.shuf?e.memoryFraction修改Spark 的 shuf?e最大内存占比,配置属性
spark.shuf?e.safetyFraction修改 Spark 的安全内存占比。
Shuf?eMemoryManager通常运行在 Executor中,Driver 中的 Shuf?eMemoryManager
只有在 local模式下才起作用。
3.2.6 块传输服务 BlockTransferService
BlockTransferService默 认为 NettyBlockTransferService(可以配置属性 spark.shuf?e.
blockTransferService使用 NioBlockTransferService),它使用Netty提供的异步事件驱动的网络
应用框架,提供 web服务及客户端,获取远程节点上 Block的集合。
val blockTransferService =
conf.get(spark.shuf?e.blockTransferService, netty).toLowerCase match {
case netty =>
new NettyBlockTransferService(conf, securityManager, numUsableCores)
case nio =>
new NioBlockTransferService(conf, securityManager)
}
NettyBlockTransferService 的具体实现将在第 4 章详细介绍。这里大家可能觉得奇怪,这
样的网络应用为何也要放在存储体系?大家不妨先带着疑问,直到你真正了解了存储体系。
3.2.7 BlockManagerMaster 介绍
BlockManagerMaster负责对 Block 的管理和协调,具体操作依赖于 BlockManager-
MasterActor。Driver和 Executor处理 BlockManagerMaster的方式不同:
T 如果当前应用程序是Driver,则创建BlockManagerMasterActor,并且注册到 Actor-
System 中。
T 如果当前应用程序是Executor,则从 ActorSystem中找到BlockManagerMasterActor。
无论是 Driver还是 Executor,最后 BlockManagerMaster的属性 driverActor将持有对
BlockManagerMasterActor的引用。BlockManagerMaster的创建代码如下。
val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
BlockManagerMaster,new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)
注 意36 核心设计篇
registerOrLookup已在3.2.3节介绍过了,不再赘述。BlockManagerMaster及BlockManager-
MasterActor的具体实现将在第4章详细介绍。
3.2.8 创建块管理器 BlockManager
BlockManager负责对 Block 的管理,只有在 BlockManager的初始化方法 initialize
被调用后,它才是有效的。BlockManager作为存储系统的一部分,具体实现见第 4 章。
BlockManager的创建代码如下。
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,seri alizer, conf, mapOutputTracker, shuffleManager, blockTransferService,securityManager,numUsableCores)
3.2.9 创建广播管理器 BroadcastManager
BroadcastManager用于将配置信息和序列化后的RDD、Job以及Shuf?eDependency等信息
在本地存储。如果为了容灾,也会复制到其他节点上。创建BroadcastManager的代码实现如下。
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
BroadcastManager必须在其初始化方法initialize被调用后,才能生效。initialize方法
实际利用反射生成广播工厂实例broadcastFactory(可以配置属性spark.broadcast.factory指
定,默认为 org.apache.spark.broadcast.TorrentBroadcastFactory)。BroadcastManager的广播
方法 newBroadcast实际代理了工厂broadcastFactory的 newBroadcast方法来生成广播对
象。unbroadcast 方法实际代理了工厂broadcastFactory的unbroadcast方法生成非广播对象。
BroadcastManager的initialize、unbroadcast 及 newBroadcast 方法见代码清单3-8。
代码清单3-8 BroadcastManager的实现
private def initialize {
synchronized {
if (!initialized) {
val b roadcastFactoryClass = conf.get(spark.broadcast.factory, org.
apache.spark.broadcast.TorrentBroadcastFactory)
broadcastFactory =
Clas s.forName(broadcastFactoryClass).newInstance.asInstanceOf
[BroadcastFactory]
broadcastFactory.initialize(isDriver, conf, securityManager)
initialized = true
}
}
}
private val nextBroadcastId = new AtomicLong(0)
def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean) = {
broa dcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.
getAndIncrement)
}第 3 章 SparkContext 的初始化 37
def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
}
}
3.2.10 创建缓存管理器 CacheManager
CacheManager用于缓存RDD 某个分区计算后的中间结果,缓存计算结果发生在迭代计
算的时候,将在6.1 节讲到。而CacheManager将在 4.10 节详细描述。创建CacheManager的
代码如下。
val cacheManager = new CacheManager(blockManager)
3.2.11 HTTP 文件服务器 HttpFileServer
HttpFileServer的创建参见代码清单3-9。HttpFileServer主要提供对jar 及其他文件的http
访问,这些jar包包括用户上传的jar 包。端口由属性spark.?leserver.port配置,默认为0,表
示随机生成端口号。
代码清单3-9 HttpFileServer的创建
val httpFileServer =
if (isDriver) {
val ?leServerPort = conf.getInt(spark.?leserver.port, 0)
val server = new HttpFileServer(conf, securityManager, ?leServerPort)
server.initialize
conf.set(spark.?leserver.uri, server.serverUri)
server
} else {
null
}
HttpFileServer的初始化过程见代码清单 3-10,主要包括以下步骤:
1)使用 Utils 工具类创建文件服务器的根目录及临时目录(临时目录在运行时环境关闭时
会删除)。Utils 工具的详细介绍见附录A。
2)创建存放 jar 包及其他文件的文件目录。
3)创建并启动 HTTP 服务。
代码清单3-10 HttpFileServer的初始化
def initialize {
baseDir = Utils.createTempDir(Utils.getLocalDir(conf), httpd)
leDir = new File(baseDir, ?les)
jarDir = new File(baseDir, jars)
leDir.mkdir
jarDir.mkdir38 核心设计篇
logInfo(HTTP File server directory is + baseDir)
http Server = new HttpServer(conf, baseDir, securityManager, requestedPort,HTTP ?le server)
httpServer.start
serverUri = httpServer.uri
logDebug(HTTP ?le server started at: + serverUri)
}
HttpServer的构造和 start方法的实现中,再次使用了 Utils的静态方法 startServiceOnPort,因此会回调doStart 方法,见代码清单3-11。有关Jetty的 API 使用参见附录C。
代码清单3-11 HttpServer的启动
def start {
if (server != null) {
throw new ServerStateException(Server is already started)
} else {
logInfo(Starting HTTP Server)
val (actualServer, actualPort) =
Util s.startServiceOnPort[Server](requestedPort, doStart, conf,serverName)
server = actualServer
port = actualPort
}
}
doStart方法中启动内嵌的Jetty 所提供的HTTP服务,见代码清单3-12。
代码清单3-12 HttpServer的启动功能实现
private def doStart(startPort: Int): (Server, Int) = {
val server = new Server
val connector = new SocketConnector
connector.setMaxIdleTime(60 1000)
connector.setSoLingerTime(-1)
connector.setPort(startPort)
server.addConnector(connector)
val threadPool = new QueuedThreadPool
threadPool.setDaemon(true)
server.setThreadPool(threadPool)
val resHandler = new ResourceHandler
resHandler.setResourceBase(resourceBase.getAbsolutePath)
val handlerList = new HandlerList
handlerList.setHandlers(Array(resHandler, new DefaultHandler))
if (securityManager.isAuthenticationEnabled{
logDebug(HttpServer is using security)
val sh = setupSecurityHandler(securityManager)
make sure we go through security handler to get resources
sh.setHandler(handlerList)第 3 章 SparkContext 的初始化 39
server.setHandler(sh)
} else {
logDebug(HttpServer is not using security)
server.setHandler(handlerList)
}
server.start
val actualPort = server.getConnectors(0).getLocalPort
(server, actualPort)
}
3.2.12 创建测量系统 MetricsSystem
MetricsSystem 是 Spark 的测量系统,创建 MetricsSystem 的代码如下。
val metricsSystem = if (isDriver) {
MetricsSystem.createMe ......
您现在查看是摘要介绍页, 详见PDF附件(6684KB,91页)。





