Spark机器学习.pdf
http://www.100md.com
2020年1月9日
![]() |
| 第1页 |
![]() |
| 第9页 |
![]() |
| 第11页 |
![]() |
| 第22页 |
![]() |
| 第43页 |
![]() |
| 第292页 |
参见附件(4429KB,326页)。
Spark机器学习,这是一本关于Spark语音的学习书籍,从最基础的环境搭建到各项实例的练习,同学们可以在这里深入了解Spark语音,有需要的欢迎。

Spark机器学习内容简介
《Spark机器学习》每章都设计了案例研究,以机器学习算法为主线,结合实例探讨了Spark 的实际应用。书中没有让人抓狂的数据公式,而是从准备和正确认识数据开始讲起,全面涵盖了推荐系统、回归、聚类、降维等经典的机器学习算法及其实际应用。
Spark机器学习作者信息
Nick Pentreath是Graphflow公司联合创始人。Graphflow是一家大数据和机器学习公司,专注于以用户为中心的推荐系统和客户服务智能化技术。Nick拥有金融市场、机器学习和软件开发背景,曾任职于高盛集团,之后去在线广告营销创业公司Cognitive Match Limited(伦敦)担任研究科学家,后又去非洲更大的社交网络Mxit领导数据科学与分析团队。Nick是Apache Spark项目管理委员会成员之一。
Spark机器学习主目录
第1章 Spark 的环境搭建与运行
第2章 设计机器学习系统
第3章 Spark 上数据的获取、处理与准备
第4章 构建基于Spark 的推荐引擎
第5章 Spark 构建分类模型
第6章 Spark 构建回归模型
第7章 Spark 构建聚类模型
第8章 Spark 应用于数据降维
第9章 Spark 高级文本处理技术
第10章 Spark Streaming 在实时机器学习上的应用
Spark机器学习章节详解
第1章 “Spark的环境搭建与运行”,会讲到如何安装和搭建Spark框架的本地开发环境,以及怎样使用Amazon EC2在云端创建Spark集群。之后介绍Spark编程模型和API。最后分别用Scala、Java和Python语言创建一个简单的Spark应用。
第2章 “设计机器学习系统”,会展示一个贴合实际的机器学习系统案例。随后会针对该案例设计一个基于Spark的智能系统所对应的高层架构。
第3章 “Spark上数据的获取、处理与准备”,会详细介绍如何从各种的公开渠道获取用于机器学习系统的数据。我们将学到如何进行数据处理和清理,并通过可用的工具、库和Spark函数将它们转换为符合要求的数据,使之具备可用于机器学习模型的特征。
第4章 “构建基于Spark的推荐引擎”,展示了如何创建一个基于协同过滤的推荐模型。该模型将用于向给定用户推荐物品,以及创建与给定物品相似的物品。这一章还会讲到如何使用标准指标来评估推荐模型的效果。
第5章 “Spark构建分类模型”,阐述如何创建二元分类模型,以及如何利用标准的性能评估指标来评估分类效果。
第6章 “Spark构建回归模型”,扩展了第5章中的分类模型以创建一个回归模型,并详细介绍回归模型的评估指标。
第7章 “Spark构建聚类模型”,探索如何创建聚类模型以及相关评估方法的使用。你会学到如何分析和可视化聚类结果。
第8章 “Spark应用于数据降维”,将通过多种方法从数据中提取其内在结构并降低其维度。你会学到一些常见的降维方法,以及如何对它们进行应用和分析。这里还会讲到如何将降维的结果作为其他机器学习模型的输入。
第9章 “Spark高级文本处理技术”,介绍处理大规模文本数据的方法。这包括从文本提取特征以及处理文本数据常见的高维特征的方法。
第10章 “Spark Streaming在实时机器学习上的应用”,对Spark Streaming进行综述,并介绍在流数据上的机器学习中它如何实现对在线和增量学习方法的支持。
Spark机器学习截图


