Merged
Commits
70 commits
240f1e3
Processors support for StorageDictionary.
KochetovNicolai Jan 31, 2020
0157de0
Processors support for StorageDistributed reading.
KochetovNicolai Jan 31, 2020
2d1f06a
Processors support for StorageFile reading.
KochetovNicolai Jan 31, 2020
6870132
Processors support for StorageHDFS reading.
KochetovNicolai Jan 31, 2020
f9db37e
Processors support for StorageInput reading.
KochetovNicolai Jan 31, 2020
384e68d
Processors support for StorageJoin reading.
KochetovNicolai Jan 31, 2020
046ff34
Processors support for StorageLog reading.
KochetovNicolai Jan 31, 2020
fc78397
Processors support for StorageMaterializedView reading.
KochetovNicolai Jan 31, 2020
dc96e6d
Processors support for StorageMemory reading.
KochetovNicolai Jan 31, 2020
7bedce4
Processors support for StorageMerge reading.
KochetovNicolai Jan 31, 2020
dbffabf
Processors support for StorageMerge reading.
KochetovNicolai Jan 31, 2020
edd2477
Processors support for StorageMerge reading.
KochetovNicolai Jan 31, 2020
dd888f5
Fix MemorySource.
KochetovNicolai Jan 31, 2020
cef9db8
Processors support for StorageMySQL reading.
KochetovNicolai Jan 31, 2020
fde8620
Fix StorageJoin
KochetovNicolai Feb 3, 2020
eae2ce1
Processors support for StorageNull reading.
KochetovNicolai Feb 3, 2020
a029e49
Processors support for StorageNull reading.
KochetovNicolai Feb 3, 2020
32aa100
fix totals port for pipe.
KochetovNicolai Feb 3, 2020
3058611
fix totals port for pipe.
KochetovNicolai Feb 3, 2020
a832a63
Update TreeExecutor.
KochetovNicolai Feb 3, 2020
71f746e
Update SelectStreamFactory.
KochetovNicolai Feb 3, 2020
bc757f6
Processors support for StorageS3 reading.
KochetovNicolai Feb 3, 2020
ed71fb3
Update SelectStreamFactory.
KochetovNicolai Feb 3, 2020
bb6d6a3
Fix build.
KochetovNicolai Feb 14, 2020
4cf6545
Update SelectStreamFactory.
KochetovNicolai Feb 14, 2020
0766e46
Fix StorageFile.
KochetovNicolai Feb 14, 2020
96b5ef8
Processors support for StorageStripeLog reading.
KochetovNicolai Feb 14, 2020
da6d43e
Fix TreeExecutor.
KochetovNicolai Feb 14, 2020
ce6bcb7
Fix TreeExecutor.
KochetovNicolai Feb 14, 2020
97f0e1f
Fix SelectStreamFactory.
KochetovNicolai Feb 14, 2020
52a6327
Merge with master.
KochetovNicolai Feb 14, 2020
1685c4d
Fix build.
KochetovNicolai Feb 14, 2020
9b2d03f
Fix StorageMerge.
KochetovNicolai Feb 17, 2020
4599fbd
Added force_tree_shaped_pipeline flag to SelectQueryInfo.
KochetovNicolai Feb 17, 2020
f4d5175
Fix build.
KochetovNicolai Feb 17, 2020
bbdac39
Fix build.
KochetovNicolai Feb 17, 2020
34e32db
Fix build.
KochetovNicolai Feb 17, 2020
9eda64f
Update SelectStreamFactory.
KochetovNicolai Feb 17, 2020
df76f1f
Processors support for StorageTinyLog reading.
KochetovNicolai Feb 17, 2020
5372942
Processors support for StorageURL reading.
KochetovNicolai Feb 17, 2020
4cc3eaa
Processors support for StorageValues reading.
KochetovNicolai Feb 17, 2020
728ece0
Processors support for StorageView reading.
KochetovNicolai Feb 17, 2020
dda0bcb
Processors support for StorageXDBC reading.
KochetovNicolai Feb 17, 2020
98b6ad4
Processors support for StorageXDBC reading.
KochetovNicolai Feb 17, 2020
34d7873
Processors support for StorageKafka reading.
KochetovNicolai Feb 17, 2020
fbfaac6
Processors support for StorageLiveView reading.
KochetovNicolai Feb 17, 2020
bdaeaff
Processors support for StorageLiveView reading.
KochetovNicolai Feb 17, 2020
e6a30fe
Fix build.
KochetovNicolai Feb 18, 2020
426a8fd
Update TreeExecutor.
KochetovNicolai Feb 18, 2020
9b8fa9f
Update TreeExecutor.
KochetovNicolai Feb 18, 2020
b6b4a87
Update TreeExecutorBlockInputStream.
KochetovNicolai Feb 18, 2020
145cb6e
Update TreeExecutorBlockInputStream.
KochetovNicolai Feb 18, 2020
4992609
Update BlockStreamProfileInfo
KochetovNicolai Feb 18, 2020
41851b6
Fix TreeExecutorBlockInputStream
KochetovNicolai Feb 18, 2020
5d0d2b2
Update MergingSortedTransform
KochetovNicolai Feb 18, 2020
3f55e6a
Processors support for StorageFromMergeTreeDataPart reading.
KochetovNicolai Feb 18, 2020
69a3895
fix PVS warning.
KochetovNicolai Feb 18, 2020
70a7ddc
Fix StorageURL
KochetovNicolai Feb 19, 2020
4fa6c86
Fix StorageHDFS.
KochetovNicolai Feb 19, 2020
75af541
Try fix test_delayed_replica_failover
KochetovNicolai Feb 19, 2020
3bfbd26
Use processors for sending external tables.
KochetovNicolai Feb 19, 2020
bf18eed
Use processors for sending external tables.
KochetovNicolai Feb 19, 2020
76a21b3
Use processors for sending external tables.
KochetovNicolai Feb 19, 2020
e0c3025
Remove readWithProcessors from IStorage.
KochetovNicolai Feb 19, 2020
a3c4b2e
Remove readWithProcessors from IStorage.
KochetovNicolai Feb 19, 2020
39916dd
Remove readWithProcessors from IStorage.
KochetovNicolai Feb 19, 2020
84af967
Try fix external tables for RemoteBlockInputStream
KochetovNicolai Feb 20, 2020
cca4d4e
More comments and remove spaces.
KochetovNicolai Feb 20, 2020
b4cda92
Added test from #9151.
KochetovNicolai Feb 20, 2020
30d41b3
Merged with master
KochetovNicolai Feb 21, 2020
2 changes: 1 addition & 1 deletion dbms/programs/client/Client.cpp
@@ -874,7 +874,7 @@ class Client : public Poco::Util::Application
if (!select && !external_tables.empty())
throw Exception("External tables could be sent only with select query", ErrorCodes::BAD_ARGUMENTS);

std::vector<ExternalTableData> data;
std::vector<ExternalTableDataPtr> data;
for (auto & table : external_tables)
data.emplace_back(table.getData(context));

67 changes: 60 additions & 7 deletions dbms/src/Client/Connection.cpp
@@ -22,6 +22,9 @@
#include <Common/config_version.h>
#include <Interpreters/ClientInfo.h>
#include <Compression/CompressionFactory.h>
#include <Processors/Pipe.h>
#include <Processors/ISink.h>
#include <Processors/Executors/PipelineExecutor.h>

#include <Common/config.h>
#if USE_POCO_NETSSL
@@ -534,6 +537,45 @@ void Connection::sendScalarsData(Scalars & data)
LOG_DEBUG(log_wrapper.get(), msg.rdbuf());
}

namespace
{
/// Sink which sends data for external table.
class ExternalTableDataSink : public ISink
{
public:
using OnCancell = std::function<void()>;

ExternalTableDataSink(Block header, Connection & connection_, ExternalTableData & table_data_, OnCancell callback)
: ISink(std::move(header)), connection(connection_), table_data(table_data_),
on_cancell(std::move(callback))
{}

String getName() const override { return "ExternalTableSink"; }

size_t getNumReadRows() const { return num_rows; }

protected:
void consume(Chunk chunk) override
{
if (table_data.is_cancelled)
{
on_cancell();
return;
}

num_rows += chunk.getNumRows();

auto block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
connection.sendData(block, table_data.table_name);
}

private:
Connection & connection;
ExternalTableData & table_data;
OnCancell on_cancell;
size_t num_rows = 0;
};
}

