Skip to content

Commit 85df5f2

Browse files
authored
Eliminate CalciteUtil.CharType logical type (#24013)
* Eliminate CalciteUtils.CharType logical type * Replace CalciteUtils.CharType to String Note that CalciteUtils still omits the precision of BINARY/VARBINARY/CHAR/VARCHAR as what it originally did. Support of the precision of these calcite types involves make use of making use of the overload method RelDataTypeFactory.createSqlType(var1, var2). * Replace every reference of CalciteUtil.CharType to generic PassThroughLogicalType check * Add TODO to Support sql types with arguments * Use VariableString in LogicalTypeTestCase
1 parent f349f41 commit 85df5f2

6 files changed

Lines changed: 45 additions & 46 deletions

File tree

sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import java.util.Collection;
3333
import org.apache.beam.sdk.schemas.Schema;
3434
import org.apache.beam.sdk.schemas.Schema.FieldType;
35-
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
35+
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
3636
import org.apache.beam.sdk.util.RowJson.RowJsonDeserializer;
3737
import org.apache.beam.sdk.util.RowJson.RowJsonDeserializer.NullBehavior;
3838
import org.apache.beam.sdk.util.RowJson.RowJsonSerializer;
@@ -133,12 +133,10 @@ private static Object[] makeFlatRowTestCase() {
133133
private static Object[] makeLogicalTypeTestCase() {
134134
Schema schema =
135135
Schema.builder()
136-
.addLogicalTypeField(
137-
"f_passThroughString",
138-
new PassThroughLogicalType<String>(
139-
"SqlCharType", FieldType.STRING, "", FieldType.STRING) {})
136+
.addLogicalTypeField("f_passThroughString", VariableString.of(10))
140137
.build();
141138

139+
// fixed string will do padding
142140
String rowString = "{\n" + "\"f_passThroughString\" : \"hello\"\n" + "}";
143141

144142
Row expectedRow = Row.withSchema(schema).addValues("hello").build();

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,13 @@
4949
import org.apache.beam.sdk.extensions.sql.impl.ScalarFunctionImpl;
5050
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamJavaTypeFactory;
5151
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
52-
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType;
5352
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeWithLocalTzType;
5453
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
5554
import org.apache.beam.sdk.schemas.Schema;
55+
import org.apache.beam.sdk.schemas.Schema.LogicalType;
5656
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
5757
import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
58+
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
5859
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
5960
import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
6061
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
@@ -407,15 +408,10 @@ static Object toBeamObject(Object value, FieldType fieldType, boolean verifyValu
407408
}
408409
return toBeamRow((List<Object>) value, fieldType.getRowSchema(), verifyValues);
409410
case LOGICAL_TYPE:
410-
String identifier = fieldType.getLogicalType().getIdentifier();
411-
if (CharType.IDENTIFIER.equals(identifier)
412-
|| FixedString.IDENTIFIER.equals(identifier)
413-
|| VariableString.IDENTIFIER.equals(identifier)) {
414-
return (String) value;
415-
} else if (FixedBytes.IDENTIFIER.equals(identifier)
416-
|| VariableBytes.IDENTIFIER.equals(identifier)) {
417-
return (byte[]) value;
418-
} else if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) {
411+
LogicalType<?, ?> logicalType = fieldType.getLogicalType();
412+
assert logicalType != null;
413+
String identifier = logicalType.getIdentifier();
414+
if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) {
419415
return Instant.ofEpochMilli(((Number) value).longValue());
420416
} else if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
421417
if (value instanceof Date) {
@@ -440,6 +436,9 @@ static Object toBeamObject(Object value, FieldType fieldType, boolean verifyValu
440436
LocalTime.ofNanoOfDay(
441437
(((Number) value).longValue() % MILLIS_PER_DAY) * NANOS_PER_MILLISECOND));
442438
} else {
439+
if (logicalType instanceof PassThroughLogicalType) {
440+
return toBeamObject(value, logicalType.getBaseType(), verifyValues);
441+
}
443442
throw new UnsupportedOperationException("Unable to convert logical type " + identifier);
444443
}
445444
default:
@@ -561,8 +560,7 @@ private static Expression getBeamField(
561560
break;
562561
case LOGICAL_TYPE:
563562
String identifier = fieldType.getLogicalType().getIdentifier();
564-
if (CharType.IDENTIFIER.equals(identifier)
565-
|| FixedString.IDENTIFIER.equals(identifier)
563+
if (FixedString.IDENTIFIER.equals(identifier)
566564
|| VariableString.IDENTIFIER.equals(identifier)) {
567565
value = Expressions.call(expression, "getString", fieldName);
568566
} else if (FixedBytes.IDENTIFIER.equals(identifier)
@@ -643,8 +641,7 @@ private static Expression toCalciteValue(Expression value, FieldType fieldType)
643641
return nullOr(value, toCalciteRow(value, fieldType.getRowSchema()));
644642
case LOGICAL_TYPE:
645643
String identifier = fieldType.getLogicalType().getIdentifier();
646-
if (CharType.IDENTIFIER.equals(identifier)
647-
|| FixedString.IDENTIFIER.equals(identifier)
644+
if (FixedString.IDENTIFIER.equals(identifier)
648645
|| VariableString.IDENTIFIER.equals(identifier)) {
649646
return Expressions.convert_(value, String.class);
650647
} else if (FixedBytes.IDENTIFIER.equals(identifier)

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
3838
import org.apache.beam.sdk.PipelineResult;
3939
import org.apache.beam.sdk.PipelineResult.State;
40-
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType;
4140
import org.apache.beam.sdk.io.FileSystems;
4241
import org.apache.beam.sdk.metrics.Counter;
4342
import org.apache.beam.sdk.metrics.MetricNameFilter;
@@ -50,6 +49,8 @@
5049
import org.apache.beam.sdk.options.PipelineOptionsFactory;
5150
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
5251
import org.apache.beam.sdk.schemas.Schema;
52+
import org.apache.beam.sdk.schemas.Schema.LogicalType;
53+
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
5354
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
5455
import org.apache.beam.sdk.transforms.DoFn;
5556
import org.apache.beam.sdk.transforms.ParDo;
@@ -318,7 +319,9 @@ private static Object fieldToAvatica(Schema.FieldType type, Object beamValue) {
318319

319320
switch (type.getTypeName()) {
320321
case LOGICAL_TYPE:
321-
String logicalId = type.getLogicalType().getIdentifier();
322+
LogicalType<?, ?> logicalType = type.getLogicalType();
323+
assert logicalType != null;
324+
String logicalId = logicalType.getIdentifier();
322325
if (SqlTypes.TIME.getIdentifier().equals(logicalId)) {
323326
if (beamValue instanceof Long) { // base type
324327
return (Long) beamValue;
@@ -331,7 +334,7 @@ private static Object fieldToAvatica(Schema.FieldType type, Object beamValue) {
331334
} else { // input type
332335
return (int) ((LocalDate) beamValue).toEpochDay();
333336
}
334-
} else if (CharType.IDENTIFIER.equals(logicalId)) {
337+
} else if (logicalType instanceof PassThroughLogicalType) {
335338
return beamValue;
336339
} else {
337340
throw new UnsupportedOperationException("Unknown DateTime type " + logicalId);

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,6 @@ public TimeWithLocalTzType() {
5757
}
5858
}
5959

60-
/** A LogicalType corresponding to CHAR. */
61-
public static class CharType extends PassThroughLogicalType<String> {
62-
public static final String IDENTIFIER = "SqlCharType";
63-
64-
public CharType() {
65-
super(IDENTIFIER, FieldType.STRING, "", FieldType.STRING);
66-
}
67-
}
68-
6960
/** Returns true if the type is any of the various date time types. */
7061
public static boolean isDateTimeType(FieldType fieldType) {
7162
if (fieldType.getTypeName() == TypeName.DATETIME) {
@@ -90,10 +81,9 @@ public static boolean isStringType(FieldType fieldType) {
9081
}
9182

9283
if (fieldType.getTypeName().isLogicalType()) {
93-
Schema.LogicalType logicalType = fieldType.getLogicalType();
94-
Preconditions.checkArgumentNotNull(logicalType);
95-
String logicalId = logicalType.getIdentifier();
96-
return logicalId.equals(CharType.IDENTIFIER);
84+
Schema.LogicalType<?, ?> logicalType = fieldType.getLogicalType();
85+
return logicalType instanceof PassThroughLogicalType
86+
&& logicalType.getBaseType().getTypeName() == TypeName.STRING;
9787
}
9888
return false;
9989
}
@@ -107,9 +97,10 @@ public static boolean isStringType(FieldType fieldType) {
10797
public static final FieldType DOUBLE = FieldType.DOUBLE;
10898
public static final FieldType DECIMAL = FieldType.DECIMAL;
10999
public static final FieldType BOOLEAN = FieldType.BOOLEAN;
100+
// TODO(https://github.com/apache/beam/issues/24019) Support sql types with arguments
110101
public static final FieldType VARBINARY = FieldType.BYTES;
111102
public static final FieldType VARCHAR = FieldType.STRING;
112-
public static final FieldType CHAR = FieldType.logicalType(new CharType());
103+
public static final FieldType CHAR = FieldType.STRING;
113104
public static final FieldType DATE = FieldType.logicalType(SqlTypes.DATE);
114105
public static final FieldType NULLABLE_DATE =
115106
FieldType.logicalType(SqlTypes.DATE).withNullable(true);
@@ -136,7 +127,6 @@ public static boolean isStringType(FieldType fieldType) {
136127
.put(BOOLEAN, SqlTypeName.BOOLEAN)
137128
.put(VARBINARY, SqlTypeName.VARBINARY)
138129
.put(VARCHAR, SqlTypeName.VARCHAR)
139-
.put(CHAR, SqlTypeName.CHAR)
140130
.put(DATE, SqlTypeName.DATE)
141131
.put(TIME, SqlTypeName.TIME)
142132
.put(TIME_WITH_LOCAL_TZ, SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE)
@@ -154,6 +144,9 @@ public static boolean isStringType(FieldType fieldType) {
154144
.put(SqlTypeName.DOUBLE, DOUBLE)
155145
.put(SqlTypeName.DECIMAL, DECIMAL)
156146
.put(SqlTypeName.BOOLEAN, BOOLEAN)
147+
// TODO(https://github.com/apache/beam/issues/24019) Support sql types with arguments
148+
// Handle Calcite VARBINARY/BINARY/VARCHAR/CHAR with
149+
// VariableBinary/FixedBinary/VariableString/FixedString logical types.
157150
.put(SqlTypeName.VARBINARY, VARBINARY)
158151
.put(SqlTypeName.BINARY, VARBINARY)
159152
.put(SqlTypeName.VARCHAR, VARCHAR)

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,10 @@
6161
import org.apache.beam.sdk.schemas.Schema;
6262
import org.apache.beam.sdk.schemas.Schema.Field;
6363
import org.apache.beam.sdk.schemas.Schema.FieldType;
64+
import org.apache.beam.sdk.schemas.Schema.LogicalType;
6465
import org.apache.beam.sdk.schemas.Schema.TypeName;
6566
import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
67+
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
6668
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
6769
import org.apache.beam.sdk.transforms.SerializableFunction;
6870
import org.apache.beam.sdk.transforms.SerializableFunctions;
@@ -262,7 +264,6 @@ public abstract static class Builder {
262264
.put(SqlTypes.TIME.getIdentifier(), StandardSQLTypeName.TIME)
263265
.put(SqlTypes.DATETIME.getIdentifier(), StandardSQLTypeName.DATETIME)
264266
.put("SqlTimeWithLocalTzType", StandardSQLTypeName.TIME)
265-
.put("SqlCharType", StandardSQLTypeName.STRING)
266267
.put("Enum", StandardSQLTypeName.STRING)
267268
.build();
268269

@@ -280,6 +281,9 @@ static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) {
280281
Preconditions.checkArgumentNotNull(fieldType.getLogicalType());
281282
ret = BEAM_TO_BIGQUERY_LOGICAL_MAPPING.get(logicalType.getIdentifier());
282283
if (ret == null) {
284+
if (logicalType instanceof PassThroughLogicalType) {
285+
return toStandardSQLTypeName(logicalType.getBaseType());
286+
}
283287
throw new IllegalArgumentException(
284288
"Cannot convert Beam logical type: "
285289
+ logicalType.getIdentifier()
@@ -718,7 +722,6 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso
718722

719723
// TODO: BigQuery shouldn't know about SQL internal logical types.
720724
private static final Set<String> SQL_DATE_TIME_TYPES = ImmutableSet.of("SqlTimeWithLocalTzType");
721-
private static final Set<String> SQL_STRING_TYPES = ImmutableSet.of("SqlCharType");
722725

723726
/**
724727
* Tries to convert an Avro decoded value to a Beam field value based on the target type of the
@@ -766,7 +769,9 @@ public static Object convertAvroFormat(
766769
case ARRAY:
767770
return convertAvroArray(beamFieldType, avroValue, options);
768771
case LOGICAL_TYPE:
769-
String identifier = beamFieldType.getLogicalType().getIdentifier();
772+
LogicalType<?, ?> logicalType = beamFieldType.getLogicalType();
773+
assert logicalType != null;
774+
String identifier = logicalType.getIdentifier();
770775
if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
771776
return convertAvroDate(avroValue);
772777
} else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
@@ -784,8 +789,8 @@ public static Object convertAvroFormat(
784789
String.format(
785790
"Unknown timestamp truncation option: %s", options.getTruncateTimestamps()));
786791
}
787-
} else if (SQL_STRING_TYPES.contains(identifier)) {
788-
return convertAvroPrimitiveTypes(TypeName.STRING, avroValue);
792+
} else if (logicalType instanceof PassThroughLogicalType) {
793+
return convertAvroFormat(logicalType.getBaseType(), avroValue, options);
789794
} else {
790795
throw new RuntimeException("Unknown logical type " + identifier);
791796
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/CellValueParser.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import com.google.protobuf.ByteString;
2626
import java.io.Serializable;
2727
import org.apache.beam.sdk.schemas.Schema;
28+
import org.apache.beam.sdk.schemas.Schema.LogicalType;
29+
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
2830
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
2931
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
3032
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Shorts;
@@ -95,11 +97,12 @@ ByteString valueToByteString(Object value, Schema.FieldType type) {
9597
case DATETIME:
9698
return byteString(value.toString().getBytes(UTF_8));
9799
case LOGICAL_TYPE:
98-
String identifier = checkArgumentNotNull(type.getLogicalType()).getIdentifier();
99-
if ("SqlCharType".equals(identifier)) {
100-
return byteString(((String) value).getBytes(UTF_8));
100+
LogicalType<?, ?> logicalType = checkArgumentNotNull(type.getLogicalType());
101+
if (logicalType instanceof PassThroughLogicalType) {
102+
return valueToByteString(value, logicalType.getBaseType());
101103
} else {
102-
throw new IllegalStateException("Unsupported logical type: " + identifier);
104+
throw new IllegalStateException(
105+
"Unsupported logical type: " + logicalType.getIdentifier());
103106
}
104107
default:
105108
throw new IllegalStateException("Unsupported type: " + type.getTypeName());

0 commit comments

Comments
 (0)