# -*- coding: utf-8 -*-
"""Compute indirect friends from a list of friend pairs with PySpark.

Reads a space-delimited file of ``<id1> <id2>`` pairs (no header row), pairs up
ids that are listed under the same id1, drops pairs that already appear in the
input, and writes one CSV row per remaining id: ``id,(x) | (y) | ...``.
"""
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list
from pyspark.sql.functions import concat, concat_ws, lit
import os

# Create a local Spark session.
spark = (
    SparkSession.Builder()
    .appName('sql')
    .master('local')
    .getOrCreate()
)

# try/finally so the session is stopped even when a step below raises.
try:
    # No header and space-separated columns; Spark names them _c0 and _c1.
    df = (
        spark.read.format("csv")
        .option("header", "false")
        .option("delimiter", " ")
        .load("file:///home/software/20220126/pre.txt")
    )

    # Two renamed views of the same pairs, used for the self-join below.
    df1 = df.withColumnRenamed('_c0', 'id1').withColumnRenamed('_c1', 'id2_1')
    df2 = df.withColumnRenamed('_c0', 'id1').withColumnRenamed('_c1', 'id2_2')
    # Distinct ids from the first input column; used to filter the final result.
    df = df1.drop('id2_1').distinct()

    # Join t1.id1 = t2.id1 and keep t1.id2 != t2.id2: every row pairs two
    # different ids (id2_1, id2_2) that are listed under the same id1.
    df3 = df1.join(df2, df1.id1 == df2.id1, 'inner').select(df1.id1, df1.id2_1, df2.id2_2)
    df4 = df3.where(" id2_1 != id2_2 ")

    # Drop the now-redundant id1; the same (id2_1, id2_2) pair can come from
    # several id1 values, so de-duplicate.
    df5 = df4.drop('id1')
    df6 = df5.distinct()

    # These candidates can include pairs that are already direct friends
    # (present as an input row); subtract them.
    df7 = df6.select("id2_1", "id2_2").subtract(df1.select("id1", "id2_1"))

    # Failing variant kept for reference: a bare collect_list yields an
    # array<string> column, which the CSV writer below rejects.
    # df8 = df7.groupby('id2_1').agg(collect_list(df7["id2_2"]).alias("id2_2_new"))
    # Instead wrap each id2_2 in parentheses and join the pieces with " | ",
    # so id2_2_new is a plain string column.
    df8 = df7.groupby('id2_1').agg(
        concat_ws(
            " | ",
            collect_list(concat(lit("("), concat_ws(", ", 'id2_2'), lit(")"))),
        ).alias("id2_2_new")
    )

    # Keep only ids that also occur in the first input column.
    df9 = df8.join(df, df.id1 == df8.id2_1, 'inner').select(df8.id2_1, df8.id2_2_new)
    df9.show()

    # Write one part file (comma-separated, no header row). mode("overwrite")
    # clears previous output; Spark's default save mode fails if the path
    # already exists, so re-running the script would otherwise error out.
    file1 = r"file:///home/software/20220126/output"
    df10 = df9.coalesce(1)
    df10.write.mode("overwrite").csv(file1)
finally:
    # Close the Spark session.
    spark.stop()

运行截图:

# Space-delimited input with no header row; Spark names the columns _c0, _c1.
df = spark.read.format("csv"). \
    option("header", "false"). \
    option("delimiter", " "). \
    load("file:///home/software/20220126/pre.txt")

2.2.2 collect_list使用注意

# Failing variant: collect_list yields an array<string> column, and writing
# that column with DataFrame.write.csv() raises
# "CSV data source does not support array<string> data type".
df8 = df7.groupby('id2_1').agg(collect_list(df7["id2_2"]).alias("id2_2_new"))

# Working variant: wrap each id2_2 in parentheses and join the collected
# pieces with " | ", so id2_2_new is a plain string the CSV writer accepts.
df8 = df7.groupby('id2_1').agg(
    concat_ws(
        " | ",
        collect_list(concat(lit("("), concat_ws(", ", 'id2_2'), lit(")"))),
    ).alias("id2_2_new")
)

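第一种写法(即完整代码中注释掉的那行 df8)在写出 CSV 时会报错:collect_list 返回的是 array<string> 类型的列,而 CSV 数据源不支持数组类型;第二种写法用 concat_ws 把数组拼接成一个字符串,就可以正常写出。报错信息如下: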
pyspark.sql.utils.AnalysisException: u'CSV data source does not support array<string> data type.;'

2.2.3 getPythonAuthSocketTimeout does not exist in the JVM

运行代码报错:
py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.getPythonAuthSocketTimeout does not exist in the JVM

网上查了一下,Spark 和 Python 的安装都没有问题,只是 Python 找不到 Spark,此时需要安装 findspark 包:

pip install findspark

然后在程序开头(导入 pyspark 之前)添加以下代码:

# Let findspark locate the local Spark installation so that pyspark can be
# imported; call it before any `from pyspark ...` import.
import findspark
findspark.init()