Skip to content

Commit a7790a8

Browse files
Backport #73323 to 24.12: Fix refreshable MV crash on shutdown
1 parent 3fc6ce1 commit a7790a8

File tree

8 files changed

+147
-15
lines changed

8 files changed

+147
-15
lines changed

programs/server/Server.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2523,6 +2523,8 @@ try
25232523
}
25242524
}
25252525

2526+
global_context->getRefreshSet().setRefreshesStopped(true);
2527+
25262528
if (current_connections)
25272529
LOG_WARNING(log, "Closed all listening sockets. Waiting for {} outstanding connections.", current_connections);
25282530
else
@@ -2538,15 +2540,20 @@ try
25382540
if (!server_settings[ServerSetting::shutdown_wait_unfinished_queries])
25392541
global_context->getProcessList().killAllQueries();
25402542

2543+
size_t wait_limit_seconds = server_settings[ServerSetting::shutdown_wait_unfinished];
2544+
auto wait_start = std::chrono::steady_clock::now();
2545+
25412546
if (current_connections)
2542-
current_connections = waitServersToFinish(servers, servers_lock, server_settings[ServerSetting::shutdown_wait_unfinished]);
2547+
current_connections = waitServersToFinish(servers, servers_lock, wait_limit_seconds);
25432548

25442549
if (current_connections)
25452550
LOG_WARNING(log, "Closed connections. But {} remain."
25462551
" Tip: To increase wait time add to config: <shutdown_wait_unfinished>60</shutdown_wait_unfinished>", current_connections);
25472552
else
25482553
LOG_INFO(log, "Closed connections.");
25492554

2555+
global_context->getRefreshSet().joinBackgroundTasks(wait_start + std::chrono::milliseconds(wait_limit_seconds * 1000));
2556+
25502557
dns_cache_updater.reset();
25512558

25522559
if (current_connections)

src/Databases/DatabaseReplicatedWorker.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,21 +334,28 @@ String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entr
334334
assert(!zookeeper->exists(task->getFinishedNodePath()));
335335
task->is_initial_query = true;
336336

337-
LOG_DEBUG(log, "Waiting for worker thread to process all entries before {}", entry_name);
338337
UInt64 timeout = query_context->getSettingsRef()[Setting::database_replicated_initial_query_timeout_sec];
339338
StopToken cancellation = query_context->getDDLQueryCancellation();
340339
StopCallback cancellation_callback(cancellation, [&] { wait_current_task_change.notify_all(); });
340+
LOG_DEBUG(log, "Waiting for worker thread to process all entries before {} (timeout: {}s{})", entry_name, timeout, cancellation.stop_possible() ? ", cancellable" : "");
341+
341342
{
342343
std::unique_lock lock{mutex};
343344
bool processed = wait_current_task_change.wait_for(lock, std::chrono::seconds(timeout), [&]()
344345
{
345346
assert(zookeeper->expired() || current_task <= entry_name);
346347

347348
if (zookeeper->expired() || stop_flag)
349+
{
350+
LOG_TRACE(log, "Not enqueueing query: {}", stop_flag ? "replication stopped" : "ZooKeeper session expired");
348351
throw Exception(ErrorCodes::DATABASE_REPLICATION_FAILED, "ZooKeeper session expired or replication stopped, try again");
352+
}
349353

350354
if (cancellation.stop_requested())
355+
{
356+
LOG_TRACE(log, "DDL query was cancelled");
351357
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "DDL query was cancelled");
358+
}
352359

353360
return current_task == entry_name;
354361
});

src/Storages/MaterializedView/RefreshSet.cpp

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,10 @@ void RefreshSet::setRefreshesStopped(bool stopped)
185185
TaskMap tasks_copy;
186186
{
187187
std::lock_guard lock(mutex);
188-
refreshes_stopped.store(stopped);
188+
if (refreshes_stopped.exchange(stopped) == stopped)
189+
return;
190+
if (stopped)
191+
refreshes_stopped_at = std::chrono::steady_clock::now();
189192
tasks_copy = tasks;
190193
}
191194
for (const auto & kv : tasks_copy)
@@ -198,6 +201,40 @@ bool RefreshSet::refreshesStopped() const
198201
return refreshes_stopped.load();
199202
}
200203

