"_index" : "freyr-20210315" , "_type" : "serviceInvoke" , "_id" : "AXg98PySCQeiakUv0e4k" , "_score" : 36.110207 , "_source" : { "msg" : "成功" , "invokeType" : 2 , "cost" : 309 , "invokeSource" : 1 , "tokenId" : "3d854363ae749d1a4e8d00451b1a8f86" , "docId" : "vyHjSULoZiKkcpgNMuBfxBPgKclcecPRCKPDlKVB" , "serviceDisplayName" : "xxxx信息平台-个人" , "requestParam" : { "headers" : { "bodyParam" : { "outBody" : "" , "urlParam" : { "documentNo" : "131081198308**1074" , "appid" : "722433dac0b421987162276f0c1e0367" , "name" : "张*平" "serviceName" : "lawEnforcement_person" , "sequenceId" : "1615946906356A11299" , "businessServiceName" : "个贷系统" , "requestTime" : 1615946906363 , "businessCode" : "channel3" , "processData" : { "data" : "{\"code\":\"200\",\"data\":{\"pagination\":{\"total\":8,\"pageCount\":1,\"previousPage\":1,\"nextPage\":1,\"currentCount\":8,\"currentPage\":1},\"items\":{\"sfss_xzxf\":[{\"punishmentOrgan\":\"西西市人民法院\",\"dataKeyId\":\"6fba119210fb44ec3b96dd05898f5669\",\"sex\":\"男\",\"implementation\":\"\",\"dataType\":\"64\",\"cause\":\"本院于2019年04月09日立案执行申请人王同生申请执行你借款合同纠纷一案,因你未按执行通知书指定的期间履行生效法律文书确定的给付义务\",\"remark\":\"\",\"idNumber\":\"131081198308**1074\",\"filingTime\":\"2019-04-09\",\"caseNumber\":\"(2019)冀1081执827号\",\"name\":\"张*平\",\"zqr\":\"\",\"uscCode\":\"\",\"bzxr\":\"\"},{\"punishmentOrgan\":\"西西市人民法院\",\"dataKeyId\":\"4f5ac433e126fe058b955a6b59874102\",\"sex\":\"男\",\"implementation\":\"\",\"dataType\":\"64\",\"cause\":\"本院于2019年04月09日立案执行申请人王同生申请执行你借款合同纠纷一案,因你未按执行通知书指定的期间履行生效法律文书确定的给付义务\",\"remark\":\"\",\"idNumber\":\"131081198308**1074\",\"filingTime\":\"2019-04-09\",\"caseNumber\":\"(2019)冀1081执826号\",\"name\":\"张*平\",\"zqr\":\"\",\"uscCode\":\"\",\"bzxr\":\"\"}],\"hjqk_hjqk\":[],\"swcf_zdss\":[],\"gxyd_xzcf\":[],\"zcxx_ldzc\":[]}},\"success\":\"true\",\"message\":\"交易成功\"}" "recordTime" : 1615946906678 , "organizationCode" : "tsbankyyglb" , "success" : true , "contractId" : "8da17650f014443cbccd6cb1c53ff50e" , "createdTime" : "2021-03-17 10:08:26" , "recordDate" : "20210317" , "status" : 1

The processData.data field is itself a JSON string. Decoded, it expands to:

"code" : "200" , "data" : { "pagination" : { "total" : 8 , "pageCount" : 1 , "previousPage" : 1 , "nextPage" : 1 , "currentCount" : 8 , "currentPage" : 1 "items" : { "sfss_xzxf" : [ "punishmentOrgan" : "西西市人民法院" , "dataKeyId" : "6fba119210fb44ec3b96dd05898f5669" , "sex" : "男" , "implementation" : "" , "dataType" : "64" , "cause" : "本院于2019年04月09日立案执行申请人王同生申请执行你借款合同纠纷一案,因你未按执行通知书指定的期间履行生效法律文书确定的给付义务" , "remark" : "" , "idNumber" : "131081198308**1074" , "filingTime" : "2019-04-09" , "caseNumber" : "(2019)冀1081执827号" , "name" : "张*平" , "zqr" : "" , "uscCode" : "" , "bzxr" : "" "punishmentOrgan" : "西西市人民法院" , "dataKeyId" : "4f5ac433e126fe058b955a6b59874102" , "sex" : "男" , "implementation" : "" , "dataType" : "64" , "cause" : "本院于2019年04月09日立案执行申请人王同生申请执行你借款合同纠纷一案,因你未按执行通知书指定的期间履行生效法律文书确定的给付义务" , "remark" : "" , "idNumber" : "131081198308**1074" , "filingTime" : "2019-04-09" , "caseNumber" : "(2019)冀1081执826号" , "name" : "张*平" , "zqr" : "" , "uscCode" : "" , "bzxr" : "" "hjqk_hjqk" : [ "swcf_zdss" : [ "gxyd_xzcf" : [ "zcxx_ldzc" : [ "success" : "true" , "message" : "交易成功"

PySpark: reading from ES and parsing the JSON (including the nested arrays):

#!/usr/bin/python3
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
from datetime import date
from datetime import datetime, timedelta
import json, os

port = 9200      # ES REST port -- assumed default here; the original defines it elsewhere
addContion = ''  # optional extra WHERE-clause fragment appended to the SQL below; empty by default

def main(sparkSession):
    # Read the ES data and register it as a temp view
    df = sparkSession.read \
        .format("org.elasticsearch.spark.sql") \
        .option("es.nodes", '197.4.218.113') \
        .option('es.port', str(port)) \
        .option("es.resource", 'freyr-20210315/serviceInvoke') \
        .option('es.query', '{"query":{"bool":{"must":[{"match":{"serviceName":"lawEnforcement_person"}},{"match":{"invokeType":"2"}}]}}}') \
        .option('es.nodes.wan.only', 'true') \
        .option("es.nodes.discovery", "false") \
        .option("es.index.auto.create", "true") \
        .option("es.read.field.exclude", "requestParam.headers") \
        .option("es.write.ignore_exception", "true") \
        .option("es.read.ignore_exception", "true") \
        .load()
    df.createTempView('tdl_qm_stration_law_per_dt')
    df.show()
    # Query the temp view
    result = sparkSession.sql("""
        SELECT
            responseData.data,
            sequenceId,
            serviceName,
            errorCode,
            createdTime,
            recordDate,
            organizationCode,
            businessCode,
            requestParam.urlParam.name,
            requestParam.urlParam.documentNo
        FROM
            tdl_qm_stration_law_per_dt
        WHERE
            serviceName = 'lawEnforcement_person'
            AND errorCode = '10000'
            AND invokeType = '2'
            AND sequenceId = '1615946906356A11299'
    """ + addContion)
    # Optionally persist the raw result to a pre-created HDFS directory
    #result.write.mode("append").format('json').save("/user/datacompute/users/xin.zhou/elasticData/"+tdate)
    # Extract scalar fields and the item arrays from the JSON string
    res = result.select("serviceName", "createdTime", "recordDate", "organizationCode", "businessCode", "errorCode"
        , "name", "documentNo", "sequenceId"
        , F.get_json_object("data", "$.code").alias("code")
        , F.get_json_object("data", "$.message").alias("message")
        , F.get_json_object("data", "$.data.items.sfss_xzxf").alias("sfss_xzxf")
        , F.get_json_object("data", "$.data.items.hjqk_hjqk").alias("hjqk_hjqk"))
    res.show()
    res.createTempView('tdl_qm_stration_law_per_dt1')
    # Parse the sfss_xzxf (restricted-consumption) array and explode it into one row per element.
    # Fields declared in the schema but absent from the JSON come back as null (see punishCause below).
    sfss_xzxf = sparkSession.sql("""select sequenceId, sfss_xzxf from tdl_qm_stration_law_per_dt1 where sfss_xzxf is not null""")
    schema = """array<struct<
                punishCause     :string,
                punishmentOrgan :string,
                penaltyTime     :string,
                caseNumber      :string,
                dataKeyId       :string,
                legalPerson     :string,
                dataType        :string,
                name            :string,
                remark          :string,
                uscCode         :string,
                idNumber        :string
    >>"""
    scjg_xzxf_rds = sfss_xzxf.withColumn("sfss_xzxf", F.from_json(F.col("sfss_xzxf"), schema)) \
        .withColumn("expl", F.explode("sfss_xzxf")) \
        .select(sfss_xzxf.sequenceId, "expl.*")
    scjg_xzxf_rds.show()
    #   scjg_xzxf_rds.createTempView("tdl_scjg_xzcf_dt")
    #   sparkSession.sql("""insert into bigdata.per_ms_pther_penalties_dt partition (ds="""+tdate+""")
    #   select
    #       sequenceId      as sequence_id,
    #       name            as customer_name,
    #       uscCode         as usc_code,
    #       legalPerson     as legal_person,
    #       idNumber        as id_number,
    #       caseNumber      as case_number,
    #       punishmentOrgan as punishment_organ,
    #       penaltyTime     as penalty_time,
    #       '',
    #       punishCause     as punish_cause,
    #       '',
    #       '',
    #       remark          as remark,
    #       dataType        as data_type,
    #       dataKeyId       as data_key_id
    #   from tdl_scjg_xzcf_dt""")
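The script defines main(sparkSession) but never builds the session or invokes it. A minimal driver, an assumption since the original post does not show this part (the elasticsearch-spark connector jar must already be on the classpath), could be:

if __name__ == "__main__":
    # Assumed entry point -- not shown in the original job.
    sparkSession = SparkSession.builder \
        .appName("es-json-parse") \
        .getOrCreate()
    main(sparkSession)
    sparkSession.stop()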
Run log and output:
[2021-05-16 18:44:02.236] [INFO] - spark status:Job5/stage5(1/1); Job6/stage6(1/1); Job7/stage7(2/2); Job8/stage8(0/1)
[2021-05-16 18:44:02.941] [INFO] - +--------------------+-------------------+----------+----------------+------------+---------+------+------------------+-------------------+----+--------+-----------------------+---------+----+---------+-----+
|         serviceName|        createdTime|recordDate|organizationCode|businessCode|errorCode|  name|        documentNo|         sequenceId|code| message|              sfss_xzxf|blgy_blgy|qtcf|sfss_gscg|zyfdr|
+--------------------+-------------------+----------+----------------+------------+---------+------+------------------+-------------------+----+--------+-----------------------+---------+----+---------+-----+
|lawEnforcement_pe...|2021-03-17 10:08:26|  20210317|     tsbankyyglb|    channel3|    10000|*|131081198308**1074|1615946906356A11299| 200|交易成功|[{"name":"张*平","b...|       []|  []|     null|   []|
+--------------------+-------------------+----------+----------------+------------+---------+------+------------------+-------------------+----+--------+-----------------------+---------+----+---------+-----+
[2021-05-16 18:44:02.942] [INFO] - create temp table: tdl_qm_stration_law_per_dt1
[2021-05-16 18:44:02.942] [INFO] - check authority success
[2021-05-16 18:44:02.942] [INFO] - executing sql: select sequenceId,sfss_xzxf from tdl_qm_stration_law_per_dt1 where sfss_xzxf is not null
[2021-05-16 18:44:03.647] [INFO] - spark status:Job5/stage5(1/1); Job6/stage6(1/1); Job7/stage7(2/2); Job8/stage8(1/1); Job9/stage9(2/2); Job10/stage10(1/1); Job11/stage11(0/2)
[2021-05-16 18:44:03.647] [INFO] - +-------------------+-----------+---------------+-----------+---------------------+--------------------+-----------+--------+------+------+-------+------------------+
|         sequenceId|punishCause|punishmentOrgan|penaltyTime|           caseNumber|           dataKeyId|legalPerson|dataType|  name|remark|uscCode|          idNumber|
+-------------------+-----------+---------------+-----------+---------------------+--------------------+-----------+--------+------+------+-------+------------------+
|1615946906356A11299|       null| 西西市人民法院|       null|(2019)冀1081执827号|6fba119210fb44ec3...|       null|      64|*|      |       |131081198308**1074|
|1615946906356A11299|       null| 西西市人民法院|       null|(2019)冀1081执826号|4f5ac433e126fe058...|       null|      64|*|      |       |131081198308**1074|
+-------------------+-----------+---------------+-----------+---------------------+--------------------+-----------+--------+------+------+-------+------------------+
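The same parse-and-explode pattern generalizes to the remaining item arrays (hjqk_hjqk, swcf_zdss, gxyd_xzcf, zcxx_ldzc): give each array its own DDL element schema and loop. A hedged sketch, to sit inside main() after res is built; the schemas here are placeholders to adjust per payload, not the real field lists:

    # Hypothetical generalization: explode every item array with its own schema.
    array_schemas = {
        "sfss_xzxf": "array<struct<caseNumber:string, punishmentOrgan:string, idNumber:string>>",
        "hjqk_hjqk": "array<struct<dataKeyId:string>>",  # assumed fields
    }
    for col_name, ddl in array_schemas.items():
        exploded = res \
            .where(F.col(col_name).isNotNull()) \
            .withColumn(col_name, F.from_json(F.col(col_name), ddl)) \
            .withColumn("expl", F.explode(col_name)) \
            .select("sequenceId", "expl.*")
        exploded.show()

Empty arrays like "[]" parse to zero-element arrays, so explode simply produces no rows for them; no special-casing is needed.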