36 changes: 15 additions & 21 deletions src/Processors/SourceWithKeyCondition.h
@@ -18,31 +18,25 @@ class SourceWithKeyCondition : public ISource

void setKeyConditionImpl(const SelectQueryInfo & query_info, ContextPtr context, const Block & keys)
{
if (!context->getSettingsRef().allow_experimental_analyzer)
{
key_condition = std::make_shared<const KeyCondition>(
query_info,
context,
keys.getNames(),
std::make_shared<ExpressionActions>(std::make_shared<ActionsDAG>(keys.getColumnsWithTypeAndName())));
}
key_condition = std::make_shared<const KeyCondition>(
query_info,
context,
keys.getNames(),
std::make_shared<ExpressionActions>(std::make_shared<ActionsDAG>(keys.getColumnsWithTypeAndName())));
}

void setKeyConditionImpl(const ActionsDAG::NodeRawConstPtrs & nodes, ContextPtr context, const Block & keys)
{
if (context->getSettingsRef().allow_experimental_analyzer)
{
std::unordered_map<std::string, DB::ColumnWithTypeAndName> node_name_to_input_column;
for (const auto & column : keys.getColumnsWithTypeAndName())
node_name_to_input_column.insert({column.name, column});

auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(nodes, node_name_to_input_column, context);
key_condition = std::make_shared<const KeyCondition>(
filter_actions_dag,
context,
keys.getNames(),
std::make_shared<ExpressionActions>(std::make_shared<ActionsDAG>(keys.getColumnsWithTypeAndName())));
}
std::unordered_map<std::string, DB::ColumnWithTypeAndName> node_name_to_input_column;
for (const auto & column : keys.getColumnsWithTypeAndName())
node_name_to_input_column.insert({column.name, column});

auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(nodes, node_name_to_input_column, context);
key_condition = std::make_shared<const KeyCondition>(
filter_actions_dag,
context,
keys.getNames(),
std::make_shared<ExpressionActions>(std::make_shared<ActionsDAG>(keys.getColumnsWithTypeAndName())));
}

public:
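Both overloads now build the KeyCondition unconditionally; the branching on `allow_experimental_analyzer` is gone, so each caller simply picks the overload matching what it has (an AST-carrying `SelectQueryInfo` or raw filter-DAG nodes). For reference, a commented sketch of the DAG-based path (every identifier below comes from the hunk above; nothing new is assumed):

```cpp
// Map each key column name to its description; buildFilterActionsDAG uses this
// to resolve the inputs referenced by the pushed-down filter nodes.
std::unordered_map<std::string, DB::ColumnWithTypeAndName> node_name_to_input_column;
for (const auto & column : keys.getColumnsWithTypeAndName())
    node_name_to_input_column.insert({column.name, column});

// Fold all pushed-down filter nodes into a single DAG whose output is the predicate.
auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(nodes, node_name_to_input_column, context);

// Build the KeyCondition straight from the DAG; no AST is involved.
key_condition = std::make_shared<const KeyCondition>(
    filter_actions_dag,
    context,
    keys.getNames(),
    std::make_shared<ExpressionActions>(std::make_shared<ActionsDAG>(keys.getColumnsWithTypeAndName())));
```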
148 changes: 114 additions & 34 deletions src/Storages/HDFS/StorageHDFS.cpp
@@ -15,6 +15,9 @@
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Transforms/ExtractColumnsTransform.h>
#include <Processors/Sources/ConstChunkGenerator.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>

#include <IO/WriteHelpers.h>
#include <IO/CompressionMethod.h>
@@ -408,22 +411,22 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
class HDFSSource::DisclosedGlobIterator::Impl
{
public:
Impl(const String & uri, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
Impl(const String & uri, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(uri);
uris = getPathsList(path_from_uri, uri_without_path, context);
ASTPtr filter_ast;
ActionsDAGPtr filter_dag;
if (!uris.empty())
filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, uris[0].path, context);
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);

if (filter_ast)
if (filter_dag)
{
std::vector<String> paths;
paths.reserve(uris.size());
for (const auto & path_with_info : uris)
paths.push_back(path_with_info.path);

VirtualColumnUtils::filterByPathOrFile(uris, paths, query, virtual_columns, context, filter_ast);
VirtualColumnUtils::filterByPathOrFile(uris, paths, filter_dag, virtual_columns, context);
}
auto file_progress_callback = context->getFileProgressCallback();
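Both iterators now prune candidate files with an `ActionsDAG` built from the pushed-down predicate instead of an AST. A commented sketch of the pattern, assuming the usual `_path`/`_file` virtual columns of file-like storages (the calls themselves are exactly the ones in the hunk above):

```cpp
// Turn the predicate into a filter DAG over the virtual columns; a null
// predicate produces no DAG, in which case nothing is filtered.
ActionsDAGPtr filter_dag;
if (!uris.empty())
    filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);

if (filter_dag)
{
    // Materialize the plain path strings the virtual columns are computed from...
    std::vector<String> paths;
    paths.reserve(uris.size());
    for (const auto & path_with_info : uris)
        paths.push_back(path_with_info.path);

    // ...and drop, in place, every URI whose path cannot satisfy the predicate.
    VirtualColumnUtils::filterByPathOrFile(uris, paths, filter_dag, virtual_columns, context);
}
```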

@@ -456,21 +459,21 @@ class HDFSSource::DisclosedGlobIterator::Impl
class HDFSSource::URISIterator::Impl : WithContext
{
public:
explicit Impl(const std::vector<String> & uris_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context_)
explicit Impl(const std::vector<String> & uris_, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context_)
: WithContext(context_), uris(uris_), file_progress_callback(context_->getFileProgressCallback())
{
ASTPtr filter_ast;
ActionsDAGPtr filter_dag;
if (!uris.empty())
filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, getPathFromUriAndUriWithoutPath(uris[0]).first, getContext());
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);

if (filter_ast)
if (filter_dag)
{
std::vector<String> paths;
paths.reserve(uris.size());
for (const auto & uri : uris)
paths.push_back(getPathFromUriAndUriWithoutPath(uri).first);

VirtualColumnUtils::filterByPathOrFile(uris, paths, query, virtual_columns, getContext(), filter_ast);
VirtualColumnUtils::filterByPathOrFile(uris, paths, filter_dag, virtual_columns, getContext());
}

if (!uris.empty())
@@ -517,16 +520,16 @@ class HDFSSource::URISIterator::Impl : WithContext
std::function<void(FileProgress)> file_progress_callback;
};

HDFSSource::DisclosedGlobIterator::DisclosedGlobIterator(const String & uri, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
: pimpl(std::make_shared<HDFSSource::DisclosedGlobIterator::Impl>(uri, query, virtual_columns, context)) {}
HDFSSource::DisclosedGlobIterator::DisclosedGlobIterator(const String & uri, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
: pimpl(std::make_shared<HDFSSource::DisclosedGlobIterator::Impl>(uri, predicate, virtual_columns, context)) {}

StorageHDFS::PathWithInfo HDFSSource::DisclosedGlobIterator::next()
{
return pimpl->next();
}

HDFSSource::URISIterator::URISIterator(const std::vector<String> & uris_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
: pimpl(std::make_shared<HDFSSource::URISIterator::Impl>(uris_, query, virtual_columns, context))
HDFSSource::URISIterator::URISIterator(const std::vector<String> & uris_, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
: pimpl(std::make_shared<HDFSSource::URISIterator::Impl>(uris_, predicate, virtual_columns, context))
{
}

@@ -541,8 +544,7 @@ HDFSSource::HDFSSource(
ContextPtr context_,
UInt64 max_block_size_,
std::shared_ptr<IteratorWrapper> file_iterator_,
bool need_only_count_,
const SelectQueryInfo & query_info_)
bool need_only_count_)
: ISource(info.source_header, false)
, WithContext(context_)
, storage(std::move(storage_))
@@ -553,7 +555,6 @@ HDFSSource::HDFSSource(
, file_iterator(file_iterator_)
, columns_description(info.columns_description)
, need_only_count(need_only_count_)
, query_info(query_info_)
{
initialize();
}
@@ -843,7 +844,57 @@ bool StorageHDFS::supportsSubsetOfColumns(const ContextPtr & context_) const
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name, context_);
}

Pipe StorageHDFS::read(
class ReadFromHDFS : public SourceStepWithFilter
{
public:
std::string getName() const override { return "ReadFromHDFS"; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void applyFilters() override;

ReadFromHDFS(
Block sample_block,
ReadFromFormatInfo info_,
bool need_only_count_,
std::shared_ptr<StorageHDFS> storage_,
ContextPtr context_,
size_t max_block_size_,
size_t num_streams_)
: SourceStepWithFilter(DataStream{.header = std::move(sample_block)})
, info(std::move(info_))
, need_only_count(need_only_count_)
, storage(std::move(storage_))
, context(std::move(context_))
, max_block_size(max_block_size_)
, num_streams(num_streams_)
{
}

private:
ReadFromFormatInfo info;
const bool need_only_count;
std::shared_ptr<StorageHDFS> storage;

ContextPtr context;
size_t max_block_size;
size_t num_streams;

std::shared_ptr<HDFSSource::IteratorWrapper> iterator_wrapper;

void createIterator(const ActionsDAG::Node * predicate);
};

void ReadFromHDFS::applyFilters()
{
auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(filter_nodes.nodes, {}, context);
const ActionsDAG::Node * predicate = nullptr;
if (filter_actions_dag)
predicate = filter_actions_dag->getOutputs().at(0);

createIterator(predicate);
}

void StorageHDFS::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
@@ -852,50 +903,79 @@ Pipe StorageHDFS::read(
size_t max_block_size,
size_t num_streams)
{
std::shared_ptr<HDFSSource::IteratorWrapper> iterator_wrapper{nullptr};
if (distributed_processing)
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(context_), virtual_columns);
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& context_->getSettingsRef().optimize_count_from_files;

auto this_ptr = std::static_pointer_cast<StorageHDFS>(shared_from_this());

auto reading = std::make_unique<ReadFromHDFS>(
read_from_format_info.source_header,
std::move(read_from_format_info),
need_only_count,
std::move(this_ptr),
context_,
max_block_size,
num_streams);

query_plan.addStep(std::move(reading));
}

void ReadFromHDFS::createIterator(const ActionsDAG::Node * predicate)
{
if (iterator_wrapper)
return;

if (storage->distributed_processing)
{
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>(
[callback = context_->getReadTaskCallback()]() -> StorageHDFS::PathWithInfo {
[callback = context->getReadTaskCallback()]() -> StorageHDFS::PathWithInfo {
return StorageHDFS::PathWithInfo{callback(), std::nullopt};
});
}
else if (is_path_with_globs)
else if (storage->is_path_with_globs)
{
/// Iterate through disclosed globs and make a source for each file
auto glob_iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(uris[0], query_info.query, virtual_columns, context_);
auto glob_iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(storage->uris[0], predicate, storage->virtual_columns, context);
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([glob_iterator]()
{
return glob_iterator->next();
});
}
else
{
auto uris_iterator = std::make_shared<HDFSSource::URISIterator>(uris, query_info.query, virtual_columns, context_);
auto uris_iterator = std::make_shared<HDFSSource::URISIterator>(storage->uris, predicate, storage->virtual_columns, context);
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([uris_iterator]()
{
return uris_iterator->next();
});
}
}

auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(context_), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& context_->getSettingsRef().optimize_count_from_files;
void ReadFromHDFS::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
createIterator(nullptr);

Pipes pipes;
auto this_ptr = std::static_pointer_cast<StorageHDFS>(shared_from_this());
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<HDFSSource>(
read_from_format_info,
this_ptr,
context_,
info,
storage,
context,
max_block_size,
iterator_wrapper,
need_only_count,
query_info));
need_only_count));
}
return Pipe::unitePipes(std::move(pipes));

auto pipe = Pipe::unitePipes(std::move(pipes));
if (pipe.empty())
pipe = Pipe(std::make_shared<NullSource>(info.source_header));

for (const auto & processor : pipe.getProcessors())
processors.emplace_back(processor);

pipeline.init(std::move(pipe));
}

SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context_, bool /*async_insert*/)
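To summarize the new control flow in this file: `StorageHDFS::read` only registers a plan step, and file listing is deferred so that filter pushdown can run first. A sketch of the intended ordering, annotated around the `applyFilters` body from the diff (the sequencing comments reflect the usual query-plan optimization order and are an assumption, not code from this PR):

```cpp
// 1. Planning: StorageHDFS::read() adds ReadFromHDFS to the QueryPlan;
//    no HDFS paths have been listed yet.
// 2. Optimization: applyFilters() runs, derives the predicate from the
//    pushed-down filter nodes, and creates the (predicate-pruned) iterator.
void ReadFromHDFS::applyFilters()
{
    // Collapse all pushed-down filter nodes into one DAG; its first output
    // is the predicate used while expanding globs.
    auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(filter_nodes.nodes, {}, context);
    const ActionsDAG::Node * predicate = nullptr;
    if (filter_actions_dag)
        predicate = filter_actions_dag->getOutputs().at(0);

    createIterator(predicate);
}
// 3. Pipeline build: initializePipeline() calls createIterator(nullptr) only as
//    a fallback; the `if (iterator_wrapper) return;` guard makes it a no-op
//    whenever applyFilters() already created the iterator.
```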
12 changes: 6 additions & 6 deletions src/Storages/HDFS/StorageHDFS.h
@@ -51,7 +51,8 @@ class StorageHDFS final : public IStorage, WithContext

String getName() const override { return "HDFS"; }

Pipe read(
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
@@ -93,6 +94,7 @@

protected:
friend class HDFSSource;
friend class ReadFromHDFS;

private:
std::vector<String> uris;
@@ -114,7 +116,7 @@ class HDFSSource : public ISource, WithContext
class DisclosedGlobIterator
{
public:
DisclosedGlobIterator(const String & uri_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
DisclosedGlobIterator(const String & uri_, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
StorageHDFS::PathWithInfo next();
private:
class Impl;
@@ -125,7 +127,7 @@ class HDFSSource : public ISource, WithContext
class URISIterator
{
public:
URISIterator(const std::vector<String> & uris_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
URISIterator(const std::vector<String> & uris_, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
StorageHDFS::PathWithInfo next();
private:
class Impl;
@@ -142,8 +144,7 @@ class HDFSSource : public ISource, WithContext
ContextPtr context_,
UInt64 max_block_size_,
std::shared_ptr<IteratorWrapper> file_iterator_,
bool need_only_count_,
const SelectQueryInfo & query_info_);
bool need_only_count_);

String getName() const override;

@@ -162,7 +163,6 @@ class HDFSSource : public ISource, WithContext
ColumnsDescription columns_description;
bool need_only_count;
size_t total_rows_in_file = 0;
SelectQueryInfo query_info;

std::unique_ptr<ReadBuffer> read_buf;
std::shared_ptr<IInputFormat> input_format;
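On the header side, the main behavioral change is the constructor: `HDFSSource` no longer receives a `SelectQueryInfo`, since path pruning now happens in the plan step before any source is created. A construction site therefore looks like the call in `initializePipeline` above (the argument comments are mine, added for orientation):

```cpp
auto source = std::make_shared<HDFSSource>(
    info,              // ReadFromFormatInfo from prepareReadingFromFormat()
    storage,           // std::shared_ptr<StorageHDFS>
    context,
    max_block_size,
    iterator_wrapper,  // shared iterator over the already-pruned paths
    need_only_count);  // trivial-count optimization flag
```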
4 changes: 2 additions & 2 deletions src/Storages/HDFS/StorageHDFSCluster.cpp
@@ -79,9 +79,9 @@ void StorageHDFSCluster::addColumnsStructureToQuery(ASTPtr & query, const String
}


RemoteQueryExecutor::Extension StorageHDFSCluster::getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const
RemoteQueryExecutor::Extension StorageHDFSCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const
{
auto iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(uri, query, virtual_columns, context);
auto iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(uri, predicate, virtual_columns, context);
auto callback = std::make_shared<std::function<String()>>([iter = std::move(iterator)]() mutable -> String { return iter->next().path; });
return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
}
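The cluster path gets the same treatment: the predicate, rather than a query AST, drives the glob iterator, and the iterator is wrapped into a plain `String()` callback that remote executors poll for work. A minimal consumer-side sketch; the loop is illustrative, and the empty-string end-of-stream convention is an assumption about the exhausted iterator, not something stated in this diff:

```cpp
auto extension = getTaskIteratorExtension(predicate, context);

// Pull tasks until the iterator runs dry; each call yields the next
// resolved HDFS path to hand to a remote replica.
while (true)
{
    String path = (*extension.task_iterator)();
    if (path.empty())
        break;
    // dispatch `path` to the next replica here
}
```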
2 changes: 1 addition & 1 deletion src/Storages/HDFS/StorageHDFSCluster.h
@@ -35,7 +35,7 @@ class StorageHDFSCluster : public IStorageCluster

NamesAndTypesList getVirtuals() const override;

RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const override;

bool supportsSubcolumns() const override { return true; }
