Skip to content

Commit 9a2b163

Browse files
Merge pull request #18411 from ClickHouse/try-fix-max_result_rows
Use Port::Data instead of Chunk in LazyOutputFormat.
2 parents 5e97df7 + 34cd4fe commit 9a2b163

File tree

7 files changed

+76
-37
lines changed

7 files changed

+76
-37
lines changed

src/Processors/Formats/IOutputFormat.cpp

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@
55
namespace DB
66
{
77

8+
namespace ErrorCodes
9+
{
10+
extern const int NOT_IMPLEMENTED;
11+
}
12+
813
IOutputFormat::IOutputFormat(const Block & header_, WriteBuffer & out_)
914
: IProcessor({header_, header_, header_}, {}), out(out_)
1015
{
@@ -30,7 +35,7 @@ IOutputFormat::Status IOutputFormat::prepare()
3035
if (!input.hasData())
3136
return Status::NeedData;
3237

33-
current_chunk = input.pull(true);
38+
current_chunk = input.pullData(true);
3439
current_block_kind = kind;
3540
has_input = true;
3641
return Status::Ready;
@@ -44,23 +49,31 @@ IOutputFormat::Status IOutputFormat::prepare()
4449
return Status::Finished;
4550
}
4651

47-
static Chunk prepareTotals(Chunk chunk)
52+
static Port::Data prepareTotals(Port::Data data)
4853
{
49-
if (!chunk.hasRows())
54+
if (data.exception)
55+
return data;
56+
57+
if (!data.chunk.hasRows())
5058
return {};
5159

52-
if (chunk.getNumRows() > 1)
60+
if (data.chunk.getNumRows() > 1)
5361
{
5462
/// This may happen if something like ARRAY JOIN was executed on totals.
5563
/// Skip rows except the first one.
56-
auto columns = chunk.detachColumns();
64+
auto columns = data.chunk.detachColumns();
5765
for (auto & column : columns)
5866
column = column->cut(0, 1);
5967

60-
chunk.setColumns(std::move(columns), 1);
68+
data.chunk.setColumns(std::move(columns), 1);
6169
}
6270

63-
return chunk;
71+
return data;
72+
}
73+
74+
void IOutputFormat::consume(Chunk)
75+
{
76+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method consume s not implemented for {}", getName());
6477
}
6578

6679
void IOutputFormat::work()
@@ -84,17 +97,24 @@ void IOutputFormat::work()
8497
switch (current_block_kind)
8598
{
8699
case Main:
87-
result_rows += current_chunk.getNumRows();
88-
result_bytes += current_chunk.allocatedBytes();
100+
{
101+
result_rows += current_chunk.chunk.getNumRows();
102+
result_bytes += current_chunk.chunk.allocatedBytes();
89103
consume(std::move(current_chunk));
90104
break;
105+
}
91106
case Totals:
92-
if (auto totals = prepareTotals(std::move(current_chunk)))
107+
{
108+
auto totals = prepareTotals(std::move(current_chunk));
109+
if (totals.exception || totals.chunk)
93110
consumeTotals(std::move(totals));
94111
break;
112+
}
95113
case Extremes:
114+
{
96115
consumeExtremes(std::move(current_chunk));
97116
break;
117+
}
98118
}
99119

100120
if (auto_flush)

src/Processors/Formats/IOutputFormat.h

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class IOutputFormat : public IProcessor
2828
protected:
2929
WriteBuffer & out;
3030

31-
Chunk current_chunk;
31+
Port::Data current_chunk;
3232
PortKind current_block_kind = PortKind::Main;
3333
bool has_input = false;
3434
bool finished = false;
@@ -39,9 +39,14 @@ class IOutputFormat : public IProcessor
3939

4040
RowsBeforeLimitCounterPtr rows_before_limit_counter;
4141

42-
virtual void consume(Chunk) = 0;
42+
virtual void consume(Chunk);
4343
virtual void consumeTotals(Chunk) {}
4444
virtual void consumeExtremes(Chunk) {}
45+
46+
virtual void consume(Port::Data data) { consume(data.getChunkOrTrow()); }
47+
virtual void consumeTotals(Port::Data data) { consumeTotals(data.getChunkOrTrow()); }
48+
virtual void consumeExtremes(Port::Data data) { consumeExtremes(data.getChunkOrTrow()); }
49+
4550
virtual void finalize() {}
4651

4752
public:
@@ -77,8 +82,19 @@ class IOutputFormat : public IProcessor
7782
virtual void doWritePrefix() {}
7883
virtual void doWriteSuffix() { finalize(); }
7984

80-
void setTotals(const Block & totals) { consumeTotals(Chunk(totals.getColumns(), totals.rows())); }
81-
void setExtremes(const Block & extremes) { consumeExtremes(Chunk(extremes.getColumns(), extremes.rows())); }
85+
void setTotals(const Block & totals)
86+
{
87+
Port::Data data;
88+
data.chunk = Chunk(totals.getColumns(), totals.rows());
89+
consumeTotals(std::move(data));
90+
}
91+
92+
void setExtremes(const Block & extremes)
93+
{
94+
Port::Data data;
95+
data.chunk = Chunk(extremes.getColumns(), extremes.rows());
96+
consumeExtremes(std::move(data));
97+
}
8298

8399
size_t getResultRows() const { return result_rows; }
84100
size_t getResultBytes() const { return result_bytes; }

src/Processors/Formats/LazyOutputFormat.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,24 @@ Chunk LazyOutputFormat::getChunk(UInt64 milliseconds)
1515
return {};
1616
}
1717

