Skip to content

Commit 49839b9

Browse files
committed
Supporting delete columns not in select columns
1 parent 8257ae0 commit 49839b9

15 files changed

+233
-156
lines changed

cpp-ch/local-engine/Storages/Parquet/ParquetMeta.cpp

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,15 @@
1717

1818
#include "ParquetMeta.h"
1919

20+
#include <Formats/FormatFactory.h>
2021
#include <Formats/FormatSettings.h>
2122
#include <Processors/Formats/Impl/ArrowBufferedStreams.h>
23+
#include <Processors/Formats/Impl/ArrowColumnToCHColumn.h>
2224
#include <Processors/Formats/Impl/ArrowFieldIndexUtil.h>
2325
#include <Storages/Parquet/ArrowUtils.h>
2426
#include <parquet/arrow/reader.h>
2527
#include <parquet/arrow/schema.h>
2628
#include <parquet/metadata.h>
27-
#include "Processors/Formats/Impl/ArrowColumnToCHColumn.h"
2829

2930
namespace DB
3031
{
@@ -34,13 +35,14 @@ extern const int BAD_ARGUMENTS;
3435
}
3536
}
3637

38+
using namespace DB;
3739

3840
namespace local_engine
3941
{
4042

41-
std::unique_ptr<parquet::ParquetFileReader> ParquetMetaBuilder::openInputParquetFile(DB::ReadBuffer & read_buffer)
43+
std::unique_ptr<parquet::ParquetFileReader> ParquetMetaBuilder::openInputParquetFile(ReadBuffer & read_buffer)
4244
{
43-
const DB::FormatSettings format_settings{
45+
const FormatSettings format_settings{
4446
.seekable_read = true,
4547
};
4648
std::atomic<int> is_stopped{0};
@@ -49,13 +51,28 @@ std::unique_ptr<parquet::ParquetFileReader> ParquetMetaBuilder::openInputParquet
4951
return parquet::ParquetFileReader::Open(arrow_file, parquet::default_reader_properties(), nullptr);
5052
}
5153

54+
Block ParquetMetaBuilder::collectFileSchema(const ContextPtr & context, ReadBuffer & read_buffer)
55+
{
56+
assert(dynamic_cast<SeekableReadBuffer *>(&read_buffer) != nullptr);
57+
58+
FormatSettings format_settings = getFormatSettings(context);
59+
ParquetMetaBuilder metaBuilder{
60+
.case_insensitive = format_settings.parquet.case_insensitive_column_matching,
61+
.allow_missing_columns = false,
62+
.collectPageIndex = false,
63+
.collectSchema = true};
64+
metaBuilder.build(read_buffer);
65+
66+
return metaBuilder.fileHeader;
67+
}
68+
5269
std::vector<Int32> ParquetMetaBuilder::pruneColumn(
53-
const DB::Block & header, const parquet::FileMetaData & metadata, bool case_insensitive, bool allow_missing_columns)
70+
const Block & header, const parquet::FileMetaData & metadata, bool case_insensitive, bool allow_missing_columns)
5471
{
5572
std::shared_ptr<arrow::Schema> schema;
5673
THROW_ARROW_NOT_OK(parquet::arrow::FromParquetSchema(metadata.schema(), &schema));
5774

58-
DB::ArrowFieldIndexUtil field_util(case_insensitive, allow_missing_columns);
75+
ArrowFieldIndexUtil field_util(case_insensitive, allow_missing_columns);
5976
auto index_mapping = field_util.findRequiredIndices(header, *schema, metadata);
6077

6178
std::vector<Int32> column_indices;
@@ -93,7 +110,7 @@ ParquetMetaBuilder & ParquetMetaBuilder::buildSchema(const parquet::FileMetaData
93110
std::shared_ptr<arrow::Schema> schema;
94111
THROW_ARROW_NOT_OK(parquet::arrow::FromParquetSchema(file_meta.schema(), &schema));
95112

96-
fileHeader = DB::ArrowColumnToCHColumn::arrowSchemaToCHHeader(*schema, "Parquet", false, true);
113+
fileHeader = ArrowColumnToCHColumn::arrowSchemaToCHHeader(*schema, "Parquet", false, true);
97114
}
98115
return *this;
99116
}
@@ -175,7 +192,7 @@ ParquetMetaBuilder & ParquetMetaBuilder::buildAllRowRange(const parquet::FileMet
175192
ParquetMetaBuilder & ParquetMetaBuilder::buildRowRange(
176193
parquet::ParquetFileReader & reader,
177194
const parquet::FileMetaData & file_meta,
178-
const DB::Block & readBlock,
195+
const Block & readBlock,
179196
const ColumnIndexFilter * column_index_filter)
180197
{
181198
if (collectPageIndex)
@@ -200,8 +217,8 @@ ParquetMetaBuilder & ParquetMetaBuilder::buildRowRange(
200217
}
201218

202219
ParquetMetaBuilder & ParquetMetaBuilder::build(
203-
DB::ReadBuffer & read_buffer,
204-
const DB::Block & readBlock,
220+
ReadBuffer & read_buffer,
221+
const Block & readBlock,
205222
const ColumnIndexFilter * column_index_filter,
206223
const std::function<bool(UInt64)> & should_include_row_group)
207224
{
@@ -213,7 +230,7 @@ ParquetMetaBuilder & ParquetMetaBuilder::build(
213230
.buildRowRange(*reader, *fileMetaData, readBlock, column_index_filter);
214231
}
215232

216-
ParquetMetaBuilder & ParquetMetaBuilder::build(DB::ReadBuffer & read_buffer, const std::function<bool(UInt64)> & should_include_row_group)
233+
ParquetMetaBuilder & ParquetMetaBuilder::build(ReadBuffer & read_buffer, const std::function<bool(UInt64)> & should_include_row_group)
217234
{
218235
auto reader = openInputParquetFile(read_buffer);
219236
fileMetaData = reader->metadata();

cpp-ch/local-engine/Storages/Parquet/ParquetMeta.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ struct ParquetMetaBuilder
7272

7373
static std::unique_ptr<parquet::ParquetFileReader> openInputParquetFile(DB::ReadBuffer & read_buffer);
7474

75+
static DB::Block collectFileSchema(const DB::ContextPtr & context, DB::ReadBuffer & read_buffer);
76+
7577
private:
7678
ParquetMetaBuilder &
7779
buildRequiredRowGroups(const parquet::FileMetaData & file_meta, const std::function<bool(UInt64)> & should_include_row_group);

cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,9 @@ class VectorizedParquetBlockInputFormat final : public DB::IInputFormat
214214
protected:
215215
void onCancel() noexcept override { is_stopped = 1; }
216216

217+
// TODO: create ColumnIndexFilter here, currently disable it now.
218+
void setKeyCondition(const std::shared_ptr<const DB::KeyCondition> & key_condition_) override { }
219+
217220
public:
218221
VectorizedParquetBlockInputFormat(
219222
DB::ReadBuffer & in_,

cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -273,15 +273,11 @@ std::unique_ptr<NormalFileReader> createNormalFileReader(
273273
const std::shared_ptr<const DB::KeyCondition> & key_condition = nullptr,
274274
const ColumnIndexFilterPtr & column_index_filter = nullptr)
275275
{
276+
file->initialize(column_index_filter);
276277
auto createInputFormat = [&](const DB::Block & new_read_header_) -> FormatFile::InputFormatPtr
277278
{
278-
// Apply key condition to the reader.
279-
// If use_local_format is true, column_index_filter will be used otherwise it will be ignored
280-
if (auto * parquetFile = dynamic_cast<ParquetFormatFile *>(file.get()))
281-
return parquetFile->createInputFormat(new_read_header_, key_condition, column_index_filter);
282-
283279
auto input_format = file->createInputFormat(new_read_header_);
284-
if (key_condition)
280+
if (key_condition && input_format)
285281
input_format->inputFormat().setKeyCondition(key_condition);
286282
return input_format;
287283
};

cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ namespace local_engine
4040
{
4141

4242
class FormatFile;
43-
43+
class ColumnIndexFilter;
44+
using ColumnIndexFilterPtr = std::shared_ptr<ColumnIndexFilter>;
4445
class FileMetaColumns
4546
{
4647
public:
@@ -108,13 +109,14 @@ class FormatFile
108109
public:
109110
virtual ~InputFormat() = default;
110111
DB::IInputFormat & inputFormat() const { return *input; }
111-
void cancel() const noexcept { return input->cancel(); }
112+
void cancel() const noexcept { input->cancel(); }
112113
virtual DB::Chunk generate() { return input->generate(); }
113114
InputFormat(std::unique_ptr<DB::ReadBuffer> read_buffer_, const DB::InputFormatPtr & input_)
114115
: read_buffer(std::move(read_buffer_)), input(input_)
115116
{
116117
}
117118
};
119+
118120
using InputFormatPtr = std::shared_ptr<InputFormat>;
119121

120122
FormatFile(DB::ContextPtr context_, const substraitInputFile & file_info_, const ReadBufferBuilderPtr & read_buffer_builder_);
@@ -127,6 +129,9 @@ class FormatFile
127129
/// If this file doesn't support the split feacture, only the task with offset 0 will generate data.
128130
virtual bool supportSplit() const { return false; }
129131

132+
/// Initialize the file with column index filter
133+
virtual void initialize(const ColumnIndexFilterPtr &) { }
134+
130135
/// Try to get rows from file metadata
131136
virtual std::optional<size_t> getTotalRows() { return {}; }
132137

@@ -143,6 +148,8 @@ class FormatFile
143148
const substraitInputFile & getFileInfo() const { return file_info; }
144149
const DB::ContextPtr & getContext() const { return context; }
145150

151+
const DB::Block & getFileSchema() const { return file_schema; }
152+
146153
protected:
147154
DB::ContextPtr context;
148155
const substraitInputFile file_info;
@@ -152,6 +159,9 @@ class FormatFile
152159
std::map<String, String> normalized_partition_values;
153160
std::shared_ptr<const DB::KeyCondition> key_condition;
154161
const FileMetaColumns meta_columns;
162+
163+
/// Currently, it is used to read an iceberg format, and initialized in the constructor of child class
164+
DB::Block file_schema;
155165
};
156166

157167
using FormatFilePtr = std::shared_ptr<FormatFile>;

0 commit comments

Comments
 (0)