Skip to content

Commit d688d94

Browse files
Backport #79033 to 25.4: Support for refresh in readonly MergeTree tables
1 parent a3be7f7 commit d688d94

28 files changed

+347
-105
lines changed

src/Core/SettingsChangesHistory.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,8 @@ const VersionToSettingsChangesMap & getMergeTreeSettingsChangesHistory()
703703
{
704704
addSettingsChanges(merge_tree_settings_changes_history, "25.4",
705705
{
706+
/// Release closed. Please use 25.5
707+
{"refresh_parts_interval", 0, 0, "A new setting"},
706708
{"max_merge_delayed_streams_for_parallel_write", 1000, 100, "New setting"},
707709
{"max_postpone_time_for_failed_replicated_fetches_ms", 1ULL * 60 * 1000, 1ULL * 60 * 1000, "Added new setting to enable postponing fetch tasks in the replication queue."},
708710
{"max_postpone_time_for_failed_replicated_merges_ms", 1ULL * 60 * 1000, 1ULL * 60 * 1000, "Added new setting to enable postponing merge tasks in the replication queue."},

src/Databases/DatabaseLazy.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ try
326326

327327
if (!it->second.table || isSharedPtrUnique(it->second.table))
328328
{
329-
LOG_DEBUG(log, "Drop table {} from cache.", backQuote(it->first));
329+
LOG_DEBUG(log, "Removing table {} from cache.", backQuote(it->first));
330330
it->second.table.reset();
331331
expired_tables.erase(it->second.expiration_iterator);
332332
it->second.expiration_iterator = cache_expiration_queue.end();

src/Disks/IDisk.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -461,9 +461,10 @@ class IDisk : public Space
461461
virtual void startupImpl(ContextPtr) {}
462462

463463
/// If the state can be changed under the hood and become outdated in memory, perform a reload if necessary.
464+
/// However, don't do it more frequently than the interval given by the parameter (in milliseconds).
464465
/// Note: for performance reasons, it's allowed to assume that only some subset of changes are possible
465466
/// (those that MergeTree tables can make).
466-
virtual void refresh()
467+
virtual void refresh(UInt64 /* not_sooner_than_milliseconds */)
467468
{
468469
/// The default no-op implementation when the state in memory cannot be out of sync with the actual state.
469470
}

src/Disks/ObjectStorages/DiskObjectStorage.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,9 @@ friend class DiskObjectStorageRemoteMetadataRestoreHelper;
146146

147147
void startupImpl(ContextPtr context) override;
148148

149-
void refresh() override
149+
void refresh(UInt64 not_sooner_than_milliseconds) override
150150
{
151-
metadata_storage->refresh();
151+
metadata_storage->refresh(not_sooner_than_milliseconds);
152152
}
153153

154154
ReservationPtr reserve(UInt64 bytes) override;

src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -766,11 +766,11 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
766766
if (metadata_helper)
767767
{
768768
if (!object_key.hasPrefix())
769-
throw Exception(ErrorCodes::LOGICAL_ERROR, "metadata helper is not supported with absolute paths");
769+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Metadata helper is not supported with absolute paths");
770770

771-
auto revision = metadata_helper->revision_counter + 1;
772-
metadata_helper->revision_counter++;
773-
object_attributes = {
771+
auto revision = ++metadata_helper->revision_counter;
772+
object_attributes =
773+
{
774774
{"path", path}
775775
};
776776

@@ -782,7 +782,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
782782
/// Does metadata_storage support empty files without actual blobs in the object_storage?
783783
const bool do_not_write_empty_blob = metadata_storage.supportsEmptyFilesWithoutBlobs();
784784

785-
/// seems ok
785+
/// Seems ok
786786
auto object = StoredObject(object_key.serialize(), path);
787787
std::function<void(size_t count)> create_metadata_callback;
788788

@@ -862,7 +862,6 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
862862
operations_to_execute.emplace_back(std::move(write_operation));
863863
}
864864

865-
866865
auto impl = object_storage.writeObject(
867866
object,
868867
/// We always use mode Rewrite because we simulate append using metadata and different files

src/Disks/ObjectStorages/FlatDirectoryStructureKeyGenerator.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@ FlatDirectoryStructureKeyGenerator::FlatDirectoryStructureKeyGenerator(
1919
ObjectStorageKey FlatDirectoryStructureKeyGenerator::generate(const String & path, bool is_directory, const std::optional<String> & key_prefix) const
2020
{
2121
if (is_directory)
22-
chassert(path.ends_with('/'));
22+
chassert(path.empty() || path.ends_with('/'));
2323

24-
const auto p = std::filesystem::path(path);
25-
auto directory = p.parent_path();
24+
const auto fs_path = std::filesystem::path(path);
25+
std::filesystem::path directory = fs_path.parent_path();
2626

2727
std::optional<std::filesystem::path> remote_path;
2828
{
2929
const auto ptr = path_map.lock();
30-
auto res = ptr->getRemotePathInfoIfExists(p);
30+
auto res = ptr->getRemotePathInfoIfExists(fs_path);
3131
if (res)
3232
return ObjectStorageKey::createAsRelative(key_prefix.has_value() ? *key_prefix : storage_key_prefix, res->path);
3333

@@ -41,7 +41,7 @@ ObjectStorageKey FlatDirectoryStructureKeyGenerator::generate(const String & pat
4141
: directory;
4242

4343
if (!is_directory)
44-
key /= p.filename();
44+
key /= fs_path.filename();
4545

4646
return ObjectStorageKey::createAsRelative(key_prefix.has_value() ? *key_prefix : storage_key_prefix, key);
4747
}

src/Disks/ObjectStorages/IMetadataStorage.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,10 +256,11 @@ class IMetadataStorage : private boost::noncopyable
256256
/// This method is overridden for specific metadata implementations in ClickHouse Cloud.
257257
}
258258

259-
/// If the state can be changed under the hood and become outdated in memory, perform a reload if necessary.
259+
/// If the state can be changed under the hood and become outdated in memory, perform a reload if necessary,
260+
/// but don't do it more frequently than the interval given by the parameter (in milliseconds).
260261
/// Note: for performance reasons, it's allowed to assume that only some subset of changes are possible
261262
/// (those that MergeTree tables can make).
262-
virtual void refresh()
263+
virtual void refresh(UInt64 /* not_sooner_than_milliseconds */)
263264
{
264265
/// The default no-op implementation when the state in memory cannot be out of sync with the actual state.
265266
}

src/Disks/ObjectStorages/InMemoryDirectoryPathMap.h

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class InMemoryDirectoryPathMap
3939
struct RemotePathInfo
4040
{
4141
std::string path;
42+
std::string etag;
4243
time_t last_modified = 0;
4344
FileNames files;
4445
};
@@ -55,28 +56,49 @@ class InMemoryDirectoryPathMap
5556
return remote_directories.contains(remote_path);
5657
}
5758

59+
bool existsRemotePathUnchanged(const std::string & remote_path, const std::string & etag) const
60+
{
61+
std::lock_guard lock(mutex);
62+
auto it = remote_directories.find(remote_path);
63+
return it != remote_directories.end() && it->second->second.etag == etag;
64+
}
65+
5866
bool existsLocalPath(const std::string & local_path) const
5967
{
6068
std::lock_guard lock(mutex);
6169
return map.contains(local_path);
6270
}
6371

64-
auto addPathIfNotExists(std::string path, RemotePathInfo info)
72+
void addOrReplacePath(std::string path, RemotePathInfo info)
6573
{
6674
std::string remote_path = info.path;
6775
std::lock_guard lock(mutex);
6876

6977
size_t num_files = info.files.size();
70-
auto res = map.emplace(std::move(path), std::move(info));
7178

72-
if (res.second)
79+
/// If the logical path already exists, skip it.
80+
if (map.contains(path))
81+
return;
82+
83+
/// If the same storage path was previously registered under a different logical path, remove the old entry first.
84+
auto old_it = remote_directories.find(info.path);
85+
if (old_it != remote_directories.end())
7386
{
74-
remote_directories.emplace(remote_path);
75-
metric_directories.add(1);
76-
metric_files.add(num_files);
87+
metric_files.sub(old_it->second->second.files.size());
88+
metric_directories.sub(1);
89+
90+
map.erase(old_it->second->first);
91+
remote_directories.erase(old_it);
7792
}
7893

79-
return res;
94+
auto res = map.emplace(std::move(path), std::move(info));
95+
96+
if (!res.second)
97+
return;
98+
99+
remote_directories.emplace(remote_path, &*res.first);
100+
metric_directories.add(1);
101+
metric_files.add(num_files);
80102
}
81103

82104
bool existsFile(const std::string & local_path) const
@@ -219,11 +241,14 @@ class InMemoryDirectoryPathMap
219241
mutable std::mutex mutex;
220242

221243
/// A mapping from logical filesystem path to the storage path.
222-
using Map = std::map<std::filesystem::path, RemotePathInfo, PathComparator>;
223-
Map TSA_GUARDED_BY(mutex) map;
244+
using LogicalToPhysicalMap = std::map<std::filesystem::path, RemotePathInfo, PathComparator>;
245+
LogicalToPhysicalMap TSA_GUARDED_BY(mutex) map;
246+
247+
/// A mapping from the storage path to info. Note: pointers to std::map elements stay valid until the element itself is erased.
248+
using PhysicalPaths = std::map<std::string, LogicalToPhysicalMap::const_pointer>;
224249

225250
/// Known storage paths (randomly-assigned names), each pointing to its entry in `map`.
226-
FileNames TSA_GUARDED_BY(mutex) remote_directories;
251+
PhysicalPaths TSA_GUARDED_BY(mutex) remote_directories;
227252

228253
CurrentMetrics::Increment metric_directories;
229254
CurrentMetrics::Increment metric_files;

src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,21 @@ namespace ErrorCodes
2121
extern const int NOT_IMPLEMENTED;
2222
extern const int BAD_ARGUMENTS;
2323
extern const int CANNOT_UNLINK;
24+
extern const int CANNOT_RMDIR;
25+
extern const int READONLY;
2426
}
2527

26-
LocalObjectStorage::LocalObjectStorage(String key_prefix_)
27-
: key_prefix(std::move(key_prefix_))
28+
LocalObjectStorage::LocalObjectStorage(LocalObjectStorageSettings settings_)
29+
: settings(std::move(settings_))
2830
, log(getLogger("LocalObjectStorage"))
2931
{
3032
if (auto block_device_id = tryGetBlockDeviceId("/"); block_device_id.has_value())
3133
description = *block_device_id;
3234
else
3335
description = "/";
3436

35-
fs::create_directories(key_prefix);
37+
if (!settings.read_only)
38+
fs::create_directories(settings.key_prefix);
3639
}
3740

3841
bool LocalObjectStorage::exists(const StoredObject & object) const
@@ -69,6 +72,8 @@ std::unique_ptr<WriteBufferFromFileBase> LocalObjectStorage::writeObject( /// NO
6972
size_t buf_size,
7073
const WriteSettings & /* write_settings */)
7174
{
75+
throwIfReadonly();
76+
7277
if (mode != WriteMode::Rewrite)
7378
throw Exception(ErrorCodes::BAD_ARGUMENTS, "LocalObjectStorage doesn't support append to files");
7479

@@ -83,28 +88,52 @@ std::unique_ptr<WriteBufferFromFileBase> LocalObjectStorage::writeObject( /// NO
8388

8489
void LocalObjectStorage::removeObject(const StoredObject & object) const
8590
{
91+
throwIfReadonly();
92+
8693
/// For local object storage, files are actually removed when "metadata" is removed.
8794
if (!exists(object))
8895
return;
8996

9097
if (0 != unlink(object.remote_path.data()))
9198
ErrnoException::throwFromPath(ErrorCodes::CANNOT_UNLINK, object.remote_path, "Cannot unlink file {}", object.remote_path);
99+
100+
/// Remove empty directories.
101+
fs::path dir = fs::path(object.remote_path).parent_path();
102+
fs::path root = fs::weakly_canonical(settings.key_prefix);
103+
while (dir.has_parent_path() && dir.has_relative_path() && dir != root && pathStartsWith(dir, root))
104+
{
105+
LOG_TEST(log, "Removing empty directory {}, has_parent_path: {}, has_relative_path: {}, root: {}, starts with root: {}",
106+
std::string(dir), dir.has_parent_path(), dir.has_relative_path(), std::string(root), pathStartsWith(dir, root));
107+
108+
std::string dir_str = dir;
109+
if (0 != rmdir(dir_str.data()))
110+
{
111+
if (errno == ENOTDIR || errno == ENOTEMPTY)
112+
break;
113+
ErrnoException::throwFromPath(ErrorCodes::CANNOT_RMDIR, dir_str, "Cannot remove directory {}", dir_str);
114+
}
115+
116+
dir = dir.parent_path();
117+
}
92118
}
93119

94120
void LocalObjectStorage::removeObjects(const StoredObjects & objects) const
95121
{
122+
throwIfReadonly();
96123
for (const auto & object : objects)
97124
removeObject(object);
98125
}
99126

100127
void LocalObjectStorage::removeObjectIfExists(const StoredObject & object)
101128
{
129+
throwIfReadonly();
102130
if (exists(object))
103131
removeObject(object);
104132
}
105133

106134
void LocalObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
107135
{
136+
throwIfReadonly();
108137
for (const auto & object : objects)
109138
removeObjectIfExists(object);
110139
}
@@ -113,14 +142,21 @@ ObjectMetadata LocalObjectStorage::getObjectMetadata(const std::string & path) c
113142
{
114143
ObjectMetadata object_metadata;
115144
LOG_TEST(log, "Getting metadata for path: {}", path);
145+
146+
auto time = fs::last_write_time(path);
147+
116148
object_metadata.size_bytes = fs::file_size(path);
149+
object_metadata.etag = std::to_string(std::chrono::duration_cast<std::chrono::nanoseconds>(time.time_since_epoch()).count());
117150
object_metadata.last_modified = Poco::Timestamp::fromEpochTime(
118-
std::chrono::duration_cast<std::chrono::seconds>(fs::last_write_time(path).time_since_epoch()).count());
151+
std::chrono::duration_cast<std::chrono::seconds>(time.time_since_epoch()).count());
119152
return object_metadata;
120153
}
121154

122155
void LocalObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t/* max_keys */) const
123156
{
157+
if (!fs::is_directory(path))
158+
return;
159+
124160
for (const auto & entry : fs::directory_iterator(path))
125161
{
126162
if (entry.is_directory())
@@ -147,6 +183,7 @@ void LocalObjectStorage::copyObject( // NOLINT
147183
const WriteSettings & write_settings,
148184
std::optional<ObjectAttributes> /* object_to_attributes */)
149185
{
186+
throwIfReadonly();
150187
auto in = readObject(object_from, read_settings);
151188
auto out = writeObject(object_to, WriteMode::Rewrite, /* attributes= */ {}, /* buf_size= */ DBMS_DEFAULT_BUFFER_SIZE, write_settings);
152189
copyData(*in, *out);
@@ -161,6 +198,12 @@ void LocalObjectStorage::startup()
161198
{
162199
}
163200

201+
void LocalObjectStorage::throwIfReadonly() const
202+
{
203+
if (settings.read_only)
204+
throw Exception(ErrorCodes::READONLY, "Local object storage `{}` is readonly", getName());
205+
}
206+
164207
std::unique_ptr<IObjectStorage> LocalObjectStorage::cloneObjectStorage(
165208
const std::string & /* new_namespace */,
166209
const Poco::Util::AbstractConfiguration & /* config */,
@@ -173,7 +216,7 @@ ObjectStorageKey
173216
LocalObjectStorage::generateObjectKeyForPath(const std::string & /* path */, const std::optional<std::string> & /* key_prefix */) const
174217
{
175218
constexpr size_t key_name_total_size = 32;
176-
return ObjectStorageKey::createAsRelative(key_prefix, getRandomASCIIString(key_name_total_size));
219+
return ObjectStorageKey::createAsRelative(settings.key_prefix, getRandomASCIIString(key_name_total_size));
177220
}
178221

179222
}

0 commit comments

Comments
 (0)