Skip to content

Conversation

@nikitapecasa
Copy link
Contributor

@nikitapecasa nikitapecasa commented Apr 8, 2021

This PR:

  • adds a new way of describing rebalance listener (RebalanceListener1) via new subscribe method in Consumer
  • which gives a correct behaviour of consumer methods executed from within RebalanceListener
def subscribe(topics: Nes[Topic], listener: RebalanceListener1[F]): F[Unit]

RebalanceListener1 callbacks are executed on current thread where consumer.poll/close/unsubscribe is running in a blocking fashion

example of new API usage:

class SaveOffsetsOnRebalance[F[_]: Applicative] extends RebalanceListener1[F] {
  import RebalanceCallback._
  def onPartitionsAssigned(partitions: Nes[TopicPartition]) =
    for {
      // read the offsets from an external store using some custom code not described here
      offsets <- lift(readOffsetsFromExternalStore[F](partitions))
      a       <- offsets.toList.foldMapM { case (partition, offset) => seek(partition, offset) }
    } yield a
  def onPartitionsRevoked(partitions: Nes[TopicPartition]) =
    for {
      positions <- partitions.foldM(Map.empty[TopicPartition, Offset]) {
        case (offsets, partition) =>
          for {
            position <- position(partition)
          } yield offsets + (partition -> position)
      }
      // save the offsets in an external store using some custom code not described here
      a <- lift(saveOffsetsInExternalStore[F](positions))
    } yield a
  // do not need to save the offsets since these partitions are probably owned by other consumers already
  def onPartitionsLost(partitions: Nes[TopicPartition]) = empty
}

List of allowed KafkaConsumer methods available via RebalanceCallback (as agreed with @t3hnar):

- assign
+ assignment
+ beginningOffsets
- close
- commitAsync
+ commitSync
+ committed
+ endOffsets
+ groupMetadata
+ listTopics
- metrics
+ offsetsForTimes
+ partitionsFor
- pause
+ paused
- poll
+ position
- resume
+ seek
+ seekToBeginning
+ seekToEnd
- subscribe
+ subscription
- unsubscribe
- wakeup

nikitapecasa and others added 30 commits March 30, 2021 19:40
…nsumer.position when used from within rebalance listener
…ee toTry.get execution on current thread in RebalanceListener
… guarantee toTry.get execution on current thread in RebalanceListener"

This reverts commit 206a3b7
…asses, no deadlock on consumer.position"

This reverts commit d9278bb
…(topics: Nes[Topic], listener: ConsumerRebalanceListener[F])
…d in ConsumerLogging, add TODO to implement it later
…outside of consumer.poll has no effect on test result
…ction with kafka java consumer from within poll method
@nikitapecasa nikitapecasa changed the title WIP: rebalance listener correctness Rebalance listener correctness Apr 14, 2021
@nikitapecasa nikitapecasa merged commit e75cb9c into master Apr 14, 2021
@dfakhritdinov dfakhritdinov deleted the rebalance-listener-correctness branch February 1, 2024 16:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants