Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Backups/BackupEntryFromAppendOnlyFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ namespace DB
BackupEntryFromAppendOnlyFile::BackupEntryFromAppendOnlyFile(
const DiskPtr & disk_,
const String & file_path_,
const ReadSettings & settings_,
const std::optional<UInt64> & file_size_,
const std::optional<UInt128> & checksum_,
const std::shared_ptr<TemporaryFileOnDisk> & temporary_file_)
: BackupEntryFromImmutableFile(disk_, file_path_, file_size_, checksum_, temporary_file_)
: BackupEntryFromImmutableFile(disk_, file_path_, settings_, file_size_, checksum_, temporary_file_)
, limit(BackupEntryFromImmutableFile::getSize())
{
}
Expand Down
1 change: 1 addition & 0 deletions src/Backups/BackupEntryFromAppendOnlyFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class BackupEntryFromAppendOnlyFile : public BackupEntryFromImmutableFile
BackupEntryFromAppendOnlyFile(
const DiskPtr & disk_,
const String & file_path_,
const ReadSettings & settings_,
const std::optional<UInt64> & file_size_ = {},
const std::optional<UInt128> & checksum_ = {},
const std::shared_ptr<TemporaryFileOnDisk> & temporary_file_ = {});
Expand Down
10 changes: 8 additions & 2 deletions src/Backups/BackupEntryFromImmutableFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,16 @@ namespace DB
BackupEntryFromImmutableFile::BackupEntryFromImmutableFile(
const DiskPtr & disk_,
const String & file_path_,
const ReadSettings & settings_,
const std::optional<UInt64> & file_size_,
const std::optional<UInt128> & checksum_,
const std::shared_ptr<TemporaryFileOnDisk> & temporary_file_)
: disk(disk_), file_path(file_path_), file_size(file_size_), checksum(checksum_), temporary_file_on_disk(temporary_file_)
: disk(disk_)
, file_path(file_path_)
, settings(settings_)
, file_size(file_size_)
, checksum(checksum_)
, temporary_file_on_disk(temporary_file_)
{
}

Expand All @@ -30,7 +36,7 @@ UInt64 BackupEntryFromImmutableFile::getSize() const

std::unique_ptr<SeekableReadBuffer> BackupEntryFromImmutableFile::getReadBuffer() const
{
return disk->readFile(file_path);
return disk->readFile(file_path, settings);
}


Expand Down
3 changes: 3 additions & 0 deletions src/Backups/BackupEntryFromImmutableFile.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <Backups/IBackupEntry.h>
#include <IO/ReadSettings.h>
#include <base/defines.h>
#include <mutex>

Expand All @@ -19,6 +20,7 @@ class BackupEntryFromImmutableFile : public IBackupEntry
BackupEntryFromImmutableFile(
const DiskPtr & disk_,
const String & file_path_,
const ReadSettings & settings_,
const std::optional<UInt64> & file_size_ = {},
const std::optional<UInt128> & checksum_ = {},
const std::shared_ptr<TemporaryFileOnDisk> & temporary_file_ = {});
Expand All @@ -37,6 +39,7 @@ class BackupEntryFromImmutableFile : public IBackupEntry
private:
const DiskPtr disk;
const String file_path;
ReadSettings settings;
mutable std::optional<UInt64> file_size TSA_GUARDED_BY(get_file_size_mutex);
mutable std::mutex get_file_size_mutex;
const std::optional<UInt128> checksum;
Expand Down
6 changes: 6 additions & 0 deletions src/Backups/BackupIO.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <IO/copyData.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/SeekableReadBuffer.h>
#include <Interpreters/Context.h>


namespace DB
Expand All @@ -22,6 +23,11 @@ void IBackupReader::copyFileToDisk(const String & file_name, size_t size, DiskPt
write_buffer->finalize();
}

IBackupWriter::IBackupWriter(const ContextPtr & context_)
: read_settings(context_->getBackupReadSettings())
, has_throttling(static_cast<bool>(context_->getBackupsThrottler()))
{}

