1. Zookeeper概述和集群搭建:
(1) Zookeeper概述:
Zookeeper 是一个分布式协调服务的开源框架。主要用来解决分布式集群中应用系统的一致性问题,例如怎样避免同时操作同一数据造成脏读的问题。ZooKeeper 本质上是一个分布式的小文件存储系统。提供基于类似于文件系统的目录树方式的数据存储,并且可以对树中的节点进行有效管理。
(2) Zookeeper特性:
全局数据一致:每个 server 保存一份相同的数据副本,client 无论连接到哪个 server,展示的数据都是一致的,这是最重要的特征;
可靠性:如果消息被其中一台服务器接受,那么将被所有的服务器接受。
顺序性:包括全局有序和偏序两种:全局有序是指如果在一台服务器上消息 a 在消息 b 前发布,则在所有 Server 上消息 a 都将在消息 b 前被发布;偏序是指如果一个消息 b 在消息 a 后被同一个发送者发布,a 必将排在 b 前面。
数据更新原子性:一次数据更新要么成功(半数以上节点成功),要么失败,不存在中间状态;
实时性:Zookeeper保证客户端将在一个时间间隔范围内获得服务器的更新信息,或者服务器失效的信息。
(3) ZooKeeper集群角色:
Leader:Zookeeper 集群工作的核心,事务请求(写操作)的唯一调度和处理者,保证集群事务处理的顺序性;集群内部各个服务器的调度者。对于 create,setData,delete 等有写操作的请求,则需要统一转发给leader 处理,leader 需要决定编号、执行操作,这个过程称为一个事务。
Follower:处理客户端非事务(读操作)请求,转发事务请求给 Leader;参与集群 Leader 选举投票。
Observer(观察者,可选):观察者角色,观察 Zookeeper 集群的最新状态变化并将这些状态同步过来,其对于非事务请求可以进行独立处理,对于事务请求,则会转发给 Leader服务器进行处理。不会参与投票。
(4) ZooKeeper ZooKeeper集群搭建:
Zookeeper 运行需要java环境,所以需要提前安装jdk。对于安装leader+follower模式的集群,大致过程如下:
①配置主机名称到 IP 地址映射配置 ②修改 ZooKeeper 配置文件 ③远程复制分发安装文件
④设置 myid ⑤启动 ZooKeeper 集群
注意点:a.如果要想使用 Observer 模式,可在对应节点的配置文件添加如下配置:peerType=observer
b.必须在配置文件指定哪些节点被指定为 Observer,如:server.1:localhost:2181:3181:observer
详细安装请参考Zookeeper集群安装笔记
2. ZooKeeper shell
二、 大数据概述
1. 数据分析
(1) 数据分析定义:
数据分析离不开数据,计量和记录一起促成了数据的诞生。数据分析是指用适当的统计分析方法对收集来的数据进行分析,将它们加以汇总和理解并消化,以求最大化地开发数据的功能,发挥数据的作用。商业领域中,数据分析能够给帮助企业进行判断和决策,以便采取相应的策略与行动。数据分析可划分为:描述性数据分析、探索性数据分析、验证性数据分析。我们日常学习和工作中所涉及的数据分析主要描述性数据分析。
(2) 数据分析作用:
在商业领域中,数据分析的目的是把隐藏在数据背后的信息集中和提炼出来,总结出所研究对象的内在规律,帮助管理者进行有效的判断和决策。数据分析在企业日常经营分析中主要有三大作用:
现状分析:简单来说就是告诉你当前的状况。
原因分析:简单来说就是告诉你某一现状为什么发生。
预测分析:简单来说就是告诉你将来会发生什么。
(3) 数据分析基本步骤( 六个步骤 ):
(4) 数据分析行业前景:
① 蓬勃发展的趋势:中国数据分析行业前景和特点有 市场巨大、尚没出现平台级公司、企业技术外包的氛围在国内尚没完全形成、整个行业很大而且需求旺盛。
② 数据分析师的职业要求:
懂业务:熟悉行业知识、公司业务及流程;
懂管理:确定分析思路就需要用到营销、管理等理论知识来指导;针对数据分析结论提出有指导意义的分析建议;
懂分析:掌握数据分析的基本原理与一些有效的数据分析方法;
懂工具:掌握数据分析相关的常用工具,还要能根据研究的问题选择合适的工具;
懂设计:是指运用图表有效表达数据分析师的分析观点;
2. 科技发展带来的挑战
在科技的快速发展推动下,在 IT 领域,企业会面临两个方面的问题。一是如何实现网站的高可用、易伸缩、可扩展、高安全等目标。为了解决这样一系列问题,迫使网站的架构在不断发展。从单一架构迈向高可用架构,这过程中不得不提的就是分布式。二是用户规模越来越大,由此产生的数据也在以指数倍增长,俗称数据大爆炸。海量数据处理的场景也越来越多。技术上该如何面对?
(1) 分布式系统:
概述:分布式系统是一个硬件或软件组件分布在不同的网络计算机上,彼此之间通过消息传递进行通信和协调的系统。
特征:分布性、透明性、同一性、通信性;
常用分布式方案:分布式应用和服务、分布式静态资源、分布式数据和存储、分布式计算;
分布式和集群:分布式是指在多台不同的服务器中部署不同的服务模块,通过远程调用协同工作,对外提供服务。集群是指在多台不同的服务器中部署相同应用或服务模块,构成一个集群,通过负载均衡设备对外提供服务。
(2) 海量数据处理:
数据分析的前提是有数据,数据存储的目的是支撑数据分析;
当解决了海量数据的存储问题,接下来面临的就是海量数据的计算问题;
3. 大数据时代
(1) 大数据时代的概述( 4V ):
Volume(大量)、Velocity(高速)、Variety(多样)、Value(价值),即数据体量巨大、数据类型繁多、价值密度低、处理速度快。
(2) 大数据分析:
当数据分析遇到大数据时代,于是就产生了完美的契合:大数据分析。大数据分析可以分为如下几方面:
一是大数据分析可以让人们对数据产生更加优质的诠释;
二是大数据的分析与存储和数据的管理是一些数据分析层面的最佳实践;
还有需注意的是传统的数据分析就是在数据中寻找有价值的规律,这和现在的大数据在方向上是一致的。
4. 日志数据自定义采集( 数据收集的方法之一 )
(1) 原理分析:
1. Hadoop来源和特性介绍
(1) Hadoop概述:
Hadoop 是 Apache 旗下的一个用 java 语言实现开源软件框架,是一个开发和运行处理大规模数据的软件平台。允许使用简单的编程模型在大量计算机集群上对大型数据集进行分布式处理。
(2) Hadoop的定义:
狭义上说,Hadoop 指 Apache 这款开源框架,它的核心组件有:
HDFS(分布式文件系统):解决海量数据存储(底层文件存储系统)
YARN(作业调度和集群资源管理的框架):解决资源任务调度(中间组件,基本不需要了解)
MAPREDUCE(分布式运算编程框架):解决海量数据计算(java代码的主要对接点)
广义上来说,Hadoop 通常是指一个更广泛的概念——Hadoop 生态圈:
HDFS(分布式文件系统)和MAPREDUCE(分布式运算程序开发框架)等
(3) Hadoop发展简史:
Hadoop 是 Apache Lucene 创始人 Doug Cutting 创建的。最早起源于 Nutch,它是 Lucene 的子项目。
2003 年 Google 发表了一篇论文为该问题提供了可行的解决方案。
2004 年 Google 发表论文向全世界介绍了谷歌版的 MapReduce 系统。
2006 年 Google 发表了论文是关于 BigTable 的,这促使了后来的 Hbase 的发展。
(4) Hadoop特性优点:
扩容能力(Scalable):Hadoop 是在可用的计算机集群间分配数据并完成计算任务的,这些集群可用方便的扩展到数以千计的节点中。
成本低(Economical):Hadoop 通过普通廉价的机器组成服务器集群来分发以及处理数据,以至于成本很低。
高效率(Efficient):通过并发数据,Hadoop 可以在节点之间动态并行的移动数据,使得速度非常快。
可靠性(Rellable):能自动维护数据的多份复制,并且在任务失败后能自动地重新部署(redeploy)计算任务。所以 Hadoop 的按位存储和处理数据的能力值得人们信赖。
2. Hadoop集群介绍
(1) 发行版本:
Hadoop 发行版本分为开源 社区版 和 商业版,社区版是指由 Apache 软件基金会维护的版本,是官方维护的版本体系。商业版 Hadoop 是指由第三方商业公司在社区版 Hadoop 基础上进行了一些修改、整合以及各个服务组件兼容性测试而发行的版本,比较著名的有 cloudera 的 CDH、mapR 等。
(2) Hadoop2.0组件介绍:
Hadoop 2.0 则包含一个支持 NameNode 横向扩展的 HDFS,一个资源管理系统YARN 和一个运行在 YARN 上的离线计算框架 MapReduce。相比于 Hadoop1.0,Hadoop 2.0 功能更加强大,且具有更好的扩展性、性能,并支持多种计算框架。
Hadoop 3.0 相比之前的 Hadoop 2.0 有一系列的功能增强。但目前还是个alpha 版本,有很多 bug,且不能保证 API 的稳定和质量。
HADOOP 集群具体来说包含两个集群:HDFS 集群和 YARN 集群,两者逻辑上分离,但物理上常在一起。
HDFS 集群负责海量数据的存储,集群中的角色主要有:NameNode、DataNode、SecondaryNameNode
YARN 集群负责海量数据运算时的资源调度,集群中的角色主要有:ResourceManager、NodeManager
mapreduce是一个分布式运算编程框架,是应用程序开发包,由用户按照编程规范进行程序开发,后打包运行在 HDFS 集群上,并且受到 YARN 集群的资源调度管理。
以三节点部署Hadoop集群,角色分布如下:
node-01 NameNode DataNode ResourceManager
node-02 DataNode NodeManager SecondaryNameNode
node-03 DataNode NodeManager
3. Hadoop集群搭建
(1) 解压( 解压即安装,将hadoop-2.7.4-with-centos-6.7.tar.gz上传到Linux中,并解压即完成安装 )
(2) 配置hadoop-env.sh中的java_home( 配置JDK环境 )
(3) 在core-site.xml 中配置nameNode和存放数据的目录位置
(4) 在hdfs-site.xml中配置复制副本数和secondaryNode的位置
(5) 在mapred-site.xml中配置mr运行时的框架( 一般指定在yarn上 )
(6) yarn-site.xml配置resourceManager( YARN中的主节点 )
(7) slaves 配置dataNode所在的主机名称 (HDFS的从节点 )
(8) 配置hadoop环境变量( 将Hadoop添加到Linux的环境变量中 )
(9) 复制到其它集群节点( 将Hadoop集群上的其他机器进行相同配置 )
(10) 格式化namenode( 在配置好Hadoop之后需要在NameNode所在机器(node-1)上进行一次性的格式化,之后再重启Hadoop不需要执行 )
(11) 启动hadoop集群
4. Hadoop安装包目录结构
(1) bin:Hadoop 最基本的管理脚本和使用脚本的目录,这些脚本是 sbin 目录下管理脚本的基础实现,用户可以直接使用这些脚本管理和使用 Hadoop。
(2) etc:Hadoop 配置文件所在的目录,包括 core-site,xml、hdfs-site.xml、
(3) mapred-site.xml 等从 Hadoop1.0 继承而来的配置文件和 yarn-site.xml 等
(4) Hadoop2.0 新增的配置文件。
(5) include:对外提供的编程库头文件(具体动态库和静态库在 lib 目录中),这些头文件均是用 C++定义的,通常用于 C++程序访问 HDFS 或者编写 MapReduce程序。
(6) lib:该目录包含了 Hadoop 对外提供的编程动态库和静态库,与 include 目录中的头文件结合使用。
(7) libexec:各个服务对用的 shell 配置文件所在的目录,可用于配置日志输出、启动参数(比如 JVM 参数)等基本信息。
(8) sbin:Hadoop 管理脚本所在的目录,主要包含 HDFS 和 YARN 中各类服务的启动/关闭脚本。
(9) share:Hadoop 各个模块编译后的 jar 包所在的目录。
四、 HDFS (Hadoop分布式文件系统)
1. HDFS基本概念
(1) HDFS介绍:
HDFS 是 Hadoop Distribute File System 的简称,意为:Hadoop 分布式文件系统。是 Hadoop 核心组件之一,作为最底层的分布式存储服务而存在。分布式文件系统解决的问题就是大数据存储。
(2) HDFS设计目标:
可以进行故障的检测和自动快速恢复、以流式读取数据有很高的数据访问的高吞吐量、
支持大文件和多文件、write-one-read-many 访问模型(一次写入多次读取)。
2. HDFS的重要特性
(1) 主从节点(NameNode和DataNode)
(2) 分块存储(物理结构上会将上传的文件默认按照每块128M进行切分并保存这些切分后的文件块)
(3) 名字空间:抽象目录树(/hello/aaa.txt)
(4) NameNode元数据管理(将各个文件保存在不同的dataNode和块数等信息存储)
(5) DataNode数据存储
(6) 副本机制(在安装的时候可以指定文件块在hdfs上有多少个副本)
(7) 一次写入,多次读出(读得多,用于计算处理数据;一般保存的都是日志类型;如果要修改,先删除,之后再传递最新的文件)
3. Hadoop Shell
(1) hadoop fs -ls /user/hadoop/file1 显示文件、目录信息
(2) hadoop fs -mkdir –p /user/hadoop/dir1 在 hdfs 上创建目录,-p 表示会创建路径中的各级父目录
(3) hadoop fs -put –f [-p] localfile1 localfile2 /user/hadoop/hadoopdir
(上传)将单个 src 或多个 srcs 从本地文件系统复制到目标文件系统。
-p:保留访问和修改时间,所有权和权限。 -f:覆盖目的地(如果已经存在)
(4) hadoop fs -get hdfs://host:port/user/hadoop/file localfile (下载)将文件复制到本地文件系统
(5) hadoop fs -appendToFile localfile /hadoop/hadoopfile 追加一个文件到已经存在的文件末尾
(6) hadoop fs -cat /hadoop/hadoopfile 显示文件内容到 stdout
(7) hadoop fs -chmod 666 /hadoop/hadoopfile 改变文件的权限。使用-R将使改变在目录结构下递归进行
(8) hadoop fs -copyFromLocal /root/1.txt / 从本地文件系统中拷贝文件到 hdfs 路径去
(9) hadoop fs -copyToLocal /aaa/jdk.tar.gz 从 hdfs 拷贝到本地
(10) hadoop fs -cp /aaa/jdk.tar.gz /bbb/jdk.tar.gz.2 从 hdfs 的一个路径拷贝 hdfs 的另一个路径
(11) hadoop fs -mv /aaa/jdk.tar.gz / 在 hdfs 目录中移动文件
(12) hadoop fs -rm -r /aaa/bbb/ 删除文件,只删除非空目录和文件;-r递归删除。
4. NameNode和DataNode概述
5. HDFS的工作机制
(1) HDFS的工作机制概述:
HDFS 的内部工作机制对客户端保持透明,客户端请求访问 HDFS 都是通过向NameNode 申请来进行。
NameNode 负责管理整个文件系统元数据;DataNode 负责管理具体文件数据块存储;Secondary NameNode 协助 NameNode 进行元数据的备份。
(2) HDFS写数据流程:
① client 发起文件上传请求,通过 RPC 与 NameNode 建立通讯,NameNode检查目标文件是否已存在,父目录是否存在,返回是否可以上传;
② client 请求第一个 block 该传输到哪些 DataNode 服务器上;
③ NameNode 根据配置文件中指定的备份数量及机架感知原理进行文件分配,返回可用的 DataNode 的地址如:A,B,C;注:Hadoop 在设计时考虑到数据的安全与高效,数据文件默认在 HDFS 上存放三份,存储策略为本地一份,同机架内其它某一节点上一份,不同机架的某一节点上一份。
④ client 请求 3 台 DataNode 中的一台 A 上传数据(本质上是一个 RPC 调用,建立 pipeline),A 收到请求会继续调用 B,然后 B 调用 C,将整个pipeline 建立完成,后逐级返回 client;
⑤ client 开始往 A 上传第一个 block(先从磁盘读取数据放到一个本地内存缓存),以 packet 为单位(默认 64K),A 收到一个 packet 就会传给 B,B 传给 C;A 每传一个 packet 会放入一个应答队列等待应答。
⑥ 数据被分割成一个个 packet 数据包在 pipeline 上依次传输,在pipeline 反方向上,逐个发送 ack(命令正确应答),最终由 pipeline中第一个 DataNode 节点 A 将 pipeline ack 发送给 client;
⑦ 当一个 block 传输完成之后,client 再次请求 NameNode 上传第二个block 到服务器。
(3) HDFS读数据流程:
① Client 向 NameNode 发起 RPC 请求,来确定请求文件 block 所在的位置;
② NameNode会视情况返回文件的部分或者全部block列表,对于每个block,NameNode 都会返回含有该 block 副本的 DataNode 地址;
③ 这些返回的 DN 地址,会按照集群拓扑结构得出 DataNode 与客户端的距离,然后进行排序,排序两个规则:网络拓扑结构中距离 Client 近的排靠前;心跳机制中超时汇报的 DN 状态为 STALE,这样的排靠后;
④ Client 选取排序靠前的 DataNode 来读取 block,如果客户端本身就是DataNode,那么将从本地直接获取数据;
⑤ 底层上本质是建立 Socket Stream(FSDataInputStream),重复的调用父类 DataInputStream 的 read 方法,直到这个块上的数据读取完毕;
⑥ 当读完列表的 block 后,若文件读取还没有结束,客户端会继续向NameNode 获取下一批的 block 列表;
⑦ 读取完一个 block 都会进行 checksum 验证,如果读取 DataNode 时出现错误,客户端会通知 NameNode,然后再从下一个拥有该 block 副本的DataNode 继续读。
⑧ read 方法是并行的读取 block 信息,不是一块一块的读取;NameNode 只是返回Client请求包含块的DataNode地址,并不是返回请求块的数据;
⑨ 最终读取来所有的 block 会合并成一个完整的最终文件。
6. HDFS应用开发
(1) HDFS的JAVA API操作和所需的JAVA JAR包:
HDFS 在生产应用中主要是客户端的开发,其核心步骤是从 HDFS 提供的api中构造一个 HDFS 的访问客户端对象,然后通过该客户端对象操作(增删改查)HDFS 上的文件。
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.4</version>
</dependency>
(3) 构造客户端对象( 使用Java代码操作HDFS ):
① Java代码中操作的具体对象:
Configuration:该类的对象封转了客户端或者服务器的配置;
FileSystem:文件系统对象,可以用该对象的一些方法来对文件进行操作,通过FileSystem的静态方法get获得该对象。
② 示例代码:
五、 MapReduce(分布式运算编程框架)
1. MapReduce的概述和设计构思:
(1) MapReduce 的思想核心是“
分而治之
”;Map 负责“分”,在各个Map之间几乎没有依赖关系;Reduce负责
“合”,即对 map 阶段的结果进行全局汇总。
(2) Hadoop MapReduce设计构思:
MapReduce 是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop 集群上。在MapReduce中,程序员仅需要关心其应用层的具体计算问题,仅需编写少量的处理应用本身计算问题的程序代码。
(3) MapReduce框架结构:
一个完整的 mapreduce 程序在分布式运行时有三类实例进程:
a、MRAppMaster:负责整个程序的过程调度及状态协调
b、MapTask:负责 map 阶段的整个数据处理流程
c、ReduceTask:负责 reduce 阶段的整个数据处理流程
2. MapReduce的编程规范和示例编写
(1) 编程规范:
① 用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr 程序的客户端)
② Mapper 的输入数据是 KV 对的形式(KV 的类型可自定义)
③ Mapper 的输出数据是 KV 对的形式(KV 的类型可自定义)
④ Mapper 中的业务逻辑写在 map()方法中
⑤ map()方法(maptask 进程)对每一个<K,V>调用一次
⑥ Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 KV
⑦ Reducer 的业务逻辑写在 reduce()方法中
⑧ Reducetask 进程对每一组相同 k 的<k,v>组调用一次 reduce()方法
⑨ 用户自定义的 Mapper 和 Reducer 都要继承各自的父类
⑩ 整个程序需要一个 Drvier 来进行提交,提交的是一个描述了各种必要信息的 job 对象
(2) 示例编写:
以示例:在一堆给定的文本文件中统计输出每一个单词出现的总次数 编写示例工程如下
3. MapReduce的输入和输出
MapReduce 框架运转在<key,value> 键值对上,也就是说,框架把作业的输入看成是一组<key,value>键值对,同样也产生一组<key,value>键值对作为作业的输出,这两组键值对可能是不同的。具体输入和输入如下图所示:
4. MapReduce的处理流程解析
(1) Mapper阶段任务执行过程详解:
第一阶段:
把输入目录下文件按照一定的标准逐个进行逻辑切片,形成切片规划。(每一个切片=一个MapTask)
第二阶段:
对切片中的数据按照一定的规则解析成<key,value>对。默认是把每一行文本内容解析成键值对。
第三阶段:
调用 Mapper 类中的 map 方法。上阶段中每解析出来的一个<k,v>,调用一次 map 方法。
第四阶段:
按照一定的规则对第三阶段输出的键值对进行分区。分区的数量就是Reducer任务运行的数量。
第五阶段:
对每个分区中的键值对进行排序。
第六阶段:
对数据进行局部聚合处理,也就是 combiner 处理。键相等的键值对会调用一次 reduce 方法。
(2) Reduce阶段任务执行过程详解:
第一阶段:
Reducer任务会主动从 Mapper 任务复制其输出的键值对。一个Reduce可能对于多个Mapper。
第二阶段:
把复制到Reducer本地数据,全部进行合并。即把分散的数据合并成一个大的数据;再进行排序。
第三阶段:
对排序后的键值对调用 reduce 方法。键相等的键值对调用一次reduce方法,每次调用会产生零
个或者多个键值对。最后把这些输出的键值对写入到 HDFS 文件中。
(3) 在整个MapReduce程序的开发过程中,我们最大的工作量是覆盖map函数和覆盖reduce函数:
5. MapReduce中的小知识点
(1) MapReduce的数据分区:在执行主类中设置NumReduceTask的个数N;然后对应有N的输出内容。
(2) MapReduce的逻辑分片:根据文件的大小进行文件块划分为M个块,将由M个mapTask进行处理。
(3) MapReduce中的序列化:
当需要在进程和网络之间传递对象或持久化对象时,就需要将对象进行序列化。但Java的Serializable序列化框架是一个重量级的序列化框架,当对一个对象进行序列化时,会有很多额外的信息。
所以,
hadoop
自己开发了一套序列化机制(
Writable
),
Writable
是
Hadoop
的序列化格式,
hadoop
定义了这样一个
Writable
接口。一个类要支持可序列化只需实现这个接口即可。
MapReduce中的基本序列化类型:IntWritable(int)、LongWritable(long)、Text(String)、NullWritable(null)等。
6. MapReduce的排序
(1) 排序的实现思路:
MR程序在处理数据的过程中会对数据排序(map 输出的 kv 对传输到 reduce之前,会排序),排序的依据是 map 输出的 key。所以,我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到 key 中,让 key 实现接口:WritableComparable,然后重写 key 的 compareTo 方法
(2) MapReduce实现排序的具体接口和方法:
要对MapReduce进行排序,需实现WritableComparable<Bean>接口( 即进行序列化又进行排序 )。
还需重载该接口中的compareTo() 方法,该方法的比较规格如下:
如果指定的数与参数相等返回 0。如果指定的数小于参数返回 -1。如果指定的数大于参数返回 1。
返回正数的话,当前对象(调用compareTo方法的对象)要排在比较对象后面;
返回负数的话,放在前面;如果返回0会有一个对象不会显示(在开发中应该避免出现0)。
(3) 排序具体实现代码如下:
7. MapReduce的分区( Partitioner&NumReduceTasks )
(1) 为什么要进行分区:
Mapreduce中会将map输出的 kv 对,按照相同 key 分组,然后分发给不同的 reducetask;
默认的分发规则为:根据 key 的 hashcode%reducetask 数来分发;
所以:如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件 Partitioner;
自定义一个 CustomPartitioner 继承抽象类:Partitioner,然后在job 对象中,设置自定义的分组规则。
(2) 分区的具体实现:
需自定义分区类(ProvincePartitioner )继承Partitioner<Map中的key,value>类,并重写getPartition()方法,在该方法中自定义分区规格;
需在程序入口的main()方法中设置reduceTask个数,和设置使用我们自定义的分区组件,如下:
//这里设置运行 reduceTask 的个数,即程序执行后生成几个part-r-00000文件
//getPartition 返回的分区个数 = NumReduceTasks 正常执行
//getPartition 返回的分区个数 > NumReduceTasks 报错:Illegal partition,多出的数据没地方存放
//getPartition 返回的分区个数 < NumReduceTasks 可以执行 ,多出空白文件
job.setNumReduceTasks(10);
//这里指定使用我们自定义的分区组件,即程序执行后,最终数据分成几部分
job.setPartitionerClass(ProvincePartitioner.class);
8. MapReduce的合并(combiner)
(1) Combiner的作用:
每一个 map 都可能会产生大量的本地输出,Combiner 的作用就是对 map 端的输出先做一次合并,以减少在 map 和 reduce 节点之间的数据传输量,以提高网络 IO 性能,是 MapReduce 的一种优化手段之一。
(2) Combiner在MapReduce中的概述:
combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件,combiner 组件的父类就是 Reducer;
combiner 和 reducer 的区别在于运行的位置:
Combiner 是在每一个 maptask 所在的节点运行;
Reducer 是接收全局所有 Mapper 的输出结果;
combiner 的意义就是对每一个 maptask 的输出进行局部汇总,以减小网络传输量;
combiner 能够应用的前提是不能影响最终的业务逻辑,而且,combiner 的输出 kv 应该跟 reducer 的输入 kv 类型要对应起来。
(3) Combiner的使用步骤:
自定义一个 combiner 继承 Reducer,重写 reduce 方法
在 job 中设置:job.setCombinerClass(CustomCombiner.class)
六、 Apache Flume
1. Apache Flume的概述:
(1) Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的软件。核心是把数据从数据源(source)收集过来,再将收集到的数据送到指定的目的地(sink)。
(2) Flume的运行机制:
Flume 系统中核心的角色是agent,agent本身是一个Java 进程,一般运行在日志收集节点。有如下三个组件:
Source:采集源,用于跟数据源对接,以获取数据;
Sink:下沉地,采集数据的传送目的,用于往下一级 agent 传递数据或者往最终存储系统传递数据;
Channel:agent内部的数据传输通道,用于从 source将数据传递到 sink;
2. Apache Flume的安装部署:
3. 采集目录到HDFS & 采集文件到 HDFS
(1) 需求1:服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,就需要把文件采集到HDFS中去。
(2) 需求2:业务系统使用log4j生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到hdfs。
七、 数据 & 数据仓库
1. 数据的来源
(1) 业务系统数据:
业务数据为公司内部的数据,获取的的成本低,方式容易。可以直接通过接口调用,从公司的业务系统中获取数据,但是要注意不能影响业务系统数据库的性能。也可以进行数据库的dump(从数据库中导出全部数据),比如 MySQL 数据库,使用 mysqldump 工具就可以进行数据库的导出( mysqldump -uroot -pPassword [database name] [dump file] )。
(2) 爬虫数据:
爬虫(Web crawler),是指一种按照一定的规则,自动地抓取万维网信息的程序或者脚本。它们被广泛用于互联网搜索引擎或其他类似网站,可以自动采集所有其能够访问到的页面内容,以获取或更新这些网站的内容和检索方式。电子商务行业最初的爬虫需求来源于比价。
2. 数据的管理:
(1) 数据的文件管理:
随着数据种类的越来越多,数据量的越来越大,我们为了方便保存和迅速提取数据,对数据进行了分类管理:从每一个文件夹的建立,我们都要按照数据文件的属性,分为大大小小、多个层级的文件夹,建立合理的文件保存架构。此外所有的文件、文件夹,都要规范化地命名,并放入最合适的文件夹中。
在企业中,一般是直接使用文件管理服务器来管理文件,文件服务器有如下好处:定时集中对文件进行备份;可以统一制定文件安全访问权限策略;可以统一进行文件服务器防病毒管理。
常见的文件服务器有如下几种:
n ftp 文件服务:FTP 是一个文件传输的协议,采用 Client/Server 架构。用户可以通过各种不同的 FTP 客户端程序,借助 FTP 协议,来连接 FTP 服务器,以上传或者下载文件。
n Samba 文件服务:NFS 是 Network File System 的缩写,即网络文件系统。它允许网络中的计算机之间通过 TCP/IP 网络共享资源。NFS 在文件传送或信息传送过程中依赖于 RPC 协议。RPC,远程过程调用(Remote Procedure Call) 是能使客户端执行其他系统中程序的一种机制。
n NFS 文件服务:SMB(Server Messages Block,信息服务块)是一种在局域网上共享文件和打印机的一种通信协议,Samba 是一组软件包,在 Linux 和 UNIX 系统上实现 SMB 协议的一个免费软件。
(2) 文件管理规范:
为了更快速,更准确,更规范的进行数据文件管理,企业一般都会去制定相应的管理规范。规范着重于文件命名规则,以及一些校验性文件的描述。例如FTP 服务进行跨部门文件共享的相关规范等。
(3) 数据质量检测:
数据质量是保证数据应用的基础,它的评估标准主要包括四个方面:完整性、一致性、准确性、及时性。
完整性指的是数据信息是否存在缺失的状况,一般可以通过数据统计中的记录值和唯一值进行评估。
一致性是指数据是否遵循了统一的规范,数据集合是否保持了统一的格式。主要体现在数据记录的规范和数据是否符合逻辑。
准确性是指数据记录的信息是否存在异常或错误。最为常见的数据准确性错误如乱码。
及时性是指数据从产生到可以查看的时间间隔,也叫数据的延时时长。
3. 数据仓库
(1) 数据仓库的基本概念:
数据仓库,英文名称为 Data Warehouse,可简写为 DW 或 DWH。数据仓库的目的是构建面向分析的集成化数据环境,为企业提供决策支持(DecisionSupport)。它出于分析性报告和决策支持目的而创建。
数据仓库本身并不“生产”任何数据,同时自身也不需要“消费”任何的数据,数据来源于外部,并且开放给外部应用,这也是为什么叫“仓库”,而不叫“工厂”的原因。
八、 Apache Hive(数据仓库)
1. Hive概述
(1) Hive简介:
Hive 是基于 Hadoop 的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供类 SQL 查询功能。本质是将 SQL 转换为 MapReduce 程序。主要用途:用来做离线数据分析,比直接用 MapReduce 开发效率更高。
(2) 为什么使用Hive:
① 直接使用 Hadoop MapReduce 处理数据所面临的问题:
人员学习成本太高;MapReduce 实现复杂查询逻辑开发难度太大。
② 使用 Hive 的优点:
操作接口采用类SQL语法,提供快速开发的能力;避免了去写MapReduce,减少开发人员的学习成本;
功能扩展很方便。
(3) Hive架构:
① Hive与Hadoop的关系:Hive利用HDFS存储数据,利用MapReduce查询分析数据。
② Hive组件:
解释器、编译器、优化器、执行器:完成 HQL 查询语句从词法分析、语法分析、编译、优化以及查询计划的生成。生成的查询计划存储在 HDFS 中,并在随后有 MapReduce 调用执行。
元数据存储:通常是存储在关系数据库如 mysql/derby 中。
用户接口:包括 CLI、JDBC/ODBC、WebGUI等接口。
(4) Hive与传统数据库的对比:
hive 用于海量数据的离线数据分析。hive 具有 sql 数据库的外表,但应用场景完全不同,hive 只适合用来做批量数据统计分析。
(5) Hive数据模型:
Hive 中所有的数据都存储在HDFS中,没有专门的数据存储格式在创建表时指定分隔符,Hive 就可以映射成功,解析数据。
Hive 中包含以下数据模型:
db :在 hdfs 中表现为 hive.metastore.warehouse.dir 目录下一个文件夹
table :在 hdfs 中表现所属 db 目录下一个文件夹
external table :数据存放位置可以在 HDFS 任意指定路径
partition :在 hdfs 中表现为 table 目录下的子目录
bucket :在 hdfs 中表现为同一个表目录下根据 hash 散列之后的多个文件
(6) Hive安装部署:
Hive 安装前需要安装好JDK和Hadoop,并配置好环境变量。如果使用MySQL版本还需要安装好MySQL。
(7) Hive连接说明:
hive启动为一个服务器,来对外提供服务: bin/hiveserver2
在node-3使用beeline去连接node-1中的Hive: bin/beeline ! connect jdbc:hive2://node-1:10000
或(bin/beeline -u jdbc:hive2://node-1:10000 -n root )
2. Hive基本操作
(1)
DDL
体验之数据分隔符:
① 分隔符语法:
[FIELDS TERMINATED BY char]
[COLLECTION ITEMS TERMINATED BY char]
[MAP KEYS TERMINATED BY char]
[LINES TERMINATED BY char] | SERDE serde_name
[WITH SERDEPROPERTIES
(property_name=property_value, property_name=property_value,...)]
② 普通数据指定分隔符:
create table day_table (id int, content string) partitioned by (dt string)
row format delimited //使用内置的分隔符;
fields terminated by ',' //字段之间通过逗号进行分割;
此数据表可以识别路径/user/hive/warehouse/itcast.db/t_t2下文件中的 , 分隔符
③ 复杂数据结构映射(表中集合数据):
create table complex_array(name string,work_locations array<string>) row format delimited
fields terminated by '\t' //字段之间使用tab分隔符
collection items terminated by ',' //在集合之间的元素使用逗号分隔符
例:zhangsan beijing,shanghai,tianjin,hangzhou
④ 复杂数据结构映射(2):
create table t_map(id int,name string,hobby map<string,string>)
row format delimited //使用内置的分隔符
fields terminated by ',' //字段之间使用,分隔符
collection items terminated by '-' //在集合之间的元素使用-分隔符
map keys terminated by ':' ; //在Map(Key-Value对)中使用:分隔符
例:1,zhangsan,唱歌:非常喜欢-跳舞:喜欢-游泳:一般般
⑤ 默认分隔符:
create table t_t5(id int, name string); //直接创建表就是使用默认分隔符
在Linux的vim命令中中要使用默认的分隔符,ctrl+V 再按ctrl+A
在其他文本中使用默认分隔符使用”\001”, 例:1\001tom
(2)
DDL
体验之分区表:
① 单分区表:
create table t_user (id int, name string) //创建表名为t_user的数据库表
partitioned by (country string) //对表进行分区(虚拟字段实际文件中不存在该字段)
row format delimited fields terminated by ','; //使用分隔符
//将Linux的本地文件加载到数据表的分区中(指定具体哪个分区),会在对应的表文件夹下创建对应的分区文件夹,如果命令中有local代表加载的是Linux本地文件,如果没有代表加载的是HDFS中的文件
LOAD DATA local INPATH '/root/hivedata/5.txt' INTO TABLE t_user partition(country='USA');
select * from t_user where country=’CHN’; //可以根据该字段查询(该字段在表中显示,在文件中不存在)
② 双分区表:
create table day_hour_table (id int, name string) //创建表名为day_hour_table的数据库表
partitioned by (dt string, hour string) //对表进行多分区
row format delimited fields terminated by ','; //使用分隔符
//加载文件到表的具体分区中(多分区中每一个分区字段代表一级文件夹)
LOAD DATA local INPATH '/root/hivedata/5.txt' INTO TABLE day_hour_table partition(dt='20180101',hour='08');
//在多分区表中可以根据任意分区字段进行查询
select * from day_hour_table where dt='20180101'; 和 select * from day_hour_table where hour = "08";
(3)
DDL
体验之分桶表&
外部表:
① 分桶表:
设置分桶参数:
set hive.enforce.bucketing = true; //设置可以进行分桶
set mapreduce.job.reduces=4; //设置分桶的最大个数(相当于Reduce中的job.setNumReduceTasks(10);)
//Hive 采用对列值哈希,然后除以桶的个数求余的方式决定该条记录存放在哪个桶当中
create table stu_buck(Sno int,Sname string,Sex string,Sage int,Sdept string)
clustered by(Sno) into 4 buckets //进行分桶设置(将数据分到4个桶中)
row format delimited fields terminated by ',';
分桶表导入数据
insert overwrite table stu_buck select * from student cluster by(Sno);
② 外部表:
create external table student_ext(Sno int,Sname string,Sex string,Sage int,Sdept string)
row format delimited fields terminated by ',' location '/stu';
创建外部表需添加external关键字,和location表示表的地址
在删除表的时候,内部表的元数据和数据会被一起删除,而外部表只删除元数据,不删除数据。
(4)
DDL
的显示命令:
① 显示表分区信息,不是分区表执行报错 show partitions table_name;
② 显示当前版本 hive 支持的所有方法 show functions;
③ 查看表信息 desc extended table_name;
④ 查看表信息(格式化美观) desc formatted table_name;
⑤ 查看数据库相关信息 describe database database_name;
(5)
DML
体验之基本操作:
① Load加载命令:
在Hive中执行的命令,将HDFS或Linux中的文件加载到对应的表目录下
LOAD DATA local INPATH '/root/hivedata/aaa.txt' [OVERWRITE] INTO TABLE source_table;
local:如果加了local,load命令会在Linux本地加载路径,并将该文件复制到对应的表目录下,如果没有
加local,load命令会在HDFS中加载路径,并将该文件移动到对应的表目录下;
filepath:可以是相对路径,也可以是绝对路径,如果是HDFS可以写完整URI,建议写成绝对路径即可;
OVERWRITE:如果使用了 OVERWRITE 关键字,则目标表(或者分区)中的内容会被全部删除,如果没有
使用该关键字,那有文件名冲突的2个文件会全部保存,但新文件会重命名;不建议使用
② 多重插入:
Hive 中 insert 主要是结合 select 查询语句使用,将查询结果插入到表中;需要保证查询结果列的数目和需要插入数据表格的列数目一致.如果查询出来的数据类型和插入表格对应的列数据类型不一致,将会进行转换,但是不能保证转换一定成功,转换失败的数据将会为 NULL。
多重插入即为同时对多张表进行插入:
from source_table
insert overwrite table test_insert1 select id //将source_table的id字段插入test_insert1表中
insert overwrite table test_insert2 select name; //将source_table的name字段插入test_insert2中
③ 动态分区插入:
开启动态分区,并更改动态分区的模式:
需求:将dynamic_partition_table中的数据按照时间(day),插入到目标表d_p_t的相应分区中。
原始表:create table dynamic_partition_table(day string,ip string)row format delimited fields terminated by ",";
原始表数据类型:2015-05-10,ip1
目标表:create table d_p_t(ip string) partitioned by (month string,day string); //指定month和day为分区
set mapred.reduce.tasks=3;
#进行查询的语法
select * from student distribute by(Sno) sort by(Sage desc);
order by 会对输入做全局排序,因此只有一个 reducer,会导致当输入规模较大时,需要较长的计算时间。
sort by 不是全局排序,其在数据进入 reducer 前完成排序。因此,如果用 sort by 进行排序,并且设置 mapred.reduce.tasks>1,则 sort by 只保证每个 reducer 的输出有序,不保证全局有序。
distribute by(字段)根据指定字段将数据分到不同的 reducer,分发算法是 hash 散列。
Cluster by(字段) 除了具有 Distribute by 的功能外,还会对该字段进行排序。
cluster(分且排序,必须一样)==distribute(分) + sort(排序)(可以不一样)
(7)
导出数据到文件系统(
将查询结果保存到指定的文件目录 )
:
导出数据到本地:insert overwrite local directory '/root/aaa666' select * from dynamic_partition_table;
导出数据到HDFS:insert overwrite directory '/aa666' select * from dynamic_partition_table;
(8)
Havi join
:
① 开启本地模式,在开发情况下使用(上线时千万不能使用),能极大的提高速度:
set hive.exec.mode.local.auto=true;
② 内连接查询(inner join ):
select * from a inner join b on a.id=b.id;
③ 左外连接:
select * from a left join b on a.id=b.id;
④ 右外连接:
select * from a right join b on a.id=b.id;
⑤ 全外连接:
select * from a full outer join b on a.id=b.id;
⑥ hive中的特别join:
select * from a left semi join b on a.id = b.id;
⑦ cross join(##慎用),返回两个表的笛卡尔积结果,不需要指定关联键:
select a.*,b.* from a cross join b;
3. Hive参数配置:
(1) Hive参数参考网站:
https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties
(2) 参数的设定方式和优先级:
配置文件hive-site.xml (全局有效)
用户自定义或默认的配置文件,配置文件的设定对本机启动的所有 Hive 进程都有效。
命令行参数 (--hiveconf 对 hive 启动实例有效)
启动 Hive(客户端或 Server 方式)时,可以在命令行添加-hiveconf 来设定参数,例如:bin/hive -hiveconf hive.root.logger=INFO,console;设定对本次启动的 Session(对于 Server 方式启动,则是所有请求的 Sessions)有效。
参数声明 (set xxx=xxx 对 hive 的连接 session 有效)
可以在 HQL 中使用 SET 关键字设定参数,这一设定的作用域也是 session 级的。
上述三种设定方式的优先级依次递增。即参数声明覆盖命令行参数,命令行参数覆盖配置文件设定。
4. Hive函数
(1) Hive函数资料:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
(2) 使用自带函数:
add JAR /root/hivedata/**.jar;
#创建临时函数与开发好的 java class 关联
create temporary function 随便起一个函数名称as '刚刚编写的类的全限定名';
#可调用该函数进行测试
create table t_bi_reg(id string,name string)
row format serde 'org.apache.hadoop.hive.serde2.RegexSerDe'
with serdeproperties(
'input.regex'='(.*)\\|\\|(.*)',
'output.format.string'='%1$s %2$s'
stored as textfile;
load data local inpath '/root/hivedata/shuang.txt' into table t_bi_reg;
select * from t_bi_reg;
九、 网站流量日志数据分析
1. 点击流数据模型
(1) 点击流概念:点击流(Click Stream)是指用户在网站上持续访问的轨迹。我们可以通过对网站日志的分析可以获得用户的点击流数据。如果把 Page 视为“点”的话,那么我们可以很容易的把 Session描绘成一条“线”,也就是用户的点击流数据轨迹曲线。
(2) 点击流模型( PageView和Visit ):
① PageView表述用户在一次会话中的访问页面情况,然后将这次会话中的各个访问按照时间顺序排序并标注访问步骤。
② Visit是用户每一个会话中的信息统计;包括:
2. 流量分析常见分类
指标是网站分析的基础,用来记录和衡量访问者在网站自的各种行为。
(1) 骨灰级指标:
IP:1 天之内,访问网站的不重复 IP 数。一天内相同 IP 地址多次访问网站只被计算 1 次。曾经 IP 指标可以用来表示用户访问身份,目前则更多的用来获取访问者的地理位置信息。
PageView浏览量: 即通常说的 PV 值,用户每打开 1 个网站页面,记录 1 个PV。用户多次打开同一页面 PV 累计多次。通俗解释就是页面被加载的总次数。
Unique PageView: 1 天之内,访问网站的不重复用户数(以浏览器 cookie 为依据),一天内同一访客多次访问网站只被计算 1 次。
(2) 基础级指标:
访问次数:访客从进入网站到离开网站的一系列活动记为一次访问,也称会话(session),1 次访问(会话)可能包含多个 PV。
网站停留时间:访问者在网站上花费的时间。
页面停留时间:访问者在某个特定页面或某组网页上所花费的时间。
3. 数据分析系统技术
数据处理流程:
a) 数据采集:定制开发采集程序,或使用开源框架 Flume
b) 数据预处理:定制开发 mapreduce 程序运行于 hadoop 集群
c) 数据仓库技术:基于 hadoop 之上的 Hive
d) 数据导出:基于 hadoop 的 sqoop 数据导入导出工具
e) 数据可视化:定制开发 web 程序(echarts)
f) 整个过程的流程调度:hadoop 生态圈中的 azkaban 工具
4. 模块开发之数据采集
(1) 需求:在网站web流量日志分析这种场景中,对数据采集部分要求不严格,故可以使用通用的flume日志采集框架来采集数据。
(2) 1
十、 Apache Sqoop
1. Apache Sqoop的概述:
Sqoop 是 是 p Hadoop 和关系数据库服务器之间传送数据的一种工具。它是用来从关系数据库如:MySQL,Oracle 到 Hadoop 的 HDFS,并从 Hadoop 的文件系统导出数据到关系数据库。由 Apache 软件基金会提供。
Sqoop:“SQL 到 Hadoop 和 Hadoop 到 SQL”。
2. Sqoop的安装:
3. Sqoop的导入和导出:
十一、 模块开发
1. 模块开发之工作流调度:
整个项目的数据按照处理过程,从数据采集到数据分析,再到结果数据的导出,一系列的任务可以分割成若干个 azkaban 的 job 单元,然后由工作流调度器调度执行。
调度脚本的编写难点在于 shell 脚本。但是一般都是有固定编写模式。大家可以参考资料中的脚本进行编写。大体框架如下:
#!/bin/bash
#set java env
#set hadoop env
#设置一些主类、目录等常量
#获取时间信息
#shell 主程序、结合流程控制(if....else)去分别执行 shell 命令。
更多工作流及 hql 脚本定义见参考资料。
2. Echarts( 模块开发之数据可视化图表工具 ):
(1) Echarts概述:
ECharts 是一款由百度前端技术部开发的,基于 Javascript 的数据可视化图表库,提供直观,生动,可交互,可个性化定制的数据可视化图表。
可以从ECharts官网中下载该工具,开发环境建议下载源代码版本,包含了常见的错误提示和警告。
(2) ECharts的使用:
引入ECharts: <script src="echarts.min.js"></script>
为 ECharts 准备一个具备大小(宽高)的 DOM : <div id="main" style="width: 600px;height:400px;"></div>
在div下编写一个script,获取该DOM,并初始化ECharts:
<script type="text/javascript">
// 基于准备好的dom,初始化echarts实例: var myChart = echarts.init(document.getElementById('main'));
// 指定图表的配置项和数据 var option = {可以从ECharts官网中获取模板}
// 使用刚指定的配置项和数据显示图表: myChart.setOption(option);
在使用ECharts中,前端一般需要数组,所以在后端一般将数组封装到对象中,对象的成员变量为数组,然后使用@ResponseBody将该对象转为json格式,传送到前端。
(3) 1
十二、 Hadoop总结
1. Hadoop之MapReduce
(1) MapReduce工作机制详解(重点掌握):
① 将一个文件进行切分(128M);如果切分为多个则会有多个MapTask处理不同的分片文件;
② 具体的MapTask对文件进行逐行读取并调用map方法进行处理;
③ 将处理的结果写到一个环形缓冲区,如果缓冲区达到80M则溢出并进行排序或者调用combiner进行合并处理(如果job设置过Combiner,那么就是现在使用。Combiner 会优化MapReduce的中间结果,所以它在整个模型中会多次使用。);
④ 将上述内存中的文件数据按照分区数进行写入到不同的分区文件中(调用了partitioner获取该kv应该写入到哪个分区文件中);
⑤ reduceTask阶段将mapTask处理的分区文件进行获取并排序合并到具体的某个reduceTask的reduce方法进行处理;
⑥ 再输出具体的reduce处理后结果到HDFS。
(2) MapReduce工作机制图解(重点掌握):
(3) MapReduce的Shuffle机制(重点掌握):
map阶段处理的数据如何传递给reduce阶段,是MapReduce框架中最关键的一个流程,这个流程就叫shuffle。一般把从Map产生输出开始到Reduce取得数据作为输入之前的过程称shuffle。shuffle机制的6个阶段如下所示:
① Collect 阶段:将 MapTask 的结果输出到默认大小为 100M 的环形缓冲区,保存的是 key/value,Partition 分区信息等。
② Spill 阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,并进行排序。
③ Merge 阶段:把所有溢出的临时文件进行一次合并操作,以确保一个MapTask 最终只产生一个中间数据文件。
④ Copy 阶段: ReduceTask到已经完成MapTask的节点上复制一份属于自己的数据,并将这些数据保存到内存中。
⑤ Merge 阶段:在 ReduceTask 远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作。
⑥ Sort 阶段:在对数据进行合并的同时,会进行排序操作。
(4) MapReduce的并行机制(熟悉):
① MapTask并行机制( 多个MapTask共同运行 ):
MapTask 的并行度指的是 map 阶段有多少个并行的 task 共同处理任务。
一个 MapReduce的map阶段并行度由客户端在提交job时决定,即客户端提交 job 之前会对待处理数据进行逻辑切片;一个逻辑切片对应一个MapTask。
在Hadoop2.x中,一个逻辑切片默认为128M,可以对大小进行参数设置,但不论怎么调参数,都不能让多个小文件“划入”一个逻辑切片中。
② ReduceTask并行机制:
reducetask 并行度同样影响整个 job 的执行并发度和执行效率;
Reducetask 数量的决定是可以直接手动设置:job.setNumReduceTasks(4);
如果数据分布不均匀,就有可能在 reduce 阶段产生数据倾斜。
③ Task并行度经验之谈:
最好每个 task 的执行时间至少一分钟。( 每个task的调度时间有几秒钟,如果task很快跑完,那调度时间会浪费 )
一个 JVM 上最多可以顺序执行的 task 数目是 1,也就是说一个 task 启一个 JVM;
如果 input 的文件非常的大,比如 1TB,可以考虑将 hdfs 上的每个blocksize设大,比如设成256MB或者512MB;
(5) MapReduce的优化参数(了解):
① 资源相关参数:
mapreduce.map.memory.mb: 一个 Map Task 可使用的内存上限(单位:MB),默认为 1024。如果超过会被杀死。
mapreduce.reduce.memory.mb: 一个 Reduce Task 可使用的资源上限(单位:MB),默认为 1024。
mapreduce.map.cpu.vcores: 每个 Maptask 可用的最多 cpu core 数目, 默认值: 1
mapreduce.reduce.cpu.vcores: 每个 Reducetask 可用最多 cpu core 数目默认值: 1
② 还有容错相关参数和效率跟稳定性参数。
(6) MapReduce的其他功能(熟悉):
① 计数器功能:
计数器是用来记录 job 的执行进度和状态的。MapReduce 计数器(Counter)为我们提供一个窗口,用于观察 MapReduce Job 运行期的各种细节数据。
内置计数器包括:文件系统计数器(File System Counters);作业计数器(Job Counters)等。
自定义计数器(定义在MapTask的map函数中):
Counter counter =context.getCounter(“SelfCounters”,”myCounters”);
String[] words = value.toString().split(",");
for (String word : words) {
if("hello".equals(word)){counter.increment(1)};
context.write(new Text(word), new LongWritable(1));
ControlledJob controlledJob1 = new ControlledJob(job1.getConfiguration()); controlledJob1.setJob(job1);
ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration()); controlledJob2.setJob(job2);
5. controlledJob2.addDependingJob(controlledJob1); // job2 依赖于 job1
(1) Yarn通俗介绍(熟悉):
是一种新的 Hadoop 资源管理器,它是一个通用资源管理系统和调度平台,可以把 yarn 理解为相当于一个分布式的操作系统平台,而 mapreduce 等运算程序则相当于运行于操作系统之上的应用程序,Yarn 为这些程序提供运算所需的资源(内存、cpu)。
yarn 并不清楚用户提交的程序的运行机制,yarn 只提供运算资源的调度。
yarn 中的主管角色叫 ResourceManager,yarn 中具体提供运算资源的角色叫 NodeManager。
yarn与运行的用户程序完全解耦,意味着yarn上可以运行各种类型的分布式运算程序,比如mapreduce。
(2) Yarn的三大组件(熟悉):
YARN是一个资源管理、任务调度的框架,主要包含三大模块:ResourceManager(RM)、NodeManager(NM)、ApplicationMaster(AM)。
a) ResourceManager:负责整个集群的资源管理和分配,是一个全局的资源管理系统。
b) NodeManager:是每个节点上的资源和任务管理器,它是管理这台机器的代理,负责该节点程序的运行,以及该节点资源的管理和监控。并向RM汇报本节点的资源使用情况。
c) ApplicationMaster:每个应用程序均包含一个AM,负责每一个具体应用程序的调度和协调。
(3) Yarn的运行流程(熟悉):
(4) Yarn的调度器Scheduler(了解):
在Yarn中,负责给应用分配资源的就是Scheduler;有三种调度器可以选择:FIFO Scheduler,Capacity Scheduler,FairScheduler。
a) FIFO Scheduler:先进先出,在进行资源分配的时候,先给队列中最头上的应用进行分配资源,待最头上的应用需求满足后再给下一个分配。
b) Capacity Scheduler:此调度器允许多个组织共享整个集群,每个组织可以获得集群的一部分计算能力。
c) Fair Scheduler:此调度器中,我们不需要预先占用一定的系统资源,Fair 调度器会为所有运行的job动态的调整系统资源。
元数据包含:文件、目录和datanode信息、文件块信息。
元数据在namenode运行的时候由其维护并完整地在内存中存储;为了避免丢失在hadoop的数据存放目录下存有:edits(修改日志)和fsimage镜像文件(比edits要旧一些;文件、目录和操作)。
b) 元数据目录:
在namenode所在的机器的存放数据相应目录的current目录下包含:VERSION、seen_txid,fsimage,edits_*
c) secondary namenode:
NameNode 职责是管理元数据信息,DataNode 的职责是负责数据具体存储,SecondaryNameNode的职责是合并 NameNode 的 editlogs 到 fsimage 文件中。
d) Checkpoint:
每达到触发条件,会由 secondary namenode 将 namenode 上积累的所有 edits 和一个最新的 fsimage 下载到本地,并加载到内存进行 merge(这个过程称为 checkpoint)。
如果NameNode中的fsimage真的出问题了,还是可以用SecondaryNamenode中的fsimage替换一下NameNode上的fsimage,虽然已经不是最新的fsimage,但是我们可以将损失减小到最少。
(2) HDFS的安全模式:
a) 安全模式概述:
安全模式是 HDFS 所处的一种特殊状态,在这种状态下,文件系统只接受读数据请求,而不接受删除、修改等变更请求,是一种保护机制,用于保证集群中的数据块的安全性。
如果 HDFS 处于安全模式下,不允许HDFS客户端进行任何修改文件的操作,包括上传文件等。
b) 安全模式命令:
手动进入安全模式: hdfs dfsadmin -safemode enter
手动退出安全模式: hdfs dfsadmin -safemode leave
获取集群是否处于安全模式:hdfs dfsadmin -safemode get(也可在 web 页面查看安全模式状态)
4. Hadoop之High Availability
(1) High Availability概述:
HA(High Available), 高可用,是保证业务连续性的有效解决方案,一般有两个或两个以上的节点,分为 活动节点(Active)及备用节点(Standby)。
你现在所遭遇的每一个不幸,都来自一个不肯努力的曾经