|
42 | 42 | import org.apache.beam.sdk.io.fs.MetadataCoder; |
43 | 43 | import org.apache.beam.sdk.io.fs.ResourceId; |
44 | 44 | import org.apache.beam.sdk.io.fs.ResourceIdCoder; |
45 | | -import org.apache.beam.sdk.schemas.NoSuchSchemaException; |
46 | | -import org.apache.beam.sdk.schemas.SchemaRegistry; |
47 | 45 | import org.apache.beam.sdk.transforms.SerializableFunction; |
48 | 46 | import org.apache.beam.sdk.transforms.windowing.IntervalWindow; |
49 | 47 | import org.apache.beam.sdk.util.CoderUtils; |
@@ -197,17 +195,11 @@ public <T> Coder<T> coderFor( |
197 | 195 | * the lexicographically smallest {@link Class#getName() class name} being used. |
198 | 196 | * </ul> |
199 | 197 | */ |
200 | | - public static CoderRegistry createDefault(@Nullable SchemaRegistry schemaRegistry) { |
201 | | - return new CoderRegistry(schemaRegistry); |
202 | | - } |
203 | | - |
204 | | - /** Backwards compatible version of createDefault. */ |
205 | 198 | public static CoderRegistry createDefault() { |
206 | | - return new CoderRegistry(null); |
| 199 | + return new CoderRegistry(); |
207 | 200 | } |
208 | 201 |
|
209 | | - private CoderRegistry(@Nullable SchemaRegistry schemaRegistry) { |
210 | | - this.schemaRegistry = schemaRegistry; |
| 202 | + private CoderRegistry() { |
211 | 203 | coderProviders = new ArrayDeque<>(REGISTERED_CODER_FACTORIES); |
212 | 204 | } |
213 | 205 |
|
@@ -598,8 +590,6 @@ private static boolean isNullOrEmpty(Collection<?> c) { |
598 | 590 | /** The list of {@link CoderProvider coder providers} to use to provide Coders. */ |
599 | 591 | private ArrayDeque<CoderProvider> coderProviders; |
600 | 592 |
|
601 | | - private final @Nullable SchemaRegistry schemaRegistry; |
602 | | - |
603 | 593 | /** |
604 | 594 | * Returns a {@link Coder} to use for values of the given type, in a context where the given types |
605 | 595 | * use the given coders. |
@@ -660,28 +650,16 @@ private Coder<?> getCoderFromParameterizedType( |
660 | 650 |
|
661 | 651 | List<Coder<?>> typeArgumentCoders = new ArrayList<>(); |
662 | 652 | for (Type typeArgument : type.getActualTypeArguments()) { |
663 | | - Coder<?> typeArgumentCoder = null; |
664 | | - if (schemaRegistry != null) { |
665 | | - TypeDescriptor<?> typeDescriptor = TypeDescriptor.of(typeArgument); |
666 | | - try { |
667 | | - typeArgumentCoder = schemaRegistry.getSchemaCoder(typeDescriptor); |
668 | | - } catch (NoSuchSchemaException e) { |
669 | | - // No schema. |
670 | | - } |
671 | | - } |
672 | | - |
673 | | - if (typeArgumentCoder == null) { |
674 | | - try { |
675 | | - typeArgumentCoder = |
676 | | - getCoderFromTypeDescriptor(TypeDescriptor.of(typeArgument), typeCoderBindings); |
677 | | - } catch (CannotProvideCoderException exc) { |
678 | | - throw new CannotProvideCoderException( |
679 | | - String.format( |
680 | | - "Cannot provide coder for parameterized type %s: %s", type, exc.getMessage()), |
681 | | - exc); |
682 | | - } |
| 653 | + try { |
| 654 | + Coder<?> typeArgumentCoder = |
| 655 | + getCoderFromTypeDescriptor(TypeDescriptor.of(typeArgument), typeCoderBindings); |
| 656 | + typeArgumentCoders.add(typeArgumentCoder); |
| 657 | + } catch (CannotProvideCoderException exc) { |
| 658 | + throw new CannotProvideCoderException( |
| 659 | + String.format( |
| 660 | + "Cannot provide coder for parameterized type %s: %s", type, exc.getMessage()), |
| 661 | + exc); |
683 | 662 | } |
684 | | - typeArgumentCoders.add(typeArgumentCoder); |
685 | 663 | } |
686 | 664 | return getCoderFromFactories(TypeDescriptor.of(type), typeArgumentCoders); |
687 | 665 | } |
|
0 commit comments