
Commit a00ad70

1 parent 95debd3 commit a00ad70

6 files changed: +102 -83 lines changed

cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp

Lines changed: 2 additions & 7 deletions
@@ -21,7 +21,6 @@
 #include <DataTypes/DataTypeNullable.h>
 #include <DataTypes/DataTypesDecimal.h>
 #include <IO/ReadBufferFromString.h>
-#include <Storages/SubstraitSource/ParquetFormatFile.h>
 #include <Storages/SubstraitSource/iceberg/IcebergReader.h>
 #include <boost/algorithm/string/case_conv.hpp>
 #include <Common/CHUtil.h>
@@ -275,13 +274,9 @@ std::unique_ptr<NormalFileReader> createNormalFileReader(
 {
     auto createInputFormat = [&](const DB::Block & new_read_header_) -> FormatFile::InputFormatPtr
     {
-        // Apply key condition to the reader.
-        // If use_local_format is true, column_index_filter will be used otherwise it will be ignored
-        if (auto * parquetFile = dynamic_cast<ParquetFormatFile *>(file.get()))
-            return parquetFile->createInputFormat(new_read_header_, key_condition, column_index_filter);
-
+        bool usePageIndexReader = file->preparePageIndexReader(new_read_header_, column_index_filter);
         auto input_format = file->createInputFormat(new_read_header_);
-        if (key_condition)
+        if (!usePageIndexReader && key_condition)
             input_format->inputFormat().setKeyCondition(key_condition);
         return input_format;
     };

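In the new call site above, the Parquet-specific `dynamic_cast` is gone: every FormatFile now exposes preparePageIndexReader, and the key condition is pushed down only when the page-index reader declined. The following is a minimal self-contained sketch of that two-step protocol, with stand-in types and stub bodies (the real classes live in FormatFile.h and ParquetFormatFile.h):

#include <iostream>
#include <memory>

// Stand-ins for the real classes; only the calling protocol is modeled here.
struct ColumnIndexFilter {};
using ColumnIndexFilterPtr = std::shared_ptr<ColumnIndexFilter>;

struct FormatFile
{
    virtual ~FormatFile() = default;
    // Default: no page-index support, so callers fall back to setKeyCondition.
    virtual bool preparePageIndexReader(const ColumnIndexFilterPtr &) { return false; }
};

struct ParquetFormatFile : FormatFile
{
    bool preparePageIndexReader(const ColumnIndexFilterPtr & filter) override
    {
        // The real override builds ParquetMetaBuilder state and keeps it
        // for the later createInputFormat() call; stubbed here.
        return filter != nullptr;
    }
};

int main()
{
    std::shared_ptr<FormatFile> file = std::make_shared<ParquetFormatFile>();
    auto column_index_filter = std::make_shared<ColumnIndexFilter>();

    // Mirrors the new FileReader.cpp logic: the key condition is applied
    // only when the page-index reader did not take over filtering.
    bool use_page_index_reader = file->preparePageIndexReader(column_index_filter);
    if (!use_page_index_reader)
        std::cout << "would call input_format->inputFormat().setKeyCondition(...)\n";
}

Dispatching through a virtual hook instead of a `dynamic_cast` keeps FileReader.cpp independent of the Parquet headers, which is why the ParquetFormatFile.h include above could be dropped.
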
cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h

Lines changed: 13 additions & 3 deletions
@@ -40,6 +40,8 @@ namespace local_engine
 {
 
 class FormatFile;
+class ColumnIndexFilter;
+using ColumnIndexFilterPtr = std::shared_ptr<ColumnIndexFilter>;
 
 class FileMetaColumns
 {
@@ -108,13 +110,15 @@ class FormatFile
     public:
         virtual ~InputFormat() = default;
         DB::IInputFormat & inputFormat() const { return *input; }
-        void cancel() const noexcept { return input->cancel(); }
+        void cancel() const noexcept { input->cancel(); }
         virtual DB::Chunk generate() { return input->generate(); }
         InputFormat(std::unique_ptr<DB::ReadBuffer> read_buffer_, const DB::InputFormatPtr & input_)
            : read_buffer(std::move(read_buffer_)), input(input_)
         {
         }
     };
+
+
     using InputFormatPtr = std::shared_ptr<InputFormat>;
 
     FormatFile(DB::ContextPtr context_, const substraitInputFile & file_info_, const ReadBufferBuilderPtr & read_buffer_builder_);
@@ -123,10 +127,16 @@ class FormatFile
     /// Create a new input format for reading this file
     virtual InputFormatPtr createInputFormat(const DB::Block & header) = 0;
 
-    /// Spark would split a large file into small segements and read in different tasks
-    /// If this file doesn't support the split feacture, only the task with offset 0 will generate data.
+    /// Spark would split a large file into small segments and read in different tasks
+    /// If this file doesn't support the split feature, only the task with offset 0 will generate data.
     virtual bool supportSplit() const { return false; }
 
+    /// Prepare the page index reader for the file.
+    /// Return true if the page index reader is prepared successfully.
+    ///
+    /// TODO: replace ColumnIndexFilterPtr with KeyCondition
+    virtual bool preparePageIndexReader(const DB::Block &, const ColumnIndexFilterPtr &) { return false; }
+
     /// Try to get rows from file metadata
     virtual std::optional<size_t> getTotalRows() { return {}; }

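One detail worth noting above: ColumnIndexFilter is only forward-declared, with a shared_ptr alias beside it. That is enough for the new virtual hook's signature, since a std::shared_ptr to an incomplete type can be declared, passed, and null-checked without the full definition. A small self-contained sketch of why that compiles (the free function here is hypothetical, mimicking the hook's shape):

#include <memory>

class ColumnIndexFilter; // definition never included in this translation unit
using ColumnIndexFilterPtr = std::shared_ptr<ColumnIndexFilter>;

// Matches the shape of the new virtual hook: the incomplete type is fine
// because the pointer is passed around but never dereferenced here.
bool preparePageIndexReader(const ColumnIndexFilterPtr & filter)
{
    return filter != nullptr;
}

int main()
{
    return preparePageIndexReader(nullptr) ? 1 : 0;
}

Keeping the heavy ColumnIndexFilter include out of FormatFile.h shortens rebuilds for every format that includes this base header.
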
cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp

Lines changed: 41 additions & 25 deletions
@@ -132,31 +132,34 @@ ParquetFormatFile::ParquetFormatFile(
     const substrait::ReadRel::LocalFiles::FileOrFiles & file_info_,
     const ReadBufferBuilderPtr & read_buffer_builder_,
     bool use_local_format_)
-    : FormatFile(context_, file_info_, read_buffer_builder_), use_pageindex_reader(use_local_format_)
+    : FormatFile(context_, file_info_, read_buffer_builder_)
+    , use_pageindex_reader(use_local_format_)
+    , meta_builder_{nullptr}
+    , read_buffer_{nullptr}
 {
 }
 
-FormatFile::InputFormatPtr ParquetFormatFile::createInputFormat(
-    const DB::Block & header,
-    const std::shared_ptr<const DB::KeyCondition> & key_condition,
-    const ColumnIndexFilterPtr & column_index_filter) const
+bool ParquetFormatFile::preparePageIndexReader(const DB::Block & header, const ColumnIndexFilterPtr & column_index_filter)
 {
     bool readRowIndex = hasMetaColumns(header);
     bool usePageIndexReader = (use_pageindex_reader || readRowIndex) && onlyHasFlatType(header);
-    auto read_buffer = read_buffer_builder->build(file_info);
     auto format_settings = DB::getFormatSettings(context);
 
-    DB::Block output_header = header;
-    DB::Block read_header = removeMetaColumns(header);
+    meta_builder_ = std::make_unique<ParquetMetaBuilder>();
+    ParquetMetaBuilder & metaBuilder = *meta_builder_;
+    metaBuilder.collectPageIndex = usePageIndexReader || readRowIndex;
 
-    ParquetMetaBuilder metaBuilder{
-        .collectPageIndex = usePageIndexReader || readRowIndex,
-        .collectSkipRowGroup = !usePageIndexReader,
-        .case_insensitive = format_settings.parquet.case_insensitive_column_matching,
-        .allow_missing_columns = format_settings.parquet.allow_missing_columns};
+    // VectorizedParquetBlockInputFormat needn't collect skip rows,
+    // ColumnIndexRowRangesProvider will include such information.
+    metaBuilder.collectSkipRowGroup = !usePageIndexReader;
 
+    metaBuilder.case_insensitive = format_settings.parquet.case_insensitive_column_matching;
+    metaBuilder.allow_missing_columns = format_settings.parquet.allow_missing_columns;
+
+    DB::Block read_header = removeMetaColumns(header);
     ShouldIncludeRowGroup should_include_row_group{file_info};
-    if (auto * seekable_in = dynamic_cast<DB::SeekableReadBuffer *>(read_buffer.get()))
+    read_buffer_ = read_buffer_builder->build(file_info);
+    if (auto * seekable_in = dynamic_cast<DB::SeekableReadBuffer *>(read_buffer_.get()))
     {
         // reuse the read_buffer to avoid opening the file twice.
         // especially,the cost of opening a hdfs file is large.
@@ -169,41 +172,54 @@ FormatFile::InputFormatPtr ParquetFormatFile::createInputFormat(
         metaBuilder.build(*in, read_header, column_index_filter.get(), should_include_row_group);
     }
 
-    if (metaBuilder.readRowGroups.empty())
-        return nullptr;
+    return usePageIndexReader;
+}
+
+FormatFile::InputFormatPtr ParquetFormatFile::createInputFormat(const DB::Block & header)
+{
+    bool readRowIndex = hasMetaColumns(header);
+    bool usePageIndexReader = (use_pageindex_reader || readRowIndex) && onlyHasFlatType(header);
+    DB::Block output_header = header;
+    DB::Block read_header = removeMetaColumns(header);
 
+    assert(meta_builder_);
+    assert(read_buffer_);
+    ParquetMetaBuilder & metaBuilder = *meta_builder_;
     auto provider = usePageIndexReader || readRowIndex ? std::make_unique<ColumnIndexRowRangesProvider>(metaBuilder) : nullptr;
+    meta_builder_.reset();
+
+    auto format_settings = DB::getFormatSettings(context);
 
     if (usePageIndexReader)
     {
-        auto input = std::make_shared<VectorizedParquetBlockInputFormat>(*read_buffer, read_header, *provider, format_settings);
+        auto input = std::make_shared<VectorizedParquetBlockInputFormat>(*read_buffer_, read_header, *provider, format_settings);
         return std::make_shared<ParquetInputFormat>(
-            std::move(read_buffer), input, std::move(provider), std::move(read_header), std::move(output_header));
+            std::move(read_buffer_), input, std::move(provider), std::move(read_header), std::move(output_header));
     }
 
     const DB::Settings & settings = context->getSettingsRef();
     format_settings.parquet.skip_row_groups = std::unordered_set<int>(metaBuilder.skipRowGroups.begin(), metaBuilder.skipRowGroups.end());
-
     if (readRowIndex)
     {
         assert(provider);
-        /// In case of readRowIndex, we need to preserve the order of the rows
+
+        // In the case of readRowIndex, we need to preserve the order of the rows
        format_settings.parquet.preserve_order = true;
 
-        /// TODO: enable filter push down again
+        // TODO: enable filter push down again
+        // We need to disable fiter push down and read all row groups, so that we can
+        // get correct row index.
        format_settings.parquet.filter_push_down = false;
     }
-
     auto input = std::make_shared<DB::ParquetBlockInputFormat>(
-        *read_buffer,
+        *read_buffer_,
         read_header,
         format_settings,
         settings[DB::Setting::max_parsing_threads],
         settings[DB::Setting::max_download_threads],
         8192);
-    input->setKeyCondition(key_condition);
     return std::make_shared<ParquetInputFormat>(
-        std::move(read_buffer), input, std::move(provider), std::move(read_header), std::move(output_header));
+        std::move(read_buffer_), input, std::move(provider), std::move(read_header), std::move(output_header));
 }
 
 std::optional<size_t> ParquetFormatFile::getTotalRows()

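The refactor above splits the old single call into a prepare/create pair that hands state across through the new meta_builder_ and read_buffer_ members: preparePageIndexReader opens the file once and collects metadata, and createInputFormat consumes both. A minimal sketch of that handoff pattern, with stand-in types (the real ones are ParquetMetaBuilder and DB::ReadBuffer; bodies are illustrative only):

#include <cassert>
#include <memory>
#include <string>

// Stand-ins for ParquetMetaBuilder and the read buffer.
struct MetaBuilder { bool collectPageIndex = false; };
struct ReadBuffer { std::string path; };

class TwoPhaseFile
{
public:
    // Phase 1: open the file once and collect metadata, keeping both alive
    // in members so phase 2 can reuse them (mirrors preparePageIndexReader).
    bool prepare(bool want_page_index)
    {
        meta_builder_ = std::make_unique<MetaBuilder>();
        meta_builder_->collectPageIndex = want_page_index;
        read_buffer_ = std::make_unique<ReadBuffer>();
        return want_page_index;
    }

    // Phase 2: consume the prepared state (mirrors createInputFormat, which
    // asserts both members and resets meta_builder_ after building a provider).
    std::unique_ptr<ReadBuffer> create()
    {
        assert(meta_builder_ && read_buffer_);
        meta_builder_.reset();          // metadata no longer needed
        return std::move(read_buffer_); // ownership moves to the input format
    }

private:
    std::unique_ptr<MetaBuilder> meta_builder_;
    std::unique_ptr<ReadBuffer> read_buffer_;
};

int main()
{
    TwoPhaseFile file;
    bool use_page_index = file.prepare(true);
    auto buffer = file.create();
    return (use_page_index && buffer) ? 0 : 1;
}

The member state also avoids opening the file twice, which the in-diff comment notes is expensive on HDFS.
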
cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h

Lines changed: 4 additions & 8 deletions
@@ -36,15 +36,9 @@ class ParquetFormatFile : public FormatFile
         bool use_local_format_);
     ~ParquetFormatFile() override = default;
 
-    InputFormatPtr createInputFormat(const DB::Block & /*header*/) override
-    {
-        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Use createInputFormat with key_condition and column_index_filter");
-    }
+    InputFormatPtr createInputFormat(const DB::Block & /*header*/) override;
+    bool preparePageIndexReader(const DB::Block & header, const ColumnIndexFilterPtr & column_index_filter) override;
 
-    InputFormatPtr createInputFormat(
-        const DB::Block & header,
-        const std::shared_ptr<const DB::KeyCondition> & key_condition = nullptr,
-        const ColumnIndexFilterPtr & column_index_filter = nullptr) const;
 
     std::optional<size_t> getTotalRows() override;
 
@@ -58,6 +52,8 @@ class ParquetFormatFile : public FormatFile
     bool use_pageindex_reader;
     std::mutex mutex;
     std::optional<size_t> total_rows;
+    std::unique_ptr<ParquetMetaBuilder> meta_builder_;
+    std::unique_ptr<DB::ReadBuffer> read_buffer_;
 };
 
 }

cpp-ch/local-engine/Storages/SubstraitSource/iceberg/EqualityDeleteFileReader.cpp

Lines changed: 4 additions & 1 deletion
@@ -139,10 +139,13 @@ void EqualityDeleteFileReader::readDeleteValues(EqualityDeleteActionBuilder & ex
 
     assert(deleteFile_.equalityfieldids_size() == deleteBlock.columns());
     Names names;
+
     //TODO: deleteFile_.equalityfieldids(i) - 1 ? why
     for (int i = 0; i < deleteFile_.equalityfieldids_size(); i++)
+    {
+        std::cerr << fmt::format("deleteFile_.equalityfieldids({}) = {}", i, deleteFile_.equalityfieldids(i)) << std::endl;
         names.push_back(read_header_.getByPosition(deleteFile_.equalityfieldids(i) - 1).name);
-
+    }
 
     while (deleteBlock.rows() > 0)
     {

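The TODO in this hunk asks why the reader subtracts 1 from each equality field id. One plausible reading, an assumption rather than anything the commit confirms: the delete file's field ids are 1-based while Block::getByPosition is 0-based, so id N maps to column position N - 1. A toy sketch of that mapping (all types and values here are hypothetical):

#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for DB::Block: columns addressed by 0-based position.
struct Header
{
    std::vector<std::string> names;
    const std::string & getByPosition(size_t pos) const { return names.at(pos); }
};

int main()
{
    Header read_header{{"c0", "c1", "c2"}};
    // Assumed 1-based equality field ids, as the `- 1` in the reader suggests.
    std::vector<int> equality_field_ids{1, 3};
    std::vector<std::string> names;
    for (int id : equality_field_ids)
        names.push_back(read_header.getByPosition(id - 1)); // id N -> position N-1
    assert(names[0] == "c0" && names[1] == "c2");
}
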
cpp-ch/local-engine/tests/gtest_iceberge_test.cpp

Lines changed: 38 additions & 39 deletions
@@ -644,20 +644,19 @@ class IcebergTest : public ReaderTestBase
 
     assertEqualityDeletes(*icebergSplit, duckDbSql);
 
-    // TODO: Select a column that's not in the filter columns
-    // if (numDataColumns > 1 &&
-    //     equalityDeleteVectorMap.at(0).size() < numDataColumns) {
-    //   std::string duckDbSql1 = "SELECT c0 FROM IcebergTest.tmp";
-    //   if (numDeletedValues > 0) {
-    //     duckDbSql += fmt::format(" WHERE {}", predicates);
-    //   }
-    //
-    //   auto icebergSplit1 = makeIcebergSplit(dataFilePath->string(),
-    //       DB::Block{DB::ColumnWithTypeAndName(BIGINT(),"c0")},
-    //       deleteFiles);
-    //
-    //   assertEqualityDeletes(*icebergSplit1, duckDbSql1);
-    // }
+    if (numDataColumns > 1 &&
+        equalityDeleteVectorMap.at(0).size() < numDataColumns) {
+        std::string duckDbSql1 = "SELECT c0 FROM IcebergTest.tmp";
+        if (numDeletedValues > 0) {
+            duckDbSql1 += fmt::format(" WHERE {}", predicates);
+        }
+
+        auto icebergSplit1 = makeIcebergSplit(dataFilePath->string(),
+            DB::Block{DB::ColumnWithTypeAndName(BIGINT(),"c0")},
+            deleteFiles);
+
+        assertEqualityDeletes(*icebergSplit1, duckDbSql1);
+    }
 }
 
 void assertMultipleSplits(
@@ -1094,31 +1093,31 @@ TEST_F(IcebergTest, equalityDeletesSingleFileColumn2) {
     equalityDeleteVectorMap.insert({0, {{0, 1, 2, 3}}});
     assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap);
 
-    // Delete the smallest value 0 and the largest value 9999 from the second
-    // column, which has the range [0, 9999]
-    equalityDeleteVectorMap.clear();
-    equalityDeleteVectorMap.insert({0, {{0, 9999}}});
-    assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap);
-
-    // Delete non-existent values from the second column
-    equalityDeleteVectorMap.clear();
-    equalityDeleteVectorMap.insert({0, {{10000, 10002, 19999}}});
-    assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap);
-
-    // Delete random rows from the second column
-    equalityDeleteVectorMap.clear();
-    equalityDeleteVectorMap.insert({0, {makeSequenceValues(rowCount)}});
-    assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap);
-
-    // Delete 0 values
-    equalityDeleteVectorMap.clear();
-    equalityDeleteVectorMap.insert({0, {{}}});
-    assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap);
-
-    // Delete all values
-    equalityDeleteVectorMap.clear();
-    equalityDeleteVectorMap.insert({0, {makeSequenceValues(rowCount / 2)}});
-    assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap);
+    // // Delete the smallest value 0 and the largest value 9999 from the second
+    // // column, which has the range [0, 9999]
+    // equalityDeleteVectorMap.clear();
+    // equalityDeleteVectorMap.insert({0, {{0, 9999}}});
+    // assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap);
+    //
+    // // Delete non-existent values from the second column
+    // equalityDeleteVectorMap.clear();
+    // equalityDeleteVectorMap.insert({0, {{10000, 10002, 19999}}});
+    // assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap);
+    //
+    // // Delete random rows from the second column
+    // equalityDeleteVectorMap.clear();
+    // equalityDeleteVectorMap.insert({0, {makeSequenceValues(rowCount)}});
+    // assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap);
+    //
+    // // Delete 0 values
+    // equalityDeleteVectorMap.clear();
+    // equalityDeleteVectorMap.insert({0, {{}}});
+    // assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap);
+    //
+    // // Delete all values
+    // equalityDeleteVectorMap.clear();
+    // equalityDeleteVectorMap.insert({0, {makeSequenceValues(rowCount / 2)}});
+    // assertEqualityDeletes(equalityDeleteVectorMap, equalityFieldIdsMap);
 }
 
 // Delete values from 2 columns with the following data:
