本案例由厦门大学计算机系2018级研究生王福泰同学制作,这里对他表示衷心的感谢!
相关教材:林子雨、郑海山、赖永炫编著《Spark编程基础(Python版)》( 访问教材官网
相关案例: 基于Python语言的Spark数据处理分析案例集锦(PySpark)

一、环境搭建

假设目前的环境是裸机。

(1)安装Linux操作系统:比如可以安装Ubuntu 16.04,可以参考 这篇博客 ,完成操作系统的安装。
(2)安装Hadoop:需要在Linux系统上安装Hadoop,可以参考 这篇博客 ,完成Hadoop的安装。
(3)安装Spark:需要在Linux系统上安装Spark,可以参考 这篇博客 ,完成Hadoop的安装。
(4)安装Bottle:
Bottle 是一个非常小巧但高效的微型 Python Web 框架,它被设计为仅仅只有一个文件的Python模块,并且除Python标准库外,它不依赖于任何第三方模块。关于Bottle的更多介绍,可以参考 这篇博客
实验使用 web 框架 bottle 进行数据可视化,首先,打开一个Linux终端,运行下面命令安装 pip3 命令:

sudo apt-get install python3-pip

然后,执行如下命令安装 bottle web 框架:

pip3 install bottle

至此,完成环境搭建过程。

二、数据预处理

本次项目使用的数据集来自知名数据网站 Kaggle 的 tmdb-movie-metadata 电影数据集,该数据集包含大约 5000 部电影的相关数据。本次实验使用数据集中有关电影的数据表 tmdb_5000_movies.csv 进行实验。数据包含以下字段:
字段名称 解释 例子
budget 预算 10000000
genres 体裁 "[{""id"": 18, ""name"": ""Drama""}]"
homepage 主页 ""
id id 268238
keywords 关键词 "[{""id"": 14636, ""name"": ""india""}]"
original_language 原始语言 en
original_title 原标题 The Second Best Exotic Marigold Hotel
overview 概览 As the Best Exotic Marigold Hotel ...
popularity 流行度 17.592299
production_companies 生产公司 "[{""name"": ""Fox Searchlight Pictures"", ""id"": 43}, ...]"
production_countries 生产国家 "[{""iso31661"": ""GB"", ""name"": ""United Kingdom""}, ...]"
release_date 发行日期 2015-02-26
revenue 盈收 85978266
runtime 片长 122
spoken_languages 语言 "[{""iso6391"": ""en"", ""name"": ""English""}]"
status 状态 Released
tagline 宣传语 ""
title 标题 The Second Best Exotic Marigold Hotel
vote_average 平均分 6.3
vote_count 投票人数 272

由于数据中某些字段包含 json 数据,因此直接使用 DataFrame 进行读取会出现分割错误,所以如果要创建 DataFrame,需要先直接读取文件生成 RDD,再将 RDD 转为 DataFrame。过程中,使用 python3 中的 csv 模块对数据进行解析和转换。

为了更方便的对 csv 文件转化的 RDD 进行处理,需要首先去除csv文件的标题行。完成后,将处理好的文件 tmdb_5000_movies.csv 存储到 HDFS 上方便进一步的处理,使用下面命令将文件上传至 HDFS:

hdfs dfs -put tmdb_5000_movies.csv

此时文件在 HDFS 上的路径为 /user/hadoop/tmdb_5000_movies.csv。之后在程序中,使用下面语句即可读取该文件:

sc.textFile('tmdb_5000_movies.csv')

tmdb_5000_movies.csv文件也可以直接从百度网盘下载,百度网盘地址:https://pan.baidu.com/s/1lt7PHF17-gHieOU0B0zJ3A 提取码:cui7

三、使用 Spark 将数据转为 DataFrame

由于读入的文件是 csv 文件,是结构化的数据,因此可以将数据创建为 DataFrame 方便进行分析。从本节开始,之后的代码均存放在 analyst.py 中,下面会逐行代码进行解释,解释完毕以后,再给出 analyst.py代码的全貌。(备注:所有代码文件可以直接从百度网盘下载,百度网盘地址:https://pan.baidu.com/s/1lt7PHF17-gHieOU0B0zJ3A 提取码:cui7 )
为了创建 DataFrame,首先需要将 HDFS 上的数据加载成 RDD,再将 RDD 转化为 DataFrame。下面代码段完成从文件到 RDD 再到 DataFrame 的转化:

from pyspark import SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StringType, StructField, StructType
import json # 用于后面的流程
import csv
# 1. 创建 SparkSession 和 SparkContext 对象
sc = SparkContext('local', 'spark_project')
sc.setLogLevel('WARN') # 减少不必要的 LOG 输出
spark = SparkSession.builder.getOrCreate()
# 2. 为 RDD 转为 DataFrame 创建 schema
schemaString = "budget,genres,homepage,id,keywords,original_language,original_title,overview,popularity,production_companies,production_countries,release_date,revenue,runtime,spoken_languages,status,tagline,title,vote_average,vote_count"
fields = [StructField(field, StringType(), True)
          for field in schemaString.split(")]
schema = StructType(fields)
# 3. 对于每一行用逗号分隔的数据,使用 csv 模块进行解析并转为 Row 对象,得到可以转为 DataFrame 的 RDD
moviesRdd = sc.textFile('tmdb_5000_movies.csv').map(
    lambda line: Row(*next(csv.reader([line]))))
# 4. 使用 createDataFrame 创建 DataFrame
mdf = spark.createDataFrame(moviesRdd, schema)

上述代码完成 4 件事:
首先,创建 SparkSession 和 SparkContext 对象。
然后,为 RDD 转为 DataFrame 制作表头 (schema)。schema 是一个 StructType 对象,该对象使用一个 StructField 数组创建。
每一个 StructField 都代表结构化数据中的一个字段,构造 StructField 需要 3 个参数
1. 字段名称
2. 字段类型
3. 字段是否可以为空
下面是这些对象的结构关系:

StructType([StructField(name, type, null), ..., StructField(name, type, null)])

接着,开始创建用于转为 DataFrame 的 RDD。这个过程首先读取 HDFS 上的数据文件,然后为了将 RDD 转为 DataFrame,还需要将数据的每一行转为一个 Row 对象。
这个过程首先使用 csv 模块进行解析,得到一个包含每个字段的迭代器:

csv.reader([line]) # 这里 [line] 表示包含一行数据的数组

然后使用 next 函数将迭代器中的数据读取到数组中:

next(csv.reader([line]))

最后使用 * 将数组转为 Row 对象的构造函数参数,创建 Row 对象:

Row(*next(csv.reader([line])))

至此,moviesRdd 中每一行为一个 Row 对象。
最后,通过 SparkSession 接口 createDataFrame ,使用准备好的表头 (schema) 和 RDD 创建 DataFrame:

mdf = spark.createDataFrame(moviesRdd, schema)

至此完成 DataFrame 的创建。

四、使用 Spark 进行数据分析

(备注:所有代码文件可以直接从百度网盘下载,百度网盘地址:https://pan.baidu.com/s/1lt7PHF17-gHieOU0B0zJ3A 提取码:cui7 )
下面使用通过 Spark 处理得到的 DataFrame mdf 进行数据分析,首先对数据中的主要字段单独进行分析(概览小节),然后再分析不同字段间的关系(关系小节)。
为了方便进行数据可视化,每个不同的分析,都将分析结果导出为 json 文件由 web 页面读取并进行可视化。导出直接使用下面的 save 函数:

def save(path, data):
  with open(path, 'w') as f:
    f.write(data)

该函数向 path 中写入 data。
下面分别介绍各个分析的生成过程。

这个部分对数据进行整体的分析。

(1)TMDb 电影中的体裁分布

从上面的数据字典描述可以看出,电影的体裁字段是一个 json 格式的数据,因此,为了统计不同体裁的电影的数量,需要首先解析 json 数据,从中取出每个电影对应的体裁数组,然后使用词频统计的方法统计不同体裁出现的频率,即可得到电影的体裁分布。
首先实现一个函数 countByJson(field) ,该函数实现解析 json 格式字段从中提取出 name 并进行词频统计的功能:

def countByJson(field):
    return mdf.select(field).filter(mdf[field] != '').rdd.flatMap(lambda g: [(v, 1) for v in map(lambda x: x['name'], json.loads(g[field]))]).repartition(1).reduceByKey(lambda x, y: x + y)

该函数返回一个 RDD,整个过程如下所示。

基于这个函数实现 countByGenres 用来生成不同体裁的电影数统计结果:

def countByGenres():
    res = countByJson("genres").collect()
    return list(map(lambda v: {"genre": v[0], "count": v[1]}, res))

这个函数调用 countByJson 得到频率统计结果,并将其转为 json 数据格式并返回,方便进行可视化。最终函数返回数据格式如下:

"genre": ..., "count": ... "genre": ..., "count": ... }, ...]

接着,使用下面代码进行数据导出至 genres.json 方便之后进行可视化

save('genres.json', json.dumps(countByGenres())) # 确保 json 包已导入

2. 前 100 个常见关键词

该项分析电影关键词中出现频率最高的前一百个。由于关键词字段也是 json 格式数据,因此调用 countByJson 进行频率统计,同时对于统计结果进行降序排序并取前 100 项即可:

def countByKeywords():
    res = countByJson("keywords").sortBy(lambda x: -x[1]).take(100)
    return list(map(lambda v: {"x": v[0], "value": v[1]}, res))

最终该函数返回 json 数据格式如下:

"x": ..., "value": ... "x": ..., "value": ... }, ...]

接着,使用下面代码将数据导出至 keywords.json 方便之后进行可视化

save('keywords.json', json.dumps(countByKeywords()))

3. TMDb 中最常见的 10 种预算数

这一项探究电影常见的预算数是多少,因此需要对电影预算进行频率统计,代码如下:

def countByBudget(order='count', ascending=False):
    return mdf.filter(mdf["budget"] != 0).groupBy("budget").count().orderBy(order, ascending=ascending).toJSON().map(lambda j: json.loads(j)).take(10)

首先,需要对预算字段进行过滤,去除预算为 0 的项目,然后根据预算聚合并计数,接着根据计数进行排序,并将结果导出为 json 字符串,为了统一输出,这里将 json 字符串转为 python 对象,最后取前 10 项作为最终的结果。
最终该函数返回 json 数据格式如下:

"budget": ..., "count": ... "budget": ..., "count": ... }, ...]

