During table drop Engine=Kafka can commit unprocessed messages #11034

@filimonov

A test caught missing data:

SELECT 
    toUInt64(20) AS _partition, 
    number + 1 AS _offset
FROM numbers(119)
LEFT JOIN test.destination USING (_partition, _offset)
WHERE test.destination.key = 0
ORDER BY _offset ASC

┌─_partition─┬─_offset─┐
│         20 │     104 │
│         20 │     105 │
│         20 │     106 │
│         20 │     107 │
│         20 │     108 │
│         20 │     109 │
│         20 │     110 │
│         20 │     111 │
│         20 │     112 │
│         20 │     113 │
│         20 │     114 │
│         20 │     115 │
│         20 │     116 │
│         20 │     117 │
└────────────┴─────────┘

Log

2020.05.19 11:11:57.480807 [ 29 ] {} <Debug> StorageKafka (kafka_consumer0): Started streaming to 1 attached views
2020.05.19 11:11:57.485702 [ 29 ] {} <Trace> StorageKafka (kafka_consumer0): Already subscribed to topics: [ topic_with_multiple_partitions ]
2020.05.19 11:11:57.485765 [ 29 ] {} <Trace> StorageKafka (kafka_consumer0): Already assigned to : [  ]
2020.05.19 11:11:57.486071 [ 29 ] {} <Trace> StorageKafka (kafka_consumer0): Topics/partitions assigned: [ topic_with_multiple_partitions[20:#], topic_with_multiple_partitions[21:#] ]
2020.05.19 11:11:57.987310 [ 29 ] {} <Trace> StorageKafka (kafka_consumer0): Polled batch of 24 messages. Offset position: [ topic_with_multiple_partitions[20:118], topic_with_multiple_partitions[21:113] ]

2020.05.19 11:11:58.376121 [ 36 ] {b6b2459d-3729-448e-ba52-b207bc34a9e0} <Debug> executeQuery: (from 172.22.0.1:34506) DROP TABLE IF EXISTS test.kafka_consumer0
2020.05.19 11:11:58.376277 [ 36 ] {b6b2459d-3729-448e-ba52-b207bc34a9e0} <Trace> ContextAccess (default): Access granted: DROP TABLE ON test.kafka_consumer0
2020.05.19 11:11:58.376332 [ 36 ] {b6b2459d-3729-448e-ba52-b207bc34a9e0} <Trace> StorageKafka (kafka_consumer0): Waiting for cleanup

2020.05.19 11:11:58.493658 [ 29 ] {} <Trace> StorageKafka (kafka_consumer0): Polled batch of 3 messages. Offset position: [ topic_with_multiple_partitions[20:120], topic_with_multiple_partitions[21:114] ]
2020.05.19 11:11:58.497287 [ 29 ] {} <Trace> StorageKafka (kafka_consumer0): Polled offset 120 (topic: topic_with_multiple_partitions, partition: 20)
2020.05.19 11:11:58.497567 [ 29 ] {} <Trace> StorageKafka (kafka_consumer0): Polled offset 114 (topic: topic_with_multiple_partitions, partition: 21)
2020.05.19 11:11:58.497862 [ 29 ] {} <Warning> StorageKafka (kafka_consumer0): Logical error. Non all polled messages were processed.

2020.05.19 11:11:58.498330 [ 69 ] {} <Debug> StorageKafka (kafka_consumer0): [thrd:main]: Group "rebalance_test_group" received op OFFSET_COMMIT (v0) in state up (join state started, v9 vs 0)
2020.05.19 11:11:58.498479 [ 69 ] {} <Debug> StorageKafka (kafka_consumer0): [thrd:main]: Topic topic_with_multiple_partitions [20]: stored offset 118, committed offset 104: setting stored offset 118 for commit
2020.05.19 11:11:58.498620 [ 69 ] {} <Debug> StorageKafka (kafka_consumer0): [thrd:main]: Topic topic_with_multiple_partitions [21]: stored offset 114, committed offset 103: setting stored offset 114 for commit
2020.05.19 11:11:58.498745 [ 69 ] {} <Debug> StorageKafka (kafka_consumer0): [thrd:main]: GroupCoordinator/1: Committing offsets for 2 partition(s): manual
2020.05.19 11:11:58.498888 [ 69 ] {} <Debug> StorageKafka (kafka_consumer0): [thrd:main]: GroupCoordinator/1: Enqueue OffsetCommitRequest(v6, 2/2 partition(s))): manual
2020.05.19 11:11:58.499206 [ 68 ] {} <Debug> StorageKafka (kafka_consumer0): [thrd:GroupCoordinator]: GroupCoordinator/1: Sent OffsetCommitRequest (v6, 193 bytes @ 0, CorrId 35)
2020.05.19 11:11:58.500496 [ 68 ] {} <Debug> StorageKafka (kafka_consumer0): [thrd:GroupCoordinator]: GroupCoordinator/1: Received OffsetCommitResponse (v6, 56 bytes, CorrId 35, rtt 1.29ms)
2020.05.19 11:11:58.500721 [ 69 ] {} <Debug> StorageKafka (kafka_consumer0): [thrd:main]: GroupCoordinator/1: OffsetCommit for 2 partition(s): manual: returned: Success
2020.05.19 11:11:58.500967 [ 69 ] {} <Debug> StorageKafka (kafka_consumer0): [thrd:main]: Group "rebalance_test_group" received op GET_ASSIGNMENT (v0) in state up (join state started, v9 vs 0)
2020.05.19 11:11:58.501272 [ 69 ] {} <Debug> StorageKafka (kafka_consumer0): [thrd:main]: Group "rebalance_test_group" received op OFFSET_FETCH (v0) in state up (join state started, v9 vs 0)
2020.05.19 11:11:58.501418 [ 69 ] {} <Debug> StorageKafka (kafka_consumer0): [thrd:main]: GroupCoordinator/1: OffsetFetchRequest(v1) for 2/2 partition(s)
2020.05.19 11:11:58.501532 [ 69 ] {} <Debug> StorageKafka (kafka_consumer0): [thrd:main]: GroupCoordinator/1: Fetch committed offsets for 2/2 partition(s)
2020.05.19 11:11:58.501793 [ 68 ] {} <Debug> StorageKafka (kafka_consumer0): [thrd:GroupCoordinator]: GroupCoordinator/1: Sent OffsetFetchRequest (v1, 103 bytes @ 0, CorrId 36)
2020.05.19 11:11:58.502260 [ 68 ] {} <Debug> StorageKafka (kafka_consumer0): [thrd:GroupCoordinator]: GroupCoordinator/1: Received OffsetFetchResponse (v1, 72 bytes, CorrId 36, rtt 0.47ms)
2020.05.19 11:11:58.502456 [ 69 ] {} <Debug> StorageKafka (kafka_consumer0): [thrd:main]: Topic topic_with_multiple_partitions [20]: setting default offset INVALID
2020.05.19 11:11:58.502565 [ 69 ] {} <Debug> StorageKafka (kafka_consumer0): [thrd:main]: Topic topic_with_multiple_partitions [21]: setting default offset INVALID
2020.05.19 11:11:58.502673 [ 69 ] {} <Debug> StorageKafka (kafka_consumer0): [thrd:main]: GroupCoordinator/1: OffsetFetchResponse: topic_with_multiple_partitions [20] offset 118, metadata 0 byte(s)
2020.05.19 11:11:58.502777 [ 69 ] {} <Debug> StorageKafka (kafka_consumer0): [thrd:main]: GroupCoordinator/1: OffsetFetchResponse: topic_with_multiple_partitions [21] offset 114, metadata 0 byte(s)
2020.05.19 11:11:58.502872 [ 69 ] {} <Debug> StorageKafka (kafka_consumer0): [thrd:main]: GroupCoordinator/1: OffsetFetch for 2/2 partition(s) returned Success
2020.05.19 11:11:58.503088 [ 29 ] {} <Trace> StorageKafka (kafka_consumer0): Committed offset 118 (topic: topic_with_multiple_partitions, partition: 20)
2020.05.19 11:11:58.503207 [ 29 ] {} <Trace> StorageKafka (kafka_consumer0): Committed offset 114 (topic: topic_with_multiple_partitions, partition: 21)
2020.05.19 11:11:58.503990 [ 29 ] {} <Trace> StorageKafka (kafka_consumer0): Execution took 1023 ms.

2020.05.19 11:11:58.792052 [ 29 ] {} <Debug> StorageKafka (kafka_consumer16): Started streaming to 1 attached views
2020.05.19 11:11:58.794984 [ 29 ] {} <Trace> StorageKafka (kafka_consumer16): Already subscribed to topics: [ topic_with_multiple_partitions ]
2020.05.19 11:11:58.795033 [ 29 ] {} <Trace> StorageKafka (kafka_consumer16): Already assigned to : [ topic_with_multiple_partitions[18:#], topic_with_multiple_partitions[19:#] ]
2020.05.19 11:11:59.295823 [ 29 ] {} <Trace> StorageKafka (kafka_consumer16): Polled batch of 3 messages. Offset position: [ topic_with_multiple_partitions[18:117], topic_with_multiple_partitions[19:148] ]
2020.05.19 11:11:59.796602 [ 29 ] {} <Trace> StorageKafka (kafka_consumer16): Stalled
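
To connect the log to the query result, here is a small arithmetic sketch for partition 20. It assumes the usual Kafka convention that a committed offset is the next offset to be read, so moving the commit from 104 to 118 marks offsets 104 through 117 as consumed. The two offsets are copied from the librdkafka debug line above ("stored offset 118, committed offset 104"); the helper name `skipped_offsets` is hypothetical and only for illustration.

```python
# Values taken from the librdkafka debug line for partition 20:
#   "stored offset 118, committed offset 104: setting stored offset 118 for commit"
previously_committed = 104
newly_committed = 118

# Offsets the query reports as absent from test.destination for partition 20.
missing_in_destination = list(range(104, 118))


def skipped_offsets(old_commit, new_commit):
    """Offsets that a consumer resuming from new_commit will not read again.

    Hypothetical helper: assumes a committed offset means "next offset to read".
    """
    return list(range(old_commit, new_commit))


skipped = skipped_offsets(previously_committed, newly_committed)

# If the commit is what caused the loss, the two lists should be identical.
print(len(skipped), skipped[0], skipped[-1])
print(skipped == missing_in_destination)
```

Under that assumption the skipped range is exactly the 14 offsets missing from the destination table, which is consistent with the commit during DROP TABLE covering a batch that was polled but never written.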

Labels: bug (confirmed user-visible misbehaviour in official release), comp-message-queues (message queue integrations: Kafka, RabbitMQ, NATS table engines for stream ingestion/egress)