void IBackupWriter::copyDataToFile(const CreateReadBufferFunction & create_read_buffer, UInt64 offset, UInt64 size, const String & dest_file_name)
{
auto read_buffer = create_read_buffer();
Expand Down
14 changes: 14 additions & 0 deletions src/Backups/BackupIO.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include <Core/Types.h>
#include <Disks/DiskType.h>
#include <Disks/IDisk.h>
#include <IO/ReadSettings.h>
#include <Interpreters/Context_fwd.h>

namespace DB
{
Expand All @@ -28,6 +30,8 @@ class IBackupWriter /// BackupWriterFile, BackupWriterDisk
public:
using CreateReadBufferFunction = std::function<std::unique_ptr<SeekableReadBuffer>()>;

explicit IBackupWriter(const ContextPtr & context_);

virtual ~IBackupWriter() = default;
virtual bool fileExists(const String & file_name) = 0;
virtual UInt64 getFileSize(const String & file_name) = 0;
Expand All @@ -38,7 +42,17 @@ class IBackupWriter /// BackupWriterFile, BackupWriterDisk
virtual DataSourceDescription getDataSourceDescription() const = 0;
virtual void copyDataToFile(const CreateReadBufferFunction & create_read_buffer, UInt64 offset, UInt64 size, const String & dest_file_name);
virtual bool supportNativeCopy(DataSourceDescription /* data_source_description */) const { return false; }

/// Copy file using native copy (optimized for S3 to use CopyObject)
///
/// NOTE: It still may fall back to copyDataToFile() if native copy is not possible:
/// - different buckets
/// - throttling had been requested
virtual void copyFileNative(DiskPtr src_disk, const String & src_file_name, UInt64 src_offset, UInt64 src_size, const String & dest_file_name);

protected:
const ReadSettings read_settings;
const bool has_throttling;
};

}
9 changes: 6 additions & 3 deletions src/Backups/BackupIO_Disk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ void BackupReaderDisk::copyFileToDisk(const String & file_name, size_t size, Dis
}


BackupWriterDisk::BackupWriterDisk(const DiskPtr & disk_, const String & path_) : disk(disk_), path(path_)
BackupWriterDisk::BackupWriterDisk(const DiskPtr & disk_, const String & path_, const ContextPtr & context_)
: IBackupWriter(context_)
, disk(disk_)
, path(path_)
{
}

