Skip to content

Commit af01aa5

Browse files
authored
Handle fields missing in the source in ParquetNativeRecordReader (#7742)
* Fix ParquetNativeRecordExtractor for fields missing in the source * nit * Same bug in proto
1 parent 928d4ef commit af01aa5

File tree

4 files changed

+71
-3
lines changed

4 files changed

+71
-3
lines changed

pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public GenericRow extract(Group from, GenericRow to) {
114114
}
115115
} else {
116116
for (String fieldName : _fields) {
117-
Object value = extractValue(from, fromType.getFieldIndex(fieldName));
117+
Object value = fromType.containsField(fieldName) ? extractValue(from, fromType.getFieldIndex(fieldName)) : null;
118118
if (value != null) {
119119
value = convert(value);
120120
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.plugin.inputformat.parquet;
20+
21+
import java.io.File;
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
import java.util.Map;
25+
import org.apache.avro.Schema;
26+
import org.apache.avro.generic.GenericData;
27+
import org.apache.avro.generic.GenericRecord;
28+
import org.apache.hadoop.fs.Path;
29+
import org.apache.parquet.hadoop.ParquetWriter;
30+
import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
31+
import org.apache.pinot.spi.data.FieldSpec;
32+
import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest;
33+
import org.apache.pinot.spi.data.readers.RecordReader;
34+
35+
36+
public class ParquetNativeRecordReaderTest extends AbstractRecordReaderTest {
37+
private final File _dataFile = new File(_tempDir, "data.parquet");
38+
39+
@Override
40+
protected RecordReader createRecordReader()
41+
throws Exception {
42+
ParquetNativeRecordReader recordReader = new ParquetNativeRecordReader();
43+
recordReader.init(_dataFile, _sourceFields, null);
44+
return recordReader;
45+
}
46+
47+
@Override
48+
protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
49+
throws Exception {
50+
Schema schema = AvroUtils.getAvroSchemaFromPinotSchema(getPinotSchema());
51+
List<GenericRecord> records = new ArrayList<>();
52+
for (Map<String, Object> r : recordsToWrite) {
53+
GenericRecord record = new GenericData.Record(schema);
54+
for (FieldSpec fieldSpec : getPinotSchema().getAllFieldSpecs()) {
55+
record.put(fieldSpec.getName(), r.get(fieldSpec.getName()));
56+
}
57+
records.add(record);
58+
}
59+
try (ParquetWriter<GenericRecord> writer = ParquetUtils
60+
.getParquetAvroWriter(new Path(_dataFile.getAbsolutePath()), schema)) {
61+
for (GenericRecord record : records) {
62+
writer.write(record);
63+
}
64+
}
65+
}
66+
}

pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public GenericRow extract(Message from, GenericRow to) {
6767
} else {
6868
for (String fieldName : _fields) {
6969
Descriptors.FieldDescriptor fieldDescriptor = descriptor.findFieldByName(fieldName);
70-
Object fieldValue = from.getField(fieldDescriptor);
70+
Object fieldValue = fieldDescriptor != null ? from.getField(fieldDescriptor) : null;
7171
if (fieldValue != null) {
7272
fieldValue = convert(new ProtoBufFieldInfo(fieldValue, descriptor.findFieldByName(fieldName)));
7373
}

pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,9 @@ protected List<String> getPrimaryKeyColumns() {
150150
}
151151

152152
protected Set<String> getSourceFields(Schema schema) {
153-
return Sets.newHashSet(schema.getColumnNames());
153+
Set<String> sourceFields = Sets.newHashSet(schema.getColumnNames());
154+
sourceFields.add("column_not_in_source");
155+
return sourceFields;
154156
}
155157

156158
@BeforeClass

0 commit comments

Comments
 (0)