Skip to content

Commit fe2b40e

Browse files
committed
Adding a logical type for Schemas using proto serialization.
1 parent 3cd1f7f commit fe2b40e

5 files changed

Lines changed: 90 additions & 3 deletions

File tree

model/pipeline/src/main/proto/beam_runner_api.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1055,6 +1055,10 @@ message StandardCoders {
10551055
// - A timestamp without a timezone where seconds + micros represents the
10561056
// amount of time since the epoch.
10571057
//
1058+
// beam:logical_type:schema:v1
1059+
// - Representation type: BYTES
1060+
// - A Beam Schema stored as a serialized proto.
1061+
//
10581062
// The payload for RowCoder is an instance of Schema.
10591063
// Components: None
10601064
// Experimental.

sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.beam.sdk.schemas.Schema.LogicalType;
4444
import org.apache.beam.sdk.schemas.Schema.TypeName;
4545
import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
46+
import org.apache.beam.sdk.schemas.logicaltypes.SchemaLogicalType;
4647
import org.apache.beam.sdk.schemas.logicaltypes.UnknownLogicalType;
4748
import org.apache.beam.sdk.util.SerializableUtils;
4849
import org.apache.beam.sdk.values.Row;
@@ -67,10 +68,13 @@ public class SchemaTranslation {
6768
private static final String URN_BEAM_LOGICAL_JAVASDK = "beam:logical_type:javasdk:v1";
6869

6970
// TODO(BEAM-7855): Populate this with a LogicalTypeRegistrar, which includes a way to construct
70-
// the LogicalType
71-
// given an argument.
71+
// the LogicalType given an argument.
7272
private static final ImmutableMap<String, Class<? extends LogicalType<?, ?>>>
73-
STANDARD_LOGICAL_TYPES = ImmutableMap.of(MicrosInstant.IDENTIFIER, MicrosInstant.class);
73+
STANDARD_LOGICAL_TYPES =
74+
ImmutableMap.<String, Class<? extends LogicalType<?, ?>>>builder()
75+
.put(MicrosInstant.IDENTIFIER, MicrosInstant.class)
76+
.put(SchemaLogicalType.IDENTIFIER, SchemaLogicalType.class)
77+
.build();
7478

7579
public static SchemaApi.Schema schemaToProto(Schema schema, boolean serializeLogicalType) {
7680
String uuid = schema.getUUID() != null ? schema.getUUID().toString() : "";
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.schemas.logicaltypes;
19+
20+
import org.apache.beam.model.pipeline.v1.SchemaApi;
21+
import org.apache.beam.sdk.schemas.Schema;
22+
import org.apache.beam.sdk.schemas.Schema.FieldType;
23+
import org.apache.beam.sdk.schemas.SchemaTranslation;
24+
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.InvalidProtocolBufferException;
25+
import org.checkerframework.checker.nullness.qual.NonNull;
26+
import org.checkerframework.checker.nullness.qual.Nullable;
27+
28+
/** A schema represented as a serialized proto bytes. */
29+
public class SchemaLogicalType implements Schema.LogicalType<Schema, byte[]> {
30+
public static final String IDENTIFIER = "beam:logical_type:schema:v1";
31+
32+
@Override
33+
public String getIdentifier() {
34+
return IDENTIFIER;
35+
}
36+
37+
@Override
38+
public @Nullable FieldType getArgumentType() {
39+
return null;
40+
}
41+
42+
@Override
43+
public FieldType getBaseType() {
44+
return FieldType.BYTES;
45+
}
46+
47+
@Override
48+
public byte @NonNull [] toBaseType(Schema input) {
49+
return SchemaTranslation.schemaToProto(input, true).toByteArray();
50+
}
51+
52+
@Override
53+
public org.apache.beam.sdk.schemas.Schema toInputType(byte @NonNull [] base) {
54+
try {
55+
return SchemaTranslation.schemaFromProto(SchemaApi.Schema.parseFrom(base));
56+
} catch (InvalidProtocolBufferException e) {
57+
throw new RuntimeException(e);
58+
}
59+
}
60+
}

sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.beam.sdk.schemas.logicaltypes.DateTime;
4242
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
4343
import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
44+
import org.apache.beam.sdk.schemas.logicaltypes.SchemaLogicalType;
4445
import org.apache.beam.sdk.values.Row;
4546
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
4647
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
@@ -180,6 +181,7 @@ public static Iterable<Schema> data() {
180181
.add(Schema.of(Field.of("logical_argument", FieldType.logicalType(new DateTime()))))
181182
.add(
182183
Schema.of(Field.of("single_arg_argument", FieldType.logicalType(FixedBytes.of(100)))))
184+
.add(Schema.of(Field.of("schema", FieldType.logicalType(new SchemaLogicalType()))))
183185
.build();
184186
}
185187

sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalTypesTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,4 +113,21 @@ public void testUuid() {
113113
assertEquals(uuid, row.getLogicalTypeValue(0, UUID.class));
114114
assertEquals(uuidAsRow, row.getBaseValue(0, Row.class));
115115
}
116+
117+
@Test
118+
public void testSchema() {
119+
Schema schemaValue =
120+
Schema.of(
121+
Field.of("fieldOne", FieldType.BOOLEAN),
122+
Field.of("nested", FieldType.logicalType(new SchemaLogicalType())));
123+
124+
Schema schema = Schema.builder().addLogicalTypeField("schema", new SchemaLogicalType()).build();
125+
Row row = Row.withSchema(schema).addValues(schemaValue).build();
126+
assertEquals(schemaValue, row.getLogicalTypeValue(0, Schema.class));
127+
128+
// Check base type conversion.
129+
assertEquals(
130+
schemaValue,
131+
new SchemaLogicalType().toInputType(new SchemaLogicalType().toBaseType(schemaValue)));
132+
}
116133
}

0 commit comments

Comments
 (0)