Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,6 @@ The server successfully detected this situation and will download merged part fr
M(PerfLocalMemoryReferences, "Local NUMA node memory reads") \
M(PerfLocalMemoryMisses, "Local NUMA node memory read misses") \
\
M(CreatedHTTPConnections, "Total amount of created HTTP connections (counter increase every time connection is created).") \
\
M(CannotWriteToWriteBufferDiscard, "Number of stack traces dropped by query profiler or signal handler because pipe is full or cannot write to pipe.") \
M(QueryProfilerSignalOverruns, "Number of times we drop processing of a query profiler signal due to overrun plus the number of signals that OS has not delivered due to overrun.") \
M(QueryProfilerConcurrencyOverruns, "Number of times we drop processing of a query profiler signal due to too many concurrent query profilers in other threads, which may indicate overload.") \
Expand Down Expand Up @@ -436,8 +434,6 @@ The server successfully detected this situation and will download merged part fr
M(ReadBufferFromS3ResetSessions, "Number of HTTP sessions that were reset in ReadBufferFromS3.") \
M(ReadBufferFromS3PreservedSessions, "Number of HTTP sessions that were preserved in ReadBufferFromS3.") \
\
M(ReadWriteBufferFromHTTPPreservedSessions, "Number of HTTP sessions that were preserved in ReadWriteBufferFromHTTP.") \
\
M(WriteBufferFromS3Microseconds, "Time spent on writing to S3.") \
M(WriteBufferFromS3Bytes, "Bytes written to S3.") \
M(WriteBufferFromS3RequestsErrors, "Number of exceptions while writing to S3.") \
Expand Down Expand Up @@ -726,6 +722,9 @@ The server successfully detected this situation and will download merged part fr
M(AddressesDiscovered, "Total count of new addresses in dns resolve results for http connections") \
M(AddressesExpired, "Total count of expired addresses which is no longer presented in dns resolve results for http connections") \
M(AddressesMarkedAsFailed, "Total count of addresses which has been marked as faulty due to connection errors for http connections") \
\
M(ReadWriteBufferFromHTTPRequestsSent, "Number of HTTP requests sent by ReadWriteBufferFromHTTP") \
M(ReadWriteBufferFromHTTPBytes, "Total size of payload bytes received and sent by ReadWriteBufferFromHTTP. Doesn't include HTTP headers.") \


#ifdef APPLY_FOR_EXTERNAL_EVENTS
Expand Down
3 changes: 0 additions & 3 deletions src/Coordination/KeeperConstants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@
M(PerfLocalMemoryReferences) \
M(PerfLocalMemoryMisses) \
\
M(CreatedHTTPConnections) \
M(CannotWriteToWriteBufferDiscard) \
\
M(S3ReadMicroseconds) \
Expand Down Expand Up @@ -180,8 +179,6 @@
M(ReadBufferFromS3RequestsErrors) \
M(ReadBufferFromS3ResetSessions) \
M(ReadBufferFromS3PreservedSessions) \
\
M(ReadWriteBufferFromHTTPPreservedSessions) \
\
M(WriteBufferFromS3Microseconds) \
M(WriteBufferFromS3Bytes) \
Expand Down
6 changes: 3 additions & 3 deletions src/Disks/IO/ReadBufferFromRemoteFSGather.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
cache_key,
settings.remote_fs_cache,
FileCache::getCommonUser(),
[=, this]() { return read_buffer_creator(/* restricted_seek */true, object_path); },
[=, this]() { return read_buffer_creator(/* restricted_seek */true, object); },
settings,
query_id,
object.bytes_size,
Expand All @@ -102,14 +102,14 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
/// former doesn't support seeks.
if (with_page_cache && !buf)
{
auto inner = read_buffer_creator(/* restricted_seek */false, object_path);
auto inner = read_buffer_creator(/* restricted_seek */false, object);
auto cache_key = FileChunkAddress { .path = cache_path_prefix + object_path };
buf = std::make_unique<CachedInMemoryReadBufferFromFile>(
cache_key, settings.page_cache, std::move(inner), settings);
}

if (!buf)
buf = read_buffer_creator(/* restricted_seek */true, object_path);
buf = read_buffer_creator(/* restricted_seek */true, object);

if (read_until_position > start_offset && read_until_position < start_offset + object.bytes_size)
buf->setReadUntilPosition(read_until_position - start_offset);
Expand Down
2 changes: 1 addition & 1 deletion src/Disks/IO/ReadBufferFromRemoteFSGather.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class ReadBufferFromRemoteFSGather final : public ReadBufferFromFileBase
friend class ReadIndirectBufferFromRemoteFS;

