Skip to content

Commit 38f4b55

Browse files
Saurabh Dubey
authored and committed
Review comments
1 parent 9f360b4 commit 38f4b55

File tree

10 files changed

+95
-94
lines changed

10 files changed

+95
-94
lines changed

pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
4040
REALTIME_OFFSET_COMMITS("commits", true),
4141
REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false),
4242
REALTIME_PARTITION_MISMATCH("mismatch", false),
43-
REALTIME_DEDUP_DROPPED("dedup", false),
43+
REALTIME_DEDUP_DROPPED("rows", false),
4444
ROWS_WITH_ERRORS("rows", false),
4545
LLC_CONTROLLER_RESPONSE_NOT_SENT("messages", true),
4646
LLC_CONTROLLER_RESPONSE_COMMIT("messages", true),

pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,6 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS
539539
}
540540
for (GenericRow transformedRow : reusedResult.getTransformedRows()) {
541541
try {
542-
// TODO(saurabh): we may have dropped the record due to dedup. Should we increment indexedMessageCount?
543542
canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
544543
indexedMessageCount++;
545544
realtimeRowsConsumedMeter =

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@
2929
import org.apache.pinot.common.metrics.ServerMetrics;
3030
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
3131
import org.apache.pinot.segment.local.utils.HashUtils;
32-
import org.apache.pinot.segment.local.utils.RecordInfo;
33-
import org.apache.pinot.segment.local.utils.tablestate.TableState;
32+
import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
3433
import org.apache.pinot.segment.spi.IndexSegment;
3534
import org.apache.pinot.spi.config.table.HashFunction;
3635
import org.apache.pinot.spi.data.readers.PrimaryKey;
@@ -41,18 +40,17 @@
4140

4241
public class PartitionDedupMetadataManager {
4342
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionDedupMetadataManager.class);
43+
private static boolean _allSegmentsLoaded;
4444

4545
private final HelixManager _helixManager;
4646
private final String _tableNameWithType;
4747
private final List<String> _primaryKeyColumns;
4848
private final int _partitionId;
4949
private final ServerMetrics _serverMetrics;
5050
private final HashFunction _hashFunction;
51-
private boolean _allSegmentsLoaded;
5251

53-
// TODO(saurabh) : We can replace this with a ocncurrent Set
5452
@VisibleForTesting
55-
final ConcurrentHashMap<Object, IndexSegment> _primaryKeySet = new ConcurrentHashMap<>();
53+
final ConcurrentHashMap<Object, IndexSegment> _primaryKeyToSegmentMap = new ConcurrentHashMap<>();
5654

5755
public PartitionDedupMetadataManager(HelixManager helixManager, String tableNameWithType,
5856
List<String> primaryKeyColumns, int partitionId, ServerMetrics serverMetrics, HashFunction hashFunction) {
@@ -65,22 +63,22 @@ public PartitionDedupMetadataManager(HelixManager helixManager, String tableName
6563
}
6664

6765
public void addSegment(IndexSegment segment) {
68-
// Add all PKs to _primaryKeySet
69-
Iterator<RecordInfo> recordInfoIterator = getRecordInfoIterator(segment, _primaryKeyColumns);
70-
while (recordInfoIterator.hasNext()) {
71-
RecordInfo recordInfo = recordInfoIterator.next();
72-
_primaryKeySet.put(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), segment);
66+
// Add all PKs to _primaryKeyToSegmentMap
67+
Iterator<PrimaryKey> primaryKeyIterator = getPrimaryKeyIterator(segment, _primaryKeyColumns);
68+
while (primaryKeyIterator.hasNext()) {
69+
PrimaryKey pk = primaryKeyIterator.next();
70+
_primaryKeyToSegmentMap.put(HashUtils.hashPrimaryKey(pk, _hashFunction), segment);
7371
}
7472
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
75-
_primaryKeySet.size());
73+
_primaryKeyToSegmentMap.size());
7674
}
7775

7876
public void removeSegment(IndexSegment segment) {
7977
// TODO(saurabh): Explain reload scenario here
80-
Iterator<RecordInfo> recordInfoIterator = getRecordInfoIterator(segment, _primaryKeyColumns);
81-
while (recordInfoIterator.hasNext()) {
82-
RecordInfo recordInfo = recordInfoIterator.next();
83-
_primaryKeySet.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
78+
Iterator<PrimaryKey> primaryKeyIterator = getPrimaryKeyIterator(segment, _primaryKeyColumns);
79+
while (primaryKeyIterator.hasNext()) {
80+
PrimaryKey pk = primaryKeyIterator.next();
81+
_primaryKeyToSegmentMap.compute(HashUtils.hashPrimaryKey(pk, _hashFunction),
8482
(primaryKey, currentSegment) -> {
8583
if (currentSegment == segment) {
8684
return null;
@@ -90,18 +88,18 @@ public void removeSegment(IndexSegment segment) {
9088
});
9189
}
9290
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
93-
_primaryKeySet.size());
91+
_primaryKeyToSegmentMap.size());
9492
}
9593

9694
@VisibleForTesting
97-
public static Iterator<RecordInfo> getRecordInfoIterator(IndexSegment segment, List<String> primaryKeyColumns) {
95+
public static Iterator<PrimaryKey> getPrimaryKeyIterator(IndexSegment segment, List<String> primaryKeyColumns) {
9896
Map<String, PinotSegmentColumnReader> columnToReaderMap = new HashMap<>();
9997
for (String primaryKeyColumn : primaryKeyColumns) {
10098
columnToReaderMap.put(primaryKeyColumn, new PinotSegmentColumnReader(segment, primaryKeyColumn));
10199
}
102100
int numTotalDocs = segment.getSegmentMetadata().getTotalDocs();
103101
int numPrimaryKeyColumns = primaryKeyColumns.size();
104-
return new Iterator<RecordInfo>() {
102+
return new Iterator<PrimaryKey>() {
105103
private int _docId = 0;
106104

107105
@Override
@@ -110,7 +108,7 @@ public boolean hasNext() {
110108
}
111109

112110
@Override
113-
public RecordInfo next() {
111+
public PrimaryKey next() {
114112
Object[] values = new Object[numPrimaryKeyColumns];
115113
for (int i = 0; i < numPrimaryKeyColumns; i++) {
116114
Object value = columnToReaderMap.get(primaryKeyColumns.get(i)).getValue(_docId);
@@ -119,14 +117,18 @@ public RecordInfo next() {
119117
}
120118
values[i] = value;
121119
}
122-
PrimaryKey primaryKey = new PrimaryKey(values);
123-
return new RecordInfo(primaryKey, _docId++, null);
120+
_docId++;
121+
return new PrimaryKey(values);
124122
}
125123
};
126124
}
127125

128126
private synchronized void waitTillAllSegmentsLoaded() {
129-
while (!TableState.isAllSegmentsLoaded(_helixManager, _tableNameWithType)) {
127+
if (_allSegmentsLoaded) {
128+
return;
129+
}
130+
131+
while (!TableStateUtils.isAllSegmentsLoaded(_helixManager, _tableNameWithType)) {
130132
LOGGER.info("Sleeping 1 second waiting for all segments loaded for partial-upsert table: {}", _tableNameWithType);
131133
try {
132134
//noinspection BusyWait
@@ -138,16 +140,16 @@ private synchronized void waitTillAllSegmentsLoaded() {
138140
_allSegmentsLoaded = true;
139141
}
140142

141-
public boolean checkRecordPresentOrUpdate(RecordInfo recordInfo, IndexSegment indexSegment) {
143+
public boolean checkRecordPresentOrUpdate(PrimaryKey pk, IndexSegment indexSegment) {
142144
if (!_allSegmentsLoaded) {
143145
waitTillAllSegmentsLoaded();
144146
}
145147

146148
boolean result =
147-
(_primaryKeySet.putIfAbsent(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), indexSegment)
149+
(_primaryKeyToSegmentMap.putIfAbsent(HashUtils.hashPrimaryKey(pk, _hashFunction), indexSegment)
148150
!= null);
149151
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
150-
_primaryKeySet.size());
152+
_primaryKeyToSegmentMap.size());
151153

152154
return result;
153155
}

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -489,12 +489,12 @@ public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata)
489489
recordInfo = getRecordInfo(row, numDocsIndexed);
490490
}
491491

492-
if (isDedupEnabled() && _partitionDedupMetadataManager.checkRecordPresentOrUpdate(recordInfo, this)) {
493-
_logger.info("Dropped row {} since its primary key already exists", row);
492+
if (isDedupEnabled() && _partitionDedupMetadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(),
493+
this)) {
494494
if (_serverMetrics != null) {
495495
_serverMetrics.addMeteredTableValue(_realtimeTableName, ServerMeter.REALTIME_DEDUP_DROPPED, 1);
496496
}
497-
return numDocsIndexed < _capacity;
497+
return true;
498498
}
499499

500500
if (isUpsertEnabled()) {

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.apache.helix.HelixManager;
2424
import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
2525
import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
26-
import org.apache.pinot.segment.local.utils.tablestate.TableState;
26+
import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
2727
import org.apache.pinot.spi.config.table.UpsertConfig;
2828
import org.apache.pinot.spi.data.Schema;
2929
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -64,7 +64,7 @@ public synchronized boolean isAllSegmentsLoaded() {
6464
if (_allSegmentsLoaded) {
6565
return true;
6666
}
67-
_allSegmentsLoaded = TableState.isAllSegmentsLoaded(_helixManager, _tableNameWithType);
67+
_allSegmentsLoaded = TableStateUtils.isAllSegmentsLoaded(_helixManager, _tableNameWithType);
6868
return _allSegmentsLoaded;
6969
}
7070

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableState.java renamed to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@
2929
import org.slf4j.Logger;
3030
import org.slf4j.LoggerFactory;
3131

32-
public class TableState {
33-
private static final Logger LOGGER = LoggerFactory.getLogger(TableState.class);
32+
public class TableStateUtils {
33+
private static final Logger LOGGER = LoggerFactory.getLogger(TableStateUtils.class);
3434

35-
private TableState() {
35+
private TableStateUtils() {
3636
}
3737

3838
public static boolean isAllSegmentsLoaded(HelixManager helixManager, String tableNameWithType) {

pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java

Lines changed: 54 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
2828
import org.apache.pinot.segment.local.utils.HashUtils;
2929
import org.apache.pinot.segment.local.utils.RecordInfo;
30-
import org.apache.pinot.segment.local.utils.tablestate.TableState;
30+
import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
3131
import org.apache.pinot.segment.spi.IndexSegment;
3232
import org.apache.pinot.spi.config.table.HashFunction;
3333
import org.apache.pinot.spi.data.readers.PrimaryKey;
@@ -51,8 +51,8 @@ public class PartitionDedupMetadataManagerTest {
5151

5252
@BeforeClass
5353
public void init() {
54-
MockedStatic mocked = mockStatic(TableState.class);
55-
mocked.when(() -> TableState.isAllSegmentsLoaded(any(), any())).thenReturn(true);
54+
MockedStatic mocked = mockStatic(TableStateUtils.class);
55+
mocked.when(() -> TableStateUtils.isAllSegmentsLoaded(any(), any())).thenReturn(true);
5656
}
5757

5858
@Test
@@ -61,38 +61,38 @@ public void verifyAddRemoveSegment() {
6161
PartitionDedupMetadataManager partitionDedupMetadataManager =
6262
new PartitionDedupMetadataManager(mock(HelixManager.class), REALTIME_TABLE_NAME, null, 0,
6363
mock(ServerMetrics.class), hashFunction);
64-
Map<Object, IndexSegment> recordLocationMap = partitionDedupMetadataManager._primaryKeySet;
64+
Map<Object, IndexSegment> recordLocationMap = partitionDedupMetadataManager._primaryKeyToSegmentMap;
6565

6666
// Add the first segment
67-
List<RecordInfo> recordInfoList1 = new ArrayList<>();
68-
recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, null));
69-
recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, null));
70-
recordInfoList1.add(new RecordInfo(getPrimaryKey(2), 2, null));
71-
recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 3, null));
72-
recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 4, null));
73-
recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 5, null));
67+
List<PrimaryKey> pkList1 = new ArrayList<>();
68+
pkList1.add(getPrimaryKey(0));
69+
pkList1.add(getPrimaryKey(1));
70+
pkList1.add(getPrimaryKey(2));
71+
pkList1.add(getPrimaryKey(0));
72+
pkList1.add(getPrimaryKey(1));
73+
pkList1.add(getPrimaryKey(0));
7474
ImmutableSegmentImpl segment1 = mockSegment(1);
7575
MockedStatic mocked = mockStatic(PartitionDedupMetadataManager.class);
76-
mocked.when(() -> PartitionDedupMetadataManager.getRecordInfoIterator(any(), any()))
77-
.thenReturn(recordInfoList1.iterator());
76+
mocked.when(() -> PartitionDedupMetadataManager.getPrimaryKeyIterator(any(), any()))
77+
.thenReturn(pkList1.iterator());
7878

7979
partitionDedupMetadataManager.addSegment(segment1);
8080
checkRecordLocation(recordLocationMap, 0, segment1, hashFunction);
8181
checkRecordLocation(recordLocationMap, 1, segment1, hashFunction);
8282
checkRecordLocation(recordLocationMap, 2, segment1, hashFunction);
8383

84-
recordInfoList1 = new ArrayList<>();
85-
recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, null));
86-
recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, null));
87-
recordInfoList1.add(new RecordInfo(getPrimaryKey(2), 2, null));
88-
recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 3, null));
89-
recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 4, null));
90-
recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 5, null));
84+
pkList1 = new ArrayList<>();
85+
pkList1.add(getPrimaryKey(0));
86+
pkList1.add(getPrimaryKey(1));
87+
pkList1.add(getPrimaryKey(2));
88+
pkList1.add(getPrimaryKey(0));
89+
pkList1.add(getPrimaryKey(1));
90+
pkList1.add(getPrimaryKey(0));
9191

9292
mocked.close();
9393
mocked = mockStatic(PartitionDedupMetadataManager.class);
94-
mocked.when(() -> PartitionDedupMetadataManager.getRecordInfoIterator(any(), any()))
95-
.thenReturn(recordInfoList1.iterator());
94+
mocked.when(() -> PartitionDedupMetadataManager.getPrimaryKeyIterator(any(), any()))
95+
.thenReturn(pkList1.iterator());
9696

9797
partitionDedupMetadataManager.removeSegment(segment1);
9898
Assert.assertEquals(recordLocationMap.size(), 0);
@@ -105,37 +105,37 @@ public void verifyReloadSegment() {
105105
PartitionDedupMetadataManager partitionDedupMetadataManager =
106106
new PartitionDedupMetadataManager(mock(HelixManager.class), REALTIME_TABLE_NAME, null, 0,
107107
mock(ServerMetrics.class), hashFunction);
108-
Map<Object, IndexSegment> recordLocationMap = partitionDedupMetadataManager._primaryKeySet;
108+
Map<Object, IndexSegment> recordLocationMap = partitionDedupMetadataManager._primaryKeyToSegmentMap;
109109

110110
// Add the first segment
111-
List<RecordInfo> recordInfoList1 = new ArrayList<>();
112-
recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, null));
113-
recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, null));
114-
recordInfoList1.add(new RecordInfo(getPrimaryKey(2), 2, null));
115-
recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 3, null));
116-
recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 4, null));
117-
recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 5, null));
111+
List<PrimaryKey> pkList1 = new ArrayList<>();
112+
pkList1.add(getPrimaryKey(0));
113+
pkList1.add(getPrimaryKey(1));
114+
pkList1.add(getPrimaryKey(2));
115+
pkList1.add(getPrimaryKey(0));
116+
pkList1.add(getPrimaryKey(1));
117+
pkList1.add(getPrimaryKey(0));
118118
ImmutableSegmentImpl segment1 = mockSegment(1);
119119
MockedStatic mocked = mockStatic(PartitionDedupMetadataManager.class);
120-
mocked.when(() -> PartitionDedupMetadataManager.getRecordInfoIterator(any(), any()))
121-
.thenReturn(recordInfoList1.iterator());
120+
mocked.when(() -> PartitionDedupMetadataManager.getPrimaryKeyIterator(any(), any()))
121+
.thenReturn(pkList1.iterator());
122122

123123
partitionDedupMetadataManager.addSegment(segment1);
124124

125125
// Remove another segment with same PK rows
126-
recordInfoList1 = new ArrayList<>();
127-
recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, null));
128-
recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, null));
129-
recordInfoList1.add(new RecordInfo(getPrimaryKey(2), 2, null));
130-
recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 3, null));
131-
recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 4, null));
132-
recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 5, null));
126+
pkList1 = new ArrayList<>();
127+
pkList1.add(getPrimaryKey(0));
128+
pkList1.add(getPrimaryKey(1));
129+
pkList1.add(getPrimaryKey(2));
130+
pkList1.add(getPrimaryKey(0));
131+
pkList1.add(getPrimaryKey(1));
132+
pkList1.add(getPrimaryKey(0));
133133
ImmutableSegmentImpl segment2 = mockSegment(1);
134134

135135
mocked.close();
136136
mocked = mockStatic(PartitionDedupMetadataManager.class);
137-
mocked.when(() -> PartitionDedupMetadataManager.getRecordInfoIterator(any(), any()))
138-
.thenReturn(recordInfoList1.iterator());
137+
mocked.when(() -> PartitionDedupMetadataManager.getPrimaryKeyIterator(any(), any()))
138+
.thenReturn(pkList1.iterator());
139139

140140
partitionDedupMetadataManager.removeSegment(segment2);
141141
Assert.assertEquals(recordLocationMap.size(), 3);
@@ -153,38 +153,38 @@ public void verifyAddRow() {
153153
PartitionDedupMetadataManager partitionDedupMetadataManager =
154154
new PartitionDedupMetadataManager(mock(HelixManager.class), REALTIME_TABLE_NAME, null, 0,
155155
mock(ServerMetrics.class), hashFunction);
156-
Map<Object, IndexSegment> recordLocationMap = partitionDedupMetadataManager._primaryKeySet;
156+
Map<Object, IndexSegment> recordLocationMap = partitionDedupMetadataManager._primaryKeyToSegmentMap;
157157

158158
// Add the first segment
159-
List<RecordInfo> recordInfoList1 = new ArrayList<>();
160-
recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, null));
161-
recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, null));
162-
recordInfoList1.add(new RecordInfo(getPrimaryKey(2), 2, null));
163-
recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 3, null));
164-
recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 4, null));
165-
recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 5, null));
159+
List<PrimaryKey> pkList1 = new ArrayList<>();
160+
pkList1.add(getPrimaryKey(0));
161+
pkList1.add(getPrimaryKey(1));
162+
pkList1.add(getPrimaryKey(2));
163+
pkList1.add(getPrimaryKey(0));
164+
pkList1.add(getPrimaryKey(1));
165+
pkList1.add(getPrimaryKey(0));
166166
ImmutableSegmentImpl segment1 = mockSegment(1);
167167
MockedStatic mocked = mockStatic(PartitionDedupMetadataManager.class);
168-
mocked.when(() -> PartitionDedupMetadataManager.getRecordInfoIterator(any(), any()))
169-
.thenReturn(recordInfoList1.iterator());
168+
mocked.when(() -> PartitionDedupMetadataManager.getPrimaryKeyIterator(any(), any()))
169+
.thenReturn(pkList1.iterator());
170170
partitionDedupMetadataManager.addSegment(segment1);
171171
mocked.close();
172172

173173
// Same PK exists
174174
RecordInfo recordInfo = mock(RecordInfo.class);
175175
when(recordInfo.getPrimaryKey()).thenReturn(getPrimaryKey(0));
176176
ImmutableSegmentImpl segment2 = mockSegment(2);
177-
Assert.assertTrue(partitionDedupMetadataManager.checkRecordPresentOrUpdate(recordInfo, segment2));
177+
Assert.assertTrue(partitionDedupMetadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
178178
checkRecordLocation(recordLocationMap, 0, segment1, hashFunction);
179179

180180
// New PK
181181
when(recordInfo.getPrimaryKey()).thenReturn(getPrimaryKey(3));
182-
Assert.assertFalse(partitionDedupMetadataManager.checkRecordPresentOrUpdate(recordInfo, segment2));
182+
Assert.assertFalse(partitionDedupMetadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
183183
checkRecordLocation(recordLocationMap, 3, segment2, hashFunction);
184184

185185
// Same PK as the one recently ingested
186186
when(recordInfo.getPrimaryKey()).thenReturn(getPrimaryKey(3));
187-
Assert.assertTrue(partitionDedupMetadataManager.checkRecordPresentOrUpdate(recordInfo, segment2));
187+
Assert.assertTrue(partitionDedupMetadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
188188
}
189189

190190
private static ImmutableSegmentImpl mockSegment(int sequenceNumber) {

0 commit comments

Comments (0)