Spark大数据分析实战.pdf
http://www.100md.com
2019年12月24日
![]() |
| 第1页 |
![]() |
| 第9页 |
![]() |
| 第11页 |
![]() |
| 第21页 |
![]() |
| 第36页 |
参见附件(6756KB,438页)。
Spark大数据分析实战本站提供,这是针对Spark大数据开发打造的实战书籍,里面从零开始详细讲解项目搭建,开发流程等等,帮助你快速上手Spark大数据!

Spark大数据分析实战内容提要
这是一本根据应用场景讲解如何通过Spark进行大数据分析与应用构建的著作,以实战为导向。作者结合典型应用场景,抽象出通用与简化后的模型,以便于读者能举一反三,直接应用。
本书首先从技术层面讲解了Spark的机制、生态系统与开发相关的内容;然后从应用角度讲解了日志分析、推荐系统、情感分析、协同过滤、搜索引擎、社交网络分析、新闻数据分析等多个常见的大数据场景下的数据分析。在每个场景中,首先是对场景进行抽象与概括,然后将Spark融入其中构建数据分析算法与应用,最后结合其他开源系统或工具构建更为丰富的数据分析流水线。
Spark大数据分析实战作者信息
高彦杰
毕业于中国人民大学,就职于微软亚洲研究院。开源技术爱好者,对Spark及其他开源大数据系统与技术有较为深入的认识和研究,实践经验丰富。较早接触并使用Spark,对Spark应用开发、Spark系统的运维和测试比较熟悉,深度阅读了Spark的源代码,了解Spark的运行机制,擅长Spark的查询优化。
曾著有畅销书《Spark大数据处理:技术、应用与性能优化》。
倪亚宇
清华大学自动化系在读博士研究生,曾于微软亚洲研究院、IBM研究院实习。对大规模的推荐系统和机器学习算法有较为深入的研究和丰富的实践经验。
Spark大数据分析实战章节预览
第1章Spark简介
第2章Spark开发与环境配置
第3章BDAS简介
第4章Lamda架构日志分析流水线
第5章基于云平台和用户日志的推荐系统
第6章Twitter情感分析
第7章热点新闻分析系统
第8章构建分布式的协同过滤推荐系统
第9章基于Spark的社交网络分析
第10章基于Spark的大规模新闻
第11章构建分布式的搜索引擎主题分析
Spark大数据分析实战截图


