Skip to content

[BEAM-12697] Add SBE module and initial classes#15733

Merged
TheNeuralBit merged 4 commits intoapache:masterfrom
zhoufek:sbe_lt
Nov 29, 2021
Merged

[BEAM-12697] Add SBE module and initial classes#15733
TheNeuralBit merged 4 commits intoapache:masterfrom
zhoufek:sbe_lt

Conversation

@zhoufek
Copy link
Copy Markdown
Contributor

@zhoufek zhoufek commented Oct 15, 2021

Adds some types for helping to represent an SBE schema in Beam.

This is focused on types found under SBEs date and time encodings: https://www.fixtrading.org/standards/sbe-online/#date-and-time-encoding


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.

See the Contributor Guide for more tips on how to make review process smoother.

ValidatesRunner compliance status (on master branch)

Lang ULR Dataflow Flink Samza Spark Twister2
Go --- Build Status Build Status Build Status Build Status ---
Java Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Python --- Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status ---
XLang Build Status Build Status Build Status Build Status Build Status ---

Examples testing status on various runners

Lang ULR Dataflow Flink Samza Spark Twister2
Go --- --- --- --- --- --- ---
Java --- Build Status
Build Status
Build Status
--- --- --- --- ---
Python --- --- --- --- --- --- ---
XLang --- --- --- --- --- --- ---

Post-Commit SDK/Transform Integration Tests Status (on master branch)

Go Java Python
Build Status Build Status Build Status
Build Status
Build Status

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website Whitespace Typescript
Non-portable Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status Build Status
Portable --- Build Status Build Status --- --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

@codecov
Copy link
Copy Markdown

codecov Bot commented Oct 15, 2021

Codecov Report

Merging #15733 (49c87b2) into master (0111cff) will decrease coverage by 0.17%.
The diff coverage is n/a.

Impacted file tree graph

@@            Coverage Diff             @@
##           master   #15733      +/-   ##
==========================================
- Coverage   83.77%   83.60%   -0.18%     
==========================================
  Files         444      445       +1     
  Lines       60414    61329     +915     
==========================================
+ Hits        50614    51276     +662     
- Misses       9800    10053     +253     
Impacted Files Coverage Δ
sdks/python/apache_beam/io/gcp/bigquery.py 62.72% <0.00%> (-12.84%) ⬇️
...ython/apache_beam/runners/interactive/sql/utils.py 76.09% <0.00%> (-7.91%) ⬇️
sdks/python/apache_beam/utils/interactive_utils.py 87.80% <0.00%> (-7.32%) ⬇️
...ython/apache_beam/io/gcp/experimental/spannerio.py 82.52% <0.00%> (-5.69%) ⬇️
...thon/apache_beam/runners/worker/operation_specs.py 40.67% <0.00%> (-4.90%) ⬇️
...he_beam/runners/interactive/sql/beam_sql_magics.py 49.75% <0.00%> (-4.79%) ⬇️
...ython/apache_beam/io/gcp/bigquery_read_internal.py 53.92% <0.00%> (-4.24%) ⬇️
...eam/portability/api/beam_expansion_api_pb2_grpc.py 57.89% <0.00%> (-4.02%) ⬇️
sdks/python/apache_beam/io/gcp/bigquery_tools.py 82.91% <0.00%> (-3.82%) ⬇️
...eam/portability/api/beam_provision_api_pb2_grpc.py 73.68% <0.00%> (-2.51%) ⬇️
... and 82 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 0111cff...49c87b2. Read the comment docs.

@zhoufek
Copy link
Copy Markdown
Contributor Author

zhoufek commented Oct 15, 2021

Run Java_Examples_Dataflow PreCommit

@zhoufek zhoufek marked this pull request as ready for review October 19, 2021 14:07
@zhoufek
Copy link
Copy Markdown
Contributor Author

zhoufek commented Oct 19, 2021

R: @TheNeuralBit

@TheNeuralBit TheNeuralBit self-requested a review October 19, 2021 16:20
private SbeLogicalTypes() {}

// Unsigned types are all stored at the next highest value. This prevents unexpected behavior
// when reading and likely has negligible space impact.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The space impact will be 2x which could be non-trivial if there are many unsigned integer fields. We could at least save the space on the wire by using the same bit width for the base type. But that would still consume memory on the worker. You'd also have to make a custom logical type for that.

