Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ protected Map<String, String> summary() {
summaryBuilder.set(KEPT_MANIFESTS_COUNT, String.valueOf(keptManifests.size()));
summaryBuilder.set(REPLACED_MANIFESTS_COUNT, String.valueOf(rewrittenManifests.size() + deletedManifests.size()));
summaryBuilder.set(PROCESSED_ENTRY_COUNT, String.valueOf(entryCount.get()));
summaryBuilder.setPartitionSummaryLimit(0); // do not include partition summaries because data did not change
return summaryBuilder.build();
}

Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ protected String operation() {

@Override
protected Map<String, String> summary() {
  // Look up the table-level cap on per-partition summaries, falling back to the default when unset.
  int partitionSummaryLimit = ops.current().propertyAsInt(
      TableProperties.WRITE_PARTITION_SUMMARY_LIMIT,
      TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT);

  // The limit must be applied before the summary map is built.
  summaryBuilder.setPartitionSummaryLimit(partitionSummaryLimit);
  return summaryBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ private ManifestFile copyManifest(ManifestFile manifest) {

@Override
protected Map<String, String> summary() {
  // Look up the table-level cap on per-partition summaries, falling back to the default when unset.
  int partitionSummaryLimit = ops.current().propertyAsInt(
      TableProperties.WRITE_PARTITION_SUMMARY_LIMIT,
      TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT);

  // The limit must be applied before the summary map is built.
  summaryBuilder.setPartitionSummaryLimit(partitionSummaryLimit);
  return summaryBuilder.build();
}

Expand Down
268 changes: 205 additions & 63 deletions core/src/main/java/org/apache/iceberg/SnapshotSummary.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@

import java.util.Map;
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Joiner.MapJoiner;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

public class SnapshotSummary {
public static final String ADDED_FILES_PROP = "added-data-files";
Expand All @@ -35,6 +36,8 @@ public class SnapshotSummary {
public static final String ADDED_RECORDS_PROP = "added-records";
public static final String DELETED_RECORDS_PROP = "deleted-records";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still interpret this as the number of records in the data files that were removed?

public static final String TOTAL_RECORDS_PROP = "total-records";
public static final String ADDED_FILE_SIZE_PROP = "added-files-size";
public static final String REMOVED_FILE_SIZE_PROP = "removed-files-size";
public static final String ADDED_POS_DELETES_PROP = "added-position-deletes";
public static final String REMOVED_POS_DELETES_PROP = "removed-position-deletes";
public static final String TOTAL_POS_DELETES_PROP = "total-position-deletes";
Expand All @@ -43,12 +46,16 @@ public class SnapshotSummary {
public static final String TOTAL_EQ_DELETES_PROP = "total-equality-deletes";
public static final String DELETED_DUPLICATE_FILES = "deleted-duplicate-files";
public static final String CHANGED_PARTITION_COUNT_PROP = "changed-partition-count";
public static final String CHANGED_PARTITION_PREFIX = "partitions.";
public static final String PARTITION_SUMMARY_PROP = "partition-summaries-included";
public static final String STAGED_WAP_ID_PROP = "wap.id";
public static final String PUBLISHED_WAP_ID_PROP = "published-wap-id";
public static final String SOURCE_SNAPSHOT_ID_PROP = "source-snapshot-id";
public static final String REPLACE_PARTITIONS_PROP = "replace-partitions";
public static final String EXTRA_METADATA_PREFIX = "snapshot-property.";

public static final MapJoiner MAP_JOINER = Joiner.on(",").withKeyValueSeparator("=");

private SnapshotSummary() {
}

Expand All @@ -58,27 +65,31 @@ public static Builder builder() {

public static class Builder {
// commit summary tracking
private Set<String> changedPartitions = Sets.newHashSet();
private long addedFiles = 0L;
private long deletedFiles = 0L;
private long addedDeleteFiles = 0L;
private long removedDeleteFiles = 0L;
private final Map<String, String> properties = Maps.newHashMap();
private final Map<String, UpdateMetrics> partitionMetrics = Maps.newHashMap();
private final UpdateMetrics metrics = new UpdateMetrics();
private int maxChangedPartitionsForSummaries = 0;
private long deletedDuplicateFiles = 0L;
private long addedRecords = 0L;
private long deletedRecords = 0L;
private long addedPosDeletes = 0L;
private long removedPosDeletes = 0L;
private long addedEqDeletes = 0L;
private long removedEqDeletes = 0L;
private Map<String, String> properties = Maps.newHashMap();
private boolean trustPartitionMetrics = true;

public void clear() {
changedPartitions.clear();
this.addedFiles = 0L;
this.deletedFiles = 0L;
partitionMetrics.clear();
metrics.clear();
this.deletedDuplicateFiles = 0L;
this.addedRecords = 0L;
this.deletedRecords = 0L;
this.trustPartitionMetrics = true;
}

/**
* Sets the maximum number of changed partitions before partition summaries will be excluded.
* <p>
* If the number of changed partitions is over this max, summaries will not be included. If the number of changed
* partitions is &lt;= this limit, then partition-level summaries will be included in the summary if they are
* available, and "partition-summaries-included" will be set to "true".
* <p>
* The limit starts at 0, which means no partition-level summaries are written unless a larger limit is set.
*
* @param max maximum number of changed partitions
*/
public void setPartitionSummaryLimit(int max) {
this.maxChangedPartitionsForSummaries = max;
}

public void incrementDuplicateDeletes() {
Expand All @@ -90,19 +101,13 @@ public void incrementDuplicateDeletes(int increment) {
}

public void addedFile(PartitionSpec spec, DataFile file) {
changedPartitions.add(spec.partitionToPath(file.partition()));
this.addedFiles += 1;
this.addedRecords += file.recordCount();
metrics.addedFile(file);
updatePartitions(spec, file, true);
}

public void addedFile(PartitionSpec spec, DeleteFile file) {
changedPartitions.add(spec.partitionToPath(file.partition()));
this.addedDeleteFiles += 1;
if (file.content() == FileContent.POSITION_DELETES) {
this.addedPosDeletes += file.recordCount();
} else {
this.addedEqDeletes += file.recordCount();
}
metrics.addedFile(file);
updatePartitions(spec, file, true);
}

public void deletedFile(PartitionSpec spec, ContentFile<?> file) {
Expand All @@ -116,43 +121,53 @@ public void deletedFile(PartitionSpec spec, ContentFile<?> file) {
}

public void deletedFile(PartitionSpec spec, DataFile file) {
changedPartitions.add(spec.partitionToPath(file.partition()));
this.deletedFiles += 1;
this.deletedRecords += file.recordCount();
metrics.removedFile(file);
updatePartitions(spec, file, false);
}

public void deletedFile(PartitionSpec spec, DeleteFile file) {
changedPartitions.add(spec.partitionToPath(file.partition()));
this.removedDeleteFiles += 1;
if (file.content() == FileContent.POSITION_DELETES) {
this.removedPosDeletes += file.recordCount();
} else {
this.removedEqDeletes += file.recordCount();
}
metrics.removedFile(file);
updatePartitions(spec, file, false);
}

public void addedManifest(ManifestFile manifest) {
this.addedFiles += manifest.addedFilesCount();
this.addedRecords += manifest.addedRowsCount();
}

public void deletedManifest(ManifestFile manifest) {
this.deletedFiles += manifest.deletedFilesCount();
this.deletedRecords += manifest.deletedRowsCount();
this.trustPartitionMetrics = false;
partitionMetrics.clear();
metrics.addedManifest(manifest);
}

public void set(String property, String value) {
properties.put(property, value);
}

/**
 * Folds a single added or removed file into the metrics tracked for its partition.
 * <p>
 * Does nothing once partition metrics are no longer trusted.
 *
 * @param spec partition spec used to render the file's partition as a path key
 * @param file the content file that was added or removed
 * @param isAddition true if the file was added, false if it was removed
 */
private void updatePartitions(PartitionSpec spec, ContentFile<?> file, boolean isAddition) {
  if (!trustPartitionMetrics) {
    return;
  }

  String partitionPath = spec.partitionToPath(file.partition());
  UpdateMetrics forPartition = partitionMetrics.computeIfAbsent(partitionPath, ignored -> new UpdateMetrics());

  if (isAddition) {
    forPartition.addedFile(file);
  } else {
    forPartition.removedFile(file);
  }
}

public void merge(SnapshotSummary.Builder builder) {
this.changedPartitions.addAll(builder.changedPartitions);
this.addedFiles += builder.addedFiles;
this.deletedFiles += builder.deletedFiles;
properties.putAll(builder.properties);
metrics.merge(builder.metrics);

this.trustPartitionMetrics = trustPartitionMetrics && builder.trustPartitionMetrics;
if (trustPartitionMetrics) {
for (Map.Entry<String, UpdateMetrics> entry : builder.partitionMetrics.entrySet()) {
partitionMetrics.computeIfAbsent(entry.getKey(), key -> new UpdateMetrics()).merge(entry.getValue());
}
} else {
partitionMetrics.clear();
}

this.deletedDuplicateFiles += builder.deletedDuplicateFiles;
this.addedRecords += builder.addedRecords;
this.deletedRecords += builder.deletedRecords;
this.properties.putAll(builder.properties);
}

public Map<String, String> build() {
Expand All @@ -161,27 +176,154 @@ public Map<String, String> build() {
// copy custom summary properties
builder.putAll(properties);

metrics.addTo(builder);
setIf(deletedDuplicateFiles > 0, builder, DELETED_DUPLICATE_FILES, deletedDuplicateFiles);
Set<String> changedPartitions = partitionMetrics.keySet();
setIf(trustPartitionMetrics, builder, CHANGED_PARTITION_COUNT_PROP, changedPartitions.size());

if (trustPartitionMetrics && changedPartitions.size() <= maxChangedPartitionsForSummaries) {
setIf(changedPartitions.size() > 0, builder, PARTITION_SUMMARY_PROP, "true");
for (String key : changedPartitions) {
setIf(key != null, builder, CHANGED_PARTITION_PREFIX + key, partitionSummary(partitionMetrics.get(key)));
}
}

return builder.build();
}

/**
 * Renders one partition's metrics as a single string using {@code MAP_JOINER}
 * (comma-separated {@code key=value} pairs).
 *
 * @param metrics the metrics accumulated for one partition
 * @return the joined summary string for that partition
 */
private static String partitionSummary(UpdateMetrics metrics) {
  ImmutableMap.Builder<String, String> entries = ImmutableMap.builder();
  metrics.addTo(entries);
  Map<String, String> summaryProps = entries.build();
  return MAP_JOINER.join(summaryProps);
}
}

private static class UpdateMetrics {
private long addedSize = 0L;
private long removedSize = 0L;
private int addedFiles = 0;
private int removedFiles = 0;
private int addedDeleteFiles = 0;
private int removedDeleteFiles = 0;
private long addedRecords = 0L;
private long deletedRecords = 0L;
private long addedPosDeletes = 0L;
private long removedPosDeletes = 0L;
private long addedEqDeletes = 0L;
private long removedEqDeletes = 0L;
private boolean trustSizeAndDeleteCounts = true;

/**
 * Resets every counter to zero and marks size and delete counts as trusted again.
 */
void clear() {
  // file counts
  addedFiles = 0;
  removedFiles = 0;
  addedDeleteFiles = 0;
  removedDeleteFiles = 0;

  // byte sizes
  addedSize = 0L;
  removedSize = 0L;

  // data record counts
  addedRecords = 0L;
  deletedRecords = 0L;

  // delete record counts, split by delete type
  addedPosDeletes = 0L;
  removedPosDeletes = 0L;
  addedEqDeletes = 0L;
  removedEqDeletes = 0L;

  trustSizeAndDeleteCounts = true;
}

void addTo(ImmutableMap.Builder<String, String> builder) {
setIf(addedFiles > 0, builder, ADDED_FILES_PROP, addedFiles);
setIf(deletedFiles > 0, builder, DELETED_FILES_PROP, deletedFiles);
setIf(removedFiles > 0, builder, DELETED_FILES_PROP, removedFiles);
setIf(addedDeleteFiles > 0, builder, ADDED_DELETE_FILES_PROP, addedDeleteFiles);
setIf(removedDeleteFiles > 0, builder, REMOVED_DELETE_FILES_PROP, removedDeleteFiles);
setIf(deletedDuplicateFiles > 0, builder, DELETED_DUPLICATE_FILES, deletedDuplicateFiles);
setIf(addedRecords > 0, builder, ADDED_RECORDS_PROP, addedRecords);
setIf(deletedRecords > 0, builder, DELETED_RECORDS_PROP, deletedRecords);
setIf(addedPosDeletes > 0, builder, ADDED_POS_DELETES_PROP, addedPosDeletes);
setIf(removedPosDeletes > 0, builder, REMOVED_POS_DELETES_PROP, removedPosDeletes);
setIf(addedEqDeletes > 0, builder, ADDED_EQ_DELETES_PROP, addedEqDeletes);
setIf(removedEqDeletes > 0, builder, REMOVED_EQ_DELETES_PROP, removedEqDeletes);
setIf(true, builder, CHANGED_PARTITION_COUNT_PROP, changedPartitions.size());

return builder.build();
if (trustSizeAndDeleteCounts) {
setIf(addedSize > 0, builder, ADDED_FILE_SIZE_PROP, addedSize);
setIf(removedSize > 0, builder, REMOVED_FILE_SIZE_PROP, removedSize);
setIf(addedPosDeletes > 0, builder, ADDED_POS_DELETES_PROP, addedPosDeletes);
setIf(removedPosDeletes > 0, builder, REMOVED_POS_DELETES_PROP, removedPosDeletes);
setIf(addedEqDeletes > 0, builder, ADDED_EQ_DELETES_PROP, addedEqDeletes);
setIf(removedEqDeletes > 0, builder, REMOVED_EQ_DELETES_PROP, removedEqDeletes);
}
}

private static void setIf(boolean expression, ImmutableMap.Builder<String, String> builder,
String property, Object value) {
if (expression) {
builder.put(property, String.valueOf(value));
/**
 * Records a newly added content file in these metrics.
 * <p>
 * The file's size is always added to the added-size total. Data files increment the added data file and
 * added record counts; position and equality delete files increment the added delete file count and the
 * matching per-type delete record count.
 *
 * @param file the content file that was added
 * @throws UnsupportedOperationException if the file's content type is not one of the handled cases
 */
void addedFile(ContentFile<?> file) {
// size is accumulated before the content type is inspected, so it is counted even if the switch below throws
this.addedSize += file.fileSizeInBytes();
switch (file.content()) {
case DATA:
this.addedFiles += 1;
this.addedRecords += file.recordCount();
break;
case POSITION_DELETES:
// both delete types share one file counter but keep separate record counters
this.addedDeleteFiles += 1;
this.addedPosDeletes += file.recordCount();
break;
case EQUALITY_DELETES:
this.addedDeleteFiles += 1;
this.addedEqDeletes += file.recordCount();
break;
default:
throw new UnsupportedOperationException("Unsupported file content type: " + file.content());
}
}

/**
 * Records a removed content file in these metrics.
 * <p>
 * The file's size is always added to the removed-size total. Data files increment the removed data file and
 * deleted record counts; position and equality delete files increment the removed delete file count and the
 * matching per-type removed delete record count.
 *
 * @param file the content file that was removed
 * @throws UnsupportedOperationException if the file's content type is not one of the handled cases
 */
void removedFile(ContentFile<?> file) {
// size is accumulated before the content type is inspected, so it is counted even if the switch below throws
this.removedSize += file.fileSizeInBytes();
switch (file.content()) {
case DATA:
this.removedFiles += 1;
this.deletedRecords += file.recordCount();
break;
case POSITION_DELETES:
// both delete types share one file counter but keep separate record counters
this.removedDeleteFiles += 1;
this.removedPosDeletes += file.recordCount();
break;
case EQUALITY_DELETES:
this.removedDeleteFiles += 1;
this.removedEqDeletes += file.recordCount();
break;
default:
throw new UnsupportedOperationException("Unsupported file content type: " + file.content());
}
}

void addedManifest(ManifestFile manifest) {
switch (manifest.content()) {
case DATA:
this.addedFiles += manifest.addedFilesCount();
this.addedRecords += manifest.addedRowsCount();
this.removedFiles += manifest.deletedFilesCount();
this.deletedRecords += manifest.deletedRowsCount();
break;
case DELETES:
this.addedDeleteFiles += manifest.addedFilesCount();
this.removedDeleteFiles += manifest.deletedFilesCount();
this.trustSizeAndDeleteCounts = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this because we don't know how many records an equality delete removes?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, no. It is because delete manifests don't carry row counts for equality and position deletes.

break;
}
}

/**
 * Adds every counter from {@code other} into this instance.
 * <p>
 * Size and delete counts remain trusted only if both this instance and {@code other} trust them.
 *
 * @param other the metrics to fold into this instance
 */
void merge(UpdateMetrics other) {
  // byte sizes
  addedSize += other.addedSize;
  removedSize += other.removedSize;

  // data and delete file counts
  addedFiles += other.addedFiles;
  removedFiles += other.removedFiles;
  addedDeleteFiles += other.addedDeleteFiles;
  removedDeleteFiles += other.removedDeleteFiles;

  // data record counts
  addedRecords += other.addedRecords;
  deletedRecords += other.deletedRecords;

  // delete record counts, split by delete type
  addedPosDeletes += other.addedPosDeletes;
  removedPosDeletes += other.removedPosDeletes;
  addedEqDeletes += other.addedEqDeletes;
  removedEqDeletes += other.removedEqDeletes;

  // one untrusted side makes the merged totals untrusted
  trustSizeAndDeleteCounts &= other.trustSizeAndDeleteCounts;
}
}

/**
 * Puts {@code property} into {@code builder} with the string form of {@code value}, but only when
 * {@code expression} is true.
 *
 * @param expression whether the property should be written
 * @param builder the summary map builder to write into
 * @param property the summary property name
 * @param value the value to store, converted with {@link String#valueOf(Object)}
 */
private static void setIf(boolean expression, ImmutableMap.Builder<String, String> builder,
                          String property, Object value) {
  if (!expression) {
    return;
  }

  String rendered = String.valueOf(value);
  builder.put(property, rendered);
}
}
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ private TableProperties() {
// If not set, defaults to a "metadata" folder underneath the root path of the table.
public static final String WRITE_METADATA_LOCATION = "write.metadata.path";

public static final String WRITE_PARTITION_SUMMARY_LIMIT = "write.summary.partition-limit";
public static final int WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT = 0;

public static final String MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled";
public static final boolean MANIFEST_LISTS_ENABLED_DEFAULT = true;

Expand Down
Loading