18-
Chunk chunk;
19-
if (!queue.tryPop(chunk, milliseconds))
18+
Port::Data data;
19+
if (!queue.tryPop(data, milliseconds))
2020
return {};
2121

22-
if (chunk)
23-
info.update(chunk.getNumRows(), chunk.allocatedBytes());
22+
if (!data.exception)
23+
info.update(data.chunk.getNumRows(), data.chunk.allocatedBytes());
2424

25-
return chunk;
25+
return data.getChunkOrTrow();
2626
}
2727

2828
Chunk LazyOutputFormat::getTotals()
2929
{
30-
return std::move(totals);
30+
return totals.getChunkOrTrow();
3131
}
3232

3333
Chunk LazyOutputFormat::getExtremes()
3434
{
35-
return std::move(extremes);
35+
return extremes.getChunkOrTrow();
3636
}
3737

3838
void LazyOutputFormat::setRowsBeforeLimit(size_t rows_before_limit)

src/Processors/Formats/LazyOutputFormat.h

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,28 +37,28 @@ class LazyOutputFormat : public IOutputFormat
3737
}
3838

3939
protected:
40-
void consume(Chunk chunk) override
40+
void consume(Port::Data data) override
4141
{
4242
if (!finished_processing)
43-
queue.emplace(std::move(chunk));
43+
queue.emplace(std::move(data));
4444
}
4545

46-
void consumeTotals(Chunk chunk) override { totals = std::move(chunk); }
47-
void consumeExtremes(Chunk chunk) override { extremes = std::move(chunk); }
46+
void consumeTotals(Port::Data data) override { totals = std::move(data); }
47+
void consumeExtremes(Port::Data data) override { extremes = std::move(data); }
4848

4949
void finalize() override
5050
{
5151
finished_processing = true;
5252

5353
/// In case we are waiting for result.
54-
queue.emplace(Chunk());
54+
queue.emplace(Port::Data{});
5555
}
5656

5757
private:
5858

59-
ConcurrentBoundedQueue<Chunk> queue;
60-
Chunk totals;
61-
Chunk extremes;
59+
ConcurrentBoundedQueue<Port::Data> queue;
60+
Port::Data totals;
61+
Port::Data extremes;
6262

6363
/// Is not used.
6464
static WriteBuffer out;

src/Processors/Port.h

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,14 @@ class Port
6060
/// Note: std::variant can be used. But move constructor for it can't be inlined.
6161
Chunk chunk;
6262
std::exception_ptr exception;
63+
64+
Chunk getChunkOrTrow()
65+
{
66+
if (exception)
67+
std::rethrow_exception(std::move(exception));
68+
69+
return std::move(chunk);
70+
}
6371
};
6472

6573
private:
@@ -303,12 +311,7 @@ class InputPort : public Port
303311

304312
Chunk ALWAYS_INLINE pull(bool set_not_needed = false)
305313
{
306-
auto data_ = pullData(set_not_needed);
307-
308-
if (data_.exception)
309-
std::rethrow_exception(data_.exception);
310-
311-
return std::move(data_.chunk);
314+
return pullData(set_not_needed).getChunkOrTrow();
312315
}
313316

314317
bool ALWAYS_INLINE isFinished() const

tests/integration/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ You must install latest Docker from
1212
https://docs.docker.com/engine/installation/linux/docker-ce/ubuntu/#set-up-the-repository
1313
Don't use Docker from your system repository.
1414

15-
* [pip](https://pypi.python.org/pypi/pip) and `libpq-dev`. To install: `sudo apt-get install python3-pip libpq-dev zlib1g-dev libcrypto++-dev libssl-dev`
15+
* [pip](https://pypi.python.org/pypi/pip) and `libpq-dev`. To install: `sudo apt-get install python3-pip libpq-dev zlib1g-dev libcrypto++-dev libssl-dev libkrb5-dev`
1616
* [py.test](https://docs.pytest.org/) testing framework. To install: `sudo -H pip install pytest`
1717
* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install:
1818

tests/integration/test_grpc_protocol/test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ def test_progress():
239239
, output: "6\\t0\\n7\\t0\\n"
240240
, stats {
241241
rows: 8
242-
blocks: 4
242+
blocks: 5
243243
allocated_bytes: 324
244244
applied_limit: true
245245
rows_before_limit: 8

0 commit comments

Comments
 (0)