Another option is the protobuf approach. It just maps unsigned integers to their signed counterparts directly and users are responsible for munging negative values. See footnote [2] here: https://developers.google.com/protocol-buffers/docs/proto3#scalar

@reuvenlax have you thought at all about the best way to represent unsigned integers in Java beam schemas?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main concern with doing the protobuf approach is that conversion from Row to JSON will use the base value, which will be signed. Since conversion to JSON is probably going to be a standard part of any pipeline using this, I'm a bit concerned that it'll lead to a lot of unexpected output.

Or am I misunderstanding how ToJson works?

*
* <p>These are convertible to/from a {@link Row} that is a direct mapping of an SBE composite type.
*/
public final class TimeValues {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to just use joda or java time types instead of defining our own date/time classes?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Row -> T, that'll work.

For T -> Row, the difficulty will be recovering the original unit. This isn't an issue if the SBE schema is a variable unit, since we can just pick a unit and set that. However, SBE allows setting a constant unit, like:

<composite name="UTCTimestampNanos" description="UTC timestamp with nanosecond precision">
    <type name="time" primitiveType="uint64" />
    <type name="unit" primitiveType="uint8" presence="constant" valueRef="TimeUnit.nanosecond" />
</composite>

(Source)

I'm guessing that there's some potential for detecting this on Row -> SBE type, since the signature will be different:

// Variable unit
public UTCTimestampEncoder unit(final short value)
{
    buffer.putByte(offset + 8, (byte)value);
    return this;
}

// Constant unit
public short unit()
{
    return (short)9;
}

If we assume nanos when doing T -> Row, then going Row -> SBE type, we would either write nanos or reduce precision if constant.

So it is doable, but I'm thinking that:

  1. It is harder to implement and potentially more error prone.
  2. It is more expensive (more analysis of reflection).
  3. A sudden unit change might be surprising, even if it is variable. (e.g. "Why did it output nano when I only input seconds and millis?")

Overall, I thought it was better to use an intermediate type that bridges the gap between T and Row and which preserves the original value/unit.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I have two main concerns with this approach:

  1. I really don't want to get in the business of maintaining a date/time library :)
  2. It would be preferable to have a library of common types that most connectors are using, so that it's trivial to interop with the schemas that they produce.

We could maybe alleviate (1) by discouraging users from using these types directly, instead preferring to convert to java time or joda time if they need to.

Note we can encode more information in the schema logical type than is represented in the Java type that it maps to. So another approach to be able to represent the mixed vs. fixed precision case could be to have a family of logical types that all map to Instant, but that have different representation types (i.e. wire types):

