
I have a DataFrame with two columns, 'value' and 'timestamp', where 'timestamp' is ordered. I want to get the last row of the DataFrame. What should I do?

this is my input:

+-----+---------+
|value|timestamp|
+-----+---------+
|    1|        1|
|    4|        2|
|    3|        3|
|    2|        4|
|    5|        5|
|    7|        6|
|    3|        7|
|    5|        8|
|    4|        9|
|   18|       10|
+-----+---------+

this is my code:

    val arr = Array((1,1),(4,2),(3,3),(2,4),(5,5),(7,6),(3,7),(5,8),(4,9),(18,10))
    val df = m_sparkCtx.parallelize(arr).toDF("value", "timestamp")

this is my expected result:

+-----+---------+
|value|timestamp|
+-----+---------+
|   18|       10|
+-----+---------+

I would simply use a query that orders your table by timestamp in descending order and takes the first row from that order:

df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM table_df ORDER BY timestamp DESC LIMIT 1"""
latest_rec = spark.sql(query_latest_rec)
latest_rec.show()
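
Since the question uses the Scala API, here is a minimal equivalent sketch with the DataFrame API (assuming a SparkSession named spark is in scope and df is the DataFrame from the question):

import org.apache.spark.sql.functions.desc

// sort by timestamp descending and keep only the first row
df.orderBy(desc("timestamp")).limit(1).show()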

The most efficient way is to reduce your DataFrame. This gives you a single row which you can convert back to a DataFrame, but as it contains only 1 record, this does not make much sense.

import org.apache.spark.sql.Row

sparkContext.parallelize(Seq(
  df.reduce { (a, b) =>
    if (a.getAs[Int]("timestamp") > b.getAs[Int]("timestamp")) a else b
  } match { case Row(value: Int, timestamp: Int) => (value, timestamp) }
)).toDF("value", "timestamp")
  .show
+-----+---------+
|value|timestamp|
+-----+---------+
|   18|       10|
+-----+---------+
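
If you only need the Row itself rather than a one-row DataFrame, the parallelize round-trip can be dropped entirely; a minimal sketch of the same reduce:

// reduce compares rows pairwise and keeps the one with the larger timestamp
val lastRow = df.reduce { (a, b) =>
  if (a.getAs[Int]("timestamp") > b.getAs[Int]("timestamp")) a else b
}
println(lastRow) // [18,10]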

Less efficient (as it needs a shuffle), although shorter, is this solution:

import org.apache.spark.sql.functions.max

df.where($"timestamp" === df.groupBy().agg(max($"timestamp")).map(_.getInt(0)).collect.head)

If your timestamp column is unique and in increasing order, then there are the following ways to get the last row:

println(df.sort($"timestamp".desc).first())
// Output: [18,10]
df.sort($"timestamp".desc).take(1).foreach(println)
// Output: [18,10]
// works because the timestamps are exactly 1..count
df.where($"timestamp" === df.count()).show

Output:

+-----+---------+
|value|timestamp|
+-----+---------+
|   18|       10|
+-----+---------+
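
Under the same assumption that the data is already in timestamp order, Spark 3.0 added Dataset.tail, which returns the last n rows; a minimal sketch:

// Spark 3.0+: tail(n) returns the last n rows as an Array[Row]
df.tail(1).foreach(println) // [18,10]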

If not, create a new column with the row index and select the last index, as below:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val df1 = spark.sqlContext.createDataFrame(
  df.rdd.zipWithIndex.map { case (row, index) =>
    Row.fromSeq(row.toSeq :+ index)
  },
  StructType(df.schema.fields :+ StructField("index", LongType, false)))
df1.where($"index" === df.count() - 1).drop("index").show

Output:

+-----+---------+
|value|timestamp|
+-----+---------+
|   18|       10|
+-----+---------+

Java:

Dataset<Row> sortDF = inputDF.orderBy(org.apache.spark.sql.functions.col(config.getIncrementingColumn()).desc());
Row row = sortDF.first();

You can also use the desc function: Column desc(String columnName)

df.orderBy(desc("timestamp")).show(1)

which gives the same result as

df.orderBy($"timestamp".desc).show(1)
