
I am trying my hand at Kafka with Spark Structured Streaming, but I get the following exception: Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'device' given input columns: [value, offset, partition, key, timestamp, timestampType, topic];

Here is my code:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
case class DeviceData(device: String, deviceType: String, signal: String)
object dataset_kafka {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("kafka-consumer")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    spark.sparkContext.setLogLevel("WARN")
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "172.21.0.187:9093")
      .option("subscribe", "test")
      .option("startingOffsets", "earliest")
      .load()
    println(df.isStreaming)
    df.printSchema() // printSchema() prints by itself and returns Unit
    val ds: Dataset[DeviceData] = df.as[DeviceData]
    val values = df.select("device").where("signal == Strong")
    values.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}

How can I solve this?

A Kafka source always produces the same fixed set of columns: key, value, topic, partition, offset, timestamp and timestampType. In your case you're interested in value, but be aware that keys and values always arrive as byte arrays (binary columns), so you have to cast value to a string before parsing it as JSON.
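To see the byte-array point in isolation, you can run the cast-and-parse step against a static DataFrame whose value column holds byte arrays, the way the Kafka source delivers them, so no broker is needed. This is a minimal sketch assuming Spark 2.x or 3.x with spark-sql on the classpath; the object KafkaValueParsing, the helper parseValues and the sample payload are hypothetical, and the schema is written out by hand as a StructType rather than derived from the case class.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object KafkaValueParsing {
  // Hand-written schema matching DeviceData(device, deviceType, signal).
  val deviceSchema: StructType = StructType(Seq(
    StructField("device", StringType),
    StructField("deviceType", StringType),
    StructField("signal", StringType)))

  // Cast the binary `value` column to a string, parse it as JSON,
  // then flatten the resulting struct into top-level columns.
  def parseValues(kafkaDf: DataFrame): DataFrame =
    kafkaDf
      .select(from_json(col("value").cast("string"), deviceSchema).as("data"))
      .select("data.*")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("value-parsing").master("local[*]").getOrCreate()
    import spark.implicits._
    try {
      // Static stand-in for the Kafka source: a single binary `value` column.
      val fake = Seq("""{"device":"dev-1","deviceType":"sensor","signal":"Strong"}""")
        .map(_.getBytes("UTF-8"))
        .toDF("value")
      fake.printSchema()            // shows value as binary
      parseValues(fake).show(false) // columns: device, deviceType, signal
    } finally {
      spark.stop()
    }
  }
}
```

Without the cast, from_json would be handed a binary column, which is why the cast comes first.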

Try the following code:

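import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.from_json
import spark.implicits._
val kafkaStream =
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "172.21.0.187:9093")
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .load()
// If you don't want to build the schema manually, derive it from the case class
val schema = Encoders.product[DeviceData].schema
// Cast the binary value to a string, parse it as JSON, then flatten the
// resulting struct so its fields line up with DeviceData
val ds = kafkaStream
  .select(from_json($"value".cast("string"), schema).as("data"))
  .select("data.*")
  .as[DeviceData]
val values = ds.filter(_.signal == "Strong").map(_.device)
// Start the query, e.g. with a console sink
values.writeStream
  .outputMode("append")
  .format("console")
  .start()
  .awaitTermination()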
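If you prefer to stay with untyped DataFrames, as in the question's df.select("device").where("signal == Strong"), that also works once the JSON has been parsed, with one more fix: inside a SQL expression string an unquoted Strong is read as a column name, so the literal needs single quotes. A sketch of that variant, assuming the spark-sql-kafka-0-10 connector is on the classpath and the same broker address and topic as above; UntypedDeviceFilter and strongDevices are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object UntypedDeviceFilter {
  // Hand-written schema matching DeviceData(device, deviceType, signal).
  val schema: StructType = StructType(Seq(
    StructField("device", StringType),
    StructField("deviceType", StringType),
    StructField("signal", StringType)))

  // Accepts any DataFrame with a binary `value` column, streaming or static.
  def strongDevices(kafkaDf: DataFrame): DataFrame =
    kafkaDf
      .select(from_json(col("value").cast("string"), schema).as("data"))
      .select("data.*")
      // 'Strong' is single-quoted so it is a string literal, not a column name.
      // Column-API equivalent: .where(col("signal") === "Strong")
      .where("signal = 'Strong'")
      .select("device")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-consumer").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    val kafkaStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "172.21.0.187:9093")
      .option("subscribe", "test")
      .option("startingOffsets", "earliest")
      .load()
    strongDevices(kafkaStream).writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}
```

Because strongDevices only needs a value column, the same function can be exercised on a static DataFrame in a unit test before pointing it at Kafka.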
        