void Connection::sendExternalTablesData(ExternalTablesData & data)
{
@@ -553,13 +595,24 @@ void Connection::sendExternalTablesData(ExternalTablesData & data)

for (auto & elem : data)
{
elem.first->readPrefix();
while (Block block = elem.first->read())
{
rows += block.rows();
sendData(block, elem.second);
}
elem.first->readSuffix();
PipelineExecutorPtr executor;
auto on_cancel = [& executor]() { executor->cancel(); };

auto sink = std::make_shared<ExternalTableDataSink>(elem->pipe->getHeader(), *this, *elem, std::move(on_cancel));
DB::connect(elem->pipe->getPort(), sink->getPort());

auto processors = std::move(*elem->pipe).detachProcessors();
processors.push_back(sink);

executor = std::make_shared<PipelineExecutor>(processors);
executor->execute(/*num_threads = */ 1);

auto read_rows = sink->getNumReadRows();
rows += read_rows;

/// If table is empty, send empty block with name.
if (read_rows == 0)
sendData(sink->getPort().getHeader(), elem->table_name);
}

/// Send empty block, which means end of data transfer.
17 changes: 13 additions & 4 deletions dbms/src/Client/Connection.h
@@ -30,11 +30,20 @@ namespace DB
{

class ClientInfo;
class Pipe;

/// The stream of blocks reading from the table and its name
using ExternalTableData = std::pair<BlockInputStreamPtr, std::string>;
/// Vector of pairs describing tables
using ExternalTablesData = std::vector<ExternalTableData>;
/// Struct which represents data we are going to send for external table.
struct ExternalTableData
{
/// Pipe of data from the table.
std::unique_ptr<Pipe> pipe;
std::string table_name;
/// Flag indicating that reading should stop.
std::atomic_bool is_cancelled = false;
};

using ExternalTableDataPtr = std::unique_ptr<ExternalTableData>;
using ExternalTablesData = std::vector<ExternalTableDataPtr>;

class Connection;

34 changes: 23 additions & 11 deletions dbms/src/Core/ExternalTable.cpp
@@ -9,8 +9,12 @@
#include <IO/LimitReadBuffer.h>
#include <Storages/StorageMemory.h>
#include <Poco/Net/MessageHeader.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Pipe.h>

#include <Core/ExternalTable.h>
#include <Processors/Sources/SinkToOutputStream.h>
#include <Processors/Executors/PipelineExecutor.h>


namespace DB
@@ -22,12 +26,18 @@ namespace ErrorCodes
}


ExternalTableData BaseExternalTable::getData(const Context & context)
ExternalTableDataPtr BaseExternalTable::getData(const Context & context)
{
initReadBuffer();
initSampleBlock();
auto input = context.getInputFormat(format, *read_buffer, sample_block, DEFAULT_BLOCK_SIZE);
return std::make_pair(std::make_shared<AsynchronousBlockInputStream>(input), name);
auto stream = std::make_shared<AsynchronousBlockInputStream>(input);

auto data = std::make_unique<ExternalTableData>();
data->table_name = name;
data->pipe = std::make_unique<Pipe>(std::make_shared<SourceFromInputStream>(std::move(stream)));

return data;
}

void BaseExternalTable::clean()
@@ -156,22 +166,24 @@ void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header,
else
throw Exception("Neither structure nor types have been provided for external table " + name + ". Use fields " + name + "_structure or " + name + "_types to do so.", ErrorCodes::BAD_ARGUMENTS);

ExternalTableData data = getData(context);
ExternalTableDataPtr data = getData(context);

/// Create table
NamesAndTypesList columns = sample_block.getNamesAndTypesList();
StoragePtr storage = StorageMemory::create(StorageID("_external", data.second), ColumnsDescription{columns}, ConstraintsDescription{});
StoragePtr storage = StorageMemory::create(StorageID("_external", data->table_name), ColumnsDescription{columns}, ConstraintsDescription{});
storage->startup();
context.addExternalTable(data.second, storage);
context.addExternalTable(data->table_name, storage);
BlockOutputStreamPtr output = storage->write(ASTPtr(), context);

/// Write data
data.first->readPrefix();
output->writePrefix();
while (Block block = data.first->read())
output->write(block);
data.first->readSuffix();
output->writeSuffix();
auto sink = std::make_shared<SinkToOutputStream>(std::move(output));
connect(data->pipe->getPort(), sink->getPort());

auto processors = std::move(*data->pipe).detachProcessors();
processors.push_back(std::move(sink));

auto executor = std::make_shared<PipelineExecutor>(processors);
executor->execute(/*num_threads = */ 1);

/// We are ready to receive the next file, for this we clear all the information received
clean();
2 changes: 1 addition & 1 deletion dbms/src/Core/ExternalTable.h
@@ -57,7 +57,7 @@ class BaseExternalTable
virtual void initReadBuffer() {}

/// Get the table data: the table name plus a pipe with its contents.
ExternalTableData getData(const Context & context);
ExternalTableDataPtr getData(const Context & context);

protected:
/// Clear all accumulated information
1 change: 1 addition & 0 deletions dbms/src/DataStreams/BlockStreamProfileInfo.cpp
@@ -122,6 +122,7 @@ void BlockStreamProfileInfo::calculateRowsBeforeLimit() const
/// Then the data about `rows_before_limit` can be in `RemoteBlockInputStream` (come from a remote server).
BlockStreamProfileInfos remotes;
collectInfosForStreamsWithName("Remote", remotes);
collectInfosForStreamsWithName("TreeExecutor", remotes);

if (remotes.empty())
return;
16 changes: 7 additions & 9 deletions dbms/src/DataStreams/BlocksBlockInputStream.h
@@ -19,33 +19,31 @@ namespace DB

/** A stream of blocks from a shared vector of blocks
*/
class BlocksBlockInputStream : public IBlockInputStream
class BlocksSource : public SourceWithProgress
{
public:
/// Acquires shared ownership of the blocks vector
BlocksBlockInputStream(const std::shared_ptr<BlocksPtr> & blocks_ptr_, Block header_)
: blocks(*blocks_ptr_), it((*blocks_ptr_)->begin()), end((*blocks_ptr_)->end()), header(std::move(header_)) {}
BlocksSource(const std::shared_ptr<BlocksPtr> & blocks_ptr_, Block header)
: SourceWithProgress(std::move(header))
, blocks(*blocks_ptr_), it((*blocks_ptr_)->begin()), end((*blocks_ptr_)->end()) {}

String getName() const override { return "Blocks"; }

Block getHeader() const override { return header; }

protected:
Block readImpl() override
Chunk generate() override
{
if (it == end)
return Block();
return {};

Block res = *it;
++it;
return res;
return Chunk(res.getColumns(), res.rows());
}

private:
BlocksPtr blocks;
Blocks::iterator it;
const Blocks::iterator end;
Block header;
};

}
30 changes: 24 additions & 6 deletions dbms/src/DataStreams/RemoteBlockInputStream.cpp
@@ -7,8 +7,12 @@
#include <Interpreters/castColumn.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Storages/IStorage.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/Pipe.h>

