Skip to content

Commit e5d115b

Browse files
Backport #65198 to 24.4: Fix a bug in ClickHouse Keeper that causes digest mismatch during closing session
1 parent 583d9ca commit e5d115b

File tree

3 files changed

+84
-3
lines changed

3 files changed

+84
-3
lines changed

src/Coordination/KeeperStorage.cpp

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -607,7 +607,10 @@ void KeeperStorage::UncommittedState::commit(int64_t commit_zxid)
607607
uncommitted_auth.pop_front();
608608
if (uncommitted_auth.empty())
609609
session_and_auth.erase(add_auth->session_id);
610-
610+
}
611+
else if (auto * close_session = std::get_if<CloseSessionDelta>(&front_delta.operation))
612+
{
613+
closed_sessions.erase(close_session->session_id);
611614
}
612615

613616
deltas.pop_front();
@@ -680,6 +683,10 @@ void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid)
680683
session_and_auth.erase(add_auth->session_id);
681684
}
682685
}
686+
else if (auto * close_session = std::get_if<CloseSessionDelta>(&delta_it->operation))
687+
{
688+
closed_sessions.erase(close_session->session_id);
689+
}
683690
}
684691

685692
if (delta_it == deltas.rend())
@@ -876,6 +883,10 @@ Coordination::Error KeeperStorage::commit(int64_t commit_zxid)
876883
session_and_auth[operation.session_id].emplace_back(std::move(operation.auth_id));
877884
return Coordination::Error::ZOK;
878885
}
886+
else if constexpr (std::same_as<DeltaType, KeeperStorage::CloseSessionDelta>)
887+
{
888+
return Coordination::Error::ZOK;
889+
}
879890
else
880891
{
881892
// shouldn't be called in any process functions
@@ -2366,12 +2377,15 @@ void KeeperStorage::preprocessRequest(
23662377

23672378
ephemerals.erase(session_ephemerals);
23682379
}
2380+
new_deltas.emplace_back(transaction.zxid, CloseSessionDelta{session_id});
2381+
uncommitted_state.closed_sessions.insert(session_id);
23692382

23702383
new_digest = calculateNodesDigest(new_digest, new_deltas);
23712384
return;
23722385
}
23732386

2374-
if (check_acl && !request_processor->checkAuth(*this, session_id, false))
2387+
if ((check_acl && !request_processor->checkAuth(*this, session_id, false)) ||
2388+
uncommitted_state.closed_sessions.contains(session_id)) // Is session closed but not committed yet
23752389
{
23762390
uncommitted_state.deltas.emplace_back(new_last_zxid, Coordination::Error::ZNOAUTH);
23772391
return;

src/Coordination/KeeperStorage.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,8 +314,13 @@ class KeeperStorage
314314
AuthID auth_id;
315315
};
316316

317+
struct CloseSessionDelta
318+
{
319+
int64_t session_id;
320+
};
321+
317322
using Operation = std::
318-
variant<CreateNodeDelta, RemoveNodeDelta, UpdateNodeDelta, SetACLDelta, AddAuthDelta, ErrorDelta, SubDeltaEnd, FailedMultiDelta>;
323+
variant<CreateNodeDelta, RemoveNodeDelta, UpdateNodeDelta, SetACLDelta, AddAuthDelta, ErrorDelta, SubDeltaEnd, FailedMultiDelta, CloseSessionDelta>;
319324

320325
struct Delta
321326
{
@@ -351,6 +356,7 @@ class KeeperStorage
351356
std::shared_ptr<Node> tryGetNodeFromStorage(StringRef path) const;
352357

353358
std::unordered_map<int64_t, std::list<const AuthID *>> session_and_auth;
359+
std::unordered_set<int64_t> closed_sessions;
354360

355361
struct UncommittedNode
356362
{

src/Coordination/tests/gtest_coordination.cpp

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2019,6 +2019,67 @@ TEST_P(CoordinationTest, TestCreateNodeWithAuthSchemeForAclWhenAuthIsPrecommitte
20192019
EXPECT_EQ(acls[0].permissions, 31);
20202020
}
20212021

2022+
TEST_P(CoordinationTest, TestPreprocessWhenCloseSessionIsPrecommitted)
2023+
{
2024+
using namespace Coordination;
2025+
using namespace DB;
2026+
2027+
ChangelogDirTest snapshots("./snapshots");
2028+
setSnapshotDirectory("./snapshots");
2029+
ResponsesQueue queue(std::numeric_limits<size_t>::max());
2030+
SnapshotsQueue snapshots_queue{1};
2031+
int64_t session_id = 1;
2032+
size_t term = 0;
2033+
2034+
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, keeper_context, nullptr);
2035+
state_machine->init();
2036+
2037+
auto & storage = state_machine->getStorageUnsafe();
2038+
const auto & uncommitted_state = storage.uncommitted_state;
2039+
2040+
// Create first node for the session
2041+
String node_path_1 = "/node_1";
2042+
std::shared_ptr<ZooKeeperCreateRequest> create_req_1 = std::make_shared<ZooKeeperCreateRequest>();
2043+
create_req_1->path = node_path_1;
2044+
auto create_entry_1 = getLogEntryFromZKRequest(term, session_id, state_machine->getNextZxid(), create_req_1);
2045+
2046+
state_machine->pre_commit(1, create_entry_1->get_buf());
2047+
EXPECT_TRUE(uncommitted_state.nodes.contains(node_path_1));
2048+
2049+
state_machine->commit(1, create_entry_1->get_buf());
2050+
EXPECT_TRUE(storage.container.contains(node_path_1));
2051+
2052+
// Close session
2053+
std::shared_ptr<ZooKeeperCloseRequest> close_req = std::make_shared<ZooKeeperCloseRequest>();
2054+
auto close_entry = getLogEntryFromZKRequest(term, session_id, state_machine->getNextZxid(), close_req);
2055+
// Pre-commit close session
2056+
state_machine->pre_commit(2, close_entry->get_buf());
2057+
2058+
// Try to create second node after close session is pre-committed
2059+
String node_path_2 = "/node_2";
2060+
std::shared_ptr<ZooKeeperCreateRequest> create_req_2 = std::make_shared<ZooKeeperCreateRequest>();
2061+
create_req_2->path = node_path_2;
2062+
auto create_entry_2 = getLogEntryFromZKRequest(term, session_id, state_machine->getNextZxid(), create_req_2);
2063+
2064+
// Pre-commit creating second node
2065+
state_machine->pre_commit(3, create_entry_2->get_buf());
2066+
// Second node wasn't created
2067+
EXPECT_FALSE(uncommitted_state.nodes.contains(node_path_2));
2068+
2069+
// Rollback pre-committed closing session
2070+
state_machine->rollback(3, create_entry_2->get_buf());
2071+
state_machine->rollback(2, close_entry->get_buf());
2072+
2073+
// Pre-commit creating second node
2074+
state_machine->pre_commit(2, create_entry_2->get_buf());
2075+
// Now second node was created
2076+
EXPECT_TRUE(uncommitted_state.nodes.contains(node_path_2));
2077+
2078+
state_machine->commit(2, create_entry_2->get_buf());
2079+
EXPECT_TRUE(storage.container.contains(node_path_1));
2080+
EXPECT_TRUE(storage.container.contains(node_path_2));
2081+
}
2082+
20222083
TEST_P(CoordinationTest, TestSetACLWithAuthSchemeForAclWhenAuthIsPrecommitted)
20232084
{
20242085
using namespace Coordination;

0 commit comments

Comments
 (0)