Skip to content

Commit d4c6af2

Browse files
damccormAbacn
andauthored
Revert "Merge pull request #32705: fix schema inference for parameterized types" (#33133) (#33147)
This reverts commit c243491. Co-authored-by: Yi Hu <[email protected]>
1 parent 420ca5b commit d4c6af2

File tree

3 files changed

+12
-54
lines changed

3 files changed

+12
-54
lines changed

sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ public PipelineResult run(PipelineOptions options) {
335335
/** Returns the {@link CoderRegistry} that this {@link Pipeline} uses. */
336336
public CoderRegistry getCoderRegistry() {
337337
if (coderRegistry == null) {
338-
coderRegistry = CoderRegistry.createDefault(getSchemaRegistry());
338+
coderRegistry = CoderRegistry.createDefault();
339339
}
340340
return coderRegistry;
341341
}

sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java

Lines changed: 11 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@
4242
import org.apache.beam.sdk.io.fs.MetadataCoder;
4343
import org.apache.beam.sdk.io.fs.ResourceId;
4444
import org.apache.beam.sdk.io.fs.ResourceIdCoder;
45-
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
46-
import org.apache.beam.sdk.schemas.SchemaRegistry;
4745
import org.apache.beam.sdk.transforms.SerializableFunction;
4846
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
4947
import org.apache.beam.sdk.util.CoderUtils;
@@ -197,17 +195,11 @@ public <T> Coder<T> coderFor(
197195
* the lexicographically smallest {@link Class#getName() class name} being used.
198196
* </ul>
199197
*/
200-
public static CoderRegistry createDefault(@Nullable SchemaRegistry schemaRegistry) {
201-
return new CoderRegistry(schemaRegistry);
202-
}
203-
204-
/** Backwards compatible version of createDefault. */
205198
public static CoderRegistry createDefault() {
206-
return new CoderRegistry(null);
199+
return new CoderRegistry();
207200
}
208201

209-
private CoderRegistry(@Nullable SchemaRegistry schemaRegistry) {
210-
this.schemaRegistry = schemaRegistry;
202+
private CoderRegistry() {
211203
coderProviders = new ArrayDeque<>(REGISTERED_CODER_FACTORIES);
212204
}
213205

@@ -598,8 +590,6 @@ private static boolean isNullOrEmpty(Collection<?> c) {
598590
/** The list of {@link CoderProvider coder providers} to use to provide Coders. */
599591
private ArrayDeque<CoderProvider> coderProviders;
600592

601-
private final @Nullable SchemaRegistry schemaRegistry;
602-
603593
/**
604594
* Returns a {@link Coder} to use for values of the given type, in a context where the given types
605595
* use the given coders.
@@ -660,28 +650,16 @@ private Coder<?> getCoderFromParameterizedType(
660650

661651
List<Coder<?>> typeArgumentCoders = new ArrayList<>();
662652
for (Type typeArgument : type.getActualTypeArguments()) {
663-
Coder<?> typeArgumentCoder = null;
664-
if (schemaRegistry != null) {
665-
TypeDescriptor<?> typeDescriptor = TypeDescriptor.of(typeArgument);
666-
try {
667-
typeArgumentCoder = schemaRegistry.getSchemaCoder(typeDescriptor);
668-
} catch (NoSuchSchemaException e) {
669-
// No schema.
670-
}
671-
}
672-
673-
if (typeArgumentCoder == null) {
674-
try {
675-
typeArgumentCoder =
676-
getCoderFromTypeDescriptor(TypeDescriptor.of(typeArgument), typeCoderBindings);
677-
} catch (CannotProvideCoderException exc) {
678-
throw new CannotProvideCoderException(
679-
String.format(
680-
"Cannot provide coder for parameterized type %s: %s", type, exc.getMessage()),
681-
exc);
682-
}
653+
try {
654+
Coder<?> typeArgumentCoder =
655+
getCoderFromTypeDescriptor(TypeDescriptor.of(typeArgument), typeCoderBindings);
656+
typeArgumentCoders.add(typeArgumentCoder);
657+
} catch (CannotProvideCoderException exc) {
658+
throw new CannotProvideCoderException(
659+
String.format(
660+
"Cannot provide coder for parameterized type %s: %s", type, exc.getMessage()),
661+
exc);
683662
}
684-
typeArgumentCoders.add(typeArgumentCoder);
685663
}
686664
return getCoderFromFactories(TypeDescriptor.of(type), typeArgumentCoders);
687665
}

sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaRegistryTest.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,6 @@
2626
import com.google.auto.service.AutoService;
2727
import com.google.auto.value.AutoValue;
2828
import java.util.List;
29-
import org.apache.beam.sdk.coders.CannotProvideCoderException;
30-
import org.apache.beam.sdk.coders.Coder;
31-
import org.apache.beam.sdk.coders.CoderRegistry;
32-
import org.apache.beam.sdk.coders.IterableCoder;
3329
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
3430
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBean;
3531
import org.apache.beam.sdk.schemas.utils.TestPOJOs.SimplePOJO;
@@ -227,22 +223,6 @@ public void testRegisterPojo() throws NoSuchSchemaException {
227223
assertTrue(SIMPLE_POJO_SCHEMA.equivalent(schema));
228224
}
229225

230-
@Test
231-
public void testSchemaTypeParameterInsideCoder() throws CannotProvideCoderException {
232-
SchemaRegistry schemaRegistry = SchemaRegistry.createDefault();
233-
schemaRegistry.registerPOJO(SimplePOJO.class);
234-
235-
CoderRegistry coderRegistry = CoderRegistry.createDefault(schemaRegistry);
236-
Coder<Iterable<SimplePOJO>> coder =
237-
coderRegistry.getCoder(TypeDescriptors.iterables(TypeDescriptor.of(SimplePOJO.class)));
238-
assertTrue(coder instanceof IterableCoder);
239-
assertEquals(1, coder.getCoderArguments().size());
240-
assertTrue(coder.getCoderArguments().get(0) instanceof SchemaCoder);
241-
assertTrue(
242-
SIMPLE_POJO_SCHEMA.equivalent(
243-
((SchemaCoder<SimplePOJO>) coder.getCoderArguments().get(0)).getSchema()));
244-
}
245-
246226
@Test
247227
public void testRegisterJavaBean() throws NoSuchSchemaException {
248228
SchemaRegistry registry = SchemaRegistry.createDefault();

0 commit comments

Comments
 (0)