-
Notifications
You must be signed in to change notification settings - Fork 3k
Add partition summaries in SnapshotSummary builder #1367
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,9 +21,10 @@ | |
|
|
||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Joiner; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Joiner.MapJoiner; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Sets; | ||
|
|
||
| public class SnapshotSummary { | ||
| public static final String ADDED_FILES_PROP = "added-data-files"; | ||
|
|
@@ -35,6 +36,8 @@ public class SnapshotSummary { | |
| public static final String ADDED_RECORDS_PROP = "added-records"; | ||
| public static final String DELETED_RECORDS_PROP = "deleted-records"; | ||
| public static final String TOTAL_RECORDS_PROP = "total-records"; | ||
| public static final String ADDED_FILE_SIZE_PROP = "added-files-size"; | ||
| public static final String REMOVED_FILE_SIZE_PROP = "removed-files-size"; | ||
| public static final String ADDED_POS_DELETES_PROP = "added-position-deletes"; | ||
| public static final String REMOVED_POS_DELETES_PROP = "removed-position-deletes"; | ||
| public static final String TOTAL_POS_DELETES_PROP = "total-position-deletes"; | ||
|
|
@@ -43,12 +46,16 @@ public class SnapshotSummary { | |
| public static final String TOTAL_EQ_DELETES_PROP = "total-equality-deletes"; | ||
| public static final String DELETED_DUPLICATE_FILES = "deleted-duplicate-files"; | ||
| public static final String CHANGED_PARTITION_COUNT_PROP = "changed-partition-count"; | ||
| public static final String CHANGED_PARTITION_PREFIX = "partitions."; | ||
| public static final String PARTITION_SUMMARY_PROP = "partition-summaries-included"; | ||
| public static final String STAGED_WAP_ID_PROP = "wap.id"; | ||
| public static final String PUBLISHED_WAP_ID_PROP = "published-wap-id"; | ||
| public static final String SOURCE_SNAPSHOT_ID_PROP = "source-snapshot-id"; | ||
| public static final String REPLACE_PARTITIONS_PROP = "replace-partitions"; | ||
| public static final String EXTRA_METADATA_PREFIX = "snapshot-property."; | ||
|
|
||
| public static final MapJoiner MAP_JOINER = Joiner.on(",").withKeyValueSeparator("="); | ||
|
|
||
| private SnapshotSummary() { | ||
| } | ||
|
|
||
|
|
@@ -58,27 +65,31 @@ public static Builder builder() { | |
|
|
||
| public static class Builder { | ||
| // commit summary tracking | ||
| private Set<String> changedPartitions = Sets.newHashSet(); | ||
| private long addedFiles = 0L; | ||
| private long deletedFiles = 0L; | ||
| private long addedDeleteFiles = 0L; | ||
| private long removedDeleteFiles = 0L; | ||
| private final Map<String, String> properties = Maps.newHashMap(); | ||
| private final Map<String, UpdateMetrics> partitionMetrics = Maps.newHashMap(); | ||
| private final UpdateMetrics metrics = new UpdateMetrics(); | ||
| private int maxChangedPartitionsForSummaries = 0; | ||
| private long deletedDuplicateFiles = 0L; | ||
| private long addedRecords = 0L; | ||
| private long deletedRecords = 0L; | ||
| private long addedPosDeletes = 0L; | ||
| private long removedPosDeletes = 0L; | ||
| private long addedEqDeletes = 0L; | ||
| private long removedEqDeletes = 0L; | ||
| private Map<String, String> properties = Maps.newHashMap(); | ||
| private boolean trustPartitionMetrics = true; | ||
|
|
||
| public void clear() { | ||
| changedPartitions.clear(); | ||
| this.addedFiles = 0L; | ||
| this.deletedFiles = 0L; | ||
| partitionMetrics.clear(); | ||
| metrics.clear(); | ||
| this.deletedDuplicateFiles = 0L; | ||
| this.addedRecords = 0L; | ||
| this.deletedRecords = 0L; | ||
| this.trustPartitionMetrics = true; | ||
| } | ||
|
|
||
| /** | ||
| * Sets the maximum number of changed partitions before partition summaries will be excluded. | ||
| * <p> | ||
| * If the number of changed partitions is over this max, summaries will not be included. If the number of changed | ||
| * partitions is <= this limit, then partition-level summaries will be included in the summary if they are | ||
| * available, and "partition-summaries-included" will be set to "true". | ||
| * | ||
| * @param max maximum number of changed partitions | ||
| */ | ||
| public void setPartitionSummaryLimit(int max) { | ||
| this.maxChangedPartitionsForSummaries = max; | ||
| } | ||
|
|
||
| public void incrementDuplicateDeletes() { | ||
|
|
@@ -90,19 +101,13 @@ public void incrementDuplicateDeletes(int increment) { | |
| } | ||
|
|
||
| public void addedFile(PartitionSpec spec, DataFile file) { | ||
| changedPartitions.add(spec.partitionToPath(file.partition())); | ||
| this.addedFiles += 1; | ||
| this.addedRecords += file.recordCount(); | ||
| metrics.addedFile(file); | ||
| updatePartitions(spec, file, true); | ||
| } | ||
|
|
||
| public void addedFile(PartitionSpec spec, DeleteFile file) { | ||
| changedPartitions.add(spec.partitionToPath(file.partition())); | ||
| this.addedDeleteFiles += 1; | ||
| if (file.content() == FileContent.POSITION_DELETES) { | ||
| this.addedPosDeletes += file.recordCount(); | ||
| } else { | ||
| this.addedEqDeletes += file.recordCount(); | ||
| } | ||
| metrics.addedFile(file); | ||
| updatePartitions(spec, file, true); | ||
| } | ||
|
|
||
| public void deletedFile(PartitionSpec spec, ContentFile<?> file) { | ||
|
|
@@ -116,43 +121,53 @@ public void deletedFile(PartitionSpec spec, ContentFile<?> file) { | |
| } | ||
|
|
||
| public void deletedFile(PartitionSpec spec, DataFile file) { | ||
| changedPartitions.add(spec.partitionToPath(file.partition())); | ||
| this.deletedFiles += 1; | ||
| this.deletedRecords += file.recordCount(); | ||
| metrics.removedFile(file); | ||
| updatePartitions(spec, file, false); | ||
| } | ||
|
|
||
| public void deletedFile(PartitionSpec spec, DeleteFile file) { | ||
| changedPartitions.add(spec.partitionToPath(file.partition())); | ||
| this.removedDeleteFiles += 1; | ||
| if (file.content() == FileContent.POSITION_DELETES) { | ||
| this.removedPosDeletes += file.recordCount(); | ||
| } else { | ||
| this.removedEqDeletes += file.recordCount(); | ||
| } | ||
| metrics.removedFile(file); | ||
| updatePartitions(spec, file, false); | ||
| } | ||
|
|
||
| public void addedManifest(ManifestFile manifest) { | ||
| this.addedFiles += manifest.addedFilesCount(); | ||
| this.addedRecords += manifest.addedRowsCount(); | ||
| } | ||
|
|
||
| public void deletedManifest(ManifestFile manifest) { | ||
| this.deletedFiles += manifest.deletedFilesCount(); | ||
| this.deletedRecords += manifest.deletedRowsCount(); | ||
| this.trustPartitionMetrics = false; | ||
| partitionMetrics.clear(); | ||
| metrics.addedManifest(manifest); | ||
| } | ||
|
|
||
| public void set(String property, String value) { | ||
| properties.put(property, value); | ||
| } | ||
|
|
||
| private void updatePartitions(PartitionSpec spec, ContentFile<?> file, boolean isAddition) { | ||
| if (trustPartitionMetrics) { | ||
| UpdateMetrics partMetrics = partitionMetrics.computeIfAbsent( | ||
| spec.partitionToPath(file.partition()), | ||
| key -> new UpdateMetrics()); | ||
|
|
||
| if (isAddition) { | ||
| partMetrics.addedFile(file); | ||
| } else { | ||
| partMetrics.removedFile(file); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| public void merge(SnapshotSummary.Builder builder) { | ||
| this.changedPartitions.addAll(builder.changedPartitions); | ||
| this.addedFiles += builder.addedFiles; | ||
| this.deletedFiles += builder.deletedFiles; | ||
| properties.putAll(builder.properties); | ||
| metrics.merge(builder.metrics); | ||
|
|
||
| this.trustPartitionMetrics = trustPartitionMetrics && builder.trustPartitionMetrics; | ||
| if (trustPartitionMetrics) { | ||
| for (Map.Entry<String, UpdateMetrics> entry : builder.partitionMetrics.entrySet()) { | ||
| partitionMetrics.computeIfAbsent(entry.getKey(), key -> new UpdateMetrics()).merge(entry.getValue()); | ||
| } | ||
| } else { | ||
| partitionMetrics.clear(); | ||
| } | ||
|
|
||
| this.deletedDuplicateFiles += builder.deletedDuplicateFiles; | ||
| this.addedRecords += builder.addedRecords; | ||
| this.deletedRecords += builder.deletedRecords; | ||
| this.properties.putAll(builder.properties); | ||
| } | ||
|
|
||
| public Map<String, String> build() { | ||
|
|
@@ -161,27 +176,154 @@ public Map<String, String> build() { | |
| // copy custom summary properties | ||
| builder.putAll(properties); | ||
|
|
||
| metrics.addTo(builder); | ||
| setIf(deletedDuplicateFiles > 0, builder, DELETED_DUPLICATE_FILES, deletedDuplicateFiles); | ||
| Set<String> changedPartitions = partitionMetrics.keySet(); | ||
| setIf(trustPartitionMetrics, builder, CHANGED_PARTITION_COUNT_PROP, changedPartitions.size()); | ||
|
|
||
| if (trustPartitionMetrics && changedPartitions.size() <= maxChangedPartitionsForSummaries) { | ||
| setIf(changedPartitions.size() > 0, builder, PARTITION_SUMMARY_PROP, "true"); | ||
| for (String key : changedPartitions) { | ||
| setIf(key != null, builder, CHANGED_PARTITION_PREFIX + key, partitionSummary(partitionMetrics.get(key))); | ||
| } | ||
| } | ||
|
|
||
| return builder.build(); | ||
| } | ||
|
|
||
| private static String partitionSummary(UpdateMetrics metrics) { | ||
| ImmutableMap.Builder<String, String> partBuilder = ImmutableMap.builder(); | ||
| metrics.addTo(partBuilder); | ||
| return MAP_JOINER.join(partBuilder.build()); | ||
| } | ||
| } | ||
|
|
||
| private static class UpdateMetrics { | ||
| private long addedSize = 0L; | ||
| private long removedSize = 0L; | ||
| private int addedFiles = 0; | ||
| private int removedFiles = 0; | ||
| private int addedDeleteFiles = 0; | ||
| private int removedDeleteFiles = 0; | ||
| private long addedRecords = 0L; | ||
| private long deletedRecords = 0L; | ||
| private long addedPosDeletes = 0L; | ||
| private long removedPosDeletes = 0L; | ||
| private long addedEqDeletes = 0L; | ||
| private long removedEqDeletes = 0L; | ||
| private boolean trustSizeAndDeleteCounts = true; | ||
|
|
||
| void clear() { | ||
| this.addedSize = 0L; | ||
| this.removedSize = 0L; | ||
| this.addedFiles = 0; | ||
| this.removedFiles = 0; | ||
| this.addedDeleteFiles = 0; | ||
| this.removedDeleteFiles = 0; | ||
| this.addedRecords = 0L; | ||
| this.deletedRecords = 0L; | ||
| this.addedPosDeletes = 0L; | ||
| this.removedPosDeletes = 0L; | ||
| this.addedEqDeletes = 0L; | ||
| this.removedEqDeletes = 0L; | ||
| this.trustSizeAndDeleteCounts = true; | ||
| } | ||
|
|
||
| void addTo(ImmutableMap.Builder<String, String> builder) { | ||
| setIf(addedFiles > 0, builder, ADDED_FILES_PROP, addedFiles); | ||
| setIf(deletedFiles > 0, builder, DELETED_FILES_PROP, deletedFiles); | ||
| setIf(removedFiles > 0, builder, DELETED_FILES_PROP, removedFiles); | ||
| setIf(addedDeleteFiles > 0, builder, ADDED_DELETE_FILES_PROP, addedDeleteFiles); | ||
| setIf(removedDeleteFiles > 0, builder, REMOVED_DELETE_FILES_PROP, removedDeleteFiles); | ||
| setIf(deletedDuplicateFiles > 0, builder, DELETED_DUPLICATE_FILES, deletedDuplicateFiles); | ||
| setIf(addedRecords > 0, builder, ADDED_RECORDS_PROP, addedRecords); | ||
| setIf(deletedRecords > 0, builder, DELETED_RECORDS_PROP, deletedRecords); | ||
| setIf(addedPosDeletes > 0, builder, ADDED_POS_DELETES_PROP, addedPosDeletes); | ||
| setIf(removedPosDeletes > 0, builder, REMOVED_POS_DELETES_PROP, removedPosDeletes); | ||
| setIf(addedEqDeletes > 0, builder, ADDED_EQ_DELETES_PROP, addedEqDeletes); | ||
| setIf(removedEqDeletes > 0, builder, REMOVED_EQ_DELETES_PROP, removedEqDeletes); | ||
| setIf(true, builder, CHANGED_PARTITION_COUNT_PROP, changedPartitions.size()); | ||
|
|
||
| return builder.build(); | ||
| if (trustSizeAndDeleteCounts) { | ||
| setIf(addedSize > 0, builder, ADDED_FILE_SIZE_PROP, addedSize); | ||
| setIf(removedSize > 0, builder, REMOVED_FILE_SIZE_PROP, removedSize); | ||
| setIf(addedPosDeletes > 0, builder, ADDED_POS_DELETES_PROP, addedPosDeletes); | ||
| setIf(removedPosDeletes > 0, builder, REMOVED_POS_DELETES_PROP, removedPosDeletes); | ||
| setIf(addedEqDeletes > 0, builder, ADDED_EQ_DELETES_PROP, addedEqDeletes); | ||
| setIf(removedEqDeletes > 0, builder, REMOVED_EQ_DELETES_PROP, removedEqDeletes); | ||
| } | ||
| } | ||
|
|
||
| private static void setIf(boolean expression, ImmutableMap.Builder<String, String> builder, | ||
| String property, Object value) { | ||
| if (expression) { | ||
| builder.put(property, String.valueOf(value)); | ||
| void addedFile(ContentFile<?> file) { | ||
| this.addedSize += file.fileSizeInBytes(); | ||
| switch (file.content()) { | ||
| case DATA: | ||
| this.addedFiles += 1; | ||
| this.addedRecords += file.recordCount(); | ||
| break; | ||
| case POSITION_DELETES: | ||
| this.addedDeleteFiles += 1; | ||
| this.addedPosDeletes += file.recordCount(); | ||
| break; | ||
| case EQUALITY_DELETES: | ||
| this.addedDeleteFiles += 1; | ||
| this.addedEqDeletes += file.recordCount(); | ||
| break; | ||
| default: | ||
| throw new UnsupportedOperationException("Unsupported file content type: " + file.content()); | ||
| } | ||
| } | ||
|
|
||
| void removedFile(ContentFile<?> file) { | ||
| this.removedSize += file.fileSizeInBytes(); | ||
| switch (file.content()) { | ||
| case DATA: | ||
| this.removedFiles += 1; | ||
| this.deletedRecords += file.recordCount(); | ||
| break; | ||
| case POSITION_DELETES: | ||
| this.removedDeleteFiles += 1; | ||
| this.removedPosDeletes += file.recordCount(); | ||
| break; | ||
| case EQUALITY_DELETES: | ||
| this.removedDeleteFiles += 1; | ||
| this.removedEqDeletes += file.recordCount(); | ||
| break; | ||
| default: | ||
| throw new UnsupportedOperationException("Unsupported file content type: " + file.content()); | ||
| } | ||
| } | ||
|
|
||
| void addedManifest(ManifestFile manifest) { | ||
| switch (manifest.content()) { | ||
| case DATA: | ||
| this.addedFiles += manifest.addedFilesCount(); | ||
| this.addedRecords += manifest.addedRowsCount(); | ||
| this.removedFiles += manifest.deletedFilesCount(); | ||
| this.deletedRecords += manifest.deletedRowsCount(); | ||
| break; | ||
| case DELETES: | ||
| this.addedDeleteFiles += manifest.addedFilesCount(); | ||
| this.removedDeleteFiles += manifest.deletedFilesCount(); | ||
| this.trustSizeAndDeleteCounts = false; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this because we don't know how many records an equality delete removes?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, no. It is because we don't have row counts for the number of equality and positional deletes in delete manifests. |
||
| break; | ||
| } | ||
| } | ||
|
|
||
| void merge(UpdateMetrics other) { | ||
| this.addedFiles += other.addedFiles; | ||
| this.removedFiles += other.removedFiles; | ||
| this.addedDeleteFiles += other.addedDeleteFiles; | ||
| this.removedDeleteFiles += other.removedDeleteFiles; | ||
| this.addedSize += other.addedSize; | ||
| this.removedSize += other.removedSize; | ||
| this.addedRecords += other.addedRecords; | ||
| this.deletedRecords += other.deletedRecords; | ||
| this.addedPosDeletes += other.addedPosDeletes; | ||
| this.removedPosDeletes += other.removedPosDeletes; | ||
| this.addedEqDeletes += other.addedEqDeletes; | ||
| this.removedEqDeletes += other.removedEqDeletes; | ||
| this.trustSizeAndDeleteCounts = trustSizeAndDeleteCounts && other.trustSizeAndDeleteCounts; | ||
| } | ||
| } | ||
|
|
||
| private static void setIf(boolean expression, ImmutableMap.Builder<String, String> builder, | ||
| String property, Object value) { | ||
| if (expression) { | ||
| builder.put(property, String.valueOf(value)); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We still interpret this as the number of records in data files that were removed?