Всем привет! Хочу рассказать об особенностях использования механизмов retry, которые дает spring для kafka из коробки. У нас java-21, spring-boot-3.2, kafka-3.7, spring-kafka-3.1.x
Сразу должен извиниться за качество скриншотов — они доставались из чатов, т. к. доступа к grafana на нагрузочном стенде у нас не было, кое-где не совпадает время, потому что были разные прогоны, а скрины были не везде полные. Но в целом картинки отражают действительность.
Цель – сделать асинхронный retry сообщений на топике, который бы не блокировал основной поток сообщений. Т.е. при обработке сообщения может произойти прогнозируемая ошибка, при которой мы точно знаем, что можно (и нужно) попробовать обработать сообщение еще раз позже. Лучше всего переслать сообщение в соседний топик, а основной продолжить читать без задержек.
Механизм был построен на том, что дает spring-kafka из коробки — чтение через @KafkaListener, запись через KafkaTemplate, плюс настроена транзакционная обработка (в данном случае даже поставили ack-mode=record, max.poll.records=1, но это не особо важно). Чтобы сделать retry взяли тоже стандартный spring-овый @RetryableTopic, который делает ровно то, что надо. Там можно настроить количество попыток, задержки (+ exponensial backoff) и самое главное – пересылка в соседний retry-топик автоматически! Настраиваешь префиксы и получаешь вместе с основным топиком еще N топиков, где N – количество попыток.
Провели тесты, посмотрели как работает разбор сообщений — всё отлично и классно, отправляем на тест!)
Что может пойти не так
При высокой (относительно) нагрузке начались проблемы — видим, что retry-топик (обычно это retry-0 – т.е. самый первый, иногда retry-1) почему-то медленно разбирается, и разбирается рывками — как-то так:
Начали проводить нагрузочные тесты и видим похожую картину (даже хуже) — retry-0 топик копит сообщения минут 10-12, потом пару раз успевает почитать, и далее совсем умирает и лаг копится практически бесконечно:

Начали смотреть детально, добавили метрики на poll, fetch, что видим:


retry-0, на котором и были проблемы.То есть poll вызывается периодически (раз в 30 секунд = poll interval) с редкими «всплесками», значит consumer считает что сообщений на чтение нет (так я думал сначала).
Заглядываем в исходники внутрь poll – там не происходит сразу чтение записей на самом деле. Там есть раздельная логика для разных способов чтения — batch/record (в данной проблеме это было не важно). И, как оказалось poll вызывается постоянно spring, чтобы брокер знал, что consumer жив и не надо делать ребаланс. Это своего рода healthcheck. А вот внутри уже делается реальное чтение записей - fetch:


Видим, что fetch вызывается как раз в моменты «всплесков», когда чтение оживает, судя по latency. Кажется проблема где-то внутри fetch.
Также видно по графику kafka listener (метрики по которому в новых версиях spring-kafka поставляется из коробки), что код приложения вызывается как раз в эти судорожные моменты пробуждения:

