Skip to content

Commit 4d1f528

Browse files
authored
Merge pull request #17918 from ClickHouse/revert-17120-fix_granularity_on_block_borders
Revert "Fix index granularity calculation on block borders"
2 parents 6a9e1ce + 7783ddb commit 4d1f528

10 files changed

+27
-135
lines changed

src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -44,29 +44,6 @@ void IMergeTreeDataPartWriter::next()
4444
index_offset = next_index_offset;
4545
}
4646

47-
bool IMergeTreeDataPartWriter::adjustLastUnfinishedMark(size_t new_block_index_granularity)
48-
{
49-
/// If the amount of remaining rows in the last granule is greater than the granularity of the new block,
50-
/// then finish it.
51-
if (!index_granularity.empty() && index_offset > new_block_index_granularity)
52-
{
53-
size_t already_written_rows_in_last_granule = index_granularity.getLastMarkRows() - index_offset;
54-
/// We can still write some rows to the last granule
55-
if (already_written_rows_in_last_granule < new_block_index_granularity)
56-
{
57-
index_granularity.setLastMarkRows(new_block_index_granularity);
58-
index_offset = new_block_index_granularity - already_written_rows_in_last_granule;
59-
}
60-
else /// Our last granule is already full, let's start the new one
61-
{
62-
index_granularity.setLastMarkRows(already_written_rows_in_last_granule);
63-
index_offset = 0;
64-
}
65-
return true;
66-
}
67-
return false;
68-
}
69-
7047
IMergeTreeDataPartWriter::~IMergeTreeDataPartWriter() = default;
7148

7249
}

src/Storages/MergeTree/IMergeTreeDataPartWriter.h

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -62,41 +62,28 @@ class IMergeTreeDataPartWriter : private boost::noncopyable
6262
protected:
6363
size_t getCurrentMark() const { return current_mark; }
6464
size_t getIndexOffset() const { return index_offset; }
65-
/// Finishes our current unfinished mark if we have already written more rows for it
66-
/// than granularity in the new block. Return true if last mark actually was adjusted.
67-
/// Example:
68-
/// __|________|___. <- previous block with granularity 8 and last unfinished mark with 3 rows
69-
/// new_block_index_granularity = 2, so
70-
/// __|________|___|__|__|__|
71-
/// ^ finish last unfinished mark, new marks will have granularity 2
72-
bool adjustLastUnfinishedMark(size_t new_block_index_granularity);
7365

7466
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
7567
using SerializationStates = std::unordered_map<String, SerializationState>;
7668

7769
MergeTreeData::DataPartPtr data_part;
7870
const MergeTreeData & storage;
79-
const StorageMetadataPtr metadata_snapshot;
80-
const NamesAndTypesList columns_list;
81-
const MergeTreeIndices skip_indices;
71+
StorageMetadataPtr metadata_snapshot;
72+
NamesAndTypesList columns_list;
73+
MergeTreeIndices skip_indices;
8274
MergeTreeIndexGranularity index_granularity;
83-
const MergeTreeWriterSettings settings;
84-
const bool with_final_mark;
75+
MergeTreeWriterSettings settings;
76+
bool with_final_mark;
8577

8678
size_t next_mark = 0;
8779
size_t next_index_offset = 0;
8880

89-
/// When we were writing fresh block granularity of the last mark was adjusted
90-
/// See adjustLastUnfinishedMark
91-
bool last_granule_was_adjusted = false;
92-
9381
MutableColumns index_columns;
9482

9583
private:
9684
/// Data is already written up to this mark.
9785
size_t current_mark = 0;
9886
/// The offset to the first row of the block for which you want to write the index.
99-
/// Or how many rows we have to write for this last unfinished mark.
10087
size_t index_offset = 0;
10188
};
10289

src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp

Lines changed: 15 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,7 @@ void MergeTreeDataPartWriterOnDisk::initSkipIndices()
184184
default_codec, settings.max_compress_block_size,
185185
0, settings.aio_threshold));
186186
skip_indices_aggregators.push_back(index_helper->createIndexAggregator());
187-
marks_in_skip_index_aggregator.push_back(0);
188-
rows_in_skip_index_aggregator_last_mark.push_back(0);
187+
skip_index_filling.push_back(0);
189188
}
190189

