|
| 1 | +import concurrent |
1 | 2 | import time |
2 | 3 |
|
3 | 4 | import pytest |
4 | 5 | from helpers.cluster import ClickHouseCluster |
| 6 | +from helpers.network import PartitionManager |
5 | 7 | from helpers.test_tools import TSV |
6 | 8 |
|
7 | 9 | cluster = ClickHouseCluster(__file__) |
@@ -361,3 +363,81 @@ def test_insert_quorum_with_ttl(started_cluster): |
361 | 363 | ) |
362 | 364 |
|
363 | 365 | zero.query("DROP TABLE IF EXISTS test_insert_quorum_with_ttl ON CLUSTER cluster") |
| 366 | + |
| 367 | + |
def test_insert_quorum_with_keeper_loss_connection():
    """Regression test: an INSERT whose quorum commit races with losing the
    Keeper connection must fail cleanly.

    Scenario (driven by failpoints):
      1. ``zero`` starts a quorum INSERT; the commit is forced to fail after
         the ZK op and the retry loop is paused.
      2. ``zero``'s ZK connection is dropped until its replica is seen as
         inactive in Keeper.
      3. ``first`` marks the part quorum-failed and cleans it up.
      4. ``zero``'s connection is restored and the paused retry resumes.

    Expected outcome: the INSERT fails (the part was declared quorum-failed),
    no LOGICAL_ERROR is logged, and ``zero`` logs that the insert will not
    retry or clean garbage.
    """
    # NOTE: fixed table name — the original dropped
    # "test_insert_quorum_with_keeper_fail", which is not the table this
    # test creates, so a leftover table from a failed run broke CREATE.
    zero.query(
        "DROP TABLE IF EXISTS test_insert_quorum_with_keeper_loss ON CLUSTER cluster"
    )
    create_query = (
        "CREATE TABLE test_insert_quorum_with_keeper_loss"
        "(a Int8, d Date) "
        "Engine = ReplicatedMergeTree('/clickhouse/tables/{table}', '{replica}') "
        "ORDER BY a "
    )

    zero.query(create_query)
    first.query(create_query)

    # Keep 'first' from fetching the part so quorum cannot be satisfied
    # until fetches are explicitly re-enabled below.
    first.query("SYSTEM STOP FETCHES test_insert_quorum_with_keeper_loss")

    # Make the quorum commit on 'zero' fail after its ZK op, and pause the
    # insert's retry loop so the race can be orchestrated deterministically.
    zero.query("SYSTEM ENABLE FAILPOINT replicated_merge_tree_commit_zk_fail_after_op")
    zero.query("SYSTEM ENABLE FAILPOINT replicated_merge_tree_insert_retry_pause")

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        insert_future = executor.submit(
            lambda: zero.query(
                "INSERT INTO test_insert_quorum_with_keeper_loss(a,d) VALUES(1, '2011-01-01')",
                settings={"insert_quorum_timeout": 150000},
            )
        )

        # Context manager guarantees ZK connectivity is healed even if an
        # assertion below fails midway (the original leaked the partition).
        with PartitionManager() as pm:
            pm.drop_instance_zk_connections(zero)

            # Wait (up to ~120 s) until Keeper considers 'zero' inactive.
            retries = 0
            zk = cluster.get_kazoo_client("zoo1")
            while True:
                if (
                    zk.exists(
                        "/clickhouse/tables/test_insert_quorum_with_keeper_loss/replicas/zero/is_active"
                    )
                    is None
                ):
                    break
                print("replica is still active")
                time.sleep(1)
                retries += 1
                if retries == 120:
                    raise Exception("Can not wait cluster replica inactive")

            # Let 'first' observe the failed quorum part and mark it.
            first.query("SYSTEM ENABLE FAILPOINT finish_set_quorum_failed_parts")
            quorum_fail_future = executor.submit(
                lambda: first.query(
                    "SYSTEM WAIT FAILPOINT finish_set_quorum_failed_parts", timeout=300
                )
            )
            first.query("SYSTEM START FETCHES test_insert_quorum_with_keeper_loss")

            concurrent.futures.wait([quorum_fail_future])
            assert quorum_fail_future.exception() is None

            # Now let 'first' clean up the quorum-failed part.
            zero.query("SYSTEM ENABLE FAILPOINT finish_clean_quorum_failed_parts")
            clean_quorum_fail_parts_future = executor.submit(
                lambda: first.query(
                    "SYSTEM WAIT FAILPOINT finish_clean_quorum_failed_parts", timeout=300
                )
            )
            pm.restore_instance_zk_connections(zero)
            concurrent.futures.wait([clean_quorum_fail_parts_future])
            assert clean_quorum_fail_parts_future.exception() is None

            # Unpause the insert retry loop; the insert must now fail without
            # LOGICAL_ERROR and without retrying the dead commit.
            zero.query("SYSTEM DISABLE FAILPOINT replicated_merge_tree_insert_retry_pause")
            concurrent.futures.wait([insert_future])
            assert insert_future.exception() is not None
            assert not zero.contains_in_log("LOGICAL_ERROR")
            assert zero.contains_in_log(
                "fails to commit and will not retry or clean garbage"
            )
0 commit comments