书名:Spark机器学习
作者:Nick Pentreath
译者:蔡立宇 黄章帅 周济民
ISBN:978-7-115-39983-0
本书由北京图灵文化发展有限公司发行数字版。版权所有,侵权必究。
您购买的图灵电子书仅供您个人使用,未经授权,不得以任何方式复制和传播本书内容。
我们愿意相信读者具有这样的良知和觉悟,与我们共同保护知识产权。
如果购买者有侵权行为,我们可能对该用户实施包括但不限于关闭该帐号等维权措施,并可
能追究法律责任。
图灵社区会员 gaara(bboniao@163.com) 专享 尊重版权
版权声明
前言
本书内容
预备知识
本书目标
排版约定
读者反馈
客户支持
下载示例代码
勘误表
侵权行为
问题
致谢
第1章 Spark的环境搭建与运行
1.1 Spark的本地安装与配置
1.2 Spark集群
1.3 Spark编程模型
1.3.1 SparkContext类与SparkConf类
1.3.2 Spark shell
1.3.3 弹性分布式数据集
1.3.4 广播变量和累加器
1.4 Spark Scala编程入门
1.5 Spark Java编程入门
1.6 Spark Py thon编程入门
1.7 在Amazon EC2上运行Spark
启动一个EC2 Spark集群
1.8 小结
第2章 设计机器学习系统
2.1 MovieStream介绍
2.2 机器学习系统商业用例
2.2.1 个性化
2.2.2 目标营销和客户细分
2.2.3 预测建模与分析
2.3 机器学习模型的种类
2.4 数据驱动的机器学习系统的组成2.4.1 数据获取与存储
2.4.2 数据清理与转换
2.4.3 模型训练与测试回路
2.4.4 模型部署与整合
2.4.5 模型监控与反馈
2.4.6 批处理或实时方案的选择
2.5 机器学习系统架构
动手练习
2.6 小结
第3章 Spark上数据的获取、处理与准备
3.1 获取公开数据集
MovieLens 100k数据集
3.2 探索与可视化数据
3.2.1 探索用户数据
3.2.2 探索电影数据
3.2.3 探索评级数据
3.3 处理与转换数据
非规整数据和缺失数据的填充
3.4 从数据中提取有用特征
3.4.1 数值特征
3.4.2 类别特征
3.4.3 派生特征
3.4.4 文本特征
3.4.5 正则化特征
3.4.6 用软件包提取特征
3.5 小结
第4章 构建基于Spark的推荐引擎
4.1 推荐模型的分类
4.1.1 基于内容的过滤
4.1.2 协同过滤
4.1.3 矩阵分解
4.2 提取有效特征
从MovieLens 100k数据集提取特征
4.3 训练推荐模型
4.3.1 使用MovieLens 100k数据集训练模型4.3.2 使用隐式反馈数据训练模型
4.4 使用推荐模型
4.4.1 用户推荐
4.4.2 物品推荐
4.5 推荐模型效果的评估
4.5.1 均方差
4.5.2 K 值平均准确率
4.5.3 使用MLlib内置的评估函数
4.6 小结
第5章 Spark构建分类模型
5.1 分类模型的种类
5.1.1 线性模型
5.1.2 朴素贝叶斯模型
5.1.3 决策树
5.2 从数据中抽取合适的特征
从KaggleStumbleUpon evergreen分类数据集中抽取特征
5.3 训练分类模型
在KaggleStumbleUpon evergreen的分类数据集中训练分类模型
5.4 使用分类模型
在KaggleStumbleUpon evergreen数据集上进行预测
5.5 评估分类模型的性能
5.5.1 预测的正确率和错误率
5.5.2 准确率和召回率
5.5.3 ROC曲线和AUC
5.6 改进模型性能以及参数调优
5.6.1 特征标准化
5.6.2 其他特征
5.6.3 使用正确的数据格式
5.6.4 模型参数调优
5.7 小结
第6章 Spark构建回归模型
6.1 回归模型的种类
6.1.1 最小二乘回归
6.1.2 决策树回归
6.2 从数据中抽取合适的特征从bike sharing数据集抽取特征
6.3 回归模型的训练和应用
在bike sharing数据上训练回归模型
6.4 评估回归模型的性能
6.4.1 均方误差和均方根误差
6.4.2 平均绝对误差
6.4.3 均方根对数误差
6.4.4 R-平方系数
6.4.5 计算不同度量下的性能
6.5 改进模型性能和参数调优
6.5.1 变换目标变量
6.5.2 模型参数调优
6.6 小结
第7章 Spark构建聚类模型
7.1 聚类模型的类型
7.1.1 K-均值聚类
7.1.2 混合模型
7.1.3 层次聚类
7.2 从数据中提取正确的特征
从MovieLens数据集提取特征
7.3 训练聚类模型
用MovieLens数据集训练聚类模型
7.4 使用聚类模型进行预测
用MovieLens数据集解释类别预测
7.5 评估聚类模型的性能
7.5.1 内部评价指标
7.5.2 外部评价指标
7.5.3 在MovieLens数据集计算性能
7.6 聚类模型参数调优
通过交叉验证选择K
7.7 小结
第8章 Spark应用于数据降维
8.1 降维方法的种类
8.1.1 主成分分析
8.1.2 奇异值分解8.1.3 和矩阵分解的关系
8.1.4 聚类作为降维的方法
8.2 从数据中抽取合适的特征
从LFW数据集中提取特征
8.3 训练降维模型
在LFW数据集上运行PCA
8.4 使用降维模型
8.4.1 在LFW数据集上使用PCA投影数据
8.4.2 PCA和SVD模型的关系
8.5 评价降维模型
在LFW数据集上估计SVD的 k 值
8.6 小结
第9章 Spark高级文本处理技术
9.1 处理文本数据有什么特别之处
9.2 从数据中抽取合适的特征
9.2.1 短语加权表示
9.2.2 特征哈希
9.2.3 从20新闻组数据集中提取TF-IDF特征
9.3 使用TF-IDF模型
9.3.1 20 Newsgroups数据集的文本相似度和TF-IDF特征
9.3.2 基于20 Newsgroups数据集使用TF-IDF训练文本分类器
9.4 评估文本处理技术的作用
在20 Newsgroups数据集上比较原始特征和处理过的TF-IDF特征
9.5 Word2Vec模型
基于20 Newsgroups数据集训练Word2Vec
9.6 小结
第10章 Spark Streaming在实时机器学习上的应用
10.1 在线学习
10.2 流处理
10.2.1 Spark Streaming介绍
10.2.2 使用Spark Streaming缓存和容错
10.3 创建Spark Streaming应用
10.3.1 消息生成端
10.3.2 创建简单的流处理程序
10.3.3 流式分析10.3.4 有状态的流计算
10.4 使用Spark Streaming进行在线学习
10.4.1 流回归
10.4.2 一个简单的流回归程序
10.4.3 流K-均值
10.5 在线模型评估
使用Spark Streaming比较模型性能
10.6 小结
版权声明
Copyright ? 2015 Packt Publishing. First published in the English language under the title Machine
Learning with Spark.
Simplified Chinese-language edition copy right ? 2015 by Posts Telecom Press. All rights
reserved.
本书中文简体字版由Packt Publishing授权人民邮电出版社独家出版。未经出版者书面许可,不得以任何方式复制或抄袭本书内容。
版权所有,侵权必究。
前言
近年来,被收集、存储和分析的数据量呈爆炸式增长,特别是与网络、移动设备相关的数
据,以及传感器产生的数据。大规模数据的存储、处理、分析和建模,以前只有Google、Yahoo!、Facebook和Twitter这样的大公司才涉及,而现在越来越多的机构都会面对处理海量
数据的挑战。
面对如此量级的数据以及常见的实时利用该数据的需求,人工驱动的系统难以应对。这就催
生了所谓的大数据和机器学习系统,它们从数据中学习并可自动决策。
为了能以低成本实现对大规模数据的支持,Google、Yahoo!、Amazon和Facebook涌现了大量
开源技术。这些技术旨在通过在计算机集群上进行分布式数据存储和计算来简化大数据处
理。
这些技术中最广为人知的是Apache Hadoop,它极大简化了海量数据的存储(通过Hadoop
Distributed File Sy stem,即HDFS)和计算(通过Hadoop MapReduce,一种在集群里多个节
点上进行并行计算的框架)流程,并降低了相应的成本。
然而,MapReduce有其严重的缺点,如启动任务时的高开销、对中间数据和计算结果写入磁
盘的依赖。这些都使得Hadoop不适合迭代式或低延迟的任务。Apache Spark是一个新的分布
式计算框架,从设计开始便注重对低延迟任务的优化,并将中间数据和结果保存在内存中。
Spark提供简洁明了的函数式API,并完全兼容Hadoop生态系统。
不止如此,Spark还提供针对Scala、Java和Py thon语言的原生API。通过Scala和Py thon的
API,Spark应用程序可充分利用Scala或Py thon语言的优势。这些优势包括使用相关的解释程
序进行实时交互式的程序编写。Spark目前还自带一个分布式机器学习和数据挖掘工具包
MLlib。经过重点开发,这个包中已经包括一些针对常见计算任务的高质量、可扩展的算
法。本书会涉及其中的部分算法。
在大型数据集上进行机器学习颇具挑战性。这主要是因为常见的机器学习算法并非为并行架
构而设计。大部分情况下,设计这样的算法并不容易。机器学习模型一般具有迭代式的特
性,而这与Spark的设计目标一致。并行计算的框架有很多,但很少能在兼顾速度、可扩展
性、内存处理和容错性的同时,还提供灵活、表达力丰富的API。Spark是其中为数不多的一
个。
本书将关注机器学习技术的实际应用。我们会简要介绍机器学习算法的一些理论知识,但总
的来说本书注重技术实践。具体来说,我们会通过示例程序和样例代码,举例说明如何借助
Spark、MLlib以及其他常见的免费机器学习和数据分析套件来创建一个有用的机器学习系
统。本书内容
第1章 “Spark的环境搭建与运行”,会讲到如何安装和搭建Spark框架的本地开发环境,以
及怎样使用Amazon EC2在云端创建Spark集群。之后介绍Spark编程模型和API。最后分别用
Scala、Java和Py thon语言创建一个简单的Spark应用。
第2章 “设计机器学习系统”,会展示一个贴合实际的机器学习系统案例。随后会针对该案
例设计一个基于Spark的智能系统所对应的高层架构。
第3章 “Spark上数据的获取、处理与准备”,会详细介绍如何从各种免费的公开渠道获取
用于机器学习系统的数据。我们将学到如何进行数据处理和清理,并通过可用的工具、库和
Spark函数将它们转换为符合要求的数据,使之具备可用于机器学习模型的特征。
第4章 “构建基于Spark的推荐引擎”,展示了如何创建一个基于协同过滤的推荐模型。该
模型将用于向给定用户推荐物品,以及创建与给定物品相似的物品。这一章还会讲到如何使
用标准指标来评估推荐模型的效果。
第5章 “Spark构建分类模型”,阐述如何创建二元分类模型,以及如何利用标准的性能评估
指标来评估分类效果。
第6章 “Spark构建回归模型”,扩展了第5章中的分类模型以创建一个回归模型,并详细介
绍回归模型的评估指标。
第7章 “Spark构建聚类模型”,探索如何创建聚类模型以及相关评估方法的使用。你会学到
如何分析和可视化聚类结果。
第8章 “Spark应用于数据降维”,将通过多种方法从数据中提取其内在结构并降低其维度。
你会学到一些常见的降维方法,以及如何对它们进行应用和分析。这里还会讲到如何将降维
的结果作为其他机器学习模型的输入。
第9章 “Spark高级文本处理技术”,介绍处理大规模文本数据的方法。这包括从文本提取特
征以及处理文本数据常见的高维特征的方法。
第10章 “Spark Streaming在实时机器学习上的应用”,对Spark Streaming进行综述,并介
绍在流数据上的机器学习中它如何实现对在线和增量学习方法的支持。预备知识
本书假设读者已有基本的Scala、Java或Python编程经验,以及机器学习、统计学和数据分析
方面的基础知识。本书目标
本书的预期读者是初中级数据科学研究者、数据分析师、软件工程师和对大规模环境下的机
器学习或数据挖掘感兴趣的人。读者不需要熟悉Spark,但若具有统计、机器学习相关软件
(比如MATLAB、scikit-learn、Mahout、R和Weka等)或分布式系统(如Hadoop)的实践经
验,会很有帮助。排版约定
在本书中,你会发现一些不同的文本样式,用以区别不同种类的信息。下面举例说明。
代码段的格式如下:
val conf = new SparkConf
.setAppName(Test Spark App)
.setMaster(local[4])
val sc = new SparkContext(conf)
所有的命令行输入或输出的格式如下:
>tar xfvz spark-1.2.0-bin-hadoop2.4.tgz
>cd spark-1.2.0-bin-hadoop2.4
新术语和重点词汇以楷体标示。屏幕、目录或对话框上的内容这样表示:“这些信息可以从
AWS主页上依次点击‘Account’ | ‘Security Credentials’ | ‘Access Credentials’看到。”
这个图标表示警告或需要特别注意的内容。
这个图标表示提示或者技巧。读者反馈
欢迎提出反馈。如果你对本书有任何想法,喜欢它什么,不喜欢它什么,请让我们知道。要
写出真正对大家有帮助的书,了解读者的反馈很重要。
一般的反馈,请发送电子邮件至feedback@packtpub.com,并在邮件主题中包含书名。
如果你有某个主题的专业知识,并且有兴趣写成或帮助促成一本书,请参考我们的作者指
南http:www.pack tpub.comauthors。客户支持
现在,你是一位自豪的Packt图书的拥有者,我们会尽全力帮你充分利用你手中的书。
下载示例代码
你可以用你的账户从http:www.pack tpub.com下载所有已购买Packt图书的示例代码文件。如
果你从其他地方购买本书,可以访问http:www.packtpub.comsupport并注册,我们将通过电
子邮件把文件发送给你。
勘误表
虽然我们已尽力确保本书内容正确,但出错仍旧在所难免。如果你在我们的书中发现错误,不管是文本还是代码,希望能告知我们,我们不胜感激。这样做可以减少其他读者的困扰,帮助我们改进本书的后续版本。如果你发现任何错误,请访问
http:www.packtpub.comsubmit-errata提交,选择你的书,点击勘误表提交表单的链接,并输
入详细说明。勘误一经核实,你的提交将被接受,此勘误将上传到本公司网站或添加到现有
勘误表。从http:www.pack tpub.comsupport选择书名就可以查看现有的勘误表。侵权行为
互联网上的盗版是所有媒体都要面对的问题。Pack t非常重视保护版权和许可证。如果你发现
我们的作品在互联网上被非法复制,不管以什么形式,都请立即为我们提供位置地址或网站
名称,以便我们可以寻求补救。
请把可疑盗版材料的链接发到copyright@packtpub.com。
非常感谢你帮助我们保护作者,以及保护我们给你带来有价值内容的能力。问题
如果你对本书内容存有疑问,不管是哪个方面,都可以通过questions@packtpub.com联系我
们,我们将尽最大努力来解决。
致谢
过去一年里,本书的写作过程如同过山车一般跌宕起伏,伴随着熬夜和周末加班。对机器学
习和Apache Spark的热爱让我受益良多,也希望本书能让读者有所收获。
非常感谢Packt出版团队在本书写作和编辑过程中提供的帮助,感谢Rebecca、Susmita、Sudhir、Amey、Neil、Vivek、Pankaj和所有为本书出过力的人。
同样感谢StumbleUpon公司的Debora Donato,她提供过数据和法律方面的协助。
写书的过程可能会让人感到孤立无援,因此审校人的反馈对保证本书的可读性,以及知晓还
需要作出哪些调整十分有帮助。我深深地感谢Andrea Mostosi、Hao Ren和Krishna Sank ar花费
时间审阅本书,并提供细致且极为重要的反馈。
家人和朋友的不懈支持是本书得以写成的必要因素。特别是我的好妻子Tammy,感谢她在若
干个夜晚和周末的陪伴与支持。谢谢你们所有人!
最后,谢谢你阅读这本书,希望它对你能有所帮助。
第1章 Spark的环境搭建与运行
Apache Spark是一个分布式计算框架,旨在简化运行于计算机集群上的并行程序的编写。该
框架对资源调度,任务的提交、执行和跟踪,节点间的通信以及数据并行处理的内在底层操
作都进行了抽象。它提供了一个更高级别的API用于处理分布式数据。从这方面说,它与
Apache Hadoop等分布式处理框架类似。但在底层架构上,Spark与它们有所不同。
Spark起源于加利福利亚大学伯克利分校的一个研究项目。学校当时关注分布式机器学习算法
的应用情况。因此,Spark从一开始便为应对迭代式应用的高性能需求而设计。在这类应用
中,相同的数据会被多次访问。该设计主要靠利用数据集内存缓存以及启动任务时的低延迟
和低系统开销来实现高性能。再加上其容错性、灵活的分布式数据结构和强大的函数式编程
接口,Spark在各类基于机器学习和迭代分析的大规模数据处理任务上有广泛的应用,这也表
明了其实用性。
关于Spark项目的更多背景信息,包括其开发的核心研究论文,可从项
目的历史介绍页面中查到:http:spark .apache.orgcommunity.htmlhistory。
Spark支持四种运行模式。
本地单机模式:所有Spark进程都运行在同一个Java虚拟机(Java Vitural
Machine,JVM)中。
集群单机模式:使用Spark自己内置的任务调度框架。
基于Mesos:Mesos是一个流行的开源集群计算框架。
基于YARN:即Hadoop 2,它是一个与Hadoop关联的集群计算和资源调度框架。
本章主要包括以下内容。
下载Spark二进制版本并搭建一个本地单机模式下的开发环境。各章的代码示例都
在该环境下运行。
通过Spark的交互式终端来了解它的编程模型及其API。
分别用Scala、Java和Py thon语言来编写第一个Spark程序。
在Amazon的Elastic Cloud Compute(EC2)平台上架设一个Spark集群。相比本地
模式,该集群可以应对数据量更大、计算更复杂的任务。通过自定义脚本,Spark同样可以运行在Amazon的Elastic MapReduce服
务上,但这不在本书讨论范围内。相关信息可参
考http:aws.amazon.comarticles4926593393724923;本书写作时,这篇文章是基于Spark
1.1.0写的。
如果读者曾构建过Spark环境并有Spark程序编写基础,可以跳过本章。1.1 Spark的本地安装与配置
Spark能通过内置的单机集群调度器来在本地运行。此时,所有的Spark进程运行在同一个Java
虚拟机中。这实际上构造了一个独立、多线程版本的Spark环境。本地模式很适合程序的原型
设计、开发、调试及测试。同样,它也适应于在单机上进行多核并行计算的实际场景。
Spark的本地模式与集群模式完全兼容,本地编写和测试过的程序仅需增加少许设置便能在集
群上运行。
本地构建Spark环境的第一步是下载其最新的版本包(本书写作时为1.2.0版)。各个版本的版
本包及源代码的GitHub地址可从Spark项目的下载页面找
到:http:spark.apache.orgdownloads.html。
Spark的在线文档http:spark .apache.orgdocslatest涵盖了进一步学习
Spark所需的各种资料。强烈推荐读者浏览查阅。
为了访问HDFS(Hadoop Distributed File System,Hadoop分布式文件系统)以及标准或定制
的Hadoop输入源,Spark的编译需要与Hadoop的版本对应。上述下载页面提供了针对Hadoop
1、CDH4(Cloudera的Hadoop发行版)、MapR的Hadoop发行版和Hadoop 2(YARN)的预
编译二进制包。除非你想构建针对特定版本Hadoop的Spark,否则建议你通过如下链接从
Apache镜像下载Hadoop 2.4预编译版本:http:www.apache.orgdyncloser.cgisparkspark-
1.2.0spark-1.2.0-bin-hadoop2.4.tgz。
Spark的运行依赖Scala编程语言(本书写作时为2.10.4版)。好在预编译的二进制包中已包含
Scala运行环境,我们不需要另外安装Scala便可运行Spark。但是,JRE(Java运行时环境)或
JDK(Java开发套件)是要安装的(相应的安装指南可参见本书代码包中的软硬件列表)。
下载完上述版本包后,解压,并在终端进入解压时新建的主目录:
>tar xfvz spark-1.2.0-bin-hadoop2.4.tgz
>cd spark-1.2.0-bin-hadoop2.4
用户运行Spark的脚本在该目录的bin目录下。我们可以运行Spark附带的一个示例程序来测试
是否一切正常:
>.binrun-example org.apache.spark.examples.SparkPi该命令将在本地单机模式下执行SparkPi这个示例。在该模式下,所有的Spark进程均运行于同
一个JVM中,而并行处理则通过多线程来实现。默认情况下,该示例会启用与本地系统的
CPU核心数目相同的线程。示例运行完,应可在输出的结尾看到类似如下的提示:…
141127 20:58:47 INFO SparkContext: Job finished: reduce at
SparkPi.scala:35, took 0.723269s
Pi is roughly 3.1465…
要在本地模式下设置并行的级别,以local[N]的格式来指定一个master变量即可。上述
参数中的N表示要使用的线程数目。比如只使用两个线程时,可输入如下命令:
>MASTER=local[2] .binrun-example
org.apache.spark.examples.SparkPi1.2 Spark集群
Spark集群由两类程序构成:一个驱动程序和多个执行程序。本地模式时所有的处理都运行在
同一个JVM内,而在集群模式时它们通常运行在不同的节点上。
举例来说,一个采用单机模式的Spark集群(即使用Spark内置的集群管理模块)通常包括:
一个运行Spark单机主进程和驱动程序的主节点;
各自运行一个执行程序进程的多个工作节点。
在本书中,我们将使用Spark的本地单机模式做概念讲解和举例说明,但所用的代码也可运行
在Spark集群上。比如在一个Spark单机集群上运行上述示例,只需传入主节点的URL即可:
>MASTER=spark:IP:PORT .binrun-example
org.apache.spark.examples.SparkPi
其中的IP和PORT分别是主节点IP地址和端口号。这是告诉Spark让示例程序运行在主节点所
对应的集群上。
Spark集群管理和部署的完整方案不在本书的讨论范围内。但是,本章后面会对Amazon EC2
集群的设置和使用做简要说明。
Spark集群部署的概要介绍可参见如下链接:
http:spark.apache.orgdocslatestcluster-overview.html
http:spark.apache.orgdocslatestsubmitting-applications.html1.3 Spark编程模型
在对Spark的设计进行更全面的介绍前,我们先介绍SparkContext对象以及Spark shell。后
面将通过它们来了解Spark编程模型的基础知识。
虽然这里会对Spark的使用进行简要介绍并提供示例,但要想了解更
多,可参考下面这些资料。
Spark快速入门:http:spark .apache.orgdocslatestquick -start.html。
针对Scala、Java和Py thon的《Spark编程指
南》:http:spark .apache.orgdocslatestprogramming-guide.html。
1.3.1 SparkContext类与SparkConf类
任何Spark程序的编写都是从SparkContext(或用Java编写时的JavaSparkContext)
开始的。SparkContext的初始化需要一个SparkConf对象,后者包含了Spark集群配置的
各种参数(比如主节点的URL)。
初始化后,我们便可用SparkContext对象所包含的各种方法来创建和操作分布式数据集
和共享变量。Spark shell(在Scala和Py thon下可以,但不支持Java)能自动完成上述初始
化。若要用Scala代码来实现的话,可参照下面的代码:
val conf = new SparkConf
.setAppName(Test Spark App)
.setMaster(local[4])
val sc = new SparkContext(conf)
这段代码会创建一个4线程的SparkContext对象,并将其相应的任务命名为Test Spark
APP。我们也可通过如下方式调用SparkContext的简单构造函数,以默认的参数值来创建
相应的对象。其效果和上述的完全相同:
val sc = new SparkContext(local[4], Test Spark App)下载示例代码
你可从http:www.packtpub.com下载你账号购买过的Packt书籍所对应的示例代码。若书
是从别处购买的,则可在https:www.packtpub.combook scontentsupport注册,相应的代
码会直接发送到你的电子邮箱。
1.3.2 Spark shell
Spark支持用Scala或Py thon REPL(Read-Eval-Print-Loop,即交互式shell)来进行交互式的程
序编写。由于输入的代码会被立即计算,shell能在输入代码时给出实时反馈。在Scala shell
里,命令执行结果的值与类型在代码执行完后也会显示出来。
要想通过Scala来使用Spark shell,只需从Spark的主目录执行.binspark-shell。它会
启动Scala shell并初始化一个SparkContext对象。我们可以通过sc这个Scala值来调用这个
对象。该命令的终端输出应该如下图所示:要想在Py thon shell中使用Spark,直接运行.binpyspark命令即可。与Scala shell类似,Py thon下的SparkContext对象可以通过Py thon变量sc来调用。上述命令的终端输出应该
如下图所示:1.3.3 弹性分布式数据集
RDD(Resilient Distributed Dataset,弹性分布式数据集)是Spark的核心概念之一。一个RDD
代表一系列的“记录”(严格来说,某种类型的对象)。这些记录被分配或分区到一个集群的
多个节点上(在本地模式下,可以类似地理解为单个进程里的多个线程上)。Spark中的RDD
具备容错性,即当某个节点或任务失败时(因非用户代码错误的原因而引起,如硬件故障、网络不通等),RDD会在余下的节点上自动重建,以便任务能最终完成。
1. 创建RDD
RDD可从现有的集合创建。比如在Scala shell中:
val collection = List(a, b, c, d, e)
val rddFromCollection = sc.parallelize(collection)RDD也可以基于Hadoop的输入源创建,比如本地文件系统、HDFS和Amazon S3。基于
Hadoop的RDD可以使用任何实现了Hadoop InputFormat接口的输入格式,包括文本文
件、其他Hadoop标准格式、HBase、Cassandra等。以下举例说明如何用一个本地文件系统里
的文件创建RDD:
val rddFromTextFile = sc.textFile(LICENSE)
上述代码中的textFile函数(方法)会返回一个RDD对象。该对象的每一条记录都是一个
表示文本文件中某一行文字的String(字符串)对象。
2. Spark操作
创建RDD后,我们便有了一个可供操作的分布式记录集。在Spark编程模式下,所有的操作被
分为转换(transformation)和执行(action)两种。一般来说,转换操作是对一个数据集里
的所有记录执行某种函数,从而使记录发生改变;而执行通常是运行某些计算或聚合操作,并将结果返回运行SparkContext的那个驱动程序。
Spark的操作通常采用函数式风格。对于那些熟悉用Scala或Py thon进行函数式编程的程序员
来说,这不难掌握。但Spark API其实容易上手,所以那些没有函数式编程经验的程序员也不
用担心。
Spark程序中最常用的转换操作便是map操作。该操作对一个RDD里的每一条记录都执行某个
函数,从而将输入映射成为新的输出。比如,下面这段代码便对一个从本地文本文件创建的
RDD进行操作。它对该RDD中的每一条记录都执行size函数。之前我们曾创建过一个这样
的由若干String构成的RDD对象。通过map函数,我们将每一个字符串都转换为一个整
数,从而返回一个由若干Int构成的RDD对象。
val intsFromStringsRDD = rddFromTextFile.map(line => line.size)
其输出应与如下类似,其中也提示了RDD的类型:
intsFromStringsRDD: org.apache.spark.rdd.RDD[Int] =
MappedRDD[5] at map at:14
示例代码中的=>是Scala下表示匿名函数的语法。匿名函数指那些没有指定函数名的函数
(比如Scala或Py thon中用def关键字定义的函数)。匿名函数的具体细节并不在本书讨论范围内,但由于它们在Scala、Py thon以及Java 8中大量使用(示例或现实应用中都是),列举一些实例仍会有帮助。
语法line => line.size表示以=>操作符左边的部分作为输入,对其执行一个函
数,并以=>操作符右边代码的执行结果为输出。在这个例子中,输入为line,输出则
是line.size函数的执行结果。在Scala语言中,这种将一个String对象映射为一
个Int的函数被表示为String => Int。
该语法使得每次使用如map这种方法时,都不需要另外单独定义一个函数。当函数简单
且只需使用一次时(像本例一样时),这种方式很有用。
现在我们可以调用一个常见的执行操作count,来返回RDD中的记录数目。
intsFromStringsRDD.count
执行的结果应该类似如下输出:
140129 23:28:28 INFO SparkContext: Starting job: count at
:17 ...
140129 23:28:28 INFO SparkContext: Job finished: count at
:17, took 0.019227 s
res4: Long = 398
如果要计算这个文本文件里每行字符串的平均长度,可以先使用sum函数来对所有记录的长
度求和,然后再除以总的记录数目:
val sumOfRecords = intsFromStringsRDD.sum
val numRecords = intsFromStringsRDD.count
val aveLengthOfRecord = sumOfRecords numRecords
结果应该如下:
aveLengthOfRecord: Double = 52.06030150753769
Spark的大多数操作都会返回一个新RDD,但多数的执行操作则是返回计算的结果(比如上面例子中,count返回一个Long,sum返回一个Double)。这就意味着多个操作可以很自然
地前后连接,从而让代码更为简洁明了。举例来说,用下面的一行代码可以得到和上面例子
相同的结果:
val aveLengthOfRecordChained = rddFromTextFile.map(line =>
line.size).sum rddFromTextFile.count
值得注意的一点是,Spark中的转换操作是延后的。也就是说,在RDD上调用一个转换操作并
不会立即触发相应的计算。相反,这些转换操作会链接起来,并只在有执行操作被调用时才
被高效地计算。这样,大部分操作可以在集群上并行执行,只有必要时才计算结果并将其返
回给驱动程序,从而提高了Spark的效率。
这就意味着,如果我们的Spark程序从未调用一个执行操作,就不会触发实际的计算,也不会
得到任何结果。比如下面的代码就只是返回一个表示一系列转换操作的新RDD:
val transformedRDD = rddFromTextFile.map(line => line.size).
filter(size => size > 10).map(size => size 2)
相应的终端输出如下:
transformedRDD: org.apache.spark.rdd.RDD[Int] = MappedRDD[8] at
map at:14
注意,这里实际上没有触发任何计算,也没有结果被返回。如果我们现在在新的RDD上调用
一个执行操作,比如sum,该计算将会被触发:
val computation = transformedRDD.sum
现在你可以看到一个Spark任务被启动,并返回如下终端输出:...
141127 21:48:21 INFO SparkContext: Job finished: sum at
:16,took 0.193513 s
computation: Double = 60468.0RDD支持的转换和执行操作的完整列表以及更为详细的例子,参见
《Spark编程指南》(http:spark.apache.orgdocslatestprogramming-guide.htmlrdd-
operations)以及Spark API(Scala)文档
(http:spark.apache.orgdocslatestapiscalaindex.htmlorg.apache.spark .rdd.RDD)。
3. RDD缓存策略
Spark最为强大的功能之一便是能够把数据缓存在集群的内存里。这通过调用RDD的cache
函数来实现:
rddFromTextFile.cache
调用一个RDD的cache函数将会告诉Spark将这个RDD缓存在内存中。在RDD首次调用一个
执行操作时,这个操作对应的计算会立即执行,数据会从数据源里读出并保存到内存。因
此,首次调用cache函数所需要的时间会部分取决于Spark从输入源读取数据所需要的时间。
但是,当下一次访问该数据集的时候,数据可以直接从内存中读出从而减少低效的IO操作,加快计算。多数情况下,这会取得数倍的速度提升。
如果现在在已缓存了的RDD上调用count或sum函数,应该可以感觉到RDD的确已经载入到
了内存中:
val aveLengthOfRecordChained = rddFromTextFile.map(line =>
line.size).
sum rddFromTextFile.count
实际上,从下方的输出我们可以看到,数据在第一次调用cache时便已缓存到内存,并占用
了大约62 KB的空间,余下270 MB可用:...
140130 06:59:27 INFO MemoryStore: ensureFreeSpace(63454)
called with curMem=32960, maxMem=311387750
140130 06:59:27 INFO MemoryStore: Block rdd_2_0 stored as
values to memory (estimated size 62.0 KB, free 296.9 MB)
140130 06:59:27 INFO
BlockManagerMasterActorBlockManagerInfo: Added rdd_2_0 in
memory on 10.0.0.3:55089 (size: 62.0 KB, free: 296.9 MB)...
现在,我们再次求平均长度:
val aveLengthOfRecordChainedFromCached =
rddFromTextFile.map(line => line.size).sum
rddFromTextFile.count
从如下的输出中应该可以看出缓存的数据是从内存直接读出的:...
140130 06:59:34 INFO BlockManager: Found block rdd_2_0
locally...
Spark支持更为细化的缓存策略。通过persist函数可以指定Spark的数
据缓存策略。关于RDD缓存的更多信息可参
见:http:spark.apache.orgdocslatestprogramming-guide.htmlrdd-persistence。
1.3.4 广播变量和累加器
Spark的另一个核心功能是能创建两种特殊类型的变量:广播变量和累加器。
广播变量(broadcast variable)为只读变量,它由运行SparkContext的驱动程序创建后
发送给会参与计算的节点。对那些需要让各工作节点高效地访问相同数据的应用场景,比如
机器学习,这非常有用。Spark下创建广播变量只需在SparkContext上调用一个方法即
可:
val broadcastAList = sc.broadcast(List(a, b, c, d,e))
终端的输出表明,广播变量存储在内存中,占用的空间大概是488字节,仍余下270 MB可用
空间:
140130 07:13:32 INFO MemoryStore: ensureFreeSpace(488) called with curMem=96414, maxMem=311387750
140130 07:13:32 INFO MemoryStore: Block broadcast_1 stored as
values to memory (estimated size 488.0 B, free 296.9 MB)
broadCastAList:
org.apache.spark.broadcast.Broadcast[List[String]] =
Broadcast(1)
广播变量也可以被非驱动程序所在的节点(即工作节点)访问,访问的方法是调用该变量的
value方法:
sc.parallelize(List(1, 2, 3)).map(x =>
broadcastAList.value ++ x).collect
这段代码会从{1, 2, 3}这个集合(一个Scala List)里,新建一个带有三条记录
的RDD。map函数里的代码会返回一个新的List对象。这个对象里的记录由之前创建的那
个broadcastAList里的记录与新建的RDD里的三条记录分别拼接而成。
注意,上述代码使用了collect函数。这个函数是一个Spark执行函数,它将整个RDD以
Scala(Python或Java)集合的形式返回驱动程序。
通常只在需将结果返回到驱动程序所在节点以供本地处理时,才调用collect函数。
注意,collect函数一般仅在的确需要将整个结果集返回驱动程序并
进行后续处理时才有必要调用。如果在一个非常大的数据集上调用该函数,可能耗尽驱
动程序的可用内存,进而导致程序崩溃。
高负荷的处理应尽可能地在整个集群上进行,从而避免驱动程序成为系统瓶颈。然而在
不少情况下,将结果收集到驱动程序的确是有必要的。很多机器学习算法的迭代过程便
属于这类情况。
从如下结果可以看出,新生成的RDD里包含3条记录,其每一条记录包含一个由原来被广播
的List变量附加一个新的元素所构成的新记录(也就是说,新记录分别以1、2、3结尾)。...
140131 10:15:39 INFO SparkContext: Job finished: collect at
:15, took 0.025806 sres6: Array[List[Any]] = Array(List(a, b, c, d, e, 1), List(a,b, c, d, e, 2), List(a, b, c, d, e, 3))
累加器(accumulator)也是一种被广播到工作节点的变量。累加器与广播变量的关键不
同,是后者只能读取而前者却可累加。但支持的累加操作有一定的限制。具体来说,这种累
加必须是一种有关联的操作,即它得能保证在全局范围内累加起来的值能被正确地并行计算
以及返回驱动程序。每一个工作节点只能访问和操作其自己本地的累加器,全局累加器则只
允许驱动程序访问。累加器同样可以在Spark代码中通过value访问。
关于累加器的更多信息,可参见《Spark编程指
南》:http:spark .apache.orgdocslatestprogramming-guide.htmlshared-variables。1.4 Spark Scala编程入门
下面我们用上一节所提到的内容来编写一个简单的Spark数据处理程序。该程序将依次用
Scala、Java和Py thon三种语言来编写。所用数据是客户在我们在线商店的商品购买记录。该
数据存在一个CSV文件中,名为UserPurchaseHistory.csv,内容如下所示。文件的每一行对应
一条购买记录,从左到右的各列值依次为客户名称、商品名以及商品价格。
John,iPhone Cover,9.99
John,Headphones,5.49
Jack,iPhone Cover,9.99
Jill,Samsung Galaxy Cover,8.95
Bob,iPad Cover,5.49
对于Scala程序而言,需要创建两个文件:Scala代码文件以及项目的构建配置文件。项目将
使用SBT(Scala Build Tool,Scala构建工具)来构建。为便于理解,建议读者下载示例代码
scala-spark-app。该资源里的data目录下包含了上述CSV文件。运行这个示例项目需要系统中
已经安装好SBT(编写本书时所使用的版本为0.13.1)。
配置SBT并不在本书讨论范围内,但读者可以从http:www.scala-
sbt.orgreleasedocsGetting-StartedSetup.html找到更多信息。
我们的SBT配置文件是build.sbt,其内容如下面所示(注意,各行代码之间的空行是必需
的):
name := scala-spark-app
version := 1.0
scalaVersion := 2.10.4
libraryDependencies += org.apache.spark %% spark-core %
1.2.0
最后一行代码是添加Spark到本项目的依赖库。
相应的Scala程序在ScalaApp.scala这个文件里。接下来我们会逐一讲解代码的各个部分。首
先,导入所需要的Spark类:import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
用Scala编写的一个简单的Spark应用
object ScalaApp {
在主函数里,我们要初始化所需的SparkContext对象,并且用它通过textFile函数来
访问CSV数据文件。之后对每一行原始字符串以逗号为分隔符进行分割,提取出相应的用户
名、产品和价格信息,从而完成对原始文本的映射:
def main(args: Array[String]) {
val sc = new SparkContext(local[2], First Spark App)
将CSV格式的原始数据转化为(user,product,price)格式的记录集
val data = sc.textFile(dataUserPurchaseHistory.csv)
.map(line => line.split(,))
.map(purchaseRecord => (purchaseRecord(0),purchaseRecord(1),purchaseRecord(2)))
现在,我们有了一个RDD,其每条记录都由(user, product, price)三个字段构成。
我们可以对商店计算如下指标:
购买总次数
客户总个数
总收入
最畅销的产品
计算方法如下:
求购买次数 val numPurchases = data.count
求有多少个不同客户购买过商品
val uniqueUsers = data.map{ case (user, product, price) =>
user }.distinct.count
求和得出总收入 val totalRevenue = data.map{ case (user, product, price) =>
price.toDouble }.sum
求最畅销的产品是什么 val productsByPopularity = data
.map{ case (user, product, price) => (product, 1) }
.reduceByKey(_ + _)
.collect
.sortBy(-_._2)
val mostPopular = productsByPopularity(0)最后那段计算最畅销产品的代码演示了如何进行MapReduce模式的计算,该模式随Hadoop
而流行。第一步,我们将(user, product, price)格式的记录映射为(product, 1)
格式。然后,我们执行一个reduceByKey操作,它会对各个产品的1值进行求和。
转换后的RDD包含各个商品的购买次数。有了这个RDD后,我们可以调用collect函数,这会将其计算结果以Scala集合的形式返回驱动程序。之后在驱动程序的本地对这些记录按照
购买次数进行排序。(注意,在实际处理大量数据时,我们通常通过sortByKey这类操作
来对其进行并行排序。)
最后,可在终端上打印出计算结果:
println(Total purchases: + numPurchases)
println(Unique users: + uniqueUsers)
println(Total revenue: + totalRevenue)
println(Most popular product: %s with %d purchases.
format(mostPopular._1, mostPopular._2))
}
}
可以在项目的主目录下执行sbt run命令来运行这个程序。如果你使用了IDE的话,也可以
从Scala IDE直接运行。最终的输出应该与下面的内容相似:...
[info] Compiling 1 Scala source to ...
[info] Running ScalaApp...
140130 10:54:40 INFO spark.SparkContext: Job finished:
collect at
ScalaApp.scala:25, took 0.045181 s
Total purchases: 5
Unique users: 4
Total revenue: 39.91
Most popular product: iPhone Cover with 2 purchases
可以看到,商店总共有4个客户的5次交易,总收入为39.91。最畅销的商品是iPhone Cover,共购买2次。1.5 Spark Java编程入门
Java API与Scala API本质上很相似。Scala代码可以很方便地调用Java代码,但某些Scala代
码却无法在Java里调用,特别是那些使用了隐式类型转换、默认参数和采用了某些Scala反射
机制的代码。
一般来说,这些特性在Scala程序中会被广泛使用。这就有必要另外为那些常见的类编写相应
的Java版本。由此,SparkContext有了对应的Java版本JavaSparkContext,而RDD则
对应JavaRDD。
1.8及之前版本的Java并不支持匿名函数,在函数式编程上也没有严格的语法规范。于是,套
用到Spark的Java API上的函数必须要实现一个带有call函数的WrappedFunction接口。
这会使得代码冗长,所以我们经常会创建临时类来传递给Spark操作。这些类会实现操作所需
的接口以及call函数,以取得和用Scala编写时相同的效果。
Spark提供对Java 8匿名函数(lambda)语法的支持。使用该语法能让Java 8书写的代码看上
去很像等效的Scala版。
用Scala编写时,键值对记录的RDD能支持一些特别的操作(比如reduceByKey和
saveAsSequenceFile)。这些操作可以通过隐式类型转换而自动被调用。用Java编写
时,则需要特别类型的JavaRDD来支持这些操作。它们包括用于键值对的
JavaPairRDD,以及用于数值记录的JavaDoubleRDD。
我们在这里只涉及标准的Java API语法。关于Java下支持的RDD以及
Java 8 lambda表达式支持的更多信息可参见《Spark编程指
南》:http:spark.apache.orgdocslatestprogramming-guide.htmlrdd-operations。
在后面的Java程序中,我们可以看到大部分差异。这些示例代码包含在本章示例代码的java-
spark-app目录下。该目录的data子目录下也包含上述CSV数据。
这里会使用Maven构建工具来编译和运行这个项目。我们假设读者已经在其系统上安装好了
该工具。
Maven的安装和配置并不在本书讨论范围内。通常它可通过Linux系统中的软件管理器或Mac OS X中的HomeBrew或MacPorts方便地安装。
详细的安装指南参见:http:maven.apache.orgdownload.cgi。
项目中包含一个名为JavaApp.java的Java源文件:
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
用Java编写的一个简单的Spark应用
public class JavaApp {
public static void main(String[] args) {
正如在Scala项目中一样,我们首先需要初始化一个上下文对象。值得注意的是,这里所使用
的是JavaSparkContext类而不是之前的SparkContext。类似地,调
用JavaSparkContext对象,利用textFile函数来访问数据,然后将各行输入分割成多
个字段。请注意下面代码的高亮部分是如何使用匿名类来定义一个分割函数的。该函数确定
了如何对各行字符串进行分割。
JavaSparkContext sc = new JavaSparkContext(local[2], First
Spark App);
将CSV格式的原始数据转化为(user,product,price)格式的记录集 JavaRDD data =
sc.textFile(dataUserPurchaseHistory.csv)
.map(new Function {
@Override
public String[] call(String s) throws Exception {
return s.split(,);
}
});
现在可以算一下用Scala时计算过的指标。这里有两点值得注意的地方,一是下面Java API中
有些函数(比如distinct和count)实际上和在Scala API中一样,二是我们定义了一个匿名类并将其传给map函数。匿名类的定义方式可参见代码的高亮部分。
求总购买次数
long numPurchases = data.count;
求有多少个不同客户购买过商品
long uniqueUsers = data.map(new Function {
@Override
public String call(String[] strings) throws Exception {
return strings[0];
}
}).distinct.count;
求和得出总收入
double totalRevenue = data.map(new DoubleFunction{
@Override
public Double call(String[] strings) throws Exception {
return Double.parseDouble(strings[2]);
}
}).sum;
下面的代码展现了如何求出最畅销的产品,其步骤与Scala示例的相同。多出的那些代码看似
复杂,但它们大多与Java中创建匿名函数有关,实际功能与用Scala时一样:
求最畅销的产品是哪个 首先用一个PairFunction和Tuple2类将数据映射成为(product,1)格式的 记录 然后,用一个Function2类来调用reduceByKey操作,该操作实际上是一个
求和函数
List> pairs = data.map(new
PairFunction {
@Override
public Tuple2 call(String[] strings)
throws Exception {
return new Tuple2(strings[1], 1);
}
}).reduceByKey(new Function2 {
@Override
public Integer call(Integer integer, Integer integer2)
throws Exception {
return integer + integer2;
}
}).collect; 最后对结果进行排序。注意,这里会需要创建一个Comparator函数来进行降 序排列 Collections.sort(pairs, new Comparator> {
@Override
public int compare(Tuple2 o1,Tuple2 o2) {
return -(o1._2 - o2._2);
}
});
String mostPopular = pairs.get(0)._1;
int purchases = pairs.get(0)._2;
System.out.println(Total purchases: + numPurchases);
System.out.println(Unique users: + uniqueUsers);
System.out.println(Total revenue: + totalRevenue);
System.out.println(String.format(Most popular product:
%s with %d purchases, mostPopular, purchases));
}
}
从前面代码可以看出,Java代码和Scala代码相比虽然多了通过内部类来声明变量和函数的引
用代码,但两者的基本结构类似。读者不妨分别练习这两种版本的代码,并比较一下计算同
一个指标时两种语言在表达上的异同。
该程序可以通过在项目主目录下执行如下命令运行:
>mvn exec:java -Dexec.mainClass=JavaApp
可以看到其输出和Scala版的很类似,而且计算结果完全一样:...
140130 17:02:43 INFO spark.SparkContext: Job finished:
collect at
JavaApp.java:46, took 0.039167 s
Total purchases: 5
Unique users: 4
Total revenue: 39.91
Most popular product: iPhone Cover with 2 purchases1.6 Spark Python编程入门
Spark的Py thon API几乎覆盖了所有Scala API所能提供的功能,但的确有些特性,比如Spark
Streaming和个别的API方法,暂不支持。具体可参见《Spark编程指南》的Py thon部
分:http:spark.apache.orgdocslatestprogramming-guide.html。
与上两节类似,这里将编写一个相同功能的Python版程序。我们假设读者系统中已安装2.6或
更高版本的Py thon(多数Linux系统和Mac OS X已预装Python)。
如下示例代码可以在本章的python-spark -app目录下找到。相应的CSV数据文件也在该目录的
data子目录中。项目代码在一个名为py thonapp.py的脚本里,其内容如下:
用Python编写的一个简单Spark应用
from pyspark import SparkContext
sc = SparkContext(local[2], First Spark App)
将CSV格式的原始数据转化为(user,product,price)格式的记录集
data = sc.textFile(dataUserPurchaseHistory.csv).map(lambda
line:
line.split(,)).map(lambda record: (record[0], record[1],record[2]))
求总购买次数 numPurchases = data.count
求有多少不同客户购买过商品 uniqueUsers = data.map(lambda record:
record[0]).distinct.count
求和得出总收入
totalRevenue = data.map(lambda record: float(record[2])).sum
求最畅销的产品是什么
products = data.map(lambda record: (record[1], 1.0)).
reduceByKey(lambda a, b: a + b).collect
mostPopular = sorted(products, key=lambda x: x[1],reverse=True)[0]
print Total purchases: %d % numPurchases
print Unique users: %d % uniqueUsers
print Total revenue: %2.2f % totalRevenue
print Most popular product: %s with %d purchases %
(mostPopular[0], mostPopular[1])
对比Scala版和Python版代码,不难发现语法大致相同。主要不同在于匿名函数的表达方式
上,匿名函数在Py thon语言中亦称lambda函数,lambda也是语法表达上的关键字。用Scala
编写时,一个将输入x映射为输出y的匿名函数表示为x => y,而在Python中则是lambda
x : y。在上面代码的高亮部分,我们定义了一个将两个输入映射为一个输出的匿名函数。
这两个输入的类型一般相同,这里调用的是相加函数,故写成lambda a, b : a + b。
运行该脚本的最好方法是在脚本目录下运行如下命令:>SPARK_HOMEbinspark-submit pythonapp.py
上述代码中的SPARK_HOME变量应该被替换为Spark的主目录,也就是在本章开始Spark预编
译包解压生成的那个目录。
脚本运行完的输出应该和运行Scala和Java版时的类似,其结果同样也是:...
140130 11:43:47 INFO SparkContext: Job finished: collect at
pythonapp.
py:14, took 0.050251 s
Total purchases: 5
Unique users: 4
Total revenue: 39.91
Most popular product: iPhone Cover with 2 purchases1.7 在Amazon EC2上运行Spark
Spark项目提供了在Amazon EC2上构建一个Spark集群所需的脚本,位于ec2文件夹下。输入
如下命令便可调用该文件夹下的spark -ec2脚本:
>.ec2spark-ec2
当不带参数直接运行上述代码时,终端会显示该命令的用法信息:
Usage: spark-ec2 [options]
can be: launch, destroy, login, stop, start, get-
master
Options:...
在创建一个Spark EC2集群前,我们需要一个Amazon账号。
如果没有Amazon Web Service账号,可以在http:aws.amazon.com注
册。
AWS的管理控制台地址是:http:aws.amazon.comconsole。
另外,我们还需要创建一个Amazon EC2密钥对和相关的安全凭证。Spark文档提到了在EC2
上部署时的需求。
你要先自己创建一个Amazon EC2密钥对。通过管理控制台登入你的Amazon Web
Services账号后,单击左边导航栏中的“Key Pairs”,然后创建并下载相应的私钥
文件。通过ssh远程访问EC2时,会需要提交该密钥。该密钥的系统访问权限必须
设定为600(即只有你可以读写该文件),否则会访问失败。
当需要使用spark -ec2脚本时,需要设置AWS_ACCESS_KEY_ID和
AWS_SECRET_ACCESS_ KEY两个环境变量。它们分别为你的Amazon EC2访问
密钥标识(key ID)和对应的密钥密码(secret access k ey)。这些信息可以从
AWS主页上依次点击“Account | Security Credentials | Access Credentials”获得。
创建一个密钥时,最好选取一个好记的名字来命名。这里假设密钥名为spark,对应的密钥文件的名称为spark.pem。如上面提到的,我们需要确认密钥的访问权限并设定好所需的环境变
量:
>chmod 600 spark.pem
>export AWS_ACCESS_KEY_ID=...
>export AWS_SECRET_ACCESS_KEY=...
上述下载所得的密钥文件只能下载一次(即在刚创建后),故对其既要安全保存又要避免丢
失。
注意,下一节中会启用一个Amazon EC2集群,这会在你的AWS账号下产生相应的费用。
启动一个EC2 Spark集群
现在我们可以启动一个小型Spark集群了。启动它只需进入到ec2目录,然后输入:
>cd ec2
>.spark-ec2 -k spark -i spark.pem -s 1 –-instance-type
m3.medium --hadoop-major-version 2 launch test-cluster
这将启动一个名为“test-cluster”的新集群,其包含“m3.medium”级别的主节点和从节点各一
个。该集群所用的Spark版本适配于Hadoop 2。我们使用的密钥名和密钥文件分别是spark和
spark .pem。
集群的完全启动和初始化会需要一些时间。在运行启动代码后,应该会立即看到如下图所示
的内容:
如果集群启动成功,最终应可在终端中看到类似如下的输出:要测试是否能连接到新集群,可以输入如下命令:
>ssh -i spark.pem root@ec2-54-227-127-14.compute-
1.amazonaws.com
注意该命令中root@后面的IP地址需要替换为你自己的Amazon EC2的公开域名。该域名可
在启动集群时的输出中找到。
另外也可以通过如下命令得到集群的公开域名:
>.spark-ec2 –i spark.pem get-master test-cluster
上述ssh命令执行成功后,你会连接到EC2上Spark集群的主节点,同时终端的输入应与如下
类似:
如果要测试集群是否已正确配置Spark环境,可以切换到Spark目录后运行一个示例程序:
>cd spark
>MASTER=local[2] .binrun-example SparkPi其输出应该与在自己电脑上的输出类似:...
140130 20:20:21 INFO SparkContext: Job finished: reduce at
SparkPi.scala:35, took 0.864044012 s
Pi is roughly 3.14032...
这样就有了包含多个节点的真实集群,可以测试集群模式下的Spark了。我们会在一个从节点
的集群上运行相同的示例。运行命令和上面相同,但用主节点的URL作为MASTER的值:
>MASTER=spark:ec2-54-227-127-14.compute-1.amazonaws.com:7077
.binrun-example SparkPi
注意,你需要将上面代码中的公开域名替换为你自己的。
同样,命令的输出应该和本地运行时的类似。不同的是,这里会有日志消息提示你的驱动程
序已连接到Spark集群的主节点。...
140130 20:26:17 INFO client.ClientClientActor: Connecting to
master spark:ec2-54-220-189-136.eu-west-
1.compute.amazonaws.com:7077
140130 20:26:17 INFO cluster.SparkDeploySchedulerBackend:
Connected to Spark cluster with app ID app-20140130202617-0001
140130 20:26:17 INFO client.ClientClientActor: Executor
added: app- 20140130202617-00010 on worker-20140130201049-ip-
10-34-137-45.eu-west-1.compute.internal-57119 (ip-10-34-137-
45.eu-west-1.compute.internal:57119) with 1 cores
140130 20:26:17 INFO cluster.SparkDeploySchedulerBackend:
Granted executor ID app-20140130202617-00010 on hostPort ip-
10-34-137-45.eu- west-1.compute.internal:57119 with 1 cores,2.4 GB RAM
140130 20:26:17 INFO client.ClientClientActor: Executor updated: app- 20140130202617-00010 is now RUNNING
140130 20:26:18 INFO spark.SparkContext: Starting job: reduce
at SparkPi.scala:39...
读者不妨在集群上自由练习,熟悉一下Scala的交互式终端:
>.binspark-shell --master spark:ec2-54-227-127-14.compute-
1.amazonaws.com:7077
练习完后,输入exit便可退出终端。另外也可以通过如下命令来体验PySpark终端:
>.binpyspark --master spark:ec2-54-227-127-14.compute-
1.amazonaws.com:7077
通过Spark主节点网页界面,可以看到主节点下注册了哪些应用。该界面位于ec2-54-227-127-
14.compute-1.amazonaws.com:8080(同样,需要将公开域名替换为你自己的)。你应该可以
看到类似下面截图的界面,显示了之前运行过的一个程序以及两个已启动的终端任务。
值得注意的是,Amazon会根据集群的使用情况收取费用。所以在集群使用完毕后,记得停止或终止这个测试集群。要终止该集群可以先在你本地系统的ssh会话里输入exit,然后再
输入如下命令:
>.ec2spark-ec2 -k spark -i spark.pem destroy test-cluster
应该可以看到这样的输出:
Are you sure you want to destroy the cluster test-cluster?
The following ninstances will be terminated:
Searching for existing cluster test-cluster...
Found 1 master(s), 1 slaves
> ec2-54-227-127-14.compute-1.amazonaws.com
> ec2-54-91-61-225.compute-1.amazonaws.com
ALL DATA ON ALL NODES WILL BE LOST!!
Destroy cluster test-cluster (yN): y
Searching for existing cluster test-cluster...
Terminating master...
Terminating slaves...
输入y,然后回车便可终止该集群。
恭喜!现在你已经做到了在云端设置Spark集群,并在它上面运行了一个完全并发的示例程
序,最后也终止了这个集群。如果在学习后续章节时你想在集群上运行示例或你自己的程
序,都可以再次使用这些脚本并指定想要的集群规模和配置。(留意下费用并记得使用完毕
后关闭它们就行。)1.8 小结
本章我们谈到了如何在自己的电脑以及Amazon EC2的云端上配置Spark环境。通过Scala交互
式终端,我们学习了Spark编程模型的基础知识并了解了它的API。另外我们还分别用Scala、Java和Py thon语言,编写了一个简单的Spark程序。
下一章,我们将考虑如何使用Spark来创建一个机器学习系统。
第2章 设计机器学习系统
本章,我们将为一个智能分布式机器学习系统设计高层架构,该系统以Spark作为其核心计算
引擎。这里我们将会关注如何对现有的基于网页的业务进行重新设计,以令其能利用自动化
机器学习系统来增强业务中的关键部分。本章的主要内容有:
介绍假想的业务场景
概述现有架构
探寻用机器学习系统来增强或是替代某些业务功能的可能途径
根据上述内容,提出新的架构
现代的大数据场景包含如下需求。
必须能与系统的其他组件整合,尤其是数据的收集和存储系统、分析和报告以及
前端应用。
易于扩展且与其他组件相对独立。理想情况下,同时具备良好的水平和垂直可扩
展性。
支持高效完成所需类型的计算,即机器学习和迭代式分析应用。
最好能同时支持批处理和实时处理。
Spark作为一个框架本身能满足上述需求。然而我们还需确保基于它设计的机器学习系统也能
满足这些需求。若算法的实现存在能引发系统故障的瓶颈,比如不再能满足上述某些需求,那该实现就没多大意义。2.1 MovieStream介绍
为便于说明我们的架构设计,这里假设存在一个贴近现实的情景。假设我们受命领导
MovieStream数据科学团队。MovieStream是一家假想的互联网公司,为用户提供在线电影和
电视节目的内容服务。
MovieStream成长迅速,其用户量和收录的电影都在快速增加。MovieStream现有系统可概括
为图2-1:
图2-1 MovieStream现有系统架构
如图所示,向用户推荐哪些电影和节目以及在站点的何处显示,都由MovieStream内容编辑
团队负责。该团队还负责MovieStream的群发营销,包括电子邮件和其他直销渠道。现阶
段,MovieStream以汇总的方式来收集用户的电影浏览记录,并能访问一些用户注册时所填
写的资料。此外,他们还能访问其所收录的电影的一些基本元数据。随着业务快速发展,新发布的电影和用户的活动不断增加,MovieStream团队愈发难以跟上
这样的趋势。MovieStream的CEO之前对大数据、机器学习和人工智能有过较多了解。他希
望我们能为MovieStream创建一个机器学习系统,以处理现在由内容团队人工处理的许多内
容。2.2 机器学习系统商业用例
我们该问的第一个问题或许是:为什么要使用机器学习?为何不直接仍以人工方式来支持
MovieStream?使用机器学习的理由有很多(不使用的理由同样也有很多),其中最为重要
的几点有:
涉及的数据规模意味着完全依靠人工处理会很快跟不上MovieStream的发展;
机器学习和统计模型等基于模型的方式能发现人类(因数据集量级和复杂度过
高)难以发现的模式;
基于模型的方式能避免个人或是情感上的偏见(只要应用时足够细心且正确)。
然而,没有任何理由说基于模型和基于人工的处理和决策不能并存。比如,许多机器学习系
统依赖已标记的数据来训练模型。通常来说,标记数据代价高昂、耗时且需人工参与。文本
数据分类和文本的情感标识便是很好的例子。许多现实中的系统会采取某种人力机制来为数
据生成标识,并用于训练模型。之后,这些模型则部署到在线系统中用于大规模环境下的预
测。
在MovieStream的案例中,我们并不需要担心机器学习的引入会使得内容团队多余。事实
上,我们的目标是让机器学习来负担那些耗时且机器擅长的任务,并向内容团队提供工具以
帮助他们更好地理解用户和内容。比如,帮助他们确定向电影库中新增哪些电影(新增电影
代价高昂,因而对业务至关重要)。
2.2.1 个性化
对MovieStream的业务来说,个性化或许是机器学习最为重要的潜在应用。一般来说,个性
化是根据各种因素来改变用户体验和呈现给用户内容。这些因素可能包括用户的行为数据和
外部因素。
推荐(recommendation)从根本上说是个性化的一种,常指向用户呈现一个他们可能感兴趣
的物品列表。推荐可用于网页(比如推荐相关产品)、电子邮件、其他直销渠道或移动应用
等。
个性化和推荐十分相似,但推荐通常专指向用户显式地呈现某些产品或是内容,而个性化有
时也偏向隐式。比如说,对MovieStream的搜索功能个性化,以根据该用户的数据来改变搜
索结果。这些数据可能包括基于推荐的数据(在搜索产品或内容时),或基于地理位置和搜
索历史等各种数据。用户可能不会明显感觉到搜索结果的变化,这就是个性化更偏向隐性的
原因。
2.2.2 目标营销和客户细分
目标营销用与推荐类似的方法从用户群中找出要营销的对象。一般来说,推荐和个性化的应
用场景都是一对一,而客户细分则试图将用户分成不同的组。其分组根据用户的特征进行,并可能参考行为数据。这种方法可能比较简单,也可能使用了某种机器学习模型,比如聚类。但无论如何,其结果都是对市场的若干细分。这些细分或许有助于理解各组用户的共
性、同组用户之间的相似性,以及不同组之间的差异。
这些将能帮助MovieStream理解用户行为背后的动机。相比个性化时的一对一营销,它们甚
至还能有助于制定针对用户群的更为广泛的营销策略。
当没有已标记数据时,这些方法能帮助制定营销策略,而非采取一刀切的方法。
2.2.3 预测建模与分析
第三种机器学习的应用领域是预测性分析。这个词的范围很宽泛,甚至从某种意义上说还覆
盖推荐、个性化和目标营销。再考虑到推荐和市场细分有所区别,这里用预测建模
(predictive modeling)来表示其他做预测的模型。借助活动记录、收入数据以及内容属性,MovieStream可以创建一个回归模型(regression model)来预测新电影的市场表现。
另外,我们也可使用分类模型(classificaiton model)来对只有部分数据的新电影自动分配
标签、关键字或分类。2.3 机器学习模型的种类
以上MovieSteam的例子列出了机器学习的一些应用场景,但这些并非全部。后面几章在介绍
不同机器学习任务时还会提到一些相关例子。
以上应用案例和方法大致可分为如下两种。
监督学习(supervised learning):这种方法使用已标记数据来学习。推荐引
擎、回归和分类便是例子。它们所使用的标记数据可以是用户对电影的评级(对
推荐来说)、电影标签(对上述分类例子来说)或是收入数字(对回归预测来
说)。我们将在第4章、第5章和第6章讨论监督学习。
无监督学习(unsupervised learning):一些模型的学习过程不需要标记数据,我们称其为无监督学习。这类模型试图学习或是提取数据背后的结构或从中抽取
最为重要的特征。聚类、降维和文本处理的某些特征提取都是无监督学习。我们
将在第7章、第8章和第9章分别介绍它们。2.4 数据驱动的机器学习系统的组成
从高层设计来看,我们的机器学习系统的组成如图2-2所示,其中展示了机器学习的流程。该
流程始于从数据存储处获取数据,之后将其转换为可用于机器学习模型的形式。随后的环节
有对模型的训练、测试和完善,以及将最终的模型部署到生产系统中。有新数据产生时则重
复该流程。
图2-2 常见的一种机器学习流程
2.4.1 数据获取与存储
机器学习流程的第一步是获取训练模型所需的数据。与其他公司类似,MovieStream的数据
通常来自用户活动、其他系统(通常称作机器生成的数据)和外部数据源(比如某个用户访
问站点的时间和当时的天气)。
获取这些数据的途径很多,比如收集浏览器里用户的活动记录、移动应用的事件日志或通过
外部网络API来获取地理或天气信息。
获取数据后通常需将其存储起来。要存储的数据包括:原始数据、即时处理后的数据,以及
可用于生产系统的最终建模结果。
数据存储并不简单,可能涉及多种系统。文件系统,如HDFS、Amazon S3等;SQL数据库,如MySQL或PostgreSQL;分布式NoSQL数据存储,如HBase、Cassandra和Dy namoDB;搜
索引擎,如Solr和Elasticsearch;流数据系统,如Kafk a、Flume和Amazon Kinesis。
本书假设已获取相关数据,这样我们能专注在流程后续的处理和建模环节。
2.4.2 数据清理与转换
大部分机器学习模型所处理的都是特征(feature)。特征通常是输入变量所对应的可用于模
型的数值表示。
虽然我们希望能将大部分时间用于机器学习模型探索,但通常经上述途径获取到的数据都是原始形式,需要进一步处理。比如我们记录的一些用户事件的细节,比如用户查看某部电影
页面的时间、观看某部电影的时间或给出某些反馈的时间。我们还可能收集了一些外部信
息,比如用户的位置(通过他们的IP查到)。这些时间日志通常由一些文字或数值信息组合
而成。
绝大部分情况下,这些原始数据都需要经过预处理才能为模型所使用。预处理的情况可能包
括以下几种。
数据过滤:比如我们想从原始数据的部分数据中创建一个模型,而所需数据只
是最近几月的活动数据或是满足特定条件的事件数据。
处理数据缺失、不完整或有缺陷:许多现实中的数据集都存在某种程度上的
不完整。这可能包括数据缺失(比如用户没有输入),数据存在错误或是缺陷
(比如数据收集或存储时的错误,又或是技术问题或漏洞,以及软硬件故障)。
可能要过滤掉非规整数据,或通过某种方式来填充缺失的数据点(比如选取数据
集的平均值来作为缺失点的值)。
处理可能的异常、错误和异常值:错误或异常的数据可能不利于模型的训
练,所以需要过滤掉,或是通过某些方法来处理。
合并多个数据源:比如可能要将各个用户的事件数据与不同的内部数据或是外
部数据合并。内部数据如用户属性;外部数据如地理位置、天气和经济数据。
数据汇总:某些模型需要输入的数据进行过某种汇总,比如统计各用户经历过
的事件类型的总数目。
对数据进行初步预处理后,需要将其转换为一种适合机器学习模型的表示形式。对许多模型
类型来说,这种表示就是包含数值数据的向量或矩阵。数据转换和特征提取时常见的挑战包
括以下这些情况。
将类别数据(比如地理位置所在的国家或是电影的类别)编码为对应的数值表
示。
从文本数据提取有用信息。
处理图像或是音频数据。
数值数据常被转换为类别数据以减少某个变量的可能值的数目。例如将年龄分为
几个段(比如25~35、45~55等)。
对数值特征进行转换。比如对数值变量应用对数转换,这会有助于处理值域很大
的变量。
对特征进行正则化、标准化,以保证同一模型的不同输入变量的值域相同。
特征工程是对现有变量进行组合或转换以生成新特征的过程。例如从其他数据求
平均数,像求某个用户看电影的平均时间。
这些方法都会在本书的例子中讲到。
这些数据清理、探索、聚合和转换步骤,都能通过Spark核心API、SparkSQL引擎和其他外部
Scala、Java或Py thon包做到。借助Spark的Hadoop功能还能实现上述多种存储系统上的读
写。2.4.3 模型训练与测试回路
当数据已转换为可用于模型的形式,便可开始模型的训练和测试。在这个部分,我们主要关
注模型选择(model selection)问题。这可以归结为对特定任务最优建模方法的选择,或是
对特定模型最佳参数的选择问题。在许多情况下,我们会想尝试多种模型并选出表现最好的
那个(各模型都采用了最佳的参数时)。因而,这个词在现实中经常同时指代这两个过程。
在这个阶段,探索多个模型组合(也称集成学习法,ensemble method)的效果也很常见。
在训练数据集上运行模型并在测试数据集(即为评估模型而预留的数据,在训练阶段模型没
接触过该数据)上测试其效果,这个过程一般相对直接,被称作交叉验证(cross-
validation)。
然而我们所处理的通常是大型数据集。这样,先在具有代表性的小样本数据集上进行初步的
训练-测试回路,或是尽可能并行地选择模型,都会有所帮助。
Spark内置的机器学习库MLlib完全能胜任这个阶段的需求。本书将主要关注如何借助MLlib和
Spark核心功能来实现对各种机器学习方法的模型训练、评估以及交叉验证。
2.4.4 模型部署与整合
通过训练测试循环找出最佳模型后,要让它能得出可付诸实践的预测,还需将其部署到生产
系统中。
这个过程一般要将已训练的模型导入特定的数据存储中。该位置也是生产系统获取新版本的
地方。通过这种方式,实时服务系统能在训练新模型时进行周期性的更新。
2.4.5 模型监控与反馈
监控机器学习系统在生产环境下的表现十分重要。在部署了最优训练的模型后,我们会想知
道其在实际中的表现如何:它在新的未知数据上的表现是否符合预期?其准确度怎么样?毕
竟不管之前的模型选择和优化做得如何,检验其实际表现的唯一方法是观察其在生产环境下
的表现。
同样值得注意的是,模型准确度和预测效果只是现实中系统表现的一部分。通常还应该关注
其他业务效果(比如收入和利润率)或用户体验(比如站点使用时间和用户总体活跃度)的
相关指标。多数情况下很难将它们与模型预测能力直接关联。推荐系统或目标营销系统的准
确度可能很重要,但它只与我们真正关心的那些指标(如用户体验度、活跃度以及最终收
入)间接相关。
所以,现实中应该同时监控模型准确度相关指标和业务指标。我们可以尽可能在生产系统中
部署不同的模型,通过调整它们而优化业务指标。实践中,这通常通过在线分割测试(live
split test)进行。然而,做好这类测试并不容易。在线测试和实验可能引发错误,也可能效果
不好,或者会使用基准模型,这些都会给用户体验和收入带来负面影响,故其代价高昂。本阶段另一个重要的方面是模型反馈(model feedback),指通过用户的行为来对模型的预
测进行反馈的过程。在现实系统中,模型的应用将影响用户的决策和潜在行为,从而反过来
将从根本上改变模型自己将来的训练数据。
举例来说,假设我们部署了一个推荐系统。由于推荐实际上限制了用户的可选项,从而影响
了用户的选择。我们希望用户的选择不会受模型的影响,然而这种反馈回路会反过来影响模
型的训练数据,并最终对模型准确度和重要的业务指标产生不利影响。
好在我们可以借助一些机制来降低反馈回路的这种负面影响,比如提供一些无偏见的训练数
据。这类数据来自那些没有被推荐的用户,又或者在一开始就考虑到这种平衡需求而划分出
来的客户。这些机制有助于对数据的理解、探索以及利用已有的经验来提升系统的表现。
第10章将会简要介绍实时监控和模型更新的部分内容。
2.4.6 批处理或实时方案的选择
前几节简要概括了常见的批处理方法。在这类方法下,模型用所有数据或一部分数据进行周
期性的重新训练。由于上述流程会花费一定的时间,这就使得批处理方法难以在新数据到达
时立即完成模型的更新。
虽然本书将主要讨论批处理机器学习方法,但的确存在一类名为在线学习(online
learning)的机器学习方法。它们在新数据到达时便能立即更新模型,从而使实时系统成为可
能。常见的例子有对线性模型的在线优化算法,如随机梯度下降法。我们可以通过例子来学
习该算法。这类方法的优势在于其系统将能对新的信息和底层行为(即输入数据的特征或是
分布会随时间变化,现实中的绝大部分情况都会如此)作出快速的反应和调整。
但在实际生产环境中,在线学习模型也会面对特有的挑战。比如,对数据的获取和转换难以
做到实时。在一个纯在线环境下选择适当的模型也不简单。在线训练和模型选择以及部署阶
段的延时可能难以达到实时性的需求(比如在线广告对延时的需求是以毫秒计)。最后,批
处理框架不适合对本质为流的数据进行实时处理。
幸运的是,Spark提供了实时流处理组件Spark Streaming,对实时机器学习任务来说是个不错
的选择。第10章将探讨Spark Streaming和在线学习问题。
现实中的实时机器学习系统具有天生的复杂性,故实践中大部分的系统都以近实时性为设计
目标。这是一种混合方法,它并不要求模型一定在数据到达时立即更新。相反,新的数据会
被收集为小批量的训练数据,再输入给在线学习算法。大部分情况下,该方法会周期性地进
行某种批处理。处理的内容可能包括在整个数据集上重新计算模型,或是更为复杂的某些数
据处理以及模型的选择。这些能保证实时模型的表现不会随时间推移而变差。
另一种类似的方法是,在周期性批处理中进行重新计算时,若有新的数据到来则只对更复杂
的模型进行近似更新。这样模型可从新的数据学习,但有短暂延迟。因为是近似更新,所以
模型的准确度会随着时间推移而下降。但周期性地在所有数据上重新计算模型能弥补这一
点。2.5 机器学习系统架构
现在我们已经了解了如何在MovieStream的情景中应用机器学习系统,其可能的架构可概括
为图2-3所示:图2-3 MovieStream的未来架构
如图所示,该系统包含了早先机器学习流程示意图的内容,此外还包括:
收集与用户、用户行为和电影标题有关的数据;
将这些数据转为特征;
模型训练,包括训练-测试和模型选择环节;
将已训练模型部署到在线服务系统,并用于离线处理;
通过推荐和目标页面将模型结果反馈到MovieStream站点;
将模型结果返回到MovieStream的个性化营销渠道;
使用离线模型来为MovieSteam的各个团队提供工具,以帮助其理解用户的行为、内容目录的特点和业务收入的驱动因素。
动手练习
假设你现在要告知前端和基础设施工程团队你的机器学习系统需要哪些数据。想一想如何简
要告诉他们该如何设计数据收集过程。画出原始数据(比如网页日志、时间日志等)可能的结构,以及它们在系统中的流向。需要考虑的方面有:
需要哪些数据源
数据格式应该如何
数据收集、处理、可能进行的汇总以及存储的频率
使用何种存储以保证可扩展性2.6 小结
本章,你学到了数据驱动的自动化机器学习系统由哪些部分构成。我们同样也描述了一个真
实系统的可能架构。
下一章,我们将讨论如何获取公开数据集以用于常见的机器学习任务,了解数据处理、清理
和转换环节的一些基本概念。经过这些环节后,数据便可以用于训练机器学习模型了。
第3章 Spark上数据的获取、处理与准备
机器学习是一个极为广泛的领域,其应用范围已包括Web和移动应用、物联网、传感网络、金融服务、医疗健康和其他科研领域,而这些还只是其中一小部分。
由此,可用于机器学习的数据来源也极为广泛。本书将重点关注其在商业领域的应用。这类
领域中可用的数据通常由组织的内部数据(比如金融公司的交易数据)以及外部数据(比如
该金融公司下的金融资产价格数据)构成。
以第2章假想的互联网公司MovieStream为例,其主要的内部数据包括网站提供的电影数据、用户的服务信息数据以及行为数据。这些数据涉及电影和相关内容(比如标题、分类、图
片、演员和导演)、用户信息(比如用户属性、位置和其他信息)以及用户活动数据(比如
浏览数、预览的标题和次数、评级、评论,以及如赞、分享之类的社交数据,还有包括像
Facebook和Twitter之类的社交网络属性)。
其外部数据来源则可能包括天气和地理定位信息,以及如IMDB和Rotten Tomators之类的第三
方电影评级与评论信息等。
一般来说,获取实际的公司或机构的内部数据十分困难,因为这些信息很敏感(尤其是购买
记录、用户或客户行为以及公司财务),也关系组织的潜在利益。这也是对这类数据应用机
器学习建模的实用之处:一个预测精准的好模型有着极高的商业价值(Netflix Prize和Kaggle
上机器学习比赛的成功就是很好的见证)。
本书将使用可以公开访问的数据来讲解数据处理和机器学习模型训练的相关概念。
本章内容包括:
简要概述机器学习中用到的数据类型;
举例说明从何处获取感兴趣的数据集(通常可从因特网上获取),其中一些会用
于阐述本书所涉及模型的应用;
了解数据的处理、清理、探索和可视化方法;
介绍将原始数据转换为可用于机器学习算法特征的各种技术;
学习如何使用外部库或Spark内置函数来正则化输入特征。3.1 获取公开数据集
商业敏感数据虽然难以获取,但好在仍有相当多有用数据可公开访问。它们中的不少常用来
作为特定机器学习问题的基准测试数据。常见的有以下几个。
UCL机器学习知识库:包括近300个不同大小和类型的数据集,可用于分类、回归、聚类和推荐系统任务。数据集列表位于:http:archive.ics.uci.eduml。
Amazon AWS公开数据集:包含的通常是大型数据集,可通过Amazon S3访问。
这些数据集包括人类基因组项目、Common Crawl网页语料库、维基百科数据和
Google Book s Ngrams。相关信息可参见:http:aws.amazon.compublicdatasets。
Kaggle:这里集合了Kaggle举行的各种机器学习竞赛所用的数据集。它们覆盖分
类、回归、排名、推荐系统以及图像分析领域,可从Competitions区域下
载:http:www.kaggle.comcompetitions。
KDnuggets:这里包含一个详细的公开数据集列表,其中一些上面提到过的。该
列表位于:http:www.kdnuggets.comdatasetsindex.html。
针对特定的应用领域与机器学习任务,仍有许多其他公开数据集。希
望你自己也会接触到一些有趣的学术或是商业数据。
为说明Spark下的数据处理、转换和特征提取相关的概念,需要下载一个电影推荐方面的常用
数据集MovieLens。它能应用于推荐系统和其他可能的机器学习任务,适合作为示例数据
集。
Spark的机器学习库MLlib一直在紧锣密鼓地开发。但和Spark的核心不
同,其全局API和设计的进度尚未完全稳定。
Spark 1.2.0引入了一个实验性质的新MLlib API,位于ml包下(现有的接口则位于mllib
包下)。新API旨在加强原有的API和接口的设计,从而更容易衔接数据流程的各个环
节。这些环节包括特征提取、正则化、数据集转化、模型训练和交叉验证。
新API仍处于实现阶段,在后续的版本中可能会出现重大的变更。因此,后续的章节将
只关注相对更成熟的现有MLlib API。随着版本的更新,本书所提到的各种特征提取方
法和模型将会简单地桥接到新API中。但新API的核心思路和大部分底层代码仍会保持原样。
MovieLens 100k数据集
MovieLens 100k数据集包含表示多个用户对多部电影的10万次评级数据,也包含电影元数据
和用户属性信息。该数据集不大,方便下载和用Spark程序快速处理,故适合做讲解示例。
可从http:files.grouplens.orgdatasetsmovielensml-100k .zip下载这个数据集。
下载后 ,可在终端将其解压:
>unzip ml-100k.zip
inflating: ml-100kallbut.pl
inflating: ml-100kmku.sh
inflating: ml-100kREADME...
inflating: ml-100kub.base
inflating: ml-100kub.test
这会创建一个名为ml-100k的文件夹。下面变更当前目录到该目录然后查看其内容。其中重要
的文件有u.user(用户属性文件)、u.item(电影元数据)和u.data(用户对电影的评级)。
>cd ml-100k
关于数据集的更多信息可以从README获得,包括每个数据文件里的变量定义。我们可以使
用head命令来查看各个文件中的内容。
比如说,可以看到u.user文件包含user.id、age、gender、occupation和ZIP code
这些属性,各属性之间用管道符(|)分隔。
>head -5 u.user
1|24|M|technician|85711
2|53|F|other|94043
3|23|M|writer|32067
4|24|M|technician|43537
5|33|F|other|15213
u.item文件则包含movie id、title、release date以及若干与IMDB link和电影分
类相关的属性。各个属性之间也用|符号分隔:>head -5 u.item
1|Toy Story (1995)|01-Jan-1995||http:us.imdb.comMtitle-
exact?Toy%20
Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
2|GoldenEye (1995)|01-Jan-1995||http:us.imdb.comMtitle-
exact?GoldenEye%20(1995)|0|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
3|Four Rooms (1995)|01-Jan-1995||http:us.imdb.comMtitle-
exact?
Four%20Rooms%20(1995)|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
4|Get Shorty (1995)|01-Jan-1995||http:us.imdb.comMtitle-
exact?
Get%20Shorty%20(1995)|0|1|0|0|0|1|0|0|1|0|0|0|0|0|0|0|0|0|0
5|Copycat (1995)|01-Jan-1995||http:us.imdb.comMtitle-
exact?Copycat%20(1995)|0|0|0|0|0|0|1|0|1|0|0|0|0|0|0|0|1|0|0
最后,u.data文件包含user id、movie id、rating(从1到5)和timestamp属性,各
属性间用制表符(\t)分隔。
>head -5 u.data
196 242 3 881250949
186 302 3 891717742
22 377 1 878887116
244 51 2 880606923
166 346 1 8863975963.2 探索与可视化数据
有数据后,用启动Spark交互式终端来探索该数据吧!本节将通过IPy thon交互式终端和
matplotlib库来对数据进行处理和可视化,故我们会用到Py thon和PySparkshell。
IPy thon是针对Py thon的一个高级交互式壳程序,包含内置一系列实用
功能的pylab,其中有NumPy和SciPy用于数值计算,以及matplotlib用于交互式绘图和可
视化。
建议使用最新版的IPython(本书写作时为2.3.1)。IPython的安装方法可参考如下指
引:http:ipy thon.orginstall.html。如果这是你第一次使用IPython,这里有一个教
程:http:ipy thon.orgipy thon-docstableinteractivetutorial.html。
运行本章代码需要之前提到的所有软件包。它们的安装指南可从源代码包中找到。如果你刚
开始使用Py thon且不熟悉这些包的安装过程,我们强烈推荐你使用一个预编译的科学Py thon
套件,比如Anaconda(http:continuum.iodownloads)或
Enthougt(https:store.enthought.comdownloads)。这些套件极大简化了安装过程且包含运行
本章代码所需的一切。
PySpark支持运行Python时可指定的参数。在启动PySpark终端时,我们可以使用IPy thon而非
标准的Py thon shell。启动时也可以向IPy thon传入其他参数,包括让它在启动时也启用py lab
功能。
可以在Spark主目录下运行如下命令来实现上述需求:
>IPYTHON=1 IPYTHON_OPTS=--pylab .binpyspark
可以看到PySpark终端会启动,其输出和下面类似:图3-1 IPython下的PySpark的终端界面
终端里的IPython 2.3.1 -- An enhanced Interactive
Python和Using matplotlib backend: MacOSX输出行表示IPython和pylab均
已被PySpark启用。实际使用的操作系统和软件版本的不同,实际的输出可能会有所不同。
现在IPython终端已启动,我们可以探索MovieLens数据集并做些基本分析。
在本章的学习过程中,你可以将样本代码输入到IPy thon终端,也可通
过IPython提供的Notebook 应用来完成。后者支持支持HTML显示,且在IPy thon终端的
基础上提供了一些增强功能,如即时绘图、HTML标记,以及独立运行代码片段的功
能。
本章的图片使用IPy thon Notebook生成。它们的样式可能会和你看到的不同,但只要内
容上一致就没关系。如果愿意,你也可以使用Notebook来运行本章的代码。本章除提供
Py thon代码外,还提供相应的IPython Notebook版本,以供你导入到IPython Notebook
中。
IPython Notebook的使用指南可参见:http:ipy thon.orgipython-
docstableinteractivenotebook .html。
3.2.1 探索用户数据
首先来分析MovieLens用户的特征。在你的终端里输入如下代码(其中的PATH是指用unzip
命令来解压MovieLens 100k数据集时所生成的主目录):
user_data = sc.textFile(PATHml-100ku.user)
user_data.first
其输出应该与下面类似:
u'1|24|M|technician|85711'
这是用户数据文件的首行。从中可以看到,它是由“|”字符分隔。
first函数与collect函数类似,但前者只向驱动程序返回RDD的首个元素。我们也可以使用take(k)函数来只返回RDD的前k个元素到驱动程序。
下面用“|”字符来分隔各行数据。这将生成一个RDD,其中每一个记录对应一个Py thon 列
表,各列表由用户ID(user ID)、年龄(age)、性别(gender)、职业(occupation)和邮
编(ZIP code)五个属性构成。
之后再统计用户、性别、职业和邮编的数目。这可通过如下代码实现。该数据集不大,故这
里并未缓存它。
user_fields = user_data.map(lambda line: line.split(|))
num_users = user_fields.map(lambda fields: fields[0]).count
num_genders = user_fields.map(lambda fields:
fields[2]).distinct.count
num_occupations = user_fields.map(lambda fields:
fields[3]).distinct.count
num_zipcodes = user_fields.map(lambda fields:
fields[4]).distinct.count
print Users: %d, genders: %d, occupations: %d, ZIP codes: %d
% (num_users, num_genders, num_occupations, num_zipcodes)
对应输出如下:
Users: 943, genders: 2, occupations: 21, ZIP codes: 795
接着用matplotlib的hist函数来创建一个直方图,以分析用户年龄的分布情况:
ages = user_fields.map(lambda x: int(x[1])).collect
hist(ages, bins=20, color='lightblue', normed=True)
fig = matplotlib.pyplot.gcf
fig.set_size_inches(16, 10)
这里hist函数的输入参数有ages数组、直方图的bins数目(即区间数,这里为20)。同
时还使用了normed=True参数来正则化直方图,即让每个方条表示年龄在该区间内的数据
量占总数据量的比。
你将能看到图3-2所示的直方图。从中可以看出MovieLens的用户偏年轻。大量用户处于15岁
到35岁之间。图3-2 用户的年龄段分布
若想了解用户的职业分布情况,可以用如下的代码来实现。首先利用之前用到的MapReduce
方法来计算数据集中各种职业的出现次数,然后matplotlib下的bar函数来绘制一个不同
职业的数量的条形图。
数据中对职业的描述用的是文本,所以需要对其稍作处理以便bar函数使用:
count_by_occupation = user_fields.map(lambda fields:
(fields[3], 1)).
reduceByKey(lambda x, y: x + y).collect
x_axis1 = np.array([c[0] for c in count_by_occupation])
y_axis1 = np.array([c[1] for c in count_by_occupation])
在得到各职业所占数量的RDD后,需将其转为两个数组才能用来做条形图。它们分别对应x
轴(职业标签)与y轴(数量)。collect函数返回数量数据时并不排序。我们需要对该数
据进行排序,从而在条形图中以从少到多的顺序来显示各个职业。
为此可先创建两个numpy数组。之后调用numpy的argsort函数来以数量升序从各数组中
选取元素。注意这里会对x轴和y轴的数组都以y轴值排序(即以数量排序):
x_axis = x_axis1[np.argsort(x_axis1)]
y_axis = y_axis1[np.argsort(y_axis1)]有了条形图两轴所需的数据后便可创建条形图。创建时,会以职业作为x轴上的分类标签,以数量作为y轴的值。下面的代码也增加了如plt.xticks(rotation=30)之类的代码来
美化条形图。
pos = np.arange(len(x_axis))
width = 1.0
ax = plt.axes
ax.set_xticks(pos + (width 2))
ax.set_xticklabels(x_axis)
plt.bar(pos, y_axis, width, color='lightblue')
plt.xticks(rotation=30)
fig = matplotlib.pyplot.gcf
fig.set_size_inches(16, 10)
生成的图形应该和图3-3类似。从中可看出,数量最多的职业是student、other、educator、administrator、engineer和programmer。
图3-3 用户的职业分布
Spark对RDD提供了一个名为countByValue的便捷函数。它会计算RDD里各不同值所分别
出现的次数,并将其以Python dict函数的形式(或是Scala、Java下的Map函数)返回给驱
动程序:
count_by_occupation2 = user_fields.map(lambda fields:
fields[3]).countByValue
print Map-reduce approach:
print dict(count_by_occupation2)
print
print countByValue approach:print dict(count_by_occupation)
可以看到,上述两种方式的结果相同。
3.2.2 探索电影数据
接下来了解下电影分类数据的特征。如之前那样,我们可以先简单看一下某行记录,然后再
统计电影总数。
movie_data = sc.textFile(PATHml-100ku.item)
print movie_data.first
num_movies = movie_data.count
print Movies: %d % num_movies
其终端上的输出如下:
1|Toy Story (1995)|01-Jan-1995||http:us.imdb.comMtitle-
exact?
Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
Movies: 1682
绘制电影年龄的分布图的方法和之前对用户年龄和职业分布的处理类似。电影年龄即其发行
年份相对于现在过了多少年(在本数据中现在是1998年)。
从下面的代码可以看到,电影数据中有些数据不规整,故需要一个函数来处理解析release
date时可能的解析错误。这里命名该函数为convert_year:
def convert_year(x):
try:
return int(x[-4:])
except:
return 1900 若数据缺失年份则将其年份设为1900。在后续处理中会过滤掉这
类数据
有了以上函数来解析发行年份后,便可在调用电影数据进行map转换时应用该函数,并取回
其结果:
movie_fields = movie_data.map(lambda lines: lines.split(|))
years = movie_fields.map(lambda fields: fields[2]).map(lambda
x: convert_year(x))解析出错的数据的年份已设为1900。要过滤掉这些数据可以使用Spark的filter转换操
作:
years_filtered = years.filter(lambda x: x != 1900)
现实的数据经常会有不规整的情况,对其解析时就需要进一步的处理。上面便是一个很好的
例子。事实上,这也表明了数据探索的重要性所在,即它有助于发现数据在完整性和质量上
的问题。
过滤掉问题数据后,我们用当前年份减去发行年份,从而将电影发行年份列表转换为电影年
龄。接着用countByValue来计算不同年龄电影的数目。最后绘制电影年龄直方图(同样
会使用hist函数,且其values变量的值来自countByValue的结果,主键则为bins变
量):
movie_ages = years_filtered.map(lambda yr: 1998-
yr).countByValue
values = movie_ages.values
bins = movie_ages.keys
hist(values, bins=bins, color='lightblue', normed=True)
fig = matplotlib.pyplot.gcf
fig.set_size_inches(16,10)
你会看到如图3-4这样的结果。它表明大部分电影发行于1998年的前几年。图3-4 电影的年龄分布
3.2.3 探索评级数据
现在来看一下评级数据:
rating_data = sc.textFile(PATHml-100ku.data)
print rating_data.first
num_ratings = rating_data.count
print Ratings: %d % num_ratings
这些代码的输出为:
196 242 3 881250949
Ratings: 100000
可以看到评级次数共有10万。另外和用户数据与电影数据不同,评级记录用“\t”分隔。你可能也已想到,我们会想做些基本的统计,以及绘制评级值分布的直方图。动手吧:
rating_data = rating_data_raw.map(lambda line:
line.split(\t))
ratings = rating_data.map(lambda fields: int(fields[2]))
max_rating = ratings.reduce(lambda x, y: max(x, y))
min_rating = ratings.reduce(lambda x, y: min(x, y))
mean_rating = ratings.reduce(lambda x, y: x + y) num_ratings
median_rating = np.median(ratings.collect)
ratings_per_user = num_ratings num_users
ratings_per_movie = num_ratings num_movies
print Min rating: %d % min_rating
print Max rating: %d % max_rating
print Average rating: %2.2f % mean_rating
print Median rating: %d % median_rating
print Average of ratings per user: %2.2f % ratings_per_user
print Average of ratings per movie: %2.2f %
ratings_per_movie
在终端执行以上命令后,输出应该与下面类似:
Min rating: 1
Max rating: 5
Average rating: 3.53
Median rating: 4
Average of ratings per user: 106.00
Average of ratings per movie: 59.00
从中可以看到,最低的评级为1,而最大的评级为5。这并不意外,因为评级的范围便是从1
到5。
Spark对RDD也提供一个名为states的函数。该函数包含一个数值变量用于做类似的统计:
ratings.stats
其输出为:
(count: 100000, mean: 3.52986, stdev: 1.12566797076, max: 5.0,min: 1.0)
可以看出,用户对电影的平均评级(mean)是3.5左右,而评级中位数(median)为4。这就
能期待说评级的分布稍倾向高点的得分。要验证这点,可以创建一个评级值分布的条形图。具体做法和之前的类似:
count_by_rating = ratings.countByValue
x_axis = np.array(count_by_rating.keys)
y_axis = np.array([float(c) for c in count_by_rating.values])
这里对y轴正则化,使它表示百分比
y_axis_normed = y_axis y_axis.sum
pos = np.arange(len(x_axis))
width = 1.0
ax = plt.axes
ax.set_xticks(pos + (width 2))
ax.set_xticklabels(x_axis)
plt.bar(pos, y_axis_normed, width, color='lightblue')
plt.xticks(rotation=30)
fig = matplotlib.pyplot.gcf
fig.set_size_inches(16, 10)
这会生成图3-5所示的结果:
图3-5 电影评级的分布
其特征和我们之前所期待的相同,即评级的分布的确偏向中等以上。同样也可以求各个用户评级次数的分布情况。记得之前我们已对评级数据用制表符分隔,从
而生成过rating_data RDD。后续的代码中将再次用到该RDD变量。
计算各用户的评级次数的分布时,我们先从rating_data RDD里提取出以用户ID为主键、评级为值的键值对。之后调用Spark的groupByKey函数,来对评级以用户ID为主键进行分
组:
user_ratings_grouped = rating_data.map(lambda fields:
(int(fields[0]),int(fields[2]))). groupByKey
接着求出每一个主键(用户ID)对应的评级集合的大小;这会给出各用户评级的次数:
user_ratings_byuser = user_ratings_grouped.map(lambda (k, v):
(k, len(v)))
user_ratings_byuser.take(5)
要检查结果RDD,可从中选出少数记录。这应该会返回一个(用户ID, 评级次数)键值对类
型的RDD:
[(1, 272), (2, 62), (3, 54), (4, 24), (5, 175)]
最后,用我们所熟悉的hist函数来绘制各用户评级分布的直方图。
user_ratings_byuser_local = user_ratings_byuser.map(lambda (k,v): v).collect
hist(user_ratings_byuser_local, bins=200, color='lightblue',normed=True)
fig = matplotlib.pyplot.gcf
fig.set_size_inches(16,10)
结果如图3-6所示。可以看出,大部分用户的评级次数少于100。但该分布也表明仍然有较多
用户做出过上百次的评级。图3-6 各用户的电影评级的分布
可以用类似的方法绘制各个电影评级次数的直方图,读者可自己练习。如果觉得不够,甚至
还可以提取出不同日期(可从评级数据集的最后一列的时间戳得到)下的电影评级情况,进
而绘制出总评级次数、参与评级的不同用户的个数,以及被评级的不同电影的个数的时间
线。时间线精确到每天。3.3 处理与转换数据
现在我们已对数据集进行过探索性的分析,并了解了用户和电影的一些特征。那接下来做什
么呢?
为让原始数据可用于机器学习算法,需要先对其进行清理,并可能需要将其进行各种转换,之后才能从转换后的数据里提取有用的特征。数据的转换和特征提取联系紧密。某些情况
下,一些转换本身便是特征提取的过程。
在之前处理电影数据集时我们已经看到数据清理的必要性。一般来说,现实中的数据会存在
信息不规整、数据点缺失和异常值问题。理想情况下,我们会修复非规整数据。但很多数据
集都源于一些难以重现的收集过程(比如网络活动数据和传感器数据),故实际上会难以修
复。值缺失和异常也很常见,且处理方式可与处理非规整信息类似。总的来说,大致的处理
方法如下。
过滤掉或删除非规整或有值缺失的数据:这通常是必须的,但的确会损失这
些数据里那些好的信息。
填充非规整或缺失的数据:可以根据其他的数据来填充非规整或缺失的数据。
方法包括用零值、全局期望或中值来填充,或是根据相邻或类似的数据点来做插
值(通常针对时序数据)等。选择正确的方式并不容易,它会因数据、应用场景
和个人经验而不同。
对异常值做鲁棒处理:异常值的主要问题在于即使它们是极值也不一定就是错
的。到底是对是错通常很难分辨。异常值可被移除或是填充,但的确存在某些统
计技术(如鲁棒回归)可用于处理异常值或是极值。
对可能的异常值进行转换:另一种处理异常值或极值的方法是进行转换。对那
些可能存在异常值或值域覆盖过大的特征,利用如对数或高斯核对其转换。这类
转换有助于降低变量存在的值跳跃的影响,并将非线性关系变为线性的。
非规整数据和缺失数据的填充
前面已经举过过滤非规整数据的例子。顺着上述代码,下面的代码对发行日期有问题的数据
采取了填充策略,即用发行日期的中位数来填充问题数据。
years_pre_processed = movie_fields.map(lambda fields:
fields[2]).map(lambda x: convert_year(x)).collect
years_pre_processed_array = np.array(years_pre_processed)
在选取所有的发行日期后,这里首先计算发行年份的平均数和中位数。选取的数据不包含非
规整数据。然后用numpy的函数来找出year_pre_processed_array中的非规整数据点
的序号(之前我们给该数据点分配了1900的值)。最后通过该序号来将中位数作为非规整
数据的发行年份:mean_year =
np.mean(years_pre_processed_array[years_pre_processed_array!=1900])
median_year =
np.median(years_pre_processed_array[years_pre_processed_array!=1900])
index_bad_data = np.where(years_pre_processed_array==1900)[0]
[0]
years_pre_processed_array[index_bad_data] = median_year
print Mean year of release: %d % mean_year
print Median year of release: %d % median_year
print Index of '1900' after assigning median: %s %
np.where(years_pre_processed_array == 1900)[0]
其输出应如下:
Mean year of release: 1989
Median year of release: 1995
Index of '1900' after assigning median: []
这里同时求出了发行年份的平均值和中位值。从输出也可看到,发行年份分布的偏向使得其
中位值很高。特定情况下通常不容易确定选取什么样的值来做填充才够精确。 但在本例中,从该偏向来看使用中位值来填充的确可行。
严格来说,上面示例代码的可扩展性并不很高,因为它要把数据都返
回给驱动程序。平均值的计算可通过Spark下数值型RDD的mean函数来实现,但目前并
没相应的中位数函数。我们可以自己编写这个函数来求中位数,又或是用sample函数
(后面几章会更多看到)计算样本的中位数。3.4 从数据中提取有用特征
在完成对数据的初步探索、处理和清理后,便可从中提取可供机器学习模型训练用的特征。
特征(feature)指那些用于模型训练的变量。每一行数据包含可供提取到训练样本中的各
种信息。从根本上说,几乎所有机器学习模型都是与用向量表示的数值特征打交道;因此,我们需要将原始数据转换为数值。
特征可以概括地分为如下几种。
数值特征(numerical feature):这些特征通常为实数或整数,比如之前例子中
提到的年龄。
类别特征(categorical feature):它们的取值只能是可能状态集合中的某一种。
我们数据集中的用户性别、职业或电影类别便是这类。
文本特征(text feature):它们派生自数据中的文本内容,比如电影名、描述或
是评论。
其他特征:大部分其他特征都最终表示为数值。比如图像、视频和音频可被表
示为数值数据的集合。地理位置则可由经纬度或地理散列(geohash)表示。
这里我们将谈到数值、类别以及文本类的特征。
3.4.1 数值特征
原始的数值和一个数值特征之间的区别是什么?实际上,任何数值数据都能作为输入变量。
但是,机器学习模型中所学习的是各个特征所对应的向量的权值。这些权值在特征值到输出
或是目标变量(指在监督学习模型中)的映射过程中扮演重要角色。
由此我们会想使用那些合理的特征,让模型能从这些特征学到特征值和目标变量之间的关
系。比如年龄就是一个合理的特征。年龄的增加和某项支出之间可能就存在直接关系。类似
地,高度也是一个可直接使用的数值特征。
当数值特征仍处于原始形式时,其可用性相对较低,但可以转化为更有用的表示形式。位置
信息便是如此。若使用原始位置信息(比如用经纬度表示的),我们的模型可能学习不到该
信息和某个输出之间的有用关系,这就使得该信息的可用性不高,除非数据点的确很密集。
然而若对位置进行聚合或挑选后(比如聚焦为一个城市或国家),便容易和特定输出之间存
在某种关联了。
3.4.2 类别特征
当类别特征仍为原始形式时,其取值来自所有可能取值所构成的集合而不是一个数字,故不
能作为输入。如之前的例子中的用户职业便是一个类别特征变量,其可能取值有学生、程序
员等。这样的类别特征也称作名义(nominal)变量,即其各个可能取值之间没有顺序关系。相
反,那些存在顺序关系的(比如之前提到的评级,从定义上说评级5会高于或是好于评级1)
则被称为有序(ordinal)变量。
将类别特征表示为数字形式,常可借助 k 之1(1-of-k)方法进行。将名义变量表示为可用于
机器学习任务的形式,会需要借助如 k 之1编码这样的方法。有序变量的原始值可能就能直
接使用,但也常会经过和名义变量一样的编码处理。
假设变量可取的值有 k 个。如果对这些值用1到 k 编序,则可以用长度为k的二元向量来表示
一个变量的取值。在这个向量里,该取值对应的序号所在的元素为1,其他元素都为0。
比如,我们可以取回occupation的所有可能取值:
all_occupations = user_fields.map(lambda fields: fields[3]).
distinct.collect
all_occupations.sort
然后可以依次对各可能的职业分配序号(注意,为与Python、Scala以及Java中数组编序相
同,这里也从0开始编号):
idx = 0
all_occupations_dict = {}
for o in all_occupations:
all_occupations_dict[o] = idx
idx +=1
看一下“k之1”编码会对新的例子分配什么值 print Encoding of 'doctor': %d %
all_occupations_dict['doctor']
print Encoding of 'programmer': %d %
all_occupations_dict['programmer']
其输出如下:
Encoding of 'doctor': 2
Encoding of 'programmer': 14
最后来编码programmer的取值。首先需创建一个长度和可能的职业数目相同(本例中为
5)的numpy数组,其各元素值为0。这可通过numpy的zeros函数实现。
之后将提取单词programmer的序号,并将数组中对应该序号的那个元素值赋为1:
K = len(all_occupations_dict)
binary_x = np.zeros(K)k_programmer = all_occupations_dict['programmer']
binary_x[k_programmer] = 1
print Binary feature vector: %s % binary_x
print Length of binary vector: %d % K
对应的输出为:
Binary feature vector: [ 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
0. 1. 0. 0. 0. 0. 0. 0.] Length
Length of binary vector: 21
3.4.3 派生特征
上面曾提到,从现有的一个或多个变量派生出新的特征常常是有帮助的。理想情况下,派生
出的特征能比原始属性带来更多信息。
比如,可以分别计算各用户已有的电影评级的平均数。这将能给模型加入针对不同用户的个
性化特征(事实上,这常用于推荐系统)。在前文中我们也从原始的评级数据里创建了新的
特征以学习出更好的模型。
从原始数据派生特征的例子包括计算平均值、中位值、方差、和、差、最大值或最小值以及
计数。在先前内容中,我们也看到是如何从电影的发行年份和当前年份派生了新的movie
age特征的。这类转换背后的想法常常是对数值数据进行某种概括,并期望它能让模型学习
更容易。
数值特征到类别特征的转换也很常见,比如划分为区间特征。进行这类转换的变量常见的有
年龄、地理位置和时间。
将时间戳转为类别特征
下面以对评级时间的转换为例,说明如何将数值数据装换为类别特征。该时间的格式为Unix
的时间戳。我们可以用Python的datetime模块从中提取出日期、时间以及点钟(hour)信
息。其结果将是由各评级对应的点钟数所构成的RDD。
需要定义一个函数将评级时间戳提取为datetime的格式:
def extract_datetime(ts):
import datetime
return datetime.datetime.fromtimestamp(ts)
下面会再次用到之前例子中求出的rating_data RDD。
我们首先使用map将时间戳属性转换为Py thon int 类型。然后通过extract_datetime函数将各时间戳转为datetime类型的对象,进而提取出其点钟数。
timestamps = rating_data.map(lambda fields: int(fields[3]))
hour_of_day = timestamps.map(lambda ts:
extract_datetime(ts).hour)
hour_of_day.take(5)
若取出结果RDD的前5条记录,可看到如下输出:
[17, 21, 9, 7, 7]
这就完成了从原始的时间数据到表示评级发生的点钟的类别特征的转换。
现在,假设我们觉得这样的表示过于粗糙,想更为精确。我们可以将点钟数划分到一天中的
不同时段。
比如可以说7点到12点是上午,12点到14点是中午,以此类推。要生成这些时间段,可以创
建一个以点钟数为输入的函数来返回相应的时间段:
def assign_tod(hr):
times_of_day = {
'morning' : range(7, 12),'lunch' : range(12, 14),'afternoon' : range(14, 18),'evening' : range(18, 23),'night' : range(23, 7)
}
for k, v in times_of_day.iteritems:
if hr in v:
return k
现在对hour_of_day RDD里的各次评级的点钟数调用assign_tod函数:
time_of_day = hour_of_day.map(lambda hr: assign_tod(hr))
time_of_day.take(5)
如果我们选择查看该新RDD里的前5条记录,会输出如下已转换的值:
['afternoon', 'evening', 'morning', 'morning', 'morning']
我们已将时间戳变量转为点钟数,再接着转为了时间段,从而得到了一个类别特征。我们可
以借助之前提到的 k 之1编码方法来生成其相应的二元特征向量。3.4.4 文本特征
从某种意义上说,文本特征也是一种类别特征或派生特征。下面以电影的描述(我们的数据
集中不含该数据)来举例。即便作为类别数据,其原始的文本也不能直接使用。因为假设每
个单词都是一种可能的取值,那单词之间可能出现的组合有几乎无限种。这时模型几乎看不
到有相同的特征出现两次,学习的效果也就不理想。从中可以看出,我们会希望将原始的文
本转换为一种更便于机器学习的形式。
文本的处理方式有很多种。自然语言处理便是专注于文本内容的处理、表示和建模的一个领
域。关于文本处理的完整内容并不在本书的讨论范围内,但我们会介绍一种简单且标准化的
文本特征提取方法。该方法被称为词袋(bag-of-word)表示法。
词袋法将一段文本视为由其中的文本或数字组成的集合,其处理过程如下。
分词(tokenization):首先会应用某些分词方法来将文本分隔为一个由词(一般
如单词、数字等)组成的集合。可用的方法如空白分隔法。这种方法在空白处对
文本分隔并可能还删除其他如标点符号和其他非字母或数字字符。
删除停用 ......
作者:Nick Pentreath
译者:蔡立宇 黄章帅 周济民
ISBN:978-7-115-39983-0
本书由北京图灵文化发展有限公司发行数字版。版权所有,侵权必究。
您购买的图灵电子书仅供您个人使用,未经授权,不得以任何方式复制和传播本书内容。
我们愿意相信读者具有这样的良知和觉悟,与我们共同保护知识产权。
如果购买者有侵权行为,我们可能对该用户实施包括但不限于关闭该帐号等维权措施,并可
能追究法律责任。
图灵社区会员 gaara(bboniao@163.com) 专享 尊重版权
版权声明
前言
本书内容
预备知识
本书目标
排版约定
读者反馈
客户支持
下载示例代码
勘误表
侵权行为
问题
致谢
第1章 Spark的环境搭建与运行
1.1 Spark的本地安装与配置
1.2 Spark集群
1.3 Spark编程模型
1.3.1 SparkContext类与SparkConf类
1.3.2 Spark shell
1.3.3 弹性分布式数据集
1.3.4 广播变量和累加器
1.4 Spark Scala编程入门
1.5 Spark Java编程入门
1.6 Spark Py thon编程入门
1.7 在Amazon EC2上运行Spark
启动一个EC2 Spark集群
1.8 小结
第2章 设计机器学习系统
2.1 MovieStream介绍
2.2 机器学习系统商业用例
2.2.1 个性化
2.2.2 目标营销和客户细分
2.2.3 预测建模与分析
2.3 机器学习模型的种类
2.4 数据驱动的机器学习系统的组成2.4.1 数据获取与存储
2.4.2 数据清理与转换
2.4.3 模型训练与测试回路
2.4.4 模型部署与整合
2.4.5 模型监控与反馈
2.4.6 批处理或实时方案的选择
2.5 机器学习系统架构
动手练习
2.6 小结
第3章 Spark上数据的获取、处理与准备
3.1 获取公开数据集
MovieLens 100k数据集
3.2 探索与可视化数据
3.2.1 探索用户数据
3.2.2 探索电影数据
3.2.3 探索评级数据
3.3 处理与转换数据
非规整数据和缺失数据的填充
3.4 从数据中提取有用特征
3.4.1 数值特征
3.4.2 类别特征
3.4.3 派生特征
3.4.4 文本特征
3.4.5 正则化特征
3.4.6 用软件包提取特征
3.5 小结
第4章 构建基于Spark的推荐引擎
4.1 推荐模型的分类
4.1.1 基于内容的过滤
4.1.2 协同过滤
4.1.3 矩阵分解
4.2 提取有效特征
从MovieLens 100k数据集提取特征
4.3 训练推荐模型
4.3.1 使用MovieLens 100k数据集训练模型4.3.2 使用隐式反馈数据训练模型
4.4 使用推荐模型
4.4.1 用户推荐
4.4.2 物品推荐
4.5 推荐模型效果的评估
4.5.1 均方差
4.5.2 K 值平均准确率
4.5.3 使用MLlib内置的评估函数
4.6 小结
第5章 Spark构建分类模型
5.1 分类模型的种类
5.1.1 线性模型
5.1.2 朴素贝叶斯模型
5.1.3 决策树
5.2 从数据中抽取合适的特征
从KaggleStumbleUpon evergreen分类数据集中抽取特征
5.3 训练分类模型
在KaggleStumbleUpon evergreen的分类数据集中训练分类模型
5.4 使用分类模型
在KaggleStumbleUpon evergreen数据集上进行预测
5.5 评估分类模型的性能
5.5.1 预测的正确率和错误率
5.5.2 准确率和召回率
5.5.3 ROC曲线和AUC
5.6 改进模型性能以及参数调优
5.6.1 特征标准化
5.6.2 其他特征
5.6.3 使用正确的数据格式
5.6.4 模型参数调优
5.7 小结
第6章 Spark构建回归模型
6.1 回归模型的种类
6.1.1 最小二乘回归
6.1.2 决策树回归
6.2 从数据中抽取合适的特征从bike sharing数据集抽取特征
6.3 回归模型的训练和应用
在bike sharing数据上训练回归模型
6.4 评估回归模型的性能
6.4.1 均方误差和均方根误差
6.4.2 平均绝对误差
6.4.3 均方根对数误差
6.4.4 R-平方系数
6.4.5 计算不同度量下的性能
6.5 改进模型性能和参数调优
6.5.1 变换目标变量
6.5.2 模型参数调优
6.6 小结
第7章 Spark构建聚类模型
7.1 聚类模型的类型
7.1.1 K-均值聚类
7.1.2 混合模型
7.1.3 层次聚类
7.2 从数据中提取正确的特征
从MovieLens数据集提取特征
7.3 训练聚类模型
用MovieLens数据集训练聚类模型
7.4 使用聚类模型进行预测
用MovieLens数据集解释类别预测
7.5 评估聚类模型的性能
7.5.1 内部评价指标
7.5.2 外部评价指标
7.5.3 在MovieLens数据集计算性能
7.6 聚类模型参数调优
通过交叉验证选择K
7.7 小结
第8章 Spark应用于数据降维
8.1 降维方法的种类
8.1.1 主成分分析
8.1.2 奇异值分解8.1.3 和矩阵分解的关系
8.1.4 聚类作为降维的方法
8.2 从数据中抽取合适的特征
从LFW数据集中提取特征
8.3 训练降维模型
在LFW数据集上运行PCA
8.4 使用降维模型
8.4.1 在LFW数据集上使用PCA投影数据
8.4.2 PCA和SVD模型的关系
8.5 评价降维模型
在LFW数据集上估计SVD的 k 值
8.6 小结
第9章 Spark高级文本处理技术
9.1 处理文本数据有什么特别之处
9.2 从数据中抽取合适的特征
9.2.1 短语加权表示
9.2.2 特征哈希
9.2.3 从20新闻组数据集中提取TF-IDF特征
9.3 使用TF-IDF模型
9.3.1 20 Newsgroups数据集的文本相似度和TF-IDF特征
9.3.2 基于20 Newsgroups数据集使用TF-IDF训练文本分类器
9.4 评估文本处理技术的作用
在20 Newsgroups数据集上比较原始特征和处理过的TF-IDF特征
9.5 Word2Vec模型
基于20 Newsgroups数据集训练Word2Vec
9.6 小结
第10章 Spark Streaming在实时机器学习上的应用
10.1 在线学习
10.2 流处理
10.2.1 Spark Streaming介绍
10.2.2 使用Spark Streaming缓存和容错
10.3 创建Spark Streaming应用
10.3.1 消息生成端
10.3.2 创建简单的流处理程序
10.3.3 流式分析10.3.4 有状态的流计算
10.4 使用Spark Streaming进行在线学习
10.4.1 流回归
10.4.2 一个简单的流回归程序
10.4.3 流K-均值
10.5 在线模型评估
使用Spark Streaming比较模型性能
10.6 小结
版权声明
Copyright ? 2015 Packt Publishing. First published in the English language under the title Machine
Learning with Spark.
Simplified Chinese-language edition copy right ? 2015 by Posts Telecom Press. All rights
reserved.
本书中文简体字版由Packt Publishing授权人民邮电出版社独家出版。未经出版者书面许可,不得以任何方式复制或抄袭本书内容。
版权所有,侵权必究。
前言
近年来,被收集、存储和分析的数据量呈爆炸式增长,特别是与网络、移动设备相关的数
据,以及传感器产生的数据。大规模数据的存储、处理、分析和建模,以前只有Google、Yahoo!、Facebook和Twitter这样的大公司才涉及,而现在越来越多的机构都会面对处理海量
数据的挑战。
面对如此量级的数据以及常见的实时利用该数据的需求,人工驱动的系统难以应对。这就催
生了所谓的大数据和机器学习系统,它们从数据中学习并可自动决策。
为了能以低成本实现对大规模数据的支持,Google、Yahoo!、Amazon和Facebook涌现了大量
开源技术。这些技术旨在通过在计算机集群上进行分布式数据存储和计算来简化大数据处
理。
这些技术中最广为人知的是Apache Hadoop,它极大简化了海量数据的存储(通过Hadoop
Distributed File Sy stem,即HDFS)和计算(通过Hadoop MapReduce,一种在集群里多个节
点上进行并行计算的框架)流程,并降低了相应的成本。
然而,MapReduce有其严重的缺点,如启动任务时的高开销、对中间数据和计算结果写入磁
盘的依赖。这些都使得Hadoop不适合迭代式或低延迟的任务。Apache Spark是一个新的分布
式计算框架,从设计开始便注重对低延迟任务的优化,并将中间数据和结果保存在内存中。
Spark提供简洁明了的函数式API,并完全兼容Hadoop生态系统。
不止如此,Spark还提供针对Scala、Java和Py thon语言的原生API。通过Scala和Py thon的
API,Spark应用程序可充分利用Scala或Py thon语言的优势。这些优势包括使用相关的解释程
序进行实时交互式的程序编写。Spark目前还自带一个分布式机器学习和数据挖掘工具包
MLlib。经过重点开发,这个包中已经包括一些针对常见计算任务的高质量、可扩展的算
法。本书会涉及其中的部分算法。
在大型数据集上进行机器学习颇具挑战性。这主要是因为常见的机器学习算法并非为并行架
构而设计。大部分情况下,设计这样的算法并不容易。机器学习模型一般具有迭代式的特
性,而这与Spark的设计目标一致。并行计算的框架有很多,但很少能在兼顾速度、可扩展
性、内存处理和容错性的同时,还提供灵活、表达力丰富的API。Spark是其中为数不多的一
个。
本书将关注机器学习技术的实际应用。我们会简要介绍机器学习算法的一些理论知识,但总
的来说本书注重技术实践。具体来说,我们会通过示例程序和样例代码,举例说明如何借助
Spark、MLlib以及其他常见的免费机器学习和数据分析套件来创建一个有用的机器学习系
统。本书内容
第1章 “Spark的环境搭建与运行”,会讲到如何安装和搭建Spark框架的本地开发环境,以
及怎样使用Amazon EC2在云端创建Spark集群。之后介绍Spark编程模型和API。最后分别用
Scala、Java和Py thon语言创建一个简单的Spark应用。
第2章 “设计机器学习系统”,会展示一个贴合实际的机器学习系统案例。随后会针对该案
例设计一个基于Spark的智能系统所对应的高层架构。
第3章 “Spark上数据的获取、处理与准备”,会详细介绍如何从各种免费的公开渠道获取
用于机器学习系统的数据。我们将学到如何进行数据处理和清理,并通过可用的工具、库和
Spark函数将它们转换为符合要求的数据,使之具备可用于机器学习模型的特征。
第4章 “构建基于Spark的推荐引擎”,展示了如何创建一个基于协同过滤的推荐模型。该
模型将用于向给定用户推荐物品,以及创建与给定物品相似的物品。这一章还会讲到如何使
用标准指标来评估推荐模型的效果。
第5章 “Spark构建分类模型”,阐述如何创建二元分类模型,以及如何利用标准的性能评估
指标来评估分类效果。
第6章 “Spark构建回归模型”,扩展了第5章中的分类模型以创建一个回归模型,并详细介
绍回归模型的评估指标。
第7章 “Spark构建聚类模型”,探索如何创建聚类模型以及相关评估方法的使用。你会学到
如何分析和可视化聚类结果。
第8章 “Spark应用于数据降维”,将通过多种方法从数据中提取其内在结构并降低其维度。
你会学到一些常见的降维方法,以及如何对它们进行应用和分析。这里还会讲到如何将降维
的结果作为其他机器学习模型的输入。
第9章 “Spark高级文本处理技术”,介绍处理大规模文本数据的方法。这包括从文本提取特
征以及处理文本数据常见的高维特征的方法。
第10章 “Spark Streaming在实时机器学习上的应用”,对Spark Streaming进行综述,并介
绍在流数据上的机器学习中它如何实现对在线和增量学习方法的支持。预备知识
本书假设读者已有基本的Scala、Java或Python编程经验,以及机器学习、统计学和数据分析
方面的基础知识。本书目标
本书的预期读者是初中级数据科学研究者、数据分析师、软件工程师和对大规模环境下的机
器学习或数据挖掘感兴趣的人。读者不需要熟悉Spark,但若具有统计、机器学习相关软件
(比如MATLAB、scikit-learn、Mahout、R和Weka等)或分布式系统(如Hadoop)的实践经
验,会很有帮助。排版约定
在本书中,你会发现一些不同的文本样式,用以区别不同种类的信息。下面举例说明。
代码段的格式如下:
val conf = new SparkConf
.setAppName(Test Spark App)
.setMaster(local[4])
val sc = new SparkContext(conf)
所有的命令行输入或输出的格式如下:
>tar xfvz spark-1.2.0-bin-hadoop2.4.tgz
>cd spark-1.2.0-bin-hadoop2.4
新术语和重点词汇以楷体标示。屏幕、目录或对话框上的内容这样表示:“这些信息可以从
AWS主页上依次点击‘Account’ | ‘Security Credentials’ | ‘Access Credentials’看到。”
这个图标表示警告或需要特别注意的内容。
这个图标表示提示或者技巧。读者反馈
欢迎提出反馈。如果你对本书有任何想法,喜欢它什么,不喜欢它什么,请让我们知道。要
写出真正对大家有帮助的书,了解读者的反馈很重要。
一般的反馈,请发送电子邮件至feedback@packtpub.com,并在邮件主题中包含书名。
如果你有某个主题的专业知识,并且有兴趣写成或帮助促成一本书,请参考我们的作者指
南http:www.pack tpub.comauthors。客户支持
现在,你是一位自豪的Packt图书的拥有者,我们会尽全力帮你充分利用你手中的书。
下载示例代码
你可以用你的账户从http:www.pack tpub.com下载所有已购买Packt图书的示例代码文件。如
果你从其他地方购买本书,可以访问http:www.packtpub.comsupport并注册,我们将通过电
子邮件把文件发送给你。
勘误表
虽然我们已尽力确保本书内容正确,但出错仍旧在所难免。如果你在我们的书中发现错误,不管是文本还是代码,希望能告知我们,我们不胜感激。这样做可以减少其他读者的困扰,帮助我们改进本书的后续版本。如果你发现任何错误,请访问
http:www.packtpub.comsubmit-errata提交,选择你的书,点击勘误表提交表单的链接,并输
入详细说明。勘误一经核实,你的提交将被接受,此勘误将上传到本公司网站或添加到现有
勘误表。从http:www.pack tpub.comsupport选择书名就可以查看现有的勘误表。侵权行为
互联网上的盗版是所有媒体都要面对的问题。Pack t非常重视保护版权和许可证。如果你发现
我们的作品在互联网上被非法复制,不管以什么形式,都请立即为我们提供位置地址或网站
名称,以便我们可以寻求补救。
请把可疑盗版材料的链接发到copyright@packtpub.com。
非常感谢你帮助我们保护作者,以及保护我们给你带来有价值内容的能力。问题
如果你对本书内容存有疑问,不管是哪个方面,都可以通过questions@packtpub.com联系我
们,我们将尽最大努力来解决。
致谢
过去一年里,本书的写作过程如同过山车一般跌宕起伏,伴随着熬夜和周末加班。对机器学
习和Apache Spark的热爱让我受益良多,也希望本书能让读者有所收获。
非常感谢Packt出版团队在本书写作和编辑过程中提供的帮助,感谢Rebecca、Susmita、Sudhir、Amey、Neil、Vivek、Pankaj和所有为本书出过力的人。
同样感谢StumbleUpon公司的Debora Donato,她提供过数据和法律方面的协助。
写书的过程可能会让人感到孤立无援,因此审校人的反馈对保证本书的可读性,以及知晓还
需要作出哪些调整十分有帮助。我深深地感谢Andrea Mostosi、Hao Ren和Krishna Sank ar花费
时间审阅本书,并提供细致且极为重要的反馈。
家人和朋友的不懈支持是本书得以写成的必要因素。特别是我的好妻子Tammy,感谢她在若
干个夜晚和周末的陪伴与支持。谢谢你们所有人!
最后,谢谢你阅读这本书,希望它对你能有所帮助。
第1章 Spark的环境搭建与运行
Apache Spark是一个分布式计算框架,旨在简化运行于计算机集群上的并行程序的编写。该
框架对资源调度,任务的提交、执行和跟踪,节点间的通信以及数据并行处理的内在底层操
作都进行了抽象。它提供了一个更高级别的API用于处理分布式数据。从这方面说,它与
Apache Hadoop等分布式处理框架类似。但在底层架构上,Spark与它们有所不同。
Spark起源于加利福利亚大学伯克利分校的一个研究项目。学校当时关注分布式机器学习算法
的应用情况。因此,Spark从一开始便为应对迭代式应用的高性能需求而设计。在这类应用
中,相同的数据会被多次访问。该设计主要靠利用数据集内存缓存以及启动任务时的低延迟
和低系统开销来实现高性能。再加上其容错性、灵活的分布式数据结构和强大的函数式编程
接口,Spark在各类基于机器学习和迭代分析的大规模数据处理任务上有广泛的应用,这也表
明了其实用性。
关于Spark项目的更多背景信息,包括其开发的核心研究论文,可从项
目的历史介绍页面中查到:http:spark .apache.orgcommunity.htmlhistory。
Spark支持四种运行模式。
本地单机模式:所有Spark进程都运行在同一个Java虚拟机(Java Vitural
Machine,JVM)中。
集群单机模式:使用Spark自己内置的任务调度框架。
基于Mesos:Mesos是一个流行的开源集群计算框架。
基于YARN:即Hadoop 2,它是一个与Hadoop关联的集群计算和资源调度框架。
本章主要包括以下内容。
下载Spark二进制版本并搭建一个本地单机模式下的开发环境。各章的代码示例都
在该环境下运行。
通过Spark的交互式终端来了解它的编程模型及其API。
分别用Scala、Java和Py thon语言来编写第一个Spark程序。
在Amazon的Elastic Cloud Compute(EC2)平台上架设一个Spark集群。相比本地
模式,该集群可以应对数据量更大、计算更复杂的任务。通过自定义脚本,Spark同样可以运行在Amazon的Elastic MapReduce服
务上,但这不在本书讨论范围内。相关信息可参
考http:aws.amazon.comarticles4926593393724923;本书写作时,这篇文章是基于Spark
1.1.0写的。
如果读者曾构建过Spark环境并有Spark程序编写基础,可以跳过本章。1.1 Spark的本地安装与配置
Spark能通过内置的单机集群调度器来在本地运行。此时,所有的Spark进程运行在同一个Java
虚拟机中。这实际上构造了一个独立、多线程版本的Spark环境。本地模式很适合程序的原型
设计、开发、调试及测试。同样,它也适应于在单机上进行多核并行计算的实际场景。
Spark的本地模式与集群模式完全兼容,本地编写和测试过的程序仅需增加少许设置便能在集
群上运行。
本地构建Spark环境的第一步是下载其最新的版本包(本书写作时为1.2.0版)。各个版本的版
本包及源代码的GitHub地址可从Spark项目的下载页面找
到:http:spark.apache.orgdownloads.html。
Spark的在线文档http:spark .apache.orgdocslatest涵盖了进一步学习
Spark所需的各种资料。强烈推荐读者浏览查阅。
为了访问HDFS(Hadoop Distributed File System,Hadoop分布式文件系统)以及标准或定制
的Hadoop输入源,Spark的编译需要与Hadoop的版本对应。上述下载页面提供了针对Hadoop
1、CDH4(Cloudera的Hadoop发行版)、MapR的Hadoop发行版和Hadoop 2(YARN)的预
编译二进制包。除非你想构建针对特定版本Hadoop的Spark,否则建议你通过如下链接从
Apache镜像下载Hadoop 2.4预编译版本:http:www.apache.orgdyncloser.cgisparkspark-
1.2.0spark-1.2.0-bin-hadoop2.4.tgz。
Spark的运行依赖Scala编程语言(本书写作时为2.10.4版)。好在预编译的二进制包中已包含
Scala运行环境,我们不需要另外安装Scala便可运行Spark。但是,JRE(Java运行时环境)或
JDK(Java开发套件)是要安装的(相应的安装指南可参见本书代码包中的软硬件列表)。
下载完上述版本包后,解压,并在终端进入解压时新建的主目录:
>tar xfvz spark-1.2.0-bin-hadoop2.4.tgz
>cd spark-1.2.0-bin-hadoop2.4
用户运行Spark的脚本在该目录的bin目录下。我们可以运行Spark附带的一个示例程序来测试
是否一切正常:
>.binrun-example org.apache.spark.examples.SparkPi该命令将在本地单机模式下执行SparkPi这个示例。在该模式下,所有的Spark进程均运行于同
一个JVM中,而并行处理则通过多线程来实现。默认情况下,该示例会启用与本地系统的
CPU核心数目相同的线程。示例运行完,应可在输出的结尾看到类似如下的提示:…
141127 20:58:47 INFO SparkContext: Job finished: reduce at
SparkPi.scala:35, took 0.723269s
Pi is roughly 3.1465…
要在本地模式下设置并行的级别,以local[N]的格式来指定一个master变量即可。上述
参数中的N表示要使用的线程数目。比如只使用两个线程时,可输入如下命令:
>MASTER=local[2] .binrun-example
org.apache.spark.examples.SparkPi1.2 Spark集群
Spark集群由两类程序构成:一个驱动程序和多个执行程序。本地模式时所有的处理都运行在
同一个JVM内,而在集群模式时它们通常运行在不同的节点上。
举例来说,一个采用单机模式的Spark集群(即使用Spark内置的集群管理模块)通常包括:
一个运行Spark单机主进程和驱动程序的主节点;
各自运行一个执行程序进程的多个工作节点。
在本书中,我们将使用Spark的本地单机模式做概念讲解和举例说明,但所用的代码也可运行
在Spark集群上。比如在一个Spark单机集群上运行上述示例,只需传入主节点的URL即可:
>MASTER=spark:IP:PORT .binrun-example
org.apache.spark.examples.SparkPi
其中的IP和PORT分别是主节点IP地址和端口号。这是告诉Spark让示例程序运行在主节点所
对应的集群上。
Spark集群管理和部署的完整方案不在本书的讨论范围内。但是,本章后面会对Amazon EC2
集群的设置和使用做简要说明。
Spark集群部署的概要介绍可参见如下链接:
http:spark.apache.orgdocslatestcluster-overview.html
http:spark.apache.orgdocslatestsubmitting-applications.html1.3 Spark编程模型
在对Spark的设计进行更全面的介绍前,我们先介绍SparkContext对象以及Spark shell。后
面将通过它们来了解Spark编程模型的基础知识。
虽然这里会对Spark的使用进行简要介绍并提供示例,但要想了解更
多,可参考下面这些资料。
Spark快速入门:http:spark .apache.orgdocslatestquick -start.html。
针对Scala、Java和Py thon的《Spark编程指
南》:http:spark .apache.orgdocslatestprogramming-guide.html。
1.3.1 SparkContext类与SparkConf类
任何Spark程序的编写都是从SparkContext(或用Java编写时的JavaSparkContext)
开始的。SparkContext的初始化需要一个SparkConf对象,后者包含了Spark集群配置的
各种参数(比如主节点的URL)。
初始化后,我们便可用SparkContext对象所包含的各种方法来创建和操作分布式数据集
和共享变量。Spark shell(在Scala和Py thon下可以,但不支持Java)能自动完成上述初始
化。若要用Scala代码来实现的话,可参照下面的代码:
val conf = new SparkConf
.setAppName(Test Spark App)
.setMaster(local[4])
val sc = new SparkContext(conf)
这段代码会创建一个4线程的SparkContext对象,并将其相应的任务命名为Test Spark
APP。我们也可通过如下方式调用SparkContext的简单构造函数,以默认的参数值来创建
相应的对象。其效果和上述的完全相同:
val sc = new SparkContext(local[4], Test Spark App)下载示例代码
你可从http:www.packtpub.com下载你账号购买过的Packt书籍所对应的示例代码。若书
是从别处购买的,则可在https:www.packtpub.combook scontentsupport注册,相应的代
码会直接发送到你的电子邮箱。
1.3.2 Spark shell
Spark支持用Scala或Py thon REPL(Read-Eval-Print-Loop,即交互式shell)来进行交互式的程
序编写。由于输入的代码会被立即计算,shell能在输入代码时给出实时反馈。在Scala shell
里,命令执行结果的值与类型在代码执行完后也会显示出来。
要想通过Scala来使用Spark shell,只需从Spark的主目录执行.binspark-shell。它会
启动Scala shell并初始化一个SparkContext对象。我们可以通过sc这个Scala值来调用这个
对象。该命令的终端输出应该如下图所示:要想在Py thon shell中使用Spark,直接运行.binpyspark命令即可。与Scala shell类似,Py thon下的SparkContext对象可以通过Py thon变量sc来调用。上述命令的终端输出应该
如下图所示:1.3.3 弹性分布式数据集
RDD(Resilient Distributed Dataset,弹性分布式数据集)是Spark的核心概念之一。一个RDD
代表一系列的“记录”(严格来说,某种类型的对象)。这些记录被分配或分区到一个集群的
多个节点上(在本地模式下,可以类似地理解为单个进程里的多个线程上)。Spark中的RDD
具备容错性,即当某个节点或任务失败时(因非用户代码错误的原因而引起,如硬件故障、网络不通等),RDD会在余下的节点上自动重建,以便任务能最终完成。
1. 创建RDD
RDD可从现有的集合创建。比如在Scala shell中:
val collection = List(a, b, c, d, e)
val rddFromCollection = sc.parallelize(collection)RDD也可以基于Hadoop的输入源创建,比如本地文件系统、HDFS和Amazon S3。基于
Hadoop的RDD可以使用任何实现了Hadoop InputFormat接口的输入格式,包括文本文
件、其他Hadoop标准格式、HBase、Cassandra等。以下举例说明如何用一个本地文件系统里
的文件创建RDD:
val rddFromTextFile = sc.textFile(LICENSE)
上述代码中的textFile函数(方法)会返回一个RDD对象。该对象的每一条记录都是一个
表示文本文件中某一行文字的String(字符串)对象。
2. Spark操作
创建RDD后,我们便有了一个可供操作的分布式记录集。在Spark编程模式下,所有的操作被
分为转换(transformation)和执行(action)两种。一般来说,转换操作是对一个数据集里
的所有记录执行某种函数,从而使记录发生改变;而执行通常是运行某些计算或聚合操作,并将结果返回运行SparkContext的那个驱动程序。
Spark的操作通常采用函数式风格。对于那些熟悉用Scala或Py thon进行函数式编程的程序员
来说,这不难掌握。但Spark API其实容易上手,所以那些没有函数式编程经验的程序员也不
用担心。
Spark程序中最常用的转换操作便是map操作。该操作对一个RDD里的每一条记录都执行某个
函数,从而将输入映射成为新的输出。比如,下面这段代码便对一个从本地文本文件创建的
RDD进行操作。它对该RDD中的每一条记录都执行size函数。之前我们曾创建过一个这样
的由若干String构成的RDD对象。通过map函数,我们将每一个字符串都转换为一个整
数,从而返回一个由若干Int构成的RDD对象。
val intsFromStringsRDD = rddFromTextFile.map(line => line.size)
其输出应与如下类似,其中也提示了RDD的类型:
intsFromStringsRDD: org.apache.spark.rdd.RDD[Int] =
MappedRDD[5] at map at
示例代码中的=>是Scala下表示匿名函数的语法。匿名函数指那些没有指定函数名的函数
(比如Scala或Py thon中用def关键字定义的函数)。匿名函数的具体细节并不在本书讨论范围内,但由于它们在Scala、Py thon以及Java 8中大量使用(示例或现实应用中都是),列举一些实例仍会有帮助。
语法line => line.size表示以=>操作符左边的部分作为输入,对其执行一个函
数,并以=>操作符右边代码的执行结果为输出。在这个例子中,输入为line,输出则
是line.size函数的执行结果。在Scala语言中,这种将一个String对象映射为一
个Int的函数被表示为String => Int。
该语法使得每次使用如map这种方法时,都不需要另外单独定义一个函数。当函数简单
且只需使用一次时(像本例一样时),这种方式很有用。
现在我们可以调用一个常见的执行操作count,来返回RDD中的记录数目。
intsFromStringsRDD.count
执行的结果应该类似如下输出:
140129 23:28:28 INFO SparkContext: Starting job: count at
140129 23:28:28 INFO SparkContext: Job finished: count at
res4: Long = 398
如果要计算这个文本文件里每行字符串的平均长度,可以先使用sum函数来对所有记录的长
度求和,然后再除以总的记录数目:
val sumOfRecords = intsFromStringsRDD.sum
val numRecords = intsFromStringsRDD.count
val aveLengthOfRecord = sumOfRecords numRecords
结果应该如下:
aveLengthOfRecord: Double = 52.06030150753769
Spark的大多数操作都会返回一个新RDD,但多数的执行操作则是返回计算的结果(比如上面例子中,count返回一个Long,sum返回一个Double)。这就意味着多个操作可以很自然
地前后连接,从而让代码更为简洁明了。举例来说,用下面的一行代码可以得到和上面例子
相同的结果:
val aveLengthOfRecordChained = rddFromTextFile.map(line =>
line.size).sum rddFromTextFile.count
值得注意的一点是,Spark中的转换操作是延后的。也就是说,在RDD上调用一个转换操作并
不会立即触发相应的计算。相反,这些转换操作会链接起来,并只在有执行操作被调用时才
被高效地计算。这样,大部分操作可以在集群上并行执行,只有必要时才计算结果并将其返
回给驱动程序,从而提高了Spark的效率。
这就意味着,如果我们的Spark程序从未调用一个执行操作,就不会触发实际的计算,也不会
得到任何结果。比如下面的代码就只是返回一个表示一系列转换操作的新RDD:
val transformedRDD = rddFromTextFile.map(line => line.size).
filter(size => size > 10).map(size => size 2)
相应的终端输出如下:
transformedRDD: org.apache.spark.rdd.RDD[Int] = MappedRDD[8] at
map at
注意,这里实际上没有触发任何计算,也没有结果被返回。如果我们现在在新的RDD上调用
一个执行操作,比如sum,该计算将会被触发:
val computation = transformedRDD.sum
现在你可以看到一个Spark任务被启动,并返回如下终端输出:...
141127 21:48:21 INFO SparkContext: Job finished: sum at
computation: Double = 60468.0RDD支持的转换和执行操作的完整列表以及更为详细的例子,参见
《Spark编程指南》(http:spark.apache.orgdocslatestprogramming-guide.htmlrdd-
operations)以及Spark API(Scala)文档
(http:spark.apache.orgdocslatestapiscalaindex.htmlorg.apache.spark .rdd.RDD)。
3. RDD缓存策略
Spark最为强大的功能之一便是能够把数据缓存在集群的内存里。这通过调用RDD的cache
函数来实现:
rddFromTextFile.cache
调用一个RDD的cache函数将会告诉Spark将这个RDD缓存在内存中。在RDD首次调用一个
执行操作时,这个操作对应的计算会立即执行,数据会从数据源里读出并保存到内存。因
此,首次调用cache函数所需要的时间会部分取决于Spark从输入源读取数据所需要的时间。
但是,当下一次访问该数据集的时候,数据可以直接从内存中读出从而减少低效的IO操作,加快计算。多数情况下,这会取得数倍的速度提升。
如果现在在已缓存了的RDD上调用count或sum函数,应该可以感觉到RDD的确已经载入到
了内存中:
val aveLengthOfRecordChained = rddFromTextFile.map(line =>
line.size).
sum rddFromTextFile.count
实际上,从下方的输出我们可以看到,数据在第一次调用cache时便已缓存到内存,并占用
了大约62 KB的空间,余下270 MB可用:...
140130 06:59:27 INFO MemoryStore: ensureFreeSpace(63454)
called with curMem=32960, maxMem=311387750
140130 06:59:27 INFO MemoryStore: Block rdd_2_0 stored as
values to memory (estimated size 62.0 KB, free 296.9 MB)
140130 06:59:27 INFO
BlockManagerMasterActorBlockManagerInfo: Added rdd_2_0 in
memory on 10.0.0.3:55089 (size: 62.0 KB, free: 296.9 MB)...
现在,我们再次求平均长度:
val aveLengthOfRecordChainedFromCached =
rddFromTextFile.map(line => line.size).sum
rddFromTextFile.count
从如下的输出中应该可以看出缓存的数据是从内存直接读出的:...
140130 06:59:34 INFO BlockManager: Found block rdd_2_0
locally...
Spark支持更为细化的缓存策略。通过persist函数可以指定Spark的数
据缓存策略。关于RDD缓存的更多信息可参
见:http:spark.apache.orgdocslatestprogramming-guide.htmlrdd-persistence。
1.3.4 广播变量和累加器
Spark的另一个核心功能是能创建两种特殊类型的变量:广播变量和累加器。
广播变量(broadcast variable)为只读变量,它由运行SparkContext的驱动程序创建后
发送给会参与计算的节点。对那些需要让各工作节点高效地访问相同数据的应用场景,比如
机器学习,这非常有用。Spark下创建广播变量只需在SparkContext上调用一个方法即
可:
val broadcastAList = sc.broadcast(List(a, b, c, d,e))
终端的输出表明,广播变量存储在内存中,占用的空间大概是488字节,仍余下270 MB可用
空间:
140130 07:13:32 INFO MemoryStore: ensureFreeSpace(488) called with curMem=96414, maxMem=311387750
140130 07:13:32 INFO MemoryStore: Block broadcast_1 stored as
values to memory (estimated size 488.0 B, free 296.9 MB)
broadCastAList:
org.apache.spark.broadcast.Broadcast[List[String]] =
Broadcast(1)
广播变量也可以被非驱动程序所在的节点(即工作节点)访问,访问的方法是调用该变量的
value方法:
sc.parallelize(List(1, 2, 3)).map(x =>
broadcastAList.value ++ x).collect
这段代码会从{1, 2, 3}这个集合(一个Scala List)里,新建一个带有三条记录
的RDD。map函数里的代码会返回一个新的List对象。这个对象里的记录由之前创建的那
个broadcastAList里的记录与新建的RDD里的三条记录分别拼接而成。
注意,上述代码使用了collect函数。这个函数是一个Spark执行函数,它将整个RDD以
Scala(Python或Java)集合的形式返回驱动程序。
通常只在需将结果返回到驱动程序所在节点以供本地处理时,才调用collect函数。
注意,collect函数一般仅在的确需要将整个结果集返回驱动程序并
进行后续处理时才有必要调用。如果在一个非常大的数据集上调用该函数,可能耗尽驱
动程序的可用内存,进而导致程序崩溃。
高负荷的处理应尽可能地在整个集群上进行,从而避免驱动程序成为系统瓶颈。然而在
不少情况下,将结果收集到驱动程序的确是有必要的。很多机器学习算法的迭代过程便
属于这类情况。
从如下结果可以看出,新生成的RDD里包含3条记录,其每一条记录包含一个由原来被广播
的List变量附加一个新的元素所构成的新记录(也就是说,新记录分别以1、2、3结尾)。...
140131 10:15:39 INFO SparkContext: Job finished: collect at
累加器(accumulator)也是一种被广播到工作节点的变量。累加器与广播变量的关键不
同,是后者只能读取而前者却可累加。但支持的累加操作有一定的限制。具体来说,这种累
加必须是一种有关联的操作,即它得能保证在全局范围内累加起来的值能被正确地并行计算
以及返回驱动程序。每一个工作节点只能访问和操作其自己本地的累加器,全局累加器则只
允许驱动程序访问。累加器同样可以在Spark代码中通过value访问。
关于累加器的更多信息,可参见《Spark编程指
南》:http:spark .apache.orgdocslatestprogramming-guide.htmlshared-variables。1.4 Spark Scala编程入门
下面我们用上一节所提到的内容来编写一个简单的Spark数据处理程序。该程序将依次用
Scala、Java和Py thon三种语言来编写。所用数据是客户在我们在线商店的商品购买记录。该
数据存在一个CSV文件中,名为UserPurchaseHistory.csv,内容如下所示。文件的每一行对应
一条购买记录,从左到右的各列值依次为客户名称、商品名以及商品价格。
John,iPhone Cover,9.99
John,Headphones,5.49
Jack,iPhone Cover,9.99
Jill,Samsung Galaxy Cover,8.95
Bob,iPad Cover,5.49
对于Scala程序而言,需要创建两个文件:Scala代码文件以及项目的构建配置文件。项目将
使用SBT(Scala Build Tool,Scala构建工具)来构建。为便于理解,建议读者下载示例代码
scala-spark-app。该资源里的data目录下包含了上述CSV文件。运行这个示例项目需要系统中
已经安装好SBT(编写本书时所使用的版本为0.13.1)。
配置SBT并不在本书讨论范围内,但读者可以从http:www.scala-
sbt.orgreleasedocsGetting-StartedSetup.html找到更多信息。
我们的SBT配置文件是build.sbt,其内容如下面所示(注意,各行代码之间的空行是必需
的):
name := scala-spark-app
version := 1.0
scalaVersion := 2.10.4
libraryDependencies += org.apache.spark %% spark-core %
1.2.0
最后一行代码是添加Spark到本项目的依赖库。
相应的Scala程序在ScalaApp.scala这个文件里。接下来我们会逐一讲解代码的各个部分。首
先,导入所需要的Spark类:import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
用Scala编写的一个简单的Spark应用
object ScalaApp {
在主函数里,我们要初始化所需的SparkContext对象,并且用它通过textFile函数来
访问CSV数据文件。之后对每一行原始字符串以逗号为分隔符进行分割,提取出相应的用户
名、产品和价格信息,从而完成对原始文本的映射:
def main(args: Array[String]) {
val sc = new SparkContext(local[2], First Spark App)
将CSV格式的原始数据转化为(user,product,price)格式的记录集
val data = sc.textFile(dataUserPurchaseHistory.csv)
.map(line => line.split(,))
.map(purchaseRecord => (purchaseRecord(0),purchaseRecord(1),purchaseRecord(2)))
现在,我们有了一个RDD,其每条记录都由(user, product, price)三个字段构成。
我们可以对商店计算如下指标:
购买总次数
客户总个数
总收入
最畅销的产品
计算方法如下:
求购买次数 val numPurchases = data.count
求有多少个不同客户购买过商品
val uniqueUsers = data.map{ case (user, product, price) =>
user }.distinct.count
求和得出总收入 val totalRevenue = data.map{ case (user, product, price) =>
price.toDouble }.sum
求最畅销的产品是什么 val productsByPopularity = data
.map{ case (user, product, price) => (product, 1) }
.reduceByKey(_ + _)
.collect
.sortBy(-_._2)
val mostPopular = productsByPopularity(0)最后那段计算最畅销产品的代码演示了如何进行MapReduce模式的计算,该模式随Hadoop
而流行。第一步,我们将(user, product, price)格式的记录映射为(product, 1)
格式。然后,我们执行一个reduceByKey操作,它会对各个产品的1值进行求和。
转换后的RDD包含各个商品的购买次数。有了这个RDD后,我们可以调用collect函数,这会将其计算结果以Scala集合的形式返回驱动程序。之后在驱动程序的本地对这些记录按照
购买次数进行排序。(注意,在实际处理大量数据时,我们通常通过sortByKey这类操作
来对其进行并行排序。)
最后,可在终端上打印出计算结果:
println(Total purchases: + numPurchases)
println(Unique users: + uniqueUsers)
println(Total revenue: + totalRevenue)
println(Most popular product: %s with %d purchases.
format(mostPopular._1, mostPopular._2))
}
}
可以在项目的主目录下执行sbt run命令来运行这个程序。如果你使用了IDE的话,也可以
从Scala IDE直接运行。最终的输出应该与下面的内容相似:...
[info] Compiling 1 Scala source to ...
[info] Running ScalaApp...
140130 10:54:40 INFO spark.SparkContext: Job finished:
collect at
ScalaApp.scala:25, took 0.045181 s
Total purchases: 5
Unique users: 4
Total revenue: 39.91
Most popular product: iPhone Cover with 2 purchases
可以看到,商店总共有4个客户的5次交易,总收入为39.91。最畅销的商品是iPhone Cover,共购买2次。1.5 Spark Java编程入门
Java API与Scala API本质上很相似。Scala代码可以很方便地调用Java代码,但某些Scala代
码却无法在Java里调用,特别是那些使用了隐式类型转换、默认参数和采用了某些Scala反射
机制的代码。
一般来说,这些特性在Scala程序中会被广泛使用。这就有必要另外为那些常见的类编写相应
的Java版本。由此,SparkContext有了对应的Java版本JavaSparkContext,而RDD则
对应JavaRDD。
1.8及之前版本的Java并不支持匿名函数,在函数式编程上也没有严格的语法规范。于是,套
用到Spark的Java API上的函数必须要实现一个带有call函数的WrappedFunction接口。
这会使得代码冗长,所以我们经常会创建临时类来传递给Spark操作。这些类会实现操作所需
的接口以及call函数,以取得和用Scala编写时相同的效果。
Spark提供对Java 8匿名函数(lambda)语法的支持。使用该语法能让Java 8书写的代码看上
去很像等效的Scala版。
用Scala编写时,键值对记录的RDD能支持一些特别的操作(比如reduceByKey和
saveAsSequenceFile)。这些操作可以通过隐式类型转换而自动被调用。用Java编写
时,则需要特别类型的JavaRDD来支持这些操作。它们包括用于键值对的
JavaPairRDD,以及用于数值记录的JavaDoubleRDD。
我们在这里只涉及标准的Java API语法。关于Java下支持的RDD以及
Java 8 lambda表达式支持的更多信息可参见《Spark编程指
南》:http:spark.apache.orgdocslatestprogramming-guide.htmlrdd-operations。
在后面的Java程序中,我们可以看到大部分差异。这些示例代码包含在本章示例代码的java-
spark-app目录下。该目录的data子目录下也包含上述CSV数据。
这里会使用Maven构建工具来编译和运行这个项目。我们假设读者已经在其系统上安装好了
该工具。
Maven的安装和配置并不在本书讨论范围内。通常它可通过Linux系统中的软件管理器或Mac OS X中的HomeBrew或MacPorts方便地安装。
详细的安装指南参见:http:maven.apache.orgdownload.cgi。
项目中包含一个名为JavaApp.java的Java源文件:
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
用Java编写的一个简单的Spark应用
public class JavaApp {
public static void main(String[] args) {
正如在Scala项目中一样,我们首先需要初始化一个上下文对象。值得注意的是,这里所使用
的是JavaSparkContext类而不是之前的SparkContext。类似地,调
用JavaSparkContext对象,利用textFile函数来访问数据,然后将各行输入分割成多
个字段。请注意下面代码的高亮部分是如何使用匿名类来定义一个分割函数的。该函数确定
了如何对各行字符串进行分割。
JavaSparkContext sc = new JavaSparkContext(local[2], First
Spark App);
将CSV格式的原始数据转化为(user,product,price)格式的记录集 JavaRDD
sc.textFile(dataUserPurchaseHistory.csv)
.map(new Function
@Override
public String[] call(String s) throws Exception {
return s.split(,);
}
});
现在可以算一下用Scala时计算过的指标。这里有两点值得注意的地方,一是下面Java API中
有些函数(比如distinct和count)实际上和在Scala API中一样,二是我们定义了一个匿名类并将其传给map函数。匿名类的定义方式可参见代码的高亮部分。
求总购买次数
long numPurchases = data.count;
求有多少个不同客户购买过商品
long uniqueUsers = data.map(new Function
@Override
public String call(String[] strings) throws Exception {
return strings[0];
}
}).distinct.count;
求和得出总收入
double totalRevenue = data.map(new DoubleFunction
@Override
public Double call(String[] strings) throws Exception {
return Double.parseDouble(strings[2]);
}
}).sum;
下面的代码展现了如何求出最畅销的产品,其步骤与Scala示例的相同。多出的那些代码看似
复杂,但它们大多与Java中创建匿名函数有关,实际功能与用Scala时一样:
求最畅销的产品是哪个 首先用一个PairFunction和Tuple2类将数据映射成为(product,1)格式的 记录 然后,用一个Function2类来调用reduceByKey操作,该操作实际上是一个
求和函数
List
PairFunction
@Override
public Tuple2
throws Exception {
return new Tuple2(strings[1], 1);
}
}).reduceByKey(new Function2
@Override
public Integer call(Integer integer, Integer integer2)
throws Exception {
return integer + integer2;
}
}).collect; 最后对结果进行排序。注意,这里会需要创建一个Comparator函数来进行降 序排列 Collections.sort(pairs, new Comparator
@Override
public int compare(Tuple2
return -(o1._2 - o2._2);
}
});
String mostPopular = pairs.get(0)._1;
int purchases = pairs.get(0)._2;
System.out.println(Total purchases: + numPurchases);
System.out.println(Unique users: + uniqueUsers);
System.out.println(Total revenue: + totalRevenue);
System.out.println(String.format(Most popular product:
%s with %d purchases, mostPopular, purchases));
}
}
从前面代码可以看出,Java代码和Scala代码相比虽然多了通过内部类来声明变量和函数的引
用代码,但两者的基本结构类似。读者不妨分别练习这两种版本的代码,并比较一下计算同
一个指标时两种语言在表达上的异同。
该程序可以通过在项目主目录下执行如下命令运行:
>mvn exec:java -Dexec.mainClass=JavaApp
可以看到其输出和Scala版的很类似,而且计算结果完全一样:...
140130 17:02:43 INFO spark.SparkContext: Job finished:
collect at
JavaApp.java:46, took 0.039167 s
Total purchases: 5
Unique users: 4
Total revenue: 39.91
Most popular product: iPhone Cover with 2 purchases1.6 Spark Python编程入门
Spark的Py thon API几乎覆盖了所有Scala API所能提供的功能,但的确有些特性,比如Spark
Streaming和个别的API方法,暂不支持。具体可参见《Spark编程指南》的Py thon部
分:http:spark.apache.orgdocslatestprogramming-guide.html。
与上两节类似,这里将编写一个相同功能的Python版程序。我们假设读者系统中已安装2.6或
更高版本的Py thon(多数Linux系统和Mac OS X已预装Python)。
如下示例代码可以在本章的python-spark -app目录下找到。相应的CSV数据文件也在该目录的
data子目录中。项目代码在一个名为py thonapp.py的脚本里,其内容如下:
用Python编写的一个简单Spark应用
from pyspark import SparkContext
sc = SparkContext(local[2], First Spark App)
将CSV格式的原始数据转化为(user,product,price)格式的记录集
data = sc.textFile(dataUserPurchaseHistory.csv).map(lambda
line:
line.split(,)).map(lambda record: (record[0], record[1],record[2]))
求总购买次数 numPurchases = data.count
求有多少不同客户购买过商品 uniqueUsers = data.map(lambda record:
record[0]).distinct.count
求和得出总收入
totalRevenue = data.map(lambda record: float(record[2])).sum
求最畅销的产品是什么
products = data.map(lambda record: (record[1], 1.0)).
reduceByKey(lambda a, b: a + b).collect
mostPopular = sorted(products, key=lambda x: x[1],reverse=True)[0]
print Total purchases: %d % numPurchases
print Unique users: %d % uniqueUsers
print Total revenue: %2.2f % totalRevenue
print Most popular product: %s with %d purchases %
(mostPopular[0], mostPopular[1])
对比Scala版和Python版代码,不难发现语法大致相同。主要不同在于匿名函数的表达方式
上,匿名函数在Py thon语言中亦称lambda函数,lambda也是语法表达上的关键字。用Scala
编写时,一个将输入x映射为输出y的匿名函数表示为x => y,而在Python中则是lambda
x : y。在上面代码的高亮部分,我们定义了一个将两个输入映射为一个输出的匿名函数。
这两个输入的类型一般相同,这里调用的是相加函数,故写成lambda a, b : a + b。
运行该脚本的最好方法是在脚本目录下运行如下命令:>SPARK_HOMEbinspark-submit pythonapp.py
上述代码中的SPARK_HOME变量应该被替换为Spark的主目录,也就是在本章开始Spark预编
译包解压生成的那个目录。
脚本运行完的输出应该和运行Scala和Java版时的类似,其结果同样也是:...
140130 11:43:47 INFO SparkContext: Job finished: collect at
pythonapp.
py:14, took 0.050251 s
Total purchases: 5
Unique users: 4
Total revenue: 39.91
Most popular product: iPhone Cover with 2 purchases1.7 在Amazon EC2上运行Spark
Spark项目提供了在Amazon EC2上构建一个Spark集群所需的脚本,位于ec2文件夹下。输入
如下命令便可调用该文件夹下的spark -ec2脚本:
>.ec2spark-ec2
当不带参数直接运行上述代码时,终端会显示该命令的用法信息:
Usage: spark-ec2 [options]
master
Options:...
在创建一个Spark EC2集群前,我们需要一个Amazon账号。
如果没有Amazon Web Service账号,可以在http:aws.amazon.com注
册。
AWS的管理控制台地址是:http:aws.amazon.comconsole。
另外,我们还需要创建一个Amazon EC2密钥对和相关的安全凭证。Spark文档提到了在EC2
上部署时的需求。
你要先自己创建一个Amazon EC2密钥对。通过管理控制台登入你的Amazon Web
Services账号后,单击左边导航栏中的“Key Pairs”,然后创建并下载相应的私钥
文件。通过ssh远程访问EC2时,会需要提交该密钥。该密钥的系统访问权限必须
设定为600(即只有你可以读写该文件),否则会访问失败。
当需要使用spark -ec2脚本时,需要设置AWS_ACCESS_KEY_ID和
AWS_SECRET_ACCESS_ KEY两个环境变量。它们分别为你的Amazon EC2访问
密钥标识(key ID)和对应的密钥密码(secret access k ey)。这些信息可以从
AWS主页上依次点击“Account | Security Credentials | Access Credentials”获得。
创建一个密钥时,最好选取一个好记的名字来命名。这里假设密钥名为spark,对应的密钥文件的名称为spark.pem。如上面提到的,我们需要确认密钥的访问权限并设定好所需的环境变
量:
>chmod 600 spark.pem
>export AWS_ACCESS_KEY_ID=...
>export AWS_SECRET_ACCESS_KEY=...
上述下载所得的密钥文件只能下载一次(即在刚创建后),故对其既要安全保存又要避免丢
失。
注意,下一节中会启用一个Amazon EC2集群,这会在你的AWS账号下产生相应的费用。
启动一个EC2 Spark集群
现在我们可以启动一个小型Spark集群了。启动它只需进入到ec2目录,然后输入:
>cd ec2
>.spark-ec2 -k spark -i spark.pem -s 1 –-instance-type
m3.medium --hadoop-major-version 2 launch test-cluster
这将启动一个名为“test-cluster”的新集群,其包含“m3.medium”级别的主节点和从节点各一
个。该集群所用的Spark版本适配于Hadoop 2。我们使用的密钥名和密钥文件分别是spark和
spark .pem。
集群的完全启动和初始化会需要一些时间。在运行启动代码后,应该会立即看到如下图所示
的内容:
如果集群启动成功,最终应可在终端中看到类似如下的输出:要测试是否能连接到新集群,可以输入如下命令:
>ssh -i spark.pem root@ec2-54-227-127-14.compute-
1.amazonaws.com
注意该命令中root@后面的IP地址需要替换为你自己的Amazon EC2的公开域名。该域名可
在启动集群时的输出中找到。
另外也可以通过如下命令得到集群的公开域名:
>.spark-ec2 –i spark.pem get-master test-cluster
上述ssh命令执行成功后,你会连接到EC2上Spark集群的主节点,同时终端的输入应与如下
类似:
如果要测试集群是否已正确配置Spark环境,可以切换到Spark目录后运行一个示例程序:
>cd spark
>MASTER=local[2] .binrun-example SparkPi其输出应该与在自己电脑上的输出类似:...
140130 20:20:21 INFO SparkContext: Job finished: reduce at
SparkPi.scala:35, took 0.864044012 s
Pi is roughly 3.14032...
这样就有了包含多个节点的真实集群,可以测试集群模式下的Spark了。我们会在一个从节点
的集群上运行相同的示例。运行命令和上面相同,但用主节点的URL作为MASTER的值:
>MASTER=spark:ec2-54-227-127-14.compute-1.amazonaws.com:7077
.binrun-example SparkPi
注意,你需要将上面代码中的公开域名替换为你自己的。
同样,命令的输出应该和本地运行时的类似。不同的是,这里会有日志消息提示你的驱动程
序已连接到Spark集群的主节点。...
140130 20:26:17 INFO client.ClientClientActor: Connecting to
master spark:ec2-54-220-189-136.eu-west-
1.compute.amazonaws.com:7077
140130 20:26:17 INFO cluster.SparkDeploySchedulerBackend:
Connected to Spark cluster with app ID app-20140130202617-0001
140130 20:26:17 INFO client.ClientClientActor: Executor
added: app- 20140130202617-00010 on worker-20140130201049-ip-
10-34-137-45.eu-west-1.compute.internal-57119 (ip-10-34-137-
45.eu-west-1.compute.internal:57119) with 1 cores
140130 20:26:17 INFO cluster.SparkDeploySchedulerBackend:
Granted executor ID app-20140130202617-00010 on hostPort ip-
10-34-137-45.eu- west-1.compute.internal:57119 with 1 cores,2.4 GB RAM
140130 20:26:17 INFO client.ClientClientActor: Executor updated: app- 20140130202617-00010 is now RUNNING
140130 20:26:18 INFO spark.SparkContext: Starting job: reduce
at SparkPi.scala:39...
读者不妨在集群上自由练习,熟悉一下Scala的交互式终端:
>.binspark-shell --master spark:ec2-54-227-127-14.compute-
1.amazonaws.com:7077
练习完后,输入exit便可退出终端。另外也可以通过如下命令来体验PySpark终端:
>.binpyspark --master spark:ec2-54-227-127-14.compute-
1.amazonaws.com:7077
通过Spark主节点网页界面,可以看到主节点下注册了哪些应用。该界面位于ec2-54-227-127-
14.compute-1.amazonaws.com:8080(同样,需要将公开域名替换为你自己的)。你应该可以
看到类似下面截图的界面,显示了之前运行过的一个程序以及两个已启动的终端任务。
值得注意的是,Amazon会根据集群的使用情况收取费用。所以在集群使用完毕后,记得停止或终止这个测试集群。要终止该集群可以先在你本地系统的ssh会话里输入exit,然后再
输入如下命令:
>.ec2spark-ec2 -k spark -i spark.pem destroy test-cluster
应该可以看到这样的输出:
Are you sure you want to destroy the cluster test-cluster?
The following ninstances will be terminated:
Searching for existing cluster test-cluster...
Found 1 master(s), 1 slaves
> ec2-54-227-127-14.compute-1.amazonaws.com
> ec2-54-91-61-225.compute-1.amazonaws.com
ALL DATA ON ALL NODES WILL BE LOST!!
Destroy cluster test-cluster (yN): y
Searching for existing cluster test-cluster...
Terminating master...
Terminating slaves...
输入y,然后回车便可终止该集群。
恭喜!现在你已经做到了在云端设置Spark集群,并在它上面运行了一个完全并发的示例程
序,最后也终止了这个集群。如果在学习后续章节时你想在集群上运行示例或你自己的程
序,都可以再次使用这些脚本并指定想要的集群规模和配置。(留意下费用并记得使用完毕
后关闭它们就行。)1.8 小结
本章我们谈到了如何在自己的电脑以及Amazon EC2的云端上配置Spark环境。通过Scala交互
式终端,我们学习了Spark编程模型的基础知识并了解了它的API。另外我们还分别用Scala、Java和Py thon语言,编写了一个简单的Spark程序。
下一章,我们将考虑如何使用Spark来创建一个机器学习系统。
第2章 设计机器学习系统
本章,我们将为一个智能分布式机器学习系统设计高层架构,该系统以Spark作为其核心计算
引擎。这里我们将会关注如何对现有的基于网页的业务进行重新设计,以令其能利用自动化
机器学习系统来增强业务中的关键部分。本章的主要内容有:
介绍假想的业务场景
概述现有架构
探寻用机器学习系统来增强或是替代某些业务功能的可能途径
根据上述内容,提出新的架构
现代的大数据场景包含如下需求。
必须能与系统的其他组件整合,尤其是数据的收集和存储系统、分析和报告以及
前端应用。
易于扩展且与其他组件相对独立。理想情况下,同时具备良好的水平和垂直可扩
展性。
支持高效完成所需类型的计算,即机器学习和迭代式分析应用。
最好能同时支持批处理和实时处理。
Spark作为一个框架本身能满足上述需求。然而我们还需确保基于它设计的机器学习系统也能
满足这些需求。若算法的实现存在能引发系统故障的瓶颈,比如不再能满足上述某些需求,那该实现就没多大意义。2.1 MovieStream介绍
为便于说明我们的架构设计,这里假设存在一个贴近现实的情景。假设我们受命领导
MovieStream数据科学团队。MovieStream是一家假想的互联网公司,为用户提供在线电影和
电视节目的内容服务。
MovieStream成长迅速,其用户量和收录的电影都在快速增加。MovieStream现有系统可概括
为图2-1:
图2-1 MovieStream现有系统架构
如图所示,向用户推荐哪些电影和节目以及在站点的何处显示,都由MovieStream内容编辑
团队负责。该团队还负责MovieStream的群发营销,包括电子邮件和其他直销渠道。现阶
段,MovieStream以汇总的方式来收集用户的电影浏览记录,并能访问一些用户注册时所填
写的资料。此外,他们还能访问其所收录的电影的一些基本元数据。随着业务快速发展,新发布的电影和用户的活动不断增加,MovieStream团队愈发难以跟上
这样的趋势。MovieStream的CEO之前对大数据、机器学习和人工智能有过较多了解。他希
望我们能为MovieStream创建一个机器学习系统,以处理现在由内容团队人工处理的许多内
容。2.2 机器学习系统商业用例
我们该问的第一个问题或许是:为什么要使用机器学习?为何不直接仍以人工方式来支持
MovieStream?使用机器学习的理由有很多(不使用的理由同样也有很多),其中最为重要
的几点有:
涉及的数据规模意味着完全依靠人工处理会很快跟不上MovieStream的发展;
机器学习和统计模型等基于模型的方式能发现人类(因数据集量级和复杂度过
高)难以发现的模式;
基于模型的方式能避免个人或是情感上的偏见(只要应用时足够细心且正确)。
然而,没有任何理由说基于模型和基于人工的处理和决策不能并存。比如,许多机器学习系
统依赖已标记的数据来训练模型。通常来说,标记数据代价高昂、耗时且需人工参与。文本
数据分类和文本的情感标识便是很好的例子。许多现实中的系统会采取某种人力机制来为数
据生成标识,并用于训练模型。之后,这些模型则部署到在线系统中用于大规模环境下的预
测。
在MovieStream的案例中,我们并不需要担心机器学习的引入会使得内容团队多余。事实
上,我们的目标是让机器学习来负担那些耗时且机器擅长的任务,并向内容团队提供工具以
帮助他们更好地理解用户和内容。比如,帮助他们确定向电影库中新增哪些电影(新增电影
代价高昂,因而对业务至关重要)。
2.2.1 个性化
对MovieStream的业务来说,个性化或许是机器学习最为重要的潜在应用。一般来说,个性
化是根据各种因素来改变用户体验和呈现给用户内容。这些因素可能包括用户的行为数据和
外部因素。
推荐(recommendation)从根本上说是个性化的一种,常指向用户呈现一个他们可能感兴趣
的物品列表。推荐可用于网页(比如推荐相关产品)、电子邮件、其他直销渠道或移动应用
等。
个性化和推荐十分相似,但推荐通常专指向用户显式地呈现某些产品或是内容,而个性化有
时也偏向隐式。比如说,对MovieStream的搜索功能个性化,以根据该用户的数据来改变搜
索结果。这些数据可能包括基于推荐的数据(在搜索产品或内容时),或基于地理位置和搜
索历史等各种数据。用户可能不会明显感觉到搜索结果的变化,这就是个性化更偏向隐性的
原因。
2.2.2 目标营销和客户细分
目标营销用与推荐类似的方法从用户群中找出要营销的对象。一般来说,推荐和个性化的应
用场景都是一对一,而客户细分则试图将用户分成不同的组。其分组根据用户的特征进行,并可能参考行为数据。这种方法可能比较简单,也可能使用了某种机器学习模型,比如聚类。但无论如何,其结果都是对市场的若干细分。这些细分或许有助于理解各组用户的共
性、同组用户之间的相似性,以及不同组之间的差异。
这些将能帮助MovieStream理解用户行为背后的动机。相比个性化时的一对一营销,它们甚
至还能有助于制定针对用户群的更为广泛的营销策略。
当没有已标记数据时,这些方法能帮助制定营销策略,而非采取一刀切的方法。
2.2.3 预测建模与分析
第三种机器学习的应用领域是预测性分析。这个词的范围很宽泛,甚至从某种意义上说还覆
盖推荐、个性化和目标营销。再考虑到推荐和市场细分有所区别,这里用预测建模
(predictive modeling)来表示其他做预测的模型。借助活动记录、收入数据以及内容属性,MovieStream可以创建一个回归模型(regression model)来预测新电影的市场表现。
另外,我们也可使用分类模型(classificaiton model)来对只有部分数据的新电影自动分配
标签、关键字或分类。2.3 机器学习模型的种类
以上MovieSteam的例子列出了机器学习的一些应用场景,但这些并非全部。后面几章在介绍
不同机器学习任务时还会提到一些相关例子。
以上应用案例和方法大致可分为如下两种。
监督学习(supervised learning):这种方法使用已标记数据来学习。推荐引
擎、回归和分类便是例子。它们所使用的标记数据可以是用户对电影的评级(对
推荐来说)、电影标签(对上述分类例子来说)或是收入数字(对回归预测来
说)。我们将在第4章、第5章和第6章讨论监督学习。
无监督学习(unsupervised learning):一些模型的学习过程不需要标记数据,我们称其为无监督学习。这类模型试图学习或是提取数据背后的结构或从中抽取
最为重要的特征。聚类、降维和文本处理的某些特征提取都是无监督学习。我们
将在第7章、第8章和第9章分别介绍它们。2.4 数据驱动的机器学习系统的组成
从高层设计来看,我们的机器学习系统的组成如图2-2所示,其中展示了机器学习的流程。该
流程始于从数据存储处获取数据,之后将其转换为可用于机器学习模型的形式。随后的环节
有对模型的训练、测试和完善,以及将最终的模型部署到生产系统中。有新数据产生时则重
复该流程。
图2-2 常见的一种机器学习流程
2.4.1 数据获取与存储
机器学习流程的第一步是获取训练模型所需的数据。与其他公司类似,MovieStream的数据
通常来自用户活动、其他系统(通常称作机器生成的数据)和外部数据源(比如某个用户访
问站点的时间和当时的天气)。
获取这些数据的途径很多,比如收集浏览器里用户的活动记录、移动应用的事件日志或通过
外部网络API来获取地理或天气信息。
获取数据后通常需将其存储起来。要存储的数据包括:原始数据、即时处理后的数据,以及
可用于生产系统的最终建模结果。
数据存储并不简单,可能涉及多种系统。文件系统,如HDFS、Amazon S3等;SQL数据库,如MySQL或PostgreSQL;分布式NoSQL数据存储,如HBase、Cassandra和Dy namoDB;搜
索引擎,如Solr和Elasticsearch;流数据系统,如Kafk a、Flume和Amazon Kinesis。
本书假设已获取相关数据,这样我们能专注在流程后续的处理和建模环节。
2.4.2 数据清理与转换
大部分机器学习模型所处理的都是特征(feature)。特征通常是输入变量所对应的可用于模
型的数值表示。
虽然我们希望能将大部分时间用于机器学习模型探索,但通常经上述途径获取到的数据都是原始形式,需要进一步处理。比如我们记录的一些用户事件的细节,比如用户查看某部电影
页面的时间、观看某部电影的时间或给出某些反馈的时间。我们还可能收集了一些外部信
息,比如用户的位置(通过他们的IP查到)。这些时间日志通常由一些文字或数值信息组合
而成。
绝大部分情况下,这些原始数据都需要经过预处理才能为模型所使用。预处理的情况可能包
括以下几种。
数据过滤:比如我们想从原始数据的部分数据中创建一个模型,而所需数据只
是最近几月的活动数据或是满足特定条件的事件数据。
处理数据缺失、不完整或有缺陷:许多现实中的数据集都存在某种程度上的
不完整。这可能包括数据缺失(比如用户没有输入),数据存在错误或是缺陷
(比如数据收集或存储时的错误,又或是技术问题或漏洞,以及软硬件故障)。
可能要过滤掉非规整数据,或通过某种方式来填充缺失的数据点(比如选取数据
集的平均值来作为缺失点的值)。
处理可能的异常、错误和异常值:错误或异常的数据可能不利于模型的训
练,所以需要过滤掉,或是通过某些方法来处理。
合并多个数据源:比如可能要将各个用户的事件数据与不同的内部数据或是外
部数据合并。内部数据如用户属性;外部数据如地理位置、天气和经济数据。
数据汇总:某些模型需要输入的数据进行过某种汇总,比如统计各用户经历过
的事件类型的总数目。
对数据进行初步预处理后,需要将其转换为一种适合机器学习模型的表示形式。对许多模型
类型来说,这种表示就是包含数值数据的向量或矩阵。数据转换和特征提取时常见的挑战包
括以下这些情况。
将类别数据(比如地理位置所在的国家或是电影的类别)编码为对应的数值表
示。
从文本数据提取有用信息。
处理图像或是音频数据。
数值数据常被转换为类别数据以减少某个变量的可能值的数目。例如将年龄分为
几个段(比如25~35、45~55等)。
对数值特征进行转换。比如对数值变量应用对数转换,这会有助于处理值域很大
的变量。
对特征进行正则化、标准化,以保证同一模型的不同输入变量的值域相同。
特征工程是对现有变量进行组合或转换以生成新特征的过程。例如从其他数据求
平均数,像求某个用户看电影的平均时间。
这些方法都会在本书的例子中讲到。
这些数据清理、探索、聚合和转换步骤,都能通过Spark核心API、SparkSQL引擎和其他外部
Scala、Java或Py thon包做到。借助Spark的Hadoop功能还能实现上述多种存储系统上的读
写。2.4.3 模型训练与测试回路
当数据已转换为可用于模型的形式,便可开始模型的训练和测试。在这个部分,我们主要关
注模型选择(model selection)问题。这可以归结为对特定任务最优建模方法的选择,或是
对特定模型最佳参数的选择问题。在许多情况下,我们会想尝试多种模型并选出表现最好的
那个(各模型都采用了最佳的参数时)。因而,这个词在现实中经常同时指代这两个过程。
在这个阶段,探索多个模型组合(也称集成学习法,ensemble method)的效果也很常见。
在训练数据集上运行模型并在测试数据集(即为评估模型而预留的数据,在训练阶段模型没
接触过该数据)上测试其效果,这个过程一般相对直接,被称作交叉验证(cross-
validation)。
然而我们所处理的通常是大型数据集。这样,先在具有代表性的小样本数据集上进行初步的
训练-测试回路,或是尽可能并行地选择模型,都会有所帮助。
Spark内置的机器学习库MLlib完全能胜任这个阶段的需求。本书将主要关注如何借助MLlib和
Spark核心功能来实现对各种机器学习方法的模型训练、评估以及交叉验证。
2.4.4 模型部署与整合
通过训练测试循环找出最佳模型后,要让它能得出可付诸实践的预测,还需将其部署到生产
系统中。
这个过程一般要将已训练的模型导入特定的数据存储中。该位置也是生产系统获取新版本的
地方。通过这种方式,实时服务系统能在训练新模型时进行周期性的更新。
2.4.5 模型监控与反馈
监控机器学习系统在生产环境下的表现十分重要。在部署了最优训练的模型后,我们会想知
道其在实际中的表现如何:它在新的未知数据上的表现是否符合预期?其准确度怎么样?毕
竟不管之前的模型选择和优化做得如何,检验其实际表现的唯一方法是观察其在生产环境下
的表现。
同样值得注意的是,模型准确度和预测效果只是现实中系统表现的一部分。通常还应该关注
其他业务效果(比如收入和利润率)或用户体验(比如站点使用时间和用户总体活跃度)的
相关指标。多数情况下很难将它们与模型预测能力直接关联。推荐系统或目标营销系统的准
确度可能很重要,但它只与我们真正关心的那些指标(如用户体验度、活跃度以及最终收
入)间接相关。
所以,现实中应该同时监控模型准确度相关指标和业务指标。我们可以尽可能在生产系统中
部署不同的模型,通过调整它们而优化业务指标。实践中,这通常通过在线分割测试(live
split test)进行。然而,做好这类测试并不容易。在线测试和实验可能引发错误,也可能效果
不好,或者会使用基准模型,这些都会给用户体验和收入带来负面影响,故其代价高昂。本阶段另一个重要的方面是模型反馈(model feedback),指通过用户的行为来对模型的预
测进行反馈的过程。在现实系统中,模型的应用将影响用户的决策和潜在行为,从而反过来
将从根本上改变模型自己将来的训练数据。
举例来说,假设我们部署了一个推荐系统。由于推荐实际上限制了用户的可选项,从而影响
了用户的选择。我们希望用户的选择不会受模型的影响,然而这种反馈回路会反过来影响模
型的训练数据,并最终对模型准确度和重要的业务指标产生不利影响。
好在我们可以借助一些机制来降低反馈回路的这种负面影响,比如提供一些无偏见的训练数
据。这类数据来自那些没有被推荐的用户,又或者在一开始就考虑到这种平衡需求而划分出
来的客户。这些机制有助于对数据的理解、探索以及利用已有的经验来提升系统的表现。
第10章将会简要介绍实时监控和模型更新的部分内容。
2.4.6 批处理或实时方案的选择
前几节简要概括了常见的批处理方法。在这类方法下,模型用所有数据或一部分数据进行周
期性的重新训练。由于上述流程会花费一定的时间,这就使得批处理方法难以在新数据到达
时立即完成模型的更新。
虽然本书将主要讨论批处理机器学习方法,但的确存在一类名为在线学习(online
learning)的机器学习方法。它们在新数据到达时便能立即更新模型,从而使实时系统成为可
能。常见的例子有对线性模型的在线优化算法,如随机梯度下降法。我们可以通过例子来学
习该算法。这类方法的优势在于其系统将能对新的信息和底层行为(即输入数据的特征或是
分布会随时间变化,现实中的绝大部分情况都会如此)作出快速的反应和调整。
但在实际生产环境中,在线学习模型也会面对特有的挑战。比如,对数据的获取和转换难以
做到实时。在一个纯在线环境下选择适当的模型也不简单。在线训练和模型选择以及部署阶
段的延时可能难以达到实时性的需求(比如在线广告对延时的需求是以毫秒计)。最后,批
处理框架不适合对本质为流的数据进行实时处理。
幸运的是,Spark提供了实时流处理组件Spark Streaming,对实时机器学习任务来说是个不错
的选择。第10章将探讨Spark Streaming和在线学习问题。
现实中的实时机器学习系统具有天生的复杂性,故实践中大部分的系统都以近实时性为设计
目标。这是一种混合方法,它并不要求模型一定在数据到达时立即更新。相反,新的数据会
被收集为小批量的训练数据,再输入给在线学习算法。大部分情况下,该方法会周期性地进
行某种批处理。处理的内容可能包括在整个数据集上重新计算模型,或是更为复杂的某些数
据处理以及模型的选择。这些能保证实时模型的表现不会随时间推移而变差。
另一种类似的方法是,在周期性批处理中进行重新计算时,若有新的数据到来则只对更复杂
的模型进行近似更新。这样模型可从新的数据学习,但有短暂延迟。因为是近似更新,所以
模型的准确度会随着时间推移而下降。但周期性地在所有数据上重新计算模型能弥补这一
点。2.5 机器学习系统架构
现在我们已经了解了如何在MovieStream的情景中应用机器学习系统,其可能的架构可概括
为图2-3所示:图2-3 MovieStream的未来架构
如图所示,该系统包含了早先机器学习流程示意图的内容,此外还包括:
收集与用户、用户行为和电影标题有关的数据;
将这些数据转为特征;
模型训练,包括训练-测试和模型选择环节;
将已训练模型部署到在线服务系统,并用于离线处理;
通过推荐和目标页面将模型结果反馈到MovieStream站点;
将模型结果返回到MovieStream的个性化营销渠道;
使用离线模型来为MovieSteam的各个团队提供工具,以帮助其理解用户的行为、内容目录的特点和业务收入的驱动因素。
动手练习
假设你现在要告知前端和基础设施工程团队你的机器学习系统需要哪些数据。想一想如何简
要告诉他们该如何设计数据收集过程。画出原始数据(比如网页日志、时间日志等)可能的结构,以及它们在系统中的流向。需要考虑的方面有:
需要哪些数据源
数据格式应该如何
数据收集、处理、可能进行的汇总以及存储的频率
使用何种存储以保证可扩展性2.6 小结
本章,你学到了数据驱动的自动化机器学习系统由哪些部分构成。我们同样也描述了一个真
实系统的可能架构。
下一章,我们将讨论如何获取公开数据集以用于常见的机器学习任务,了解数据处理、清理
和转换环节的一些基本概念。经过这些环节后,数据便可以用于训练机器学习模型了。
第3章 Spark上数据的获取、处理与准备
机器学习是一个极为广泛的领域,其应用范围已包括Web和移动应用、物联网、传感网络、金融服务、医疗健康和其他科研领域,而这些还只是其中一小部分。
由此,可用于机器学习的数据来源也极为广泛。本书将重点关注其在商业领域的应用。这类
领域中可用的数据通常由组织的内部数据(比如金融公司的交易数据)以及外部数据(比如
该金融公司下的金融资产价格数据)构成。
以第2章假想的互联网公司MovieStream为例,其主要的内部数据包括网站提供的电影数据、用户的服务信息数据以及行为数据。这些数据涉及电影和相关内容(比如标题、分类、图
片、演员和导演)、用户信息(比如用户属性、位置和其他信息)以及用户活动数据(比如
浏览数、预览的标题和次数、评级、评论,以及如赞、分享之类的社交数据,还有包括像
Facebook和Twitter之类的社交网络属性)。
其外部数据来源则可能包括天气和地理定位信息,以及如IMDB和Rotten Tomators之类的第三
方电影评级与评论信息等。
一般来说,获取实际的公司或机构的内部数据十分困难,因为这些信息很敏感(尤其是购买
记录、用户或客户行为以及公司财务),也关系组织的潜在利益。这也是对这类数据应用机
器学习建模的实用之处:一个预测精准的好模型有着极高的商业价值(Netflix Prize和Kaggle
上机器学习比赛的成功就是很好的见证)。
本书将使用可以公开访问的数据来讲解数据处理和机器学习模型训练的相关概念。
本章内容包括:
简要概述机器学习中用到的数据类型;
举例说明从何处获取感兴趣的数据集(通常可从因特网上获取),其中一些会用
于阐述本书所涉及模型的应用;
了解数据的处理、清理、探索和可视化方法;
介绍将原始数据转换为可用于机器学习算法特征的各种技术;
学习如何使用外部库或Spark内置函数来正则化输入特征。3.1 获取公开数据集
商业敏感数据虽然难以获取,但好在仍有相当多有用数据可公开访问。它们中的不少常用来
作为特定机器学习问题的基准测试数据。常见的有以下几个。
UCL机器学习知识库:包括近300个不同大小和类型的数据集,可用于分类、回归、聚类和推荐系统任务。数据集列表位于:http:archive.ics.uci.eduml。
Amazon AWS公开数据集:包含的通常是大型数据集,可通过Amazon S3访问。
这些数据集包括人类基因组项目、Common Crawl网页语料库、维基百科数据和
Google Book s Ngrams。相关信息可参见:http:aws.amazon.compublicdatasets。
Kaggle:这里集合了Kaggle举行的各种机器学习竞赛所用的数据集。它们覆盖分
类、回归、排名、推荐系统以及图像分析领域,可从Competitions区域下
载:http:www.kaggle.comcompetitions。
KDnuggets:这里包含一个详细的公开数据集列表,其中一些上面提到过的。该
列表位于:http:www.kdnuggets.comdatasetsindex.html。
针对特定的应用领域与机器学习任务,仍有许多其他公开数据集。希
望你自己也会接触到一些有趣的学术或是商业数据。
为说明Spark下的数据处理、转换和特征提取相关的概念,需要下载一个电影推荐方面的常用
数据集MovieLens。它能应用于推荐系统和其他可能的机器学习任务,适合作为示例数据
集。
Spark的机器学习库MLlib一直在紧锣密鼓地开发。但和Spark的核心不
同,其全局API和设计的进度尚未完全稳定。
Spark 1.2.0引入了一个实验性质的新MLlib API,位于ml包下(现有的接口则位于mllib
包下)。新API旨在加强原有的API和接口的设计,从而更容易衔接数据流程的各个环
节。这些环节包括特征提取、正则化、数据集转化、模型训练和交叉验证。
新API仍处于实现阶段,在后续的版本中可能会出现重大的变更。因此,后续的章节将
只关注相对更成熟的现有MLlib API。随着版本的更新,本书所提到的各种特征提取方
法和模型将会简单地桥接到新API中。但新API的核心思路和大部分底层代码仍会保持原样。
MovieLens 100k数据集
MovieLens 100k数据集包含表示多个用户对多部电影的10万次评级数据,也包含电影元数据
和用户属性信息。该数据集不大,方便下载和用Spark程序快速处理,故适合做讲解示例。
可从http:files.grouplens.orgdatasetsmovielensml-100k .zip下载这个数据集。
下载后 ,可在终端将其解压:
>unzip ml-100k.zip
inflating: ml-100kallbut.pl
inflating: ml-100kmku.sh
inflating: ml-100kREADME...
inflating: ml-100kub.base
inflating: ml-100kub.test
这会创建一个名为ml-100k的文件夹。下面变更当前目录到该目录然后查看其内容。其中重要
的文件有u.user(用户属性文件)、u.item(电影元数据)和u.data(用户对电影的评级)。
>cd ml-100k
关于数据集的更多信息可以从README获得,包括每个数据文件里的变量定义。我们可以使
用head命令来查看各个文件中的内容。
比如说,可以看到u.user文件包含user.id、age、gender、occupation和ZIP code
这些属性,各属性之间用管道符(|)分隔。
>head -5 u.user
1|24|M|technician|85711
2|53|F|other|94043
3|23|M|writer|32067
4|24|M|technician|43537
5|33|F|other|15213
u.item文件则包含movie id、title、release date以及若干与IMDB link和电影分
类相关的属性。各个属性之间也用|符号分隔:>head -5 u.item
1|Toy Story (1995)|01-Jan-1995||http:us.imdb.comMtitle-
exact?Toy%20
Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
2|GoldenEye (1995)|01-Jan-1995||http:us.imdb.comMtitle-
exact?GoldenEye%20(1995)|0|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
3|Four Rooms (1995)|01-Jan-1995||http:us.imdb.comMtitle-
exact?
Four%20Rooms%20(1995)|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
4|Get Shorty (1995)|01-Jan-1995||http:us.imdb.comMtitle-
exact?
Get%20Shorty%20(1995)|0|1|0|0|0|1|0|0|1|0|0|0|0|0|0|0|0|0|0
5|Copycat (1995)|01-Jan-1995||http:us.imdb.comMtitle-
exact?Copycat%20(1995)|0|0|0|0|0|0|1|0|1|0|0|0|0|0|0|0|1|0|0
最后,u.data文件包含user id、movie id、rating(从1到5)和timestamp属性,各
属性间用制表符(\t)分隔。
>head -5 u.data
196 242 3 881250949
186 302 3 891717742
22 377 1 878887116
244 51 2 880606923
166 346 1 8863975963.2 探索与可视化数据
有数据后,用启动Spark交互式终端来探索该数据吧!本节将通过IPy thon交互式终端和
matplotlib库来对数据进行处理和可视化,故我们会用到Py thon和PySparkshell。
IPy thon是针对Py thon的一个高级交互式壳程序,包含内置一系列实用
功能的pylab,其中有NumPy和SciPy用于数值计算,以及matplotlib用于交互式绘图和可
视化。
建议使用最新版的IPython(本书写作时为2.3.1)。IPython的安装方法可参考如下指
引:http:ipy thon.orginstall.html。如果这是你第一次使用IPython,这里有一个教
程:http:ipy thon.orgipy thon-docstableinteractivetutorial.html。
运行本章代码需要之前提到的所有软件包。它们的安装指南可从源代码包中找到。如果你刚
开始使用Py thon且不熟悉这些包的安装过程,我们强烈推荐你使用一个预编译的科学Py thon
套件,比如Anaconda(http:continuum.iodownloads)或
Enthougt(https:store.enthought.comdownloads)。这些套件极大简化了安装过程且包含运行
本章代码所需的一切。
PySpark支持运行Python时可指定的参数。在启动PySpark终端时,我们可以使用IPy thon而非
标准的Py thon shell。启动时也可以向IPy thon传入其他参数,包括让它在启动时也启用py lab
功能。
可以在Spark主目录下运行如下命令来实现上述需求:
>IPYTHON=1 IPYTHON_OPTS=--pylab .binpyspark
可以看到PySpark终端会启动,其输出和下面类似:图3-1 IPython下的PySpark的终端界面
终端里的IPython 2.3.1 -- An enhanced Interactive
Python和Using matplotlib backend: MacOSX输出行表示IPython和pylab均
已被PySpark启用。实际使用的操作系统和软件版本的不同,实际的输出可能会有所不同。
现在IPython终端已启动,我们可以探索MovieLens数据集并做些基本分析。
在本章的学习过程中,你可以将样本代码输入到IPy thon终端,也可通
过IPython提供的Notebook 应用来完成。后者支持支持HTML显示,且在IPy thon终端的
基础上提供了一些增强功能,如即时绘图、HTML标记,以及独立运行代码片段的功
能。
本章的图片使用IPy thon Notebook生成。它们的样式可能会和你看到的不同,但只要内
容上一致就没关系。如果愿意,你也可以使用Notebook来运行本章的代码。本章除提供
Py thon代码外,还提供相应的IPython Notebook版本,以供你导入到IPython Notebook
中。
IPython Notebook的使用指南可参见:http:ipy thon.orgipython-
docstableinteractivenotebook .html。
3.2.1 探索用户数据
首先来分析MovieLens用户的特征。在你的终端里输入如下代码(其中的PATH是指用unzip
命令来解压MovieLens 100k数据集时所生成的主目录):
user_data = sc.textFile(PATHml-100ku.user)
user_data.first
其输出应该与下面类似:
u'1|24|M|technician|85711'
这是用户数据文件的首行。从中可以看到,它是由“|”字符分隔。
first函数与collect函数类似,但前者只向驱动程序返回RDD的首个元素。我们也可以使用take(k)函数来只返回RDD的前k个元素到驱动程序。
下面用“|”字符来分隔各行数据。这将生成一个RDD,其中每一个记录对应一个Py thon 列
表,各列表由用户ID(user ID)、年龄(age)、性别(gender)、职业(occupation)和邮
编(ZIP code)五个属性构成。
之后再统计用户、性别、职业和邮编的数目。这可通过如下代码实现。该数据集不大,故这
里并未缓存它。
user_fields = user_data.map(lambda line: line.split(|))
num_users = user_fields.map(lambda fields: fields[0]).count
num_genders = user_fields.map(lambda fields:
fields[2]).distinct.count
num_occupations = user_fields.map(lambda fields:
fields[3]).distinct.count
num_zipcodes = user_fields.map(lambda fields:
fields[4]).distinct.count
print Users: %d, genders: %d, occupations: %d, ZIP codes: %d
% (num_users, num_genders, num_occupations, num_zipcodes)
对应输出如下:
Users: 943, genders: 2, occupations: 21, ZIP codes: 795
接着用matplotlib的hist函数来创建一个直方图,以分析用户年龄的分布情况:
ages = user_fields.map(lambda x: int(x[1])).collect
hist(ages, bins=20, color='lightblue', normed=True)
fig = matplotlib.pyplot.gcf
fig.set_size_inches(16, 10)
这里hist函数的输入参数有ages数组、直方图的bins数目(即区间数,这里为20)。同
时还使用了normed=True参数来正则化直方图,即让每个方条表示年龄在该区间内的数据
量占总数据量的比。
你将能看到图3-2所示的直方图。从中可以看出MovieLens的用户偏年轻。大量用户处于15岁
到35岁之间。图3-2 用户的年龄段分布
若想了解用户的职业分布情况,可以用如下的代码来实现。首先利用之前用到的MapReduce
方法来计算数据集中各种职业的出现次数,然后matplotlib下的bar函数来绘制一个不同
职业的数量的条形图。
数据中对职业的描述用的是文本,所以需要对其稍作处理以便bar函数使用:
count_by_occupation = user_fields.map(lambda fields:
(fields[3], 1)).
reduceByKey(lambda x, y: x + y).collect
x_axis1 = np.array([c[0] for c in count_by_occupation])
y_axis1 = np.array([c[1] for c in count_by_occupation])
在得到各职业所占数量的RDD后,需将其转为两个数组才能用来做条形图。它们分别对应x
轴(职业标签)与y轴(数量)。collect函数返回数量数据时并不排序。我们需要对该数
据进行排序,从而在条形图中以从少到多的顺序来显示各个职业。
为此可先创建两个numpy数组。之后调用numpy的argsort函数来以数量升序从各数组中
选取元素。注意这里会对x轴和y轴的数组都以y轴值排序(即以数量排序):
x_axis = x_axis1[np.argsort(x_axis1)]
y_axis = y_axis1[np.argsort(y_axis1)]有了条形图两轴所需的数据后便可创建条形图。创建时,会以职业作为x轴上的分类标签,以数量作为y轴的值。下面的代码也增加了如plt.xticks(rotation=30)之类的代码来
美化条形图。
pos = np.arange(len(x_axis))
width = 1.0
ax = plt.axes
ax.set_xticks(pos + (width 2))
ax.set_xticklabels(x_axis)
plt.bar(pos, y_axis, width, color='lightblue')
plt.xticks(rotation=30)
fig = matplotlib.pyplot.gcf
fig.set_size_inches(16, 10)
生成的图形应该和图3-3类似。从中可看出,数量最多的职业是student、other、educator、administrator、engineer和programmer。
图3-3 用户的职业分布
Spark对RDD提供了一个名为countByValue的便捷函数。它会计算RDD里各不同值所分别
出现的次数,并将其以Python dict函数的形式(或是Scala、Java下的Map函数)返回给驱
动程序:
count_by_occupation2 = user_fields.map(lambda fields:
fields[3]).countByValue
print Map-reduce approach:
print dict(count_by_occupation2)
print countByValue approach:print dict(count_by_occupation)
可以看到,上述两种方式的结果相同。
3.2.2 探索电影数据
接下来了解下电影分类数据的特征。如之前那样,我们可以先简单看一下某行记录,然后再
统计电影总数。
movie_data = sc.textFile(PATHml-100ku.item)
print movie_data.first
num_movies = movie_data.count
print Movies: %d % num_movies
其终端上的输出如下:
1|Toy Story (1995)|01-Jan-1995||http:us.imdb.comMtitle-
exact?
Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
Movies: 1682
绘制电影年龄的分布图的方法和之前对用户年龄和职业分布的处理类似。电影年龄即其发行
年份相对于现在过了多少年(在本数据中现在是1998年)。
从下面的代码可以看到,电影数据中有些数据不规整,故需要一个函数来处理解析release
date时可能的解析错误。这里命名该函数为convert_year:
def convert_year(x):
try:
return int(x[-4:])
except:
return 1900 若数据缺失年份则将其年份设为1900。在后续处理中会过滤掉这
类数据
有了以上函数来解析发行年份后,便可在调用电影数据进行map转换时应用该函数,并取回
其结果:
movie_fields = movie_data.map(lambda lines: lines.split(|))
years = movie_fields.map(lambda fields: fields[2]).map(lambda
x: convert_year(x))解析出错的数据的年份已设为1900。要过滤掉这些数据可以使用Spark的filter转换操
作:
years_filtered = years.filter(lambda x: x != 1900)
现实的数据经常会有不规整的情况,对其解析时就需要进一步的处理。上面便是一个很好的
例子。事实上,这也表明了数据探索的重要性所在,即它有助于发现数据在完整性和质量上
的问题。
过滤掉问题数据后,我们用当前年份减去发行年份,从而将电影发行年份列表转换为电影年
龄。接着用countByValue来计算不同年龄电影的数目。最后绘制电影年龄直方图(同样
会使用hist函数,且其values变量的值来自countByValue的结果,主键则为bins变
量):
movie_ages = years_filtered.map(lambda yr: 1998-
yr).countByValue
values = movie_ages.values
bins = movie_ages.keys
hist(values, bins=bins, color='lightblue', normed=True)
fig = matplotlib.pyplot.gcf
fig.set_size_inches(16,10)
你会看到如图3-4这样的结果。它表明大部分电影发行于1998年的前几年。图3-4 电影的年龄分布
3.2.3 探索评级数据
现在来看一下评级数据:
rating_data = sc.textFile(PATHml-100ku.data)
print rating_data.first
num_ratings = rating_data.count
print Ratings: %d % num_ratings
这些代码的输出为:
196 242 3 881250949
Ratings: 100000
可以看到评级次数共有10万。另外和用户数据与电影数据不同,评级记录用“\t”分隔。你可能也已想到,我们会想做些基本的统计,以及绘制评级值分布的直方图。动手吧:
rating_data = rating_data_raw.map(lambda line:
line.split(\t))
ratings = rating_data.map(lambda fields: int(fields[2]))
max_rating = ratings.reduce(lambda x, y: max(x, y))
min_rating = ratings.reduce(lambda x, y: min(x, y))
mean_rating = ratings.reduce(lambda x, y: x + y) num_ratings
median_rating = np.median(ratings.collect)
ratings_per_user = num_ratings num_users
ratings_per_movie = num_ratings num_movies
print Min rating: %d % min_rating
print Max rating: %d % max_rating
print Average rating: %2.2f % mean_rating
print Median rating: %d % median_rating
print Average of ratings per user: %2.2f % ratings_per_user
print Average of ratings per movie: %2.2f %
ratings_per_movie
在终端执行以上命令后,输出应该与下面类似:
Min rating: 1
Max rating: 5
Average rating: 3.53
Median rating: 4
Average of ratings per user: 106.00
Average of ratings per movie: 59.00
从中可以看到,最低的评级为1,而最大的评级为5。这并不意外,因为评级的范围便是从1
到5。
Spark对RDD也提供一个名为states的函数。该函数包含一个数值变量用于做类似的统计:
ratings.stats
其输出为:
(count: 100000, mean: 3.52986, stdev: 1.12566797076, max: 5.0,min: 1.0)
可以看出,用户对电影的平均评级(mean)是3.5左右,而评级中位数(median)为4。这就
能期待说评级的分布稍倾向高点的得分。要验证这点,可以创建一个评级值分布的条形图。具体做法和之前的类似:
count_by_rating = ratings.countByValue
x_axis = np.array(count_by_rating.keys)
y_axis = np.array([float(c) for c in count_by_rating.values])
这里对y轴正则化,使它表示百分比
y_axis_normed = y_axis y_axis.sum
pos = np.arange(len(x_axis))
width = 1.0
ax = plt.axes
ax.set_xticks(pos + (width 2))
ax.set_xticklabels(x_axis)
plt.bar(pos, y_axis_normed, width, color='lightblue')
plt.xticks(rotation=30)
fig = matplotlib.pyplot.gcf
fig.set_size_inches(16, 10)
这会生成图3-5所示的结果:
图3-5 电影评级的分布
其特征和我们之前所期待的相同,即评级的分布的确偏向中等以上。同样也可以求各个用户评级次数的分布情况。记得之前我们已对评级数据用制表符分隔,从
而生成过rating_data RDD。后续的代码中将再次用到该RDD变量。
计算各用户的评级次数的分布时,我们先从rating_data RDD里提取出以用户ID为主键、评级为值的键值对。之后调用Spark的groupByKey函数,来对评级以用户ID为主键进行分
组:
user_ratings_grouped = rating_data.map(lambda fields:
(int(fields[0]),int(fields[2]))). groupByKey
接着求出每一个主键(用户ID)对应的评级集合的大小;这会给出各用户评级的次数:
user_ratings_byuser = user_ratings_grouped.map(lambda (k, v):
(k, len(v)))
user_ratings_byuser.take(5)
要检查结果RDD,可从中选出少数记录。这应该会返回一个(用户ID, 评级次数)键值对类
型的RDD:
[(1, 272), (2, 62), (3, 54), (4, 24), (5, 175)]
最后,用我们所熟悉的hist函数来绘制各用户评级分布的直方图。
user_ratings_byuser_local = user_ratings_byuser.map(lambda (k,v): v).collect
hist(user_ratings_byuser_local, bins=200, color='lightblue',normed=True)
fig = matplotlib.pyplot.gcf
fig.set_size_inches(16,10)
结果如图3-6所示。可以看出,大部分用户的评级次数少于100。但该分布也表明仍然有较多
用户做出过上百次的评级。图3-6 各用户的电影评级的分布
可以用类似的方法绘制各个电影评级次数的直方图,读者可自己练习。如果觉得不够,甚至
还可以提取出不同日期(可从评级数据集的最后一列的时间戳得到)下的电影评级情况,进
而绘制出总评级次数、参与评级的不同用户的个数,以及被评级的不同电影的个数的时间
线。时间线精确到每天。3.3 处理与转换数据
现在我们已对数据集进行过探索性的分析,并了解了用户和电影的一些特征。那接下来做什
么呢?
为让原始数据可用于机器学习算法,需要先对其进行清理,并可能需要将其进行各种转换,之后才能从转换后的数据里提取有用的特征。数据的转换和特征提取联系紧密。某些情况
下,一些转换本身便是特征提取的过程。
在之前处理电影数据集时我们已经看到数据清理的必要性。一般来说,现实中的数据会存在
信息不规整、数据点缺失和异常值问题。理想情况下,我们会修复非规整数据。但很多数据
集都源于一些难以重现的收集过程(比如网络活动数据和传感器数据),故实际上会难以修
复。值缺失和异常也很常见,且处理方式可与处理非规整信息类似。总的来说,大致的处理
方法如下。
过滤掉或删除非规整或有值缺失的数据:这通常是必须的,但的确会损失这
些数据里那些好的信息。
填充非规整或缺失的数据:可以根据其他的数据来填充非规整或缺失的数据。
方法包括用零值、全局期望或中值来填充,或是根据相邻或类似的数据点来做插
值(通常针对时序数据)等。选择正确的方式并不容易,它会因数据、应用场景
和个人经验而不同。
对异常值做鲁棒处理:异常值的主要问题在于即使它们是极值也不一定就是错
的。到底是对是错通常很难分辨。异常值可被移除或是填充,但的确存在某些统
计技术(如鲁棒回归)可用于处理异常值或是极值。
对可能的异常值进行转换:另一种处理异常值或极值的方法是进行转换。对那
些可能存在异常值或值域覆盖过大的特征,利用如对数或高斯核对其转换。这类
转换有助于降低变量存在的值跳跃的影响,并将非线性关系变为线性的。
非规整数据和缺失数据的填充
前面已经举过过滤非规整数据的例子。顺着上述代码,下面的代码对发行日期有问题的数据
采取了填充策略,即用发行日期的中位数来填充问题数据。
years_pre_processed = movie_fields.map(lambda fields:
fields[2]).map(lambda x: convert_year(x)).collect
years_pre_processed_array = np.array(years_pre_processed)
在选取所有的发行日期后,这里首先计算发行年份的平均数和中位数。选取的数据不包含非
规整数据。然后用numpy的函数来找出year_pre_processed_array中的非规整数据点
的序号(之前我们给该数据点分配了1900的值)。最后通过该序号来将中位数作为非规整
数据的发行年份:mean_year =
np.mean(years_pre_processed_array[years_pre_processed_array!=1900])
median_year =
np.median(years_pre_processed_array[years_pre_processed_array!=1900])
index_bad_data = np.where(years_pre_processed_array==1900)[0]
[0]
years_pre_processed_array[index_bad_data] = median_year
print Mean year of release: %d % mean_year
print Median year of release: %d % median_year
print Index of '1900' after assigning median: %s %
np.where(years_pre_processed_array == 1900)[0]
其输出应如下:
Mean year of release: 1989
Median year of release: 1995
Index of '1900' after assigning median: []
这里同时求出了发行年份的平均值和中位值。从输出也可看到,发行年份分布的偏向使得其
中位值很高。特定情况下通常不容易确定选取什么样的值来做填充才够精确。 但在本例中,从该偏向来看使用中位值来填充的确可行。
严格来说,上面示例代码的可扩展性并不很高,因为它要把数据都返
回给驱动程序。平均值的计算可通过Spark下数值型RDD的mean函数来实现,但目前并
没相应的中位数函数。我们可以自己编写这个函数来求中位数,又或是用sample函数
(后面几章会更多看到)计算样本的中位数。3.4 从数据中提取有用特征
在完成对数据的初步探索、处理和清理后,便可从中提取可供机器学习模型训练用的特征。
特征(feature)指那些用于模型训练的变量。每一行数据包含可供提取到训练样本中的各
种信息。从根本上说,几乎所有机器学习模型都是与用向量表示的数值特征打交道;因此,我们需要将原始数据转换为数值。
特征可以概括地分为如下几种。
数值特征(numerical feature):这些特征通常为实数或整数,比如之前例子中
提到的年龄。
类别特征(categorical feature):它们的取值只能是可能状态集合中的某一种。
我们数据集中的用户性别、职业或电影类别便是这类。
文本特征(text feature):它们派生自数据中的文本内容,比如电影名、描述或
是评论。
其他特征:大部分其他特征都最终表示为数值。比如图像、视频和音频可被表
示为数值数据的集合。地理位置则可由经纬度或地理散列(geohash)表示。
这里我们将谈到数值、类别以及文本类的特征。
3.4.1 数值特征
原始的数值和一个数值特征之间的区别是什么?实际上,任何数值数据都能作为输入变量。
但是,机器学习模型中所学习的是各个特征所对应的向量的权值。这些权值在特征值到输出
或是目标变量(指在监督学习模型中)的映射过程中扮演重要角色。
由此我们会想使用那些合理的特征,让模型能从这些特征学到特征值和目标变量之间的关
系。比如年龄就是一个合理的特征。年龄的增加和某项支出之间可能就存在直接关系。类似
地,高度也是一个可直接使用的数值特征。
当数值特征仍处于原始形式时,其可用性相对较低,但可以转化为更有用的表示形式。位置
信息便是如此。若使用原始位置信息(比如用经纬度表示的),我们的模型可能学习不到该
信息和某个输出之间的有用关系,这就使得该信息的可用性不高,除非数据点的确很密集。
然而若对位置进行聚合或挑选后(比如聚焦为一个城市或国家),便容易和特定输出之间存
在某种关联了。
3.4.2 类别特征
当类别特征仍为原始形式时,其取值来自所有可能取值所构成的集合而不是一个数字,故不
能作为输入。如之前的例子中的用户职业便是一个类别特征变量,其可能取值有学生、程序
员等。这样的类别特征也称作名义(nominal)变量,即其各个可能取值之间没有顺序关系。相
反,那些存在顺序关系的(比如之前提到的评级,从定义上说评级5会高于或是好于评级1)
则被称为有序(ordinal)变量。
将类别特征表示为数字形式,常可借助 k 之1(1-of-k)方法进行。将名义变量表示为可用于
机器学习任务的形式,会需要借助如 k 之1编码这样的方法。有序变量的原始值可能就能直
接使用,但也常会经过和名义变量一样的编码处理。
假设变量可取的值有 k 个。如果对这些值用1到 k 编序,则可以用长度为k的二元向量来表示
一个变量的取值。在这个向量里,该取值对应的序号所在的元素为1,其他元素都为0。
比如,我们可以取回occupation的所有可能取值:
all_occupations = user_fields.map(lambda fields: fields[3]).
distinct.collect
all_occupations.sort
然后可以依次对各可能的职业分配序号(注意,为与Python、Scala以及Java中数组编序相
同,这里也从0开始编号):
idx = 0
all_occupations_dict = {}
for o in all_occupations:
all_occupations_dict[o] = idx
idx +=1
看一下“k之1”编码会对新的例子分配什么值 print Encoding of 'doctor': %d %
all_occupations_dict['doctor']
print Encoding of 'programmer': %d %
all_occupations_dict['programmer']
其输出如下:
Encoding of 'doctor': 2
Encoding of 'programmer': 14
最后来编码programmer的取值。首先需创建一个长度和可能的职业数目相同(本例中为
5)的numpy数组,其各元素值为0。这可通过numpy的zeros函数实现。
之后将提取单词programmer的序号,并将数组中对应该序号的那个元素值赋为1:
K = len(all_occupations_dict)
binary_x = np.zeros(K)k_programmer = all_occupations_dict['programmer']
binary_x[k_programmer] = 1
print Binary feature vector: %s % binary_x
print Length of binary vector: %d % K
对应的输出为:
Binary feature vector: [ 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
0. 1. 0. 0. 0. 0. 0. 0.] Length
Length of binary vector: 21
3.4.3 派生特征
上面曾提到,从现有的一个或多个变量派生出新的特征常常是有帮助的。理想情况下,派生
出的特征能比原始属性带来更多信息。
比如,可以分别计算各用户已有的电影评级的平均数。这将能给模型加入针对不同用户的个
性化特征(事实上,这常用于推荐系统)。在前文中我们也从原始的评级数据里创建了新的
特征以学习出更好的模型。
从原始数据派生特征的例子包括计算平均值、中位值、方差、和、差、最大值或最小值以及
计数。在先前内容中,我们也看到是如何从电影的发行年份和当前年份派生了新的movie
age特征的。这类转换背后的想法常常是对数值数据进行某种概括,并期望它能让模型学习
更容易。
数值特征到类别特征的转换也很常见,比如划分为区间特征。进行这类转换的变量常见的有
年龄、地理位置和时间。
将时间戳转为类别特征
下面以对评级时间的转换为例,说明如何将数值数据装换为类别特征。该时间的格式为Unix
的时间戳。我们可以用Python的datetime模块从中提取出日期、时间以及点钟(hour)信
息。其结果将是由各评级对应的点钟数所构成的RDD。
需要定义一个函数将评级时间戳提取为datetime的格式:
def extract_datetime(ts):
import datetime
return datetime.datetime.fromtimestamp(ts)
下面会再次用到之前例子中求出的rating_data RDD。
我们首先使用map将时间戳属性转换为Py thon int 类型。然后通过extract_datetime函数将各时间戳转为datetime类型的对象,进而提取出其点钟数。
timestamps = rating_data.map(lambda fields: int(fields[3]))
hour_of_day = timestamps.map(lambda ts:
extract_datetime(ts).hour)
hour_of_day.take(5)
若取出结果RDD的前5条记录,可看到如下输出:
[17, 21, 9, 7, 7]
这就完成了从原始的时间数据到表示评级发生的点钟的类别特征的转换。
现在,假设我们觉得这样的表示过于粗糙,想更为精确。我们可以将点钟数划分到一天中的
不同时段。
比如可以说7点到12点是上午,12点到14点是中午,以此类推。要生成这些时间段,可以创
建一个以点钟数为输入的函数来返回相应的时间段:
def assign_tod(hr):
times_of_day = {
'morning' : range(7, 12),'lunch' : range(12, 14),'afternoon' : range(14, 18),'evening' : range(18, 23),'night' : range(23, 7)
}
for k, v in times_of_day.iteritems:
if hr in v:
return k
现在对hour_of_day RDD里的各次评级的点钟数调用assign_tod函数:
time_of_day = hour_of_day.map(lambda hr: assign_tod(hr))
time_of_day.take(5)
如果我们选择查看该新RDD里的前5条记录,会输出如下已转换的值:
['afternoon', 'evening', 'morning', 'morning', 'morning']
我们已将时间戳变量转为点钟数,再接着转为了时间段,从而得到了一个类别特征。我们可
以借助之前提到的 k 之1编码方法来生成其相应的二元特征向量。3.4.4 文本特征
从某种意义上说,文本特征也是一种类别特征或派生特征。下面以电影的描述(我们的数据
集中不含该数据)来举例。即便作为类别数据,其原始的文本也不能直接使用。因为假设每
个单词都是一种可能的取值,那单词之间可能出现的组合有几乎无限种。这时模型几乎看不
到有相同的特征出现两次,学习的效果也就不理想。从中可以看出,我们会希望将原始的文
本转换为一种更便于机器学习的形式。
文本的处理方式有很多种。自然语言处理便是专注于文本内容的处理、表示和建模的一个领
域。关于文本处理的完整内容并不在本书的讨论范围内,但我们会介绍一种简单且标准化的
文本特征提取方法。该方法被称为词袋(bag-of-word)表示法。
词袋法将一段文本视为由其中的文本或数字组成的集合,其处理过程如下。
分词(tokenization):首先会应用某些分词方法来将文本分隔为一个由词(一般
如单词、数字等)组成的集合。可用的方法如空白分隔法。这种方法在空白处对
文本分隔并可能还删除其他如标点符号和其他非字母或数字字符。
删除停用 ......
您现在查看是摘要介绍页, 详见PDF附件(4429KB,326页)。