接着,使用下面代码进行数据导出至 budget.json 方便之后进行可视化

save('budget.json', json.dumps(countByBudget()))

4. TMDb 中最常见电影时长 (只展示电影数大于 100 的时长)

这一项统计 TMDb 中最常见的电影时长,首先,需要过滤时长为 0 的电影,然后根据时长字段聚合并计数,接着过滤掉出现频率小于 100 的时长 (这一步是为了方便可视化,避免过多冗余信息)得到最终的结果。

def distrbutionOfRuntime(order='count', ascending=False):
    return mdf.filter(mdf["runtime"] != 0).groupBy("runtime").count().filter('count>=100').toJSON().map(lambda j: json.loads(j)).collect()

最终该函数返回 json 数据格式如下:

"runtime": ..., "count": ... "runtime": ..., "count": ... }, ...]

接着,使用下面代码进行数据导出至 runtime.json 方便之后进行可视化

save('runtime.json', json.dumps(distrbutionOfRuntime()))

5. 生产电影最多的 10 大公司

这一项统计电影产出最多的 10 个公司,同样使用 countByJson 对 JSON 数据进行频率统计,然后进行降序排列取前 10 项即可。

def countByCompanies():
    res = countByJson("production_companies").sortBy(lambda x: -x[1]).take(10)
    return list(map(lambda v: {"company": v[0], "count": v[1]}, res))

