本案例由厦门大学计算机系2018级研究生王福泰同学制作,这里对他表示衷心的感谢!
相关教材:林子雨、郑海山、赖永炫编著《Spark编程基础(Python版)》(
访问教材官网
)
相关案例:
基于Python语言的Spark数据处理分析案例集锦(PySpark)
一、环境搭建
假设目前的环境是裸机。
(1)安装Linux操作系统:比如可以安装Ubuntu 16.04,可以参考
这篇博客
,完成操作系统的安装。
(2)安装Hadoop:需要在Linux系统上安装Hadoop,可以参考
这篇博客
,完成Hadoop的安装。
(3)安装Spark:需要在Linux系统上安装Spark,可以参考
这篇博客
,完成Hadoop的安装。
(4)安装Bottle:
Bottle 是一个非常小巧但高效的微型 Python Web 框架,它被设计为仅仅只有一个文件的Python模块,并且除Python标准库外,它不依赖于任何第三方模块。关于Bottle的更多介绍,可以参考
这篇博客
。
实验使用 web 框架 bottle 进行数据可视化,首先,打开一个Linux终端,运行下面命令安装 pip3 命令:
sudo apt-get install python3-pip
然后,执行如下命令安装 bottle web 框架:
pip3 install bottle
至此,完成环境搭建过程。
二、数据预处理
本次项目使用的数据集来自知名数据网站 Kaggle 的 tmdb-movie-metadata 电影数据集,该数据集包含大约 5000 部电影的相关数据。本次实验使用数据集中有关电影的数据表 tmdb_5000_movies.csv 进行实验。数据包含以下字段:
字段名称 解释 例子
budget 预算 10000000
genres 体裁 "[{""id"": 18, ""name"": ""Drama""}]"
homepage 主页 ""
id id 268238
keywords 关键词 "[{""id"": 14636, ""name"": ""india""}]"
original_language 原始语言 en
original_title 原标题 The Second Best Exotic Marigold Hotel
overview 概览 As the Best Exotic Marigold Hotel ...
popularity 流行度 17.592299
production_companies 生产公司 "[{""name"": ""Fox Searchlight Pictures"", ""id"": 43}, ...]"
production_countries 生产国家 "[{""iso31661"": ""GB"", ""name"": ""United Kingdom""}, ...]"
release_date 发行日期 2015-02-26
revenue 盈收 85978266
runtime 片长 122
spoken_languages 语言 "[{""iso6391"": ""en"", ""name"": ""English""}]"
status 状态 Released
tagline 宣传语 ""
title 标题 The Second Best Exotic Marigold Hotel
vote_average 平均分 6.3
vote_count 投票人数 272
由于数据中某些字段包含 json 数据,因此直接使用 DataFrame 进行读取会出现分割错误,所以如果要创建 DataFrame,需要先直接读取文件生成 RDD,再将 RDD 转为 DataFrame。过程中,使用 python3 中的 csv 模块对数据进行解析和转换。
为了更方便的对 csv 文件转化的 RDD 进行处理,需要首先去除csv文件的标题行。完成后,将处理好的文件 tmdb_5000_movies.csv 存储到 HDFS 上方便进一步的处理,使用下面命令将文件上传至 HDFS:
hdfs dfs -put tmdb_5000_movies.csv
此时文件在 HDFS 上的路径为 /user/hadoop/tmdb_5000_movies.csv。之后在程序中,使用下面语句即可读取该文件:
sc.textFile('tmdb_5000_movies.csv')
tmdb_5000_movies.csv文件也可以直接从百度网盘下载,百度网盘地址:https://pan.baidu.com/s/1lt7PHF17-gHieOU0B0zJ3A 提取码:cui7
三、使用 Spark 将数据转为 DataFrame
由于读入的文件是 csv 文件,是结构化的数据,因此可以将数据创建为 DataFrame 方便进行分析。从本节开始,之后的代码均存放在 analyst.py 中,下面会逐行代码进行解释,解释完毕以后,再给出 analyst.py代码的全貌。(备注:所有代码文件可以直接从百度网盘下载,百度网盘地址:https://pan.baidu.com/s/1lt7PHF17-gHieOU0B0zJ3A 提取码:cui7 )
为了创建 DataFrame,首先需要将 HDFS 上的数据加载成 RDD,再将 RDD 转化为 DataFrame。下面代码段完成从文件到 RDD 再到 DataFrame 的转化:
from pyspark import SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StringType, StructField, StructType
import json # 用于后面的流程
import csv
# 1. 创建 SparkSession 和 SparkContext 对象
sc = SparkContext('local', 'spark_project')
sc.setLogLevel('WARN') # 减少不必要的 LOG 输出
spark = SparkSession.builder.getOrCreate()
# 2. 为 RDD 转为 DataFrame 创建 schema
schemaString = "budget,genres,homepage,id,keywords,original_language,original_title,overview,popularity,production_companies,production_countries,release_date,revenue,runtime,spoken_languages,status,tagline,title,vote_average,vote_count"
fields = [StructField(field, StringType(), True)
for field in schemaString.split(")]
schema = StructType(fields)
# 3. 对于每一行用逗号分隔的数据,使用 csv 模块进行解析并转为 Row 对象,得到可以转为 DataFrame 的 RDD
moviesRdd = sc.textFile('tmdb_5000_movies.csv').map(
lambda line: Row(*next(csv.reader([line]))))
# 4. 使用 createDataFrame 创建 DataFrame
mdf = spark.createDataFrame(moviesRdd, schema)
上述代码完成 4 件事:
首先,创建 SparkSession 和 SparkContext 对象。
然后,为 RDD 转为 DataFrame 制作表头 (schema)。schema 是一个 StructType 对象,该对象使用一个 StructField 数组创建。
每一个 StructField 都代表结构化数据中的一个字段,构造 StructField 需要 3 个参数
1. 字段名称
2. 字段类型
3. 字段是否可以为空
下面是这些对象的结构关系:
StructType([StructField(name, type, null), ..., StructField(name, type, null)])
接着,开始创建用于转为 DataFrame 的 RDD。这个过程首先读取 HDFS 上的数据文件,然后为了将 RDD 转为 DataFrame,还需要将数据的每一行转为一个 Row 对象。
这个过程首先使用 csv 模块进行解析,得到一个包含每个字段的迭代器:
csv.reader([line]) # 这里 [line] 表示包含一行数据的数组
然后使用 next 函数将迭代器中的数据读取到数组中:
next(csv.reader([line]))
最后使用 * 将数组转为 Row 对象的构造函数参数,创建 Row 对象:
Row(*next(csv.reader([line])))
至此,moviesRdd 中每一行为一个 Row 对象。
最后,通过 SparkSession 接口 createDataFrame ,使用准备好的表头 (schema) 和 RDD 创建 DataFrame:
mdf = spark.createDataFrame(moviesRdd, schema)
至此完成 DataFrame 的创建。
四、使用 Spark 进行数据分析
(备注:所有代码文件可以直接从百度网盘下载,百度网盘地址:https://pan.baidu.com/s/1lt7PHF17-gHieOU0B0zJ3A 提取码:cui7 )
下面使用通过 Spark 处理得到的 DataFrame mdf 进行数据分析,首先对数据中的主要字段单独进行分析(概览小节),然后再分析不同字段间的关系(关系小节)。
为了方便进行数据可视化,每个不同的分析,都将分析结果导出为 json 文件由 web 页面读取并进行可视化。导出直接使用下面的 save 函数:
def save(path, data):
with open(path, 'w') as f:
f.write(data)
该函数向 path 中写入 data。
下面分别介绍各个分析的生成过程。
这个部分对数据进行整体的分析。
(1)TMDb 电影中的体裁分布
从上面的数据字典描述可以看出,电影的体裁字段是一个 json 格式的数据,因此,为了统计不同体裁的电影的数量,需要首先解析 json 数据,从中取出每个电影对应的体裁数组,然后使用词频统计的方法统计不同体裁出现的频率,即可得到电影的体裁分布。
首先实现一个函数 countByJson(field) ,该函数实现解析 json 格式字段从中提取出 name 并进行词频统计的功能:
def countByJson(field):
return mdf.select(field).filter(mdf[field] != '').rdd.flatMap(lambda g: [(v, 1) for v in map(lambda x: x['name'], json.loads(g[field]))]).repartition(1).reduceByKey(lambda x, y: x + y)
该函数返回一个 RDD,整个过程如下所示。
基于这个函数实现 countByGenres 用来生成不同体裁的电影数统计结果:
def countByGenres():
res = countByJson("genres").collect()
return list(map(lambda v: {"genre": v[0], "count": v[1]}, res))
这个函数调用 countByJson 得到频率统计结果,并将其转为 json 数据格式并返回,方便进行可视化。最终函数返回数据格式如下:
"genre": ...,
"count": ...
"genre": ...,
"count": ...
}, ...]
接着,使用下面代码进行数据导出至 genres.json 方便之后进行可视化
save('genres.json', json.dumps(countByGenres())) # 确保 json 包已导入
2. 前 100 个常见关键词
该项分析电影关键词中出现频率最高的前一百个。由于关键词字段也是 json 格式数据,因此调用 countByJson 进行频率统计,同时对于统计结果进行降序排序并取前 100 项即可:
def countByKeywords():
res = countByJson("keywords").sortBy(lambda x: -x[1]).take(100)
return list(map(lambda v: {"x": v[0], "value": v[1]}, res))
最终该函数返回 json 数据格式如下:
"x": ...,
"value": ...
"x": ...,
"value": ...
}, ...]
接着,使用下面代码将数据导出至 keywords.json 方便之后进行可视化
save('keywords.json', json.dumps(countByKeywords()))
3. TMDb 中最常见的 10 种预算数
这一项探究电影常见的预算数是多少,因此需要对电影预算进行频率统计,代码如下:
def countByBudget(order='count', ascending=False):
return mdf.filter(mdf["budget"] != 0).groupBy("budget").count().orderBy(order, ascending=ascending).toJSON().map(lambda j: json.loads(j)).take(10)
首先,需要对预算字段进行过滤,去除预算为 0 的项目,然后根据预算聚合并计数,接着根据计数进行排序,并将结果导出为 json 字符串,为了统一输出,这里将 json 字符串转为 python 对象,最后取前 10 项作为最终的结果。
最终该函数返回 json 数据格式如下:
"budget": ...,
"count": ...
"budget": ...,
"count": ...
}, ...]
接着,使用下面代码进行数据导出至 budget.json 方便之后进行可视化
save('budget.json', json.dumps(countByBudget()))
4. TMDb 中最常见电影时长 (只展示电影数大于 100 的时长)
这一项统计 TMDb 中最常见的电影时长,首先,需要过滤时长为 0 的电影,然后根据时长字段聚合并计数,接着过滤掉出现频率小于 100 的时长 (这一步是为了方便可视化,避免过多冗余信息)得到最终的结果。
def distrbutionOfRuntime(order='count', ascending=False):
return mdf.filter(mdf["runtime"] != 0).groupBy("runtime").count().filter('count>=100').toJSON().map(lambda j: json.loads(j)).collect()
最终该函数返回 json 数据格式如下:
"runtime": ...,
"count": ...
"runtime": ...,
"count": ...
}, ...]
接着,使用下面代码进行数据导出至 runtime.json 方便之后进行可视化
save('runtime.json', json.dumps(distrbutionOfRuntime()))
5. 生产电影最多的 10 大公司
这一项统计电影产出最多的 10 个公司,同样使用 countByJson 对 JSON 数据进行频率统计,然后进行降序排列取前 10 项即可。
def countByCompanies():
res = countByJson("production_companies").sortBy(lambda x: -x[1]).take(10)
return list(map(lambda v: {"company": v[0], "count": v[1]}, res))
最终该函数返回 JSON 数据格式如下:
"company": ...,
"count": ...
"company": ...,
"count": ...
}, ...]
接着,使用下面代码进行数据导出至 company_count.json 方便之后进行可视化
save('company_count.json', json.dumps(countByCompanies()))
6. TMDb 中的 10 大电影语言
该项统计 TMDb 中出现最多的语言,与前面类似,该字段也是 JSON 数据,因此首先对每个项目进行词频统计,然后过滤掉语言为空的项目,最后排序取前十即可。
def countByLanguage():
res = countByJson("spoken_languages").filter(
lambda v: v[0] != '').sortBy(lambda x: -x[1]).take(10)
return list(map(lambda v: {"language": v[0], "count": v[1]}, res))
最终该函数返回 json 数据格式如下:
"language": ...,
"count": ...
"language": ...,
"count": ...
}, ...]
接着,使用下面代码进行数据导出至 language.json 方便之后进行可视化
save('language.json', json.dumps(countByLanguage()))
这个部分考虑数据之间的关系。
1. 预算与评价的关系
这部分考虑预算与评价之间的关系,因此对于每个电影,需要导出如下的数据:
[电影标题,预算,评价]
基于 DataFrame 对数据进行字段过滤即可:
def budgetVote():
return mdf.select("title", "budget", "vote_average").filter(mdf["budget"] != 0).filter(mdf["vote_count"] > 100).collect()
这里还要注意过滤掉预算为空的数据,同时,结果只保留了投票数大于 100 的数据确保公平。
得到的数据存储在 budget_vote.json 中:
save('budget_vote.json', json.dumps(budgetVote()))
2. 发行时间与评价的关系
这部分考虑发行时间与评价之间的关系,因此对于每个电影,需要导出如下的数据:
[电影标题,发行时间,评价]
基于 DataFrame 对数据进行字段过滤即可:
def dateVote():
return mdf.select(mdf["release_date"], "vote_average", "title").filter(mdf["release_date"] != "").filter(mdf["vote_count"] > 100).collect()
这里还是要注意过滤掉发行时间为空的数据,保留投票数大于 100 的数据。
得到的数据存储在 date_vote.json 中:
save('date_vote.json', json.dumps(dateVote()))
3. 流行度和评价的关系
这部分考虑流行度与评价之间的关系,因此对于每个电影,需要导出如下的数据:
[电影标题,流行度,评价]
基于 DataFrame 对数据进行字段过滤即可:
def popVote():
return mdf.select("title", "popularity", "vote_average").filter(mdf["popularity"] != 0).filter(mdf["vote_count"] > 100).collect()
同时,过滤掉流行度为 0 的数据,保留投票数大于 100 的数据。
得到的数据存储在 pop_vote.json 中:
save('pop_vote.json', json.dumps(popVote()))
4. 公司生产的电影平均分和数量的关系
这部分计算每个公司生产的电影数量及这些电影的平均分分布。首先,需要对数据进行过滤,去掉生产公司字段为空和评价人数小于 100 的电影,然后对于每一条记录,得到一条如下形式的记录:
[公司名,(评分,1)]
接着将所有记录的评分和计数累加,最后用总评分除以计数得到一个公司的平均评分及电影数,整个过程如下所示。
def moviesVote():
return mdf.filter(mdf["production_companies"] != '').filter(mdf["vote_count"] > 100).rdd.flatMap(lambda g: [(v, [float(g['vote_average']), 1]) for v in map(lambda x: x['name'], json.loads(g["production_companies"]))]).repartition(1).reduceByKey(lambda x, y: [x[0] + y[0], x[1] + y[1]]).map(lambda v: [v[0], v[1][0] / v[1][1], v[1][1]]).collect()
下面是过程的图示
得到的数据存储在 movies_vote.json 中:
save('movies_vote.json', json.dumps(moviesVote()))
5. 电影预算和营收的关系
这部分考虑电影的营收情况,因此对于每个电影,需要导出如下的数据:
[电影标题,预算,收入]
基于 DataFrame 对数据进行字段过滤即可:
def budgetRevenue():
return mdf.select("title", "budget", "revenue").filter(mdf["budget"] != 0).filter(mdf['revenue'] != 0).collect()
过滤掉预算,收入为 0 的数据。
得到的数据存储在 budget_revenue.json 中:
save('budget_revenue.json', json.dumps(budgetRevenue()))
最后,将上面的过程整合起来方便进行调用,因此在 analyst.py 中添加 main 函数:
if __name__ == "__main__":
m = {
"countByGenres": {
"method": countByGenres,
"path": "genres.json"
"countByKeywords": {
"method": countByKeywords,
"path": "keywords.json"
"countByCompanies": {
"method": countByCompanies,
"path": "company_count.json"
"countByBudget": {
"method": countByBudget,
"path": "budget.json"
"countByLanguage": {
"method": countByLanguage,
"path": "language.json"
"distrbutionOfRuntime": {
"method": distrbutionOfRuntime,
"path": "runtime.json"
"budgetVote": {
"method": budgetVote,
"path": "budget_vote.json"
"dateVote": {
"method": dateVote,
"path": "date_vote.json"
"popVote": {
"method": popVote,
"path": "pop_vote.json"
"moviesVote": {
"method": moviesVote,
"path": "movies_vote.json"
"budgetRevenue": {
"method": budgetRevenue,
"path": "budget_revenue.json"
base = "static/" # 生成文件的 base 目录
if not os.path.exists(base): # 如果目录不存在则创建一个新的
os.mkdir(base)
for k in m: # 执行上述所有方法
p = m[k]
f = p["method"]
save(base + m[k]["path"], json.dumps(f()))
print ("done -> " + k + " , save to -> " + base + m[k]["path"])
上面代码将所有的函数整合在变量 m中,然后通过循环调用上述所有方法并导出json文件。
通过下面方法进行数据分析:
spark-submit analyst.py
五、数据可视化方法
(备注:所有代码文件可以直接从百度网盘下载,百度网盘地址:https://pan.baidu.com/s/1lt7PHF17-gHieOU0B0zJ3A 提取码:cui7 )
数据可视化基于阿里开源的数据可视化工具 G2 实现。G2 是一套基于可视化编码的图形语法,以数据驱动,具有高度的易用性和扩展性,用户无需关注各种繁琐的实现细节,一条语句即可构建出各种各样的可交互的统计图表。下面以 TMDb 中电影的体裁分布为例说明可视化过程。
首先使用 python Web 框架 bottle 访问可视化页面方便进行 json 数据的读取。使用下面代码web.py 可以实现一个简单的静态文件读取:
from bottle import route, run, static_file
import json
@route('/static/<filename>')
def server_static(filename):
return static_file(filename, root="./static")
@route("/<name:re:.*\.html>")
def server_page(name):
return static_file(name, root=".")
@route("/")
def index():
return static_file("index.html", root=".")
run(host="0.0.0.0", port=9999)
bottle 对于接收到的请求进行路由
1. 对于 web 服务启动目录中 static 文件夹下的文件,直接返回对应文件名的文件;
2. 对于启动目录下的 html 文件,也返回对应的页面。
3. 直接访问本机的 9999 端口,则返回主页。
最后,将 web 服务绑定到本机的 9999 端口。根据上面的实现,对于 web 页面 (html 文件),直接放在服务启动的目录下,对于 Spark 分析的结果,则保存在 static 目录下。
接下来实现主页文件 index.html。
<!DOCTYPE html>
<html lang="en">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width,height=device-height">
<title>TMDb 电影数据分析</title>
<style>
/* 这里省略 */
</style>
</head>
<div class="container">
<h1 style="font-size: 40px;"># TMDb Movie Data Analysis <br> <small style="font-size: 55%;color: rgba(0,0,0,0.65);">>
Big Data Processing Technology on Spark</small> </h1>
<h1 style="font-size: 30px;color: #404040;">I. Overviews</h1>
<div class="chart-group">
<h2>- Distribution of Genres in TMDb <br> <small style="font-size: 72%;">> This figure
compares the genre
distribution in TMDb, and you can see that most of the movies in TMDb is Drama.</small> </h2>
<iframe src="genres.html" class="frame" frameborder="0"></iframe>
<script>/*Fixing iframe window.innerHeight 0 issue in Safari*/document.body.clientHeight;</script>
</body>
</html>
每个图表通过一个 iframe 引入到主页中。对于每一个图表,主页中都包含标题和图表所在的页面的 iframe。对于 TMDb 中的体裁分布分析结果,在 genres.html 中实现,下面对该文件进行实现。
<!DOCTYPE html>
<html lang="en">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width,height=device-height">
<title>TOP 5000 电影数据分析</title>
<style>
::-webkit-scrollbar {
display: none;
html,
body {
font-family: 'Ubuntu Condensed';
height: 100%;
margin: 0;
color: rgba(0, 0, 0, 0.85);
</style>
</head>
<div id="mountNode"></div>
<script>/*Fixing iframe window.innerHeight 0 issue in Safari*/document.body.clientHeight;</script>
<script src="static/g2.min.js"></script>
<script src="static/data-set.min.js"></script>
<script src="static/jquery-3.2.1.min.js"></script>
<script>
function generateChart(id, type, xkey, xlabel, ykey, ylabel) {
var chart = new G2.Chart({ // 初始化 chart
container: id,
forceFit: true,
height: 500,
padding: [40, 80, 80, 80],
chart.scale(ykey, { // 对 y 尺度进行设置
alias: ylabel,
min: 0,
// max: 3000,
tickCount: 4
chart.axis(xkey, { // 对 x 坐标轴设置
label: {
textStyle: {
fill: '#aaaaaa'
tickLine: {
alignWithLabel: false,
length: 0
chart.axis(ykey, { // 对 y 坐标轴设置
label: {
textStyle: {
fill: '#aaaaaa'
title: {
offset: 50
chart.legend({ // 设置图例
position: 'top-center'
//设置标签和颜色等
chart.interval().position(`${xkey}*${ykey}`).label(ykey).color('#ffb877').opacity(1).adjust([{
type,
marginRatio: 1 / 32
chart.render();
return chart;
</script>
<script>
// 调用上述函数创建图表
let chart = generateChart('mountNode', 'dodge', 'genre', 'genres', 'count', '# movies');
window.onload = () => {
// 当页面加载后使用 jQuery 提供的方法进行json文件的读取。
$.getJSON("/static/genres.json", d => {
chart.changeData(d) // 使用 chart 的更新数据 API 进行数据更新。
</script>
</body>
</html>
代码的过程解释以注释给出,使用该页面前,还需要将对应的 js 库( g2.js, data-set.js, jquery )放入到 static 文件夹下。
代码完成后,在代码所在的根目录下,执行:
spark-submit web.py # 或者执行 python3.5 web.py
命令行出现:
Bottle v0.12.16 server starting up (using WSGIRefServer())...
Listening on http://0.0.0.0:9999/
Hit Ctrl-C to quit.
即完成启动,打开浏览器访问 http://127.0.0.1:9999 即可看到可视化结果,下面是图表的例子:
六、数据图表
(一)概览
TMDb 电影中的体裁分布
从图中可以看出,Drama 的电影在 TMDb 中占比较大,其次 Science Fiction、Action 和 Thriller 的数量也较多。
前 100 个常见关键词
TMDb 中最常见的关键词是 Woman Director,其次还有 independent film 等。
3. TMDb 中最常见的 10 种预算数
有 144 部电影的预算为 20,000,000,是最常见的预算值。
4. TMDb 中最常见电影时长 (只展示电影数大于 100 的时长)
多数电影的时长是90分钟或100分钟。
5. 生产电影最多的 10 大公司
生产电影较多的公司是 Warner Bros.、Universal Pictures等。
6. TMDb 中的 10 大电影语言
大多数电影中的语言是英语。
(二)关系
预算与评价的关系
预算高的电影不见得能取得更好的评价,例如预算高达 380,000,000 美元的 Pirates of the Caribbean: On Stranger Tides(加勒比海盗)评价只有6.4分。
发行时间与评价的关系
早期的电影评价都比较高,例如发行于1936年的 Modern Times(摩登时代)评价高达8.1分。
流行度和评价的关系
流行度较高的话一般能取得平均水平以上的评价,例如 Interstellar(星际穿越)流行度很高,评价为8.1分。
公司生产的电影平均分和数量的关系
从图中可以看出,一个公司生产的电影越多,其电影平均分越接近整体的平均水平。
电影预算和营收的关系
从图中可以看出,多数电影都能实现正收入,而预算为 237,000,000 美元的 Avatar(阿凡达)最终收入为2787,965,087美元
本案例由厦门大学计算机系2018级研究生王福泰同学制作,这里对他表示衷心的感谢!
(备注:所有代码文件可以直接从百度网盘下载,百度网盘地址:https://pan.baidu.com/s/1lt7PHF17-gHieOU0B0zJ3A 提取码:cui7 )