I am trying my hand at Kafka Spark Structured Streaming but am getting an exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'device' given input columns: [value, offset, partition, key, timestamp, timestampType, topic];

Attaching my code:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.ProcessingTime

case class DeviceData(device: String, deviceType: String, signal: String)

object dataset_kafka {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("kafka-consumer")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    spark.sparkContext.setLogLevel("WARN")

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "172.21.0.187:9093")
      .option("subscribe", "test")
      .option("startingOffsets", "earliest")
      .load()

    println(df.isStreaming)
    println(df.printSchema())

    val ds: Dataset[DeviceData] = df.as[DeviceData]
    val values = df.select("device").where("signal == Strong")

    values.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}
Any help on how to solve this?
A Kafka stream always produces the following fields: value, offset, partition, key, timestamp, timestampType, and topic. In your case you're interested in value, but be aware that values are always delivered as byte arrays, so a cast to string is required before deserializing the JSON.
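As a quick sanity check, df.printSchema() on the raw Kafka source should print something like the following (note that key and value are binary, which is why the cast is needed):

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)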
Try the following code:
import spark.implicits._

val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "172.21.0.187:9093")
  .option("subscribe", "test")
  .option("startingOffsets", "earliest")
  .load()

// Derive the schema from the case class instead of building it by hand
import org.apache.spark.sql.Encoders
val schema = Encoders.product[DeviceData].schema

import org.apache.spark.sql.functions.from_json
// Cast the binary value to string, parse the JSON into a struct, then
// flatten the struct so device/deviceType/signal become top-level columns
// (otherwise the encoder cannot resolve the case-class fields)
val ds = kafkaStream
  .select(from_json($"value".cast("string"), schema).as("data"))
  .select("data.*")
  .as[DeviceData]

val values = ds.filter(_.signal == "Strong").map(_.device)
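From there, the write side of your original code should work unchanged; a minimal sketch using the same console sink in append mode:

values.writeStream
  .outputMode("append")
  .format("console")
  .start()
  .awaitTermination()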