#include <IO/ConnectionTimeouts.h>
#include <Processors/Sources/SourceFromSingleChunk.h>


namespace DB
@@ -112,7 +116,7 @@ void RemoteBlockInputStream::cancel(bool kill)
/// Stop sending external data.
for (auto & vec : external_tables_data)
for (auto & elem : vec)
elem.first->cancel(kill);
elem->is_cancelled = true;
}

if (!isQueryPending() || hasThrownException())
@@ -142,12 +146,26 @@ void RemoteBlockInputStream::sendExternalTables()
{
StoragePtr cur = table.second;
QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage(context);
BlockInputStreams input = cur->read(cur->getColumns().getNamesOfPhysical(), {}, context,
read_from_table_stage, DEFAULT_BLOCK_SIZE, 1);
if (input.size() == 0)
res.push_back(std::make_pair(std::make_shared<OneBlockInputStream>(cur->getSampleBlock()), table.first));

Pipes pipes;

pipes = cur->read(cur->getColumns().getNamesOfPhysical(), {}, context,
read_from_table_stage, DEFAULT_BLOCK_SIZE, 1);

auto data = std::make_unique<ExternalTableData>();
data->table_name = table.first;

if (pipes.empty())
data->pipe = std::make_unique<Pipe>(std::make_shared<SourceFromSingleChunk>(cur->getSampleBlock(), Chunk()));
else if (pipes.size() == 1)
data->pipe = std::make_unique<Pipe>(std::move(pipes.front()));
else
res.push_back(std::make_pair(input[0], table.first));
{
auto concat = std::make_shared<ConcatProcessor>(pipes.front().getHeader(), pipes.size());
data->pipe = std::make_unique<Pipe>(std::move(pipes), std::move(concat));
}

res.emplace_back(std::move(data));
}
external_tables_data.push_back(std::move(res));
}
50 changes: 43 additions & 7 deletions dbms/src/DataStreams/narrowBlockInputStreams.cpp
@@ -1,12 +1,29 @@
#include <random>
#include <Common/thread_local_rng.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/Pipe.h>
#include "narrowBlockInputStreams.h"