public:
using ReadBufferCreator = std::function<std::unique_ptr<ReadBufferFromFileBase>(bool restricted_seek, const std::string & path)>;
using ReadBufferCreator = std::function<std::unique_ptr<ReadBufferFromFileBase>(bool restricted_seek, const StoredObject & object)>;

ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
Expand Down
31 changes: 22 additions & 9 deletions src/Disks/IO/ReadBufferFromWebServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ namespace ErrorCodes
ReadBufferFromWebServer::ReadBufferFromWebServer(
const String & url_,
ContextPtr context_,
size_t file_size_,
const ReadSettings & settings_,
bool use_external_buffer_,
size_t read_until_position_)
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0)
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0, file_size_)
, log(getLogger("ReadBufferFromWebServer"))
, context(context_)
, url(url_)
Expand All @@ -36,7 +37,7 @@ ReadBufferFromWebServer::ReadBufferFromWebServer(
}


std::unique_ptr<ReadBuffer> ReadBufferFromWebServer::initialize()
std::unique_ptr<SeekableReadBuffer> ReadBufferFromWebServer::initialize()
{
Poco::URI uri(url);
if (read_until_position)
Expand Down Expand Up @@ -119,9 +120,8 @@ bool ReadBufferFromWebServer::nextImpl()

auto result = impl->next();

BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset());

chassert(working_buffer.begin() == impl->buffer().begin());
working_buffer = impl->buffer();
pos = impl->position();

if (result)
offset += working_buffer.size();
Expand All @@ -132,16 +132,29 @@ bool ReadBufferFromWebServer::nextImpl()

off_t ReadBufferFromWebServer::seek(off_t offset_, int whence)
{
if (impl)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Seek is allowed only before first read attempt from the buffer");

if (whence != SEEK_SET)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only SEEK_SET mode is allowed");

if (offset_ < 0)
throw Exception(ErrorCodes::SEEK_POSITION_OUT_OF_BOUND, "Seek position is out of bounds. Offset: {}", offset_);

offset = offset_;
if (impl)
{
if (use_external_buffer)
{
impl->set(internal_buffer.begin(), internal_buffer.size());
}

impl->seek(offset_, SEEK_SET);

working_buffer = impl->buffer();
pos = impl->position();
offset = offset_ + available();
}
else
{
offset = offset_;
}

return offset;
}
Expand Down
5 changes: 3 additions & 2 deletions src/Disks/IO/ReadBufferFromWebServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class ReadBufferFromWebServer : public ReadBufferFromFileBase
explicit ReadBufferFromWebServer(
const String & url_,
ContextPtr context_,
size_t file_size_,
const ReadSettings & settings_ = {},
bool use_external_buffer_ = false,
size_t read_until_position = 0);
Expand All @@ -39,15 +40,15 @@ class ReadBufferFromWebServer : public ReadBufferFromFileBase
bool supportsRightBoundedReads() const override { return true; }

private:
std::unique_ptr<ReadBuffer> initialize();
std::unique_ptr<SeekableReadBuffer> initialize();

LoggerPtr log;
ContextPtr context;

const String url;
size_t buf_size;

std::unique_ptr<ReadBuffer> impl;
std::unique_ptr<SeekableReadBuffer> impl;

ReadSettings read_settings;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,11 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOL

auto read_buffer_creator =
[this, settings_ptr, disk_read_settings]
(bool restricted_seek, const std::string & path) -> std::unique_ptr<ReadBufferFromFileBase>
(bool restricted_seek, const StoredObject & object_) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromAzureBlobStorage>(
client.get(),
path,
object_.remote_path,
disk_read_settings,
settings_ptr->max_single_read_retries,
settings_ptr->max_single_download_retries,
Expand Down
3 changes: 2 additions & 1 deletion src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObjects( /// NOLI
auto disk_read_settings = patchSettings(read_settings);
auto read_buffer_creator =
[this, disk_read_settings]
(bool /* restricted_seek */, const std::string & path) -> std::unique_ptr<ReadBufferFromFileBase>
(bool /* restricted_seek */, const StoredObject & object_) -> std::unique_ptr<ReadBufferFromFileBase>
{
const auto & path = object_.remote_path;
size_t begin_of_path = path.find('/', path.find("//") + 2);
auto hdfs_path = path.substr(begin_of_path);
auto hdfs_uri = path.substr(0, begin_of_path);
Expand Down
4 changes: 2 additions & 2 deletions src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOL
auto modified_settings = patchSettings(read_settings);
auto global_context = Context::getGlobalContextInstance();
auto read_buffer_creator =
[=] (bool /* restricted_seek */, const std::string & file_path)
[=] (bool /* restricted_seek */, const StoredObject & object)
-> std::unique_ptr<ReadBufferFromFileBase>
{
return createReadBufferFromFileBase(file_path, modified_settings, read_hint, file_size);
return createReadBufferFromFileBase(object.remote_path, modified_settings, read_hint, file_size);
};

switch (read_settings.remote_fs_method)
Expand Down
4 changes: 2 additions & 2 deletions src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,12 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT

auto read_buffer_creator =
[this, settings_ptr, disk_read_settings]
(bool restricted_seek, const std::string & path) -> std::unique_ptr<ReadBufferFromFileBase>
(bool restricted_seek, const StoredObject & object_) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromS3>(
client.get(),
uri.bucket,
path,
object_.remote_path,
uri.version_id,
settings_ptr->request_settings,
disk_read_settings,
Expand Down
16 changes: 9 additions & 7 deletions src/Disks/ObjectStorages/Web/WebObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,15 +250,17 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
std::optional<size_t>,
std::optional<size_t>) const
{
size_t object_size = object.bytes_size;
auto read_buffer_creator =
[this, read_settings]
(bool /* restricted_seek */, const std::string & path_) -> std::unique_ptr<ReadBufferFromFileBase>
[this, read_settings, object_size]
(bool /* restricted_seek */, const StoredObject & object_) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromWebServer>(
fs::path(url) / path_,
getContext(),
read_settings,
/* use_external_buffer */true);
return std::make_unique<ReadBufferFromWebServer>(
fs::path(url) / object_.remote_path,
getContext(),
object_size,
read_settings,
/* use_external_buffer */true);
};

auto global_context = Context::getGlobalContextInstance();
Expand Down
12 changes: 11 additions & 1 deletion src/IO/ReadWriteBufferFromHTTP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
namespace ProfileEvents
{
extern const Event ReadBufferSeekCancelConnection;
extern const Event ReadWriteBufferFromHTTPRequestsSent;
extern const Event ReadWriteBufferFromHTTPBytes;
}


Expand Down Expand Up @@ -245,6 +247,8 @@ ReadWriteBufferFromHTTP::CallResult ReadWriteBufferFromHTTP::callImpl(

auto session = makeHTTPSession(connection_group, uri_, timeouts, proxy_config);

ProfileEvents::increment(ProfileEvents::ReadWriteBufferFromHTTPRequestsSent);

auto & stream_out = session->sendRequest(request);
if (out_stream_callback)
out_stream_callback(stream_out);
Expand Down Expand Up @@ -480,6 +484,8 @@ bool ReadWriteBufferFromHTTP::nextImpl()
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset());

offset_from_begin_pos += working_buffer.size();

ProfileEvents::increment(ProfileEvents::ReadWriteBufferFromHTTPBytes, working_buffer.size());
},
/*on_retry=*/ [&] ()
{
Expand Down Expand Up @@ -528,6 +534,8 @@ size_t ReadWriteBufferFromHTTP::readBigAt(char * to, size_t n, size_t offset, co

copyFromIStreamWithProgressCallback(*result.response_stream, to, n, progress_callback, &bytes_copied, &is_canceled);

ProfileEvents::increment(ProfileEvents::ReadWriteBufferFromHTTPBytes, bytes_copied);

offset += bytes_copied;
total_bytes_copied += bytes_copied;
to += bytes_copied;
Expand All @@ -536,6 +544,8 @@ size_t ReadWriteBufferFromHTTP::readBigAt(char * to, size_t n, size_t offset, co
},
/*on_retry=*/ [&] ()
{
ProfileEvents::increment(ProfileEvents::ReadWriteBufferFromHTTPBytes, bytes_copied);

offset += bytes_copied;
total_bytes_copied += bytes_copied;
to += bytes_copied;
Expand Down Expand Up @@ -574,7 +584,7 @@ off_t ReadWriteBufferFromHTTP::seek(off_t offset_, int whence)
if (impl)
{
auto position = getPosition();
if (offset_ > position)
if (offset_ >= position)
{
size_t diff = offset_ - position;
if (diff < read_settings.remote_read_min_bytes_for_seek)
Expand Down
4 changes: 2 additions & 2 deletions src/Storages/StorageS3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -725,12 +725,12 @@ std::unique_ptr<ReadBuffer> StorageS3Source::createAsyncS3ReadBuffer(
auto context = getContext();
auto read_buffer_creator =
[this, read_settings, object_size]
(bool restricted_seek, const std::string & path) -> std::unique_ptr<ReadBufferFromFileBase>
(bool restricted_seek, const StoredObject & object) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromS3>(
client,
bucket,
path,
object.remote_path,
version_id,
request_settings,
read_settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@
</cached_web>
</policies>
</storage_configuration>

<query_log></query_log>
</clickhouse>
Loading