flink sql kafka 解析复杂json

目的:

从复杂 JSON 中提取所关心的字段:利用 ROW 类型把嵌套的 JSON 声明为可操作的 schema,然后通过计算列 field AS xx.xx.xx 的方式引用嵌套字段。
version flink 1.13.0

原始 JSON(注意:下面的示例在粘贴时丢失了部分花括号和方括号,仅用于展示字段结构,不能直接作为合法 JSON 使用)

"sln":"itn", "pl":"js", "sdk":"zg-js", "sdkv":"2.0", "owner":"zg", "ut":"2021-7-15 16:07:01", "tz":28800000, "debug":0, "ak":"xxxx", "usr":{ "did":"171ebd0070d8a-02a71a3974197b-11096a4c-46500-171ebd0070e57", "$zg_did":12590353 "data":[ "dt":"evt", "pr":{ "$ct":1626336421471, "$tz":28800000, "$sid":1626336206600, "$url":"https://a.xxx.com.cn/2656/m46991/#zoneclick=102782", "$ref":"https://a.xxx.com.cn/2656/", "$referrer_domain":"", "$eid":"viehcle_detail_page_view", "_platform_type":"M", "_project_name":"XCAR", "_login_status":0, "$zg_did":12590353, "$zg_sid":1626336206600, "$uuid":"828c1425345a42379ff4de1e6d0f6567", "$zg_zgid":12612355, "$zg_eid":115, "$zg_epid#_level":1498, "$zg_eptp#_level":"string", "$zg_epid#_page_name":2255, "$zg_eptp#_page_name":"string", "$zg_epid#_model_id":2613, "$zg_eptp#_model_id":"string", "$zg_epid#_sub_level":1502, "$zg_eptp#_sub_level":"string", "$zg_epid#_series_id":400, "$zg_eptp#_series_id":"string", "$zg_epid#_first_category":398, "$zg_eptp#_first_category":"string", "$zg_epid#_series_or_model":1503, "$zg_eptp#_series_or_model":"string", "$zg_epid#_brand":1501, "$zg_eptp#_brand":"string", "$zg_epid#_model":2614, "$zg_eptp#_model":"string", "$zg_epid#_platform_type":1490, "$zg_eptp#_platform_type":"string", "$zg_epid#_sub_brand":1497, "$zg_eptp#_sub_brand":"string", "$zg_epid#_login_status":1488, "$zg_eptp#_login_status":"number", "$zg_epid#_sub_brand_id":2611, "$zg_eptp#_sub_brand_id":"string", "$zg_epid#_energy":2612, "$zg_eptp#_energy":"string", "$zg_epid#_year":1499, "$zg_eptp#_year":"string", "$zg_epid#_structure":1500, "$zg_eptp#_structure":"string", "$zg_epid#_price_range":1506, "$zg_eptp#_price_range":"string", "$zg_epid#_brand_id":1507, "$zg_eptp#_brand_id":"string", "$zg_epid#_status":2677, "$zg_eptp#_status":"string", "$zg_epid#_project_name":1489, "$zg_eptp#_project_name":"string", "$zg_epid#_series":1505, "$zg_eptp#_series":"string" "dt":"zgid", "pr":{ "$ct":1626336421471, "$zg_did":12590353, "$tz":28800000, 
"$zg_zgid":12612355 "dt":"pl", "pr":{ "$dv":"x'x.io", "$zg_did":12590353 "ip":"39.144.27.100", "st":1626336451683, "ua":"Mozilla/5.0 (Linux; Android 10; MED-AL00; HMSCore 6.0.0.305) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.93 HuaweiBrowser/11.1.2.301 Mobile Safari/537.36", "plat":3, "app_id":6, "@version":"1", "@timestamp":"2021-07-15T08:07:12.661Z"
-- Kafka source table over the 'zg_log' topic, decoded with the JSON format.
-- Only the JSON fields of interest are declared; nested objects are modelled
-- as ROW types and the `data` array as ARRAY<ROW<...>>, so that computed
-- columns can reach into them with dotted field access.
CREATE TABLE kafka_test (
    -- Physical columns (must match the JSON key names, hence the backticks
    -- around keys that start with '$' or '_').
    usr ROW<did STRING, `$zg_did` STRING>,
    -- `data` is quoted because DATA appears in Flink's keyword list.
    `data` ARRAY<ROW<dt STRING, pr ROW<`$zg_zgid` STRING, `$cuid` STRING, `_jpush_id` STRING>>>,

    -- Computed columns that flatten the nested fields.
    -- Array subscripts in Flink SQL start at 1, so `data`[1] is the first element.
    deviceId AS usr.`$zg_did`,
    zg_id AS `data`[1].pr.`$zg_zgid`,
    uid AS `data`[1].pr.`$cuid`,
    jpush_id AS `data`[1].pr.`_jpush_id`
) WITH (
    'connector' = 'kafka',
    'topic' = 'zg_log',
    'properties.bootstrap.servers' = 'xxx:9092',
    'properties.group.id' = 'test_group_zg_log',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset',
    -- Declared fields that are absent from a message are not treated as an error.
    'json.fail-on-missing-field' = 'false',
    -- Malformed JSON is not skipped silently.
    'json.ignore-parse-errors' = 'false'
);
-- Sink table using the 'print' connector; used here to inspect the
-- flattened fields extracted from the Kafka JSON messages.
CREATE TABLE print_table (
    deviceId STRING,
    zg_id STRING,
    uid STRING,
    jpush_id STRING
) WITH (
    'connector' = 'print'
);
-- Continuously copy the flattened (computed) columns from the Kafka source
-- into the print sink.
INSERT INTO print_table
SELECT
    deviceId,
    zg_id,
    uid,
    jpush_id
FROM kafka_test;