Полезные метрики для анализа:
-
kafka_consumer_time_between_poll_avg
-
kafka_consumer_time_between_poll_max
-
kafka_consumer_last_poll_seconds_ago
-
kafka_consumer_poll_idle_ratio_avg
-
poll_idle_ratio_avg
-
time_between_poll_avg
-
kafka_consumer_fetch_manager_fetch_rate
-
kafka_consumer_fetch_manager_fetch_latency_avg
-
spring_kafka_listener_seconds_count
-
spring_kafka_listener_seconds_sum
Локализация проблемы
Была идея в том, что concurrency, который мы настроили по количеству партиций для всех топиков слишком большой (6 партиций и concurrency=6), ведь количество топиков умножается на количество попыток retry в таком неблокирующем подходе. Но ребята из spring молодцы и код написали хорошо, всё предусмотрено и тут проблемы не обнаружилось, то есть сама механика poll+fetch для различных ситуаций работает верно.
Также не помогли игры с параметром poll.records и ack.mode – некоторые время покопали и стало понятно, что транзакции и batch’и тоже не при чем.
Помимо метрик, сняли threaddump, видим, что ~180 потоков в состоянии TIMED_WAITING, причем ждут они на одном и том же стеке:
Unsafe.park
LockSupport.parkNanos
AbstractQueuedSynchronizer$ConditionObject.awaitNanos
ScheduledThreadPoolExecutor$DelayedWorkQueue.take
ThreadPoolExecutor.getTask
ThreadPoolExecutor.runWorker
Тут уже прям толстый намек в строке 4 - ScheduledThreadPoolExecutor$DelayedWorkQueue.take.
Пошли смотреть исходники, что же там активно использует ScheduledThreadPoolExecutor и почему копится ожидание?
Как читает retry consumer
Как я ожидал, что делается разбор отложенных retry-сообщений: в header сохраняется некий retry_topic-backoff-timestamp, когда сообщение должно обработаться, и consumer периодически читает сообщения. Если дата еще не наступила — засыпает на N миллисекунд. Причем такой код где-то в исходниках я уже видел, пока копал spring-kafka.
Но ребята сделали более умно — они сделали возможность ставить partition на паузу на нужное время сразу, без периодических чтений — вроде бы здравая идея! Т.е. вы можете поставить partition на паузу и сделать resume когда угодно. А можете и не сделать=) За это отвечает ListenerContainerPauseService.
Там есть вот такой код (убрал только log из тела метода):
private final TaskScheduler scheduler;
public void pausePartition(MessageListenerContainer messageListenerContainer,
TopicPartition partition,
Duration pauseDuration) {
Instant resumeAt = Instant.now().plusMillis(pauseDuration.toMillis());
messageListenerContainer.pausePartition(partition);
this.scheduler.schedule(() -> {
messageListenerContainer.resumePartition(partition);
}, resumeAt);
}
В чем проблема
Получается, что @RetryableTopic механизм проверяет в ContainerPartitionPausingBackOffManager надо ли сейчас читать сообщение первый раз, когда оно ему попадает. Далее останавливает чтение партиции через pausePartition на определенное время (now() + delay), и отправляет себе задачу на будущее — «разбуди меня, когда пройдет delay» через resumePartition.
Когда идет большой поток сообщений, то scheduler не справляется и сообщения, видимо, просто копятся в очереди и не успевают разбираться. А вот почему именно? Кажется из-за гонки потоков за время cpu управление до waiting потоков не доходит, но тут надо еще покопать детальнее.
По идее тут надо было посмотреть на executor-метрики и размер очереди, но я на радостях пошел копать определение TaskScheduler и до этого не дошел=)
Наверное умные люди сразу настраивают нужный себе spring.task.scheduling.pool.size, но у нас он переопределен не был, а по умолчанию размер пула = 1. Интересный пул в один поток=)
Пробуем поднять размер пула и делать ретест, очень уж хочется проверить гипотезу в деле!
Фиксим!
Простое решение для проверки гипотезы — поднимаем spring.task.scheduling.pool.size для размера пула, поставили 10.
Замеры:

Лаг держится на приемлемом уровне — он никогда не опускается до нуля, потому что consumer реально ждет какое-то время, когда же можно запроцессить сообщение. Кажется работает! Смотрим poll и fetch:


Poll пошел (падающие вниз графики) — опрос топика пошел чаще, задержки между чтениями упали до минимума.


С fetch тоже разницу видно, хоть я не очень понял почему rate показал около нуля — метрика сразу отдается spring, как готовая (kafka_consumer_fetch_manager_fetch_rate), рассчитывать ничего не надо… Может кто знает, может это специфический глюк на самой метрике?
Но лаг не растет - получается проблема локализована верно - пофиксили!
Итог и выводы
Здравая идея с отложенным пробуждением consumer накладывает свои нюансы, может быть в определенных случаях лучше делать свой собственный простой механизм с небольшой задержкой и проверкой timestamp в цикле? Кажется, это освобождает часть ресурсов.
Конечно поиск проблемы занял больше времени, чем я изложил тут в статье, были шаги не в ту сторону, куча ретестов и неправильных гипотез. Мы бы точно сократили время на поиск, если бы настроили сразу подробный дашборд, не только на lag. Также теперь отслеживаем commit/rollback rate, очень полезно для транзакций и exactly-once.
Ну а выводы чисто по классике — сначала тесты, метрики и мониторинг, потом прод.
Наверное для тех, кто живет в highload проектах, я рассказал базу, но может быть для кого-то статья будет полезна. Иногда полезно покопать исходники, разобраться, как работают различные фишки фреймворка.
Собирайте подробные метрики, благо spring-kafka публикует тонну всего, и да пребудет с вами мониторинг и тестирование=)
Автор: Thedo
