本文介绍了NIFI在实时数仓中的缓存应用,包括PutDistributedMapCache和FetchDistributedMapCache组件的使用,以及如何通过DistributedMapCacheServer和DistributedMapCacheClientService实现缓存操作。虽然NIFI没有直接的删除缓存组件,但可以通过ExecuteGroovyScript自定义删除逻辑。
摘要由CSDN通过智能技术生成
要理解本文内容,需对NIFI有一定的了解,如果您是新手,想学习NIFI,或者想在数仓架构中引入NIFI,欢迎加我微信bigdata_work,我这有一整套使用NIFI的实时数仓落地方案。
在建设实时数仓的过程中,利用缓存机制来提升实时指标的实时性,是一种常用的方法。NIFI提供了专门的缓存组件来应对这一需求。
了解缓存组件
NIFI中的缓存组件是成对出现的,即有负责将数据放入缓存的组件,有负责将数据从缓存取出的组件。这两个组件如下:
首先将PutDistributedMapCache组件添加到画布中,看一下有什么属性:
这里要理解的一个属性是Distributed Cache Service(缓存服务)。缓存服务是要单独创建的一个Controller Service,NIFI支持多种第三方缓存服务:
支持的缓存服务大概有8种, 本示例中使用DistributedMapCacheClientService.
从DistributedMapCacheClientService名字中可以看出,这是一个Client,那么需要有一个Server。也就是说,上图8种缓存服务Client,都要连接上对应的Server地址,比如Redis缓存Client ,需要有Redis Server。
创建DistributedMapCacheServer
回到ETL开发界面,继续开发ETL任务。
将数据放入缓存
PutDistributedMapCache 将数据放入缓存服务DistributedMapCacheClientService的机制是:将Cache Entry Identifier属性值作为map的key,将当前FlowFile内容作为map的value,放入到缓存中。
将数据从缓存取出
FetchDistributedMapCache 将数据从缓存服务DistributedMapCacheClientService取出的机制是:将Cache Entry Identifier属性值作为map的key,从缓存服务中将对应的value值查询出来,作为FlowFile的内容。
将数据从缓存删除
NIFI并没有提供相应的删除缓存组件,可以使用ExecuteGroovyScript组件,自定义代码,从缓存删除数据。
用视频演示上述操作(需要本系列全套视频的朋友加微信
bigdata_work
)。
之前通过
nifi
每隔几分钟将数据库
中
的数据 写入到 hdfs上,不过后面数据
中
小文件过多,因为需要删除历史的数据,所幸写入hdfs的时候都是 table_name/分区键 写入的。因此只要删除历史的分区文件夹就可以完成数据的删除操作了。
/user/hive/warehouse/zhong/ge_sys_person/pt=20190203/....
/user/hive/war...
RedisConnectionPoolService
该控制器服务用来链接到Redis,并控制链接参数
Redis
Distributed
Map
Cache
ClientService
Put
Distributed
Map
Cache
Fetch
Distributed
Map
Cache
通用步骤指的是读写Redis都需要执行的操作,其实就
启动Mysql服务(5.7版本),在Mysql
中
运行\资料\mysql\
nifi
_test.sql
中
的SQL语句。
启动Hadoop集群(与
NiFi
集群在同一个可访问的局域网网段)
1.1处理器流程
Distributed
Cache
是Hadoop为
Map
Reduce框架提供的一种分布式
缓存
机制,它会将需要
缓存
的文件分发到各个执行任务的子节点机器
中
,各个节点可以自行读取本地文件系统上的数据进行处理。
2、符号链接
可以在原本HDFS文件路径上+“ #somename”来设置符号连接(相当于一个快捷方式)。
这样在
Map
Reduce程序
中
可以直接通过:
File file = ...
简单地说,
NiFi
是为了自动化系统之间的数据流。虽然数据流这种形式很容易理解,但我们在此使用它来表示系统之间的自动化和不同系统之间数据的流转。企业拥有多个系统,其
中
一些系统创建了数据,部分系统消耗了数据,那么问题就出现了。出现的问题和解决方案已经广泛讨论和阐述。
nifi
就是一个致力于数据对接的集成框架。
数据流面临的一些比较高级的挑战包...