
I have a Spark question. For each entity k, the input is a sequence of probabilities p_i, each with an associated value v_i. For example, the data can look like this:

entity | Probability | value
A      | 0.8         | 10
A      | 0.6         | 15
A      | 0.3         | 20
B      | 0.8         | 10

Then, for entity A, I expect the average value to be 0.8*10 + (1-0.8)*0.6*15 + (1-0.8)*(1-0.6)*0.3*20 + (1-0.8)*(1-0.6)*(1-0.3)*MAX_VALUE_DEFINED.
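In plain Python (not Spark), the calculation I have in mind for a single entity would look roughly like this; the rows are assumed to be in dispatch order, and MAX_VALUE_DEFINED is a constant I define myself:

def expected_value(rows, max_value_defined):
    # rows: ordered list of (probability, value) pairs for one entity
    total, remaining = 0.0, 1.0
    for prob, val in rows:
        total += remaining * prob * val   # this step is reached and accepted
        remaining *= (1 - prob)           # nothing accepted so far
    return total + remaining * max_value_defined  # nothing is ever accepted

# entity A from the table above
expected_value([(0.8, 10), (0.6, 15), (0.3, 20)], MAX_VALUE_DEFINED)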

How could I achieve this in Spark using a DataFrame aggregation? I find it challenging because I need to groupBy entity and then compute over the ordered sequence of rows.

What is MAX_VALUE_DEFINED? And a bigger question: the logic is not clear. Probabilities should somehow add up to 1, and I don't see that either in the table or in your calculation. Can you please explain your logic? – ZygD Apr 27, 2022 at 15:35

MAX_VALUE_DEFINED is just the largest value available here. The problem I am trying to solve is calculating the average waiting time for people to get their food; in that setting, Probability is the acceptance rate at each dispatch, and the last component covers the case where no one accepts the delivery request. – Xiaonan Apr 27, 2022 at 21:58

Is MAX_VALUE_DEFINED provided as a constant? Does it differ for every entity? It would really help if you calculated your output manually and edited your question to add the expected result. You have already provided the input table; now we need to see the expected output table. Use the "Edit" button under your question. – ZygD Apr 28, 2022 at 4:27

Yes, MAX_VALUE_DEFINED will be a constant, consistent across entities. Let me provide more context in my question. – Xiaonan Apr 28, 2022 at 16:59

You can use a UDF to perform such custom calculations. The idea is to use collect_list to gather all probabilities and values of A in one place so you can loop through them. However, collect_list does not respect the order of your records, which could lead to a wrong calculation. One way to fix that is to generate an ID for each row with monotonically_increasing_id and sort the collected arrays by it.

import pyspark.sql.functions as F

@F.pandas_udf('double')
def markov_udf(values):
    def markov(lst):
        # placeholder logic: just sums the probabilities
        # (replace this with your actual calculation)
        s = 0
        for _id, prob, val in lst:
            s += prob
        return s
    return values.apply(markov)

(df
    .withColumn('id', F.monotonically_increasing_id())
    .groupBy('entity')
    .agg(F.array_sort(F.collect_list(F.array('id', 'probability', 'value'))).alias('values'))
    .withColumn('markov', markov_udf('values'))
    .show(10, False))
+------+------------------------------------------------------+------+
|entity|values                                                |markov|
+------+------------------------------------------------------+------+
|B     |[[3.0, 0.8, 10.0]]                                    |0.8   |
|A     |[[0.0, 0.8, 10.0], [1.0, 0.6, 15.0], [2.0, 0.3, 20.0]]|1.7   |
+------+------------------------------------------------------+------+
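The markov function above is only a placeholder (it sums the probabilities, which is why A shows 1.7). A rough sketch of what the body could look like for the expected-value formula in the question, assuming MAX_VALUE_DEFINED is a constant you define yourself:

MAX_VALUE_DEFINED = 20  # assumption: put your own constant here

def markov(lst):
    total, remaining = 0.0, 1.0
    for _id, prob, val in lst:            # lst is already sorted by the generated id
        total += remaining * prob * val   # this step is reached and accepted
        remaining *= (1 - prob)           # probability that nothing has been accepted yet
    return total + remaining * MAX_VALUE_DEFINED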

There may be a better solution, but I think this does what you needed.

from pyspark.sql import functions as F, Window as W

df = spark.createDataFrame(
    [('A', 0.8, 10),
     ('A', 0.6, 15),
     ('A', 0.3, 20),
     ('B', 0.8, 10)],
    ['entity', 'Probability', 'value'])

w_desc = W.partitionBy('entity').orderBy(F.desc('value'))
w_asc = W.partitionBy('entity').orderBy('value')
df = df.withColumn('_ent_max_val', F.max('value').over(w_desc))   # largest value per entity
df = df.withColumn('_prob2', 1 - F.col('Probability'))            # 1 - p for every row
df = df.withColumn('_cum_prob2', F.product('_prob2').over(w_asc) / F.col('_prob2'))  # product of (1 - p) over the preceding rows
df = (df.groupBy('entity')
        .agg(F.round((F.max('_ent_max_val') * F.product('_prob2')
                      + F.sum(F.col('_cum_prob2') * F.col('Probability') * F.col('value'))
                     ), 2).alias('mean_value')))
df.show()
# +------+----------+
# |entity|mean_value|
# +------+----------+
# |     A|      11.4|
# |     B|      10.0|
# +------+----------+
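As a check against the formula in the question: this solution uses each entity's own maximum value in place of MAX_VALUE_DEFINED, so for entity A it computes 0.8*10 + 0.2*0.6*15 + 0.2*0.4*0.3*20 + 0.2*0.4*0.7*20 = 8.0 + 1.8 + 0.48 + 1.12 = 11.4, which matches the output above.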