Expand Down Expand Up @@ -127,9 +130,9 @@ void BackupWriterDisk::copyFileNative(DiskPtr src_disk, const String & src_file_
if (!src_disk)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot natively copy data to disk without source disk");

if ((src_offset != 0) || (src_size != src_disk->getFileSize(src_file_name)))
if (has_throttling || (src_offset != 0) || (src_size != src_disk->getFileSize(src_file_name)))
{
auto create_read_buffer = [src_disk, src_file_name] { return src_disk->readFile(src_file_name); };
auto create_read_buffer = [this, src_disk, src_file_name] { return src_disk->readFile(src_file_name, read_settings); };
copyDataToFile(create_read_buffer, src_offset, src_size, dest_file_name);
return;
}
Expand Down
3 changes: 2 additions & 1 deletion src/Backups/BackupIO_Disk.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <filesystem>
#include <Backups/BackupIO.h>
#include <Interpreters/Context_fwd.h>

namespace DB
{
Expand Down Expand Up @@ -30,7 +31,7 @@ class BackupReaderDisk : public IBackupReader
class BackupWriterDisk : public IBackupWriter
{
public:
BackupWriterDisk(const DiskPtr & disk_, const String & path_);
BackupWriterDisk(const DiskPtr & disk_, const String & path_, const ContextPtr & context_);
~BackupWriterDisk() override;

bool fileExists(const String & file_name) override;
Expand Down
8 changes: 5 additions & 3 deletions src/Backups/BackupIO_File.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ void BackupReaderFile::copyFileToDisk(const String & file_name, size_t size, Dis
}


BackupWriterFile::BackupWriterFile(const String & path_) : path(path_)
BackupWriterFile::BackupWriterFile(const String & path_, const ContextPtr & context_)
: IBackupWriter(context_)
, path(path_)
{
}

Expand Down Expand Up @@ -152,9 +154,9 @@ void BackupWriterFile::copyFileNative(DiskPtr src_disk, const String & src_file_
else
abs_source_path = fs::absolute(src_file_name);

if ((src_offset != 0) || (src_size != fs::file_size(abs_source_path)))
if (has_throttling || (src_offset != 0) || (src_size != fs::file_size(abs_source_path)))
{
auto create_read_buffer = [abs_source_path] { return createReadBufferFromFileBase(abs_source_path, {}); };
auto create_read_buffer = [this, abs_source_path] { return createReadBufferFromFileBase(abs_source_path, read_settings); };
copyDataToFile(create_read_buffer, src_offset, src_size, dest_file_name);
return;
}
Expand Down
3 changes: 2 additions & 1 deletion src/Backups/BackupIO_File.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <filesystem>
#include <Backups/BackupIO.h>
#include <Interpreters/Context_fwd.h>

namespace DB
{
Expand All @@ -27,7 +28,7 @@ class BackupReaderFile : public IBackupReader
class BackupWriterFile : public IBackupWriter
{
public:
explicit BackupWriterFile(const String & path_);
explicit BackupWriterFile(const String & path_, const ContextPtr & context_);
~BackupWriterFile() override;

bool fileExists(const String & file_name) override;
Expand Down
6 changes: 3 additions & 3 deletions src/Backups/BackupIO_S3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ void BackupReaderS3::copyFileToDisk(const String & file_name, size_t size, DiskP

BackupWriterS3::BackupWriterS3(
const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, const ContextPtr & context_)
: s3_uri(s3_uri_)
: IBackupWriter(context_)
, s3_uri(s3_uri_)
, client(makeS3Client(s3_uri_, access_key_id_, secret_access_key_, context_))
, read_settings(context_->getReadSettings())
, request_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).request_settings)
, log(&Poco::Logger::get("BackupWriterS3"))
{
Expand All @@ -189,7 +189,7 @@ void BackupWriterS3::copyFileNative(DiskPtr src_disk, const String & src_file_na
auto objects = src_disk->getStorageObjects(src_file_name);
if (objects.size() > 1)
{
auto create_read_buffer = [src_disk, src_file_name] { return src_disk->readFile(src_file_name); };
auto create_read_buffer = [this, src_disk, src_file_name] { return src_disk->readFile(src_file_name, read_settings); };
copyDataToFile(create_read_buffer, src_offset, src_size, dest_file_name);
}
else
Expand Down
2 changes: 1 addition & 1 deletion src/Backups/BackupIO_S3.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <IO/ReadSettings.h>
#include <IO/S3Common.h>
#include <Storages/StorageS3Settings.h>
#include <Interpreters/Context_fwd.h>


namespace DB
Expand Down Expand Up @@ -76,7 +77,6 @@ class BackupWriterS3 : public IBackupWriter

S3::URI s3_uri;
std::shared_ptr<S3::Client> client;
ReadSettings read_settings;
S3Settings::RequestSettings request_settings;
Poco::Logger * log;
std::optional<bool> supports_batch_delete;
Expand Down
4 changes: 2 additions & 2 deletions src/Backups/registerBackupEnginesFileAndDisk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ void registerBackupEnginesFileAndDisk(BackupFactory & factory)
{
std::shared_ptr<IBackupWriter> writer;
if (engine_name == "File")
writer = std::make_shared<BackupWriterFile>(path);
writer = std::make_shared<BackupWriterFile>(path, params.context);
else
writer = std::make_shared<BackupWriterDisk>(disk, path);
writer = std::make_shared<BackupWriterDisk>(disk, path, params.context);
return std::make_unique<BackupImpl>(
backup_name_for_logging,
archive_params,
Expand Down
12 changes: 8 additions & 4 deletions src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,14 @@
M(S3GetRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform S3 GET and SELECT request throttling.") \
M(S3PutRequestThrottlerCount, "Number of S3 PUT, COPY, POST and LIST requests passed through throttler.") \
M(S3PutRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform S3 PUT, COPY, POST and LIST request throttling.") \
M(RemoteReadThrottlerBytes, "Bytes passed through 'max_remote_read_network_bandwidth_for_server' throttler.") \
M(RemoteReadThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_remote_read_network_bandwidth_for_server' throttling.") \
M(RemoteWriteThrottlerBytes, "Bytes passed through 'max_remote_write_network_bandwidth_for_server' throttler.") \
M(RemoteWriteThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_remote_write_network_bandwidth_for_server' throttling.") \
M(RemoteReadThrottlerBytes, "Bytes passed through 'max_remote_read_network_bandwidth_for_server'/'max_remote_read_network_bandwidth' throttler.") \
M(RemoteReadThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_remote_read_network_bandwidth_for_server'/'max_remote_read_network_bandwidth' throttling.") \
M(RemoteWriteThrottlerBytes, "Bytes passed through 'max_remote_write_network_bandwidth_for_server'/'max_remote_write_network_bandwidth' throttler.") \
M(RemoteWriteThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_remote_write_network_bandwidth_for_server'/'max_remote_write_network_bandwidth' throttling.") \
M(LocalReadThrottlerBytes, "Bytes passed through 'max_local_read_bandwidth_for_server'/'max_local_read_bandwidth' throttler.") \
M(LocalReadThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_local_read_bandwidth_for_server'/'max_local_read_bandwidth' throttling.") \
M(LocalWriteThrottlerBytes, "Bytes passed through 'max_local_write_bandwidth_for_server'/'max_local_write_bandwidth' throttler.") \
M(LocalWriteThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_local_write_bandwidth_for_server'/'max_local_write_bandwidth' throttling.") \
M(ThrottlerSleepMicroseconds, "Total time a query was sleeping to conform all throttling settings.") \
\
M(QueryMaskingRulesMatch, "Number of times query masking rules was successfully matched.") \
Expand Down
5 changes: 4 additions & 1 deletion src/Core/ServerSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ void ServerSettings::loadSettingsFromConfig(const Poco::Util::AbstractConfigurat
"background_buffer_flush_schedule_pool_size",
"background_schedule_pool_size",
"background_message_broker_schedule_pool_size",
"background_distributed_schedule_pool_size"
"background_distributed_schedule_pool_size",

"max_remote_read_network_bandwidth_for_server",
"max_remote_write_network_bandwidth_for_server",
};

for (auto setting : all())
Expand Down
5 changes: 5 additions & 0 deletions src/Core/ServerSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,15 @@ namespace DB
M(UInt64, max_io_thread_pool_size, 100, "The maximum number of threads that would be used for IO operations", 0) \
M(UInt64, max_io_thread_pool_free_size, 0, "Max free size for IO thread pool.", 0) \
M(UInt64, io_thread_pool_queue_size, 10000, "Queue size for IO thread pool.", 0) \
M(UInt64, max_remote_read_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for read. Zero means unlimited.", 0) \
M(UInt64, max_remote_write_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for write. Zero means unlimited.", 0) \
M(UInt64, max_local_read_bandwidth_for_server, 0, "The maximum speed of local reads in bytes per second. Zero means unlimited.", 0) \
M(UInt64, max_local_write_bandwidth_for_server, 0, "The maximum speed of local writes in bytes per second. Zero means unlimited.", 0) \
M(UInt64, max_backups_io_thread_pool_size, 1000, "The maximum number of threads that would be used for IO operations for BACKUP queries", 0) \
M(UInt64, max_backups_io_thread_pool_free_size, 0, "Max free size for backups IO thread pool.", 0) \
M(UInt64, backups_io_thread_pool_queue_size, 0, "Queue size for backups IO thread pool.", 0) \
M(UInt64, backup_threads, 16, "The maximum number of threads to execute BACKUP requests.", 0) \
M(UInt64, max_backup_bandwidth_for_server, 0, "The maximum read speed in bytes per second for all backups on server. Zero means unlimited.", 0) \
M(UInt64, restore_threads, 16, "The maximum number of threads to execute RESTORE requests.", 0) \
M(Int32, max_connections, 1024, "Max server connections.", 0) \
M(UInt32, asynchronous_metrics_update_period_s, 1, "Period in seconds for updating asynchronous metrics.", 0) \
Expand Down
10 changes: 8 additions & 2 deletions src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,10 @@ class IColumn;
M(Bool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \
M(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited. Only has meaning at server startup.", 0) \
M(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited. Only has meaning at server startup.", 0) \
M(UInt64, max_remote_read_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for read. Zero means unlimited. Only has meaning at server startup.", 0) \
M(UInt64, max_remote_write_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for write. Zero means unlimited. Only has meaning at server startup.", 0) \
M(UInt64, max_remote_read_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for read.", 0) \
M(UInt64, max_remote_write_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for write.", 0) \
M(UInt64, max_local_read_bandwidth, 0, "The maximum speed of local reads in bytes per second.", 0) \
M(UInt64, max_local_write_bandwidth, 0, "The maximum speed of local writes in bytes per second.", 0) \
M(Bool, stream_like_engine_allow_direct_select, false, "Allow direct SELECT query for Kafka, RabbitMQ, FileLog, Redis Streams and NATS engines. In case there are attached materialized views, SELECT query is not allowed even if this setting is enabled.", 0) \
M(String, stream_like_engine_insert_queue, "", "When stream like engine reads from multiple queues, user will need to select one queue to insert into when writing. Used by Redis Streams and NATS.", 0) \
\
Expand Down Expand Up @@ -422,6 +424,7 @@ class IColumn;
M(UInt64, backup_restore_keeper_fault_injection_seed, 0, "0 - random seed, otherwise the setting value", 0) \
M(UInt64, backup_restore_keeper_value_max_size, 1048576, "Maximum size of data of a [Zoo]Keeper's node during backup", 0) \
M(UInt64, backup_restore_batch_size_for_keeper_multiread, 10000, "Maximum size of batch for multiread request to [Zoo]Keeper during backup or restore", 0) \
M(UInt64, max_backup_bandwidth, 0, "The maximum read speed in bytes per second for particular backup on server. Zero means unlimited.", 0) \
\
M(Bool, log_profile_events, true, "Log query performance statistics into the query_log, query_thread_log and query_views_log.", 0) \
M(Bool, log_query_settings, true, "Log query settings into the query_log.", 0) \
Expand Down Expand Up @@ -735,6 +738,7 @@ class IColumn;
#define MAKE_OBSOLETE(M, TYPE, NAME, DEFAULT) \
M(TYPE, NAME, DEFAULT, "Obsolete setting, does nothing.", BaseSettingsHelpers::Flags::OBSOLETE)

/// NOTE: ServerSettings::loadSettingsFromConfig() should be updated to include this settings
#define MAKE_DEPRECATED_BY_SERVER_CONFIG(M, TYPE, NAME, DEFAULT) \
M(TYPE, NAME, DEFAULT, "User-level setting is deprecated, and it must be defined in the server configuration instead.", BaseSettingsHelpers::Flags::OBSOLETE)

Expand Down Expand Up @@ -768,6 +772,8 @@ class IColumn;
MAKE_DEPRECATED_BY_SERVER_CONFIG(M, UInt64, background_schedule_pool_size, 128) \
MAKE_DEPRECATED_BY_SERVER_CONFIG(M, UInt64, background_message_broker_schedule_pool_size, 16) \
MAKE_DEPRECATED_BY_SERVER_CONFIG(M, UInt64, background_distributed_schedule_pool_size, 16) \
MAKE_DEPRECATED_BY_SERVER_CONFIG(M, UInt64, max_remote_read_network_bandwidth_for_server, 0) \
MAKE_DEPRECATED_BY_SERVER_CONFIG(M, UInt64, max_remote_write_network_bandwidth_for_server, 0) \
/* ---- */ \
MAKE_OBSOLETE(M, DefaultDatabaseEngine, default_database_engine, DefaultDatabaseEngine::Atomic) \
MAKE_OBSOLETE(M, UInt64, max_pipeline_depth, 0) \
Expand Down
6 changes: 4 additions & 2 deletions src/Disks/DiskLocal.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "DiskLocal.h"
#include <Common/Throttler_fwd.h>
#include <Common/createHardLink.h>
#include "DiskFactory.h"

Expand Down Expand Up @@ -367,10 +368,11 @@ std::unique_ptr<ReadBufferFromFileBase> DiskLocal::readFile(const String & path,
}

std::unique_ptr<WriteBufferFromFileBase>
DiskLocal::writeFile(const String & path, size_t buf_size, WriteMode mode, const WriteSettings &)
DiskLocal::writeFile(const String & path, size_t buf_size, WriteMode mode, const WriteSettings & settings)
{
int flags = (mode == WriteMode::Append) ? (O_APPEND | O_CREAT | O_WRONLY) : -1;
return std::make_unique<WriteBufferFromFile>(fs::path(disk_path) / path, buf_size, flags);
return std::make_unique<WriteBufferFromFile>(
fs::path(disk_path) / path, buf_size, flags, settings.local_throttler);
}

void DiskLocal::removeFile(const String & path)
Expand Down
Loading