相关文章推荐
开朗的楼梯  ·  python - ARIMA ...·  12 月前    · 
暴躁的人字拖  ·  C# ...·  1 年前    · 
开心的山羊  ·  云服务器再测试 - 知乎·  1 年前    · 

今天要帶大家做另外一個簡單的場境應用,我們繼續沿用昨天所處理的 parquet File 來做今天的小實作,大致上今天要實作的內容如下:

讀取 local 端的 parquet file,並且依據 Stage 欄位的值,只選擇值 Armor、Champion 和 Mega 的資料送到 AWS SQS。

這邊的事前準備就是記得在 AWS 建立一個要來用的 SQS,並且複製 SQS 的 URL ,實際的畫面如下:

How to Build?

這次的範例的 data pipeline 大致會長的如下圖:

GetFile

這個 Processor 是可以讓我們讀取 Local 端的某一個 Folder 下的檔案,來看一下如何設定:

這邊以我的範例來說,我是將前一天實作完的檔案暫存到 /tmp/data 這個路徑下,所以只要在 Input Directory 這個 Property 設定好,他就會將底下的檔案讀上來做使用。

SplitRecord

因為前一個 Processor 只是將檔案讀取變成一個 FlowFiles 而已,尚未將裡面的資料取出來,所以我們可以透過該 Processor 做到這件事情:

原先的檔案為 Parquet 格式,所以我們以 ParquetReader 的方式來做讀取,並且以 一筆 Record 為單位,接著再以 JsonRecordSetWriter 來轉換成 Json 格式來給下游 Processor 做處理,因為後續我們需要透過 Json 格式來做欄位的判斷。

因此我們可以看到經過這個 Processor 的 Content,都會轉換成 Json 格式,且一筆為單位,內容如下:

EvaluateJsonPath

這個 Processor 是可以讓我們去解析將原先 Content 的某一個欄位轉換成 Attribute,所以來看一下設定:
Destination : 代表要轉換的目的地,這邊我們先選擇城 flowfile-attribute Stage : 是我新增的一個 Property,後面的 $.Stage 代表他會去解析進來的 FlowFiles 中的 Stage 這個欄位,並且帶到名為 Stage 的 Attribute。

所以經過該 Processor 的 Flowfiles,我們會發現都會多帶一個名為 Stage 的 Attribute:
有了 Stage 這個 Attribute 之後,原則上該 Attribute 的值會跟 Content 內的 Stage 這個欄位值相同,接著就可以做過濾的動作,所以就會用到 RouteOnAttribute 這個 Processor。

RouteOnAttribute

該 Processor 是讓我們可依據 FlowFiles 的狀況動代增加下游的 Connection 的 Route,我們先看一下原先的 Processor 設定只會有 unmatched 的 Route:

但是我們可以在 Property 增加更多的條件,如下設定:
這邊我們加入了 7 個 Property ,分別對應的是:

0, Baby
1, In-Training
2, Rookie
3, Champion
4, Ultimate
5, Mega
6, Ultra
7, Armor

一但設定完成之後,我們會發現 Route 會多出這些剛剛設定的 Property Name:

接著我們在連接下游 Processor 的時候,就可以選定符合哪些條件的 Route 可以連接到下游的 Processor。
以接下來的 PutSQS 的範例為例,我們只需要Armor、Champion 和 Mega 的資料送到 AWS SQS 即可其他的都不要,所以既勾選對應的 Route 即可。

接著就可以看到 RouteOnAttribute 和 PutSQS 之間就只會有這三個的 Connection:

然後其他不會用到的我們先傳送到 Wait Processor

PutSQS

一切準備就緒之後,接下來就可以設定 PutSQS 這個 Processor,還記得一開始要你們先事前建立好 SQS,這邊就會用到了:
Queue URL: 這裡就填上你剛剛建立好 SQS URL AWS Credentials Provider service: 對接好 AWS Controller Service Region: 設定好 AWS Region

上述的設定完成,就可以將符合條件的資料送到 AWS SQS 了。

Wait 這個 Processor 其實就是一個暫停的 Processor。通常會是什麼時候會用他呢?

  • 通常用於開發的階段,因為 Date Pipeline 是由多個 Processor 所建構而成的,所以會需要一個一個 Processor 做設定與測試,所以會在需要做測試的 Processor 的下游接一個 Wait Processor。
  • 另一個用法就是會用在不需要的資料,以這次的範例來說,我們就可以把不符合條件的 FlowFiles 先送到 Wait,主要是用來確定資料確實有依照我們的限制來做判斷,來進一步地決定下游的流向。
  • 上述就是我帶給各位的第二個範例,這些看似簡單的 Processor,其實都是很常用的,所以希望透過這兩天的小實作那大家可以對於 NiFi 在做 Data Pipeline 的設定與流程可以有更多的體悟與理解。

    那明天會介紹一個國外企業是如何使用 Apache NiFi 的小案例分享,以及他的架構是如何做的,對於未來要導入該 Tool 的企業或許有一定的幫助。

    Reference

    Apache NiFi Document