Skip to content

Commit 600fdcb

Browse files
author
SBALAVIGNESH123
committed
fix: do not advance sequence state on failed commit (review feedback)
progressSequences() was called unconditionally after the retry loop, which meant that if all MAX_COMMIT_RETRIES attempts failed the committed SN would still advance — causing commitSNLocal() to persist a sequence number for data that was never written to disk. Now we track success with a bool flag and only call progressSequences() on the success path. On the failure path we log the dropped range and release the pool thread without acknowledging the sequence, so NativeLog will replay the missing SN range on the next restart. Addresses review feedback from @yokofly.
1 parent a812443 commit 600fdcb

File tree

1 file changed

+20
-7
lines changed

1 file changed

+20
-7
lines changed

src/Storages/Stream/StreamShardStore.cpp

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -858,10 +858,13 @@ void StreamShardStore::doCommit(
858858

859859
/// Limit retries to prevent a permanently-failing commit (e.g. TOO_MANY_PARTS)
860860
/// from pinning this thread indefinitely and saturating the commit pool.
861-
/// After the cap, we log the failure and advance the sequence number so the
862-
/// producer is not blocked forever. The part will be retried on the next restart.
861+
/// After the cap, we log the failure and release the pool thread WITHOUT
862+
/// advancing the committed sequence number. This leaves a gap in the SN
863+
/// sequence, so the missing data will be re-fetched and re-committed on the
864+
/// next restart (NativeLog replays from the last persisted SN).
863865
static constexpr size_t MAX_COMMIT_RETRIES = 10;
864866
size_t attempt = 0;
867+
bool committed = false;
865868
while (!isStopped() && attempt < MAX_COMMIT_RETRIES)
866869
{
867870
++attempt;
@@ -881,7 +884,7 @@ void StreamShardStore::doCommit(
881884

882885
merge_tree_sink->consume(Chunk(moved_block.getColumns(), moved_block.rows()));
883886
merge_tree_sink->onFinish();
884-
attempt = 0; /// success — clear attempt counter before break
887+
committed = true;
885888
break;
886889
}
887890
catch (...)
@@ -897,16 +900,26 @@ void StreamShardStore::doCommit(
897900
}
898901
}
899902

900-
if (attempt >= MAX_COMMIT_RETRIES)
903+
if (committed)
904+
{
905+
progressSequences(moved_seq);
906+
}
907+
else
908+
{
909+
/// Do NOT call progressSequences() here. Advancing the committed SN
910+
/// without having written the part would let commitSNLocal() persist
911+
/// a sequence number for data that was never stored — causing data loss.
912+
/// By leaving the SN gap, NativeLog will replay this range on restart.
901913
LOG_ERROR(
902914
logger,
903-
"Giving up committing rows={} sn_range=[{},{}] after {} attempts — releasing pool thread to avoid deadlock",
915+
"Giving up committing rows={} sn_range=[{},{}] after {} attempts — "
916+
"releasing pool thread WITHOUT advancing sequence state so data can "
917+
"be recovered on restart",
904918
moved_block.rows(),
905919
moved_seq.first,
906920
moved_seq.second,
907921
MAX_COMMIT_RETRIES);
908-
909-
progressSequences(moved_seq);
922+
}
910923
},
911924
/*wait_timeout_ms=*/{500});
912925

0 commit comments

Comments
 (0)