Skip to content

Commit 9160ba2

Browse files
authored
[BEAM-12697] Add SBE module and initial classes (#15733)
* [BEAM-12697] Add SBE module and initial classes * [BEAM-12697] Simplify datetime types * [BEAM-12697] Use Java time * [BEAM-12697] Remove newly unused Guava dependency
1 parent 3769d75 commit 9160ba2

6 files changed

Lines changed: 421 additions & 0 deletions

File tree

sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,5 +106,8 @@ enum Kind {
106106

107107
/** Portability related APIs. */
108108
PORTABILITY,
109+
110+
/** Extension that is still in development. */
111+
EXTENSION,
109112
}
110113
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
plugins { id 'org.apache.beam.module' }
20+
applyJavaNature(
21+
automaticModuleName: 'org.apache.beam.sdk.extensions.sbe',
22+
)
23+
24+
description = "Apache Beam :: SDKs :: Java :: Extensions :: SBE"
25+
ext.summary = "Add support to Beam for FIX SBE"
26+
27+
dependencies {
28+
compile project(path: ":sdks:java:core", configuration: "shadow")
29+
testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
30+
testCompile library.java.junit
31+
}
Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.extensions.sbe;
19+
20+
import java.time.Instant;
21+
import java.time.LocalDate;
22+
import java.time.LocalTime;
23+
import java.time.OffsetDateTime;
24+
import java.time.OffsetTime;
25+
import org.apache.beam.sdk.annotations.Experimental;
26+
import org.apache.beam.sdk.annotations.Experimental.Kind;
27+
import org.apache.beam.sdk.schemas.Schema.FieldType;
28+
import org.apache.beam.sdk.schemas.Schema.LogicalType;
29+
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
30+
import org.checkerframework.checker.initialization.qual.Initialized;
31+
import org.checkerframework.checker.nullness.qual.NonNull;
32+
import org.checkerframework.checker.nullness.qual.Nullable;
33+
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
34+
35+
/**
36+
* Classes that represent various SBE semantic types.
37+
*
38+
* <p>Not all of SBE's semantic types are represented here, as some can be reasonably represented in
39+
* Beam schemas with just a primitive.
40+
*/
41+
@Experimental(Kind.SCHEMAS)
42+
public final class SbeLogicalTypes {
43+
// Default argument type values
44+
private static final String DEFAULT_STRING_ARG = "";
45+
46+
private SbeLogicalTypes() {}
47+
48+
// Unsigned types are all stored at the next highest value. This prevents unexpected behavior
49+
// when reading and likely has negligible space impact.
50+
51+
/** Represents SBE's uint8 type. */
52+
public static final class Uint8 extends PassThroughLogicalType<Short> {
53+
public static final String IDENTIFIER = "uint8";
54+
55+
public Uint8() {
56+
super(IDENTIFIER, FieldType.STRING, DEFAULT_STRING_ARG, FieldType.INT16);
57+
}
58+
}
59+
60+
/** Represents SBE's uint16 type. */
61+
public static final class Uint16 extends PassThroughLogicalType<Integer> {
62+
public static final String IDENTIFIER = "uint16";
63+
64+
public Uint16() {
65+
super(IDENTIFIER, FieldType.STRING, DEFAULT_STRING_ARG, FieldType.INT32);
66+
}
67+
}
68+
69+
/** Represents SBE's uint32 type. */
70+
public static final class Uint32 extends PassThroughLogicalType<Long> {
71+
public static final String IDENTIFIER = "uint32";
72+
73+
public Uint32() {
74+
super(IDENTIFIER, FieldType.STRING, DEFAULT_STRING_ARG, FieldType.INT64);
75+
}
76+
}
77+
78+
/** Represents SBE's uint64 type. */
79+
public static final class Uint64 extends PassThroughLogicalType<String> {
80+
// Unknown if anyone will ever use this as a BigInteger, so we're keeping it as a String for
81+
// now.
82+
83+
public static final String IDENTIFIER = "uint64";
84+
85+
public Uint64() {
86+
super(IDENTIFIER, FieldType.STRING, DEFAULT_STRING_ARG, FieldType.STRING);
87+
}
88+
}
89+
90+
// SBE time-based composite and logical types.
91+
92+
/** Represents SBE's UTCTimestamp composite type. */
93+
public static final class UTCTimestamp implements LogicalType<Instant, String> {
94+
public static final String IDENTIFIER = "UTCTimestamp";
95+
96+
@Override
97+
public @UnknownKeyFor @NonNull @Initialized String getIdentifier() {
98+
return IDENTIFIER;
99+
}
100+
101+
@Override
102+
public @Nullable @UnknownKeyFor @Initialized FieldType getArgumentType() {
103+
return null;
104+
}
105+
106+
@Override
107+
public @UnknownKeyFor @NonNull @Initialized FieldType getBaseType() {
108+
return FieldType.INT64;
109+
}
110+
111+
@Override
112+
public @NonNull String toBaseType(@NonNull Instant input) {
113+
return input.toString();
114+
}
115+
116+
@Override
117+
public @NonNull Instant toInputType(@NonNull String base) {
118+
return Instant.parse(base);
119+
}
120+
}
121+
122+
/** Represents SBE's UTCTimeOnly composite type. */
123+
public static final class UTCTimeOnly implements LogicalType<LocalTime, String> {
124+
public static final String IDENTIFIER = "UTCTimeOnly";
125+
126+
@Override
127+
public @UnknownKeyFor @NonNull @Initialized String getIdentifier() {
128+
return IDENTIFIER;
129+
}
130+
131+
@Override
132+
public @Nullable @UnknownKeyFor @Initialized FieldType getArgumentType() {
133+
return null;
134+
}
135+
136+
@Override
137+
public @UnknownKeyFor @NonNull @Initialized FieldType getBaseType() {
138+
return FieldType.INT64;
139+
}
140+
141+
@Override
142+
public @NonNull String toBaseType(@NonNull LocalTime input) {
143+
return input.toString();
144+
}
145+
146+
@Override
147+
public @NonNull LocalTime toInputType(@NonNull String base) {
148+
return LocalTime.parse(base);
149+
}
150+
}
151+
152+
/** Represents SBE's TZTimestamp composite type. */
153+
public static final class TZTimestamp implements LogicalType<OffsetDateTime, String> {
154+
public static final String IDENTIFIER = "TZTimestamp";
155+
156+
@Override
157+
public @UnknownKeyFor @NonNull @Initialized String getIdentifier() {
158+
return IDENTIFIER;
159+
}
160+
161+
@Override
162+
public @Nullable @UnknownKeyFor @Initialized FieldType getArgumentType() {
163+
return null;
164+
}
165+
166+
@Override
167+
public @UnknownKeyFor @NonNull @Initialized FieldType getBaseType() {
168+
return FieldType.STRING;
169+
}
170+
171+
@Override
172+
public @NonNull String toBaseType(@NonNull OffsetDateTime input) {
173+
return input.toString();
174+
}
175+
176+
@Override
177+
public @NonNull OffsetDateTime toInputType(@NonNull String base) {
178+
return OffsetDateTime.parse(base);
179+
}
180+
}
181+
182+
/** Represents SBE's TimeOnly composite type. */
183+
public static final class TZTimeOnly implements LogicalType<OffsetTime, String> {
184+
public static final String IDENTIFIER = "TZTimeOnly";
185+
186+
@Override
187+
public @UnknownKeyFor @NonNull @Initialized String getIdentifier() {
188+
return IDENTIFIER;
189+
}
190+
191+
@Override
192+
public @Nullable @UnknownKeyFor @Initialized FieldType getArgumentType() {
193+
return null;
194+
}
195+
196+
@Override
197+
public @UnknownKeyFor @NonNull @Initialized FieldType getBaseType() {
198+
return FieldType.STRING;
199+
}
200+
201+
@Override
202+
public @NonNull String toBaseType(@NonNull OffsetTime input) {
203+
return input.toString();
204+
}
205+
206+
@Override
207+
public @NonNull OffsetTime toInputType(@NonNull String base) {
208+
return OffsetTime.parse(base);
209+
}
210+
}
211+
212+
/** Helper type for SBE's date types. */
213+
private static class SbeDateType implements LogicalType<LocalDate, String> {
214+
private final String identifier;
215+
216+
SbeDateType(String identifier) {
217+
this.identifier = identifier;
218+
}
219+
220+
@Override
221+
public @UnknownKeyFor @NonNull @Initialized String getIdentifier() {
222+
return identifier;
223+
}
224+
225+
@Override
226+
public @Nullable @UnknownKeyFor @Initialized FieldType getArgumentType() {
227+
return null;
228+
}
229+
230+
@Override
231+
public @UnknownKeyFor @NonNull @Initialized FieldType getBaseType() {
232+
return FieldType.INT32;
233+
}
234+
235+
@Override
236+
public @NonNull String toBaseType(@NonNull LocalDate input) {
237+
return input.toString();
238+
}
239+
240+
@Override
241+
public @NonNull LocalDate toInputType(@NonNull String base) {
242+
return LocalDate.parse(base);
243+
}
244+
}
245+
246+
/** Representation of SBE's UTCDateOnly. */
247+
public static final class UTCDateOnly extends SbeDateType {
248+
public static final String IDENTIFIER = "UTCDateOnly";
249+
250+
public UTCDateOnly() {
251+
super(IDENTIFIER);
252+
}
253+
}
254+
255+
/** Representation of SBE's LocalMktDate. */
256+
public static final class LocalMktDate extends SbeDateType {
257+
public static final String IDENTIFIER = "LocalMktDate";
258+
259+
public LocalMktDate() {
260+
super(IDENTIFIER);
261+
}
262+
}
263+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
/** Extension for working with SBE messages in Beam. */
20+
@Experimental(Kind.EXTENSION)
21+
package org.apache.beam.sdk.extensions.sbe;
22+
23+
import org.apache.beam.sdk.annotations.Experimental;
24+
import org.apache.beam.sdk.annotations.Experimental.Kind;

0 commit comments

Comments
 (0)