namespace DB
{

namespace
{
using Distribution = std::vector<size_t>;
Distribution getDistribution(size_t from, size_t to)
{
Distribution distribution(from);

for (size_t i = 0; i < from; ++i)
distribution[i] = i % to;

std::shuffle(distribution.begin(), distribution.end(), thread_local_rng);
return distribution;
}
}

BlockInputStreams narrowBlockInputStreams(BlockInputStreams & inputs, size_t width)
{
size_t size = inputs.size();
@@ -15,13 +32,7 @@ BlockInputStreams narrowBlockInputStreams(BlockInputStreams & inputs, size_t wid

std::vector<BlockInputStreams> partitions(width);

using Distribution = std::vector<size_t>;
Distribution distribution(size);

for (size_t i = 0; i < size; ++i)
distribution[i] = i % width;

std::shuffle(distribution.begin(), distribution.end(), thread_local_rng);
auto distribution = getDistribution(size, width);

for (size_t i = 0; i < size; ++i)
partitions[distribution[i]].push_back(inputs[i]);
@@ -33,4 +44,29 @@ BlockInputStreams narrowBlockInputStreams(BlockInputStreams & inputs, size_t wid
return res;
}

Pipes narrowPipes(Pipes pipes, size_t width)
{
size_t size = pipes.size();
if (size <= width)
return pipes;

std::vector<Pipes> partitions(width);

auto distribution = getDistribution(size, width);

for (size_t i = 0; i < size; ++i)
partitions[distribution[i]].emplace_back(std::move(pipes[i]));

Pipes res;
res.reserve(width);

for (size_t i = 0; i < width; ++i)
{
auto processor = std::make_shared<ConcatProcessor>(partitions[i].at(0).getHeader(), partitions[i].size());
res.emplace_back(std::move(partitions[i]), std::move(processor));
}

return res;
}

}
4 changes: 4 additions & 0 deletions dbms/src/DataStreams/narrowBlockInputStreams.h
@@ -6,6 +6,9 @@
namespace DB
{

class Pipe;
using Pipes = std::vector<Pipe>;

/** If the number of sources of `inputs` is greater than `width`,
* then glues the sources to each other (using ConcatBlockInputStream),
* so that the number of sources becomes no more than `width`.
@@ -14,5 +17,6 @@ namespace DB
* (to avoid overweighting if the distribution of the amount of data in different sources is subject to some pattern)
*/
BlockInputStreams narrowBlockInputStreams(BlockInputStreams & inputs, size_t width);
Pipes narrowPipes(Pipes pipes, size_t width);

}