Skip to content

Commit b36d656

Browse files
add LazyRow abstraction for previously indexed record
1 parent c3f7b6d commit b36d656

File tree

5 files changed

+252
-36
lines changed

5 files changed

+252
-36
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.segment.local.segment.readers;
20+
21+
import java.io.IOException;
22+
import java.util.HashMap;
23+
import java.util.HashSet;
24+
import java.util.Map;
25+
import java.util.Set;
26+
import javax.annotation.Nullable;
27+
import org.apache.pinot.segment.spi.IndexSegment;
28+
29+
30+
/**
31+
* <p>A wrapper class to read column values of a row for a given {@link IndexSegment} and docId.<br>
32+
* The advantage of having wrapper over segment and docId is column values are read only when
33+
* {@link LazyRow#getValue(String)} is invoked.
34+
* This is useful to reduce the disk reads incurred due to loading the complete previous row during merge step.
35+
*
36+
* <p>The LazyRow has an internal state and should not be used concurrently. To reuse the LazyRow, create an instance
37+
* using no arg constructor and re-initialise using {@link LazyRow#init(IndexSegment, int)}
38+
*/
39+
public class LazyRow {
40+
private final Map<String, Object> _fieldToValueMap = new HashMap<>();
41+
private final Set<String> _nullValueFields = new HashSet<>();
42+
private IndexSegment _segment;
43+
private int _docId;
44+
45+
public LazyRow() {
46+
}
47+
48+
public void init(IndexSegment segment, int docId) {
49+
clear();
50+
_segment = segment;
51+
_docId = docId;
52+
}
53+
54+
/**
55+
* Computes a field's value in an indexed row.
56+
* @param fieldName
57+
* @return Returns value or null for persisted null values
58+
*/
59+
@Nullable
60+
public Object getValue(String fieldName) {
61+
62+
// if field's value was previously read as null, return null
63+
if (_nullValueFields.contains(fieldName)) {
64+
return null;
65+
}
66+
if (_segment == null) {
67+
throw new IllegalStateException("Index segment for Lazy row is uninitialized.");
68+
}
69+
70+
// compute the _fieldToValueMap or _nullValueFields based on the indexed value
71+
return _fieldToValueMap.computeIfAbsent(fieldName, col -> {
72+
Object value = null;
73+
try (PinotSegmentColumnReader columnReader = new PinotSegmentColumnReader(_segment, col)) {
74+
if (!columnReader.isNull(_docId)) {
75+
value = columnReader.getValue(_docId);
76+
} else {
77+
_nullValueFields.add(fieldName);
78+
}
79+
} catch (IOException e) {
80+
throw new RuntimeException(
81+
String.format("Caught exception while closing pinotSegmentColumnReader for fieldName: %s", fieldName), e);
82+
}
83+
return value;
84+
});
85+
}
86+
87+
public boolean isNullValue(String fieldName) {
88+
return _nullValueFields.contains(fieldName) || getValue(fieldName) == null;
89+
}
90+
91+
public void clear() {
92+
_fieldToValueMap.clear();
93+
_nullValueFields.clear();
94+
}
95+
96+
public Set<String> getColumnNames() {
97+
if (_segment == null) {
98+
throw new IllegalStateException("Index segment for Lazy row is uninitialized.");
99+
}
100+
return _segment.getColumnNames();
101+
}
102+
}

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.pinot.common.metrics.ServerMetrics;
3434
import org.apache.pinot.common.utils.LLCSegmentName;
3535
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
36+
import org.apache.pinot.segment.local.segment.readers.LazyRow;
3637
import org.apache.pinot.segment.local.utils.HashUtils;
3738
import org.apache.pinot.segment.spi.IndexSegment;
3839
import org.apache.pinot.segment.spi.MutableSegment;
@@ -51,6 +52,9 @@
5152
@ThreadSafe
5253
public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUpsertMetadataManager {
5354

55+
// Used to initialize a reference to previous row for merging in partial upsert
56+
private final LazyRow _reusePreviousRow = new LazyRow();
57+
5458
@VisibleForTesting
5559
final ConcurrentHashMap<Object, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
5660

@@ -302,7 +306,8 @@ protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) {
302306
ThreadSafeMutableRoaringBitmap currentQueryableDocIds = currentSegment.getQueryableDocIds();
303307
int currentDocId = recordLocation.getDocId();
304308
if (currentQueryableDocIds == null || currentQueryableDocIds.contains(currentDocId)) {
305-
_partialUpsertHandler.merge(currentSegment, currentDocId, record);
309+
_reusePreviousRow.init(currentSegment, currentDocId);
310+
_partialUpsertHandler.merge(_reusePreviousRow, record);
306311
}
307312
}
308313
return recordLocation;

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java

Lines changed: 21 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,13 @@
1818
*/
1919
package org.apache.pinot.segment.local.upsert;
2020

21-
import java.io.IOException;
2221
import java.util.HashMap;
2322
import java.util.List;
2423
import java.util.Map;
25-
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
24+
import org.apache.pinot.segment.local.segment.readers.LazyRow;
2625
import org.apache.pinot.segment.local.upsert.merger.OverwriteMerger;
2726
import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
2827
import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
29-
import org.apache.pinot.segment.spi.IndexSegment;
3028
import org.apache.pinot.spi.config.table.UpsertConfig;
3129
import org.apache.pinot.spi.data.Schema;
3230
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -64,53 +62,42 @@ public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> pa
6462
* For example, overwrite merger will only override the prev value if the new value is not null.
6563
* Null values will override existing values if not configured. They can be ignored by using ignoreMerger.
6664
*
67-
* @param indexSegment the segment of the last derived full record during ingestion.
68-
* @param docId the docId of the last derived full record during ingestion in the segment.
65+
* @param prevRecord wrapper for previous record, which lazily reads column values of previous row and caches for
66+
* re-reads.
6967
* @param newRecord the new consumed record.
7068
*/
71-
public void merge(IndexSegment indexSegment, int docId, GenericRow newRecord) {
72-
for (String column : indexSegment.getColumnNames()) {
69+
public void merge(LazyRow prevRecord, GenericRow newRecord) {
70+
for (String column : prevRecord.getColumnNames()) {
7371
if (!_primaryKeyColumns.contains(column)) {
7472
PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger);
7573
// Non-overwrite mergers
7674
// (1) If the value of the previous is null value, skip merging and use the new value
7775
// (2) Else If the value of new value is null, use the previous value (even for comparison columns).
7876
// (3) Else If the column is not a comparison column, we applied the merged value to it.
7977
if (!(merger instanceof OverwriteMerger)) {
80-
try (PinotSegmentColumnReader pinotSegmentColumnReader = new PinotSegmentColumnReader(indexSegment, column)) {
81-
if (!pinotSegmentColumnReader.isNull(docId)) {
82-
Object previousValue = pinotSegmentColumnReader.getValue(docId);
83-
if (newRecord.isNullValue(column)) {
84-
// Note that we intentionally want to overwrite any previous _comparisonColumn value in the case of
85-
// using
86-
// multiple comparison columns. We never apply a merge function to it, rather we just take any/all
87-
// non-null comparison column values from the previous record, and the sole non-null comparison column
88-
// value from the new record.
89-
newRecord.putValue(column, previousValue);
90-
newRecord.removeNullValueField(column);
91-
} else if (!_comparisonColumns.contains(column)) {
92-
newRecord.putValue(column, merger.merge(previousValue, newRecord.getValue(column)));
93-
}
78+
Object prevValue = prevRecord.getValue(column);
79+
if (prevValue != null) {
80+
if (newRecord.isNullValue(column)) {
81+
// Note that we intentionally want to overwrite any previous _comparisonColumn value in the case of
82+
// using
83+
// multiple comparison columns. We never apply a merge function to it, rather we just take any/all
84+
// non-null comparison column values from the previous record, and the sole non-null comparison column
85+
// value from the new record.
86+
newRecord.putValue(column, prevValue);
87+
newRecord.removeNullValueField(column);
88+
} else if (!_comparisonColumns.contains(column)) {
89+
newRecord.putValue(column, merger.merge(prevValue, newRecord.getValue(column)));
9490
}
95-
} catch (IOException e) {
96-
throw new RuntimeException(
97-
String.format("Caught exception while closing pinotSegmentColumnReader for column: %s", column), e);
9891
}
9992
} else {
10093
// Overwrite mergers.
10194
// (1) If the merge strategy is Overwrite merger and newValue is not null, skip and use the new value
10295
// (2) Otherwise, if the previous value is not null, use the previous value.
10396
if (newRecord.isNullValue(column)) {
104-
try (PinotSegmentColumnReader pinotSegmentColumnReader = new PinotSegmentColumnReader(indexSegment,
105-
column)) {
106-
if (!pinotSegmentColumnReader.isNull(docId)) {
107-
Object previousValue = pinotSegmentColumnReader.getValue(docId);
108-
newRecord.putValue(column, previousValue);
109-
newRecord.removeNullValueField(column);
110-
}
111-
} catch (IOException e) {
112-
throw new RuntimeException(
113-
String.format("Caught exception while closing pinotSegmentColumnReader for column: %s", column), e);
97+
Object prevValue = prevRecord.getValue(column);
98+
if (prevValue != null) {
99+
newRecord.putValue(column, prevValue);
100+
newRecord.removeNullValueField(column);
114101
}
115102
}
116103
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.segment.local.segment.readers;
20+
21+
import java.util.Arrays;
22+
import java.util.HashSet;
23+
import org.apache.pinot.segment.spi.IndexSegment;
24+
import org.apache.pinot.segment.spi.datasource.DataSource;
25+
import org.apache.pinot.segment.spi.index.reader.Dictionary;
26+
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
27+
import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
28+
import org.testng.annotations.Test;
29+
30+
import static org.mockito.ArgumentMatchers.any;
31+
import static org.mockito.ArgumentMatchers.eq;
32+
import static org.mockito.Mockito.*;
33+
import static org.testng.Assert.assertEquals;
34+
import static org.testng.Assert.assertFalse;
35+
import static org.testng.Assert.assertNull;
36+
import static org.testng.Assert.assertTrue;
37+
38+
39+
public class LazyRowTest {
40+
41+
private DataSource _col1Datasource;
42+
private Dictionary _col2Dictionary;
43+
44+
@Test
45+
public void testIsNullField() {
46+
IndexSegment segment = getMockSegment();
47+
LazyRow lazyRow = spy(new LazyRow());
48+
lazyRow.init(segment, 1);
49+
50+
// first invocation will read from disk
51+
assertTrue(lazyRow.isNullValue("col1"));
52+
53+
assertTrue(lazyRow.isNullValue("col1"));
54+
// only one disk read.
55+
verify(lazyRow, times(1)).getValue("col1");
56+
57+
// should return false when value exists for a field in an indexed row
58+
assertFalse(lazyRow.isNullValue("col2"));
59+
}
60+
61+
@Test
62+
public void testGetValue() {
63+
IndexSegment segment = getMockSegment();
64+
LazyRow lazyRow = spy(new LazyRow());
65+
lazyRow.init(segment, 1);
66+
67+
// should return persisted value
68+
assertEquals(lazyRow.getValue("col2"), "val2");
69+
70+
// second invocation should read from LazyRow._nullValueFields
71+
assertEquals(lazyRow.getValue("col2"), "val2");
72+
// only one disk read
73+
verify(_col2Dictionary, times(1)).get(1);
74+
75+
assertNull(lazyRow.getValue("col1"));
76+
}
77+
78+
@Test
79+
public void testGetColumnNames() {
80+
IndexSegment segment = getMockSegment();
81+
LazyRow lazyRow = new LazyRow();
82+
lazyRow.init(segment, 1);
83+
HashSet<String> columnNames = new HashSet<>(Arrays.asList("col1", "col2"));
84+
when(segment.getColumnNames()).thenReturn(columnNames);
85+
86+
assertEquals(lazyRow.getColumnNames(), columnNames);
87+
}
88+
89+
private IndexSegment getMockSegment() {
90+
IndexSegment segment = mock(IndexSegment.class);
91+
_col1Datasource = mock(DataSource.class);
92+
DataSource col2Datasource = mock(DataSource.class);
93+
when(segment.getDataSource("col1")).thenReturn(_col1Datasource);
94+
when(segment.getDataSource("col2")).thenReturn(col2Datasource);
95+
96+
NullValueVectorReader col1NullVectorReader = mock(NullValueVectorReader.class);
97+
when(col1NullVectorReader.isNull(1)).thenReturn(true);
98+
NullValueVectorReader col2NullVectorReader = mock(NullValueVectorReader.class);
99+
when(col2NullVectorReader.isNull(1)).thenReturn(false);
100+
when(_col1Datasource.getNullValueVector()).thenReturn(col1NullVectorReader);
101+
when(col2Datasource.getNullValueVector()).thenReturn(col2NullVectorReader);
102+
103+
ForwardIndexReader col1ForwardIndexReader = mock(ForwardIndexReader.class);
104+
when(col1ForwardIndexReader.isSingleValue()).thenReturn(true);
105+
ForwardIndexReader col2ForwardIndexReader = mock(ForwardIndexReader.class);
106+
when(col2ForwardIndexReader.isSingleValue()).thenReturn(true);
107+
when(_col1Datasource.getForwardIndex()).thenReturn(col1ForwardIndexReader);
108+
when(col2Datasource.getForwardIndex()).thenReturn(col2ForwardIndexReader);
109+
when(col2ForwardIndexReader.getDictId(eq(1), any())).thenReturn(1);
110+
111+
Dictionary col1Dictionary = mock(Dictionary.class);
112+
when(_col1Datasource.getDictionary()).thenReturn(col1Dictionary);
113+
_col2Dictionary = mock(Dictionary.class);
114+
when(col2Datasource.getDictionary()).thenReturn(_col2Dictionary);
115+
when(_col2Dictionary.get(1)).thenReturn("val2");
116+
117+
return segment;
118+
}
119+
}

pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.HashMap;
2424
import java.util.Map;
2525
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
26+
import org.apache.pinot.segment.local.segment.readers.LazyRow;
2627
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
2728
import org.apache.pinot.spi.config.table.UpsertConfig;
2829
import org.apache.pinot.spi.data.FieldSpec;
@@ -86,14 +87,16 @@ public void testMerge(boolean isPreviousNull, Object previousValue, boolean isNe
8687

8788
ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
8889
when(segment.getColumnNames()).thenReturn(Sets.newSet("field1", "field2", "hoursSinceEpoch"));
90+
LazyRow prevRecord = new LazyRow();
91+
prevRecord.init(segment, 1);
8992

9093
GenericRow row = new GenericRow();
9194
if (isNewNull) {
9295
row.putDefaultNullValue(columnName, newValue);
9396
} else {
9497
row.putValue(columnName, newValue);
9598
}
96-
handler.merge(segment, 1, row);
99+
handler.merge(prevRecord, row);
97100
assertEquals(row.getValue(columnName), expectedValue);
98101
assertEquals(row.isNullValue(columnName), isExpectedNull);
99102
}

0 commit comments

Comments
 (0)