191190
skip_indices_initialized = true;
@@ -257,11 +256,9 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
257256
skip_index_current_data_mark = skip_index_data_mark;
258257
while (prev_pos < rows)
259258
{
260-
bool new_block_started = prev_pos == 0;
261259
UInt64 limit = 0;
262260
size_t current_index_offset = getIndexOffset();
263-
/// We start new block, but have an offset from previous one
264-
if (new_block_started && current_index_offset != 0)
261+
if (prev_pos == 0 && current_index_offset != 0)
265262
{
266263
limit = current_index_offset;
267264
}
@@ -273,15 +270,10 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
273270
else
274271
{
275272
limit = index_granularity.getMarkRows(skip_index_current_data_mark);
276-
/// We just started new block serialization, but the last unfinished mark was shrunk to its current_size.
277-
/// It may happen that we have already aggregated current_size rows or more for skip_index, but not flushed it to disk
278-
/// because previous granule size was bigger. So do it here.
279-
if (new_block_started && last_granule_was_adjusted && rows_in_skip_index_aggregator_last_mark[i] >= limit)
280-
accountMarkForSkipIdxAndFlushIfNeeded(i);
281-
282273
if (skip_indices_aggregators[i]->empty())
283274
{
284275
skip_indices_aggregators[i] = index_helper->createIndexAggregator();
276+
skip_index_filling[i] = 0;
285277

286278
if (stream.compressed.offset() >= settings.min_compress_block_size)
287279
stream.compressed.next();
@@ -293,19 +285,24 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
293285
if (settings.can_use_adaptive_granularity)
294286
writeIntBinary(1UL, stream.marks);
295287
}
296-
297288
/// this mark is aggregated, go to the next one
298289
skip_index_current_data_mark++;
299290
}
300291

301292
size_t pos = prev_pos;
302293
skip_indices_aggregators[i]->update(skip_indexes_block, &pos, limit);
303-
rows_in_skip_index_aggregator_last_mark[i] = (pos - prev_pos);
304294

305-
/// We just aggregated all rows in current mark, add new mark to skip_index marks counter
306-
/// and flush on disk if we already aggregated required amount of marks.
307-
if (rows_in_skip_index_aggregator_last_mark[i] == limit)
308-
accountMarkForSkipIdxAndFlushIfNeeded(i);
295+
if (pos == prev_pos + limit)
296+
{
297+
++skip_index_filling[i];
298+
299+
/// write index if it is filled
300+
if (skip_index_filling[i] == index_helper->index.granularity)
301+
{
302+
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
303+
skip_index_filling[i] = 0;
304+
}
305+
}
309306
prev_pos = pos;
310307
}
311308
}
@@ -363,21 +360,7 @@ void MergeTreeDataPartWriterOnDisk::finishSkipIndicesSerialization(
363360

364361
skip_indices_streams.clear();
365362
skip_indices_aggregators.clear();
366-
marks_in_skip_index_aggregator.clear();
367-
rows_in_skip_index_aggregator_last_mark.clear();
368-
}
369-
370-
void MergeTreeDataPartWriterOnDisk::accountMarkForSkipIdxAndFlushIfNeeded(size_t skip_index_pos)
371-
{
372-
++marks_in_skip_index_aggregator[skip_index_pos];
373-
374-
/// write index if it is filled
375-
if (marks_in_skip_index_aggregator[skip_index_pos] == skip_indices[skip_index_pos]->index.granularity)
376-
{
377-
skip_indices_aggregators[skip_index_pos]->getGranuleAndReset()->serializeBinary(skip_indices_streams[skip_index_pos]->compressed);
378-
marks_in_skip_index_aggregator[skip_index_pos] = 0;
379-
rows_in_skip_index_aggregator_last_mark[skip_index_pos] = 0;
380-
}
363+
skip_index_filling.clear();
381364
}
382365

383366
}

src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -97,18 +97,16 @@ class MergeTreeDataPartWriterOnDisk : public IMergeTreeDataPartWriter
9797
const String marks_file_extension;
9898
CompressionCodecPtr default_codec;
9999

100-
const bool compute_granularity;
100+
bool compute_granularity;
101+
bool need_finish_last_granule;
101102

102103
/// Number of marks in data from which skip indices have to start
103104
/// aggregation. I.e. it's data mark number, not skip indices mark.
104105
size_t skip_index_data_mark = 0;
105106

106107
std::vector<StreamPtr> skip_indices_streams;
107108
MergeTreeIndexAggregators skip_indices_aggregators;
108-
/// Amount of marks currently serialized in skip index aggregator
109-
std::vector<size_t> marks_in_skip_index_aggregator;
110-
/// Amount of rows currently serialized in skip index aggregator for last mark
111-
std::vector<size_t> rows_in_skip_index_aggregator_last_mark;
109+
std::vector<size_t> skip_index_filling;
112110

113111
std::unique_ptr<WriteBufferFromFileBase> index_file_stream;
114112
std::unique_ptr<HashingWriteBuffer> index_stream;
@@ -127,11 +125,6 @@ class MergeTreeDataPartWriterOnDisk : public IMergeTreeDataPartWriter
127125
private:
128126
/// Index is already serialized up to this mark.
129127
size_t index_mark = 0;
130-
131-
/// Increment corresponding marks_in_skip_index_aggregator[skip_index_pos]
132-
/// value and flush skip_indices_streams[skip_index_pos] to disk if we have
133-
/// aggregated enough marks
134-
void accountMarkForSkipIdxAndFlushIfNeeded(size_t skip_index_pos);
135128
};
136129

137130
}

src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,6 @@ void MergeTreeDataPartWriterWide::write(const Block & block,
9595
if (compute_granularity)
9696
{
9797
size_t index_granularity_for_block = computeIndexGranularity(block);
98-
/// Finish last unfinished mark rows if it's required
99-
last_granule_was_adjusted = adjustLastUnfinishedMark(index_granularity_for_block);
100-
/// Fill index granularity with granules of new size
10198
fillIndexGranularity(index_granularity_for_block, block.rows());
10299
}
103100

src/Storages/MergeTree/MergeTreeIndexGranularity.cpp

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,17 +55,6 @@ void MergeTreeIndexGranularity::addRowsToLastMark(size_t rows_count)
5555
marks_rows_partial_sums.back() += rows_count;
5656
}
5757

58-
void MergeTreeIndexGranularity::setLastMarkRows(size_t rows_count)
59-
{
60-
if (marks_rows_partial_sums.empty())
61-
marks_rows_partial_sums.push_back(rows_count);
62-
else
63-
{
64-
marks_rows_partial_sums.back() -= getLastMarkRows();
65-
marks_rows_partial_sums.back() += rows_count;
66-
}
67-
}
68-
6958
void MergeTreeIndexGranularity::popMark()
7059
{
7160
if (!marks_rows_partial_sums.empty())

src/Storages/MergeTree/MergeTreeIndexGranularity.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,6 @@ class MergeTreeIndexGranularity
9898
/// Extends last mark by rows_count.
9999
void addRowsToLastMark(size_t rows_count);
100100

101-
/// Set amount of rows to last mark
102-
/// (add new mark if we have nothing)
103-
void setLastMarkRows(size_t rows_count);
104-
105101
/// Drops last mark if any exists.
106102
void popMark();
107103

tests/queries/0_stateless/00955_test_final_mark.sql

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,15 +119,15 @@ INSERT INTO mt_without_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-1
119119

120120
SELECT COUNT(*) FROM mt_without_pk WHERE x > toDateTime('2018-10-01 23:57:57');
121121

122-
SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1 AND database=currentDatabase();
122+
SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1;
123123

124124
INSERT INTO mt_without_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 07:57:57'), [4, 4, 4], 14, [111, 222], ['Lui', 'Dave']), (toDate('2018-10-01'), toDateTime('2018-10-01 08:57:57'), [5, 5, 5], 15, [333, 444], ['John', 'Mike']), (toDate('2018-10-01'), toDateTime('2018-10-01 09:57:57'), [6, 6, 6], 16, [555, 666, 777], ['Alex', 'Jim', 'Tom']);
125125

126126
OPTIMIZE TABLE mt_without_pk FINAL;
127127

128128
SELECT COUNT(*) FROM mt_without_pk WHERE x > toDateTime('2018-10-01 23:57:57');
129129

130-
SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1 AND database=currentDatabase();
130+
SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1;
131131

132132
DROP TABLE IF EXISTS mt_without_pk;
133133

@@ -149,14 +149,14 @@ INSERT INTO mt_with_small_granularity (d, x, y, z, `n.Age`, `n.Name`) VALUES (to
149149

150150
SELECT COUNT(*) FROM mt_with_small_granularity WHERE x > toDateTime('2018-10-01 23:57:57');
151151

152-
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1 AND database=currentDatabase();
152+
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1;
153153

154154
INSERT INTO mt_with_small_granularity (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 07:57:57'), [4, 4, 4], 14, [111, 222], ['Lui', 'Dave']), (toDate('2018-10-01'), toDateTime('2018-10-01 08:57:57'), [5, 5, 5], 15, [333, 444], ['John', 'Mike']), (toDate('2018-10-01'), toDateTime('2018-10-01 09:57:57'), [6, 6, 6], 16, [555, 666, 777], ['Alex', 'Jim', 'Tom']);
155155

156156
OPTIMIZE TABLE mt_with_small_granularity FINAL;
157157

158158
SELECT COUNT(*) FROM mt_with_small_granularity WHERE x > toDateTime('2018-10-01 23:57:57');
159159

160-
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1 AND database=currentDatabase();
160+
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1;
161161

162162
DROP TABLE IF EXISTS mt_with_small_granularity;

tests/queries/0_stateless/01577_adaptive_granularity_block_borders.reference

Lines changed: 0 additions & 2 deletions
This file was deleted.

tests/queries/0_stateless/01577_adaptive_granularity_block_borders.sql

Lines changed: 0 additions & 28 deletions
This file was deleted.

0 commit comments

Comments
 (0)