[SPARK-19680] Solving OffsetOutOfRangeException

When data in Kafka has been lost, a Spark application consuming from that Kafka cluster may hit the following exception:

Lost task 12.0 in stage 398.0 (TID 2311, localhost, executor driver): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {abc_2018-0=151260}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

When Spark creates the Kafka RDD, it forcibly overrides auto.offset.reset in kafkaParams to none. As a result, if the starting offset (for example one recorded in ZooKeeper or in a checkpoint) falls outside the range of offsets still available for the topic, this exception is thrown. It typically happens when data in Kafka has been lost or has expired and been deleted by retention.
Relevant source code references:

DirectKafkaInputDStream.scala:218
DirectKafkaInputDStream.scala:63
KafkaUtils.scala:205
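
To make the failure mode concrete, the standalone sketch below (broker address and group id are placeholders; the topic name is taken from the error above) reproduces it outside Spark: with auto.offset.reset set to none, which is the policy Spark enforces for its executor-side consumers, polling from an offset that has already been deleted by retention throws OffsetOutOfRangeException instead of silently resetting:

import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object OffsetOutOfRangeDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-demo")             // placeholder group
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    // The same reset policy Spark forces onto its executor-side consumers.
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")

    val consumer = new KafkaConsumer[String, String](props)
    val tp = new TopicPartition("abc_2018", 0)
    consumer.assign(Collections.singletonList(tp))
    // Simulate a stale stored offset by seeking below the earliest retained offset.
    consumer.seek(tp, 0L)
    // If offset 0 has already been deleted by retention, this poll throws
    // org.apache.kafka.clients.consumer.OffsetOutOfRangeException, because the
    // "none" policy forbids an automatic reset.
    consumer.poll(1000L)
    consumer.close()
  }
}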

The fix is to validate the offsets before creating the KafkaRDD and pass the validated offsets in explicitly. The helper code is as follows:

import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.JavaConversions._
import scala.collection.mutable

/**
  * Kafka helper utilities.
  * (Imports added for completeness; Scala/Java collection interop is assumed to go
  * through scala.collection.JavaConversions, as was common with Spark 2.x.)
  */
object MyKafkaUtils {

  private val logger: Logger = LoggerFactory.getLogger(this.getClass)

  /**
    * Get the earliest offsets.
    * @param consumer   the consumer
    * @param partitions topic partitions
    * @return
    */
  def getEarliestOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
    consumer.seekToBeginning(partitions)
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  }

  /**
    * Get the earliest offsets.
    * Returns the earliest (lowest) available offsets, taking new partitions into account.
    * @param kafkaParams Kafka client configuration
    * @param topics      topics to fetch offsets for
    */
  def getEarliestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    consumer.subscribe(topics)
    val parts = consumer.assignment()
    consumer.seekToBeginning(parts)
    consumer.pause(parts)
    val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap
    consumer.unsubscribe()
    consumer.close()
    offsets
  }

  /**
    * Get the latest offsets.
    * @param consumer   the consumer
    * @param partitions topic partitions
    * @return
    */
  def getLatestOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
    consumer.seekToEnd(partitions)
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  }

  /**
    * Get the latest offsets.
    * Returns the latest (highest) available offsets, taking new partitions into account.
    * @param kafkaParams Kafka client configuration
    * @param topics      topics to fetch offsets for
    */
  def getLatestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    consumer.subscribe(topics)
    val parts = consumer.assignment()
    consumer.seekToEnd(parts)
    consumer.pause(parts)
    val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap
    consumer.unsubscribe()
    consumer.close()
    offsets
  }

  /**
    * Get the consumer's current offsets.
    * @param consumer   the consumer
    * @param partitions topic partitions
    * @return
    */
  def getCurrentOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  }

  /**
    * Get validated offsets for the given topics, clamped to the range actually available in Kafka.
    * @param kafkaParams Kafka parameters
    * @param topics      topics
    * @return
    */
  def getCurrentOffset(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
    val offsetResetConfig = kafkaParams.getOrElse(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest").toString.toLowerCase
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    consumer.subscribe(topics)
    val notOffsetTopicPartition = mutable.Set[TopicPartition]()
    try {
      consumer.poll(0)
    } catch {
      case ex: NoOffsetForPartitionException =>
        logger.warn(s"consumer topic partition offset not found:${ex.partition()}")
        notOffsetTopicPartition.add(ex.partition())
    }
    val parts = consumer.assignment().toSet
    consumer.pause(parts)
    val topicPartition = parts.diff(notOffsetTopicPartition)
    // Current committed offsets.
    val currentOffset = mutable.Map[TopicPartition, Long]()
    topicPartition.foreach(x => {
      try {
        currentOffset.put(x, consumer.position(x))
      } catch {
        case ex: NoOffsetForPartitionException =>
          logger.warn(s"consumer topic partition offset not found:${ex.partition()}")
          notOffsetTopicPartition.add(ex.partition())
      }
    })
    // Earliest offsets: raise any offset that has fallen below the earliest available one.
    val earliestOffset = getEarliestOffsets(consumer, parts)
    earliestOffset.foreach(x => {
      val value = currentOffset.get(x._1)
      if (value.isEmpty) {
        currentOffset(x._1) = x._2
      } else if (value.get < x._2) {
        logger.warn(s"kafka data is lost from partition:${x._1} offset ${value.get} to ${x._2}")
        currentOffset(x._1) = x._2
      }
    })
    // Latest offsets: lower any offset that lies beyond the latest available one.
    val latestOffset = if (offsetResetConfig.equalsIgnoreCase("earliest")) {
      getLatestOffsets(consumer, topicPartition)
    } else {
      getLatestOffsets(consumer, parts)
    }
    latestOffset.foreach(x => {
      val value = currentOffset.get(x._1)
      if (value.isEmpty || value.get > x._2) {
        currentOffset(x._1) = x._2
      }
    })
    consumer.unsubscribe()
    consumer.close()
    currentOffset.toMap
  }
}

Creating the Spark Kafka direct stream with the validated offsets:

val offset = MyKafkaUtils.getCurrentOffset(kafkaParams.toMap, topics)
val kafkaStreams: DStream[ConsumerRecord[String, ObjectNode]] =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe(topics, kafkaParams, offset))
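
The kafkaParams and topics values referenced above are not shown in the post. As a rough illustration only (broker address, group id and deserializers below are placeholders; the original stream evidently uses a custom value deserializer that produces Jackson ObjectNode), a compatible configuration might look like this:

// Placeholder configuration, not the original post's settings.
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.mutable

val topics = Seq("abc_2018")
val kafkaParams = mutable.Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "broker1:9092,broker2:9092",
  ConsumerConfig.GROUP_ID_CONFIG -> "my-streaming-group",
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  // Replace with the custom deserializer that yields ObjectNode values.
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  // Fallback policy consulted by MyKafkaUtils.getCurrentOffset; Spark still
  // forces "none" onto the executor-side consumers.
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
  // Commit offsets manually (see the commit sketch further below).
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)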

At this point the problem is mostly solved. However, when the application recovers from a checkpoint, the stale offsets stored in the checkpoint can still trigger the exception; to handle that, the consumed offsets also need to be committed back to Kafka, as sketched below.

// Automatically commit the new offsets after each batch.
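// A minimal sketch, not the original post's code: it assumes the spark-streaming-kafka-0-10
// API, that `kafkaStreams` is the DStream created above, and that enable.auto.commit is false.
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

kafkaStreams.foreachRDD { rdd =>
  // The RDDs produced by the direct stream expose the offset ranges they read.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch here ...
  // Commit the consumed offsets back to Kafka asynchronously so that the group's
  // committed offsets stay current, which is what the recovery path relies on.
  kafkaStreams.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}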