I am using Spark Structured Streaming to consume data from a Kafka topic and write the data to another Kafka topic (the sink).

I want to store the offsets twice: first when reading from the input topic, and second when writing the data to the output sink. The second is possible by setting a checkpoint directory location.

Is it possible to also record the offsets consumed when subscribing to the input topic?

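You can use a StreamingQueryListener. Register it with the session via spark.streams.addListener. The offsets each source consumed in a micro-batch are reported in onQueryProgress through event.progress.sources (QueryStartedEvent does not carry offsets):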

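import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // Each source reports the offsets of this micro-batch as JSON strings;
    // log or persist them here in addition to Spark's checkpoint.
    event.progress.sources.foreach(s => println(s"${s.description}: ${s.startOffset} -> ${s.endOffset}"))
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
})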
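If the goal is a durable record of what was consumed, the println in onQueryProgress could be replaced by a small append-only log. This is a minimal sketch, assuming a local file on the driver (listener callbacks run on the driver) and a hypothetical tab-separated layout of batch id, source description and end-offset JSON; OffsetLog, append and lastEndOffset are illustrative names, not part of Spark. Since progress events are delivered asynchronously, such a log is best treated as an audit record rather than a replacement for the checkpoint.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardOpenOption}

// Hypothetical helper (not a Spark API): one line per source per micro-batch,
// formatted as "<batchId>\t<sourceDescription>\t<endOffsetJson>".
object OffsetLog {
  def append(file: Path, batchId: Long, description: String, endOffset: String): Unit = {
    val line = s"$batchId\t$description\t$endOffset\n"
    // CREATE + APPEND: create the file on first use, otherwise add to the end
    Files.write(file, line.getBytes(StandardCharsets.UTF_8),
      StandardOpenOption.CREATE, StandardOpenOption.APPEND)
  }

  // Returns the end-offset JSON recorded for the highest batch id, if any
  def lastEndOffset(file: Path): Option[String] =
    if (!Files.exists(file)) None
    else {
      val src = scala.io.Source.fromFile(file.toFile, "UTF-8")
      try {
        val rows = src.getLines().filter(_.nonEmpty).map(_.split("\t", 3)).toList
        if (rows.isEmpty) None else Some(rows.maxBy(_(0).toLong)(2))
      } finally src.close()
    }
}
```

Inside the listener it would be called as event.progress.sources.foreach(s => OffsetLog.append(path, event.progress.batchId, s.description, s.endOffset)), where path is a java.nio.file.Path of your choosing.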
