Skip to content

Commit 8b38fab

Browse files
committed
better
1 parent eb9ed41 commit 8b38fab

File tree

1 file changed

+14
-11
lines changed

1 file changed

+14
-11
lines changed

src/Processors/Transforms/SquashingChunksTransform.cpp

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -74,25 +74,28 @@ void SimpleSquashingChunksTransform::transform(Chunk & chunk)
7474

7575
auto block = squashing.add({});
7676
chunk.setColumns(block.getColumns(), block.rows());
77-
78-
/// ISimpleTransform keeps output chunk (result of transform() execution) for some time and push it in the output port within subsequent prepare() call.
79-
/// Because of our custom prepare() implementation we have to take care of both places where data could be buffered: `output_data` and `squashing`.
80-
if (output_data.chunk.hasRows())
81-
{
82-
auto res = std::move(output_data.chunk);
83-
output_data.chunk.clear();
84-
if (chunk.hasRows())
85-
res.append(chunk);
86-
chunk = std::move(res);
87-
}
8877
}
8978
}
9079

9180
IProcessor::Status SimpleSquashingChunksTransform::prepare()
9281
{
9382
if (!finished && input.isFinished())
9483
{
84+
if (output.isFinished())
85+
return Status::Finished;
86+
87+
if (!output.canPush())
88+
return Status::PortFull;
89+
90+
if (has_output)
91+
{
92+
output.pushData(std::move(output_data));
93+
has_output = false;
94+
return Status::PortFull;
95+
}
96+
9597
finished = true;
98+
/// On the next call to transform() we will return all data buffered in `squashing` (if any)
9699
return Status::Ready;
97100
}
98101
return ISimpleTransform::prepare();

0 commit comments

Comments
 (0)