204+
void RefreshSet::joinBackgroundTasks(std::chrono::steady_clock::time_point deadline)
205+
{
206+
std::vector<RefreshTaskPtr> remaining_tasks;
207+
std::chrono::steady_clock::time_point stopped_at;
208+
{
209+
std::unique_lock lock(mutex);
210+
stopped_at = refreshes_stopped_at;
211+
for (const auto & [_, list] : tasks)
212+
remaining_tasks.insert(remaining_tasks.end(), list.begin(), list.end());
213+
}
214+
std::erase_if(remaining_tasks, [&](const auto & t)
215+
{
216+
return t->tryJoinBackgroundTask(deadline);
217+
});
218+
219+
if (!remaining_tasks.empty())
220+
{
221+
auto elapsed_seconds = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - stopped_at).count();
222+
String names;
223+
for (size_t i = 0; i < remaining_tasks.size(); ++i)
224+
{
225+
if (i > 0)
226+
names += ", ";
227+
if (i >= 20)
228+
{
229+
names += "...";
230+
break;
231+
}
232+
names += remaining_tasks[i]->getInfo().view_id.getNameForLogs();
233+
}
234+
LOG_ERROR(getLogger("RefreshSet"), "{} view refreshes failed to stop in {:.3}s: {}", remaining_tasks.size(), elapsed_seconds, names);
235+
}
236+
}
237+
201238
RefreshSet::Handle::Handle(RefreshSet * parent_set_, StorageID id_, std::optional<StorageID> inner_table_id_, RefreshTaskList::iterator iter_, RefreshTaskList::iterator inner_table_iter_, std::vector<StorageID> dependencies_)
202239
: parent_set(parent_set_), id(std::move(id_)), inner_table_id(std::move(inner_table_id_)), dependencies(std::move(dependencies_))
203240
, iter(iter_), inner_table_iter(inner_table_iter_), metric_increment(CurrentMetrics::Increment(CurrentMetrics::RefreshableViews)) {}

src/Storages/MaterializedView/RefreshSet.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ class RefreshSet
6868
void setRefreshesStopped(bool stopped);
6969
bool refreshesStopped() const;
7070

71+
/// Called during shutdown, after setRefreshesStopped(true).
72+
void joinBackgroundTasks(std::chrono::steady_clock::time_point deadline);
73+
7174
private:
7275
using TaskMap = std::unordered_map<StorageID, RefreshTaskList, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
7376
using DependentsMap = std::unordered_map<StorageID, std::unordered_set<RefreshTaskPtr>, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
@@ -82,6 +85,7 @@ class RefreshSet
8285
InnerTableMap inner_tables;
8386

8487
std::atomic<bool> refreshes_stopped {false};
88+
std::chrono::steady_clock::time_point refreshes_stopped_at;
8589

8690
RefreshTaskList::iterator addTaskLocked(StorageID id, RefreshTaskPtr task);
8791
void removeTaskLocked(StorageID id, RefreshTaskList::iterator iter);

src/Storages/MaterializedView/RefreshTask.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,22 @@ void RefreshTask::wait()
307307
}
308308
}
309309