最终该函数返回 JSON 数据格式如下:

"company": ..., "count": ... "company": ..., "count": ... }, ...]

接着,使用下面代码进行数据导出至 company_count.json 方便之后进行可视化
save('company_count.json', json.dumps(countByCompanies()))

6. TMDb 中的 10 大电影语言

该项统计 TMDb 中出现最多的语言,与前面类似,该字段也是 JSON 数据,因此首先对每个项目进行词频统计,然后过滤掉语言为空的项目,最后排序取前十即可。

def countByLanguage():
    res = countByJson("spoken_languages").filter(
        lambda v: v[0] != '').sortBy(lambda x: -x[1]).take(10)
    return list(map(lambda v: {"language": v[0], "count": v[1]}, res))

最终该函数返回 json 数据格式如下:

"language": ..., "count": ... "language": ..., "count": ... }, ...]

接着,使用下面代码进行数据导出至 language.json 方便之后进行可视化

save('language.json', json.dumps(countByLanguage()))

这个部分考虑数据之间的关系。

1. 预算与评价的关系

这部分考虑预算与评价之间的关系,因此对于每个电影,需要导出如下的数据:

[电影标题,预算,评价]

基于 DataFrame 对数据进行字段过滤即可:

def budgetVote():
    return mdf.select("title", "budget", "vote_average").filter(mdf["budget"] != 0).filter(mdf["vote_count"] > 100).collect()