大数据技术丛书
Spark大数据分析实战
高彦杰 倪亚宇 著
译者 译
ISBN:978-7-111-52307-9
本书纸版由机械工业出版社于2015年出版,电子版由华章分社(北京华
章图文信息有限公司,北京奥维博世图书发行有限公司)全球范围内制
作与发行。
版权所有,侵权必究
客服热线:+ 86-10-68995265
客服信箱:service@bbbvip.com
官方网址:www.hzmedia.com.cn
新浪微博 @华章数媒
微信公众号 华章电子书(微信号:hzebook)目录
前言
第1章 Spark简介
1.1 初识Spark
1.2 Spark生态系统BDAS
1.3 Spark架构与运行逻辑
1.4 弹性分布式数据集
1.4.1 RDD简介
1.4.2 RDD算子分类
1.5 本章小结
第2章 Spark开发与环境配置
2.1 Spark应用开发环境配置
2.1.1 使用Intellij开发Spark程序
2.1.2 使用SparkShell进行交互式数据分析
2.2 远程调试Spark程序
2.3 Spark编译
2.4 配置Spark源码阅读环境
2.5 本章小结
第3章 BDAS简介
3.1 SQL on Spark
3.1.1 为什么使用Spark SQL3.1.2 Spark SQL架构分析
3.2 Spark Streaming
3.2.1 Spark Streaming简介
3.2.2 Spark Streaming架构
3.2.3 Spark Streaming原理剖析
3.3 GraphX
3.3.1 GraphX简介
3.3.2 GraphX的使用简介
3.3.3 GraphX体系结构
3.4 MLlib
3.4.1 MLlib简介
3.4.2 MLlib中的聚类和分类
3.5 本章小结
第4章 Lamda架构日志分析流水线
4.1 日志分析概述
4.2 日志分析指标
4.3 Lamda架构
4.4 构建日志分析数据流水线
4.4.1 用Flume进行日志采集
4.4.2 用Kafka将日志汇总
4.4.3 用Spark Streaming进行实时日志分析4.4.4 Spark SQL离线日志分析
4.4.5 用Flask将日志KPI可视化
4.5 本章小结
第5章 基于云平台和用户日志的推荐系统
5.1 Azure云平台简介
5.1.1 Azure网站模型
5.1.2 Azure数据存储
5.1.3 Azure Queue消息传递
5.2 系统架构
5.3 构建Node.js应用
5.3.1 创建Azure Web应用
5.3.2 构建本地Node.js网站
5.3.3 发布应用到云平台
5.4 数据收集与预处理
5.4.1 通过JS收集用户行为日志
5.4.2 用户实时行为回传到Azure Queue
5.5 Spark Streaming实时分析用户日志
5.5.1 构建Azure Queue的Spark Streaming Receiver
5.5.2 Spark Streaming实时处理Azure Queue日志
5.5.3 Spark Streaming数据存储于Azure Table
5.6 MLlib离线训练模型5.6.1 加载训练数据
5.6.2 使用rating RDD训练ALS模型
5.6.3 使用ALS模型进行电影推荐
5.6.4 评估模型的均方差
5.7 本章小结
第6章 Twitter情感分析
6.1 系统架构
6.2 Twitter数据收集
6.2.1 设置
6.2.2 Spark Streaming接收并输出Tweet
6.3 数据预处理与Cassandra存储
6.3.1 添加SBT依赖
6.3.2 创建Cassandra Schema
6.3.3 数据存储于Cassandra
6.4 Spark Streaming热点Twitter分析
6.5 Spark Streaming在线情感分析
6.6 Spark SQL进行Twitter分析
6.6.1 读取Cassandra数据
6.6.2 查看JSON数据模式
6.6.3 Spark SQL分析Twitter
6.7 Twitter可视化6.8 本章小结
第7章 热点新闻分析系统
7.1 新闻数据分析
7.2 系统架构
7.3 爬虫抓取网络信息
7.3.1 Scrapy简介
7.3.2 创建基于Scrapy的新闻爬虫
7.3.3 爬虫分布式化
7.4 新闻文本数据预处理
7.5 新闻聚类
7.5.1 数据转换为向量(向量空间模型VSM)
7.5.2 新闻聚类
7.5.3 词向量同义词查询
7.5.4 实时热点新闻分析
7.6 Spark Elastic Search构建全文检索引擎
7.6.1 部署Elastic Search
7.6.2 用Elastic Search索引MongoDB数据
7.6.3 通过Elastic Search检索数据
7.7 本章小结
第8章 构建分布式的协同过滤推荐系统
8.1 推荐系统简介8.2 协同过滤介绍
8.2.1 基于用户的协同过滤算法User-based CF
8.2.2 基于项目的协同过滤算法Item-based CF
8.2.3 基于模型的协同过滤推荐Model-based CF
8.3 基于Spark的矩阵运算实现协同过滤算法
8.3.1 Spark中的矩阵类型
8.3.2 Spark中的矩阵运算
8.3.3 实现User-based协同过滤的示例
8.3.4 实现Item-based协同过滤的示例
8.3.5 基于奇异值分解实现Model-based协同过滤的示例
8.4 基于Spark的MLlib实现协同过滤算法
8.4.1 MLlib的推荐算法工具
8.4.2 MLlib协同过滤推荐示例
8.5 案例:使用MLlib协同过滤实现电影推荐
8.5.1 MovieLens数据集
8.5.2 确定最佳的协同过滤模型参数
8.5.3 利用最佳模型进行电影推荐
8.6 本章小结
第9章 基于Spark的社交网络分析
9.1 社交网络介绍
9.1.1 社交网络的类型9.1.2 社交网络的相关概念
9.2 社交网络中社团挖掘算法
9.2.1 聚类分析和K均值算法简介
9.2.2 社团挖掘的衡量指标
9.2.3 基于谱聚类的社团挖掘算法
9.3 Spark中的K均值算法
9.3.1 Spark中与K均值有关的 对象和方法
9.3.2 Spark下K均值算法示例
9.4 案例:基于Spark的Facebook社团挖掘
9.4.1 SNAP社交网络数据集 介绍
9.4.2 基于Spark的社团挖掘实现
9.5 社交网络中的链路预测算法
9.5.1 分类学习简介
9.5.2 分类器的评价指标
9.5.3 基于Logistic回归的链路预测算法
9.6 Spark MLlib中的Logistic回归
9.6.1 分类器相关对象
9.6.2 模型验证对象
9.6.3 基于Spark的Logistic回归示例
9.7 案例:基于Spark的链路预测算法
9.7.1 SNAP符号社交网络 Epinions数据集9.7.2 基于Spark的链路预测算法
9.8 本章小结
第10章 基于Spark的大规模新闻主题分析
10.1 主题模型简介
10.2 主题模型LDA
10.2.1 LDA模型介绍
10.2.2 LDA的训练算法
10.3 Spark中的LDA模型
10.3.1 MLlib对LDA的支持
10.3.2 Spark中LDA模型训练示例
10.4 案例:Newsgroups新闻的主题分析
10.4.1 Newsgroups数据集介绍
10.4.2 交叉验证估计新闻的主题个数
10.4.3 基于主题模型的文本聚类算法
10.4.4 基于主题模型的文本分类算法
10.5 本章小结
第11章 构建分布式的搜索引擎
11.1 搜索引擎简介
11.2 搜索排序概述
11.3 查询无关模型PageRank
11.4 基于Spark的分布式PageRank实现11.4.1 PageRank的MapReduce 实现
11.4.2 Spark的分布式图模型GraphX
11.4.3 基于GraphX的PageRank实现
11.5 案例:GoogleWeb Graph的PageRank计算
11.6 查询相关模型Ranking SVM
11.7 Spark中支持向量机的实现
11.7.1 Spark中的支持向量机 模型
11.7.2 使用Spark测试数据演示支持向量机的训练
11.8 案例:基于MSLR数据集的查询排序
11.8.1 Microsoft Learning to Rank 数据集介绍
11.8.2 基于Spark的Ranking SVM实现
11.9 本章小结前言
为什么要写这本书
Spark大数据技术还在如火如荼地发展,Spark中国峰会的召开,各
地meetup的火爆举行,开源软件Spark也因此水涨船高,很多公司已经
将Spark大范围落地并且应用。Spark使用者的需求已经从最初的部署安
装、运行实例,到现在越来越需要通过Spark构建丰富的数据分析应
用。写一本Spark实用案例类的技术书籍,是一个持续了很久的想法。
由于工作较为紧张,最初只是将参与或学习过的Spark相关案例进行总
结,但是随着时间的推移,最终还是打算将其中通用的算法、系统架构
以及应用场景抽象出来,并进行适当简化,也算是一种总结和分享。
Spark发源于美国加州大学伯克利分校AMPLab的大数据分析平台,它立足于内存计算,从多迭代批量处理出发,兼顾数据仓库、流处理和
图计算等多种计算范式,是大数据系统领域的全栈计算平台。Spark当
下已成为Apache基金会的顶级开源项目,拥有着庞大的社区支持,生态
系统日益完善,技术也逐渐走向成熟。
现在越来越多的同行已经了解Spark,并且开始使用Spark,但是国
内缺少一本Spark的实战案例类的书籍,很多Spark初学者和开发人员只
能参考网络上零散的博客或文档,学习效率较慢。本书也正是为了解决上述问题而着意编写。
本书希望带给读者一个系统化的视角,秉承大道至简的主导思想,介绍Spark的基本原理,如何在Spark上构建复杂数据分析算法,以及
Spark如何与其他开源系统进行结合构建数据分析应用,让读者开启
Spark技术应用之旅。
本书特色
Spark作为一款基于内存的分布式计算框架,具有简洁的接口,可
以快速构建上层数据分析算法,同时具有很好的兼容性,能够结合其他
开源数据分析系统构建数据分析应用或者产品。
为了适合读者阅读和掌握知识结构,本书从Spark基本概念和机制
介绍入手,结合笔者实践经验讲解如何在Spark之上构建机器学习算
法,并最后结合不同的应用场景构建数据分析应用。
读者对象
本书中一些实操和应用章节,比较适数据分析和开发人员,可以作
为工作手边书;机器学习和算法方面的章节,比较适合机器学习和算法
工程师,可以分享经验,拓展解决问题的思路。
·Spark初学者·Spark应用开发人员
·Spark机器学习爱好者
·开源软件爱好者
·其他对大数据技术感兴趣的人员
如何阅读本书
本书分为11章内容。
第1章 从Spark概念出发,介绍Spark的来龙去脉,阐述Spark机制
与如何进行Spark编程。
第2章 详细介绍Spark的开发环境配置。
第3章 详细介绍Spark生态系统重要组件Spark SQL、Spark
Streaming、GraphX、MLlib的实现机制,为后续使用奠定基础。
第4章 详细介绍如何通过Flume、Kafka、Spark Streaming、HDFS、Flask等开源工具构建实时与离线数据分析流水线。
第5章 从实际出发,详细介绍如何在Azure云平台,通过Node.js、Azure Queue、Azure Table、Spark Streaming、MLlib等组件对用户行为
数据进行分析与推荐。第6章 详细介绍如何通过Twitter API、Spark SQL、Spark
Streaming、Cassandra、D3等组件对Twitter进行情感分析与统计分析。
第7章 详细介绍如何通过Scrapy、Kafka、MongoDB、Spark、Spark Streaming、Elastic Search等组件对新闻进行抓取、分析、热点新
闻聚类等挖掘工作。
第8章 详细介绍了协同过滤概念和模型,讲解了如何在Spark中实
现基于Item-based、User-based和Model-based协同过滤算法的推荐系统。
第9章 详细介绍了社交网络分析的基本概念和经典算法,以及如
何利用Spark实现这些经典算法,用于真实网络的分析。
第10章 详细介绍了主题分析模型(LDA),讲解如何在Spark中
实现LDA算法,并且对真实的新闻数据进行分析。
第11章 详细介绍了搜索引擎的基本原理,以及其中用到的核心搜
索排序相关算法——PageRank和Ranking SVM,并讲解了如何在Spark中
实现PageRank和Ranking SVM算法,以及如何对真实的Web数据进行分
析。
如果你有一定的经验,能够理解Spark的相关基础知识和使用技
巧,那么可以直接阅读第4~11章。然而,如果你是一名初学者,请一定
从第1章的基础知识开始学起。勘误和支持
由于笔者的水平有限,加之编写时间仓促,书中难免会出现一些错
误或者不准确的地方,恳请读者批评指正。如果你有更多的宝贵意见,我们会尽量为读者提供最满意的解答。你也可以通过微博@高彦杰gyj,博客:http:blog.csdn.netgaoyanjie55,或者邮箱gaoyanjie55@163.com联
系到高彦杰。你也可以通过邮箱niyayu@foxmail.com联系到倪亚宇。
期待能够得到大家的真挚反馈,在技术之路上互勉共进。
致谢
感谢微软亚洲研究院的Thomas先生和Ying Yan,在每一次迷茫时给
予我鼓励与支持。
感谢机械工业出版社华章公司的杨福川和高婧雅,在近半年的时间
里始终支持我们的写作,你们的鼓励和帮助引导我顺利完成全部书稿。
特别致谢
谨以此书献给我最亲爱的爱人,家人,同事,以及众多热爱大数据
技术的朋友们!
高彦杰第1章 Spark简介
本章主要介绍Spark框架的概念、生态系统、架构及RDD等,并围
绕Spark的BDAS项目及其子项目进行了简要介绍。目前,Spark生态系
统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目,本章只进行简要介绍,后
续章节会有详细阐述。1.1 初识Spark
Spark是基于内存计算的大数据并行计算框架,因为它基于内存计
算,所以提高了在大数据环境下数据处理的实时性,同时保证了高容错
性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集
群。
1.Spark执行的特点
Hadoop中包含计算框架MapReduce和分布式文件系统HDFS。
Spark是MapReduce的替代方案,而且兼容HDFS、Hive等分布式存
储层,融入Hadoop的生态系统,并弥补MapReduce的不足。
(1)中间结果输出
Spark将执行工作流抽象为通用的有向无环图执行计划(DAG),可以将多Stage的任务串联或者并行执行,而无需将Stage的中间结果输
出到HDFS中,类似的引擎包括Flink、Dryad、Tez。
(2)数据格式和内存布局
Spark抽象出分布式内存存储结构弹性分布式数据集RDD,可以理
解为利用分布式的数组来进行数据的存储。RDD能支持粗粒度写操作,但对于读取操作,它可以精确到每条记录。Spark的特性是能够控制数
据在不同节点上的分区,用户可以自定义分区策略。
(3)执行策略
Spark执行过程中不同Stage之间需要进行Shuffle。Shuffle是连接有
依赖的Stage的桥梁,上游Stage输出到下游Stage中必须经过Shuffle这个
环节,通过Shuffle将相同的分组数据拆分后聚合到同一个节点再处理。
Spark Shuffle支持基于Hash或基于排序的分布式聚合机制。
(4)任务调度的开销
Spark采用了事件驱动的类库AKKA来启动任务,通过线程池的复
用线程来避免系统启动和切换开销。
2.Spark的优势
Spark的一站式解决方案有很多的优势,分别如下所述。
(1)打造全栈多计算范式的高效数据流水线
支持复杂查询与数据分析任务。在简单的“Map”及“Reduce”操作之
外,Spark还支持SQL查询、流式计算、机器学习和图算法。同时,用
户可以在同一个工作流中无缝搭配这些计算范式。
(2)轻量级快速处理Spark代码量较小,这得益于Scala语言的简洁和丰富表达力,以及
Spark通过External DataSource API充分利用和集成Hadoop等其他第三方
组件的能力。同时Spark基于内存计算,可通过中间结果缓存在内存来
减少磁盘IO以达到性能的提升。
(3)易于使用,支持多语言
Spark支持通过Scala、Java和Python编写程序,这允许开发者在自己
熟悉的语言环境下进行工作。它自带了80多个算子,同时允许在Shell中
进行交互式计算。用户可以利用Spark像书写单机程序一样书写分布式
程序,轻松利用Spark搭建大数据内存计算平台并充分利用内存计算,实现海量数据的实时处理。
(4)与External Data Source多数据源支持
Spark可以独立运行,除了可以运行在当下的Yarn集群管理之外,它还可以读取已有的任何Hadoop数据。它可以运行多种数据源,比如
Parquet、Hive、HBase、HDFS等。这个特性让用户可以轻易迁移已有
的持久化层数据。
(5)社区活跃度高
Spark起源于2009年,当下已有超过600多位工程师贡献过代码。开
源系统的发展不应只看一时之快,更重要的是一个活跃的社区和强大的生态系统的支持。
同时也应该看到Spark并不是完美的,RDD模型适合的是粗粒度的
全局数据并行计算;不适合细粒度的、需要异步更新的计算。对于一些
计算需求,如果要针对特定工作负载达到最优性能,还需要使用一些其
他的大数据系统。例如,图计算领域的GraphLab在特定计算负载性能上
优于GraphX,流计算中的Storm在实时性要求很高的场合要更胜Spark
Streaming一筹。1.2 Spark生态系统BDAS
目前,Spark已经发展成为包含众多子项目的大数据计算平台。
BDAS是伯克利大学提出的基于Spark的数据分析栈(BDAS)。其核心
框架是Spark,同时涵盖支持结构化数据SQL查询与分析的查询引擎
Spark SQL,提供机器学习功能的系统MLBase及底层的分布式机器学习
库MLlib,并行图计算框架GraphX,流计算框架Spark Streaming,近似
查询引擎BlinkDB,内存分布式文件系统Tachyon,资源管理框架Mesos
等子项目。这些子项目在Spark上层提供了更高层、更丰富的计算范
式。
图1-1展现了BDAS的主要项目结构图。
图1-1 伯克利数据分析栈(BDAS)主要项目结构图
下面对BDAS的各个子项目进行更详细的介绍。(1)Spark
Spark是整个BDAS的核心组件,是一个大数据分布式编程框架,不
仅实现了MapReduce的算子map函数和reduce函数及计算模型,还提供
了更为丰富的算子,例如filter、join、groupByKey等。Spark将分布式数
据抽象为RDD(弹性分布式数据集),并实现了应用任务调度、RPC、序列化和压缩,并为运行在其上层的组件提供API。其底层采用Scala这
种函数式语言书写而成,并且所提供的API深度借鉴函数式的编程思
想,提供与Scala类似的编程接口。
图1-2所示即为Spark的处理流程(主要对象为RDD)。
Spark将数据在分布式环境下分区,然后将作业转化为有向无环图
(DAG),并分阶段进行DAG的调度和任务的分布式并行处理。
(2)Spark SQL
Spark SQL提供在大数据上的SQL查询功能,类似于Shark在整个生
态系统的角色,它们可以统称为SQL on Spark。之前,由于Shark的查询
编译和优化器依赖Hive,使得Shark不得不维护一套Hive分支。而Spark
SQL使用Catalyst作为查询解析和优化器,并在底层使用Spark作为执行
引擎实现SQL的算子。用户可以在Spark上直接书写SQL,相当于为
Spark扩充了一套SQL算子,这无疑更加丰富了Spark的算子和功能。同
时Spark SQL不断兼容不同的持久化存储(如HDFS、Hive等),为其发展奠定广阔的空间。
图1-2 Spark的任务处理流程图
(3)Spark Streaming
Spark Streaming通过将流数据按指定时间片累积为RDD,然后将每
个RDD进行批处理,进而实现大规模的流数据处理。其吞吐量能够超越
现有主流流处理框架Storm,并提供丰富的API用于流数据计算。
(4)GraphX
GraphX基于BSP模型,在Spark之上封装类似Pregel的接口,进行大
规模同步全局的图计算,尤其是当用户进行多轮迭代的时候,基于Spark内存计算的优势尤为明显。
(5)MLlib
MLlib是Spark之上的分布式机器学习算法库,同时包括相关的测试
和数据生成器。MLlib支持常见的机器学习问题,例如分类、回归、聚
类以及协同过滤,同时也包括一个底层的梯度下降优化基础算法。1.3 Spark架构与运行逻辑
1.Spark的架构
·Driver:运行Application的main函数并且创建SparkContext。
·Client:用户提交作业的客户端。
·Worker:集群中任何可以运行Application代码的节点,运行一个或
多个Executor进程。
·Executor:运行在Worker的Task执行器,Executor启动线程池运行
Task,并且负责将数据存在内存或者磁盘上。每个Application都会申请
各自的Executor来处理任务。
·SparkContext:整个应用的上下文,控制应用的生命周期。
·RDD:Spark的基本计算单元,一组RDD形成执行的有向无环图
RDD Graph。
·DAG Scheduler:根据Job构建基于Stage的DAG工作流,并提交
Stage给TaskScheduler。
·TaskScheduler:将Task分发给Executor执行。·SparkEnv:线程级别的上下文,存储运行时的重要组件的引用。
2.运行逻辑
(1)Spark作业提交流程
如图1-3所示,Client提交应用,Master找到一个Worker启动
Driver,Driver向Master或者资源管理器申请资源,之后将应用转化为
RDD有向无环图,再由DAGScheduler将RDD有向无环图转化为Stage的
有向无环图提交给TaskScheduler,由TaskScheduler提交任务给Executor
进行执行。任务执行的过程中其他组件再协同工作确保整个应用顺利执
行。图1-3 Spark架构
(2)Spark作业运行逻辑
如图1-4所示,在Spark应用中,整个执行流程在逻辑上运算之间会
形成有向无环图。Action算子触发之后会将所有累积的算子形成一个有
向无环图,然后由调度器调度该图上的任务进行运算。Spark的调度方
式与MapReduce有所不同。Spark根据RDD之间不同的依赖关系切分形
成不同的阶段(Stage),一个阶段包含一系列函数进行流水线执行。
图中的A、B、C、D、E、F,分别代表不同的RDD,RDD内的一个方框
代表一个数据块。数据从HDFS输入Spark,形成RDD A和RDD C,RDD
C上执行map操作,转换为RDD D,RDD B和RDD E进行join操作转换为
F,而在B到F的过程中又会进行Shuffle。最后RDD F通过函数
saveAsSequenceFile输出保存到HDFS中。图1-4 Spark执行有向无环图1.4 弹性分布式数据集
本节将介绍弹性分布式数据集RDD。Spark是一个分布式计算框
架,而RDD是其对分布式内存数据的抽象,可以认为RDD就是Spark分
布式算法的数据结构,而RDD之上的操作是Spark分布式算法的核心原
语,由数据结构和原语设计上层算法。Spark最终会将算法(RDD上的
一连串操作)翻译为DAG形式的工作流进行调度,并进行分布式任务的
分发。1.4.1 RDD简介
在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数
据集(Resilient Distributed Dataset,RDD)。它在集群中的多台机器上
进行了数据分区,逻辑上可以认为是一个分布式的数组,而数组中每个
记录可以是用户自定义的任意数据结构。RDD是Spark的核心数据结
构,通过RDD的依赖关系形成Spark的调度顺序,通过对RDD的操作形
成整个Spark程序。
(1)RDD创建方式
1)从Hadoop文件系统(或与Hadoop兼容的其他持久化存储系统,如Hive、Cassandra、HBase)输入(例如HDFS)创建。
2)从父RDD转换得到新RDD。
3)通过parallelize或makeRDD将单机数据创建为分布式RDD。
(2)RDD的两种操作算子
对于RDD可以有两种操作算子:转换(Transformation)与行动
(Action)。
1)转换(Transformation):Transformation操作是延迟计算的,也就是说从一个RDD转换生成另一个RDD的转换操作不是马上执行,需
要等到有Action操作的时候才会真正触发运算。
2)行动(Action):Action算子会触发Spark提交作业(Job),并
将数据输出Spark系统。
(3)RDD的重要内部属性
通过RDD的内部属性,用户可以获取相应的元数据信息。通过这些
信息可以支持更复杂的算法或优化。
1)分区列表:通过分区列表可以找到一个RDD中包含的所有分区
及其所在地址。
2)计算每个分片的函数:通过函数可以对每个数据块进行RDD需
要进行的用户自定义函数运算。
3)对父RDD的依赖列表:为了能够回溯到父RDD,为容错等提供
支持。
4)对key-value pair数据类型RDD的分区器,控制分区策略和分区
数。通过分区函数可以确定数据记录在各个分区和节点上的分配,减少
分布不平衡。
5)每个数据分区的地址列表(如HDFS上的数据块的地址)。如果数据有副本,则通过地址列表可以获知单个数据块的所有副本
地址,为负载均衡和容错提供支持。
(4)Spark计算工作流
图1-5中描述了Spark的输入、运行转换、输出。在运行转换中通过
算子对RDD进行转换。算子是RDD中定义的函数,可以对RDD中的数
据进行转换和操作。
·输入:在Spark程序运行中,数据从外部数据空间(例如,HDFS、Scala集合或数据)输入到Spark,数据就进入了Spark运行时数
据空间,会转化为Spark中的数据块,通过BlockManager进行管理。
·运行:在Spark数据输入形成RDD后,便可以通过变换算子fliter
等,对数据操作并将RDD转化为新的RDD,通过行动(Action)算子,触发Spark提交作业。如果数据需要复用,可以通过Cache算子,将数据
缓存到内存。
·输出:程序运行结束数据会输出Spark运行时空间,存储到分布式
存储中(如saveAsTextFile输出到HDFS)或Scala数据或集合中(collect
输出到Scala集合,count返回Scala Int型数据)。图1-5 Spark算子和数据空间
Spark的核心数据模型是RDD,但RDD是个抽象类,具体由各子类
实现,如MappedRDD、ShuffledRDD等子类。Spark将常用的大数据操
作都转化成为RDD的子类。1.4.2 RDD算子分类
本节将主要介绍Spark算子的作用,以及算子的分类。
Spark算子大致可以分为以下两类。
1)Transformation变换算子:这种变换并不触发提交作业,完成作
业中间过程处理。
2)Action行动算子:这类算子会触发SparkContext提交Job作业。
下面分别对两类算子进行详细介绍。
1.Transformations算子
下文将介绍常用和较为重要的Transformation算子。
(1)map
将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为
一个新的元素。源码中map算子相当于初始化一个RDD,新RDD叫做
MappedRDD(this,sc.clean(f))。
图1-7中每个方框表示一个RDD分区,左侧的分区经过用户自定义
函数f:T->U映射为右侧的新RDD分区。但是,实际只有等到Action算子触发后这个f函数才会和其他函数在一个stage中对数据进行运算。在
图1-6中的第一个分区,数据记录V1输入f,通过f转换输出为转换后的
分区中的数据记录V'1。
(2)flatMap
将原来RDD中的每个元素通过函数f转换为新的元素,并将生成的
RDD的每个集合中的元素合并为一个集合,内部创建
FlatMappedRDD(this,sc.clean(f))。
图1-6 map算子对RDD转换
图1-7表示RDD的一个分区进行flatMap函数操作,flatMap中传入的
函数为f:T->U,T和U可以是任意的数据类型。将分区中的数据通过用
户自定义函数f转换为新的数据。外部大方框可以认为是一个RDD分
区,小方框代表一个集合。V1、V2、V3在一个集合作为RDD的一个数
据项,可能存储为数组或其他容器,转换为V'1、V'2、V'3后,将原来
的数组或容器结合拆散,拆散的数据形成为RDD中的数据项。图1-7 flapMap算子对RDD转换
(3)mapPartitions
mapPartitions函数获取到每个分区的迭代器,在函数中通过这个分
区整体的迭代器对整个分区的元素进行操作。内部实现是生成
MapPartitionsRDD。图1-8中的方框代表一个RDD分区。
图1-8中,用户通过函数f(iter)=>iter.filter(_>=3)对分区中所有
数据进行过滤,大于和等于3的数据保留。一个方块代表一个RDD分
区,含有1、2、3的分区过滤只剩下元素3。图1-8 mapPartitions算子对RDD转换
(4)union
使用union函数时需要保证两个RDD元素的数据类型相同,返回的
RDD数据类型和被合并的RDD元素数据类型相同。并不进行去重操
作,保存所有元素,如果想去重可以使用distinct。同时Spark还提
供更为简洁的使用union的API,通过++符号相当于union函数操作。
图1-9中左侧大方框代表两个RDD,大方框内的小方框代表RDD的
分区。右侧大方框代表合并后的RDD,大方框内的小方框代表分区。合
并后,V1、V2、V3……V8形成一个分区,其他元素同理进行合并。
(5)cartesian
对两个RDD内的所有元素进行笛卡尔积操作。操作后,内部实现返
回CartesianRDD。图1-10中左侧大方框代表两个RDD,大方框内的小方
框代表RDD的分区。右侧大方框代表合并后的RDD,大方框内的小方
框代表分区。
例如:V1和另一个RDD中的W1、W2、Q5进行笛卡尔积运算形成
(V1,W1)、(V1,W2)、(V1,Q5)。图1-9 union算子对RDD转换图1-10 cartesian算子对RDD转换
(6)groupBy
groupBy:将元素通过函数生成相应的Key,数据就转化为Key-
Value格式,之后将Key相同的元素分为一组。
函数实现如下:
1)将用户函数预处理:
val cleanF = sc.clean(f)2)对数据map进行函数操作,最后再进行groupByKey分组操作。
this.map(t =>(cleanF(t), t)).groupByKey(p)
其中,p确定了分区个数和分区函数,也就决定了并行化的程度。
图1-11中方框代表一个RDD分区,相同key的元素合并到一个组。
例如V1和V2合并为V,Value为V1,V2。形成V,Seq(V1,V2)。
图1-11 groupBy算子对RDD转换
(7)filter
filter函数功能是对元素进行过滤,对每个元素应用f函数,返回值
为true的元素在RDD中保留,返回值为false的元素将被过滤掉。内部实
现相当于生成FilteredRDD(this,sc.clean(f))。下面代码为函数的本质实现:
deffilter(f:T=>Boolean):RDD[T]=newFilteredRDD(this,sc.clean(f))
图1-12中每个方框代表一个RDD分区,T可以是任意的类型。通过
用户自定义的过滤函数f,对每个数据项操作,将满足条件、返回结果
为true的数据项保留。例如,过滤掉V2和V3保留了V1,为区分命名为
V'1。
(8)sample
sample将RDD这个集合内的元素进行采样,获取所有元素的子集。
用户可以设定是否有放回的抽样、百分比、随机种子,进而决定采样方
式。
内部实现是生成SampledRDD(withReplacement,fraction,seed)。
函数参数设置:
·withReplacement=true,表示有放回的抽样。
·withReplacement=false,表示无放回的抽样。
图1-13中的每个方框是一个RDD分区。通过sample函数,采样50%的数据。V1、V2、U1、U2……U4采样出数据V1和U1、U2形成新的
RDD。
图1-12 filter算子对RDD转换
图1-13 sample算子对RDD转换
(9)cache
cache将RDD元素从磁盘缓存到内存。相当于
persist(MEMORY_ONLY)函数的功能。图1-14 Cache算子对RDD转换
图1-14中每个方框代表一个RDD分区,左侧相当于数据分区都存储
在磁盘,通过cache算子将数据缓存在内存。
(10)persist
persist函数对RDD进行缓存操作。数据缓存在哪里依据StorageLevel
这个枚举类型进行确定。有以下几种类型的组合(见图1-14),DISK代
表磁盘,MEMORY代表内存,SER代表数据是否进行序列化存储。
下面为函数定义,StorageLevel是枚举类型,代表存储模式,用户
可以通过图1-14按需进行选择。
persist(newLevel:StorageLevel)
图1-15中列出persist函数可以进行缓存的模式。例如,MEMORY_AND_DISK_SER代表数据可以存储在内存和磁盘,并且以
序列化的方式存储,其他同理。图1-15 persist算子对RDD转换
图1-16中方框代表RDD分区。disk代表存储在磁盘,mem代表存储
在内存。数据最初全部存储在磁盘,通过
persist(MEMORY_AND_DISK)将数据缓存到内存,但是有的分区无
法容纳在内存,将含有V1、V2、V3的分区存储到磁盘。
(11)mapValues
mapValues:针对(Key,Value)型数据中的Value进行Map操作,而不对Key进行处理。图1-17中的方框代表RDD分区。a=>a+2代表对(V1,1)这样的
Key Value数据对,数据只对Value中的1进行加2操作,返回结果为3。
图1-16 Persist算子对RDD转换
图1-17 mapValues算子RDD对转换
(12)combineByKey
下面代码为combineByKey函数的定义:
combineByKey[C](createCombiner:(V)C,mergeValue:(C, V)C,mergeCombiners:(C, C)C,partitioner:Partitioner,mapSideCombine:Boolean=true,serializer:Serializer=null):RDD[(K,C)]
说明:
·createCombiner:V=>C,C不存在的情况下,比如通过V创建seq
C。
·mergeValue:(C,V)=>C,当C已经存在的情况下,需要
merge,比如把item V加到seq C中,或者叠加。
·mergeCombiners:(C,C)=>C,合并两个C。
·partitioner:Partitioner,Shuffle时需要的Partitioner。
·mapSideCombine:Boolean=true,为了减小传输量,很多combine
可以在map端先做,比如叠加,可以先在一个partition中把所有相同的
key的value叠加,再shuffle。
·serializerClass:String=null,传输需要序列化,用户可以自定义序
列化类:
例如,相当于将元素为(Int,Int)的RDD转变为了(Int,Seq[Int])类型元素的RDD。
图1-18中的方框代表RDD分区。如图,通过combineByKey,将
(V1,2),(V1,1)数据合并为(V1,Seq(2,1))。(13)reduceByKey
reduceByKey是比combineByKey更简单的一种情况,只是两个值合
并成一个值,(Int,Int V)to(Int,Int C),比如叠加。所以
createCombiner reduceBykey很简单,就是直接返回v,而mergeValue和
mergeCombiners逻辑是相同的,没有区别。
图1-18 comBineByKey算子对RDD转换
函数实现:
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
图1-19中的方框代表RDD分区。通过用户自定义函数(A,B)
=>(A+B)函数,将相同key的数据(V1,2)和(V1,1)的value相加
运算,结果为(V1,3)。图1-19 reduceByKey算子对RDD转换
(14)join
join对两个需要连接的RDD进行cogroup函数操作,将相同key的数
据能够放到一个分区,在cogroup操作之后形成的新RDD对每个key下的
元素进行笛卡尔积的操作,返回的结果再展平,对应key下的所有元组
形成一个集合。最后返回RDD[(K,(V,W))]。
下面代码为join的函数实现,本质是通过cogroup算子先进行协同划
分,再通过flatMapValues将合并的数据打散。
this.cogroup(other,partitioner).f?latMapValues{case(vs,ws)=>
for(v<-vs;w>-ws)yield(v,w) }
图1-20是对两个RDD的join操作示意图。大方框代表RDD,小方框
代表RDD中的分区。函数对相同key的元素,如V1为key做连接后结果
为(V1,(1,1))和(V1,(1,2))。2.Actions算子
本质上在Action算子中通过SparkContext进行了提交作业的runJob操
作,触发了RDD DAG的执行。
图1-20 join算子对RDD转换
例如,Action算子collect函数的代码如下,感兴趣的读者可以顺着
这个入口进行源码剖析:
Return an array that contains all of the elements in this RDD.
def collect: Array[T] = {
提交Job
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _)
}
下面将介绍常用和较为重要的Action算子。
(1)foreach
foreach对RDD中的每个元素都应用f函数操作,不返回RDD和
Array,而是返回Uint。
图1-21表示foreach算子通过用户自定义函数对每个数据项进行操
作。本例中自定义函数为println,控制台打印所有数据项。
图1-21 foreach算子对RDD转换
(2)saveAsTextFile
函数将数据输出,存储到HDFS的指定目录。
下面为saveAsTextFile函数的内部实现,其内部通过调用
saveAsHadoopFile进行实现:this.map(x => (NullWritable.get, new Text(x.toString)))
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
将RDD中的每个元素映射转变为(null,x.toString),然后再将其
写入HDFS。
图1-22中左侧方框代表RDD分区,右侧方框代表HDFS的Block。通
过函数将RDD的每个分区存储为HDFS中的一个Block。
(3)collect
collect相当于toArray,toArray已经过时不推荐使用,collect将分布
式的RDD返回为一个单机的scala Array数组。在这个数组上运用scala的
函数式操作。
图1-23中左侧方框代表RDD分区,右侧方框代表单机内存中的数
组。通过函数操作,将结果返回到Driver程序所在的节点,以数组形式
存储。
图1-22 saveAsHadoopFile算子对RDD转换图1-23 Collect算子对RDD转换
(4)count
count返回整个RDD的元素个数。
内部函数实现为:
defcount:Long=sc.runJob(this,Utils.getIteratorSize_).sum
图1-24中,返回数据的个数为5。一个方块代表一个RDD分区。
图1-24 count对RDD算子转换1.5 本章小结
本章首先介绍了Spark分布式计算平台的基本概念、原理以及Spark
生态系统BDAS之上的典型组件。Spark为用户提供了系统底层细节透
明、编程接口简洁的分布式计算平台。Spark具有内存计算、实时性
高、容错性好等突出特点。同时本章介绍了Spark的计算模型,Spark会
将应用程序整体翻译为一个有向无环图进行调度和执行。相比
MapReduce,Spark提供了更加优化和复杂的执行流。读者还可以深入了
解Spark的运行机制与Spark算子,这样能更加直观地了解API的使用。
Spark提供了更加丰富的函数式算子,这样就为Spark上层组件的开发奠
定了坚实的基础。
相信读者已经想了解如何开发Spark程序,接下来将就Spark的开发
环境配置进行阐述。第2章 Spark开发与环境配置
用户进行Spark应用程序开发,一般在用户本地进行单机开发调
试,之后再将作业提交到集群生产环境中运行。下面将介绍Spark开发
环境的配置,如何编译和进行源码阅读环境的配置。
用户可以在官网上下载最新的AS软件包,网址
为:http:spark.apache.org。2.1 Spark应用开发环境配置
Spark的开发可以通过Intellij或者Eclipse IDE进行,在环境配置的开
始阶段,还需要安装相应的Scala插件。2.1.1 使用Intellij开发Spark程序
本节介绍如何使用Intellij IDEA构建Spark开发环境和源码阅读环
境。由于Intellij对Scala的支持更好,目前Spark开发团队主要使用Intellij
作为开发环境。
1.配置开发环境
(1)安装JDK
用户可以自行安装JDK8。官网地
址:http:www.oracle.comtechnetworkjavajavasedownloadsindex.html。
下载后,如果在Windows下直接运行安装程序,会自动配置环境变
量,安装成功后,在CMD的命令行下输入Java,有Java版本的日志信息
提示则证明安装成功。
如果在Linux下安装,下载JDK包解压缩后,还需要配置环境变
量。
在etcprofile文件中,配置环境变量:
export JAVA_HOME=usrjavajdk1.8
export JAVA_BIN=usrjavajdk1.8bin
export PATH=PATH:JAVA_HOMEbin
export CLASSPATH=.:JAVA_HOMElibdt.jar:JAVA_HOMElibtools.jar
export JAVA_HOME JAVA_BIN PATH CLASSPATH(2)安装Scala
Spark内核采用Scala进行开发,上层通过封装接口提供Java和Python
的API,在进行开发前需要配置好Scala的开发包。
Spark对Scala的版本有约束,用户可以在Spark的官方下载界面看到
相应的Scala版本号。下载指定的Scala包,官网地址:http:www.scala-
lang.orgdownload。
(3)安装Intellij IDEA
用户可以下载安装最新版本的Intellij,官网地
址:http:www.jetbrains.comideadownload。
目前Intellij最新的版本中已经可以支持新建SBT工程,安装Scala插
件,可以很好地支持Scala开发。
(4)Intellij中安装Scala插件
在Intellij菜单中选择“Configure”,在下拉菜单中选择“Plugins”,再
选择“Browse repositories”,输入“Scala”搜索插件(如图2-1所示),在
弹出的对话框中单击“install”按钮,重启Intellij。
2.配置Spark应用开发环境1)用户在Intellij IDEA中创建Scala Project,SparkTest。
2)选择菜单中的“File”→“project structure”→“Libraries”命令,单
击“+”,导入“spark-assembly_2.10-1.0.0-incubating-hadoop2.2.0.jar”。
只需导入该jar包,该包可以通过在Spark的源码工程下执行“sbtsbt
assembly”命令生成,这个命令相当于将Spark的所有依赖包和Spark源码
打包为一个整体。
在“assemblytargetscala-2.10.4”目录下生成:spark-assembly-1.0.0-
incubating-hadoop2.2.0.jar。
3)如果IDE无法识别Scala库,则需要以同样方式将Scala库的jar包
导入。之后就可以开始开发Spark程序。如图2-2所示,本例将Spark默认
的示例程序SparkPi复制到文件。图2-1 输入“Scala”搜索插件
图2-2 编写程序
3.运行Spark程序(1)本地运行
编写完scala程序后,可以直接在Intellij中,以本地Local模式运行
(如图2-3所示),方法如下。
图2-3 以local模式运行
在Intellij中的选择“Run”→“Debug Configuration”→“Edit
Configurations”命令。在“Program arguments”文本框中输入main函数的输
入参数local。然后右键选择需要运行的类,单击“Run”按钮运行。
(2)集群上运行Spark应用jar包如果想把程序打成jar包,通过命令行的形式运行在Spark集群中,并按照以下步骤操作。
1)选择“File”→“Project Structure”,在弹出的对话框中选
择“Artifact”→“Jar”→“From Modules with dependencies”命令。
2)在选择“From Modules with dependencies”之后弹出的对话框中,选择Main函数,同时选择输出jar位置,最后单击“OK”按钮。
具体如图2-4~图2-6所示。
在图2-5中选择需要执行的Main函数。
在图2-6界面选择依赖的jar包。图2-4 生成jar包第一步图2-5 生成jar包第二步
图2-6 生成jar包第三步
在主菜单选择“Build”→“Build Artifact”命令,编译生成jar包。
3)将生成的jar包SparkTest.jar在集群的主节点,通过下面命令执
行:
java -jar SparkTest.jar
用户可以通过上面的流程和方式通过Intellij作为集成开发环境进行
Spark程序的开发。2.1.2 使用SparkShell进行交互式数据分析
如果是运行Spark Shell,那么会默认创建一个SparkContext,命名为
sc,所以不需要在Spark Shell创建新的SparkContext,SparkContext是应
用程序的上下文,调度整个应用并维护元数据信息。在运行Spark Shell
之前,可以设定参数MASTER,将Spark应用提交到MASTER指向的相
应集群或者本地模式执行,集群方式运行的作业将会分布式地运行,本
地模式执行的作业将会通过单机多线程方式运行。可以通过参数
ADD_JARS把JARS添加到classpath,用户可以通过这种方式添加所需的
第三方依赖库。
如果想spakr-shell在本地4核的CPU运行,需要如下方式启动:
MASTER=local[4] .spark-shell
这里的4是指启动4个工作线程。
如果要添加JARS,代码如下:
MASTER=local[4] ADD_JARS=code.jar .spark-shell
在spark-shell中,输入下面代码,读取dir文件:
scala>val text=sc.textFile(dir)输出文件中有多少数据项,则可用:
scala>text.count
按键,即可运行程序。
通过以上介绍,用户可以了解如何使用Spark Shell进行交互式数据
分析。
对于逻辑较为复杂或者运行时间较长的应用程序,用户可以通过本
地Intellij等IDE作为集成开发环境进行应用开发与打包,最终提交到集
群执行。对于执行时间较短的交互式分析作业,用户可以通过Spark
Shell进行相应的数据分析。2.2 远程调试Spark程序
本地调试Spark程序和传统的调试单机的Java程序基本一致,读者可
以参照原来的方式进行调试,关于单机调试本书暂不赘述。对于远程调
试服务器上的Spark代码,首先请确保在服务器和本地的Spark版本一
致。需要按前文介绍预先安装好JDK和Git。
(1)编译Spark
在服务器端和本地计算机下载Spark项目。
通过下面的命令克隆一份Spark源码:
git clone https: github.comapachespark
然后针对指定的Hadoop版本进行编译:
SPARK_HADOOP_VERSION=2.3.0 sbtsbt assembly
(2)在服务器端的配置
1)根据相应的Spark配置指定版本的Hadoop,并启动Hadoop。
2)对编译好的Spark进行配置,在confspark-env.sh文件中进行如下
配置:export SPARK_JAVA_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=9999
其中“suspend=y”设置为需要挂起的模式。这样,当启动Spark的作
业时候程序会自动挂起,等待本地的IDE附加(Attach)到被调试的应
用程序上。address是开放等待连接的端口号。
(3)启动Spark集群和应用程序
1)启动Spark集群:
.sbinstart-all.sh
2)启动需要调试的程序,以Spark中自带的HdfsWordCount为例:
MASTER=spark: 10.10.1.168:7077
.binrun-example
org.apache.spark.examples.streaming.HdfsWordCount
hdfs: localhost:9000testtest.txt
3)如图2-7所示,执行后程序会挂起并等待本地的Intellij进行连
接,并显示“Listening for transport dt_socket at address: 9999”:
图2-7 远程调试
(4)本地IDE配置1)配置并连接服务器端挂起的程序。
在Intellij中选择“run”→“edit configuration”→“remote”命令,在弹出
的对话框中将默认配置中的端口号和IP改为服务器的地址,同时选择附
加(Attach)方式,如图2-8所示。
图2-8 远程调试设置
2)在“RunDebug Configurations”对话框中填入需要连接的主机名
和端口号以及其他参数,如图2-8所示。
3)在程序中设置断点进行调试。通过上面的介绍,用户可以了解如何进行远程调试。对于单机调试
方式则和日常开发的单机程序一样,常用方式是设置单机调试断点之后
再进行调试,在这里并不再展开介绍。2.3 Spark编译
用户可以通过Spark的默认构建工具SBT进行源码的编译和打包。当
用户需要对源码进行二次开发时,则需要对源码进行增量编译,通过下
面的方式读者可以实现编译和增量编译。
(1)克隆Spark源码
可通过克隆的方式克隆Spark源码,如图2-9所示。
git clone https: github.comapachespark
图2-9 git clone Spark库
这样将会从github将Spark源码下载到本地,建立本地的仓库。
(2)编译Spark源码
在Spark项目的根目录内执行编译和打包命令(如图2-10所示)。
sbtsbt assembly执行过程中会解析依赖和下载需要的依赖jar包。执行完成后会将所
有jar包打包为一个jar包,用户便可以运行Spark集群和示例了。
(3)增量编译
在有些情况下,用户需要修改源码,修改之后如果每次都重新下载
jar包或者对全部源码重新编译一遍,会很浪费时间,用户通过下面的增
量编译方法,可以只对改变的源码进行编译。
编译打包一个assembly的jar包。
sbtsbt clean assembly
图2-10 编译Spark源码这时的Spark程序已经可以运行。用户可以进入spark-shell执行程
序。
.binspark-shell
配置export SPARK_PREPEND_CLASSES参数为true,开启增量编
译模式。
export SPARK_PREPEND_CLASSES=true
继续使用spark-shell中的程序:
.binspark-shell
这时用户可以对代码进行修改和二次开发:初始开发Spark应用,之后编译。
编译Spark源码:
sbtsbt compile
继续开发Spark应用,之后编译。
sbtsbt compile
解除增量编译模式: unset SPARK_PREPEND_CLASSES
返回正常使用spark-shell的情景。
.binspark-shell Back to normal, using Spark classes from the assembly Jar
如果用户不想每次都开启一个新的SBT会话,可以在compile命令前
加上~。
sbtsbt ~ compile
(4)查看Spark源码依赖图
如果使用SBT进行查看依赖图(如图2-11所示),用户需要运行下
面的命令:
sbt
sbtsbt dependency-tree
如果使用Maven进行查看依赖图(如图2-11所示),用户需要运行
下面的命令:
Maven
mvn -DskipTests install
mvn dependency:tree图2-11 查看依赖图2.4 配置Spark源码阅读环境
由于Spark使用SBT作为项目管理构建工具,SBT的配置文件中配置
了依赖的jar包网络路径,在编译或者生成指定类型项目时需要从网络下
载jar包。需要用户预先安装git。在Linux操作系统或者Windows操作系
统上(用户可以下载Git Shell,在Git Shell中进行命令行操作)通
过“sbtsbt gen-idea”命令,生成Intellij项目文件,然后在Intellij IDE中直
接通过“Open Project”打开项目。
克隆Spark源码:
git clone https: github.comapachespark。
在所需要的软件安装好后在spark源代码根目录下,输入以下命令生
成Intellij项目:
sbtsbt gen-idea
这样SBT会自动下载依赖包和进行源文件编译以及生成Intellij所需
要的项目文件。2.5 本章小结
本章首先介绍了Spark应用程序的开发流程以及如何编译和调试
Spark程序。用户可以选用对Scala项目能够很好支持的Intellij IDE。如果
用户想深入了解Spark,以及诊断问题,建议读者配置好源码阅读环
境,进行源码分析。
通过本章的介绍,读者可以进行Spark开发环境的搭建,以及程序
的开发,后续将介绍Spark的生态系统BDAS。第3章 BDAS简介
提到Spark不得不说伯克利大学AMPLab开发的BDAS(Berkeley
Data Analytics Stack)数据分析的软件栈,如图3-1所示是其中的Spark生
态系统。其中用内存分布式大数据计算引擎Spark替代原有的
MapReduce,上层通过Spark SQL替代Hive等SQL on Hadoop系统,Spark
Streaming替换Storm等流式计算框架,GraphX替换GraphLab等大规模图
计算框架,MLlib替换Mahout等机器学习框架等,其整体框架基于内存
计算解决了原来Hadoop的性能瓶颈问题。AmpLab提出One Framework
to Rule Them All的理念,用户可以利用Spark一站式构建自己的数据分
析流水线。
图3-1 Spark生态系统
在一些数据分析应用中,用户可以使用Spark SQL预处理结构化数
据,GraphX预处理图数据,Spark Streaming实时捕获和处理流数据,最
终通过MLlib将数据融合,进行模型训练,底层各个系统通过Spark进行
运算。下面将介绍其中主要的项目。3.1 SQL on Spark[1]
AMPLab将大数据分析负载分为三大类型:批量数据处理、交互式
查询、实时流处理。而其中很重要的一环便是交互式查询。大数据分析
栈中需要满足用户ad-hoc、reporting、iterative等类型的查询需求,也需
要提供SQL接口来兼容原有数据库用户的使用习惯,同时也需要SQL能
够进行关系模式的重组。完成这些重要的SQL任务的便是Spark SQL和
Shark这两个开源分布式大数据查询引擎,它们可以理解为轻量级Hive
SQL在Spark上的实现,业界将该类技术统称为SQL on Hadoop。
在Spark峰会2014上,Databricks宣布不再支持Shark的开发,全力以
赴开发Shark的下一代技术Spark SQL,同时Hive社区也启动了Hive on
Spark项目,将Spark作为Hive(除MapReduce和Tez之外的)新执行引
擎。根据伯克利的Big Data Benchmark测试对比数据,Shark的In
Memory性能可以达到Hive的100倍,即使是On Disk也能达到10倍的性
能提升,是Hive强有力的替代解决方案。而作为Shark的进化版本的
Spark SQL,在AMPLab最新的测试中的性能已经超过Shark。图3-2展示
了Spark SQL和Hive on Spark是新的发展方向。图3-2 Spark SQL和Hive on Spark是新的发展方向
[1] 参考文章:高彦杰,陈冠诚Spark SQL:基于内存的大数据分析引擎
《程序员》2014.83.1.1 为什么使用Spark SQL
由于Shark底层依赖于Hive,这个架构的优势是对传统Hive用户可
以将Shark无缝集成进现有系统运行查询负载。但是也看到一些问题:
随着版本升级,查询优化器依赖于Hive,不方便添加新的优化策略,需
要进行另一套系统的学习和二次开发,学习成本很高。另一方面,MapReduce是进程级并行,例如:Hive在不同的进程空间会使用一些静
态变量,当在同一进程空间进行多线程并行执行,多线程同时写同名称
的静态变量会产生一致性问题,所以Shark需要使用另外一套独立维护
的Hive源码分支。而为了解决这个问题AMPLab和Databricks利用
Catalyst开发了Spark SQL。
Spark的全栈解决方案为用户提供了多样的数据分析框架,机器学
习、图计算、流计算如火如荼的发展和流行吸引了大批的学习者,为什
么人们今天还是要重视在大数据环境下使用SQL呢?笔者认为主要有以
下几点原因:
1)易用性与用户惯性。在过去的很多年中,有大批的程序员的工
作是围绕着数据库+应用的架构来做的,因为SQL的易用性提升了应用
的开发效率。程序员已经习惯了业务逻辑代码调用SQL的模式去写程
序,惯性的力量是强大的,如果还能用原有的方式解决现有的大数据问题,何乐而不为呢?提供SQL和JDBC的支持会让传统用户像以前一样
地书写程序,大大减少迁移成本。
2)生态系统的力量。很多系统软件性能好,但是未取得成功和没
落,很大程度上因为生态系统问题。传统的SQL在JDBC、ODBC、SQL
的各种标准下形成了一整套成熟的生态系统,很多应用组件和工具可以
迁移使用,像一些可视化的工具、数据分析工具等,原有企业的IT工具
可以无缝过渡。
3)数据解耦,Spark SQL正在扩展支持多种持久化层,用户可以使
用原有的持久化层存储数据,但是也可以体验和迁移到Spark SQL提供
的数据分析环境下进行Big Data的分析。3.1.2 Spark SQL架构分析
Spark SQL与传统DBMS的查询优化器+执行器的架构较为类似,只
不过其执行器是在分布式环境中实现,并采用的Spark作为执行引擎。
Spark SQL的查询优化是Catalyst,其基于Scala语言开发,可以灵活利用
Scala原生的语言特性很方便进行功能扩展,奠定了Spark SQL的发展空
间。Catalyst将SQL语言翻译成最终的执行计划,并在这个过程中进行查
询优化。这里和传统不太一样的地方就在于,SQL经过查询优化器最终
转换为可执行的查询计划是一个查询树,传统DB就可以执行这个查询
计划了。而Spark SQL最后执行还是会在Spark内将这棵执行计划树转换
为Spark的有向无环图DAG再执行。
1.Catalyst架构及执行流程分析
如图3-3所示为Catalyst的整体架构。图3-3 Spark SQL查询引擎Catalyst的架构
从图3-3中可以看到整个Catalyst是Spark SQL的调度核心,遵循传统
数据库的查询解析步骤,对SQL进行解析,转换为逻辑查询计划、物理
查询计划,最终转换为Spark的DAG后再执行。图3-4为Catalyst的执行流
程。
SqlParser将SQL语句转换为逻辑查询计划,Analyzer对逻辑查询计
划进行属性和关系关联检验,之后Optimizer通过逻辑查询优化将逻辑查
询计划转换为优化的逻辑查询计划,QueryPlanner将优化的逻辑查询计
划转换为物理查询计划,prepareForExecution调整数据分布,最后将物
理查询计划转换为执行计划进入Spark执行任务。
2.Spark SQL优化策略
查询优化是传统数据库中最为重要的一环,这项技术在传统数据库
中已经很成熟。除了查询优化,Spark SQL在存储上也进行了优化,从
以下几点查看Spark SQL的一些优化策略。图3-4 Catalyst的执行流程
(1)内存列式存储与内存缓存表
Spark SQL可以通过cacheTable将数据存储转换为列式存储,同时将
数据加载到内存进行缓存。cacheTable相当于在分布式集群的内存物化
视图,将数据进行缓存,这样迭代的或者交互式的查询不用再从HDFS
读数据,直接从内存读取数据大大减少了IO开销。列式存储的优势在于Spark SQL只需要读出用户需要的列,而不需要像行存储那样需要每
次将所有列读出,从而大大减少内存缓存数据量,更高效地利用内存数
据缓存,同时减少网络传输和IO开销。数据按照列式存储,由于是数
据类型相同的数据连续存储,能够利用序列化和压缩减少内存空间的占
用。
(2)列存储压缩
为了减少内存和硬盘空间占用,Spark SQL采用了一些压缩策略对
内存列存储数据进行压缩。Spark SQL的压缩方式要比Shark丰富很多,例如它支持PassThrough,RunLengthEncoding,DictionaryEncoding,BooleanBitSet,IntDelta,LongDelta等多种压缩方式。这样能够大幅度
减少内存空间占用和网络传输开销和IO开销。
(3)逻辑查询优化
Spark SQL在逻辑查询优化(如图3-5所示)上支持列剪枝、谓词下
压、属性合并等逻辑查询优化方法。列剪枝为了减少读取不必要的属性
列,减少数据传输和计算开销,在查询优化器进行转换的过程中会进行
列剪枝的优化。图3-5 逻辑查询优化
下面介绍一个逻辑优化例子:
SELECT Class FROM (SELECT ID,Name,Class FROM STUDENT ) S WHERE S.ID=1
Catalyst将原有查询通过谓词下压,将选择操作ID=1优先执行,这
样过滤大部分数据,通过属性合并将最后的投影只做一次最终保留Class
属性列。
(4)Join优化
Spark SQL深度借鉴传统数据库查询优化技术的精髓,同时也在分
布式环境下进行特定的优化策略调整和创新。Spark SQL对Join进行了优
化支持多种连接算法,现在的连接算法已经比Shark丰富,而且很多原来Shark的元素也逐步迁移过来。例如:BroadcastHashJoin、BroadcastNestedLoopJoin、HashJoin、LeftSemiJoin,等等。
下面介绍一个其中的BroadcastHashJoin算法思想。
BroadcastHashJoin将小表转化为广播变量进行广播,这样避免
Shuffle开销,最后在分区内做Hash连接。这里用的就是Hive中Map Side
Join的思想。同时用了DBMS中的Hash连接算法做连接。
随着Spark SQL的发展,未来会有更多的查询优化策略加入进来。
同时后续Spark SQL会支持像Shark Server一样的服务端、JDBC接口,兼
容更多的持久化层例如NoSQL,传统的DBMS等。一个强有力的结构化
大数据查询引擎正在崛起。
3.如何使用Spark SQL
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
在这里引入sqlContext下所有的方法就可以直接用sql方法进行查询 import sqlContext._
case class Person(name: String, age: Int)
下面的people是含有case类型数据的RDD,会默认由Scala的implicit机制将RDD转换为
SchemaRDD,SchemaRDD是SparkSQL中的核心RDD
val people = sc.textFile(examplessrcmainresourcespeople.txt).map(_.split(,)).map(p => Person(p(0), p(1).trim.toInt))
在内存的元数据中注册表信息,这样一个Spark SQL表就创建完成了 people.registerAsTable(people)
sql语句就会触发上面分析的Spark SQL的执行过程,读者可以参考上面的图示 val teenagers = sql(SELECT name FROM people WHERE age >= 13 AND age <= 19)
最后生成teenagers也是一个RDD
teenagers.map(t =>Name: + t(0)).collect.foreach(println)
通过之前的介绍,读者对支撑结构化数据分析任务的Spark SQL的原理与使用有了一定的了解。在生产环境中,有一类数据分析任务对响
应延迟要求高,需要实时处理流数据,在BDAS中,Spark Streaming用
于支撑大规模流式处理分析任务。3.2 Spark Streaming
Spark Streaming是一个批处理的流式计算框架。它的核心执行引擎
是Spark,适合处理实时数据与历史数据混合处理的场景,并保证容错
性。下面将对Spark Streaming进行详细的介绍。3.2.1 Spark Streaming简介
Spark Streaming是构建在Spark上的实时计算框架,扩展了Spark流
式大数据处理能力。Spark Streaming将数据流以时间片为单位进行分割
形成RDD,使用RDD操作处理每一块数据,每块数据(也就是RDD)
都会生成一个Spark Job进行处理,最终以批处理的方式处理每个时间片
的数据。请参照图3-6。
图3-6 Spark Streaming生成Job
Spark Streaming编程接口和Spark很相似。在Spark中,通过在RDD
上用Transformation(例如:map,filter等)和Action(例如:count,collect等)算子进行运算。在Spark Streaming中通过在DStream(表示数据流的RDD序列)上进行算子运算。图3-7为Spark Streaming转化过程。
图3-7 Spark Streaming转化过程图3-7中Spark Streaming将程序中对DStream的操作转换为DStream有
向无环图(DAG)。对每个时间片,DStream DAG会产生一个RDD
DAG。在RDD中通过Action算子触发一个Job,然后Spark Streaming会将
Job提交给JobManager。JobManager会将Job插入维护的Job队列,然后
JobManager会将队列中的Job逐个提交给Spark DAGScheduler,然后
Spark会调度Job并将Task分发到各节点的Executor上执行。
(1)优势及特点
1)多范式数据分析管道:能和Spark生态系统其他组件融合,实现
交互查询和机器学习等多范式组合处理。
2)扩展性:可以运行在100个节点以上的集群,延迟可以控制在秒
级。
3)容错性:使用Spark的Lineage及内存维护两份数据进行备份达到
容错。RDD通过Lineage记录下之前的操作,如果某节点在运行时出现
故障,则可以通过冗余备份数据在其他节点重新计算得到。
对于Spark Streaming来说,其RDD的Lineage关系如图3-8所示,图
中的每个长椭圆形表示一个RDD,椭圆中的每个圆形代表一个RDD中
的一个分区(Partition),图中的每一列的多个RDD表示一个
DStream(图中有3个DStream),t=1和t=2代表不同的分片下的不同
RDD DAG。图中的每一个RDD都是通过Lineage相连接形成了DAG,由于Spark Streaming输入数据可以来自于磁盘,例如HDFS(通常由三份
副本)也可以来自于网络(Spark Streaming会将网络输入数据的每一个
数据流复制两份到其他的机器)都能通过冗余数据及Lineage的重算机
制保证容错性。所以RDD中任意的Partition出错,都可以并行地在其他
机器上将缺失的Partition重算出来。
图3-8 Spark Streaming容错性
4)吞吐量大:将数据转换为RDD,基于批处理的方式,提升数据
处理吞吐量。图3-9是Berkeley利用WordCount和Grep两个用例所做的测
试。图3-9 Spark Streaming与Storm吞吐量比较图
5)实时性:Spark Streaming也是一个实时计算框架,Spark
Streaming能够满足除对实时性要求非常高(例如:高频实时交易)之外
的所有流式准实时计算场景。目前Spark Streaming最小的Batch Size的选
取在0.5~2s(对比:Storm目前最小的延迟是100ms左右)。
(2)适用场景
Spark Streaming适合需要历史数据和实时数据结合进行分析的应用
场景,对于实时性要求不是特别高的场景也能够胜任。3.2.2 Spark Streaming架构
通过图3-10,读者可以对Spark Streaming的整体架构有宏观把握。
图3-10 Spark Streaming架构图
组件介绍:
·Network InputTracker:通过接收器接收流数据,并将流数据映射
为输入DStream。
·Job Scheduler:周期性地查询DStream图,通过输入的流数据生成
Spark Job,将Spark Job提交给Job Manager进行执行。
·JobManager:维护一个Job队列,将队列中的Job提交到Spark进行
执行。
通过图3-10可以看到D-Stream Lineage Graph进行整体的流数据的DAG图调度,Taskscheduler负责具体的任务分发,Block tracker进行块
管理。在从节点,如果是通过网络输入的流数据会将数据存储两份进行
容错。Input receiver源源不断地接收输入流,Task execution负责执行主
节点分发的任务,Block manager负责块管理。Spark Streaming整体架构
和Spark很相近,很多思想是可以迁移理解的。3.2.3 Spark Streaming原理剖析
下面将由一个example示例,通过源码呈现Spark Streaming的底层机
制。
1.初始化与接收数据
Spark Streaming通过分布在各个节点上的接收器,缓存接收到的流
数据,并将数据包装成Spark能够处理的RDD的格式,输入到Spark
Streaming,之后由Spark Streaming将作业提交到Spark集群进行执行,如
图3-11所示。图3-11 Spark Streaming执行模型
初始化的过程主要可以概括为两点。
1)调度器的初始化。
调度器调度Spark Streaming的运行,用户可以通过配置相关参数进
行调优。
2)将输入流的接收器转化为RDD在集群进行分布式分配,然后启
动接收器集合中的每个接收器。
针对不同的数据源,Spark Streaming提供了不同的数据接收器,分
布在各个节点上的每个接收器可以认为是一个特定的进程,接收一部分
流数据作为输入。
用户也可以针对自身生产环境状况,自定义开发相应的数据接收
器。
如图3-12所示,接收器分布在各个节点上。通过下面代码,创建并
行的、在不同Worker节点分布的receiver集合。
val tempRDD =
if (hasLocationPreferences) {
val receiversWithPreferences = receivers.map(r => (r,Seq(r.preferredLocation.get)))
ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
} else {
在这里创造RDD相当于进入SparkContext.makeRDD
此处将receivers的集合作为一个RDD进行分区RDD[Receiver] 即使是只有一个输入流,按照这个分布式也是流的输入端在worker而不再Master …
将receivers的集合打散,然后启动它们…
ssc.sparkContext.runJob(tempRDD, startReceiver)… }
图3-12 Spark Streaming接收器
2.数据接收与转化
在“初始化与接收数据”部分中已经介绍过,receiver集合转换为
RDD,在集群上分布式地接收数据流。那么每个receiver是怎样接收并
处理数据流的呢?读者可以通过图3-13,对输入流的处理有一个全面的
了解。图3-13为Spark Streaming数据接收与转化的示意图。
图3-13的主要流程如下。1)数据缓冲:在receiver的receive函数中接收流数据,将接收到的
数据源源不断地放入到BlockGenerator.currentBuffer。
2)缓冲数据转化为数据块:在BlockGenerator中有一个定时器
(RecurringTimer),将当前缓冲区中的数据以用户定义的时间间隔封
装为一个数据块Block,放入到BlockGenerator的blocksForPush队列中
(这个队列)。
3)数据块转化为Spark数据块:在BlockGenerator中有一个
BlockPushingThread线程,不断地将blocksForPush队列中的块传递给
BlockManager,让BlockManager将数据存储为块。BlockManager负责
Spark中的块管理。
4)元数据存储:在pushArrayBuffer方法中还会将已经由
BlockManager存储的元数据信息(例如:Block的id号)传递给
ReceiverTracker,ReceiverTracker会将存储的blockId放到对应StreamId的
队列中。图3-13 Spark Streaming数据接收与转化
图中部分组件的作用如下:
·KeepPushingBlocks:调用此方法持续写入和保持数据块。·pushArrayBuffer:调用pushArrayBuffer方法将数据块存储到
BlockManager中。
·reportPushedBlock:存储完成后汇报数据块信息到主节点。
·receivedBlockInfo(Meta Data):已经接收到的数据块元数据记
录。
·streamId:数据流Id。
·BlockInfo:数据块元数据信息。
·BlockManager.put:数据块存储器写入备份数据块到其他节点。
·Receiver:数据块接收器,接收数据块。
·BlockGenerator:数据块生成器,将数据缓存生成Spark能处理的数
据块。
·BlockGenerator.currentBuffer:缓存网络接收的数据记录,等待之
后转换为Spark的数据块。
·BlockGenerator.blocksForPushing:将一块连续数据记录暂存为数据
块,待后续转换为Spark能够处理的BlockManager中的数据块(A Block
As a BlockManager’s Block)。·BlockGenerator.blockPushingThread:守护线程负责将数据块转换
为BlockManager中数据块。
·ReceiveTracker:输入数据块的元数据管理器,负责管理和记录数
据块。
·BlockManager:Spark数据块管理器,负责数据块在内存或磁盘的
管理。
·RecurringTimer:时间触发器,每隔一定时间进行缓存数据的转
换。
上面的过程中涉及最多的类就是BlockGenerator,在数据转化的过
程中其扮演者不可或缺的角色。
private[streaming] class BlockGenerator(
listener: BlockGeneratorListener,receiverId: Int,conf: SparkConf) extends Logging
感兴趣的读者可以参照图中所示的类和方法进行更加具体的机制的
了解。篇幅所限,对这个数据生成过程不再做具体的代码剖析。
3.生成RDD与提交Spark Job
Spark Streaming根据时间段,将数据切分为RDD,然后触发RDD的
Action提交Job,Job被提交到Job Manager中的Job Queue中由JobScheduler调度,之后Job Scheduler将Job提交到Spark的Job调度器,然后
将Job转换为大量的任务分发给Spark集群执行,如图3-14所示。
图3-14 Spark Streaming调度模型
Job generator中通过下面的方法生成Job进行调度和执行。
从下面的代码可以看出job是从outputStream中生成的,然后再触发
反向回溯执行整个DStream DAG,类似RDD的机制。
private def generateJobs(time: Time) {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) =>
获取输入数据块的元数据信息 val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
. . .
}.toMap
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
case Failure(e) =>jobScheduler.reportError(Error generating jobs for time + time, e)
}
eventActor !DoCheckpoint(time)
}
下面进入JobScheduler的submitJobSet方法一探究竟,JobScheduler是整个Spark
Streaming调度的核心组件 def submitJobSet(jobSet: JobSet) {
. . .
jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
. . .
}
进入Graph生成job的方法,Graph本质是DStreamGraph类生成的对象 final private[streaming] class DStreamGraph extends Serializable with Logging {
def generateJobs(time: Time): Seq[Job] = {
. . .
private val inputStreams = new ArrayBuffer[InputDStream[_]]
private val outputStreams = new ArrayBuffer[DStream[_]]
. . .
val jobs = this.synchronized {
outputStreams.flatMap(outputStream => outputStream.generateJob(time))
. . .
}
outputStreams中的对象是DStream,下面进入DStream的generateJob一探究竟 private[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
case Some(rdd) => {
val jobFunc = => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
此处相当于针对每个时间段生成的一个RDD,会调用SparkContext的方法runJob提交Spark的一个 context.sparkContext.runJob(rdd, emptyFunc)
}
Some(new Job(time, jobFunc))
}
case None => None
}
}
在DStream算是父类,一些具体的DStream例如SocketInputStream等的类的父类可以通过
SocketInputDStream看是如何通过上面的getOrCompute生成RDD的 private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
generatedRDDs.get(time) match {
. . .
case None => {
if (isTimeValid(time)) {
Dstream是个父类,这里代表的是子类的compute方法,DStream通过compute调用用户自定义函数。当任务执行时,同一个 compute(time) match {
. . .
generatedRDDs.put(time, newRDD)
. . .
}
在SocketInputDStream的compute方法中生成了对应时间片的RDD:
override def compute(validTime: Time): Option[RDD[T]] = {if (validTime >= graph.startTime) {
val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
receivedBlockInfo(validTime) = blockInfo
val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
Some(new BlockRDD[T](ssc.sc, blockIds))
} else {
Some(new BlockRDD[T](ssc.sc, Array[BlockId]))
}
}
Spark Streaming在保证实时处理的要求下还能够保证高吞吐与容错
性。用户的数据分析中很多情况下也存在需要分析图数据,运行图算
法,通过GraphX可以简便地开发分布式图分析算法。3.3 GraphX
GraphX是Spark中的一个重要子项目,它利用Spark作为计算引擎,实现了大规模图计算的功能,并提供了类似Pregel的编程接口。GraphX
的出现,将Spark生态系统变得更加完善和丰富;同时以其与Spark生态
系统其他组件很好的融合,以及强大的图数据处理能力,在工业界得到
了广泛的应用。本章主要介绍GraphX的架构、原理和使用方式。3.3.1 GraphX简介
GraphX是常用图算法在Spark上的并行化实现,同时提供了丰富的
API接口。图算法是很多复杂机器学习算法的基础,在单机环境下有很
多应用案例。在大数据环境下,图的规模大到一定程度后,单机很难解
决大规模的图计算,需要将算法并行化,在分布式集群上进行大规模图
处理。目前,比较成熟的方案有GraphX和GraphLab等大规模图计算框
架。
GraphX的特点是离线计算、批量处理,基于同步的BSP模型(Bulk
Synchronous Parallel Computing Model,整体同步并行计算模型),这样
的优势在于可以提升数据处理的吞吐量和规模,但是会造成速度上稍逊
一筹。目前大规模图处理框架还有基于MPI模型的异步图计算模型
GraphLab和同样基于BSP模型的Graph等。
现在和GraphX可以组合使用的分布式图数据库是Neo4J。Neo4J一
个高性能的、非关系的、具有完全事务特性的、鲁棒的图数据库。另一
个数据库是Titan,Titan是一个分布式的图形数据库,特别为存储和处
理大规模图形而优化。二者均可作为GraphX的持久化层,存储大规模
图数据。3.3.2 GraphX的使用简介
类似Spark在RDD上提供了一组基本操作符(如map,filter,reduce),GraphX同样也有针对Graph的基本操作符,用户可以在这些
操作符传入自定义函数和通过修改图的节点属性或结构生成新的图。
GraphX提供了丰富的针对图数据的操作符。Graph类中定义了核心
的、优化过的操作符。一些更加方便的由底层核心操作符组合而成的上
层操作符在GraphOps中进行定义。正是通过Scala语言的implicit关键
字,GraphOps中定义的操作符可以作为Graph中的成员。这样做的目的
是未来GraphX会支持不同类型的图,而每种类型的图的呈现必须实现
核心的操作符和复用大部分的GraphOps中实现的操作符。
下面将操作符分为几个类别进行介绍。
(1)属性操作符
表3-1给出了GraphX的属性操作符。通过属性操作符,用户可以在
点或边上进行相应运算,构建和开发图算法。
表3-1 属性操作符(2)结构操作符
表3-2所示为GraphX的结构操作符。通过这些操作可以生成改变图
结构之后的图数据。
表3-2 结构操作符
(3)图信息属性(见表3-3)
表3-3所示为图信息属性,通过图信息属性,用户可以获取图上的
统计信息。表3-3 图信息属性
(4)邻接聚集操作符与Join操作符
表3-4所示为邻接聚集操作符与Join操作符。通过邻接操作符可以将
两个相近的表进行连接。
表3-4 邻接聚集操作符与Join操作符
(5)缓存操作符
表3-5所示为缓存操作符。表3-5 缓存操作符3.3.3 GraphX体系结构
1.整体架构
GraphX的整体架构(如图3-15所示)可以分为三部分。
图3-15 GraphX架构
存储和原语层:Graph类是图计算的核心类。内部含有
VertexRDD、EdgeRDD和RDD[EdgeTriplet]引用。GraphImpl是Graph类的子类,实现了图操作。
·接口层:在底层RDD的基础之上实现了Pregel模型,BSP模式的计
算接口。
·算法层:基于Pregel接口实现了常用的图算法。包括:PageRank、SVDPlusPlus、TriangleCount、ConnectedComponents、StronglyConnectedConponents等算法。
2.存储结构
在正式的工业级的应用中,图的规模极大,上百万个节点是经常出
现的。为了提高处理速度和数据量,希望能够将图以分布式的方式来存
储、处理图数据。图的分布式存储大致有两种方式,边分割(Edge
Cut)和点分割(Vertex Cut),如图3-16所示。最早期的图计算的框架
中,使用的是Edge Cut(边分割)的存储方式。而GraphX的设计者考虑
到真实世界中的大规模图大多是边多于点的图,所以采用点分割方式存
储。点分割能够减少网络传输和存储开销。底层实现是将边放到各个节
点存储,而在进行数据交换时将点在各个机器之间广播进行传输。对边
进行分区和存储的算法主要基于PartitionStrategy中封装的分区方法。这
里面的几种分区方法分别是对不同应用情景的权衡,用户可以根据具体
的需求进行分区方式的选择。用户可以在程序中指定边的分区方式。例
如:val g = Graph(vertices, partitionBy(edges, PartitionStrategy.EdgePartition2D))
图3-16 GraphX存储模型
一旦边已经在集群上分区和存储,大规模并行图计算的关键挑战就
变成了如何将点的属性连接到边。GraphX的处理方式是集群上移动传
播点的属性数据。由于不是每个分区都需要所有的点属性(因为每个分区只是一部分边),GraphX内部维持一个路由表(routing table),这
样当需要广播点到需要这个点的边的所在分区时就可以通过路由表映
射,将需要的点属性传输到指定的边分区。
点分割的好处是在边的存储上是没有冗余数据的,而且对于某个点
与它的邻居的交互操作,只要满足交换律和结合律。例如,求顶点的邻
接顶点权重的和,可以在不同的节点进行并行运算,最后把每个节点的
运行结果进行汇总,网络开销较小。代价是每个顶点属性可能要冗余存
储多份,更新点数据时要有数据同步开销。
3.使用技巧
采样观察可以通过不同的采样比例,先从小数据量进行计算、观察
效果、调整参数,再逐步增加数据量进行大规模的运算。可以通过RDD
的sample方法进行采样。同时通过Web UI观察集群的资源消耗。
1)内存释放:保留旧图对象的引用,但是尽快释放不使用的图的
顶点属性,节省空间占用。通过unPersistVertices方法进行顶点释放。
2)GC调优,请读者参考性能调优章节介绍。
3)调试:在各个时间点可以通过graph.vertices.count进行调
试,观测图现有状态。进行问题诊断和调优。
GraphX通过提供简洁的API以及优化的图数据管理,简化了用户开发分布式图算法的复杂度。在大数据分析中更多的应用场景是进行机器
学习,下面通过MLlib的介绍,读者可以了解如何通过Spark之上的
MLlib进行复杂的机器学习。3.4 MLlib
MLlib是构建在Spark上的分布式机器学习库,充分利用了Spark的
内存计算和适合迭代型计算的优势,将性能大幅度提升。同时由于
Spark算子丰富的表现力,让大规模机器学习的算法开发不再复杂。3.4.1 MLlib简介
MLlib是一些常用的机器学习算法和库在Spark平台上的实现。
MLlib是AMPLab的在研机器学习项目MLBase的底层组件。MLBase是一
个机器学习平台,MLI是一个接口层,提供很多结构,MLlib是底层算
法实现层,如图3-17所示。
图3-17 MLbase
MLlib中包含分类与回归、聚类、协同过滤、数据降维组件以及底
层的优化库,如图3-18所示。图3-18 MLlib组件图
通过图3-18读者可以对MLlib的整体组件和依赖库有一个宏观的把
握。
下面对图3-18中读者可能不太熟悉的底层组件进行简要介绍。
BLASLAPACK层:LAPACK是用Fortran编写的算法库,顾名思
义,Linear Algebra PACKage,是为了解决通用的线性代数问题的。另外必须要提的算法包是BLAS(Basic Linear Algebra Subprograms),其
实LAPACK底层是使用了BLAS库的。不少计算机厂商都提供了针对不
同处理器进行了优化的BLASLAPACK算法包。
Netlib-java(官网为:https:github.comfommilnetlib-java)是一个
对底层BLAS,LAPACK封装的Java接口层。
Breeze(官网为:https:github.comscalanlpbreeze)是一个Scala写
的数值处理库,提供向量、矩阵运算等API。
库依赖:MLlib底层使用到了Scala书写的线性代数库Breeze,Breeze底层依赖netlib-java库。netlib-java底层依赖原生的Fortran
routines。所以,当用户使用时需要在节点上预先安装gfortran runtime
library(下载地址:https:github.commikiobraunjblaswikiMissing-
Libraries)。由于许可证(license)问题,官方的MLlib依赖集中没有引
入netlib-java原生库的依赖。如果运行时环境没有可用原生库,用户将
会看到警告信息。如果程序中需要使用netlib-java的库,用户需要在项
目中引入com.github.fommil.netlib:all:1.1.2的依赖或者参照指南(网
址为:https:github.comfommilnetlib-
javablobmasterREADME.mdmachine-optimised-system-libraries)来建
立用户自己的项目。如果用户需要使用python接口,则需要1.4或者更高
版本的NumPy(注意:MLlib源码中注释有ExperimentalDeveloperApi的
API在未来的发布版本中可能会进行调整和改变,官方会在不同版本发布时提供迁移指南)。3.4.2 MLlib中的聚类和分类
聚类和分类是机器学习中两个常用的算法,聚类将数据分开为不同
的集合,分类对新数据进行类别预测,下面将就两类算法进行介绍。
1.聚类和分类
(1)什么是聚类
聚类(Clustering)指将数据对象分组成为多个类或者簇
(Cluster),它的目标是:在同一个簇中的对象之间具有较高的相似
度,而不同簇中的对象差别较大。其实,聚类在人们日常生活中是一种
常见行为,即所谓的“物以类聚,人以群分”,其核心思想在于分组,人
们不断地改进聚类模式来学习如何区分各个事物和人。
(2)什么是分类
数据仓库、数据库或者其他信息库中有许多可以为商业、科研等活
动的决策提供所需要的知识。分类与预测即是其中的两种数据分析形
式,可以用来抽取能够描述重要数据集合或预测未来数据趋势。分类方
法(Classification)用于预测数据对象的离散类别(Categorical
Label);预测方法(Prediction)用于预测数据对象的连续取值。分类流程:新样本→特征选取→分类→评价
训练流程:训练集→特征选取→训练→分类器
最初,机器学习的分类应用大多都是在这些方法及基于内存基础上
所构造的算法。目前,数据挖掘方法都要求具有基于外存以处理大规模
数据集合能力,同时具有可扩展能力。
2.MLlib中的聚类和分类
MLlib目前已经实现了K-Means聚类算法、朴素贝叶斯和决策树分
类算法。这里主要介绍被广泛使用的K-Means聚类算法和朴素贝叶斯分
类算法。
(1)K-Means算法
1)K-Means算法简介。
K-Means聚类算法能轻松地对聚类问题建模。K-Means聚类算法容
易理解,并且能在分布式的环境下并行运行。学习K-Means聚类算法,能更容易地理解聚类算法的优缺点,以及其他算法对于特定数据的高效
性。
K-Means聚类算法中的K是聚类的数目,在算法中会强制要求用户
输入。如果将新闻聚类成诸如政治、经济、文化等大类,可以选择10~20的数字作为K。因为这种顶级类别的数量是很小的。如果要对这
些新闻详细分类,选择50~100的数字也是没有问题的。K-Means聚类算
法主要可以分为三步。第一步是为待聚类的点寻找聚类中心;第二步是
计算每个点聚类中心的距离,将每个点聚类到离该点最近的聚类中去;
第三步是计算聚类中所有点的坐标平均值,并将这个平均值作为新的聚
类中心点。反复执行第二步,直到聚类中心不再进行大范围的移动,或
者聚类次数达到要求为止。
2)k-Means示例。
下面的例子中有7名选手,每名选手有两个类别的比分,A类比分
和B类比分如表3-6所示。
表3-6 A类和B类比分
这些数据将会聚为两个簇。随机选取1号和4号选手作为簇的中心,如表3-7所示。
表3-7 1号和4号选手信息将1号和4号选手分别作为两个簇的中心点,下面每一步将选取的点
计算和两个簇中心的欧几里德距离,哪个中心距离小就放到哪个簇中,如表3-8所示。
表3-8 第一步聚类
第一轮聚类的结果产生了,如表3-9所示。
表3-9 第一轮结果
第二轮将使用(1.8,2.3)和(4.1,5.4)作为新的簇中心,重复以
上的过程。直到迭代次数达到用户设定的次数终止。最后一轮的迭代分
出的两个簇就是最后的聚类结果。3)MLlib之K-Means源码解析。
MLlib中的K-Means的原理是:在同一个数据集上,跑多个K-Means
算法(每个称为一个run),然后返回效果最好的那个聚类的类簇中
心。初始的类簇中心点的选取有两种方法,一种是随机,另一种是采用
KMeans||(KMeans++的一个变种)。算法的停止条件是迭代次数达到
设置的次数,或者在某一次迭代后所有run的K-Means算法都收敛。
①类簇中心初始化。
本节介绍的初始化方法是对于每个运行的K-Means都随机选择K个
点作为初始类簇:
private def initRandom(data: RDD[Array[Double]]): Array[ClusterCenters] = {
Sample all the cluster centers in one pass to avoid repeated scans
val sample = data.takeSample(true, runs k, new Random.nextInt).toSeq
Array.tabulate(runs)(r => sample.slice(r k, (r + 1) k).toArray)
}
②计算属于某个类簇的点。
在每一次迭代中,首先会计算属于各个类簇的点,然后更新各个类
簇的中心。
K-Means算法的并行实现通过Spark 的mapPartitions函数,通过该函数获取到分区的迭代器。
之后对于每个运行算法中的每个类簇计算属于该类簇的点的个数以及累加和。 val totalContribs = data.mapPartitions { points =>
val runs = activeCenters.length
val k = activeCenters(0).length
val dims = activeCenters(0)(0).length
val sums = Array.fill(runs, k)(new DoubleMatrix(dims))
val counts = Array.fill(runs, k)(0L)for (point <- points; (centers, runIndex) <- activeCenters.zipWithIndex) {
找到距离该点最近的类簇中心点 val (bestCenter, cost) = KMeans.findClosest(centers, point)
统计该运行算法开销, 用于在之后选取开销最小的那个运行的算法 costAccums(runIndex) += cost
将该点加到最近的类簇的统计总和中去, 方便之后计算该类簇的新中心点 sums(runIndex)(bestCenter).addi(new DoubleMatrix(point))
将距离该点最近的类簇的点数量加1,sum.divi(count)就是类簇的新中心 counts(runIndex)(bestCenter) += 1
}
val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
((i, j), (sums(i)(j), counts(i)(j)))
}
contribs.iterator
对于每个运行算法的每个类簇计算属于该类簇的点的个数和加和 }.reduceByKey(mergeContribs).collectAsMap
mergeContribs是一个负责合并的函数:
def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = {
(p1._1.addi(p2._1), p1._2 + p2._2)
}
③更新类簇的中心点。
for ((run, i) <- activeRuns.zipWithIndex) {
var changed = false
for (j <- 0 until k) {
val (sum, count) = totalContribs((i, j))
if (count != 0) {
计算类簇的新的中心点 val newCenter = sum.divi(count).data
if (MLUtils.squaredDistance(newCenter, centers(run)(j)) > epsilon epsilon) {
此处与代码和算法的停止条件有关 changed = true
}
centers(run)(j) = newCenter
}
}
如果某个run的KMeans算法的某轮次迭代中K个类簇的中心点变化都不超过指定阈值 if (!changed) {
active(run) = false
logInfo(Run + run + finished in + (iteration + 1) + iterations)
}
costs(run) = costAccums(i).value
}
④算法停止条件。
算法的停止条件是迭代次数达到设置的次数,或者所有运行的K-Means算法都收敛:
while (iteration < maxIterations !activeRuns.isEmpty)
上文对典型聚类算法K-Means原理进行介绍,下面将对典型的分类
算法朴素贝叶斯算法进行介绍。
(2)朴素贝叶斯分类算法
朴素贝叶斯分类算法是贝叶斯分类算法多个变种之一。朴素指假设
各属性之间是相互独立的。研究发现,在大多数情况下,朴素贝叶斯分
类算法(naive bayes classifier)在性能上与决策树(decision tree)、神
经网络(netural network)相当。贝叶斯分类算法在大数据集的应用中
具有方法简便、准确率高和速度快的优点。但事实上,贝叶斯分类也有
其缺点。由于贝叶斯定理假设一个属性值对给定类的影响独立于其他的
属性值,而此假设在实际情况中经常是不成立的,则其分类准确率可能
会下降。
朴素贝叶斯分类算法是一种监督学习算法,使用朴素贝叶斯分类算
法对文本进行分类,主要有两种模型,即多项式模型(multinomial
model)和伯努利模型(bernoulli model)。MLlib使用的是被广泛使用
的多项式模型。本书将以一个实际的例子来简略介绍使用多项式模型的
朴素贝叶斯分类算法。在多项式模型中,设某文档d=(t1,t2,…,tk),tk是该文档中出
现过的单词,允许重复。
先验概率P(c)=类c下单词总数整个训练样本的单词总数
类条件概率P(tk|c)=(类c下单词tk在各个文档中出现过的次
数之和+1)
(类c下单词总数+|V|)
V是训练样本的单词表(即抽取单词,单词出现多次,只算一
个),|V|则表示训练样本包含多少种单词。P(tk|c)可以看作是单词tk
在证明d属于类c上提供了多大的证据,而P(c)则可以认为是类别c在
整体上占多大比例(有多大可能性)。
给定一组分好类的文本训练数据,如表3-10所示。
给定一个新样本(河北河北河北吉林香港),对其进行分类。该文
本用属性向量表示为d=(河北,河北,河北,吉林,香港),类别集合
为Y={yes,no}。
表3-10 文本训练数据类yes下总共有8个单词,类no下总共有3个单词,训练样本单词总
数为11,因此P(yes)=811,P(no)=311。类条件概率计算如下:
P(yes)=811, P(no)=311。类条件概率计算如下:
P(河北 | yes)=(5+1)(8+6)=614=37
P(河北 | yes)=P(吉林 | yes)= (0+1)(8+6)=114
P(河北|no)=(1+1)(3+6)=29
P(Japan|no)=P(吉林| no) =(1+1)(3+6)=29
分母中的8,是指yes类别下textc的长度,也即训练样本的单词总
数,6是指训练样本有河北、北京、上海、广东、吉林、香港,共6个单
词,3是指no类下共有3个单词。
有了以上类条件概率,开始计算后验概率:
P(yes | d)=(37)3×114×114×811=108184877≈0.00058417
P(no | d)= (29)3×29×29×311=32216513≈0.00014780
比较大小,即可知道这个文档属于类别河北。3.5 本章小结
本章主要介绍了BDAS中广泛应用的几个数据分析组件。SQL on
Spark提供在Spark上的SQL查询功能。让用户可以基于内存计算和SQL
进行大数据分析。通过Spark Streaming,用户可以构建实时流处理应
用,其高吞吐量,以及适合历史和实时数据混合分析的特性使其在流数
据处理框架中突出重围。GraphX充当Spark生态系统中图计算的角色,其简洁的API让图处理算法的书写更加便捷。最后介绍了MLlib——
Spark上的机器学习库,它充分利用Spark内存计算和适合迭代的特性,使分布式系统与并行机器学习算法实现了完美的结合。相信随着Spark
生态系统的日臻完善,这些组件还会取得长足发展。第4章 Lamda架构日志分析流水线
4.1 日志分析概述
随着互联网的发展,在互联网上产生了大量的Web日志或移动应用
日志,日志包含用户最重要的信息,通过日志分析,用户可以获取到网
站或应用的访问量,哪个网页访问人数最多,哪个网页最有价值、用户
的特征、用户的兴趣等。
一般中型的网站(10万的PV[1]
以上),每天会产生1GB以上Web日
志文件。大型或超大型的网站,可能每小时就会产生500GB~1TB的数
据量。
对于日志的这种规模的数据,通过Spark进行大规模日志分析与日
志处理,能够达到很好的效果。
Web日志由Web服务器产生,现在互联网公司使用的主流的服务器
可能是Nginx、Apache、Tomcat等。从Web日志中,我们可以获取网站
每类页面的PV值(页面浏览)、UV(独立IP数)。更复杂一些的,可
以计算得出用户所检索的关键词排行榜、用户停留时间最高的页面等。
更为复杂的,构建广告点击模型、分析用户行为特征等。
1.日志格式目前常见的Web日志格式主要由两类:一种日志格式是Apache的
NCSA日志格式,另一种日志格式是IIS的W3C日志格式。
下面以Nginx日志格式为例进行讲解。
Nginx日志示例格式:
222.68.172.111 - - [18Sep2013:06:49:57 +0000]
GET imagesmy.jpg HTTP1.1 200 19939
http:www.angularjs.cnA00n Mozilla5.0 (Windows NT 6.1) AppleWebKit537.36 (KHTML, like Gecko) Chrome29.0.1547.66 Safari537.36
以下是本例中涉及的一些要素。
·remote_addr:记录客户端的IP地址。本例为222.68.172.111。
·remote_user:记录客户端用户名称,本例--表示为空。
·time_local:记录访问时间与时区,本例为[18Sep2013:06:49:
57+0000]。
·request:记录请求的URL与HTTP协议,本例为GETimagesmy.jpg
HTTP1.1。
·status:记录请求状态,成功是200。
·body_bytes_sent:记录发送给客户端文件主体内容大小,本例中为
19939。·http_referer:用来记录从哪个页面链接访问过来
的,http:www.angularjs.cnA00n。
·http_user_agent:记录客户浏览器的相关信息,本例中为
Mozilla5.0(Windows NT 6.1)AppleWebKit537.36(KHTML,like
Gecko)Chrome29.0.1547.66 Safari537.36。
注意 如果用户想要更多的信息,则要用其他手段去获取,通过JS
代码单独发送请求,并使用cookies记录用户的访问信息。
通过利用这些日志信息,我们可以深入分析用户行为或网站状况
了。
2.传统单机日志数据分析示例
当数据量较小(10MB,100MB,10GB),单机处理能够解决,可
以通过各种UnixLinux命令或者工具,awk、grep、sort、join等都是日志
分析的利器,再配合Perl、Python、正则表达式,基本就可以解决常见
日志分析的问题。
(1)Linux Shell进行单机日志分析示例
例如,想从上面提到的nginx日志中得到访问量最高的前10个IP,通
过以下Shell进行分析: cat access.log.10 | awk '{a[1]++} END {for(b in a) print b\ta[b]}'
| sort -k2 -r | head -n 10
163.177.71.12 972
101.226.68.137 972
183.195.232.138 971
50.116.27.194 97
14.17.29.86 96
61.135.216.104 94
61.135.216.105 91
61.186.190.41 9
59.39.192.108 9
220.181.51.212 9
(2)Python进行单机日志分析示例
检查Nginx的日志文件,统计基于每个独立IP地址的点击率,代码
如下:
!usrbinenv pythoncoding:utf8
import re
import sys
contents = sys.argv[1]def NginxIpHite(logfile_path):
IP:4个字符串,每个字符串为1~3个数字,由点连接 ipadd = r'\.'.join([r'\d{1,3}']4)
re_ip = re.compile(ipadd)
iphitlisting = {}
for line in open(contents):
match = re_ip.match(line)
if match:
ip = match.group( )
如果IP存在增加1,否则设置点击率为1
iphitlisting[ip] = iphitlisting.get(ip, 0) + 1
print iphitlisting
NginxIpHite(contents)
运行并打印结果如下:
[root@chlinux 06] .nginx_ip.py access_20140610.log
{'183.3.121.84': 1, '182.118.20.184': 2, '182.118.20.185': 1, '190.52.120.38': 1, '182.118.20.187': 1, '202.108.251.214': 2, '61.135.190.101': 2, '103.22.181.247': 1, '101.226.33.190': 3, '183.129.168.131': 1, '66.249.73.29': 26, '182.118.20.202': 1, '157.56.93.38': 2, '219.139.102.237': 4, '220.181.108.178': 1, '220.181.108.179': 1, '182.118.25.233': 4, '182.118.25.232': 1, '182.118.25.231': 2, '182.118.20.186': 1, '174.129.228.67': 20}
此脚本返回的是一个Key-Value映射,包含访问Nginx服务器的各个IP的点击数。用户可以通过这个示例再进行深入拓展,进行更丰富的日
志信息和知识的获取。
(3)大规模分布式日志分析情况
当数据量每天以10GB、100GB增长的时候,单机处理能力已经不
能满足需求。此时就需要增加系统的扩展性,用大数据分析和并行计算
来解决。在Spark出现之前,海量数据存储和海量日志分析都是基于
Hadoop、Hive等数据分析系统的。Spark的出现,使得全栈数据分析更
加容易。并且,Spark非常适合构建多范式日志分析流水线。我们将介
绍如何使用Spark构建日志分析流水线。
[1] Page View,页面访问量。4.2 日志分析指标
下面将介绍常用网站的运营数据分析指标。在数据越来越重要的趋
势下,数据化运营已经提上互联网公司的日程,如果监控网站或应用的
状况时发现瓶颈问题,我们需要针对网站或应用相关指标进行统计和分
析得出的。随着移动互联网的发展,越来越多的移动数据分析公司与工
具也不断涌现,其中代表性的为友盟、Talking Data等,为公司提供数
据化运营支持。
网站运行日志分析常用指标如下:
·PV(Page View):网站页面访问数,也称作网站流量。
·UV(Unique Visitor):页面IP的访问量统计,访问用户数,即独
立IP。
·PVPU(Page View Per User):平均每位用户访问页面数。
·漏斗模型与转化率:漏斗模型指的是多个不同的事件按照一定依
赖顺序依次触发的流程中的转化模型。用户通常会对应用中的一些关键
路径进行分析。比如注册流程、购物流程、交易流程等。以电商应用的
购物流程为例:
·1浏览商品页→2放入购物车→3生成订单→4支付订单→5完成交易·我们可以根据这些关键路径来计算每一步的转化率。转化率指的
是完成当前事件的用户中触发下一个依赖事件的用户所占比例。
·留存率:用户在某段时间内开始使用应用,经过一段时间后,仍
然继续使用这个应用的用户被认作是留存。这部分用户占开始新增用户
的比例即是留存率。
·用户属性:用户的基本属性和行为特征,将用户打标签,帮助产
品进一步的营销与推荐。
最终希望通过一个仪表盘展示出整个网站的统计指标信息,如图4-
1所示。
图4-1 日志统计效果图4.3 Lamda架构
日志分析中既有离线大规模分析的需求,又有实时性的需求,这就
可以通过采用Lamda架构构建日志分析流水线。
1.Lamda架构简介
Lambda架构的目的是为大数据分析应用程序提供一个低响应延迟
的组合数据传输环境。
Lambda系统架构定义了一套明确的架构原则,它为建立一套强大
的和可扩展的数据系统定义了架构范式。在Lamda架构中,被读取的数
据是不可变的,在并行处理过程中数据会依次进入流处理系统和批处理
系统,同时进行实时处理和离线数据分析。在查询时,当这两者都返回
结果后,才算是完成一次完整的查询。从逻辑上看,传输过程发生了两
次,一次是在批处理中,一次是在流处理中。
Lamda架构并不限定其中的具体系统,要根据实际情况进行调整优
化。大数据的系统选型具体可以有很多的组合变化。例如可以将图4-2
中的Kafka、Storm、Hadoop等换成其他类似的系统,例如Spark
Streaming、Spark等,惯常的做法是使用两个数据库来存储数据输出
表,一个存储实时表,响应实时查询需求,另外一个存储批处理表,返
回离线计算结果。图4-2 Lamda数据分析架构
它是由三层组成:批处理层、服务层和速度层。
①批处理层:Hadoop、Spark、Tez等都可以作为批处理层的处理工
具,HDFS、HBase等都可以作为数据持久化系统。
②服务层:用于加载和实现数据库中的批处理视图,以便用户能够
查询,不一定需要随机写,但是支持批更新和随机读,例如采用
ElephantDB、Voldemort。③快速处理层:主要处理新数据和服务层更新造成的高延迟补偿,利用流处理系统(如Storm、S4、Spark Streaming)和随机读写数据存储
库来计算实时视图(HBase)。批处理和服务层定期处理和转换实时视
图为批处理视图。
为了获得一个完整结果,批处理和实时视图都必须被同时查询和融
合(实时代表新数据)。
下面借鉴Lamda架构,设计整个数据分析流水线架构,如图4-3所
示。
图4-3 日志分析流水线整体架构图
本例中实时日志分析流水线大致按以下步骤操作。①数据采集:采用Flume NG进行数据采集。
②数据汇总与转发:通过Flume将数据转发汇总到实时消息系统
Kafka。
③数据处理:采用Spark Streaming进行实时数据处理。
④结果呈现:采用Flask作为可视化呈现工具进行结果呈现。
离线日志分析流水线大致按以下步骤操作。
①数据存储:通过Flume将数据转储到HDFS。
②数据处理:通过Spark SQL进行数据预处理。
③结果呈现:结果汇总存储到MySQL最后通过Flask进行结果呈
现。4.4 构建日志分析数据流水线
后续的章节将介绍日志数据采集、日志数据汇总、日志实时分析、日志离线分析及可视化,来构建数据分析流水线。4.4.1 用Flume进行日志采集
Web日志由Web服务器产生,生产环境的服务器可能是Nginx、Apache、Tomcat、IIS等。
例如,可以将Tomcat的日志收集到指定的目录,Tomcat安装
在opttomcat,日志存放在varlogdata。其他服务器(如Apache、Nginx、IIS等),用户可以根据相应服务器的默认目录进行相关配置。
1.Flume简介
Flume是Cloudera开发的日志收集系统,具有分布式、高可用等特
点,为大数据日志采集、汇总聚合和转储传输提供了支持。为了保证
Flume的扩展性和灵活性,在日志系统中定制各类数据发送方及数据接
收方。同时Flume提供对数据进行简单处理,并写各种数据到接受方的
能力。
Flume的核心是把数据从数据源收集过来,再送到数据接收方。为
了保证送达成功,在送到目的地之前,会先缓存数据,待数据真正到达
目的地后,删除自己缓存的数据。
Flume传输的数据的基本单位是事件(Event),如果是文本文件,通常是一行记录,这也是事务的基本单位。事件(Event)从源(Source)传输到通道(Channel),再到数据输出槽(Sink),本身为
一个比特(byte)数组,并可携带消息头(headers)信息。
Flume运行的核心是Agent。它是一个完整的数据收集工具,含有三
个核心组件,分别是Source、Channel、Sink。通过这些组件,Event可
以从一个地方流向另一个地方,如图4-4所示。
图4-4 Flume架构
Flume核心组件如下。
·Source可以接收外部源发送过来的数据。不同的Source,可以接受
不同的数据格式。比如有目录池(Spooling Directory)数据源,可以监
控指定文件夹中的新文件变化,如果目录中有文件产生,就会立刻读取
其内容。
·Channel是一个存储地,接收Source的输出,直到有Sink消费掉
Channel中的数据。Channel中的数据直到进入到下一个Channel中或者进入终端才会被删除。当Sink写入失败后,可以自动重启,不会造成数据
丢失,因此很可靠。
·Sink会消费Channel中的数据,然后送给外部源或者其他Source。
如数据可以写入到HDFS或者HBase中。
Flume允许多个Agent连在一起,形成前后相连的多级数据传输通
道。
2.Flume安装与配置
(1)安装Flume
1)安装JDK。
2)安装Flume。
http: mirrors.cnnic.cnapacheflume1.5.0apache-flume-1.5.0-bin.tar.gz。 tar xvzf apache-flume-1.5.0-bin.tar.gz
mv apache-flume-1.5.0-bin apache-flume-1.5.0
ln -s apache-flume-1.5.0 flume
3)环境变量设置。
vim etcprofile
export JAVA_HOME=usrlocaljdk
export CLASSPATH=.:JAVA_HOMElibdt.jar:JAVA_HOMElibtools.jar
export PATH=PATH:JAVA_HOMEbin
export FLUME_HOME=usrlocalflume
export FLUME_CONF_DIR=FLUME_HOMEconf
export PATH=.:PATH::FLUME_HOMEbin
source etcprofile(2)创建Agent配置文件将数据输出到HDFS
这需要修改flume.conf中的配置,具体如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
描述和配置source
第1步:配置数据源 a1.sources.r1.type = exec
a1.sources.r1.channels = c1
配置需要监控的日志输出目录 a1.sources.r1.command = tail -F varlogdata
Describe the sink
第2步:配置数据输出 a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.path=hdfs:192.168.11.177:9000flumeevents%Y%m%d%H%M
a1.sinks.k1.hdfs.filePrefix=cmcc
a1.sinks.k1.hdfs.minBlockReplicas=1
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.rollInterval=60
a1.sinks.k1.hdfs.rollSize=0
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=0
Use a channel which buffers events in memory
第3步:配置数据通道 a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
Bind the source and sink to the channel
第4步:将三者级联 a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(3)启动Flume Agent
cd usrlocalflume
nohup binflume-ng agent -n agent1 -c conf -f confflume-conf.properties amp;
通过上面介绍的一系列步骤,已经可以将Flume收集的数据输出到
HDFS。3.整合Flume与Kafka、HDFS
下面通过Sink设置的修改将Flume的日志输出到HDFS和Kafka。下
面的IP地址只是示例,用户根据具体需求改为生产环境中的IP地址。
define [sink] begin
define the sink k1,定义HDFS输出端
a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.path=hdfs: 192.168.11.174:9000flumeevents%Y%m%d
a1.sinks.k1.hdfs.filePrefix=cmcc-%H
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.minBlockReplicas=1
a1.sinks.k1.hdfs.rollInterval=3600
a1.sinks.k1.hdfs.rollSize=0
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=0
define the sink k2,定义Kafka输出端 a1.sinks.k2.channel=c2
a1.sinks.k2.type=com.cmcc.chiwei.Kafka.CmccKafkaSink
a1.sinks.k2.metadata.broker.list=192.168.11.174:9092,192.168.11.175:9092,192.168.11.176:9092
a1.sinks.k2.partition.key=0
a1.sinks.k2.partitioner.class=com.cmcc.chiwei.Kafka.CmccPartition
a1.sinks.k2.serializer.class=Kafka.serializer.StringEncoder
a1.sinks.k2.request.required.acks=0
a1.sinks.k2.cmcc.encoding=UTF-8
a1.sinks.k2.cmcc.topic.name=cmcc
a1.sinks.k2.producer.type=async
a1.sinks.k2.batchSize=100
define [sink] end
以上配置将同样的数据无差异输出传递到多个输出端。
a1.sources.r1.selector.type=replicating
本例配置了两个输出端:一个是输出到Kafka,为了提高性能,用
内存通道。另一个是输出到HDFS,离线分析。在配置文件中设置两个sink:一个是Kafka的输出通道K2。一个是
HDFS的输出通道K1。
a1.sources = r1
a1.sinks = k1 k2
a1.channels=c1 c2
define [channel] begin
define the channel c1,a1.channels.c1.type=file
a1.channels.c1.checkpointDir=homeflumeflumeCheckpoint
a1.channels.c1.dataDirs=homeflumeflumeData , homeflumeflumeDataExt
a1.channels.c1.capacity=2000000
a1.channels.c1.transactionCapacity=100
define the channel c2
a1.channels.c2.type=memory
a1.channels.c2.capacity=2000000
a1.channels.c2.transactionCapacity=100
define [channel] end
大家在配置文件中添加如上信息,即可配置好,同时输出到Kafka
和HDFS。4.4.2 用Kafka将日志汇总
由于Flume收集的数据和后端处理的下游系统之间可能存在多对多
的关系,为了解耦合保证数据传输延迟,选用Kafka作为消息中间层进
行日志中转。
Apache Kafka是由Apache软件基金会开发的一个开源消息系统项
目,由Scala写成。Kafka最初是由LinkedIn开发,并于2011年初开源。
2012年10月从Apache Incubator“毕业”。该项目的目标是为处理实时数据
提供一个统一、高通量、低等待的平台[1]。它提供了类似于JMS的特
性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。
Kafka进行消息保存时会根据Topic进行归类,发送消息者成为
Producer,消息接受者成为Consumer,此外Kafka集群有多个Kafka实例
组成,每个实例(Server)成为Broker。无论是Kafka集群,还是
Producer和Consumer都依赖Zookeeper来保证系统可用性集群保存一些元
(Meta)信息,如图4-5所示。图4-5 Kafka架构图
1.概念和术语
·消息:全称为Message,是指在生产者、服务端和消费者之间传输
数据。
·消息代理:全称为Message Broker,通俗来讲就是指该MQ的服务
端或者服务器。
·消息生产者:全称为Message Producer,负责产生消息并发送消息
到meta服务器。
·消息消费者:全称为Message Consumer,负责消息的消费。
·消息的主题:全称为Message Topic,由用户定义并在Broker上配
置。Producer发送消息到某个Topic下,Consumer从某个Topic下消费消息。
·主题的分区:也称为Partition,可以把一个Topic分为多个分区。每
个分区是一个有序、不可变的、顺序递增的Commit Log
·消费者分组:全称为Consumer Group,由多个消费者组成,共同
消费一个Topi ......
Spark大数据分析实战
高彦杰 倪亚宇 著
译者 译
ISBN:978-7-111-52307-9
本书纸版由机械工业出版社于2015年出版,电子版由华章分社(北京华
章图文信息有限公司,北京奥维博世图书发行有限公司)全球范围内制
作与发行。
版权所有,侵权必究
客服热线:+ 86-10-68995265
客服信箱:service@bbbvip.com
官方网址:www.hzmedia.com.cn
新浪微博 @华章数媒
微信公众号 华章电子书(微信号:hzebook)目录
前言
第1章 Spark简介
1.1 初识Spark
1.2 Spark生态系统BDAS
1.3 Spark架构与运行逻辑
1.4 弹性分布式数据集
1.4.1 RDD简介
1.4.2 RDD算子分类
1.5 本章小结
第2章 Spark开发与环境配置
2.1 Spark应用开发环境配置
2.1.1 使用Intellij开发Spark程序
2.1.2 使用SparkShell进行交互式数据分析
2.2 远程调试Spark程序
2.3 Spark编译
2.4 配置Spark源码阅读环境
2.5 本章小结
第3章 BDAS简介
3.1 SQL on Spark
3.1.1 为什么使用Spark SQL3.1.2 Spark SQL架构分析
3.2 Spark Streaming
3.2.1 Spark Streaming简介
3.2.2 Spark Streaming架构
3.2.3 Spark Streaming原理剖析
3.3 GraphX
3.3.1 GraphX简介
3.3.2 GraphX的使用简介
3.3.3 GraphX体系结构
3.4 MLlib
3.4.1 MLlib简介
3.4.2 MLlib中的聚类和分类
3.5 本章小结
第4章 Lamda架构日志分析流水线
4.1 日志分析概述
4.2 日志分析指标
4.3 Lamda架构
4.4 构建日志分析数据流水线
4.4.1 用Flume进行日志采集
4.4.2 用Kafka将日志汇总
4.4.3 用Spark Streaming进行实时日志分析4.4.4 Spark SQL离线日志分析
4.4.5 用Flask将日志KPI可视化
4.5 本章小结
第5章 基于云平台和用户日志的推荐系统
5.1 Azure云平台简介
5.1.1 Azure网站模型
5.1.2 Azure数据存储
5.1.3 Azure Queue消息传递
5.2 系统架构
5.3 构建Node.js应用
5.3.1 创建Azure Web应用
5.3.2 构建本地Node.js网站
5.3.3 发布应用到云平台
5.4 数据收集与预处理
5.4.1 通过JS收集用户行为日志
5.4.2 用户实时行为回传到Azure Queue
5.5 Spark Streaming实时分析用户日志
5.5.1 构建Azure Queue的Spark Streaming Receiver
5.5.2 Spark Streaming实时处理Azure Queue日志
5.5.3 Spark Streaming数据存储于Azure Table
5.6 MLlib离线训练模型5.6.1 加载训练数据
5.6.2 使用rating RDD训练ALS模型
5.6.3 使用ALS模型进行电影推荐
5.6.4 评估模型的均方差
5.7 本章小结
第6章 Twitter情感分析
6.1 系统架构
6.2 Twitter数据收集
6.2.1 设置
6.2.2 Spark Streaming接收并输出Tweet
6.3 数据预处理与Cassandra存储
6.3.1 添加SBT依赖
6.3.2 创建Cassandra Schema
6.3.3 数据存储于Cassandra
6.4 Spark Streaming热点Twitter分析
6.5 Spark Streaming在线情感分析
6.6 Spark SQL进行Twitter分析
6.6.1 读取Cassandra数据
6.6.2 查看JSON数据模式
6.6.3 Spark SQL分析Twitter
6.7 Twitter可视化6.8 本章小结
第7章 热点新闻分析系统
7.1 新闻数据分析
7.2 系统架构
7.3 爬虫抓取网络信息
7.3.1 Scrapy简介
7.3.2 创建基于Scrapy的新闻爬虫
7.3.3 爬虫分布式化
7.4 新闻文本数据预处理
7.5 新闻聚类
7.5.1 数据转换为向量(向量空间模型VSM)
7.5.2 新闻聚类
7.5.3 词向量同义词查询
7.5.4 实时热点新闻分析
7.6 Spark Elastic Search构建全文检索引擎
7.6.1 部署Elastic Search
7.6.2 用Elastic Search索引MongoDB数据
7.6.3 通过Elastic Search检索数据
7.7 本章小结
第8章 构建分布式的协同过滤推荐系统
8.1 推荐系统简介8.2 协同过滤介绍
8.2.1 基于用户的协同过滤算法User-based CF
8.2.2 基于项目的协同过滤算法Item-based CF
8.2.3 基于模型的协同过滤推荐Model-based CF
8.3 基于Spark的矩阵运算实现协同过滤算法
8.3.1 Spark中的矩阵类型
8.3.2 Spark中的矩阵运算
8.3.3 实现User-based协同过滤的示例
8.3.4 实现Item-based协同过滤的示例
8.3.5 基于奇异值分解实现Model-based协同过滤的示例
8.4 基于Spark的MLlib实现协同过滤算法
8.4.1 MLlib的推荐算法工具
8.4.2 MLlib协同过滤推荐示例
8.5 案例:使用MLlib协同过滤实现电影推荐
8.5.1 MovieLens数据集
8.5.2 确定最佳的协同过滤模型参数
8.5.3 利用最佳模型进行电影推荐
8.6 本章小结
第9章 基于Spark的社交网络分析
9.1 社交网络介绍
9.1.1 社交网络的类型9.1.2 社交网络的相关概念
9.2 社交网络中社团挖掘算法
9.2.1 聚类分析和K均值算法简介
9.2.2 社团挖掘的衡量指标
9.2.3 基于谱聚类的社团挖掘算法
9.3 Spark中的K均值算法
9.3.1 Spark中与K均值有关的 对象和方法
9.3.2 Spark下K均值算法示例
9.4 案例:基于Spark的Facebook社团挖掘
9.4.1 SNAP社交网络数据集 介绍
9.4.2 基于Spark的社团挖掘实现
9.5 社交网络中的链路预测算法
9.5.1 分类学习简介
9.5.2 分类器的评价指标
9.5.3 基于Logistic回归的链路预测算法
9.6 Spark MLlib中的Logistic回归
9.6.1 分类器相关对象
9.6.2 模型验证对象
9.6.3 基于Spark的Logistic回归示例
9.7 案例:基于Spark的链路预测算法
9.7.1 SNAP符号社交网络 Epinions数据集9.7.2 基于Spark的链路预测算法
9.8 本章小结
第10章 基于Spark的大规模新闻主题分析
10.1 主题模型简介
10.2 主题模型LDA
10.2.1 LDA模型介绍
10.2.2 LDA的训练算法
10.3 Spark中的LDA模型
10.3.1 MLlib对LDA的支持
10.3.2 Spark中LDA模型训练示例
10.4 案例:Newsgroups新闻的主题分析
10.4.1 Newsgroups数据集介绍
10.4.2 交叉验证估计新闻的主题个数
10.4.3 基于主题模型的文本聚类算法
10.4.4 基于主题模型的文本分类算法
10.5 本章小结
第11章 构建分布式的搜索引擎
11.1 搜索引擎简介
11.2 搜索排序概述
11.3 查询无关模型PageRank
11.4 基于Spark的分布式PageRank实现11.4.1 PageRank的MapReduce 实现
11.4.2 Spark的分布式图模型GraphX
11.4.3 基于GraphX的PageRank实现
11.5 案例:GoogleWeb Graph的PageRank计算
11.6 查询相关模型Ranking SVM
11.7 Spark中支持向量机的实现
11.7.1 Spark中的支持向量机 模型
11.7.2 使用Spark测试数据演示支持向量机的训练
11.8 案例:基于MSLR数据集的查询排序
11.8.1 Microsoft Learning to Rank 数据集介绍
11.8.2 基于Spark的Ranking SVM实现
11.9 本章小结前言
为什么要写这本书
Spark大数据技术还在如火如荼地发展,Spark中国峰会的召开,各
地meetup的火爆举行,开源软件Spark也因此水涨船高,很多公司已经
将Spark大范围落地并且应用。Spark使用者的需求已经从最初的部署安
装、运行实例,到现在越来越需要通过Spark构建丰富的数据分析应
用。写一本Spark实用案例类的技术书籍,是一个持续了很久的想法。
由于工作较为紧张,最初只是将参与或学习过的Spark相关案例进行总
结,但是随着时间的推移,最终还是打算将其中通用的算法、系统架构
以及应用场景抽象出来,并进行适当简化,也算是一种总结和分享。
Spark发源于美国加州大学伯克利分校AMPLab的大数据分析平台,它立足于内存计算,从多迭代批量处理出发,兼顾数据仓库、流处理和
图计算等多种计算范式,是大数据系统领域的全栈计算平台。Spark当
下已成为Apache基金会的顶级开源项目,拥有着庞大的社区支持,生态
系统日益完善,技术也逐渐走向成熟。
现在越来越多的同行已经了解Spark,并且开始使用Spark,但是国
内缺少一本Spark的实战案例类的书籍,很多Spark初学者和开发人员只
能参考网络上零散的博客或文档,学习效率较慢。本书也正是为了解决上述问题而着意编写。
本书希望带给读者一个系统化的视角,秉承大道至简的主导思想,介绍Spark的基本原理,如何在Spark上构建复杂数据分析算法,以及
Spark如何与其他开源系统进行结合构建数据分析应用,让读者开启
Spark技术应用之旅。
本书特色
Spark作为一款基于内存的分布式计算框架,具有简洁的接口,可
以快速构建上层数据分析算法,同时具有很好的兼容性,能够结合其他
开源数据分析系统构建数据分析应用或者产品。
为了适合读者阅读和掌握知识结构,本书从Spark基本概念和机制
介绍入手,结合笔者实践经验讲解如何在Spark之上构建机器学习算
法,并最后结合不同的应用场景构建数据分析应用。
读者对象
本书中一些实操和应用章节,比较适数据分析和开发人员,可以作
为工作手边书;机器学习和算法方面的章节,比较适合机器学习和算法
工程师,可以分享经验,拓展解决问题的思路。
·Spark初学者·Spark应用开发人员
·Spark机器学习爱好者
·开源软件爱好者
·其他对大数据技术感兴趣的人员
如何阅读本书
本书分为11章内容。
第1章 从Spark概念出发,介绍Spark的来龙去脉,阐述Spark机制
与如何进行Spark编程。
第2章 详细介绍Spark的开发环境配置。
第3章 详细介绍Spark生态系统重要组件Spark SQL、Spark
Streaming、GraphX、MLlib的实现机制,为后续使用奠定基础。
第4章 详细介绍如何通过Flume、Kafka、Spark Streaming、HDFS、Flask等开源工具构建实时与离线数据分析流水线。
第5章 从实际出发,详细介绍如何在Azure云平台,通过Node.js、Azure Queue、Azure Table、Spark Streaming、MLlib等组件对用户行为
数据进行分析与推荐。第6章 详细介绍如何通过Twitter API、Spark SQL、Spark
Streaming、Cassandra、D3等组件对Twitter进行情感分析与统计分析。
第7章 详细介绍如何通过Scrapy、Kafka、MongoDB、Spark、Spark Streaming、Elastic Search等组件对新闻进行抓取、分析、热点新
闻聚类等挖掘工作。
第8章 详细介绍了协同过滤概念和模型,讲解了如何在Spark中实
现基于Item-based、User-based和Model-based协同过滤算法的推荐系统。
第9章 详细介绍了社交网络分析的基本概念和经典算法,以及如
何利用Spark实现这些经典算法,用于真实网络的分析。
第10章 详细介绍了主题分析模型(LDA),讲解如何在Spark中
实现LDA算法,并且对真实的新闻数据进行分析。
第11章 详细介绍了搜索引擎的基本原理,以及其中用到的核心搜
索排序相关算法——PageRank和Ranking SVM,并讲解了如何在Spark中
实现PageRank和Ranking SVM算法,以及如何对真实的Web数据进行分
析。
如果你有一定的经验,能够理解Spark的相关基础知识和使用技
巧,那么可以直接阅读第4~11章。然而,如果你是一名初学者,请一定
从第1章的基础知识开始学起。勘误和支持
由于笔者的水平有限,加之编写时间仓促,书中难免会出现一些错
误或者不准确的地方,恳请读者批评指正。如果你有更多的宝贵意见,我们会尽量为读者提供最满意的解答。你也可以通过微博@高彦杰gyj,博客:http:blog.csdn.netgaoyanjie55,或者邮箱gaoyanjie55@163.com联
系到高彦杰。你也可以通过邮箱niyayu@foxmail.com联系到倪亚宇。
期待能够得到大家的真挚反馈,在技术之路上互勉共进。
致谢
感谢微软亚洲研究院的Thomas先生和Ying Yan,在每一次迷茫时给
予我鼓励与支持。
感谢机械工业出版社华章公司的杨福川和高婧雅,在近半年的时间
里始终支持我们的写作,你们的鼓励和帮助引导我顺利完成全部书稿。
特别致谢
谨以此书献给我最亲爱的爱人,家人,同事,以及众多热爱大数据
技术的朋友们!
高彦杰第1章 Spark简介
本章主要介绍Spark框架的概念、生态系统、架构及RDD等,并围
绕Spark的BDAS项目及其子项目进行了简要介绍。目前,Spark生态系
统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目,本章只进行简要介绍,后
续章节会有详细阐述。1.1 初识Spark
Spark是基于内存计算的大数据并行计算框架,因为它基于内存计
算,所以提高了在大数据环境下数据处理的实时性,同时保证了高容错
性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集
群。
1.Spark执行的特点
Hadoop中包含计算框架MapReduce和分布式文件系统HDFS。
Spark是MapReduce的替代方案,而且兼容HDFS、Hive等分布式存
储层,融入Hadoop的生态系统,并弥补MapReduce的不足。
(1)中间结果输出
Spark将执行工作流抽象为通用的有向无环图执行计划(DAG),可以将多Stage的任务串联或者并行执行,而无需将Stage的中间结果输
出到HDFS中,类似的引擎包括Flink、Dryad、Tez。
(2)数据格式和内存布局
Spark抽象出分布式内存存储结构弹性分布式数据集RDD,可以理
解为利用分布式的数组来进行数据的存储。RDD能支持粗粒度写操作,但对于读取操作,它可以精确到每条记录。Spark的特性是能够控制数
据在不同节点上的分区,用户可以自定义分区策略。
(3)执行策略
Spark执行过程中不同Stage之间需要进行Shuffle。Shuffle是连接有
依赖的Stage的桥梁,上游Stage输出到下游Stage中必须经过Shuffle这个
环节,通过Shuffle将相同的分组数据拆分后聚合到同一个节点再处理。
Spark Shuffle支持基于Hash或基于排序的分布式聚合机制。
(4)任务调度的开销
Spark采用了事件驱动的类库AKKA来启动任务,通过线程池的复
用线程来避免系统启动和切换开销。
2.Spark的优势
Spark的一站式解决方案有很多的优势,分别如下所述。
(1)打造全栈多计算范式的高效数据流水线
支持复杂查询与数据分析任务。在简单的“Map”及“Reduce”操作之
外,Spark还支持SQL查询、流式计算、机器学习和图算法。同时,用
户可以在同一个工作流中无缝搭配这些计算范式。
(2)轻量级快速处理Spark代码量较小,这得益于Scala语言的简洁和丰富表达力,以及
Spark通过External DataSource API充分利用和集成Hadoop等其他第三方
组件的能力。同时Spark基于内存计算,可通过中间结果缓存在内存来
减少磁盘IO以达到性能的提升。
(3)易于使用,支持多语言
Spark支持通过Scala、Java和Python编写程序,这允许开发者在自己
熟悉的语言环境下进行工作。它自带了80多个算子,同时允许在Shell中
进行交互式计算。用户可以利用Spark像书写单机程序一样书写分布式
程序,轻松利用Spark搭建大数据内存计算平台并充分利用内存计算,实现海量数据的实时处理。
(4)与External Data Source多数据源支持
Spark可以独立运行,除了可以运行在当下的Yarn集群管理之外,它还可以读取已有的任何Hadoop数据。它可以运行多种数据源,比如
Parquet、Hive、HBase、HDFS等。这个特性让用户可以轻易迁移已有
的持久化层数据。
(5)社区活跃度高
Spark起源于2009年,当下已有超过600多位工程师贡献过代码。开
源系统的发展不应只看一时之快,更重要的是一个活跃的社区和强大的生态系统的支持。
同时也应该看到Spark并不是完美的,RDD模型适合的是粗粒度的
全局数据并行计算;不适合细粒度的、需要异步更新的计算。对于一些
计算需求,如果要针对特定工作负载达到最优性能,还需要使用一些其
他的大数据系统。例如,图计算领域的GraphLab在特定计算负载性能上
优于GraphX,流计算中的Storm在实时性要求很高的场合要更胜Spark
Streaming一筹。1.2 Spark生态系统BDAS
目前,Spark已经发展成为包含众多子项目的大数据计算平台。
BDAS是伯克利大学提出的基于Spark的数据分析栈(BDAS)。其核心
框架是Spark,同时涵盖支持结构化数据SQL查询与分析的查询引擎
Spark SQL,提供机器学习功能的系统MLBase及底层的分布式机器学习
库MLlib,并行图计算框架GraphX,流计算框架Spark Streaming,近似
查询引擎BlinkDB,内存分布式文件系统Tachyon,资源管理框架Mesos
等子项目。这些子项目在Spark上层提供了更高层、更丰富的计算范
式。
图1-1展现了BDAS的主要项目结构图。
图1-1 伯克利数据分析栈(BDAS)主要项目结构图
下面对BDAS的各个子项目进行更详细的介绍。(1)Spark
Spark是整个BDAS的核心组件,是一个大数据分布式编程框架,不
仅实现了MapReduce的算子map函数和reduce函数及计算模型,还提供
了更为丰富的算子,例如filter、join、groupByKey等。Spark将分布式数
据抽象为RDD(弹性分布式数据集),并实现了应用任务调度、RPC、序列化和压缩,并为运行在其上层的组件提供API。其底层采用Scala这
种函数式语言书写而成,并且所提供的API深度借鉴函数式的编程思
想,提供与Scala类似的编程接口。
图1-2所示即为Spark的处理流程(主要对象为RDD)。
Spark将数据在分布式环境下分区,然后将作业转化为有向无环图
(DAG),并分阶段进行DAG的调度和任务的分布式并行处理。
(2)Spark SQL
Spark SQL提供在大数据上的SQL查询功能,类似于Shark在整个生
态系统的角色,它们可以统称为SQL on Spark。之前,由于Shark的查询
编译和优化器依赖Hive,使得Shark不得不维护一套Hive分支。而Spark
SQL使用Catalyst作为查询解析和优化器,并在底层使用Spark作为执行
引擎实现SQL的算子。用户可以在Spark上直接书写SQL,相当于为
Spark扩充了一套SQL算子,这无疑更加丰富了Spark的算子和功能。同
时Spark SQL不断兼容不同的持久化存储(如HDFS、Hive等),为其发展奠定广阔的空间。
图1-2 Spark的任务处理流程图
(3)Spark Streaming
Spark Streaming通过将流数据按指定时间片累积为RDD,然后将每
个RDD进行批处理,进而实现大规模的流数据处理。其吞吐量能够超越
现有主流流处理框架Storm,并提供丰富的API用于流数据计算。
(4)GraphX
GraphX基于BSP模型,在Spark之上封装类似Pregel的接口,进行大
规模同步全局的图计算,尤其是当用户进行多轮迭代的时候,基于Spark内存计算的优势尤为明显。
(5)MLlib
MLlib是Spark之上的分布式机器学习算法库,同时包括相关的测试
和数据生成器。MLlib支持常见的机器学习问题,例如分类、回归、聚
类以及协同过滤,同时也包括一个底层的梯度下降优化基础算法。1.3 Spark架构与运行逻辑
1.Spark的架构
·Driver:运行Application的main函数并且创建SparkContext。
·Client:用户提交作业的客户端。
·Worker:集群中任何可以运行Application代码的节点,运行一个或
多个Executor进程。
·Executor:运行在Worker的Task执行器,Executor启动线程池运行
Task,并且负责将数据存在内存或者磁盘上。每个Application都会申请
各自的Executor来处理任务。
·SparkContext:整个应用的上下文,控制应用的生命周期。
·RDD:Spark的基本计算单元,一组RDD形成执行的有向无环图
RDD Graph。
·DAG Scheduler:根据Job构建基于Stage的DAG工作流,并提交
Stage给TaskScheduler。
·TaskScheduler:将Task分发给Executor执行。·SparkEnv:线程级别的上下文,存储运行时的重要组件的引用。
2.运行逻辑
(1)Spark作业提交流程
如图1-3所示,Client提交应用,Master找到一个Worker启动
Driver,Driver向Master或者资源管理器申请资源,之后将应用转化为
RDD有向无环图,再由DAGScheduler将RDD有向无环图转化为Stage的
有向无环图提交给TaskScheduler,由TaskScheduler提交任务给Executor
进行执行。任务执行的过程中其他组件再协同工作确保整个应用顺利执
行。图1-3 Spark架构
(2)Spark作业运行逻辑
如图1-4所示,在Spark应用中,整个执行流程在逻辑上运算之间会
形成有向无环图。Action算子触发之后会将所有累积的算子形成一个有
向无环图,然后由调度器调度该图上的任务进行运算。Spark的调度方
式与MapReduce有所不同。Spark根据RDD之间不同的依赖关系切分形
成不同的阶段(Stage),一个阶段包含一系列函数进行流水线执行。
图中的A、B、C、D、E、F,分别代表不同的RDD,RDD内的一个方框
代表一个数据块。数据从HDFS输入Spark,形成RDD A和RDD C,RDD
C上执行map操作,转换为RDD D,RDD B和RDD E进行join操作转换为
F,而在B到F的过程中又会进行Shuffle。最后RDD F通过函数
saveAsSequenceFile输出保存到HDFS中。图1-4 Spark执行有向无环图1.4 弹性分布式数据集
本节将介绍弹性分布式数据集RDD。Spark是一个分布式计算框
架,而RDD是其对分布式内存数据的抽象,可以认为RDD就是Spark分
布式算法的数据结构,而RDD之上的操作是Spark分布式算法的核心原
语,由数据结构和原语设计上层算法。Spark最终会将算法(RDD上的
一连串操作)翻译为DAG形式的工作流进行调度,并进行分布式任务的
分发。1.4.1 RDD简介
在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数
据集(Resilient Distributed Dataset,RDD)。它在集群中的多台机器上
进行了数据分区,逻辑上可以认为是一个分布式的数组,而数组中每个
记录可以是用户自定义的任意数据结构。RDD是Spark的核心数据结
构,通过RDD的依赖关系形成Spark的调度顺序,通过对RDD的操作形
成整个Spark程序。
(1)RDD创建方式
1)从Hadoop文件系统(或与Hadoop兼容的其他持久化存储系统,如Hive、Cassandra、HBase)输入(例如HDFS)创建。
2)从父RDD转换得到新RDD。
3)通过parallelize或makeRDD将单机数据创建为分布式RDD。
(2)RDD的两种操作算子
对于RDD可以有两种操作算子:转换(Transformation)与行动
(Action)。
1)转换(Transformation):Transformation操作是延迟计算的,也就是说从一个RDD转换生成另一个RDD的转换操作不是马上执行,需
要等到有Action操作的时候才会真正触发运算。
2)行动(Action):Action算子会触发Spark提交作业(Job),并
将数据输出Spark系统。
(3)RDD的重要内部属性
通过RDD的内部属性,用户可以获取相应的元数据信息。通过这些
信息可以支持更复杂的算法或优化。
1)分区列表:通过分区列表可以找到一个RDD中包含的所有分区
及其所在地址。
2)计算每个分片的函数:通过函数可以对每个数据块进行RDD需
要进行的用户自定义函数运算。
3)对父RDD的依赖列表:为了能够回溯到父RDD,为容错等提供
支持。
4)对key-value pair数据类型RDD的分区器,控制分区策略和分区
数。通过分区函数可以确定数据记录在各个分区和节点上的分配,减少
分布不平衡。
5)每个数据分区的地址列表(如HDFS上的数据块的地址)。如果数据有副本,则通过地址列表可以获知单个数据块的所有副本
地址,为负载均衡和容错提供支持。
(4)Spark计算工作流
图1-5中描述了Spark的输入、运行转换、输出。在运行转换中通过
算子对RDD进行转换。算子是RDD中定义的函数,可以对RDD中的数
据进行转换和操作。
·输入:在Spark程序运行中,数据从外部数据空间(例如,HDFS、Scala集合或数据)输入到Spark,数据就进入了Spark运行时数
据空间,会转化为Spark中的数据块,通过BlockManager进行管理。
·运行:在Spark数据输入形成RDD后,便可以通过变换算子fliter
等,对数据操作并将RDD转化为新的RDD,通过行动(Action)算子,触发Spark提交作业。如果数据需要复用,可以通过Cache算子,将数据
缓存到内存。
·输出:程序运行结束数据会输出Spark运行时空间,存储到分布式
存储中(如saveAsTextFile输出到HDFS)或Scala数据或集合中(collect
输出到Scala集合,count返回Scala Int型数据)。图1-5 Spark算子和数据空间
Spark的核心数据模型是RDD,但RDD是个抽象类,具体由各子类
实现,如MappedRDD、ShuffledRDD等子类。Spark将常用的大数据操
作都转化成为RDD的子类。1.4.2 RDD算子分类
本节将主要介绍Spark算子的作用,以及算子的分类。
Spark算子大致可以分为以下两类。
1)Transformation变换算子:这种变换并不触发提交作业,完成作
业中间过程处理。
2)Action行动算子:这类算子会触发SparkContext提交Job作业。
下面分别对两类算子进行详细介绍。
1.Transformations算子
下文将介绍常用和较为重要的Transformation算子。
(1)map
将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为
一个新的元素。源码中map算子相当于初始化一个RDD,新RDD叫做
MappedRDD(this,sc.clean(f))。
图1-7中每个方框表示一个RDD分区,左侧的分区经过用户自定义
函数f:T->U映射为右侧的新RDD分区。但是,实际只有等到Action算子触发后这个f函数才会和其他函数在一个stage中对数据进行运算。在
图1-6中的第一个分区,数据记录V1输入f,通过f转换输出为转换后的
分区中的数据记录V'1。
(2)flatMap
将原来RDD中的每个元素通过函数f转换为新的元素,并将生成的
RDD的每个集合中的元素合并为一个集合,内部创建
FlatMappedRDD(this,sc.clean(f))。
图1-6 map算子对RDD转换
图1-7表示RDD的一个分区进行flatMap函数操作,flatMap中传入的
函数为f:T->U,T和U可以是任意的数据类型。将分区中的数据通过用
户自定义函数f转换为新的数据。外部大方框可以认为是一个RDD分
区,小方框代表一个集合。V1、V2、V3在一个集合作为RDD的一个数
据项,可能存储为数组或其他容器,转换为V'1、V'2、V'3后,将原来
的数组或容器结合拆散,拆散的数据形成为RDD中的数据项。图1-7 flapMap算子对RDD转换
(3)mapPartitions
mapPartitions函数获取到每个分区的迭代器,在函数中通过这个分
区整体的迭代器对整个分区的元素进行操作。内部实现是生成
MapPartitionsRDD。图1-8中的方框代表一个RDD分区。
图1-8中,用户通过函数f(iter)=>iter.filter(_>=3)对分区中所有
数据进行过滤,大于和等于3的数据保留。一个方块代表一个RDD分
区,含有1、2、3的分区过滤只剩下元素3。图1-8 mapPartitions算子对RDD转换
(4)union
使用union函数时需要保证两个RDD元素的数据类型相同,返回的
RDD数据类型和被合并的RDD元素数据类型相同。并不进行去重操
作,保存所有元素,如果想去重可以使用distinct。同时Spark还提
供更为简洁的使用union的API,通过++符号相当于union函数操作。
图1-9中左侧大方框代表两个RDD,大方框内的小方框代表RDD的
分区。右侧大方框代表合并后的RDD,大方框内的小方框代表分区。合
并后,V1、V2、V3……V8形成一个分区,其他元素同理进行合并。
(5)cartesian
对两个RDD内的所有元素进行笛卡尔积操作。操作后,内部实现返
回CartesianRDD。图1-10中左侧大方框代表两个RDD,大方框内的小方
框代表RDD的分区。右侧大方框代表合并后的RDD,大方框内的小方
框代表分区。
例如:V1和另一个RDD中的W1、W2、Q5进行笛卡尔积运算形成
(V1,W1)、(V1,W2)、(V1,Q5)。图1-9 union算子对RDD转换图1-10 cartesian算子对RDD转换
(6)groupBy
groupBy:将元素通过函数生成相应的Key,数据就转化为Key-
Value格式,之后将Key相同的元素分为一组。
函数实现如下:
1)将用户函数预处理:
val cleanF = sc.clean(f)2)对数据map进行函数操作,最后再进行groupByKey分组操作。
this.map(t =>(cleanF(t), t)).groupByKey(p)
其中,p确定了分区个数和分区函数,也就决定了并行化的程度。
图1-11中方框代表一个RDD分区,相同key的元素合并到一个组。
例如V1和V2合并为V,Value为V1,V2。形成V,Seq(V1,V2)。
图1-11 groupBy算子对RDD转换
(7)filter
filter函数功能是对元素进行过滤,对每个元素应用f函数,返回值
为true的元素在RDD中保留,返回值为false的元素将被过滤掉。内部实
现相当于生成FilteredRDD(this,sc.clean(f))。下面代码为函数的本质实现:
deffilter(f:T=>Boolean):RDD[T]=newFilteredRDD(this,sc.clean(f))
图1-12中每个方框代表一个RDD分区,T可以是任意的类型。通过
用户自定义的过滤函数f,对每个数据项操作,将满足条件、返回结果
为true的数据项保留。例如,过滤掉V2和V3保留了V1,为区分命名为
V'1。
(8)sample
sample将RDD这个集合内的元素进行采样,获取所有元素的子集。
用户可以设定是否有放回的抽样、百分比、随机种子,进而决定采样方
式。
内部实现是生成SampledRDD(withReplacement,fraction,seed)。
函数参数设置:
·withReplacement=true,表示有放回的抽样。
·withReplacement=false,表示无放回的抽样。
图1-13中的每个方框是一个RDD分区。通过sample函数,采样50%的数据。V1、V2、U1、U2……U4采样出数据V1和U1、U2形成新的
RDD。
图1-12 filter算子对RDD转换
图1-13 sample算子对RDD转换
(9)cache
cache将RDD元素从磁盘缓存到内存。相当于
persist(MEMORY_ONLY)函数的功能。图1-14 Cache算子对RDD转换
图1-14中每个方框代表一个RDD分区,左侧相当于数据分区都存储
在磁盘,通过cache算子将数据缓存在内存。
(10)persist
persist函数对RDD进行缓存操作。数据缓存在哪里依据StorageLevel
这个枚举类型进行确定。有以下几种类型的组合(见图1-14),DISK代
表磁盘,MEMORY代表内存,SER代表数据是否进行序列化存储。
下面为函数定义,StorageLevel是枚举类型,代表存储模式,用户
可以通过图1-14按需进行选择。
persist(newLevel:StorageLevel)
图1-15中列出persist函数可以进行缓存的模式。例如,MEMORY_AND_DISK_SER代表数据可以存储在内存和磁盘,并且以
序列化的方式存储,其他同理。图1-15 persist算子对RDD转换
图1-16中方框代表RDD分区。disk代表存储在磁盘,mem代表存储
在内存。数据最初全部存储在磁盘,通过
persist(MEMORY_AND_DISK)将数据缓存到内存,但是有的分区无
法容纳在内存,将含有V1、V2、V3的分区存储到磁盘。
(11)mapValues
mapValues:针对(Key,Value)型数据中的Value进行Map操作,而不对Key进行处理。图1-17中的方框代表RDD分区。a=>a+2代表对(V1,1)这样的
Key Value数据对,数据只对Value中的1进行加2操作,返回结果为3。
图1-16 Persist算子对RDD转换
图1-17 mapValues算子RDD对转换
(12)combineByKey
下面代码为combineByKey函数的定义:
combineByKey[C](createCombiner:(V)C,mergeValue:(C, V)C,mergeCombiners:(C, C)C,partitioner:Partitioner,mapSideCombine:Boolean=true,serializer:Serializer=null):RDD[(K,C)]
说明:
·createCombiner:V=>C,C不存在的情况下,比如通过V创建seq
C。
·mergeValue:(C,V)=>C,当C已经存在的情况下,需要
merge,比如把item V加到seq C中,或者叠加。
·mergeCombiners:(C,C)=>C,合并两个C。
·partitioner:Partitioner,Shuffle时需要的Partitioner。
·mapSideCombine:Boolean=true,为了减小传输量,很多combine
可以在map端先做,比如叠加,可以先在一个partition中把所有相同的
key的value叠加,再shuffle。
·serializerClass:String=null,传输需要序列化,用户可以自定义序
列化类:
例如,相当于将元素为(Int,Int)的RDD转变为了(Int,Seq[Int])类型元素的RDD。
图1-18中的方框代表RDD分区。如图,通过combineByKey,将
(V1,2),(V1,1)数据合并为(V1,Seq(2,1))。(13)reduceByKey
reduceByKey是比combineByKey更简单的一种情况,只是两个值合
并成一个值,(Int,Int V)to(Int,Int C),比如叠加。所以
createCombiner reduceBykey很简单,就是直接返回v,而mergeValue和
mergeCombiners逻辑是相同的,没有区别。
图1-18 comBineByKey算子对RDD转换
函数实现:
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
图1-19中的方框代表RDD分区。通过用户自定义函数(A,B)
=>(A+B)函数,将相同key的数据(V1,2)和(V1,1)的value相加
运算,结果为(V1,3)。图1-19 reduceByKey算子对RDD转换
(14)join
join对两个需要连接的RDD进行cogroup函数操作,将相同key的数
据能够放到一个分区,在cogroup操作之后形成的新RDD对每个key下的
元素进行笛卡尔积的操作,返回的结果再展平,对应key下的所有元组
形成一个集合。最后返回RDD[(K,(V,W))]。
下面代码为join的函数实现,本质是通过cogroup算子先进行协同划
分,再通过flatMapValues将合并的数据打散。
this.cogroup(other,partitioner).f?latMapValues{case(vs,ws)=>
for(v<-vs;w>-ws)yield(v,w) }
图1-20是对两个RDD的join操作示意图。大方框代表RDD,小方框
代表RDD中的分区。函数对相同key的元素,如V1为key做连接后结果
为(V1,(1,1))和(V1,(1,2))。2.Actions算子
本质上在Action算子中通过SparkContext进行了提交作业的runJob操
作,触发了RDD DAG的执行。
图1-20 join算子对RDD转换
例如,Action算子collect函数的代码如下,感兴趣的读者可以顺着
这个入口进行源码剖析:
Return an array that contains all of the elements in this RDD.
def collect: Array[T] = {
提交Job
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _)
}
下面将介绍常用和较为重要的Action算子。
(1)foreach
foreach对RDD中的每个元素都应用f函数操作,不返回RDD和
Array,而是返回Uint。
图1-21表示foreach算子通过用户自定义函数对每个数据项进行操
作。本例中自定义函数为println,控制台打印所有数据项。
图1-21 foreach算子对RDD转换
(2)saveAsTextFile
函数将数据输出,存储到HDFS的指定目录。
下面为saveAsTextFile函数的内部实现,其内部通过调用
saveAsHadoopFile进行实现:this.map(x => (NullWritable.get, new Text(x.toString)))
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
将RDD中的每个元素映射转变为(null,x.toString),然后再将其
写入HDFS。
图1-22中左侧方框代表RDD分区,右侧方框代表HDFS的Block。通
过函数将RDD的每个分区存储为HDFS中的一个Block。
(3)collect
collect相当于toArray,toArray已经过时不推荐使用,collect将分布
式的RDD返回为一个单机的scala Array数组。在这个数组上运用scala的
函数式操作。
图1-23中左侧方框代表RDD分区,右侧方框代表单机内存中的数
组。通过函数操作,将结果返回到Driver程序所在的节点,以数组形式
存储。
图1-22 saveAsHadoopFile算子对RDD转换图1-23 Collect算子对RDD转换
(4)count
count返回整个RDD的元素个数。
内部函数实现为:
defcount:Long=sc.runJob(this,Utils.getIteratorSize_).sum
图1-24中,返回数据的个数为5。一个方块代表一个RDD分区。
图1-24 count对RDD算子转换1.5 本章小结
本章首先介绍了Spark分布式计算平台的基本概念、原理以及Spark
生态系统BDAS之上的典型组件。Spark为用户提供了系统底层细节透
明、编程接口简洁的分布式计算平台。Spark具有内存计算、实时性
高、容错性好等突出特点。同时本章介绍了Spark的计算模型,Spark会
将应用程序整体翻译为一个有向无环图进行调度和执行。相比
MapReduce,Spark提供了更加优化和复杂的执行流。读者还可以深入了
解Spark的运行机制与Spark算子,这样能更加直观地了解API的使用。
Spark提供了更加丰富的函数式算子,这样就为Spark上层组件的开发奠
定了坚实的基础。
相信读者已经想了解如何开发Spark程序,接下来将就Spark的开发
环境配置进行阐述。第2章 Spark开发与环境配置
用户进行Spark应用程序开发,一般在用户本地进行单机开发调
试,之后再将作业提交到集群生产环境中运行。下面将介绍Spark开发
环境的配置,如何编译和进行源码阅读环境的配置。
用户可以在官网上下载最新的AS软件包,网址
为:http:spark.apache.org。2.1 Spark应用开发环境配置
Spark的开发可以通过Intellij或者Eclipse IDE进行,在环境配置的开
始阶段,还需要安装相应的Scala插件。2.1.1 使用Intellij开发Spark程序
本节介绍如何使用Intellij IDEA构建Spark开发环境和源码阅读环
境。由于Intellij对Scala的支持更好,目前Spark开发团队主要使用Intellij
作为开发环境。
1.配置开发环境
(1)安装JDK
用户可以自行安装JDK8。官网地
址:http:www.oracle.comtechnetworkjavajavasedownloadsindex.html。
下载后,如果在Windows下直接运行安装程序,会自动配置环境变
量,安装成功后,在CMD的命令行下输入Java,有Java版本的日志信息
提示则证明安装成功。
如果在Linux下安装,下载JDK包解压缩后,还需要配置环境变
量。
在etcprofile文件中,配置环境变量:
export JAVA_HOME=usrjavajdk1.8
export JAVA_BIN=usrjavajdk1.8bin
export PATH=PATH:JAVA_HOMEbin
export CLASSPATH=.:JAVA_HOMElibdt.jar:JAVA_HOMElibtools.jar
export JAVA_HOME JAVA_BIN PATH CLASSPATH(2)安装Scala
Spark内核采用Scala进行开发,上层通过封装接口提供Java和Python
的API,在进行开发前需要配置好Scala的开发包。
Spark对Scala的版本有约束,用户可以在Spark的官方下载界面看到
相应的Scala版本号。下载指定的Scala包,官网地址:http:www.scala-
lang.orgdownload。
(3)安装Intellij IDEA
用户可以下载安装最新版本的Intellij,官网地
址:http:www.jetbrains.comideadownload。
目前Intellij最新的版本中已经可以支持新建SBT工程,安装Scala插
件,可以很好地支持Scala开发。
(4)Intellij中安装Scala插件
在Intellij菜单中选择“Configure”,在下拉菜单中选择“Plugins”,再
选择“Browse repositories”,输入“Scala”搜索插件(如图2-1所示),在
弹出的对话框中单击“install”按钮,重启Intellij。
2.配置Spark应用开发环境1)用户在Intellij IDEA中创建Scala Project,SparkTest。
2)选择菜单中的“File”→“project structure”→“Libraries”命令,单
击“+”,导入“spark-assembly_2.10-1.0.0-incubating-hadoop2.2.0.jar”。
只需导入该jar包,该包可以通过在Spark的源码工程下执行“sbtsbt
assembly”命令生成,这个命令相当于将Spark的所有依赖包和Spark源码
打包为一个整体。
在“assemblytargetscala-2.10.4”目录下生成:spark-assembly-1.0.0-
incubating-hadoop2.2.0.jar。
3)如果IDE无法识别Scala库,则需要以同样方式将Scala库的jar包
导入。之后就可以开始开发Spark程序。如图2-2所示,本例将Spark默认
的示例程序SparkPi复制到文件。图2-1 输入“Scala”搜索插件
图2-2 编写程序
3.运行Spark程序(1)本地运行
编写完scala程序后,可以直接在Intellij中,以本地Local模式运行
(如图2-3所示),方法如下。
图2-3 以local模式运行
在Intellij中的选择“Run”→“Debug Configuration”→“Edit
Configurations”命令。在“Program arguments”文本框中输入main函数的输
入参数local。然后右键选择需要运行的类,单击“Run”按钮运行。
(2)集群上运行Spark应用jar包如果想把程序打成jar包,通过命令行的形式运行在Spark集群中,并按照以下步骤操作。
1)选择“File”→“Project Structure”,在弹出的对话框中选
择“Artifact”→“Jar”→“From Modules with dependencies”命令。
2)在选择“From Modules with dependencies”之后弹出的对话框中,选择Main函数,同时选择输出jar位置,最后单击“OK”按钮。
具体如图2-4~图2-6所示。
在图2-5中选择需要执行的Main函数。
在图2-6界面选择依赖的jar包。图2-4 生成jar包第一步图2-5 生成jar包第二步
图2-6 生成jar包第三步
在主菜单选择“Build”→“Build Artifact”命令,编译生成jar包。
3)将生成的jar包SparkTest.jar在集群的主节点,通过下面命令执
行:
java -jar SparkTest.jar
用户可以通过上面的流程和方式通过Intellij作为集成开发环境进行
Spark程序的开发。2.1.2 使用SparkShell进行交互式数据分析
如果是运行Spark Shell,那么会默认创建一个SparkContext,命名为
sc,所以不需要在Spark Shell创建新的SparkContext,SparkContext是应
用程序的上下文,调度整个应用并维护元数据信息。在运行Spark Shell
之前,可以设定参数MASTER,将Spark应用提交到MASTER指向的相
应集群或者本地模式执行,集群方式运行的作业将会分布式地运行,本
地模式执行的作业将会通过单机多线程方式运行。可以通过参数
ADD_JARS把JARS添加到classpath,用户可以通过这种方式添加所需的
第三方依赖库。
如果想spakr-shell在本地4核的CPU运行,需要如下方式启动:
MASTER=local[4] .spark-shell
这里的4是指启动4个工作线程。
如果要添加JARS,代码如下:
MASTER=local[4] ADD_JARS=code.jar .spark-shell
在spark-shell中,输入下面代码,读取dir文件:
scala>val text=sc.textFile(dir)输出文件中有多少数据项,则可用:
scala>text.count
按
通过以上介绍,用户可以了解如何使用Spark Shell进行交互式数据
分析。
对于逻辑较为复杂或者运行时间较长的应用程序,用户可以通过本
地Intellij等IDE作为集成开发环境进行应用开发与打包,最终提交到集
群执行。对于执行时间较短的交互式分析作业,用户可以通过Spark
Shell进行相应的数据分析。2.2 远程调试Spark程序
本地调试Spark程序和传统的调试单机的Java程序基本一致,读者可
以参照原来的方式进行调试,关于单机调试本书暂不赘述。对于远程调
试服务器上的Spark代码,首先请确保在服务器和本地的Spark版本一
致。需要按前文介绍预先安装好JDK和Git。
(1)编译Spark
在服务器端和本地计算机下载Spark项目。
通过下面的命令克隆一份Spark源码:
git clone https: github.comapachespark
然后针对指定的Hadoop版本进行编译:
SPARK_HADOOP_VERSION=2.3.0 sbtsbt assembly
(2)在服务器端的配置
1)根据相应的Spark配置指定版本的Hadoop,并启动Hadoop。
2)对编译好的Spark进行配置,在confspark-env.sh文件中进行如下
配置:export SPARK_JAVA_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=9999
其中“suspend=y”设置为需要挂起的模式。这样,当启动Spark的作
业时候程序会自动挂起,等待本地的IDE附加(Attach)到被调试的应
用程序上。address是开放等待连接的端口号。
(3)启动Spark集群和应用程序
1)启动Spark集群:
.sbinstart-all.sh
2)启动需要调试的程序,以Spark中自带的HdfsWordCount为例:
MASTER=spark: 10.10.1.168:7077
.binrun-example
org.apache.spark.examples.streaming.HdfsWordCount
hdfs: localhost:9000testtest.txt
3)如图2-7所示,执行后程序会挂起并等待本地的Intellij进行连
接,并显示“Listening for transport dt_socket at address: 9999”:
图2-7 远程调试
(4)本地IDE配置1)配置并连接服务器端挂起的程序。
在Intellij中选择“run”→“edit configuration”→“remote”命令,在弹出
的对话框中将默认配置中的端口号和IP改为服务器的地址,同时选择附
加(Attach)方式,如图2-8所示。
图2-8 远程调试设置
2)在“RunDebug Configurations”对话框中填入需要连接的主机名
和端口号以及其他参数,如图2-8所示。
3)在程序中设置断点进行调试。通过上面的介绍,用户可以了解如何进行远程调试。对于单机调试
方式则和日常开发的单机程序一样,常用方式是设置单机调试断点之后
再进行调试,在这里并不再展开介绍。2.3 Spark编译
用户可以通过Spark的默认构建工具SBT进行源码的编译和打包。当
用户需要对源码进行二次开发时,则需要对源码进行增量编译,通过下
面的方式读者可以实现编译和增量编译。
(1)克隆Spark源码
可通过克隆的方式克隆Spark源码,如图2-9所示。
git clone https: github.comapachespark
图2-9 git clone Spark库
这样将会从github将Spark源码下载到本地,建立本地的仓库。
(2)编译Spark源码
在Spark项目的根目录内执行编译和打包命令(如图2-10所示)。
sbtsbt assembly执行过程中会解析依赖和下载需要的依赖jar包。执行完成后会将所
有jar包打包为一个jar包,用户便可以运行Spark集群和示例了。
(3)增量编译
在有些情况下,用户需要修改源码,修改之后如果每次都重新下载
jar包或者对全部源码重新编译一遍,会很浪费时间,用户通过下面的增
量编译方法,可以只对改变的源码进行编译。
编译打包一个assembly的jar包。
sbtsbt clean assembly
图2-10 编译Spark源码这时的Spark程序已经可以运行。用户可以进入spark-shell执行程
序。
.binspark-shell
配置export SPARK_PREPEND_CLASSES参数为true,开启增量编
译模式。
export SPARK_PREPEND_CLASSES=true
继续使用spark-shell中的程序:
.binspark-shell
这时用户可以对代码进行修改和二次开发:初始开发Spark应用,之后编译。
编译Spark源码:
sbtsbt compile
继续开发Spark应用,之后编译。
sbtsbt compile
解除增量编译模式: unset SPARK_PREPEND_CLASSES
返回正常使用spark-shell的情景。
.binspark-shell Back to normal, using Spark classes from the assembly Jar
如果用户不想每次都开启一个新的SBT会话,可以在compile命令前
加上~。
sbtsbt ~ compile
(4)查看Spark源码依赖图
如果使用SBT进行查看依赖图(如图2-11所示),用户需要运行下
面的命令:
sbt
sbtsbt dependency-tree
如果使用Maven进行查看依赖图(如图2-11所示),用户需要运行
下面的命令:
Maven
mvn -DskipTests install
mvn dependency:tree图2-11 查看依赖图2.4 配置Spark源码阅读环境
由于Spark使用SBT作为项目管理构建工具,SBT的配置文件中配置
了依赖的jar包网络路径,在编译或者生成指定类型项目时需要从网络下
载jar包。需要用户预先安装git。在Linux操作系统或者Windows操作系
统上(用户可以下载Git Shell,在Git Shell中进行命令行操作)通
过“sbtsbt gen-idea”命令,生成Intellij项目文件,然后在Intellij IDE中直
接通过“Open Project”打开项目。
克隆Spark源码:
git clone https: github.comapachespark。
在所需要的软件安装好后在spark源代码根目录下,输入以下命令生
成Intellij项目:
sbtsbt gen-idea
这样SBT会自动下载依赖包和进行源文件编译以及生成Intellij所需
要的项目文件。2.5 本章小结
本章首先介绍了Spark应用程序的开发流程以及如何编译和调试
Spark程序。用户可以选用对Scala项目能够很好支持的Intellij IDE。如果
用户想深入了解Spark,以及诊断问题,建议读者配置好源码阅读环
境,进行源码分析。
通过本章的介绍,读者可以进行Spark开发环境的搭建,以及程序
的开发,后续将介绍Spark的生态系统BDAS。第3章 BDAS简介
提到Spark不得不说伯克利大学AMPLab开发的BDAS(Berkeley
Data Analytics Stack)数据分析的软件栈,如图3-1所示是其中的Spark生
态系统。其中用内存分布式大数据计算引擎Spark替代原有的
MapReduce,上层通过Spark SQL替代Hive等SQL on Hadoop系统,Spark
Streaming替换Storm等流式计算框架,GraphX替换GraphLab等大规模图
计算框架,MLlib替换Mahout等机器学习框架等,其整体框架基于内存
计算解决了原来Hadoop的性能瓶颈问题。AmpLab提出One Framework
to Rule Them All的理念,用户可以利用Spark一站式构建自己的数据分
析流水线。
图3-1 Spark生态系统
在一些数据分析应用中,用户可以使用Spark SQL预处理结构化数
据,GraphX预处理图数据,Spark Streaming实时捕获和处理流数据,最
终通过MLlib将数据融合,进行模型训练,底层各个系统通过Spark进行
运算。下面将介绍其中主要的项目。3.1 SQL on Spark[1]
AMPLab将大数据分析负载分为三大类型:批量数据处理、交互式
查询、实时流处理。而其中很重要的一环便是交互式查询。大数据分析
栈中需要满足用户ad-hoc、reporting、iterative等类型的查询需求,也需
要提供SQL接口来兼容原有数据库用户的使用习惯,同时也需要SQL能
够进行关系模式的重组。完成这些重要的SQL任务的便是Spark SQL和
Shark这两个开源分布式大数据查询引擎,它们可以理解为轻量级Hive
SQL在Spark上的实现,业界将该类技术统称为SQL on Hadoop。
在Spark峰会2014上,Databricks宣布不再支持Shark的开发,全力以
赴开发Shark的下一代技术Spark SQL,同时Hive社区也启动了Hive on
Spark项目,将Spark作为Hive(除MapReduce和Tez之外的)新执行引
擎。根据伯克利的Big Data Benchmark测试对比数据,Shark的In
Memory性能可以达到Hive的100倍,即使是On Disk也能达到10倍的性
能提升,是Hive强有力的替代解决方案。而作为Shark的进化版本的
Spark SQL,在AMPLab最新的测试中的性能已经超过Shark。图3-2展示
了Spark SQL和Hive on Spark是新的发展方向。图3-2 Spark SQL和Hive on Spark是新的发展方向
[1] 参考文章:高彦杰,陈冠诚Spark SQL:基于内存的大数据分析引擎
《程序员》2014.83.1.1 为什么使用Spark SQL
由于Shark底层依赖于Hive,这个架构的优势是对传统Hive用户可
以将Shark无缝集成进现有系统运行查询负载。但是也看到一些问题:
随着版本升级,查询优化器依赖于Hive,不方便添加新的优化策略,需
要进行另一套系统的学习和二次开发,学习成本很高。另一方面,MapReduce是进程级并行,例如:Hive在不同的进程空间会使用一些静
态变量,当在同一进程空间进行多线程并行执行,多线程同时写同名称
的静态变量会产生一致性问题,所以Shark需要使用另外一套独立维护
的Hive源码分支。而为了解决这个问题AMPLab和Databricks利用
Catalyst开发了Spark SQL。
Spark的全栈解决方案为用户提供了多样的数据分析框架,机器学
习、图计算、流计算如火如荼的发展和流行吸引了大批的学习者,为什
么人们今天还是要重视在大数据环境下使用SQL呢?笔者认为主要有以
下几点原因:
1)易用性与用户惯性。在过去的很多年中,有大批的程序员的工
作是围绕着数据库+应用的架构来做的,因为SQL的易用性提升了应用
的开发效率。程序员已经习惯了业务逻辑代码调用SQL的模式去写程
序,惯性的力量是强大的,如果还能用原有的方式解决现有的大数据问题,何乐而不为呢?提供SQL和JDBC的支持会让传统用户像以前一样
地书写程序,大大减少迁移成本。
2)生态系统的力量。很多系统软件性能好,但是未取得成功和没
落,很大程度上因为生态系统问题。传统的SQL在JDBC、ODBC、SQL
的各种标准下形成了一整套成熟的生态系统,很多应用组件和工具可以
迁移使用,像一些可视化的工具、数据分析工具等,原有企业的IT工具
可以无缝过渡。
3)数据解耦,Spark SQL正在扩展支持多种持久化层,用户可以使
用原有的持久化层存储数据,但是也可以体验和迁移到Spark SQL提供
的数据分析环境下进行Big Data的分析。3.1.2 Spark SQL架构分析
Spark SQL与传统DBMS的查询优化器+执行器的架构较为类似,只
不过其执行器是在分布式环境中实现,并采用的Spark作为执行引擎。
Spark SQL的查询优化是Catalyst,其基于Scala语言开发,可以灵活利用
Scala原生的语言特性很方便进行功能扩展,奠定了Spark SQL的发展空
间。Catalyst将SQL语言翻译成最终的执行计划,并在这个过程中进行查
询优化。这里和传统不太一样的地方就在于,SQL经过查询优化器最终
转换为可执行的查询计划是一个查询树,传统DB就可以执行这个查询
计划了。而Spark SQL最后执行还是会在Spark内将这棵执行计划树转换
为Spark的有向无环图DAG再执行。
1.Catalyst架构及执行流程分析
如图3-3所示为Catalyst的整体架构。图3-3 Spark SQL查询引擎Catalyst的架构
从图3-3中可以看到整个Catalyst是Spark SQL的调度核心,遵循传统
数据库的查询解析步骤,对SQL进行解析,转换为逻辑查询计划、物理
查询计划,最终转换为Spark的DAG后再执行。图3-4为Catalyst的执行流
程。
SqlParser将SQL语句转换为逻辑查询计划,Analyzer对逻辑查询计
划进行属性和关系关联检验,之后Optimizer通过逻辑查询优化将逻辑查
询计划转换为优化的逻辑查询计划,QueryPlanner将优化的逻辑查询计
划转换为物理查询计划,prepareForExecution调整数据分布,最后将物
理查询计划转换为执行计划进入Spark执行任务。
2.Spark SQL优化策略
查询优化是传统数据库中最为重要的一环,这项技术在传统数据库
中已经很成熟。除了查询优化,Spark SQL在存储上也进行了优化,从
以下几点查看Spark SQL的一些优化策略。图3-4 Catalyst的执行流程
(1)内存列式存储与内存缓存表
Spark SQL可以通过cacheTable将数据存储转换为列式存储,同时将
数据加载到内存进行缓存。cacheTable相当于在分布式集群的内存物化
视图,将数据进行缓存,这样迭代的或者交互式的查询不用再从HDFS
读数据,直接从内存读取数据大大减少了IO开销。列式存储的优势在于Spark SQL只需要读出用户需要的列,而不需要像行存储那样需要每
次将所有列读出,从而大大减少内存缓存数据量,更高效地利用内存数
据缓存,同时减少网络传输和IO开销。数据按照列式存储,由于是数
据类型相同的数据连续存储,能够利用序列化和压缩减少内存空间的占
用。
(2)列存储压缩
为了减少内存和硬盘空间占用,Spark SQL采用了一些压缩策略对
内存列存储数据进行压缩。Spark SQL的压缩方式要比Shark丰富很多,例如它支持PassThrough,RunLengthEncoding,DictionaryEncoding,BooleanBitSet,IntDelta,LongDelta等多种压缩方式。这样能够大幅度
减少内存空间占用和网络传输开销和IO开销。
(3)逻辑查询优化
Spark SQL在逻辑查询优化(如图3-5所示)上支持列剪枝、谓词下
压、属性合并等逻辑查询优化方法。列剪枝为了减少读取不必要的属性
列,减少数据传输和计算开销,在查询优化器进行转换的过程中会进行
列剪枝的优化。图3-5 逻辑查询优化
下面介绍一个逻辑优化例子:
SELECT Class FROM (SELECT ID,Name,Class FROM STUDENT ) S WHERE S.ID=1
Catalyst将原有查询通过谓词下压,将选择操作ID=1优先执行,这
样过滤大部分数据,通过属性合并将最后的投影只做一次最终保留Class
属性列。
(4)Join优化
Spark SQL深度借鉴传统数据库查询优化技术的精髓,同时也在分
布式环境下进行特定的优化策略调整和创新。Spark SQL对Join进行了优
化支持多种连接算法,现在的连接算法已经比Shark丰富,而且很多原来Shark的元素也逐步迁移过来。例如:BroadcastHashJoin、BroadcastNestedLoopJoin、HashJoin、LeftSemiJoin,等等。
下面介绍一个其中的BroadcastHashJoin算法思想。
BroadcastHashJoin将小表转化为广播变量进行广播,这样避免
Shuffle开销,最后在分区内做Hash连接。这里用的就是Hive中Map Side
Join的思想。同时用了DBMS中的Hash连接算法做连接。
随着Spark SQL的发展,未来会有更多的查询优化策略加入进来。
同时后续Spark SQL会支持像Shark Server一样的服务端、JDBC接口,兼
容更多的持久化层例如NoSQL,传统的DBMS等。一个强有力的结构化
大数据查询引擎正在崛起。
3.如何使用Spark SQL
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
在这里引入sqlContext下所有的方法就可以直接用sql方法进行查询 import sqlContext._
case class Person(name: String, age: Int)
下面的people是含有case类型数据的RDD,会默认由Scala的implicit机制将RDD转换为
SchemaRDD,SchemaRDD是SparkSQL中的核心RDD
val people = sc.textFile(examplessrcmainresourcespeople.txt).map(_.split(,)).map(p => Person(p(0), p(1).trim.toInt))
在内存的元数据中注册表信息,这样一个Spark SQL表就创建完成了 people.registerAsTable(people)
sql语句就会触发上面分析的Spark SQL的执行过程,读者可以参考上面的图示 val teenagers = sql(SELECT name FROM people WHERE age >= 13 AND age <= 19)
最后生成teenagers也是一个RDD
teenagers.map(t =>Name: + t(0)).collect.foreach(println)
通过之前的介绍,读者对支撑结构化数据分析任务的Spark SQL的原理与使用有了一定的了解。在生产环境中,有一类数据分析任务对响
应延迟要求高,需要实时处理流数据,在BDAS中,Spark Streaming用
于支撑大规模流式处理分析任务。3.2 Spark Streaming
Spark Streaming是一个批处理的流式计算框架。它的核心执行引擎
是Spark,适合处理实时数据与历史数据混合处理的场景,并保证容错
性。下面将对Spark Streaming进行详细的介绍。3.2.1 Spark Streaming简介
Spark Streaming是构建在Spark上的实时计算框架,扩展了Spark流
式大数据处理能力。Spark Streaming将数据流以时间片为单位进行分割
形成RDD,使用RDD操作处理每一块数据,每块数据(也就是RDD)
都会生成一个Spark Job进行处理,最终以批处理的方式处理每个时间片
的数据。请参照图3-6。
图3-6 Spark Streaming生成Job
Spark Streaming编程接口和Spark很相似。在Spark中,通过在RDD
上用Transformation(例如:map,filter等)和Action(例如:count,collect等)算子进行运算。在Spark Streaming中通过在DStream(表示数据流的RDD序列)上进行算子运算。图3-7为Spark Streaming转化过程。
图3-7 Spark Streaming转化过程图3-7中Spark Streaming将程序中对DStream的操作转换为DStream有
向无环图(DAG)。对每个时间片,DStream DAG会产生一个RDD
DAG。在RDD中通过Action算子触发一个Job,然后Spark Streaming会将
Job提交给JobManager。JobManager会将Job插入维护的Job队列,然后
JobManager会将队列中的Job逐个提交给Spark DAGScheduler,然后
Spark会调度Job并将Task分发到各节点的Executor上执行。
(1)优势及特点
1)多范式数据分析管道:能和Spark生态系统其他组件融合,实现
交互查询和机器学习等多范式组合处理。
2)扩展性:可以运行在100个节点以上的集群,延迟可以控制在秒
级。
3)容错性:使用Spark的Lineage及内存维护两份数据进行备份达到
容错。RDD通过Lineage记录下之前的操作,如果某节点在运行时出现
故障,则可以通过冗余备份数据在其他节点重新计算得到。
对于Spark Streaming来说,其RDD的Lineage关系如图3-8所示,图
中的每个长椭圆形表示一个RDD,椭圆中的每个圆形代表一个RDD中
的一个分区(Partition),图中的每一列的多个RDD表示一个
DStream(图中有3个DStream),t=1和t=2代表不同的分片下的不同
RDD DAG。图中的每一个RDD都是通过Lineage相连接形成了DAG,由于Spark Streaming输入数据可以来自于磁盘,例如HDFS(通常由三份
副本)也可以来自于网络(Spark Streaming会将网络输入数据的每一个
数据流复制两份到其他的机器)都能通过冗余数据及Lineage的重算机
制保证容错性。所以RDD中任意的Partition出错,都可以并行地在其他
机器上将缺失的Partition重算出来。
图3-8 Spark Streaming容错性
4)吞吐量大:将数据转换为RDD,基于批处理的方式,提升数据
处理吞吐量。图3-9是Berkeley利用WordCount和Grep两个用例所做的测
试。图3-9 Spark Streaming与Storm吞吐量比较图
5)实时性:Spark Streaming也是一个实时计算框架,Spark
Streaming能够满足除对实时性要求非常高(例如:高频实时交易)之外
的所有流式准实时计算场景。目前Spark Streaming最小的Batch Size的选
取在0.5~2s(对比:Storm目前最小的延迟是100ms左右)。
(2)适用场景
Spark Streaming适合需要历史数据和实时数据结合进行分析的应用
场景,对于实时性要求不是特别高的场景也能够胜任。3.2.2 Spark Streaming架构
通过图3-10,读者可以对Spark Streaming的整体架构有宏观把握。
图3-10 Spark Streaming架构图
组件介绍:
·Network InputTracker:通过接收器接收流数据,并将流数据映射
为输入DStream。
·Job Scheduler:周期性地查询DStream图,通过输入的流数据生成
Spark Job,将Spark Job提交给Job Manager进行执行。
·JobManager:维护一个Job队列,将队列中的Job提交到Spark进行
执行。
通过图3-10可以看到D-Stream Lineage Graph进行整体的流数据的DAG图调度,Taskscheduler负责具体的任务分发,Block tracker进行块
管理。在从节点,如果是通过网络输入的流数据会将数据存储两份进行
容错。Input receiver源源不断地接收输入流,Task execution负责执行主
节点分发的任务,Block manager负责块管理。Spark Streaming整体架构
和Spark很相近,很多思想是可以迁移理解的。3.2.3 Spark Streaming原理剖析
下面将由一个example示例,通过源码呈现Spark Streaming的底层机
制。
1.初始化与接收数据
Spark Streaming通过分布在各个节点上的接收器,缓存接收到的流
数据,并将数据包装成Spark能够处理的RDD的格式,输入到Spark
Streaming,之后由Spark Streaming将作业提交到Spark集群进行执行,如
图3-11所示。图3-11 Spark Streaming执行模型
初始化的过程主要可以概括为两点。
1)调度器的初始化。
调度器调度Spark Streaming的运行,用户可以通过配置相关参数进
行调优。
2)将输入流的接收器转化为RDD在集群进行分布式分配,然后启
动接收器集合中的每个接收器。
针对不同的数据源,Spark Streaming提供了不同的数据接收器,分
布在各个节点上的每个接收器可以认为是一个特定的进程,接收一部分
流数据作为输入。
用户也可以针对自身生产环境状况,自定义开发相应的数据接收
器。
如图3-12所示,接收器分布在各个节点上。通过下面代码,创建并
行的、在不同Worker节点分布的receiver集合。
val tempRDD =
if (hasLocationPreferences) {
val receiversWithPreferences = receivers.map(r => (r,Seq(r.preferredLocation.get)))
ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
} else {
在这里创造RDD相当于进入SparkContext.makeRDD
此处将receivers的集合作为一个RDD进行分区RDD[Receiver] 即使是只有一个输入流,按照这个分布式也是流的输入端在worker而不再Master …
将receivers的集合打散,然后启动它们…
ssc.sparkContext.runJob(tempRDD, startReceiver)… }
图3-12 Spark Streaming接收器
2.数据接收与转化
在“初始化与接收数据”部分中已经介绍过,receiver集合转换为
RDD,在集群上分布式地接收数据流。那么每个receiver是怎样接收并
处理数据流的呢?读者可以通过图3-13,对输入流的处理有一个全面的
了解。图3-13为Spark Streaming数据接收与转化的示意图。
图3-13的主要流程如下。1)数据缓冲:在receiver的receive函数中接收流数据,将接收到的
数据源源不断地放入到BlockGenerator.currentBuffer。
2)缓冲数据转化为数据块:在BlockGenerator中有一个定时器
(RecurringTimer),将当前缓冲区中的数据以用户定义的时间间隔封
装为一个数据块Block,放入到BlockGenerator的blocksForPush队列中
(这个队列)。
3)数据块转化为Spark数据块:在BlockGenerator中有一个
BlockPushingThread线程,不断地将blocksForPush队列中的块传递给
BlockManager,让BlockManager将数据存储为块。BlockManager负责
Spark中的块管理。
4)元数据存储:在pushArrayBuffer方法中还会将已经由
BlockManager存储的元数据信息(例如:Block的id号)传递给
ReceiverTracker,ReceiverTracker会将存储的blockId放到对应StreamId的
队列中。图3-13 Spark Streaming数据接收与转化
图中部分组件的作用如下:
·KeepPushingBlocks:调用此方法持续写入和保持数据块。·pushArrayBuffer:调用pushArrayBuffer方法将数据块存储到
BlockManager中。
·reportPushedBlock:存储完成后汇报数据块信息到主节点。
·receivedBlockInfo(Meta Data):已经接收到的数据块元数据记
录。
·streamId:数据流Id。
·BlockInfo:数据块元数据信息。
·BlockManager.put:数据块存储器写入备份数据块到其他节点。
·Receiver:数据块接收器,接收数据块。
·BlockGenerator:数据块生成器,将数据缓存生成Spark能处理的数
据块。
·BlockGenerator.currentBuffer:缓存网络接收的数据记录,等待之
后转换为Spark的数据块。
·BlockGenerator.blocksForPushing:将一块连续数据记录暂存为数据
块,待后续转换为Spark能够处理的BlockManager中的数据块(A Block
As a BlockManager’s Block)。·BlockGenerator.blockPushingThread:守护线程负责将数据块转换
为BlockManager中数据块。
·ReceiveTracker:输入数据块的元数据管理器,负责管理和记录数
据块。
·BlockManager:Spark数据块管理器,负责数据块在内存或磁盘的
管理。
·RecurringTimer:时间触发器,每隔一定时间进行缓存数据的转
换。
上面的过程中涉及最多的类就是BlockGenerator,在数据转化的过
程中其扮演者不可或缺的角色。
private[streaming] class BlockGenerator(
listener: BlockGeneratorListener,receiverId: Int,conf: SparkConf) extends Logging
感兴趣的读者可以参照图中所示的类和方法进行更加具体的机制的
了解。篇幅所限,对这个数据生成过程不再做具体的代码剖析。
3.生成RDD与提交Spark Job
Spark Streaming根据时间段,将数据切分为RDD,然后触发RDD的
Action提交Job,Job被提交到Job Manager中的Job Queue中由JobScheduler调度,之后Job Scheduler将Job提交到Spark的Job调度器,然后
将Job转换为大量的任务分发给Spark集群执行,如图3-14所示。
图3-14 Spark Streaming调度模型
Job generator中通过下面的方法生成Job进行调度和执行。
从下面的代码可以看出job是从outputStream中生成的,然后再触发
反向回溯执行整个DStream DAG,类似RDD的机制。
private def generateJobs(time: Time) {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) =>
获取输入数据块的元数据信息 val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
. . .
}.toMap
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
case Failure(e) =>jobScheduler.reportError(Error generating jobs for time + time, e)
}
eventActor !DoCheckpoint(time)
}
下面进入JobScheduler的submitJobSet方法一探究竟,JobScheduler是整个Spark
Streaming调度的核心组件 def submitJobSet(jobSet: JobSet) {
. . .
jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
. . .
}
进入Graph生成job的方法,Graph本质是DStreamGraph类生成的对象 final private[streaming] class DStreamGraph extends Serializable with Logging {
def generateJobs(time: Time): Seq[Job] = {
. . .
private val inputStreams = new ArrayBuffer[InputDStream[_]]
private val outputStreams = new ArrayBuffer[DStream[_]]
. . .
val jobs = this.synchronized {
outputStreams.flatMap(outputStream => outputStream.generateJob(time))
. . .
}
outputStreams中的对象是DStream,下面进入DStream的generateJob一探究竟 private[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
case Some(rdd) => {
val jobFunc = => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
此处相当于针对每个时间段生成的一个RDD,会调用SparkContext的方法runJob提交Spark的一个 context.sparkContext.runJob(rdd, emptyFunc)
}
Some(new Job(time, jobFunc))
}
case None => None
}
}
在DStream算是父类,一些具体的DStream例如SocketInputStream等的类的父类可以通过
SocketInputDStream看是如何通过上面的getOrCompute生成RDD的 private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
generatedRDDs.get(time) match {
. . .
case None => {
if (isTimeValid(time)) {
Dstream是个父类,这里代表的是子类的compute方法,DStream通过compute调用用户自定义函数。当任务执行时,同一个 compute(time) match {
. . .
generatedRDDs.put(time, newRDD)
. . .
}
在SocketInputDStream的compute方法中生成了对应时间片的RDD:
override def compute(validTime: Time): Option[RDD[T]] = {if (validTime >= graph.startTime) {
val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
receivedBlockInfo(validTime) = blockInfo
val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
Some(new BlockRDD[T](ssc.sc, blockIds))
} else {
Some(new BlockRDD[T](ssc.sc, Array[BlockId]))
}
}
Spark Streaming在保证实时处理的要求下还能够保证高吞吐与容错
性。用户的数据分析中很多情况下也存在需要分析图数据,运行图算
法,通过GraphX可以简便地开发分布式图分析算法。3.3 GraphX
GraphX是Spark中的一个重要子项目,它利用Spark作为计算引擎,实现了大规模图计算的功能,并提供了类似Pregel的编程接口。GraphX
的出现,将Spark生态系统变得更加完善和丰富;同时以其与Spark生态
系统其他组件很好的融合,以及强大的图数据处理能力,在工业界得到
了广泛的应用。本章主要介绍GraphX的架构、原理和使用方式。3.3.1 GraphX简介
GraphX是常用图算法在Spark上的并行化实现,同时提供了丰富的
API接口。图算法是很多复杂机器学习算法的基础,在单机环境下有很
多应用案例。在大数据环境下,图的规模大到一定程度后,单机很难解
决大规模的图计算,需要将算法并行化,在分布式集群上进行大规模图
处理。目前,比较成熟的方案有GraphX和GraphLab等大规模图计算框
架。
GraphX的特点是离线计算、批量处理,基于同步的BSP模型(Bulk
Synchronous Parallel Computing Model,整体同步并行计算模型),这样
的优势在于可以提升数据处理的吞吐量和规模,但是会造成速度上稍逊
一筹。目前大规模图处理框架还有基于MPI模型的异步图计算模型
GraphLab和同样基于BSP模型的Graph等。
现在和GraphX可以组合使用的分布式图数据库是Neo4J。Neo4J一
个高性能的、非关系的、具有完全事务特性的、鲁棒的图数据库。另一
个数据库是Titan,Titan是一个分布式的图形数据库,特别为存储和处
理大规模图形而优化。二者均可作为GraphX的持久化层,存储大规模
图数据。3.3.2 GraphX的使用简介
类似Spark在RDD上提供了一组基本操作符(如map,filter,reduce),GraphX同样也有针对Graph的基本操作符,用户可以在这些
操作符传入自定义函数和通过修改图的节点属性或结构生成新的图。
GraphX提供了丰富的针对图数据的操作符。Graph类中定义了核心
的、优化过的操作符。一些更加方便的由底层核心操作符组合而成的上
层操作符在GraphOps中进行定义。正是通过Scala语言的implicit关键
字,GraphOps中定义的操作符可以作为Graph中的成员。这样做的目的
是未来GraphX会支持不同类型的图,而每种类型的图的呈现必须实现
核心的操作符和复用大部分的GraphOps中实现的操作符。
下面将操作符分为几个类别进行介绍。
(1)属性操作符
表3-1给出了GraphX的属性操作符。通过属性操作符,用户可以在
点或边上进行相应运算,构建和开发图算法。
表3-1 属性操作符(2)结构操作符
表3-2所示为GraphX的结构操作符。通过这些操作可以生成改变图
结构之后的图数据。
表3-2 结构操作符
(3)图信息属性(见表3-3)
表3-3所示为图信息属性,通过图信息属性,用户可以获取图上的
统计信息。表3-3 图信息属性
(4)邻接聚集操作符与Join操作符
表3-4所示为邻接聚集操作符与Join操作符。通过邻接操作符可以将
两个相近的表进行连接。
表3-4 邻接聚集操作符与Join操作符
(5)缓存操作符
表3-5所示为缓存操作符。表3-5 缓存操作符3.3.3 GraphX体系结构
1.整体架构
GraphX的整体架构(如图3-15所示)可以分为三部分。
图3-15 GraphX架构
存储和原语层:Graph类是图计算的核心类。内部含有
VertexRDD、EdgeRDD和RDD[EdgeTriplet]引用。GraphImpl是Graph类的子类,实现了图操作。
·接口层:在底层RDD的基础之上实现了Pregel模型,BSP模式的计
算接口。
·算法层:基于Pregel接口实现了常用的图算法。包括:PageRank、SVDPlusPlus、TriangleCount、ConnectedComponents、StronglyConnectedConponents等算法。
2.存储结构
在正式的工业级的应用中,图的规模极大,上百万个节点是经常出
现的。为了提高处理速度和数据量,希望能够将图以分布式的方式来存
储、处理图数据。图的分布式存储大致有两种方式,边分割(Edge
Cut)和点分割(Vertex Cut),如图3-16所示。最早期的图计算的框架
中,使用的是Edge Cut(边分割)的存储方式。而GraphX的设计者考虑
到真实世界中的大规模图大多是边多于点的图,所以采用点分割方式存
储。点分割能够减少网络传输和存储开销。底层实现是将边放到各个节
点存储,而在进行数据交换时将点在各个机器之间广播进行传输。对边
进行分区和存储的算法主要基于PartitionStrategy中封装的分区方法。这
里面的几种分区方法分别是对不同应用情景的权衡,用户可以根据具体
的需求进行分区方式的选择。用户可以在程序中指定边的分区方式。例
如:val g = Graph(vertices, partitionBy(edges, PartitionStrategy.EdgePartition2D))
图3-16 GraphX存储模型
一旦边已经在集群上分区和存储,大规模并行图计算的关键挑战就
变成了如何将点的属性连接到边。GraphX的处理方式是集群上移动传
播点的属性数据。由于不是每个分区都需要所有的点属性(因为每个分区只是一部分边),GraphX内部维持一个路由表(routing table),这
样当需要广播点到需要这个点的边的所在分区时就可以通过路由表映
射,将需要的点属性传输到指定的边分区。
点分割的好处是在边的存储上是没有冗余数据的,而且对于某个点
与它的邻居的交互操作,只要满足交换律和结合律。例如,求顶点的邻
接顶点权重的和,可以在不同的节点进行并行运算,最后把每个节点的
运行结果进行汇总,网络开销较小。代价是每个顶点属性可能要冗余存
储多份,更新点数据时要有数据同步开销。
3.使用技巧
采样观察可以通过不同的采样比例,先从小数据量进行计算、观察
效果、调整参数,再逐步增加数据量进行大规模的运算。可以通过RDD
的sample方法进行采样。同时通过Web UI观察集群的资源消耗。
1)内存释放:保留旧图对象的引用,但是尽快释放不使用的图的
顶点属性,节省空间占用。通过unPersistVertices方法进行顶点释放。
2)GC调优,请读者参考性能调优章节介绍。
3)调试:在各个时间点可以通过graph.vertices.count进行调
试,观测图现有状态。进行问题诊断和调优。
GraphX通过提供简洁的API以及优化的图数据管理,简化了用户开发分布式图算法的复杂度。在大数据分析中更多的应用场景是进行机器
学习,下面通过MLlib的介绍,读者可以了解如何通过Spark之上的
MLlib进行复杂的机器学习。3.4 MLlib
MLlib是构建在Spark上的分布式机器学习库,充分利用了Spark的
内存计算和适合迭代型计算的优势,将性能大幅度提升。同时由于
Spark算子丰富的表现力,让大规模机器学习的算法开发不再复杂。3.4.1 MLlib简介
MLlib是一些常用的机器学习算法和库在Spark平台上的实现。
MLlib是AMPLab的在研机器学习项目MLBase的底层组件。MLBase是一
个机器学习平台,MLI是一个接口层,提供很多结构,MLlib是底层算
法实现层,如图3-17所示。
图3-17 MLbase
MLlib中包含分类与回归、聚类、协同过滤、数据降维组件以及底
层的优化库,如图3-18所示。图3-18 MLlib组件图
通过图3-18读者可以对MLlib的整体组件和依赖库有一个宏观的把
握。
下面对图3-18中读者可能不太熟悉的底层组件进行简要介绍。
BLASLAPACK层:LAPACK是用Fortran编写的算法库,顾名思
义,Linear Algebra PACKage,是为了解决通用的线性代数问题的。另外必须要提的算法包是BLAS(Basic Linear Algebra Subprograms),其
实LAPACK底层是使用了BLAS库的。不少计算机厂商都提供了针对不
同处理器进行了优化的BLASLAPACK算法包。
Netlib-java(官网为:https:github.comfommilnetlib-java)是一个
对底层BLAS,LAPACK封装的Java接口层。
Breeze(官网为:https:github.comscalanlpbreeze)是一个Scala写
的数值处理库,提供向量、矩阵运算等API。
库依赖:MLlib底层使用到了Scala书写的线性代数库Breeze,Breeze底层依赖netlib-java库。netlib-java底层依赖原生的Fortran
routines。所以,当用户使用时需要在节点上预先安装gfortran runtime
library(下载地址:https:github.commikiobraunjblaswikiMissing-
Libraries)。由于许可证(license)问题,官方的MLlib依赖集中没有引
入netlib-java原生库的依赖。如果运行时环境没有可用原生库,用户将
会看到警告信息。如果程序中需要使用netlib-java的库,用户需要在项
目中引入com.github.fommil.netlib:all:1.1.2的依赖或者参照指南(网
址为:https:github.comfommilnetlib-
javablobmasterREADME.mdmachine-optimised-system-libraries)来建
立用户自己的项目。如果用户需要使用python接口,则需要1.4或者更高
版本的NumPy(注意:MLlib源码中注释有ExperimentalDeveloperApi的
API在未来的发布版本中可能会进行调整和改变,官方会在不同版本发布时提供迁移指南)。3.4.2 MLlib中的聚类和分类
聚类和分类是机器学习中两个常用的算法,聚类将数据分开为不同
的集合,分类对新数据进行类别预测,下面将就两类算法进行介绍。
1.聚类和分类
(1)什么是聚类
聚类(Clustering)指将数据对象分组成为多个类或者簇
(Cluster),它的目标是:在同一个簇中的对象之间具有较高的相似
度,而不同簇中的对象差别较大。其实,聚类在人们日常生活中是一种
常见行为,即所谓的“物以类聚,人以群分”,其核心思想在于分组,人
们不断地改进聚类模式来学习如何区分各个事物和人。
(2)什么是分类
数据仓库、数据库或者其他信息库中有许多可以为商业、科研等活
动的决策提供所需要的知识。分类与预测即是其中的两种数据分析形
式,可以用来抽取能够描述重要数据集合或预测未来数据趋势。分类方
法(Classification)用于预测数据对象的离散类别(Categorical
Label);预测方法(Prediction)用于预测数据对象的连续取值。分类流程:新样本→特征选取→分类→评价
训练流程:训练集→特征选取→训练→分类器
最初,机器学习的分类应用大多都是在这些方法及基于内存基础上
所构造的算法。目前,数据挖掘方法都要求具有基于外存以处理大规模
数据集合能力,同时具有可扩展能力。
2.MLlib中的聚类和分类
MLlib目前已经实现了K-Means聚类算法、朴素贝叶斯和决策树分
类算法。这里主要介绍被广泛使用的K-Means聚类算法和朴素贝叶斯分
类算法。
(1)K-Means算法
1)K-Means算法简介。
K-Means聚类算法能轻松地对聚类问题建模。K-Means聚类算法容
易理解,并且能在分布式的环境下并行运行。学习K-Means聚类算法,能更容易地理解聚类算法的优缺点,以及其他算法对于特定数据的高效
性。
K-Means聚类算法中的K是聚类的数目,在算法中会强制要求用户
输入。如果将新闻聚类成诸如政治、经济、文化等大类,可以选择10~20的数字作为K。因为这种顶级类别的数量是很小的。如果要对这
些新闻详细分类,选择50~100的数字也是没有问题的。K-Means聚类算
法主要可以分为三步。第一步是为待聚类的点寻找聚类中心;第二步是
计算每个点聚类中心的距离,将每个点聚类到离该点最近的聚类中去;
第三步是计算聚类中所有点的坐标平均值,并将这个平均值作为新的聚
类中心点。反复执行第二步,直到聚类中心不再进行大范围的移动,或
者聚类次数达到要求为止。
2)k-Means示例。
下面的例子中有7名选手,每名选手有两个类别的比分,A类比分
和B类比分如表3-6所示。
表3-6 A类和B类比分
这些数据将会聚为两个簇。随机选取1号和4号选手作为簇的中心,如表3-7所示。
表3-7 1号和4号选手信息将1号和4号选手分别作为两个簇的中心点,下面每一步将选取的点
计算和两个簇中心的欧几里德距离,哪个中心距离小就放到哪个簇中,如表3-8所示。
表3-8 第一步聚类
第一轮聚类的结果产生了,如表3-9所示。
表3-9 第一轮结果
第二轮将使用(1.8,2.3)和(4.1,5.4)作为新的簇中心,重复以
上的过程。直到迭代次数达到用户设定的次数终止。最后一轮的迭代分
出的两个簇就是最后的聚类结果。3)MLlib之K-Means源码解析。
MLlib中的K-Means的原理是:在同一个数据集上,跑多个K-Means
算法(每个称为一个run),然后返回效果最好的那个聚类的类簇中
心。初始的类簇中心点的选取有两种方法,一种是随机,另一种是采用
KMeans||(KMeans++的一个变种)。算法的停止条件是迭代次数达到
设置的次数,或者在某一次迭代后所有run的K-Means算法都收敛。
①类簇中心初始化。
本节介绍的初始化方法是对于每个运行的K-Means都随机选择K个
点作为初始类簇:
private def initRandom(data: RDD[Array[Double]]): Array[ClusterCenters] = {
Sample all the cluster centers in one pass to avoid repeated scans
val sample = data.takeSample(true, runs k, new Random.nextInt).toSeq
Array.tabulate(runs)(r => sample.slice(r k, (r + 1) k).toArray)
}
②计算属于某个类簇的点。
在每一次迭代中,首先会计算属于各个类簇的点,然后更新各个类
簇的中心。
K-Means算法的并行实现通过Spark 的mapPartitions函数,通过该函数获取到分区的迭代器。
之后对于每个运行算法中的每个类簇计算属于该类簇的点的个数以及累加和。 val totalContribs = data.mapPartitions { points =>
val runs = activeCenters.length
val k = activeCenters(0).length
val dims = activeCenters(0)(0).length
val sums = Array.fill(runs, k)(new DoubleMatrix(dims))
val counts = Array.fill(runs, k)(0L)for (point <- points; (centers, runIndex) <- activeCenters.zipWithIndex) {
找到距离该点最近的类簇中心点 val (bestCenter, cost) = KMeans.findClosest(centers, point)
统计该运行算法开销, 用于在之后选取开销最小的那个运行的算法 costAccums(runIndex) += cost
将该点加到最近的类簇的统计总和中去, 方便之后计算该类簇的新中心点 sums(runIndex)(bestCenter).addi(new DoubleMatrix(point))
将距离该点最近的类簇的点数量加1,sum.divi(count)就是类簇的新中心 counts(runIndex)(bestCenter) += 1
}
val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
((i, j), (sums(i)(j), counts(i)(j)))
}
contribs.iterator
对于每个运行算法的每个类簇计算属于该类簇的点的个数和加和 }.reduceByKey(mergeContribs).collectAsMap
mergeContribs是一个负责合并的函数:
def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = {
(p1._1.addi(p2._1), p1._2 + p2._2)
}
③更新类簇的中心点。
for ((run, i) <- activeRuns.zipWithIndex) {
var changed = false
for (j <- 0 until k) {
val (sum, count) = totalContribs((i, j))
if (count != 0) {
计算类簇的新的中心点 val newCenter = sum.divi(count).data
if (MLUtils.squaredDistance(newCenter, centers(run)(j)) > epsilon epsilon) {
此处与代码和算法的停止条件有关 changed = true
}
centers(run)(j) = newCenter
}
}
如果某个run的KMeans算法的某轮次迭代中K个类簇的中心点变化都不超过指定阈值 if (!changed) {
active(run) = false
logInfo(Run + run + finished in + (iteration + 1) + iterations)
}
costs(run) = costAccums(i).value
}
④算法停止条件。
算法的停止条件是迭代次数达到设置的次数,或者所有运行的K-Means算法都收敛:
while (iteration < maxIterations !activeRuns.isEmpty)
上文对典型聚类算法K-Means原理进行介绍,下面将对典型的分类
算法朴素贝叶斯算法进行介绍。
(2)朴素贝叶斯分类算法
朴素贝叶斯分类算法是贝叶斯分类算法多个变种之一。朴素指假设
各属性之间是相互独立的。研究发现,在大多数情况下,朴素贝叶斯分
类算法(naive bayes classifier)在性能上与决策树(decision tree)、神
经网络(netural network)相当。贝叶斯分类算法在大数据集的应用中
具有方法简便、准确率高和速度快的优点。但事实上,贝叶斯分类也有
其缺点。由于贝叶斯定理假设一个属性值对给定类的影响独立于其他的
属性值,而此假设在实际情况中经常是不成立的,则其分类准确率可能
会下降。
朴素贝叶斯分类算法是一种监督学习算法,使用朴素贝叶斯分类算
法对文本进行分类,主要有两种模型,即多项式模型(multinomial
model)和伯努利模型(bernoulli model)。MLlib使用的是被广泛使用
的多项式模型。本书将以一个实际的例子来简略介绍使用多项式模型的
朴素贝叶斯分类算法。在多项式模型中,设某文档d=(t1,t2,…,tk),tk是该文档中出
现过的单词,允许重复。
先验概率P(c)=类c下单词总数整个训练样本的单词总数
类条件概率P(tk|c)=(类c下单词tk在各个文档中出现过的次
数之和+1)
(类c下单词总数+|V|)
V是训练样本的单词表(即抽取单词,单词出现多次,只算一
个),|V|则表示训练样本包含多少种单词。P(tk|c)可以看作是单词tk
在证明d属于类c上提供了多大的证据,而P(c)则可以认为是类别c在
整体上占多大比例(有多大可能性)。
给定一组分好类的文本训练数据,如表3-10所示。
给定一个新样本(河北河北河北吉林香港),对其进行分类。该文
本用属性向量表示为d=(河北,河北,河北,吉林,香港),类别集合
为Y={yes,no}。
表3-10 文本训练数据类yes下总共有8个单词,类no下总共有3个单词,训练样本单词总
数为11,因此P(yes)=811,P(no)=311。类条件概率计算如下:
P(yes)=811, P(no)=311。类条件概率计算如下:
P(河北 | yes)=(5+1)(8+6)=614=37
P(河北 | yes)=P(吉林 | yes)= (0+1)(8+6)=114
P(河北|no)=(1+1)(3+6)=29
P(Japan|no)=P(吉林| no) =(1+1)(3+6)=29
分母中的8,是指yes类别下textc的长度,也即训练样本的单词总
数,6是指训练样本有河北、北京、上海、广东、吉林、香港,共6个单
词,3是指no类下共有3个单词。
有了以上类条件概率,开始计算后验概率:
P(yes | d)=(37)3×114×114×811=108184877≈0.00058417
P(no | d)= (29)3×29×29×311=32216513≈0.00014780
比较大小,即可知道这个文档属于类别河北。3.5 本章小结
本章主要介绍了BDAS中广泛应用的几个数据分析组件。SQL on
Spark提供在Spark上的SQL查询功能。让用户可以基于内存计算和SQL
进行大数据分析。通过Spark Streaming,用户可以构建实时流处理应
用,其高吞吐量,以及适合历史和实时数据混合分析的特性使其在流数
据处理框架中突出重围。GraphX充当Spark生态系统中图计算的角色,其简洁的API让图处理算法的书写更加便捷。最后介绍了MLlib——
Spark上的机器学习库,它充分利用Spark内存计算和适合迭代的特性,使分布式系统与并行机器学习算法实现了完美的结合。相信随着Spark
生态系统的日臻完善,这些组件还会取得长足发展。第4章 Lamda架构日志分析流水线
4.1 日志分析概述
随着互联网的发展,在互联网上产生了大量的Web日志或移动应用
日志,日志包含用户最重要的信息,通过日志分析,用户可以获取到网
站或应用的访问量,哪个网页访问人数最多,哪个网页最有价值、用户
的特征、用户的兴趣等。
一般中型的网站(10万的PV[1]
以上),每天会产生1GB以上Web日
志文件。大型或超大型的网站,可能每小时就会产生500GB~1TB的数
据量。
对于日志的这种规模的数据,通过Spark进行大规模日志分析与日
志处理,能够达到很好的效果。
Web日志由Web服务器产生,现在互联网公司使用的主流的服务器
可能是Nginx、Apache、Tomcat等。从Web日志中,我们可以获取网站
每类页面的PV值(页面浏览)、UV(独立IP数)。更复杂一些的,可
以计算得出用户所检索的关键词排行榜、用户停留时间最高的页面等。
更为复杂的,构建广告点击模型、分析用户行为特征等。
1.日志格式目前常见的Web日志格式主要由两类:一种日志格式是Apache的
NCSA日志格式,另一种日志格式是IIS的W3C日志格式。
下面以Nginx日志格式为例进行讲解。
Nginx日志示例格式:
222.68.172.111 - - [18Sep2013:06:49:57 +0000]
GET imagesmy.jpg HTTP1.1 200 19939
http:www.angularjs.cnA00n Mozilla5.0 (Windows NT 6.1) AppleWebKit537.36 (KHTML, like Gecko) Chrome29.0.1547.66 Safari537.36
以下是本例中涉及的一些要素。
·remote_addr:记录客户端的IP地址。本例为222.68.172.111。
·remote_user:记录客户端用户名称,本例--表示为空。
·time_local:记录访问时间与时区,本例为[18Sep2013:06:49:
57+0000]。
·request:记录请求的URL与HTTP协议,本例为GETimagesmy.jpg
HTTP1.1。
·status:记录请求状态,成功是200。
·body_bytes_sent:记录发送给客户端文件主体内容大小,本例中为
19939。·http_referer:用来记录从哪个页面链接访问过来
的,http:www.angularjs.cnA00n。
·http_user_agent:记录客户浏览器的相关信息,本例中为
Mozilla5.0(Windows NT 6.1)AppleWebKit537.36(KHTML,like
Gecko)Chrome29.0.1547.66 Safari537.36。
注意 如果用户想要更多的信息,则要用其他手段去获取,通过JS
代码单独发送请求,并使用cookies记录用户的访问信息。
通过利用这些日志信息,我们可以深入分析用户行为或网站状况
了。
2.传统单机日志数据分析示例
当数据量较小(10MB,100MB,10GB),单机处理能够解决,可
以通过各种UnixLinux命令或者工具,awk、grep、sort、join等都是日志
分析的利器,再配合Perl、Python、正则表达式,基本就可以解决常见
日志分析的问题。
(1)Linux Shell进行单机日志分析示例
例如,想从上面提到的nginx日志中得到访问量最高的前10个IP,通
过以下Shell进行分析: cat access.log.10 | awk '{a[1]++} END {for(b in a) print b\ta[b]}'
| sort -k2 -r | head -n 10
163.177.71.12 972
101.226.68.137 972
183.195.232.138 971
50.116.27.194 97
14.17.29.86 96
61.135.216.104 94
61.135.216.105 91
61.186.190.41 9
59.39.192.108 9
220.181.51.212 9
(2)Python进行单机日志分析示例
检查Nginx的日志文件,统计基于每个独立IP地址的点击率,代码
如下:
!usrbinenv pythoncoding:utf8
import re
import sys
contents = sys.argv[1]def NginxIpHite(logfile_path):
IP:4个字符串,每个字符串为1~3个数字,由点连接 ipadd = r'\.'.join([r'\d{1,3}']4)
re_ip = re.compile(ipadd)
iphitlisting = {}
for line in open(contents):
match = re_ip.match(line)
if match:
ip = match.group( )
如果IP存在增加1,否则设置点击率为1
iphitlisting[ip] = iphitlisting.get(ip, 0) + 1
print iphitlisting
NginxIpHite(contents)
运行并打印结果如下:
[root@chlinux 06] .nginx_ip.py access_20140610.log
{'183.3.121.84': 1, '182.118.20.184': 2, '182.118.20.185': 1, '190.52.120.38': 1, '182.118.20.187': 1, '202.108.251.214': 2, '61.135.190.101': 2, '103.22.181.247': 1, '101.226.33.190': 3, '183.129.168.131': 1, '66.249.73.29': 26, '182.118.20.202': 1, '157.56.93.38': 2, '219.139.102.237': 4, '220.181.108.178': 1, '220.181.108.179': 1, '182.118.25.233': 4, '182.118.25.232': 1, '182.118.25.231': 2, '182.118.20.186': 1, '174.129.228.67': 20}
此脚本返回的是一个Key-Value映射,包含访问Nginx服务器的各个IP的点击数。用户可以通过这个示例再进行深入拓展,进行更丰富的日
志信息和知识的获取。
(3)大规模分布式日志分析情况
当数据量每天以10GB、100GB增长的时候,单机处理能力已经不
能满足需求。此时就需要增加系统的扩展性,用大数据分析和并行计算
来解决。在Spark出现之前,海量数据存储和海量日志分析都是基于
Hadoop、Hive等数据分析系统的。Spark的出现,使得全栈数据分析更
加容易。并且,Spark非常适合构建多范式日志分析流水线。我们将介
绍如何使用Spark构建日志分析流水线。
[1] Page View,页面访问量。4.2 日志分析指标
下面将介绍常用网站的运营数据分析指标。在数据越来越重要的趋
势下,数据化运营已经提上互联网公司的日程,如果监控网站或应用的
状况时发现瓶颈问题,我们需要针对网站或应用相关指标进行统计和分
析得出的。随着移动互联网的发展,越来越多的移动数据分析公司与工
具也不断涌现,其中代表性的为友盟、Talking Data等,为公司提供数
据化运营支持。
网站运行日志分析常用指标如下:
·PV(Page View):网站页面访问数,也称作网站流量。
·UV(Unique Visitor):页面IP的访问量统计,访问用户数,即独
立IP。
·PVPU(Page View Per User):平均每位用户访问页面数。
·漏斗模型与转化率:漏斗模型指的是多个不同的事件按照一定依
赖顺序依次触发的流程中的转化模型。用户通常会对应用中的一些关键
路径进行分析。比如注册流程、购物流程、交易流程等。以电商应用的
购物流程为例:
·1浏览商品页→2放入购物车→3生成订单→4支付订单→5完成交易·我们可以根据这些关键路径来计算每一步的转化率。转化率指的
是完成当前事件的用户中触发下一个依赖事件的用户所占比例。
·留存率:用户在某段时间内开始使用应用,经过一段时间后,仍
然继续使用这个应用的用户被认作是留存。这部分用户占开始新增用户
的比例即是留存率。
·用户属性:用户的基本属性和行为特征,将用户打标签,帮助产
品进一步的营销与推荐。
最终希望通过一个仪表盘展示出整个网站的统计指标信息,如图4-
1所示。
图4-1 日志统计效果图4.3 Lamda架构
日志分析中既有离线大规模分析的需求,又有实时性的需求,这就
可以通过采用Lamda架构构建日志分析流水线。
1.Lamda架构简介
Lambda架构的目的是为大数据分析应用程序提供一个低响应延迟
的组合数据传输环境。
Lambda系统架构定义了一套明确的架构原则,它为建立一套强大
的和可扩展的数据系统定义了架构范式。在Lamda架构中,被读取的数
据是不可变的,在并行处理过程中数据会依次进入流处理系统和批处理
系统,同时进行实时处理和离线数据分析。在查询时,当这两者都返回
结果后,才算是完成一次完整的查询。从逻辑上看,传输过程发生了两
次,一次是在批处理中,一次是在流处理中。
Lamda架构并不限定其中的具体系统,要根据实际情况进行调整优
化。大数据的系统选型具体可以有很多的组合变化。例如可以将图4-2
中的Kafka、Storm、Hadoop等换成其他类似的系统,例如Spark
Streaming、Spark等,惯常的做法是使用两个数据库来存储数据输出
表,一个存储实时表,响应实时查询需求,另外一个存储批处理表,返
回离线计算结果。图4-2 Lamda数据分析架构
它是由三层组成:批处理层、服务层和速度层。
①批处理层:Hadoop、Spark、Tez等都可以作为批处理层的处理工
具,HDFS、HBase等都可以作为数据持久化系统。
②服务层:用于加载和实现数据库中的批处理视图,以便用户能够
查询,不一定需要随机写,但是支持批更新和随机读,例如采用
ElephantDB、Voldemort。③快速处理层:主要处理新数据和服务层更新造成的高延迟补偿,利用流处理系统(如Storm、S4、Spark Streaming)和随机读写数据存储
库来计算实时视图(HBase)。批处理和服务层定期处理和转换实时视
图为批处理视图。
为了获得一个完整结果,批处理和实时视图都必须被同时查询和融
合(实时代表新数据)。
下面借鉴Lamda架构,设计整个数据分析流水线架构,如图4-3所
示。
图4-3 日志分析流水线整体架构图
本例中实时日志分析流水线大致按以下步骤操作。①数据采集:采用Flume NG进行数据采集。
②数据汇总与转发:通过Flume将数据转发汇总到实时消息系统
Kafka。
③数据处理:采用Spark Streaming进行实时数据处理。
④结果呈现:采用Flask作为可视化呈现工具进行结果呈现。
离线日志分析流水线大致按以下步骤操作。
①数据存储:通过Flume将数据转储到HDFS。
②数据处理:通过Spark SQL进行数据预处理。
③结果呈现:结果汇总存储到MySQL最后通过Flask进行结果呈
现。4.4 构建日志分析数据流水线
后续的章节将介绍日志数据采集、日志数据汇总、日志实时分析、日志离线分析及可视化,来构建数据分析流水线。4.4.1 用Flume进行日志采集
Web日志由Web服务器产生,生产环境的服务器可能是Nginx、Apache、Tomcat、IIS等。
例如,可以将Tomcat的日志收集到指定的目录,Tomcat安装
在opttomcat,日志存放在varlogdata。其他服务器(如Apache、Nginx、IIS等),用户可以根据相应服务器的默认目录进行相关配置。
1.Flume简介
Flume是Cloudera开发的日志收集系统,具有分布式、高可用等特
点,为大数据日志采集、汇总聚合和转储传输提供了支持。为了保证
Flume的扩展性和灵活性,在日志系统中定制各类数据发送方及数据接
收方。同时Flume提供对数据进行简单处理,并写各种数据到接受方的
能力。
Flume的核心是把数据从数据源收集过来,再送到数据接收方。为
了保证送达成功,在送到目的地之前,会先缓存数据,待数据真正到达
目的地后,删除自己缓存的数据。
Flume传输的数据的基本单位是事件(Event),如果是文本文件,通常是一行记录,这也是事务的基本单位。事件(Event)从源(Source)传输到通道(Channel),再到数据输出槽(Sink),本身为
一个比特(byte)数组,并可携带消息头(headers)信息。
Flume运行的核心是Agent。它是一个完整的数据收集工具,含有三
个核心组件,分别是Source、Channel、Sink。通过这些组件,Event可
以从一个地方流向另一个地方,如图4-4所示。
图4-4 Flume架构
Flume核心组件如下。
·Source可以接收外部源发送过来的数据。不同的Source,可以接受
不同的数据格式。比如有目录池(Spooling Directory)数据源,可以监
控指定文件夹中的新文件变化,如果目录中有文件产生,就会立刻读取
其内容。
·Channel是一个存储地,接收Source的输出,直到有Sink消费掉
Channel中的数据。Channel中的数据直到进入到下一个Channel中或者进入终端才会被删除。当Sink写入失败后,可以自动重启,不会造成数据
丢失,因此很可靠。
·Sink会消费Channel中的数据,然后送给外部源或者其他Source。
如数据可以写入到HDFS或者HBase中。
Flume允许多个Agent连在一起,形成前后相连的多级数据传输通
道。
2.Flume安装与配置
(1)安装Flume
1)安装JDK。
2)安装Flume。
http: mirrors.cnnic.cnapacheflume1.5.0apache-flume-1.5.0-bin.tar.gz。 tar xvzf apache-flume-1.5.0-bin.tar.gz
mv apache-flume-1.5.0-bin apache-flume-1.5.0
ln -s apache-flume-1.5.0 flume
3)环境变量设置。
vim etcprofile
export JAVA_HOME=usrlocaljdk
export CLASSPATH=.:JAVA_HOMElibdt.jar:JAVA_HOMElibtools.jar
export PATH=PATH:JAVA_HOMEbin
export FLUME_HOME=usrlocalflume
export FLUME_CONF_DIR=FLUME_HOMEconf
export PATH=.:PATH::FLUME_HOMEbin
source etcprofile(2)创建Agent配置文件将数据输出到HDFS
这需要修改flume.conf中的配置,具体如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
描述和配置source
第1步:配置数据源 a1.sources.r1.type = exec
a1.sources.r1.channels = c1
配置需要监控的日志输出目录 a1.sources.r1.command = tail -F varlogdata
Describe the sink
第2步:配置数据输出 a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.path=hdfs:192.168.11.177:9000flumeevents%Y%m%d%H%M
a1.sinks.k1.hdfs.filePrefix=cmcc
a1.sinks.k1.hdfs.minBlockReplicas=1
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.rollInterval=60
a1.sinks.k1.hdfs.rollSize=0
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=0
Use a channel which buffers events in memory
第3步:配置数据通道 a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
Bind the source and sink to the channel
第4步:将三者级联 a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(3)启动Flume Agent
cd usrlocalflume
nohup binflume-ng agent -n agent1 -c conf -f confflume-conf.properties amp;
通过上面介绍的一系列步骤,已经可以将Flume收集的数据输出到
HDFS。3.整合Flume与Kafka、HDFS
下面通过Sink设置的修改将Flume的日志输出到HDFS和Kafka。下
面的IP地址只是示例,用户根据具体需求改为生产环境中的IP地址。
define [sink] begin
define the sink k1,定义HDFS输出端
a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.path=hdfs: 192.168.11.174:9000flumeevents%Y%m%d
a1.sinks.k1.hdfs.filePrefix=cmcc-%H
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.minBlockReplicas=1
a1.sinks.k1.hdfs.rollInterval=3600
a1.sinks.k1.hdfs.rollSize=0
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=0
define the sink k2,定义Kafka输出端 a1.sinks.k2.channel=c2
a1.sinks.k2.type=com.cmcc.chiwei.Kafka.CmccKafkaSink
a1.sinks.k2.metadata.broker.list=192.168.11.174:9092,192.168.11.175:9092,192.168.11.176:9092
a1.sinks.k2.partition.key=0
a1.sinks.k2.partitioner.class=com.cmcc.chiwei.Kafka.CmccPartition
a1.sinks.k2.serializer.class=Kafka.serializer.StringEncoder
a1.sinks.k2.request.required.acks=0
a1.sinks.k2.cmcc.encoding=UTF-8
a1.sinks.k2.cmcc.topic.name=cmcc
a1.sinks.k2.producer.type=async
a1.sinks.k2.batchSize=100
define [sink] end
以上配置将同样的数据无差异输出传递到多个输出端。
a1.sources.r1.selector.type=replicating
本例配置了两个输出端:一个是输出到Kafka,为了提高性能,用
内存通道。另一个是输出到HDFS,离线分析。在配置文件中设置两个sink:一个是Kafka的输出通道K2。一个是
HDFS的输出通道K1。
a1.sources = r1
a1.sinks = k1 k2
a1.channels=c1 c2
define [channel] begin
define the channel c1,a1.channels.c1.type=file
a1.channels.c1.checkpointDir=homeflumeflumeCheckpoint
a1.channels.c1.dataDirs=homeflumeflumeData , homeflumeflumeDataExt
a1.channels.c1.capacity=2000000
a1.channels.c1.transactionCapacity=100
define the channel c2
a1.channels.c2.type=memory
a1.channels.c2.capacity=2000000
a1.channels.c2.transactionCapacity=100
define [channel] end
大家在配置文件中添加如上信息,即可配置好,同时输出到Kafka
和HDFS。4.4.2 用Kafka将日志汇总
由于Flume收集的数据和后端处理的下游系统之间可能存在多对多
的关系,为了解耦合保证数据传输延迟,选用Kafka作为消息中间层进
行日志中转。
Apache Kafka是由Apache软件基金会开发的一个开源消息系统项
目,由Scala写成。Kafka最初是由LinkedIn开发,并于2011年初开源。
2012年10月从Apache Incubator“毕业”。该项目的目标是为处理实时数据
提供一个统一、高通量、低等待的平台[1]。它提供了类似于JMS的特
性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。
Kafka进行消息保存时会根据Topic进行归类,发送消息者成为
Producer,消息接受者成为Consumer,此外Kafka集群有多个Kafka实例
组成,每个实例(Server)成为Broker。无论是Kafka集群,还是
Producer和Consumer都依赖Zookeeper来保证系统可用性集群保存一些元
(Meta)信息,如图4-5所示。图4-5 Kafka架构图
1.概念和术语
·消息:全称为Message,是指在生产者、服务端和消费者之间传输
数据。
·消息代理:全称为Message Broker,通俗来讲就是指该MQ的服务
端或者服务器。
·消息生产者:全称为Message Producer,负责产生消息并发送消息
到meta服务器。
·消息消费者:全称为Message Consumer,负责消息的消费。
·消息的主题:全称为Message Topic,由用户定义并在Broker上配
置。Producer发送消息到某个Topic下,Consumer从某个Topic下消费消息。
·主题的分区:也称为Partition,可以把一个Topic分为多个分区。每
个分区是一个有序、不可变的、顺序递增的Commit Log
·消费者分组:全称为Consumer Group,由多个消费者组成,共同
消费一个Topi ......
您现在查看是摘要介绍页, 详见PDF附件(6756KB,438页)。