310+
bool RefreshTask::tryJoinBackgroundTask(std::chrono::steady_clock::time_point deadline)
311+
{
312+
std::unique_lock lock(mutex);
313+
314+
execution.cancel_ddl_queries.request_stop();
315+
316+
auto duration = deadline - std::chrono::steady_clock::now();
317+
/// (Manually clamping to 0 because the standard library used to have (and possibly still has?)
318+
/// a bug that wait_until would wait forever if the timestamp is in the past.)
319+
duration = std::max(duration, std::chrono::steady_clock::duration(0));
320+
return refresh_cv.wait_for(lock, duration, [&]
321+
{
322+
return state != RefreshState::Running && state != RefreshState::Scheduling;
323+
});
324+
}
325+
310326
std::chrono::sys_seconds RefreshTask::getNextRefreshTimeslot() const
311327
{
312328
std::lock_guard guard(mutex);

src/Storages/MaterializedView/RefreshTask.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,16 @@ class RefreshTask : public std::enable_shared_from_this<RefreshTask>
7575
/// Cancel task execution
7676
void cancel();
7777

78-
/// Waits for the currently running refresh attempt to complete.
78+
/// Waits for the currently running refresh attempt to complete, either on this replica
79+
/// or on another one (if `coordinated`).
7980
/// If the refresh fails, throws an exception.
8081
/// If no refresh is running, completes immediately, throwing an exception if previous refresh failed.
8182
void wait();
8283

84+
/// Wait for background work (refreshing or scheduling) on this replica to complete.
85+
/// Returns false if `deadline` was reached before the work completed. Used by server shutdown.
86+
bool tryJoinBackgroundTask(std::chrono::steady_clock::time_point deadline);
87+
8388
/// A measure of how far this view has progressed. Used by dependent views.
8489
std::chrono::sys_seconds getNextRefreshTimeslot() const;
8590

tests/integration/helpers/cluster.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2047,12 +2047,12 @@ def _replace(self, path, what, to):
20472047
def restart_instance_with_ip_change(self, node, new_ip):
20482048
if "::" in new_ip:
20492049
if node.ipv6_address is None:
2050-
raise Exception("You should specity ipv6_address in add_node method")
2050+
raise Exception("You should specify ipv6_address in add_node method")
20512051
self._replace(node.docker_compose_path, node.ipv6_address, new_ip)
20522052
node.ipv6_address = new_ip
20532053
else:
20542054
if node.ipv4_address is None:
2055-
raise Exception("You should specity ipv4_address in add_node method")
2055+
raise Exception("You should specify ipv4_address in add_node method")
20562056
self._replace(node.docker_compose_path, node.ipv4_address, new_ip)
20572057
node.ipv4_address = new_ip
20582058
run_and_check(self.base_cmd + ["stop", node.name])
@@ -2119,9 +2119,11 @@ def exec_in_container(
21192119
detach: bool = False,
21202120
nothrow: bool = False,
21212121
use_cli: bool = True,
2122+
get_exec_id: bool = False,
21222123
**kwargs: Any,
21232124
) -> str:
21242125
if use_cli:
2126+
assert not get_exec_id
21252127
logging.debug(
21262128
f"run container_id:{container_id} detach:{detach} nothrow:{nothrow} cmd: {cmd}"
21272129
)
@@ -2159,8 +2161,9 @@ def exec_in_container(
21592161
else:
21602162
raise Exception(message)
21612163
if not detach:
2164+
assert not get_exec_id
21622165
return output.decode()
2163-
return output
2166+
return exec_id if get_exec_id else output
21642167

21652168
def copy_file_to_container(self, container_id, local_path, dest_path):
21662169
with open(local_path, "rb") as fdata:
@@ -2732,7 +2735,7 @@ def wait_cassandra_to_start(self, timeout=180):
27322735
logging.info(
27332736
f"Check Cassandra Online {self.cassandra_id} {self.cassandra_ip} {self.cassandra_port}"
27342737
)
2735-
check = self.exec_in_container(
2738+
self.exec_in_container(
27362739
self.cassandra_id,
27372740
[
27382741
"bash",
@@ -4032,16 +4035,20 @@ def start_clickhouse(
40324035
)
40334036
start_time = time.time()
40344037
time_to_sleep = 0.5
4038+
exec_id = None
40354039

40364040
while start_time + start_wait_sec >= time.time():
40374041
# sometimes after SIGKILL (hard reset) server may refuse to start for some time
40384042
# for different reasons.
40394043
pid = self.get_process_pid("clickhouse")
40404044
if pid is None:
40414045
logging.debug("No clickhouse process running. Start new one.")
4042-
self.exec_in_container(
4043-
["bash", "-c", self.clickhouse_start_command_in_daemon],
4046+
exec_id = self.exec_in_container(
4047+
["bash", "-c", self.clickhouse_start_command],
40444048
user=str(os.getuid()),
4049+
detach=True,
4050+
use_cli=False,
4051+
get_exec_id=True,
40454052
)
40464053
if expected_to_fail:
40474054
self.wait_start_failed(start_wait_sec + start_time - time.time())
@@ -4054,7 +4061,7 @@ def start_clickhouse(
40544061
raise Exception("ClickHouse was expected not to be running.")
40554062
try:
40564063
self.wait_start(start_wait_sec + start_time - time.time())
4057-
return
4064+
return exec_id
40584065
except Exception as e:
40594066
logging.warning(
40604067
f"Current start attempt failed. Will kill {pid} just in case."

tests/integration/test_refreshable_mv/test.py

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
import os
2-
import re
3-
import shutil
4-
import threading
1+
import logging
52
import time
63
from random import randint
74

@@ -221,3 +218,55 @@ def test_refreshable_mv_in_system_db(started_cluster):
221218
assert node1.query("select count(), sum(x) from system.a") == "2\t3\n"
222219

223220
node1.query("drop table system.a")
221+
222+
223+
def test_refresh_vs_shutdown_smoke(started_cluster):
224+
for node in nodes:
225+
node.query(
226+
"create database re engine = Replicated('/test/re', 'shard1', '{replica}');"
227+
)
228+
229+
node1.stop_clickhouse()
230+
231+
num_tables = 2
232+
233+
for i in range(10):
234+
exec_id = node1.start_clickhouse()
235+
assert exec_id is not None
236+
237+
if i == 0:
238+
node1.query("select '===test_refresh_vs_shutdown_smoke start==='")
239+
for j in range(num_tables):
240+
node1.query(
241+
f"create materialized view re.a{j} refresh every 1 second (x Int64) engine ReplicatedMergeTree order by x as select number*10 as x from numbers(2)"
242+
)
243+
244+
if randint(0, 1):
245+
for j in range(num_tables):
246+
if randint(0, 1):
247+
node1.query(f"system refresh view re.a{j}")
248+
r = randint(0, 2)
249+
if r == 1:
250+
time.sleep(randint(0, 10) / 1000)
251+
elif r == 2:
252+
time.sleep(randint(0, 100) / 1000)
253+
254+
node1.stop_clickhouse(stop_wait_sec=300)
255+
while True:
256+
exit_code = cluster.docker_client.api.exec_inspect(exec_id)["ExitCode"]
257+
if exit_code is not None:
258+
assert exit_code == 0
259+
break
260+
time.sleep(1)
261+
262+
assert not node1.contains_in_log("view refreshes failed to stop", from_host=True)
263+
assert not node1.contains_in_log("Closed connections. But", from_host=True)
264+
assert not node1.contains_in_log("Will shutdown forcefully.", from_host=True)
265+
assert not node1.contains_in_log("##########", from_host=True)
266+
assert node1.contains_in_log(
267+
"===test_refresh_vs_shutdown_smoke start===", from_host=True
268+
)
269+
270+
node1.start_clickhouse()
271+
node1.query("drop database re sync")
272+
node2.query("drop database re sync")

0 commit comments

Comments
 (0)