这里还要注意过滤掉预算为空的数据,同时,结果只保留了投票数大于 100 的数据确保公平。
得到的数据存储在 budget_vote.json 中:

save('budget_vote.json', json.dumps(budgetVote()))

2. 发行时间与评价的关系

这部分考虑发行时间与评价之间的关系,因此对于每个电影,需要导出如下的数据:

[电影标题,发行时间,评价]

基于 DataFrame 对数据进行字段过滤即可:

def dateVote():
    return mdf.select(mdf["release_date"], "vote_average", "title").filter(mdf["release_date"] != "").filter(mdf["vote_count"] > 100).collect()

这里还是要注意过滤掉发行时间为空的数据,保留投票数大于 100 的数据。
得到的数据存储在 date_vote.json 中:

save('date_vote.json', json.dumps(dateVote()))

3. 流行度和评价的关系

这部分考虑流行度与评价之间的关系,因此对于每个电影,需要导出如下的数据:

[电影标题,流行度,评价]

基于 DataFrame 对数据进行字段过滤即可:

def popVote():
    return mdf.select("title", "popularity", "vote_average").filter(mdf["popularity"] != 0).filter(mdf["vote_count"] > 100).collect()

同时,过滤掉流行度为 0 的数据,保留投票数大于 100 的数据。
得到的数据存储在 pop_vote.json 中:

save('pop_vote.json', json.dumps(popVote()))

4. 公司生产的电影平均分和数量的关系

这部分计算每个公司生产的电影数量及这些电影的平均分分布。首先,需要对数据进行过滤,去掉生产公司字段为空和评价人数小于 100 的电影,然后对于每一条记录,得到一条如下形式的记录:

[公司名,(评分,1)]

接着将所有记录的评分和计数累加,最后用总评分除以计数得到一个公司的平均评分及电影数,整个过程如下所示。

def moviesVote():
    return mdf.filter(mdf["production_companies"] != '').filter(mdf["vote_count"] > 100).rdd.flatMap(lambda g: [(v, [float(g['vote_average']), 1]) for v in map(lambda x: x['name'], json.loads(g["production_companies"]))]).repartition(1).reduceByKey(lambda x, y: [x[0] + y[0], x[1] + y[1]]).map(lambda v: [v[0], v[1][0] / v[1][1], v[1][1]]).collect()

下面是过程的图示
得到的数据存储在 movies_vote.json 中:

save('movies_vote.json', json.dumps(moviesVote()))

5. 电影预算和营收的关系

这部分考虑电影的营收情况,因此对于每个电影,需要导出如下的数据:

[电影标题,预算,收入]

基于 DataFrame 对数据进行字段过滤即可:

def budgetRevenue():
    return mdf.select("title", "budget", "revenue").filter(mdf["budget"] != 0).filter(mdf['revenue'] != 0).collect()

过滤掉预算,收入为 0 的数据。
得到的数据存储在 budget_revenue.json 中:

save('budget_revenue.json', json.dumps(budgetRevenue()))

最后,将上面的过程整合起来方便进行调用,因此在 analyst.py 中添加 main 函数:

