Spark - I get a java.lang.UnsupportedOperationException when I invoke a custom function from a map

I have a DataFrame, sourceDF, with the following schema:

     |-- NPAData: struct (nullable = true)
     |    |-- NPADetails: struct (nullable = true)
     |    |    |-- location: string (nullable = true)
     |    |    |-- manager: string (nullable = true)
     |    |-- service: array (nullable = true)
     |    |    |-- element: struct (containsNull = true)
     |    |    |    |-- serviceName: string (nullable = true)
     |    |    |    |-- serviceCode: string (nullable = true)
     |-- NPAHeader: struct (nullable = true)
     |    |-- npaNumber: string (nullable = true)
     |    |-- date: string (nullable = true)
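To reproduce the problem locally, a small DataFrame with this schema can be built from case classes. This is a minimal sketch assuming spark-sql on the classpath; the case class names (NPADetails, Service, NPAData, NPAHeader, NpaRecord), the object NpaSample and all values are hypothetical, and only the field names are taken from the schema.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical case classes mirroring the schema; only the field names matter.
case class NPADetails(location: String, manager: String)
case class Service(serviceName: String, serviceCode: String)
case class NPAData(NPADetails: NPADetails, service: Seq[Service])
case class NPAHeader(npaNumber: String, date: String)
case class NpaRecord(NPAData: NPAData, NPAHeader: NPAHeader)

object NpaSample {
  // Made-up data: npaNumber 1234 appears twice, 5678 once.
  def sourceDF(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      NpaRecord(NPAData(NPADetails("loc-a", "mgr-a"), Seq(Service("svc1", "S1"))), NPAHeader("1234", "2017-09-01")),
      NpaRecord(NPAData(NPADetails("loc-b", "mgr-b"), Seq(Service("svc2", "S2"))), NPAHeader("1234", "2017-09-15")),
      NpaRecord(NPAData(NPADetails("loc-c", "mgr-c"), Seq.empty[Service]), NPAHeader("5678", "2017-08-20"))
    ).toDF()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("npa-sample").getOrCreate()
    sourceDF(spark).printSchema()
    spark.stop()
  }
}
```

Because case-class fields of reference types are nullable, printSchema() on this DataFrame should reproduce the schema above.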

What I am trying to do is:

  • Group the records that have the same npaNumber into a list.
  • Inside each list, order the elements by their date.
  • Once the elements are grouped and ordered, merge them by applying some logic. To perform this last step I decided to use a map.

Here is what I tried so far:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import spark.implicits._
    // No orderBy in this window: with one, count() becomes a running count instead of the group size.
    val toUpdate = sourceDF.withColumn("count", count($"NPAHeader").over(Window.partitionBy("NPAHeader.npaNumber"))).filter($"count" > 1)
    val groupedNpa = toUpdate.groupBy($"NPAHeader.npaNumber").agg(collect_list(struct($"NPAData", $"NPAHeader")).as("npa"))
    // This is a simplified version of my logic.
    def pickOne(npas: Seq[Row]): Row = {
      println("First element: " + npas.head)
      npas.head
    }
    val mergedNpa = groupedNpa.map(row => (row.getAs[String]("npaNumber"), pickOne(row.getAs[Seq[Row]]("npa"))))
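About the count column: a window that has an orderBy gets a default frame running from the start of the partition to the current row, so count over it is a running count, not the size of the group. Only a window without orderBy yields the group size that a filter such as $"count" > 1 expects. A minimal sketch on hypothetical toy data (the object WindowCountDemo and the values are made up) shows the difference:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count}

object WindowCountDemo {
  // Returns (date, runningCount, totalCount) for one npaNumber with three records.
  def run(spark: SparkSession): Array[(String, Long, Long)] = {
    import spark.implicits._
    val df = Seq(("1234", "2017-09-01"), ("1234", "2017-09-15"), ("1234", "2017-08-20"))
      .toDF("npaNumber", "date")
    // Ordered window: default frame ends at the current row, so count() is cumulative.
    val ordered = Window.partitionBy("npaNumber").orderBy(col("date").desc)
    // Unordered window: the frame is the whole partition, so count() is the group size.
    val whole = Window.partitionBy("npaNumber")
    df.select(
        col("date"),
        count(col("date")).over(ordered).as("running"),
        count(col("date")).over(whole).as("total"))
      .as[(String, Long, Long)]
      .collect()
      .sortBy(_._1)
  }
}
```

For the newest record the running count is 1, so filtering on it would drop exactly the element that should come first in each group.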
    

An example of a Row after the groupBy would be:

    [1234,WrappedArray([npaNew,npaOlder,...npaOldest])]

But I am getting an exception when I try to invoke the function from the map:

    Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row - field (class: "org.apache.spark.sql.Row", name: "_2") - root class: "scala.Tuple2"

My understanding is that I cannot invoke the function pickOne() from the map (or at least not the way I am trying to). But I don't know what I am doing wrong.

Why am I getting this exception?

Thanks for your time!

Note: I know there are easier ways to pick one element from the list without a custom function. But I have to invoke it no matter what, because in the next step I need to put far more complex logic there to merge rows.
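One way to cover the second step (ordering each group by date) is to sort inside the custom function itself, because collect_list does not guarantee any order. A minimal sketch, assuming each element of npa is the struct(NPAData, NPAHeader) built above, that the Rows carry their schema (so fields can be read by name), and that the date strings use a sortable format such as yyyy-MM-dd. The object name PickNewest is hypothetical and the merge is a placeholder that keeps the newest element.

```scala
import org.apache.spark.sql.Row

object PickNewest {
  // Reads NPAHeader.date by name; assumes the Row carries its schema.
  private def dateOf(npa: Row): String =
    npa.getAs[Row]("NPAHeader").getAs[String]("date")

  // Orders the group newest-first, then applies the merge step.
  def pickOne(npas: Seq[Row]): Row = {
    // String order equals date order only for formats like yyyy-MM-dd (assumption).
    val newestFirst = npas.sortBy(r => dateOf(r))(Ordering[String].reverse)
    // Placeholder merge: keep the newest element; real logic could fold over newestFirst.
    newestFirst.head
  }
}
```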

After applying Mahesh Chand Kandpal's suggestion:

    import org.apache.spark.sql.catalyst.encoders.RowEncoder
    grouped.map(row => "emdNumber: "+row.getAs[String]("emdNumber"))
    val mergedNpa = groupedNpa.map(row => (row.getAs[String]("npaNumber"),pickOne(row.getAs[Seq[Row]]("npa"))(RowEncoder(row.schema)))) 
    

I get the following error:

    type mismatch; found : org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[org.apache.spark.sql.Row] required: Int

How should I apply the Encoder instead?
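The "required: Int" comes from how the extra argument list is parsed. pickOne(...) returns a Row, and Row defines apply(i: Int): Any for positional field access, so pickOne(...)(RowEncoder(row.schema)) tries to pass the encoder as a field index. A minimal illustration (the object name and values are hypothetical):

```scala
import org.apache.spark.sql.Row

object RowApplyDemo {
  // Hypothetical two-field row.
  val sample: Row = Row("1234", "2017-09-15")

  def main(args: Array[String]): Unit = {
    // sample(0) is sample.apply(0): positional access, same as sample.get(0).
    println(sample(0))
    // Passing an Encoder where apply expects an Int is what produces
    // "type mismatch; ... required: Int".
  }
}
```

The Encoder has to go to map itself, either as map's second argument list or as an implicit value in scope, not to the value returned by pickOne.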

Answer:

When you call map on a DataFrame, you need to provide an Encoder for the type the function returns.
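Calling pickOne inside map is not the problem in itself. Dataset.map needs an Encoder for whatever the lambda returns; spark.implicits._ supplies a product encoder for any tuple, but when Spark derives it at runtime it finds no way to encode the Row stored in field _2, which is what the exception message says. A minimal sketch on hypothetical flat toy data (the object EncoderDiagnosis is made up) in which the same call works once the result type is (String, String):

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{collect_list, struct}

object EncoderDiagnosis {
  // Same signature as the question's pickOne; the body is a placeholder.
  def pickOne(npas: Seq[Row]): Row = npas.head

  def run(spark: SparkSession): Array[(String, String)] = {
    import spark.implicits._
    // Hypothetical flat toy data standing in for the nested NPA records.
    val df = Seq(("1234", "2017-09-01"), ("1234", "2017-09-15")).toDF("npaNumber", "date")
    val grouped = df.groupBy($"npaNumber").agg(collect_list(struct($"npaNumber", $"date")).as("npa"))
    // pickOne is still called inside map; only the result type changed to (String, String),
    // for which spark.implicits._ derives a working Encoder. getString(1) reads the date.
    grouped
      .map(row => (row.getAs[String]("npaNumber"), pickOne(row.getAs[Seq[Row]]("npa")).getString(1)))
      .collect()
  }
}
```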

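In Spark 2.x, Dataset[Row].map is declared as map[U : Encoder](func: Row => U): Dataset[U], that is ((Row) ⇒ U)(Encoder[U]) ⇒ Dataset[U], so the Encoder is a second, implicit argument list.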
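Because the Encoder sits in its own argument list, it can also be passed explicitly right after the lambda. A minimal sketch, assuming Spark 2.x where RowEncoder(schema) returns an ExpressionEncoder[Row] (this internal helper has changed in later releases); the object name, the input data and the output column names are hypothetical:

```scala
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object ExplicitRowEncoder {
  def run(spark: SparkSession): Array[Row] = {
    import spark.implicits._
    // Hypothetical toy input.
    val df = Seq(("1234", "a"), ("5678", "b")).toDF("npaNumber", "payload")
    // Schema of the Rows the lambda returns (hypothetical column names).
    val outSchema = StructType(Seq(
      StructField("npaNumber", StringType, nullable = true),
      StructField("payloadUpper", StringType, nullable = true)))
    // map(func)(encoder): the Encoder fills map's second argument list.
    val out: Dataset[Row] =
      df.map(row => Row(row.getAs[String]("npaNumber"), row.getAs[String]("payload").toUpperCase))(RowEncoder(outSchema))
    out.collect()
  }
}
```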

    import org.apache.spark.sql.catalyst.encoders.RowEncoder
    // schema: the StructType of the Rows that your map function returns
    implicit val encoder = RowEncoder(schema)
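Putting it together with the custom function: a minimal sketch, assuming Spark 2.x, in which the implicit encoder is declared before the map and pickOne is called inside it. The output schema reuses the element type of the collected list, since pickOne returns one of its elements. The object ImplicitRowEncoder, the flat toy data and the column name merged are hypothetical stand-ins for the nested NPA records.

```scala
import org.apache.spark.sql.{Dataset, Encoder, Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.functions.{collect_list, struct}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

object ImplicitRowEncoder {
  // Placeholder for the question's pickOne; the real merge logic would go here.
  def pickOne(npas: Seq[Row]): Row = npas.head

  def run(spark: SparkSession): Array[Row] = {
    import spark.implicits._
    // Hypothetical flat toy data standing in for the nested NPA records.
    val df = Seq(("1234", "2017-09-01"), ("1234", "2017-09-15"), ("5678", "2017-08-20"))
      .toDF("npaNumber", "date")
    val groupedNpa = df.groupBy($"npaNumber").agg(collect_list(struct($"npaNumber", $"date")).as("npa"))

    // The result keeps the key plus one element of "npa", so reuse the element's struct type.
    val elementType = groupedNpa.schema("npa").dataType.asInstanceOf[ArrayType].elementType
    val outSchema = StructType(Seq(
      StructField("npaNumber", StringType, nullable = true),
      StructField("merged", elementType, nullable = true)))

    // Declared outside the lambda, so map resolves it as its implicit Encoder[Row].
    implicit val rowEncoder: Encoder[Row] = RowEncoder(outSchema)
    val mergedNpa: Dataset[Row] = groupedNpa.map { row =>
      Row(row.getAs[String]("npaNumber"), pickOne(row.getAs[Seq[Row]]("npa")))
    }
    mergedNpa.collect()
  }
}
```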
I think you should define the encoder outside the map over groupedNpa: implicit val encoder = RowEncoder(schema). Give it the schema of your DataFrame. – Mahesh Chand, Oct 1, 2017 at 16:06

It's a toy example. The real one I have is far more complex. But could you add a line to your answer showing how you would invoke the custom function using the encoder? – Ignacio Alorre, Oct 2, 2017 at 10:03
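If the result does not need to stay a Dataset, another option (not raised in the thread) is to drop to the RDD API, whose map only needs a ClassTag, so a (String, Row) result works without any Encoder. A minimal sketch on hypothetical flat toy data; the object name RddAlternative is made up.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{collect_list, struct}

object RddAlternative {
  // Placeholder for the question's pickOne.
  def pickOne(npas: Seq[Row]): Row = npas.head

  def run(spark: SparkSession): Array[(String, Row)] = {
    import spark.implicits._
    val df = Seq(("1234", "2017-09-01"), ("1234", "2017-09-15")).toDF("npaNumber", "date")
    val groupedNpa = df.groupBy($"npaNumber").agg(collect_list(struct($"npaNumber", $"date")).as("npa"))
    // RDD.map needs only a ClassTag, not an Encoder, so (String, Row) is accepted.
    val mergedNpa: RDD[(String, Row)] =
      groupedNpa.rdd.map(row => (row.getAs[String]("npaNumber"), pickOne(row.getAs[Seq[Row]]("npa"))))
    mergedNpa.collect()
  }
}
```

The trade-off is that the RDD step bypasses Catalyst optimizations; getting a DataFrame back requires turning each result into a Row and calling spark.createDataFrame(rdd, schema).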