Core: Refactor internal Avro reader to resolve schemas directly #9366

rdblue merged 8 commits into apache:main
Conversation
```java
/**
 * An interface for Avro DatumReaders to support custom record classes.
 */
interface SupportsCustomRecords {
}
```

This is internal for now. I'm not sure that we want to expose it more broadly yet.
```java
public abstract static class PlannedStructReader<S>
    implements ValueReader<S>, SupportsRowPosition {
```

This is the new base class that implements a read plan, which is a list of positions and readers. The read plan is produced by the new logic in GenericAvroReader that handles structs.
```java
static class PositionReader implements ValueReader<Long>, SupportsRowPosition {

  PositionReader(long rowPosition) {
    this.currentPosition = rowPosition - 1;
  }
```

This refactor includes changes to make PositionReader easier to use. Rather than needing to instantiate it inside of setRowPositionSupplier, it now just implements SupportsRowPosition. The logic for creating a position reader is now part of the object model rather than part of StructReader.
```diff
-Assertions.assertThat(projected.get("location_r5"))
+Assertions.assertThat(projected.get("location"))
     .as("Field missing from table mapping is renamed")
     .isNotNull();
```

These test updates fix the odd behavior caused by rewriting the read schema. Name mapping previously needed to produce fields with incorrect names in order to project fields but not have them read (by name) from the data file.
Fokko left a comment
Left a few questions, but looks good! Excited to see the skipping in there as well. When we write the sizes for arrays/maps then this should also speed up reading quite a bit.
```java
  this.nameMapping = MappingUtil.create(schema);
}

DatumReader<D> reader;
```

I always like to make these final so you're sure that it doesn't skip through a branch. Suggested change:

```diff
-DatumReader<D> reader;
+final DatumReader<D> reader;
```
I don't think it is necessary in this case. The compiler will catch if it is unset because no default was provided.
```diff
 ((SupportsRowPosition) reader)
-    .setRowPositionSupplier(() -> AvroIO.findStartingRowPos(file::newStream, start));
+    .setRowPositionSupplier(
+        Suppliers.memoize(() -> AvroIO.findStartingRowPos(file::newStream, start)));
```
Why the memoize? Are we reading the same file multiple times?
Previously, this was being done in the StructValueReader. If the struct reader inserted a PositionReader, it would also rewrite the position supplier.
That was a lot of complication for the value reader and didn't work in all cases (for example, if two structs had position columns) so I moved the memoization here. It's simpler that way and enabled us to add position readers that are constructed in the same place as the other readers, instead of needing to keep track of the position index in a struct and inject when setRowPositionSupplier is called.
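The memoization described above can be sketched without the full reader machinery. This uses only `java.util.function.Supplier` as a stdlib stand-in for Guava's `Suppliers.memoize`; the counter stands in for the (expensive) `AvroIO.findStartingRowPos` scan:

```java
import java.util.function.Supplier;

// Sketch: several readers may ask for the starting row position, but the
// file scan that computes it should run only once.
public class MemoizeSketch {
  static int computeCount = 0;

  static <T> Supplier<T> memoize(Supplier<T> delegate) {
    return new Supplier<T>() {
      private T value;
      private boolean computed = false;

      @Override
      public synchronized T get() {
        if (!computed) {
          value = delegate.get(); // compute once, cache forever
          computed = true;
        }
        return value;
      }
    };
  }

  public static void main(String[] args) {
    Supplier<Long> startPos = memoize(() -> {
      computeCount++; // stands in for the expensive findStartingRowPos scan
      return 42L;
    });
    long a = startPos.get();
    long b = startPos.get();
    System.out.println(a + "," + b + "," + computeCount); // prints "42,42,1"
  }
}
```

This is why a single memoized supplier is simpler than rewriting suppliers inside the struct reader: every position reader can share one supplier and the scan still happens once.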
```java
  }
}

private static class RequiredOptionReader implements ValueReader<Object> {
```
Why do we need this next to the UnionReader?
This is actually unused so we could remove it. The purpose is to be able to replace the union reader with one that checks that the value is non-null for cases where the file has an optional field but the expected schema requires it.
With Iceberg's schema evolution rules, we should never have that case, which is why I didn't end up using this (it was complicated and of little value). But I included the class just in case we want it in the future.
It would be good to hear what you think. Should we keep or remove it?
Since you've approved this and I don't see any other required changes, I'm going to remove this to unblock getting this commit in.
Thanks for the context. I would remove it, the PR looks good, so feel free to merge it once the tests are green 👍
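For reference, the behavior described for the (now removed) class could be sketched as a wrapper that delegates to a union reader and rejects nulls. Types here are simplified stand-ins, not the actual Iceberg classes:

```java
// Hedged sketch: wrap an underlying reader and fail if a field that the
// expected schema requires comes back null (e.g. the file wrote it as an
// optional union).
public class RequiredOptionSketch {
  interface ValueReader<T> {
    T read(Object datum);
  }

  static class RequiredOptionReader implements ValueReader<Object> {
    private final ValueReader<?> unionReader;

    RequiredOptionReader(ValueReader<?> unionReader) {
      this.unionReader = unionReader;
    }

    @Override
    public Object read(Object datum) {
      Object value = unionReader.read(datum);
      if (value == null) {
        // the expected schema requires this field; a null from the file is invalid
        throw new IllegalStateException("Missing required value");
      }
      return value;
    }
  }

  public static void main(String[] args) {
    RequiredOptionReader reader = new RequiredOptionReader(datum -> datum);
    System.out.println(reader.read("x")); // prints "x"
    try {
      reader.read(null);
    } catch (IllegalStateException e) {
      System.out.println("rejected null");
    }
  }
}
```

As noted in the thread, Iceberg's schema evolution rules should prevent an optional-in-file / required-in-read mismatch, which is why the check was dropped.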
```java
Types.NestedField field = expected.field(fieldId);
if (constant != null) {
  readPlan.add(Pair.of(pos, ValueReaders.constant(constant)));
} else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) {
```

Do we need to codify these cases? They should just follow the Iceberg spec like any other Avro file.
Yes, I think this is better than how we did it before.
Previously, we would inject these fields in the StructReader, but there are improvements that we can make to that approach:
- It isn't the struct reader's responsibility to skip or change the readers that are passed to it. Read "planning" should be done here, where the read and file schemas are both present.
- It wasn't clear what readers should be passed in or produced here, given that readers might be replaced.
- This unifies how new optional fields are handled with how row position and metadata fields are handled. It also sets up future default value handling using a constant reader, which is one of the reasons for making these changes.
Thanks for reviewing, @Fokko!
```java
Object constant = idToConstant.get(fieldId);
Types.NestedField field = expected.field(fieldId);
if (constant != null) {
  readPlan.add(Pair.of(pos, ValueReaders.constant(constant)));
```

I think here we need something along the lines of GenericAvroConstantReader that returns the constant in the Avro GenericData.Record format. Right now the ConstantReader class returns the constant object as is. Most of the time this constant is an Iceberg data constant, but what we need here is an Avro GenericData.Record. We can extend ConstantReader<T> here, but it is a private class. Can we promote it to public?
This refactors the Avro generic reader so that it resolves schemas directly (like PyIceberg) rather than creating an Avro schema to trick Avro's ResolvingDecoder into projecting columns correctly. This makes the read path easier to maintain because there is no need to hijack and rewrite schemas in ProjectionDatumReader using BuildAvroProjection. This should make it much easier to add default value support.
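The "resolve schemas directly" idea in the description can be sketched as a walk over the expected (read) schema: each field is either matched against the file schema and read, or projected as null/default when absent. This is a simplified, name-based illustration; Iceberg's actual resolution is id-based:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hedged sketch of direct schema resolution: no rewritten Avro schema, no
// ResolvingDecoder trick; just a plan computed from the two schemas.
public class ResolveSketch {
  static Map<String, String> resolve(
      Map<String, String> expectedSchema, Map<String, String> fileSchema) {
    Map<String, String> resolution = new LinkedHashMap<>();
    for (String field : expectedSchema.keySet()) {
      if (fileSchema.containsKey(field)) {
        resolution.put(field, "read"); // present in the file: decode it
      } else {
        resolution.put(field, "null"); // absent: project null (or a default)
      }
    }
    return resolution;
  }

  public static void main(String[] args) {
    Map<String, String> expected = new LinkedHashMap<>();
    expected.put("id", "long");
    expected.put("data", "string");
    Map<String, String> file = Map.of("id", "long");
    System.out.println(resolve(expected, file)); // prints "{id=read, data=null}"
  }
}
```

Because the projection decision is explicit in the plan, adding default values later means changing only the "absent" branch, which is the maintainability win the description points at.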