from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
from datetime import date
from datetime import datetime, timedelta
import json
import os
def main(sparkSession):
    port = 9200  # assumed ES REST port; the original script defines `port` elsewhere
    df = sparkSession.read \
        .format("org.elasticsearch.spark.sql") \
        .option("es.nodes", '197.4.218.113') \
        .option('es.port', str(port)) \
        .option("es.resource", 'freyr-20210315/serviceInvoke') \
        .option('es.query', '{"query":{"bool":{"must":[{"match":{"serviceName":"lawEnforcement_person"}}, {"match":{"invokeType":"2"}}]}}}') \
        .option('es.nodes.wan.only', 'true') \
        .option("es.nodes.discovery", "false") \
        .option("es.index.auto.create", "true") \
        .option("es.read.field.exclude", "requestParam.headers") \
        .option("es.write.ignore_exception", "true") \
        .option("es.read.ignore_exception", "true") \
        .load()
    df.createTempView('tdl_qm_stration_law_per_dt')
    df.show()
    # `addContion` holds an extra filter clause in the original script; default to an
    # empty string here so the snippet runs on its own
    addContion = ''
    result = sparkSession.sql("""
        SELECT
            responseData.data,
            sequenceId,
            serviceName,
            errorCode,
            createdTime,
            recordDate,
            organizationCode,
            businessCode,
            requestParam.urlParam.name,
            requestParam.urlParam.documentNo
        FROM
            tdl_qm_stration_law_per_dt
        WHERE
            serviceName = 'lawEnforcement_person'
            AND errorCode = '10000'
            AND invokeType = '2'
            AND sequenceId = '1615946906356A11299'
    """ + addContion)
    res = result.select("serviceName", "createdTime", "recordDate", "organizationCode", "businessCode", "errorCode",
                        "name", "documentNo", "sequenceId",
                        F.get_json_object("data", "$.code").alias("code"),
                        F.get_json_object("data", "$.message").alias("message"),
                        F.get_json_object("data", "$.data.items.sfss_xzxf").alias("sfss_xzxf"),
                        F.get_json_object("data", "$.data.items.hjqk_hjqk").alias("hjqk_hjqk"))
    res.show()
    res.createTempView('tdl_qm_stration_law_per_dt1')
    sfss_xzxf = sparkSession.sql("""select sequenceId, sfss_xzxf from tdl_qm_stration_law_per_dt1 where sfss_xzxf is not null""")
    # DDL schema for the JSON array held in the sfss_xzxf string column
    schema = """array<struct<
        punishCause: string,
        punishmentOrgan: string,
        penaltyTime: string,
        caseNumber: string,
        dataKeyId: string,
        legalPerson: string,
        dataType: string,
        name: string,
        remark: string,
        uscCode: string,
        idNumber: string
    >>"""
    scjg_xzxf_rds = sfss_xzxf.withColumn("sfss_xzxf", F.from_json(F.col("sfss_xzxf"), schema)) \
        .withColumn("expl", F.explode("sfss_xzxf")) \
        .select(sfss_xzxf.sequenceId, "expl.*")
    scjg_xzxf_rds.show()
[2021-05-16 18:44:02.236] [INFO] - spark status:Job5/stage5(1/1); Job6/stage6(1/1); Job7/stage7(2/2); Job8/stage8(0/1)
[2021-05-16 18:44:02.941] [INFO] - +--------------------+-------------------+----------+----------------+------------+---------+------+------------------+-------------------+----+--------+-----------------------+---------+----+---------+-----+
| serviceName| createdTime|recordDate|organizationCode|businessCode|errorCode| name| documentNo| sequenceId|code| message| sfss_xzxf|blgy_blgy|qtcf|sfss_gscg|zyfdr|
+--------------------+-------------------+----------+----------------+------------+---------+------+------------------+-------------------+----+--------+-----------------------+---------+----+---------+-----+
|lawEnforcement_pe...|2021-03-17 10:08:26| 20210317| tsbankyyglb| channel3| 10000|张*平|131081198308**1074|1615946906356A11299| 200|交易成功|[{"name":"张*平","b...| []| []| null| []|
+--------------------+-------------------+----------+----------------+------------+---------+------+------------------+-------------------+----+--------+-----------------------+---------+----+---------+-----+
[2021-05-16 18:44:02.942] [INFO] - create temp table: tdl_qm_stration_law_per_dt1
[2021-05-16 18:44:02.942] [INFO] - check authority success
[2021-05-16 18:44:02.942] [INFO] - executing sql: select sequenceId,sfss_xzxf from tdl_qm_stration_law_per_dt1 where sfss_xzxf is not null
[2021-05-16 18:44:03.647] [INFO] - spark status:Job5/stage5(1/1); Job6/stage6(1/1); Job7/stage7(2/2); Job8/stage8(1/1); Job9/stage9(2/2); Job10/stage10(1/1); Job11/stage11(0/2)
[2021-05-16 18:44:03.647] [INFO] - +-------------------+-----------+---------------+-----------+---------------------+--------------------+-----------+--------+------+------+-------+------------------+
| sequenceId|punishCause|punishmentOrgan|penaltyTime| caseNumber| dataKeyId|legalPerson|dataType| name|remark|uscCode| idNumber|
+-------------------+-----------+---------------+-----------+---------------------+--------------------+-----------+--------+------+------+-------+------------------+
|1615946906356A11299| null| 西西市人民法院| null|(2019)冀1081执827号|6fba119210fb44ec3...| null| 64|张*平| | |131081198308**1074|
|1615946906356A11299| null| 西西市人民法院| null|(2019)冀1081执826号|4f5ac433e126fe058...| null| 64|张*平| | |131081198308**1074|
+-------------------+-----------+---------------+-----------+---------------------+--------------------+-----------+--------+------+------+-------+------------------+
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as fun
Parquet data: a columnar storage format developed jointly by Twitter and Cloudera. Compared with row-oriented storage, its characteristics are:
it can skip data that does not satisfy the query and read only what is needed, reducing I/O volume; compression encodings reduce disk footprint, and more efficient per-column encodings save further storage space; reading only the required columns and supporting vectorized operations gives better scan performance.
So how do we read and use Parquet data in PySpark? I will demonstrate in local mode, running from PyCharm on Linux.
First, import the libraries and set up the environment:
import os
from pyspark import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
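With the imports in place, reading and writing Parquet goes through the DataFrameReader/DataFrameWriter API. Below is a minimal local-mode sketch; the application name, output path, and sample columns are placeholders for illustration, not values from the original post.

# minimal local-mode sketch (app name, path and sample data are placeholders)
conf = SparkConf().setAppName("parquet_demo").setMaster("local[2]")
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# write a small DataFrame to Parquet, then read it back
df = spark.createDataFrame([(1, "spark"), (2, "flume")], ["id", "name"])
df.write.mode("overwrite").parquet("/tmp/demo_parquet")

parquet_df = spark.read.parquet("/tmp/demo_parquet")
parquet_df.select("name").show()   # only the 'name' column is scanned, which is the columnar I/O saving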
Reading data from ES with Spark to produce an RDD
# Spark reads ES data into an RDD
def read_rdd_from_es(ss, es_nodes, es_port, index, type, query_dic):
    # build the query JSON: the caller's dict if given, otherwise match all documents
    query = json.dumps(query_dic if isinstance(query_dic, dict) else {"query": {"match_all": {}}})
    # the original snippet is cut off here; a common completion reads via the
    # es-hadoop InputFormat, yielding (document id, document fields) pairs
    es_conf = {"es.nodes": es_nodes, "es.port": str(es_port),
               "es.resource": "{}/{}".format(index, type), "es.query": query}
    return ss.sparkContext.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_conf)
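A hypothetical call for reference; the host, port, index and type below are placeholders rather than values from the original post.

# build a local session and pull one document from a placeholder index
spark = SparkSession.builder.master("local[2]").appName("es_rdd_demo").getOrCreate()
rdd = read_rdd_from_es(spark, "127.0.0.1", 9200, "media_index", "media",
                       {"query": {"match_all": {}}})
print(rdd.take(1))  # each element is a (document id, document fields) pair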
def readEs():
    conf = SparkConf().setAppName("es").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)  # requires: from pyspark.sql import SQLContext
    # the remaining options mirror the earlier example; the host and index are placeholders
    df = sqlContext.read.format("org.elasticsearch.spark.sql") \
        .option("es.nodes.wan.only", "true") \
        .option("es.nodes", "127.0.0.1") \
        .option("es.port", "9200") \
        .option("es.resource", "media_index/media") \
        .load()
    return df
I'm very new to PySpark. I tried to parse a JSON file with the following code:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.json("file:///home/malwarehunter/Downloads/122116-path.json")
df.printSchema()
The output is as follows:
root...
20/12/03 10:56:04 WARN Resource: Detected type name in resource [media_index/media]. Type names are deprecated and will be removed in a later release.
#!/usr/bin/python
import threading
import json
import time
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import os
import sys
import argparse
host_list = [{"host":"1.58.55.11","port...
12341234123412342|asefr-3423|[{"name":"spark","score":"65"},{"name":"airlow","score":"70"},{"name":"flume","score":"55"},{"name":"python","score":"33"},{"name":"s...
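The last snippet is a pipe-delimited record whose third field is a JSON array. A minimal sketch of flattening such a line with the same from_json + explode pattern used earlier; the input path and the column names id, code, scores_json are assumptions for illustration.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[2]").appName("pipe_json_demo").getOrCreate()

# read each line as text and split on '|' (path and column names are assumptions)
lines = spark.read.text("/tmp/scores.txt")
cols = lines.select(F.split("value", r"\|").alias("p")) \
            .select(F.col("p")[0].alias("id"), F.col("p")[1].alias("code"),
                    F.col("p")[2].alias("scores_json"))

# the third field is a JSON array of {name, score} objects: parse it, then explode
schema = "array<struct<name: string, score: string>>"
flat = cols.withColumn("item", F.explode(F.from_json("scores_json", schema))) \
           .select("id", "code", "item.*")
flat.show()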