Skip to content

Commit c9a3159

Browse files
committed
fix single-threading failsafe when number of files cannot be estimated
1 parent 0669591 commit c9a3159

File tree

2 files changed

+24
-2
lines changed

2 files changed

+24
-2
lines changed

src/Storages/StorageS3.cpp

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,14 @@ class StorageS3Source::DisclosedGlobIterator::Impl : WithContext
229229
return buffer.size();
230230
}
231231

232+
bool hasMore()
233+
{
234+
if (!buffer.size())
235+
return !(expanded_keys_iter == expanded_keys.end() && is_finished_for_key);
236+
else
237+
return true;
238+
}
239+
232240
~Impl()
233241
{
234242
list_objects_pool.wait();
@@ -481,6 +489,11 @@ size_t StorageS3Source::DisclosedGlobIterator::estimatedKeysCount()
481489
return pimpl->objectsCount();
482490
}
483491

492+
bool StorageS3Source::DisclosedGlobIterator::hasMore()
493+
{
494+
return pimpl->hasMore();
495+
}
496+
484497
class StorageS3Source::KeysIterator::Impl
485498
{
486499
public:
@@ -1243,8 +1256,16 @@ void ReadFromStorageS3Step::initializePipeline(QueryPipelineBuilder & pipeline,
12431256
if (estimated_keys_count > 1)
12441257
num_streams = std::min(num_streams, estimated_keys_count);
12451258
else
1246-
/// Disclosed glob iterator can underestimate the amount of keys in some cases. We will keep one stream for this particular case.
1247-
num_streams = 1;
1259+
{
1260+
const auto glob_iter = std::dynamic_pointer_cast<StorageS3Source::DisclosedGlobIterator>(iterator_wrapper);
1261+
if (!(glob_iter && glob_iter->hasMore()))
1262+
{
1263+
/// Disclosed glob iterator can underestimate the amount of keys in some cases. We will keep one stream for this particular case.
1264+
num_streams = 1;
1265+
}
1266+
/// Otherwise, 1000 files were already listed, but none of them is actually what we are looking for.
1267+
/// We cannot estimate _how many_ there are left, but if there are more files to list, it's faster to do it in many streams.
1268+
}
12481269

12491270
const size_t max_threads = context->getSettingsRef().max_threads;
12501271
const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / std::max(num_streams, 1ul));

src/Storages/StorageS3.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ class StorageS3Source : public SourceWithKeyCondition, WithContext
8383

8484
KeyWithInfoPtr next(size_t idx = 0) override; /// NOLINT
8585
size_t estimatedKeysCount() override;
86+
bool hasMore();
8687

8788
private:
8889
class Impl;

0 commit comments

Comments
 (0)