Skip to content

Commit cb90c71

Browse files
Backport #95664 to 25.8: Fix race between distributed DDL and dropping Replicated database
1 parent 7b54cda commit cb90c71

16 files changed

+74
-58
lines changed

src/Databases/DatabaseReplicated.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1263,7 +1263,7 @@ void DatabaseReplicated::checkQueryValid(const ASTPtr & query, ContextPtr query_
12631263
}
12641264
}
12651265

1266-
BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, ContextPtr query_context, QueryFlags flags)
1266+
BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, ContextPtr query_context, QueryFlags flags, DDLGuardPtr && database_guard)
12671267
{
12681268
waitDatabaseStarted();
12691269

@@ -1306,7 +1306,7 @@ BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, Contex
13061306
}
13071307

13081308

1309-
return getQueryStatus(node_path, fs::path(zookeeper_path) / "replicas", query_context, hosts_to_wait);
1309+
return getQueryStatus(node_path, fs::path(zookeeper_path) / "replicas", query_context, hosts_to_wait, std::move(database_guard));
13101310
}
13111311

13121312
static UUID getTableUUIDIfReplicated(const String & metadata, ContextPtr context)
@@ -2393,13 +2393,13 @@ void registerDatabaseReplicated(DatabaseFactory & factory)
23932393
}
23942394

23952395
BlockIO DatabaseReplicated::getQueryStatus(
2396-
const String & node_path, const String & replicas_path, ContextPtr context_, const Strings & hosts_to_wait)
2396+
const String & node_path, const String & replicas_path, ContextPtr context_, const Strings & hosts_to_wait, DDLGuardPtr && database_guard)
23972397
{
23982398
BlockIO io;
23992399
if (context_->getSettingsRef()[Setting::distributed_ddl_task_timeout] == 0)
24002400
return io;
24012401

2402-
auto source = std::make_shared<ReplicatedDatabaseQueryStatusSource>(node_path, replicas_path, context_, hosts_to_wait);
2402+
auto source = std::make_shared<ReplicatedDatabaseQueryStatusSource>(node_path, replicas_path, context_, hosts_to_wait, std::move(database_guard));
24032403
io.pipeline = QueryPipeline(std::move(source));
24042404

24052405
if (context_->getSettingsRef()[Setting::distributed_ddl_output_mode] == DistributedDDLOutputMode::NONE

src/Databases/DatabaseReplicated.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ class DatabaseReplicated : public DatabaseAtomic
7171

7272
/// Try to execute DDL query on current host as initial query. If the query succeeds,
7373
/// then it will be executed on all replicas.
74-
BlockIO tryEnqueueReplicatedDDL(const ASTPtr & query, ContextPtr query_context, QueryFlags flags) override;
74+
BlockIO tryEnqueueReplicatedDDL(const ASTPtr & query, ContextPtr query_context, QueryFlags flags, DDLGuardPtr && database_guard) override;
7575

7676
bool canExecuteReplicatedMetadataAlter() const override;
7777

@@ -182,7 +182,7 @@ class DatabaseReplicated : public DatabaseAtomic
182182
void reinitializeDDLWorker();
183183

184184
static BlockIO
185-
getQueryStatus(const String & node_path, const String & replicas_path, ContextPtr context, const Strings & hosts_to_wait);
185+
getQueryStatus(const String & node_path, const String & replicas_path, ContextPtr context, const Strings & hosts_to_wait, DDLGuardPtr && database_guard);
186186

187187
String zookeeper_path;
188188
String shard_name;

src/Databases/IDatabase.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ void IDatabase::stopReplication()
204204
throw Exception(ErrorCodes::LOGICAL_ERROR, "Database engine {} does not run a replication thread", getEngineName());
205205
}
206206

207-
BlockIO IDatabase::tryEnqueueReplicatedDDL(const ASTPtr & /*query*/, ContextPtr /*query_context*/, [[maybe_unused]] QueryFlags flags) /// NOLINT
207+
BlockIO IDatabase::tryEnqueueReplicatedDDL(const ASTPtr & /*query*/, ContextPtr /*query_context*/, [[maybe_unused]] QueryFlags flags, DDLGuardPtr && /*database_guard*/) /// NOLINT
208208
{
209209
throw Exception(ErrorCodes::LOGICAL_ERROR, "Database engine {} does not have replicated DDL queue", getEngineName());
210210
}

src/Databases/IDatabase.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <Databases/LoadingStrictnessLevel.h>
55
#include <Disks/IDisk.h>
66
#include <Interpreters/Context_fwd.h>
7+
#include <Interpreters/DDLGuard.h>
78
#include <Interpreters/QueryFlags.h>
89
#include <Parsers/IAST_fwd.h>
910
#include <QueryPipeline/BlockIO.h>
@@ -423,7 +424,7 @@ class IDatabase : public std::enable_shared_from_this<IDatabase>
423424

424425
virtual bool shouldReplicateQuery(const ContextPtr & /*query_context*/, const ASTPtr & /*query_ptr*/) const { return false; }
425426

426-
virtual BlockIO tryEnqueueReplicatedDDL(const ASTPtr & /*query*/, ContextPtr /*query_context*/, [[maybe_unused]] QueryFlags flags);
427+
virtual BlockIO tryEnqueueReplicatedDDL(const ASTPtr & /*query*/, ContextPtr /*query_context*/, [[maybe_unused]] QueryFlags flags, DDLGuardPtr && /*database_guard*/);
427428

428429
/// Returns CREATE TABLE queries and corresponding tables prepared for writing to a backup.
429430
virtual std::vector<std::pair<ASTPtr, StoragePtr>> getTablesForBackup(const FilterByNameFunction & filter, const ContextPtr & context) const;

src/Interpreters/DDLGuard.h

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
#pragma once
2+
3+
#include <Common/SharedMutex.h>
4+
#include <mutex>
5+
6+
7+
namespace DB
8+
{
9+
10+
/// Allows executing DDL query only in one thread.
11+
/// Puts an element into the map, locks the table's mutex, counts how many threads run a parallel query on the table,
12+
/// when counter is 0 erases element in the destructor.
13+
/// If the element already exists in the map, waits until the DDL query running in the other thread is finished.
14+
class DDLGuard
15+
{
16+
public:
17+
struct Entry
18+
{
19+
std::unique_ptr<std::mutex> mutex;
20+
UInt32 counter;
21+
};
22+
23+
/// Element name -> (mutex, counter).
24+
/// NOTE: using std::map here (and not std::unordered_map) to avoid iterator invalidation on insertion.
25+
using Map = std::map<String, Entry>;
26+
27+
DDLGuard(
28+
Map & map_,
29+
SharedMutex & db_mutex_,
30+
std::unique_lock<std::mutex> guards_lock_,
31+
const String & elem,
32+
const String & database_name);
33+
~DDLGuard();
34+
35+
/// Unlocks table name, keeps holding read lock for database name
36+
void releaseTableLock() noexcept;
37+
38+
private:
39+
Map & map;
40+
SharedMutex & db_mutex;
41+
Map::iterator it;
42+
std::unique_lock<std::mutex> guards_lock;
43+
std::unique_lock<std::mutex> table_lock;
44+
bool table_lock_removed = false;
45+
bool is_database_guard = false;
46+
};
47+
48+
using DDLGuardPtr = std::unique_ptr<DDLGuard>;
49+
50+
}

src/Interpreters/DatabaseCatalog.h

Lines changed: 1 addition & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <Core/UUID.h>
44
#include <Databases/TablesDependencyGraph.h>
55
#include <Interpreters/Context_fwd.h>
6+
#include <Interpreters/DDLGuard.h>
67
#include <Interpreters/StorageID.h>
78
#include <Parsers/IAST_fwd.h>
89
#include <Storages/IStorage_fwd.h>
@@ -37,45 +38,6 @@ using Databases = std::map<String, std::shared_ptr<IDatabase>>;
3738
using DiskPtr = std::shared_ptr<IDisk>;
3839
using TableNamesSet = std::unordered_set<QualifiedTableName>;
3940

40-
/// Allows executing DDL query only in one thread.
41-
/// Puts an element into the map, locks tables's mutex, counts how much threads run parallel query on the table,
42-
/// when counter is 0 erases element in the destructor.
43-
/// If the element already exists in the map, waits when ddl query will be finished in other thread.
44-
class DDLGuard
45-
{
46-
public:
47-
struct Entry
48-
{
49-
std::unique_ptr<std::mutex> mutex;
50-
UInt32 counter;
51-
};
52-
53-
/// Element name -> (mutex, counter).
54-
/// NOTE: using std::map here (and not std::unordered_map) to avoid iterator invalidation on insertion.
55-
using Map = std::map<String, Entry>;
56-
57-
DDLGuard(
58-
Map & map_,
59-
SharedMutex & db_mutex_,
60-
std::unique_lock<std::mutex> guards_lock_,
61-
const String & elem,
62-
const String & database_name);
63-
~DDLGuard();
64-
65-
/// Unlocks table name, keeps holding read lock for database name
66-
void releaseTableLock() noexcept;
67-
68-
private:
69-
Map & map;
70-
SharedMutex & db_mutex;
71-
Map::iterator it;
72-
std::unique_lock<std::mutex> guards_lock;
73-
std::unique_lock<std::mutex> table_lock;
74-
bool table_lock_removed = false;
75-
bool is_database_guard = false;
76-
};
77-
78-
using DDLGuardPtr = std::unique_ptr<DDLGuard>;
7941

8042
class FutureSetFromSubquery;
8143
using FutureSetFromSubqueryPtr = std::shared_ptr<FutureSetFromSubquery>;

src/Interpreters/InterpreterAlterQuery.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter)
133133
{
134134
auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name, database.get());
135135
guard->releaseTableLock();
136-
return database->tryEnqueueReplicatedDDL(query_ptr, getContext(), {});
136+
return database->tryEnqueueReplicatedDDL(query_ptr, getContext(), {}, std::move(guard));
137137
}
138138

139139
if (!table)

src/Interpreters/InterpreterCreateIndexQuery.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ BlockIO InterpreterCreateIndexQuery::execute()
7777
{
7878
auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name, database.get());
7979
guard->releaseTableLock();
80-
return database->tryEnqueueReplicatedDDL(query_ptr, current_context, {});
80+
return database->tryEnqueueReplicatedDDL(query_ptr, current_context, {}, std::move(guard));
8181
}
8282

8383
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, current_context);

src/Interpreters/InterpreterCreateQuery.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1532,7 +1532,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
15321532
auto guard = DatabaseCatalog::instance().getDDLGuard(database_name, create.getTable(), database.get());
15331533
create.setDatabase(database_name);
15341534
guard->releaseTableLock();
1535-
return database->tryEnqueueReplicatedDDL(query_ptr, getContext(), QueryFlags{ .internal = internal, .distributed_backup_restore = is_restore_from_backup });
1535+
return database->tryEnqueueReplicatedDDL(query_ptr, getContext(), QueryFlags{ .internal = internal, .distributed_backup_restore = is_restore_from_backup }, std::move(guard));
15361536
}
15371537

15381538
if (!create.cluster.empty())
@@ -1725,7 +1725,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
17251725
auto guard = DatabaseCatalog::instance().getDDLGuard(create.getDatabase(), create.getTable(), database.get());
17261726
assertOrSetUUID(create, database);
17271727
guard->releaseTableLock();
1728-
return database->tryEnqueueReplicatedDDL(query_ptr, getContext(), QueryFlags{ .internal = internal, .distributed_backup_restore = is_restore_from_backup });
1728+
return database->tryEnqueueReplicatedDDL(query_ptr, getContext(), QueryFlags{ .internal = internal, .distributed_backup_restore = is_restore_from_backup }, std::move(guard));
17291729
}
17301730

17311731
if (!create.cluster.empty())

src/Interpreters/InterpreterDeleteQuery.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ BlockIO InterpreterDeleteQuery::execute()
8282
{
8383
auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name, database.get());
8484
guard->releaseTableLock();
85-
return database->tryEnqueueReplicatedDDL(query_ptr, getContext(), {});
85+
return database->tryEnqueueReplicatedDDL(query_ptr, getContext(), {}, std::move(guard));
8686
}
8787

8888
auto table_lock = table->lockForShare(getContext()->getCurrentQueryId(), settings[Setting::lock_acquire_timeout]);

0 commit comments

Comments
 (0)