if __name__ == "__main__":
    m = {
        "countByGenres": {
            "method": countByGenres,
            "path": "genres.json"
        "countByKeywords": {
            "method": countByKeywords,
            "path": "keywords.json"
        "countByCompanies": {
            "method": countByCompanies,
            "path": "company_count.json"
        "countByBudget": {
            "method": countByBudget,
            "path": "budget.json"
        "countByLanguage": {
            "method": countByLanguage,
            "path": "language.json"
        "distrbutionOfRuntime": {
            "method": distrbutionOfRuntime,
            "path": "runtime.json"
        "budgetVote": {
            "method": budgetVote,
            "path": "budget_vote.json"
        "dateVote": {
            "method": dateVote,
            "path": "date_vote.json"
        "popVote": {
            "method": popVote,
            "path": "pop_vote.json"
        "moviesVote": {
            "method": moviesVote,
            "path": "movies_vote.json"
        "budgetRevenue": {
            "method": budgetRevenue,
            "path": "budget_revenue.json"
    base = "static/" # 生成文件的 base 目录
    if not os.path.exists(base): # 如果目录不存在则创建一个新的
        os.mkdir(base)
    for k in m: # 执行上述所有方法
        p = m[k]
        f = p["method"]
        save(base + m[k]["path"], json.dumps(f()))
        print ("done -> " + k + " , save to -> " + base + m[k]["path"])

上面代码将所有的函数整合在变量 m中,然后通过循环调用上述所有方法并导出json文件。
通过下面方法进行数据分析:

spark-submit analyst.py

五、数据可视化方法

(备注:所有代码文件可以直接从百度网盘下载,百度网盘地址:https://pan.baidu.com/s/1lt7PHF17-gHieOU0B0zJ3A 提取码:cui7 )
数据可视化基于阿里开源的数据可视化工具 G2 实现。G2 是一套基于可视化编码的图形语法,以数据驱动,具有高度的易用性和扩展性,用户无需关注各种繁琐的实现细节,一条语句即可构建出各种各样的可交互的统计图表。下面以 TMDb 中电影的体裁分布为例说明可视化过程。
首先使用 python Web 框架 bottle 访问可视化页面方便进行 json 数据的读取。使用下面代码web.py 可以实现一个简单的静态文件读取:

from bottle import route, run, static_file
import json
@route('/static/<filename>')
def server_static(filename):
  return static_file(filename, root="./static")
@route("/<name:re:.*\.html>")
def server_page(name):
  return static_file(name, root=".")
@route("/")
def index():
  return static_file("index.html", root=".")
run(host="0.0.0.0", port=9999)

bottle 对于接收到的请求进行路由
1. 对于 web 服务启动目录中 static 文件夹下的文件,直接返回对应文件名的文件;
2. 对于启动目录下的 html 文件,也返回对应的页面。
3. 直接访问本机的 9999 端口,则返回主页。
最后,将 web 服务绑定到本机的 9999 端口。根据上面的实现,对于 web 页面 (html 文件),直接放在服务启动的目录下,对于 Spark 分析的结果,则保存在 static 目录下。
接下来实现主页文件 index.html。

<!DOCTYPE html>
<html lang="en">
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width,height=device-height">
  <title>TMDb 电影数据分析</title>
  <style>
    /* 这里省略 */
  </style>
</head>
  <div class="container">
    <h1 style="font-size: 40px;"># TMDb Movie Data Analysis <br> <small style="font-size: 55%;color: rgba(0,0,0,0.65);">>
        Big Data Processing Technology on Spark</small> </h1>
    <h1 style="font-size: 30px;color: #404040;">I. Overviews</h1>
    <div class="chart-group">
      <h2>- Distribution of Genres in TMDb <br> <small style="font-size: 72%;">> This figure
          compares the genre
          distribution in TMDb, and you can see that most of the movies in TMDb is Drama.</small> </h2>
      <iframe src="genres.html" class="frame" frameborder="0"></iframe>
  <script>/*Fixing iframe window.innerHeight 0 issue in Safari*/document.body.clientHeight;</script>
</body>
</html>

每个图表通过一个 iframe 引入到主页中。对于每一个图表,主页中都包含标题和图表所在的页面的 iframe。对于 TMDb 中的体裁分布分析结果,在 genres.html 中实现,下面对该文件进行实现。

<!DOCTYPE html>
<html lang="en">
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width,height=device-height">
  <title>TOP 5000 电影数据分析</title>
  <style>
    ::-webkit-scrollbar {
      display: none;
    html,
    body {
      font-family: 'Ubuntu Condensed';
      height: 100%;
      margin: 0;
      color: rgba(0, 0, 0, 0.85);
  </style>
</head>
  <div id="mountNode"></div>
  <script>/*Fixing iframe window.innerHeight 0 issue in Safari*/document.body.clientHeight;</script>
  <script src="static/g2.min.js"></script>
  <script src="static/data-set.min.js"></script>
  <script src="static/jquery-3.2.1.min.js"></script>
  <script>
    function generateChart(id, type, xkey, xlabel, ykey, ylabel) {
      var chart = new G2.Chart({ // 初始化 chart
        container: id,
        forceFit: true,
        height: 500,
        padding: [40, 80, 80, 80],
      chart.scale(ykey, { // 对 y 尺度进行设置
        alias: ylabel,
        min: 0,
        // max: 3000,
        tickCount: 4
      chart.axis(xkey, { // 对 x 坐标轴设置
        label: {
          textStyle: {
            fill: '#aaaaaa'
        tickLine: {
          alignWithLabel: false,
          length: 0
      chart.axis(ykey, { // 对 y 坐标轴设置
        label: {
          textStyle: {
            fill: '#aaaaaa'
        title: {
          offset: 50
      chart.legend({    // 设置图例
        position: 'top-center'
        //设置标签和颜色等
      chart.interval().position(`${xkey}*${ykey}`).label(ykey).color('#ffb877').opacity(1).adjust([{
        type,
        marginRatio: 1 / 32
      chart.render();
      return chart;
  </script>
  <script>
    // 调用上述函数创建图表
    let chart = generateChart('mountNode', 'dodge', 'genre', 'genres', 'count', '# movies');
    window.onload = () => {
      // 当页面加载后使用 jQuery 提供的方法进行json文件的读取。
      $.getJSON("/static/genres.json", d => {
        chart.changeData(d) // 使用 chart 的更新数据 API 进行数据更新。
  </script>
</body>
</html>

代码的过程解释以注释给出,使用该页面前,还需要将对应的 js 库( g2.js, data-set.js, jquery )放入到 static 文件夹下。
代码完成后,在代码所在的根目录下,执行:

spark-submit web.py # 或者执行 python3.5 web.py

命令行出现:

Bottle v0.12.16 server starting up (using WSGIRefServer())...
Listening on http://0.0.0.0:9999/
Hit Ctrl-C to quit.

即完成启动,打开浏览器访问 http://127.0.0.1:9999 即可看到可视化结果,下面是图表的例子:

六、数据图表

(一)概览

  • TMDb 电影中的体裁分布
    从图中可以看出,Drama 的电影在 TMDb 中占比较大,其次 Science Fiction、Action 和 Thriller 的数量也较多。
  • 前 100 个常见关键词
  • TMDb 中最常见的关键词是 Woman Director,其次还有 independent film 等。
    3. TMDb 中最常见的 10 种预算数
    有 144 部电影的预算为 20,000,000,是最常见的预算值。
    4. TMDb 中最常见电影时长 (只展示电影数大于 100 的时长)

    多数电影的时长是90分钟或100分钟。
    5. 生产电影最多的 10 大公司

    生产电影较多的公司是 Warner Bros.、Universal Pictures等。
    6. TMDb 中的 10 大电影语言

    大多数电影中的语言是英语。

    (二)关系

  • 预算与评价的关系
  • 预算高的电影不见得能取得更好的评价,例如预算高达 380,000,000 美元的 Pirates of the Caribbean: On Stranger Tides(加勒比海盗)评价只有6.4分。

  • 发行时间与评价的关系
  • 早期的电影评价都比较高,例如发行于1936年的 Modern Times(摩登时代)评价高达8.1分。

  • 流行度和评价的关系
  • 流行度较高的话一般能取得平均水平以上的评价,例如 Interstellar(星际穿越)流行度很高,评价为8.1分。

  • 公司生产的电影平均分和数量的关系
    从图中可以看出,一个公司生产的电影越多,其电影平均分越接近整体的平均水平。
  • 电影预算和营收的关系
  • 从图中可以看出,多数电影都能实现正收入,而预算为 237,000,000 美元的 Avatar(阿凡达)最终收入为2787,965,087美元

    本案例由厦门大学计算机系2018级研究生王福泰同学制作,这里对他表示衷心的感谢!
    (备注:所有代码文件可以直接从百度网盘下载,百度网盘地址:https://pan.baidu.com/s/1lt7PHF17-gHieOU0B0zJ3A 提取码:cui7 )