我目前正在使用Nifi来消耗数据,读取Tealium事件流并加载到HDFS。需要帮助过滤数据,当源错过了发送数据的属性。
{"account": "newtv", "twitter:description": "发现你最喜欢的NewTV节目和主持人的播放时间。", "og:locale": "en_US", "dcterms:publisher": "NewTV", "original-source": "www.newtv.com/", "og:url": "www.newtv.com/show/program-guide"}}, "post_time": "2019-10-09 11:27:46", "useragent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36", "event_id": "12345"}.
上面的信息样本。我目前被困在过滤数据上,当源错过了从以下样本数据集发送事件_id属性的数据。
目前的Nifi流程。 Consume Kafka -> Evaluate Json Path -> Jolttransform Json -> Evaluate Json Path-> RouteOnAttribute -> Merge Content -> Evaluate Json Path -> Update attribute -> PutHDFS ->MoveHDFS
需要帮助,如何使用RouteOnAttribute分割数据,以区分缺失的事件_id属性或属性_值到两个不同的流。有属性或属性值的流和缺失的值要出错并加载到不同的输出路径。
在
处理器中添加新的属性,从flowfile中提取
值。
EvaluateJsonPath
event_id
如果flowfile没有
,那么nifi会给属性添加空值。
event_id
EvaluateJsonPath Configs:
然后通过使用
处理器,我们可以检查属性值并相应地路由flowfile。
RouteOnAttribute
RouteOnAttribute Configs: