[Feature] Deduplication #8708
Conversation
Codecov Report
@@ Coverage Diff @@
## master #8708 +/- ##
=============================================
- Coverage 69.81% 14.13% -55.69%
+ Complexity 4622 168 -4454
=============================================
Files 1735 1695 -40
Lines 91320 89448 -1872
Branches 13644 13440 -204
=============================================
- Hits 63759 12642 -51117
- Misses 23144 75868 +52724
+ Partials 4417 938 -3479
Force-pushed from f684376 to 3d42ad5
Jackie-Jiang left a comment
Good job extracting several common properties from upsert and dedup
Suggest modeling it as a util class (TableStateUtils) and have one static method public static boolean isAllSegmentsLoaded(HelixManager helixManager, String tableNameWithType). The _allSegmentsLoaded can still be tracked within the metadata manager. We don't want this util class to track the loaded flag, instead it should always re-calculate the state.
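For illustration, a rough sketch of what such a util could look like, assuming Helix's HelixDataAccessor/PropertyKey APIs and a comparison of the ideal state against this instance's current state (hypothetical code, not the PR's actual implementation):

import java.util.Map;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.LiveInstance;

public class TableStateUtils {
  private TableStateUtils() {
  }

  // Stateless: recomputes the answer on every call. Callers that want the
  // "check once, then always true" behavior cache the result themselves.
  public static boolean isAllSegmentsLoaded(HelixManager helixManager, String tableNameWithType) {
    HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
    IdealState idealState = dataAccessor.getProperty(keyBuilder.idealStates(tableNameWithType));
    if (idealState == null) {
      return false;
    }
    String instanceName = helixManager.getInstanceName();
    LiveInstance liveInstance = dataAccessor.getProperty(keyBuilder.liveInstance(instanceName));
    if (liveInstance == null) {
      return false;
    }
    CurrentState currentState = dataAccessor.getProperty(
        keyBuilder.currentState(instanceName, liveInstance.getEphemeralOwner(), tableNameWithType));
    if (currentState == null) {
      return false;
    }
    // Every segment assigned ONLINE to this instance in the ideal state must
    // also be ONLINE in the current state
    for (String segmentName : idealState.getPartitionSet()) {
      Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
      if (instanceStateMap != null && "ONLINE".equals(instanceStateMap.get(instanceName))
          && !"ONLINE".equals(currentState.getState(segmentName))) {
        return false;
      }
    }
    return true;
  }
}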
_allSegmentsLoaded will need to be present in both the upsert and dedup metadata classes separately. Here it's with just one instance of this class. Is that okay?
The reason I suggest modeling this as a util and not tracking _allSegmentsLoaded within the class is that we may reuse the util method for other features, and we don't want to couple this "check once, then always true" semantic into the util method/class.
Ack.
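To make the split concrete, a hypothetical sketch of the caller-side caching inside the metadata manager, with the util kept stateless (_helixManager and _tableNameWithType are assumed fields of the manager):

private volatile boolean _allSegmentsLoaded;

private boolean allSegmentsLoaded() {
  if (!_allSegmentsLoaded) {
    // Re-check via the stateless util; latch the flag once it turns true
    _allSegmentsLoaded = TableStateUtils.isAllSegmentsLoaded(_helixManager, _tableNameWithType);
  }
  return _allSegmentsLoaded;
}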
(minor) Let's rename it to TableStateUtils
Ack
IMO it is okay to increase the value since we are just tracking the row count fed into index(). We should use another metric to track the rows ignored because of dedup.
(minor) Since we already track the dropped records, we can remove this TODO and consider changing it to a comment
Ack. I think with the metric added, this is no longer needed
This flag is redundant; it is implied by the presence of partitionDedupMetadataManager.
Ack
This flag is redundant, and is implied by the presence of _tableDedupMetadataManager.
Ack
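As a sketch of removing the redundant flag (hypothetical; field name taken from the snippets in this thread):

private boolean isDedupEnabled() {
  // No separate boolean field: dedup is enabled iff the manager was created
  return _partitionDedupMetadataManager != null;
}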
Suggested change:
-       .format("PartitionGroupId is not available for segment: '%s' (upsert-enabled table: %s)", segmentName,
+       .format("PartitionGroupId is not available for segment: '%s' (dedup-enabled table: %s)", segmentName,
Ack
We should remove this method. The hash function can come from both upsert config and dedup config
Ack
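One hypothetical way to source the hash function from either config instead of a dedicated method, assuming both UpsertConfig and DedupConfig expose getHashFunction() (upsert and dedup are mutually exclusive per this PR's validations):

private static HashFunction getHashFunction(TableConfig tableConfig) {
  UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
  if (upsertConfig != null) {
    return upsertConfig.getHashFunction();
  }
  DedupConfig dedupConfig = tableConfig.getDedupConfig();
  return dedupConfig != null ? dedupConfig.getHashFunction() : HashFunction.NONE;
}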
(minor) We don't usually put final for local variables or parameters
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java (outdated; resolved)
Non-null config doesn't mean it is enabled
Suggested change:
-     if (tableConfig.getUpsertConfig() != null) {
+     if (tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE) {
Ack
Non-null dedup config doesn't mean it is enabled. We either remove the dedupEnabled field and treat a non-null dedup config as dedup-enabled, or check the flag.
The idea here is: if the config JSON doesn't have the dedupConfig field, there's no need to run the validation.
Understood. We should also skip the validation when DedupConfig is present but dedup is not enabled.
Ack
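A hypothetical sketch of the resulting guard (validateDedupConfig is an illustrative helper name, and isDedupEnabled() is assumed on DedupConfig per the flag discussed above):

DedupConfig dedupConfig = tableConfig.getDedupConfig();
// Skip validation when the config is absent, or present but disabled
if (dedupConfig != null && dedupConfig.isDedupEnabled()) {
  validateDedupConfig(tableConfig, schema);  // hypothetical helper
}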
Force-pushed from d41aa14 to f594261
Jackie-Jiang left a comment
LGTM with some non-blocking comments. Good job!
    private final HashFunction _hashFunction;
    private boolean _allSegmentsLoaded;

    // TODO(saurabh) : We can replace this with a ocncurrent Set
(minor) Remove this TODO
Ack
    // TODO(saurabh) : We can replace this with a ocncurrent Set
    @VisibleForTesting
    final ConcurrentHashMap<Object, IndexSegment> _primaryKeySet = new ConcurrentHashMap<>();
Suggested change:
-     final ConcurrentHashMap<Object, IndexSegment> _primaryKeySet = new ConcurrentHashMap<>();
+     final ConcurrentHashMap<Object, IndexSegment> _primaryKeyToSegmentMap = new ConcurrentHashMap<>();
Ack
    }

    public boolean checkRecordPresentOrUpdate(RecordInfo recordInfo, IndexSegment indexSegment) {
      if (!_allSegmentsLoaded) {
Let's move the if check into the waitTillAllSegmentsLoaded() for thread safety. It is single threaded now, but in case that changes
Could you help me understand the thread safety concerns with this? I don't see any, single threaded or multi threaded.
In fact, moving this if check inside waitTillAllSegmentsLoaded() would lead to unnecessary serialization even when all segments have already been loaded. Even in a single threaded env, that's a heavy lock acquisition cost when _allSegmentsLoaded is already true.
To that point, I think we should reduce the critical section here https://github.com/apache/pinot/blob/master/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java#L74: once _allSegmentsLoaded has been set to true, there is no need to enter a synchronized block.
Do let me know your thoughts.
I misused the word thread-safety. I was suggesting adding an extra if check in the synchronized block to avoid potential unnecessary checks when multiple threads invoke waitTillAllSegmentsLoaded().
Good point on reducing the critical section in PartialUpsertHandler. We should first check the flag, then enter the critical section
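A minimal double-checked sketch of what this could look like (hypothetical code; _helixManager and _tableNameWithType are assumed fields, and the one-second retry interval is illustrative):

private volatile boolean _allSegmentsLoaded;

private void waitTillAllSegmentsLoaded() {
  if (_allSegmentsLoaded) {
    return;  // fast path: no lock once the flag has been latched
  }
  synchronized (this) {
    if (_allSegmentsLoaded) {
      return;  // another thread finished waiting while we blocked on the lock
    }
    while (!TableStateUtils.isAllSegmentsLoaded(_helixManager, _tableNameWithType)) {
      try {
        Thread.sleep(1000L);  // hypothetical retry interval
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException("Interrupted while waiting for all segments to load", e);
      }
    }
    _allSegmentsLoaded = true;
  }
}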
    }

    if (isDedupEnabled() && _partitionDedupMetadataManager.checkRecordPresentOrUpdate(recordInfo, this)) {
      _logger.info("Dropped row {} since its primary key already exists", row);
Don't log anything here, it can flood the log
Ack
    if (_serverMetrics != null) {
      _serverMetrics.addMeteredTableValue(_realtimeTableName, ServerMeter.REALTIME_DEDUP_DROPPED, 1);
    }
    return numDocsIndexed < _capacity;
(minor)
Suggested change:
-     return numDocsIndexed < _capacity;
+     return true;
Ack
| "description" : "second", | ||
| "secondsSinceEpoch": 1567205392 | ||
| } | ||
| ] No newline at end of file |
(nit) empty line
Ack
    }
  },
  "primaryKeyColumns": ["event_id"]
}
(no newline at end of file)
(nit) empty line
Ack
    }

    @VisibleForTesting
    public static Iterator<RecordInfo> getRecordInfoIterator(IndexSegment segment, List<String> primaryKeyColumns) {
Suggest returning an iterator of PrimaryKey. For dedup, we don't need the docId and comparisonValue information from the RecordInfo. Similarly for checkRecordPresentOrUpdate(), which can just take the PrimaryKey object. This is not a blocker, so maybe put a TODO and address it later.
Ack. Didn't see any big impact of changing the method signature to accept PK, hence made that change too.
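A rough sketch of the PrimaryKey-based iterator discussed above (hypothetical; not quoted from the PR):

// Assumed types from the Pinot codebase:
//   org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader
//   org.apache.pinot.segment.spi.IndexSegment
//   org.apache.pinot.spi.data.readers.PrimaryKey
public static Iterator<PrimaryKey> getPrimaryKeyIterator(IndexSegment segment, List<String> primaryKeyColumns) {
  Map<String, PinotSegmentColumnReader> columnToReaderMap = new HashMap<>();
  for (String primaryKeyColumn : primaryKeyColumns) {
    columnToReaderMap.put(primaryKeyColumn, new PinotSegmentColumnReader(segment, primaryKeyColumn));
  }
  int numTotalDocs = segment.getSegmentMetadata().getTotalDocs();
  int numPrimaryKeyColumns = primaryKeyColumns.size();
  return new Iterator<PrimaryKey>() {
    private int _docId = 0;

    @Override
    public boolean hasNext() {
      return _docId < numTotalDocs;
    }

    @Override
    public PrimaryKey next() {
      // Read the primary key column values for the current docId
      Object[] values = new Object[numPrimaryKeyColumns];
      for (int i = 0; i < numPrimaryKeyColumns; i++) {
        values[i] = columnToReaderMap.get(primaryKeyColumns.get(i)).getValue(_docId);
      }
      _docId++;
      return new PrimaryKey(values);
    }
  };
}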
Force-pushed from 693659f to b56bfe2
@saurabhd336 Please add documentation for this in …
This PR adds support for enabling deduplication for a realtime table via a top-level table config. At a high level, primary key (as defined in the table schema) hashes are stored in in-memory data structures, and each incoming row is validated against them. Duplicate rows are dropped. The expectation while using this feature is for the stream to be partitioned by the primary key, strictReplicaGroup routing to be enabled, and the configured stream consumer type to be lowLevel. These requirements are therefore mandated via the tableConfig API's input validations.

Design doc: https://docs.google.com/document/d/17sOSRQ1slff30z7jDc0ec5qKwv0xSfPkDjpMOY07POQ/edit?usp=sharing
How to use
https://docs.pinot.apache.org/basics/data-import/dedup
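For reference, an illustrative realtime table config fragment enabling dedup; field names follow the linked docs and should be verified there, and the usual segmentsConfig/streamConfigs sections (including the lowLevel consumer type) are omitted for brevity:

{
  "tableName": "myTable",
  "tableType": "REALTIME",
  "routing": {
    "instanceSelectorType": "strictReplicaGroup"
  },
  "dedupConfig": {
    "dedupEnabled": true,
    "hashFunction": "NONE"
  }
}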