  • micros_instant
  • millis_instant
  • seconds_instant
  • variable_unit_instant (or something)

In the variable unit case the representation type could be row<int64 timestamp, int64 unit>. The drawback with this approach would be that the user can't interrogate the precision of the instant in their Java code, but I'm not so sure that's a significant problem.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to discuss this on the dev list, as it's a pretty significant decision about how we want to deal with date/time types in Schemas in general.

I don't want your work on SBE to be hung up by that though. Can you still make progress on the schema mapping, just leaving out the types that are represented in this PR for now?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can get by on just using a Row. I would still like to have a logical type that provides the same semantic meaning as SBE does.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed them all to use PassThroughLogicalType<Row> and have removed all the relevant datetime values. I've left UTCDateOnly and LocalMktDate, since they're just using Java's LocalDate, not a custom type.

Copy link
Copy Markdown
Member

@TheNeuralBit TheNeuralBit Nov 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I meant by make progress was to build other infrastructure, like the PayloadSerializerProvider, that only supports the unambiguous types (for now).

If we must provide types that have the same semantic meaning as SBE, I do think it's preferable to define concrete types (as you had before) rather than using Row. Row is supposed to be an implementation detail - we aim to create APIs where users don't need to interact with it directly. I just think we need to be careful creating these types, as users may come to expect date/time library-like functionality. If we're clear from the outset that these are intended to be simple containers to faithfully represent SBE data, that could be ok.

However that solution would still have drawback (2) - the types would have little utility outside of SBE. What if a user wanted to use these types in a SqlTransform, or write them to Avro, or take data from one of those sources and write it as SBE? Ideally they could do that without boilerplate to convert between SBE-native date/time types and standard types. This doesn't need to be a blocker, but it's something to think about.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I meant by make progress was to build other infrastructure, like the PayloadSerializerProvider, that only supports the unambiguous types (for now).

Yeah, that's doable. I was just noting that in not explicitly accounting for these types, they'll likely be translated into a Row of primitive fields. That's how we would be handling unfamiliar composite types.

Actually, I am thinking back to my earlier comment about detecting the unit in converting to the SBE type, and I think that analyzing the type with reflection will always be necessary to avoid trying to write the unit when it is constant, which removes the first two concerns I had. The only challenge is getting the right unit in the variable case, but I can think of some ways to do that easily, though we may still choose a less precise unit than the original if the less precise unit would give the same result. I've at least tried this out with Instant, and I'd imagine it will work the same for the other types.

Basically, I'm thinking we could probably use Java time types and determine the wire format that works best. I'll update the logical types, and if there's still concerns, I can remove them and revisit them in a later PR.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, thank you. I just realized we probably want to provide a path for joda time where possible, since Beam Java uses it for event times. I don't think it supports nanosecond precision though, which complicates things.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added Java time for this. All of them are converted to/from String for simplicity and consistency. The UTC* and date-only types could use Long and Integer respectively if memory is concern.

I'll do Joda in a later PR.

@zhoufek zhoufek requested a review from TheNeuralBit October 29, 2021 21:20
@zhoufek
Copy link
Copy Markdown
Contributor Author

zhoufek commented Nov 1, 2021

The Java failure seems related to BEAM-11689.

Go precommit failures are related to Go tests:

09:45:03 Test for github.com/apache/beam/sdks/v2/go/cmd/starcgen finished, 6 completed, 4 failed.

Go portable precommit also seems related to the Go test environment:

09:53:15 ./run_validatesrunner_tests.sh: line 399: go: command not found

@TheNeuralBit
Copy link
Copy Markdown
Member

@zhoufek
Copy link
Copy Markdown
Contributor Author

zhoufek commented Nov 11, 2021

Run Java_Examples_Dataflow PreCommit

@zhoufek
Copy link
Copy Markdown
Contributor Author

zhoufek commented Nov 11, 2021

Run Java PreCommit

@zhoufek
Copy link
Copy Markdown
Contributor Author

zhoufek commented Nov 11, 2021

Run Java_Examples_Dataflow PreCommit

Copy link
Copy Markdown
Member

@TheNeuralBit TheNeuralBit left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, just a minor suggestion. Really sorry I let this drop for so long.

* Beam schemas with just a primitive.
*/
@Experimental(Kind.SCHEMAS)
public final class SbeLogicalTypes {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: You might consider making all the LogicalType implementations private, and just exposing concrete instances of them here. I'm also fine merging as-is if you'd prefer

This is what we did in SqlTypes, so that we can easily migrate if/when it's necessary:

/** Beam LogicalType corresponding to ZetaSQL/CalciteSQL DATE type. */
public static final LogicalType<LocalDate, Long> DATE = new Date();
/** Beam LogicalType corresponding to ZetaSQL/CalciteSQL TIME type. */
public static final LogicalType<LocalTime, Long> TIME = new Time();
/** Beam LogicalType corresponding to ZetaSQL DATETIME type. */
public static final LogicalType<LocalDateTime, Row> DATETIME = new DateTime();
/** Beam LogicalType corresponding to ZetaSQL TIMESTAMP type. */
public static final LogicalType<Instant, Row> TIMESTAMP = new MicrosInstant();

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to keep them public for now to remain consistent with the way that the protobuf extension does things:

public static class UInt32 extends PassThroughLogicalType<Integer> {
public static final String IDENTIFIER = "Uint32";
UInt32() {
super(IDENTIFIER, FieldType.STRING, "", FieldType.INT32);
}
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good 👍

@TheNeuralBit TheNeuralBit merged commit 9160ba2 into apache:master Nov 29, 2021
@zhoufek zhoufek deleted the sbe_lt branch March 25, 2022 19:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants