Skip to content

Commit 6692670

Browse files
Backport #61953 to 24.2: fix logical-error when undoing quorum insert transaction
1 parent 98014d8 commit 6692670

File tree

11 files changed

+120
-3
lines changed

11 files changed

+120
-3
lines changed

src/Access/Common/AccessType.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ enum class AccessType
201201
M(SYSTEM_FLUSH, "", GROUP, SYSTEM) \
202202
M(SYSTEM_THREAD_FUZZER, "SYSTEM START THREAD FUZZER, SYSTEM STOP THREAD FUZZER, START THREAD FUZZER, STOP THREAD FUZZER", GLOBAL, SYSTEM) \
203203
M(SYSTEM_UNFREEZE, "SYSTEM UNFREEZE", GLOBAL, SYSTEM) \
204-
M(SYSTEM_FAILPOINT, "SYSTEM ENABLE FAILPOINT, SYSTEM DISABLE FAILPOINT", GLOBAL, SYSTEM) \
204+
M(SYSTEM_FAILPOINT, "SYSTEM ENABLE FAILPOINT, SYSTEM DISABLE FAILPOINT, SYSTEM WAIT FAILPOINT", GLOBAL, SYSTEM) \
205205
M(SYSTEM_LISTEN, "SYSTEM START LISTEN, SYSTEM STOP LISTEN", GLOBAL, SYSTEM) \
206206
M(SYSTEM_JEMALLOC, "SYSTEM JEMALLOC PURGE, SYSTEM JEMALLOC ENABLE PROFILE, SYSTEM JEMALLOC DISABLE PROFILE, SYSTEM JEMALLOC FLUSH PROFILE", GLOBAL, SYSTEM) \
207207
M(SYSTEM, "", GROUP, ALL) /* allows to execute SYSTEM {SHUTDOWN|RELOAD CONFIG|...} */ \

src/Common/FailPoint.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ static struct InitFiu
4242
REGULAR(check_table_query_delay_for_part) \
4343
REGULAR(dummy_failpoint) \
4444
REGULAR(prefetched_reader_pool_failpoint) \
45-
PAUSEABLE_ONCE(dummy_pausable_failpoint_once) \
45+
PAUSEABLE_ONCE(replicated_merge_tree_insert_retry_pause) \
46+
PAUSEABLE_ONCE(finish_set_quorum_failed_parts) \
47+
PAUSEABLE_ONCE(finish_clean_quorum_failed_parts) \
4648
PAUSEABLE(dummy_pausable_failpoint) \
4749
ONCE(execute_query_calling_empty_set_result_func_on_exception)
4850

src/Interpreters/InterpreterSystemQuery.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,14 @@ BlockIO InterpreterSystemQuery::execute()
737737
FailPointInjection::disableFailPoint(query.fail_point_name);
738738
break;
739739
}
740+
case Type::WAIT_FAILPOINT:
741+
{
742+
getContext()->checkAccess(AccessType::SYSTEM_FAILPOINT);
743+
LOG_TRACE(log, "waiting for failpoint {}", query.fail_point_name);
744+
FailPointInjection::pauseFailPoint(query.fail_point_name);
745+
LOG_TRACE(log, "finished failpoint {}", query.fail_point_name);
746+
break;
747+
}
740748
case Type::RESET_COVERAGE:
741749
{
742750
getContext()->checkAccess(AccessType::SYSTEM);
@@ -1430,6 +1438,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
14301438
case Type::STOP_THREAD_FUZZER:
14311439
case Type::START_THREAD_FUZZER:
14321440
case Type::ENABLE_FAILPOINT:
1441+
case Type::WAIT_FAILPOINT:
14331442
case Type::DISABLE_FAILPOINT:
14341443
case Type::RESET_COVERAGE:
14351444
case Type::UNKNOWN:

src/Parsers/ASTSystemQuery.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,7 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState & s
348348
}
349349
case Type::ENABLE_FAILPOINT:
350350
case Type::DISABLE_FAILPOINT:
351+
case Type::WAIT_FAILPOINT:
351352
{
352353
settings.ostr << ' ';
353354
print_identifier(fail_point_name);

src/Parsers/ASTSystemQuery.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster
8585
UNFREEZE,
8686
ENABLE_FAILPOINT,
8787
DISABLE_FAILPOINT,
88+
WAIT_FAILPOINT,
8889
SYNC_FILESYSTEM_CACHE,
8990
STOP_PULLING_REPLICATION_LOG,
9091
START_PULLING_REPLICATION_LOG,

src/Parsers/ParserSystemQuery.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
262262
}
263263
case Type::ENABLE_FAILPOINT:
264264
case Type::DISABLE_FAILPOINT:
265+
case Type::WAIT_FAILPOINT:
265266
{
266267
ASTPtr ast;
267268
if (ParserIdentifier{}.parse(pos, ast, expected))

src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
55
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
66
#include <Interpreters/Context.h>
7+
#include <Common/FailPoint.h>
78
#include <Common/ZooKeeper/KeeperException.h>
89
#include <Common/randomSeed.h>
910
#include <Core/ServerUUID.h>
@@ -24,6 +25,11 @@ namespace ErrorCodes
2425
extern const int REPLICA_IS_ALREADY_ACTIVE;
2526
}
2627

28+
namespace FailPoints
29+
{
30+
extern const char finish_clean_quorum_failed_parts[];
31+
};
32+
2733
/// Used to check whether it's us who set node `is_active`, or not.
2834
static String generateActiveNodeIdentifier()
2935
{
@@ -241,6 +247,7 @@ void ReplicatedMergeTreeRestartingThread::removeFailedQuorumParts()
241247
storage.queue.removeFailedQuorumPart(part->info);
242248
}
243249
}
250+
FailPointInjection::disableFailPoint(FailPoints::finish_clean_quorum_failed_parts);
244251
}
245252

246253

src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ namespace FailPoints
3030
extern const char replicated_merge_tree_commit_zk_fail_after_op[];
3131
extern const char replicated_merge_tree_insert_quorum_fail_0[];
3232
extern const char replicated_merge_tree_commit_zk_fail_when_recovering_from_hw_fault[];
33+
extern const char replicated_merge_tree_insert_retry_pause[];
3334
}
3435

3536
namespace ErrorCodes
@@ -913,14 +914,27 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
913914
});
914915

915916
bool node_exists = false;
917+
bool quorum_fail_exists = false;
916918
/// The loop will be executed at least once
917919
new_retry_controller.retryLoop([&]
918920
{
919921
fiu_do_on(FailPoints::replicated_merge_tree_commit_zk_fail_when_recovering_from_hw_fault, { zookeeper->forceFailureBeforeOperation(); });
922+
FailPointInjection::pauseFailPoint(FailPoints::replicated_merge_tree_insert_retry_pause);
920923
zookeeper->setKeeper(storage.getZooKeeper());
921924
node_exists = zookeeper->exists(fs::path(storage.replica_path) / "parts" / part->name);
925+
if (isQuorumEnabled())
926+
quorum_fail_exists = zookeeper->exists(fs::path(storage.zookeeper_path) / "quorum" / "failed_parts" / part->name);
922927
});
923928

929+
/// If the quorum "failed_parts" node exists for this part, the restarting thread will clean up the garbage.
930+
if (quorum_fail_exists)
931+
{
932+
LOG_INFO(log, "Part {} fails to commit and will not retry or clean garbage. Restarting Thread will do everything.", part->name);
933+
transaction.clear();
934+
/// `quorum/failed_parts/part_name` exists because the table was read-only for a while, so we report that the table is read-only.
935+
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode due to shutdown: replica_path={}", storage.replica_path);
936+
}
937+
924938
if (node_exists)
925939
{
926940
LOG_DEBUG(log, "Insert of part {} recovered from keeper successfully. It will be committed", part->name);

src/Storages/StorageReplicatedMergeTree.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ namespace FailPoints
141141
{
142142
extern const char replicated_queue_fail_next_entry[];
143143
extern const char replicated_queue_unfail_entries[];
144+
extern const char finish_set_quorum_failed_parts[];
144145
}
145146

146147
namespace ErrorCodes
@@ -2130,6 +2131,7 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry, bool need_to_che
21302131
if (code == Coordination::Error::ZOK)
21312132
{
21322133
LOG_DEBUG(log, "Marked quorum for part {} as failed.", entry.new_part_name);
2134+
FailPointInjection::disableFailPoint(FailPoints::finish_set_quorum_failed_parts);
21332135
queue.removeFailedQuorumPart(part_info);
21342136
return true;
21352137
}

tests/integration/test_quorum_inserts/test.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
import concurrent
12
import time
23

34
import pytest
45
from helpers.cluster import ClickHouseCluster
6+
from helpers.network import PartitionManager
57
from helpers.test_tools import TSV
68

79
cluster = ClickHouseCluster(__file__)
@@ -361,3 +363,81 @@ def test_insert_quorum_with_ttl(started_cluster):
361363
)
362364

363365
zero.query("DROP TABLE IF EXISTS test_insert_quorum_with_ttl ON CLUSTER cluster")
366+
367+
368+
def test_insert_quorum_with_keeper_loss_connection():
369+
zero.query(
370+
"DROP TABLE IF EXISTS test_insert_quorum_with_keeper_fail ON CLUSTER cluster"
371+
)
372+
create_query = (
373+
"CREATE TABLE test_insert_quorum_with_keeper_loss"
374+
"(a Int8, d Date) "
375+
"Engine = ReplicatedMergeTree('/clickhouse/tables/{table}', '{replica}') "
376+
"ORDER BY a "
377+
)
378+
379+
zero.query(create_query)
380+
first.query(create_query)
381+
382+
first.query("SYSTEM STOP FETCHES test_insert_quorum_with_keeper_loss")
383+
384+
zero.query("SYSTEM ENABLE FAILPOINT replicated_merge_tree_commit_zk_fail_after_op")
385+
zero.query("SYSTEM ENABLE FAILPOINT replicated_merge_tree_insert_retry_pause")
386+
387+
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
388+
insert_future = executor.submit(
389+
lambda: zero.query(
390+
"INSERT INTO test_insert_quorum_with_keeper_loss(a,d) VALUES(1, '2011-01-01')",
391+
settings={"insert_quorum_timeout": 150000},
392+
)
393+
)
394+
395+
pm = PartitionManager()
396+
pm.drop_instance_zk_connections(zero)
397+
398+
retries = 0
399+
zk = cluster.get_kazoo_client("zoo1")
400+
while True:
401+
if (
402+
zk.exists(
403+
"/clickhouse/tables/test_insert_quorum_with_keeper_loss/replicas/zero/is_active"
404+
)
405+
is None
406+
):
407+
break
408+
print("replica is still active")
409+
time.sleep(1)
410+
retries += 1
411+
if retries == 120:
412+
raise Exception("Can not wait cluster replica inactive")
413+
414+
first.query("SYSTEM ENABLE FAILPOINT finish_set_quorum_failed_parts")
415+
quorum_fail_future = executor.submit(
416+
lambda: first.query(
417+
"SYSTEM WAIT FAILPOINT finish_set_quorum_failed_parts", timeout=300
418+
)
419+
)
420+
first.query("SYSTEM START FETCHES test_insert_quorum_with_keeper_loss")
421+
422+
concurrent.futures.wait([quorum_fail_future])
423+
424+
assert quorum_fail_future.exception() is None
425+
426+
zero.query("SYSTEM ENABLE FAILPOINT finish_clean_quorum_failed_parts")
427+
clean_quorum_fail_parts_future = executor.submit(
428+
lambda: first.query(
429+
"SYSTEM WAIT FAILPOINT finish_clean_quorum_failed_parts", timeout=300
430+
)
431+
)
432+
pm.restore_instance_zk_connections(zero)
433+
concurrent.futures.wait([clean_quorum_fail_parts_future])
434+
435+
assert clean_quorum_fail_parts_future.exception() is None
436+
437+
zero.query("SYSTEM DISABLE FAILPOINT replicated_merge_tree_insert_retry_pause")
438+
concurrent.futures.wait([insert_future])
439+
assert insert_future.exception() is not None
440+
assert not zero.contains_in_log("LOGICAL_ERROR")
441+
assert zero.contains_in_log(
442+
"fails to commit and will not retry or clean garbage"
443+
)

0 commit comments

Comments
 (0)