Skip to content

Commit 361b92f

Browse files
Merge pull request #11200 from ClickHouse/fix-strict-resize-finishing
Fix StrictResize
2 parents 3ff28d0 + b16936b commit 361b92f

File tree

4 files changed

+33
-4
lines changed

4 files changed

+33
-4
lines changed

src/Processors/ResizeProcessor.cpp

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -347,11 +347,16 @@ IProcessor::Status StrictResizeProcessor::prepare(const PortNumbers & updated_in
347347

348348
auto & waiting_output = output_ports[input_with_data.waiting_output];
349349

350-
if (waiting_output.status != OutputStatus::NeedData)
351-
throw Exception("Invalid status for associated output.", ErrorCodes::LOGICAL_ERROR);
350+
if (waiting_output.status == OutputStatus::NotActive)
351+
throw Exception("Invalid status NotActive for associated output.", ErrorCodes::LOGICAL_ERROR);
352352

353-
waiting_output.port->pushData(input_with_data.port->pullData(/* set_not_needed = */ true));
354-
waiting_output.status = OutputStatus::NotActive;
353+
if (waiting_output.status != OutputStatus::Finished)
354+
{
355+
waiting_output.port->pushData(input_with_data.port->pullData(/* set_not_needed = */ true));
356+
waiting_output.status = OutputStatus::NotActive;
357+
}
358+
else
359+
abandoned_chunks.emplace_back(input_with_data.port->pullData(/* set_not_needed = */ true));
355360

356361
if (input_with_data.port->isFinished())
357362
{
@@ -370,6 +375,18 @@ IProcessor::Status StrictResizeProcessor::prepare(const PortNumbers & updated_in
370375
return Status::Finished;
371376
}
372377

378+
/// Process abandoned chunks if any.
379+
while (!abandoned_chunks.empty() && !waiting_outputs.empty())
380+
{
381+
auto & waiting_output = output_ports[waiting_outputs.front()];
382+
waiting_outputs.pop();
383+
384+
waiting_output.port->pushData(std::move(abandoned_chunks.back()));
385+
abandoned_chunks.pop_back();
386+
387+
waiting_output.status = OutputStatus::NotActive;
388+
}
389+
373390
/// Enable more inputs if needed.
374391
while (!disabled_input_ports.empty() && !waiting_outputs.empty())
375392
{
@@ -383,6 +400,7 @@ IProcessor::Status StrictResizeProcessor::prepare(const PortNumbers & updated_in
383400
waiting_outputs.pop();
384401
}
385402

403+
/// Close all other waiting for data outputs (there is no corresponding input for them).
386404
while (!waiting_outputs.empty())
387405
{
388406
auto & output = output_ports[waiting_outputs.front()];

src/Processors/ResizeProcessor.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ class StrictResizeProcessor : public IProcessor
128128

129129
std::vector<InputPortWithStatus> input_ports;
130130
std::vector<OutputPortWithStatus> output_ports;
131+
/// This field contained chunks which were read for output which had became finished while reading was happening.
132+
/// They will be pushed to any next waiting output.
133+
std::vector<Port::Data> abandoned_chunks;
131134
};
132135

133136
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
49999995000000
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
drop table if exists num_10m;
2+
create table num_10m (number UInt64) engine = MergeTree order by tuple();
3+
insert into num_10m select * from numbers(10000000);
4+
5+
select * from (select sum(number) from num_10m union all select sum(number) from num_10m) limit 1 settings max_block_size = 1024;
6+
7+
drop table if exists num_1m;

0 commit comments

Comments
 (0)