[BEAM-12697] Add SBE module and initial classes #15733
TheNeuralBit merged 4 commits into apache:master
Conversation
Codecov Report
@@            Coverage Diff             @@
##             master    #15733      +/-   ##
=============================================
- Coverage     83.77%    83.60%    -0.18%
=============================================
  Files           444       445        +1
  Lines         60414     61329      +915
=============================================
+ Hits          50614     51276      +662
- Misses         9800     10053      +253
Continue to review full report at Codecov.
Run Java_Examples_Dataflow PreCommit
private SbeLogicalTypes() {}

// Unsigned types are all stored at the next highest value. This prevents unexpected behavior
// when reading and likely has negligible space impact.
The space impact will be 2x which could be non-trivial if there are many unsigned integer fields. We could at least save the space on the wire by using the same bit width for the base type. But that would still consume memory on the worker. You'd also have to make a custom logical type for that.
Another option is the protobuf approach. It just maps unsigned integers to their signed counterparts directly and users are responsible for munging negative values. See footnote [2] here: https://developers.google.com/protocol-buffers/docs/proto3#scalar
@reuvenlax have you thought at all about the best way to represent unsigned integers in Java beam schemas?
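For illustration, here's a minimal sketch contrasting the two mappings being weighed. The class and the buffer/offset parameters are made up for the example, not part of this PR:

import java.nio.ByteBuffer;

/** Hypothetical sketch contrasting the two uint32 mappings discussed above. */
final class Uint32MappingSketch {
  /** Next-highest-type approach: widen to a long so the value is always non-negative. */
  static long readAsWidenedLong(ByteBuffer buffer, int offset) {
    // 0 .. 4_294_967_295 after widening, but each value now occupies 8 bytes in memory.
    return Integer.toUnsignedLong(buffer.getInt(offset));
  }

  /** Protobuf-style approach: keep the signed counterpart and let users reinterpret it. */
  static int readAsSignedCounterpart(ByteBuffer buffer, int offset) {
    // Values >= 2^31 show up as negative ints until reinterpreted,
    // e.g. via Integer.toUnsignedLong or Integer.toUnsignedString.
    return buffer.getInt(offset);
  }
}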
My main concern with doing the protobuf approach is that conversion from Row to JSON will use the base value, which will be signed. Since conversion to JSON is probably going to be a standard part of any pipeline using this, I'm a bit concerned that it'll lead to a lot of unexpected output.
Or am I misunderstanding how ToJson works?
 *
 * <p>These are convertible to/from a {@link Row} that is a direct mapping of an SBE composite type.
 */
public final class TimeValues {
Would it be possible to just use joda or java time types instead of defining our own date/time classes?
For Row -> T, that'll work.
For T -> Row, the difficulty will be recovering the original unit. This isn't an issue if the SBE schema uses a variable unit, since we can just pick a unit and set that. However, SBE allows setting a constant unit, like:
<composite name="UTCTimestampNanos" description="UTC timestamp with nanosecond precision">
    <type name="time" primitiveType="uint64" />
    <type name="unit" primitiveType="uint8" presence="constant" valueRef="TimeUnit.nanosecond" />
</composite>
(Source)
I'm guessing that there's some potential for detecting this on Row -> SBE type, since the signature will be different:
// Variable unit
public UTCTimestampEncoder unit(final short value)
{
    buffer.putByte(offset + 8, (byte)value);
    return this;
}

// Constant unit
public short unit()
{
    return (short)9;
}
If we assume nanos when doing T -> Row, then going Row -> SBE type, we would either write nanos or reduce precision if constant.
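For example, once the constant unit is known, reducing precision is a one-liner over standard TimeUnit (the class and method are illustrative, not from this PR):

import java.util.concurrent.TimeUnit;

final class UnitReductionSketch {
  /** Truncate an epoch-nanos value down to the encoder's constant unit, e.g. nanos -> millis. */
  static long reduceToConstantUnit(long epochNanos, TimeUnit constantUnit) {
    return constantUnit.convert(epochNanos, TimeUnit.NANOSECONDS);
  }
}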
So it is doable, but I'm thinking that:
- It is harder to implement and potentially more error prone.
- It is more expensive (more analysis of reflection).
- A sudden unit change might be surprising, even if it is variable. (e.g. "Why did it output nano when I only input seconds and millis?")
Overall, I thought it was better to use an intermediate type that bridges the gap between T and Row and which preserves the original value/unit.
So I have two main concerns with this approach:
- I really don't want to get in the business of maintaining a date/time library :)
- It would be preferable to have a library of common types that most connectors are using, so that it's trivial to interop with the schemas that they produce.
We could maybe alleviate (1) by discouraging users from using these types directly, instead preferring to convert to java time or joda time if they need to.
Note we can encode more information in the schema logical type than is represented in the Java type that it maps to. So another approach to be able to represent the mixed vs. fixed precision case could be to have a family of logical types that all map to Instant, but that have different representation types (i.e. wire types):
- micros_instant
- millis_instant
- seconds_instant
- variable_unit_instant (or something)
In the variable unit case the representation type could be row<int64 timestamp, int64 unit>. The drawback with this approach would be that the user can't interrogate the precision of the instant in their Java code, but I'm not so sure that's a significant problem.
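To make that concrete, here's a rough sketch of what one member of such a family could look like, assuming Beam's Schema.LogicalType interface; the identifier and class name are illustrative, not an existing Beam type:

import java.time.Instant;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.LogicalType;

/** Sketch of a millis-precision instant: INT64 epoch millis on the wire, java.time.Instant in user code. */
final class MillisInstantSketch implements LogicalType<Instant, Long> {
  @Override
  public String getIdentifier() {
    return "sbe:millis_instant"; // illustrative identifier, not a registered URN
  }

  @Override
  public FieldType getArgumentType() {
    return null; // no logical type argument in this sketch
  }

  @Override
  public FieldType getBaseType() {
    return FieldType.INT64; // the representation (wire) type
  }

  @Override
  public Long toBaseType(Instant input) {
    return input.toEpochMilli();
  }

  @Override
  public Instant toInputType(Long base) {
    return Instant.ofEpochMilli(base);
  }
}

A seconds_instant or micros_instant sibling would differ only in the identifier and conversion, and the variable-unit member would use a row<int64 timestamp, int64 unit> base type instead of INT64.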
We may want to discuss this on the dev list, as it's a pretty significant decision about how we want to deal with date/time types in Schemas in general.
I don't want your work on SBE to be hung up by that though. Can you still make progress on the schema mapping, just leaving out the types that are represented in this PR for now?
We can get by on just using a Row. I would still like to have a logical type that provides the same semantic meaning as SBE does.
I've changed them all to use PassThroughLogicalType<Row> and have removed all the relevant datetime values. I've left UTCDateOnly and LocalMktDate, since they're just using Java's LocalDate, not a custom type.
What I meant by make progress was to build other infrastructure, like the PayloadSerializerProvider, that only supports the unambiguous types (for now).
If we must provide types that have the same semantic meaning as SBE, I do think it's preferable to define concrete types (as you had before) rather than using Row. Row is supposed to be an implementation detail - we aim to create APIs where users don't need to interact with it directly. I just think we need to be careful creating these types, as users may come to expect date/time library-like functionality. If we're clear from the outset that these are intended to be simple containers to faithfully represent SBE data, that could be ok.
However that solution would still have drawback (2) - the types would have little utility outside of SBE. What if a user wanted to use these types in a SqlTransform, or write them to Avro, or take data from one of those sources and write it as SBE? Ideally they could do that without boilerplate to convert between SBE-native date/time types and standard types. This doesn't need to be a blocker, but it's something to think about.
> What I meant by make progress was to build other infrastructure, like the PayloadSerializerProvider, that only supports the unambiguous types (for now).
Yeah, that's doable. I was just noting that if we don't explicitly account for these types, they'll likely be translated into a Row of primitive fields. That's how we would be handling unfamiliar composite types.
Actually, I am thinking back to my earlier comment about detecting the unit in converting to the SBE type, and I think that analyzing the type with reflection will always be necessary to avoid trying to write the unit when it is constant, which removes the first two concerns I had. The only challenge is getting the right unit in the variable case, but I can think of some ways to do that easily, though we may still choose a less precise unit than the original if the less precise unit would give the same result. I've at least tried this out with Instant, and I'd imagine it will work the same for the other types.
Basically, I'm thinking we could probably use Java time types and determine the wire format that works best. I'll update the logical types, and if there's still concerns, I can remove them and revisit them in a later PR.
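As a sketch of that reflection check, keyed off the generated signatures quoted earlier (the helper class is hypothetical):

import java.lang.reflect.Method;

final class UnitDetectionSketch {
  /** Returns true if the generated SBE encoder exposes a settable (variable) unit field. */
  static boolean hasVariableUnit(Class<?> encoderClass) {
    // SBE codegen emits `unit(short value)` for a variable unit and a no-arg `unit()` that
    // returns the constant for a fixed unit, as in the generated snippets above.
    for (Method m : encoderClass.getMethods()) {
      if (m.getName().equals("unit") && m.getParameterCount() == 1) {
        return true;
      }
    }
    return false;
  }
}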
Awesome, thank you. I just realized we probably want to provide a path for joda time where possible, since Beam Java uses it for event times. I don't think it supports nanosecond precision though, which complicates things.
I've added Java time for this. All of them are converted to/from String for simplicity and consistency. The UTC* and date-only types could use Long and Integer respectively if memory is a concern.
I'll do Joda in a later PR.
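For reference, a tiny illustration of that String-backed round trip, using only standard java.time (the class name is made up):

import java.time.Instant;
import java.time.LocalDate;

final class StringWireFormatSketch {
  public static void main(String[] args) {
    Instant original = Instant.parse("2021-10-14T12:34:56.123456789Z"); // nanosecond precision
    String wire = original.toString();                // ISO-8601 text stored in the Row field
    System.out.println(original.equals(Instant.parse(wire))); // true: nothing lost in the round trip

    LocalDate date = LocalDate.parse("2021-10-14");   // date-only types round trip the same way
    System.out.println(date);                         // 2021-10-14
  }
}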
The Java failure seems related to BEAM-11689. The Go precommit failures are related to the Go tests, and the Go portable precommit also seems related to the Go test environment.

Yeah it looks like Go precommits are broken at head: https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/4871/

Run Java_Examples_Dataflow PreCommit

Run Java PreCommit

Run Java_Examples_Dataflow PreCommit
TheNeuralBit left a comment
LGTM, just a minor suggestion. Really sorry I let this drop for so long.
 * Beam schemas with just a primitive.
 */
@Experimental(Kind.SCHEMAS)
public final class SbeLogicalTypes {
nit: You might consider making all the LogicalType implementations private, and just exposing concrete instances of them here. I'm also fine merging as-is if you'd prefer
This is what we did in SqlTypes, so that we can easily migrate if/when it's necessary.
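A hedged sketch of that pattern, with made-up names (callers only ever see the interface, so the backing class can change later):

import java.time.LocalDate;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.LogicalType;

@Experimental(Kind.SCHEMAS)
public final class SbeLogicalTypesSketch {
  private SbeLogicalTypesSketch() {}

  /** The only public handle to the type; the implementing class below stays private. */
  public static final LogicalType<LocalDate, String> LOCAL_MKT_DATE = new LocalMktDate();

  private static final class LocalMktDate implements LogicalType<LocalDate, String> {
    @Override public String getIdentifier() { return "sbe:LocalMktDate"; } // illustrative id
    @Override public FieldType getArgumentType() { return null; }
    @Override public FieldType getBaseType() { return FieldType.STRING; } // String wire format
    @Override public String toBaseType(LocalDate input) { return input.toString(); }
    @Override public LocalDate toInputType(String base) { return LocalDate.parse(base); }
  }
}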
I would prefer to keep them public for now to remain consistent with the way that the protobuf extension does things.
Adds some types for helping to represent an SBE schema in Beam.
This is focused on types found under SBE's date and time encodings: https://www.fixtrading.org/standards/sbe-online/#date-and-time-encoding