I have the following Spark DF.
Note that you can only run this locally if you have Spark installed and create the session with the commands below; otherwise, replicate the problem on a Databricks cluster, which will automatically initialize a Spark context.
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").getOrCreate()
sc = spark.sparkContext

# Sample data, built in pandas and then converted to a Spark DataFrame
pandas_df = pd.DataFrame({'id': ['867', '430', '658', '157', '521', '867', '430', '867'],
                          'Probability': [0.12, 0.72, 0.32, 0.83, 0.12, 0.49, 0.14, 0.12],
                          'RAG': ['G', 'R', 'A', 'R', 'G', 'A', 'G', 'G'],
                          'Timestamp': ['2020-07-01 17-49-32', '2020-07-01 17-49-32', '2020-07-01 17-49-32',
                                        '2020-07-01 17-49-32', '2020-07-01 17-49-32', '2020-07-01 16-45-32',
                                        '2020-07-01 16-45-32', '2020-07-01 15-45-32']})
spark_dataframe = spark.createDataFrame(pandas_df)
Now I want to group this Spark DataFrame by 'id', count the values of the 'RAG' column splitting them into separate columns, and keep the maximum 'Timestamp' per id, so that the result looks like this:
+---+--------+--------+--------+-------------------+
| id|G(count)|A(count)|R(count)|     Timestamp(max)|
+---+--------+--------+--------+-------------------+
|867|       2|       1|       0|2020-07-01 17-49-32|
|430|       1|       0|       1|2020-07-01 17-49-32|
|658|       0|       1|       0|2020-07-01 17-49-32|
|157|       0|       0|       1|2020-07-01 17-49-32|
|521|       1|       0|       0|2020-07-01 17-49-32|
+---+--------+--------+--------+-------------------+
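A minimal sketch of how I picture this step working (assuming a groupBy with a pivot on 'RAG' plus a separate max aggregate on 'Timestamp', joined back on 'id'; the pivoted columns come out as plain G/A/R and would still need renaming to match the header above):

from pyspark.sql import functions as F

# Pivot the RAG values into count columns per id, filling missing combinations with 0
pivoted = (spark_dataframe
           .groupBy("id")
           .pivot("RAG", ["G", "A", "R"])
           .count()
           .na.fill(0))

# Latest timestamp per id, then join the two aggregates together
max_ts = spark_dataframe.groupBy("id").agg(F.max("Timestamp").alias("Timestamp(max)"))
result = pivoted.join(max_ts, on="id")
result.show()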
Based on the Spark DataFrame above, I then want to create a list of dictionaries, like:
final_list = []
map_dictionary = {"R": 0.6, "A": 0.3, "G": 0.1}
final_list = [{"id": "867", "RAG": "G", "Timestamp": "2020-07-01 17-49-32"},  # because for id 867 the G column has 2 counts, more than the A and R columns on the same row
              {"id": "430", "RAG": "R", "Timestamp": "2020-07-01 17-49-32"},  # because G and R each have 1 occurrence, but R has the greater weight in the map dictionary, ...
              ]  # the length of the list is 5, since the Spark df above has five unique ids (rows)
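Something like the following rough sketch is what I have in mind (it reuses the result DataFrame from the sketch above; pick_rag is a hypothetical helper that chooses by count first and uses map_dictionary only to break ties):

# Hypothetical helper: pick the RAG label with the highest count,
# breaking ties with the weights in map_dictionary
def pick_rag(row):
    counts = {"G": row["G"], "A": row["A"], "R": row["R"]}
    return max(counts, key=lambda k: (counts[k], map_dictionary[k]))

final_list = [{"id": row["id"],
               "RAG": pick_rag(row),
               "Timestamp": row["Timestamp(max)"]}
              for row in result.collect()]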