Skip to content

Commit 47e8de3

Browse files
authored
Merge pull request #27884 Also use config pattern for yaml providers.
2 parents c994fdd + 051872e commit 47e8de3

File tree

3 files changed

+87
-41
lines changed

3 files changed

+87
-41
lines changed

sdks/python/apache_beam/yaml/pipeline.schema.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ $defs:
118118
type: { type: string }
119119
transforms:
120120
type: object
121+
properties: { __line__: {}}
121122
additionalProperties:
122123
type: string
123124
required:

sdks/python/apache_beam/yaml/standard_providers.yaml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
# TODO(robertwb): Perhaps auto-generate this file?
2020

2121
- type: 'beamJar'
22-
gradle_target: 'sdks:java:extensions:sql:expansion-service:shadowJar'
23-
version: BEAM_VERSION
22+
config:
23+
gradle_target: 'sdks:java:extensions:sql:expansion-service:shadowJar'
24+
version: BEAM_VERSION
2425
transforms:
2526
Sql: 'beam:external:java:sql:v1'

sdks/python/apache_beam/yaml/yaml_provider.py

Lines changed: 83 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@
2525
import os
2626
import subprocess
2727
import sys
28+
import urllib.parse
2829
import uuid
2930
from typing import Any
3031
from typing import Callable
32+
from typing import Dict
3133
from typing import Iterable
3234
from typing import Mapping
3335

@@ -105,6 +107,8 @@ def as_provider_list(name, lst):
105107

106108
class ExternalProvider(Provider):
107109
"""A Provider implemented via the cross language transform service."""
110+
_provider_types: Dict[str, Callable[..., Provider]] = {}
111+
108112
def __init__(self, urns, service):
109113
self._urns = urns
110114
self._service = service
@@ -137,51 +141,90 @@ def create_external_transform(self, urn, args):
137141
external.ImplicitSchemaPayloadBuilder(args).payload(),
138142
self._service)
139143

140-
@staticmethod
141-
def provider_from_spec(spec):
144+
@classmethod
145+
def provider_from_spec(cls, spec):
142146
urns = spec['transforms']
143147
type = spec['type']
144-
if spec.get('version', None) == 'BEAM_VERSION':
145-
spec['version'] = beam_version
146-
if type == 'javaJar':
147-
return ExternalJavaProvider(urns, lambda: spec['jar'])
148-
elif type == 'mavenJar':
149-
return ExternalJavaProvider(
150-
urns,
151-
lambda: subprocess_server.JavaJarServer.path_to_maven_jar(
152-
**{
153-
key: value
154-
for (key, value) in spec.items() if key in [
155-
'artifact_id',
156-
'group_id',
157-
'version',
158-
'repository',
159-
'classifier',
160-
'appendix'
161-
]
162-
}))
163-
elif type == 'beamJar':
164-
return ExternalJavaProvider(
165-
urns,
166-
lambda: subprocess_server.JavaJarServer.path_to_beam_jar(
167-
**{
168-
key: value
169-
for (key, value) in spec.items() if key in
170-
['gradle_target', 'version', 'appendix', 'artifact_id']
171-
}))
172-
elif type == 'pythonPackage':
173-
return ExternalPythonProvider(urns, spec['packages'])
174-
elif type == 'remote':
175-
return RemoteProvider(spec['address'])
176-
elif type == 'docker':
177-
raise NotImplementedError()
148+
from apache_beam.yaml.yaml_transform import SafeLineLoader
149+
config = SafeLineLoader.strip_metadata(spec.get('config', {}))
150+
if config.get('version', None) == 'BEAM_VERSION':
151+
config['version'] = beam_version
152+
if type in cls._provider_types:
153+
try:
154+
return cls._provider_types[type](urns, **config)
155+
except Exception as exn:
156+
raise ValueError(
157+
f'Unable to instantiate provider of type {type} '
158+
f'at line {SafeLineLoader.get_line(spec)}: {exn}') from exn
178159
else:
179-
raise NotImplementedError(f'Unknown provider type: {type}')
180-
181-
160+
raise NotImplementedError(
161+
f'Unknown provider type: {type} '
162+
f'at line {SafeLineLoader.get_line(spec)}.')
163+
164+
@classmethod
def register_provider_type(cls, type_name):
  """Returns a decorator registering a provider constructor as `type_name`.

  The decorated callable is stored in `cls._provider_types`, which
  `provider_from_spec` consults to build a provider from a spec's `type`.

  Args:
    type_name: the provider type string to register the constructor under.

  Returns:
    A decorator that records the constructor and returns it unchanged.
  """
  def apply(constructor):
    cls._provider_types[type_name] = constructor
    # Hand the decorated object back: without this the decorator evaluates
    # to None and the decorated function/class name is rebound to None.
    return constructor

  return apply
170+
171+
172+
@ExternalProvider.register_provider_type('javaJar')
def java_jar(urns, jar: str):
  """Creates a Java provider served from an explicitly given jar.

  Registered under the `javaJar` provider type.

  Args:
    urns: passed through unchanged to ExternalJavaProvider.
    jar: either a path that exists on the local filesystem, or a string
      that parses as a URL with both a scheme and a network location.

  Raises:
    ValueError: if `jar` is neither an existing path nor such a URL.
  """
  if not os.path.exists(jar):
    # Not a local file, so it is only acceptable if it looks like a full URL.
    url_parts = urllib.parse.urlparse(jar)
    if not (url_parts.scheme and url_parts.netloc):
      raise ValueError(f'Invalid path or url: {jar}')
  return ExternalJavaProvider(urns, lambda: jar)
179+
180+
181+
@ExternalProvider.register_provider_type('mavenJar')
def maven_jar(
    urns,
    *,
    artifact_id,
    group_id,
    version,
    repository=subprocess_server.JavaJarServer.MAVEN_CENTRAL_REPOSITORY,
    classifier=None,
    appendix=None):
  """Creates a Java provider for a jar identified by Maven coordinates.

  Registered under the `mavenJar` provider type. The coordinates are bound
  into a callable handed to ExternalJavaProvider, so
  `JavaJarServer.path_to_maven_jar` only runs when that callable is invoked.

  Args:
    urns: passed through unchanged to ExternalJavaProvider.
    artifact_id: forwarded to `path_to_maven_jar`.
    group_id: forwarded to `path_to_maven_jar`.
    version: forwarded to `path_to_maven_jar`.
    repository: forwarded to `path_to_maven_jar`; defaults to
      `JavaJarServer.MAVEN_CENTRAL_REPOSITORY`.
    classifier: forwarded to `path_to_maven_jar`.
    appendix: forwarded to `path_to_maven_jar`.
  """
  return ExternalJavaProvider(
      urns,
      lambda: subprocess_server.JavaJarServer.path_to_maven_jar(
          artifact_id=artifact_id,
          # group_id is a required argument here and must be forwarded;
          # previously it was accepted but never passed on.
          group_id=group_id,
          version=version,
          repository=repository,
          classifier=classifier,
          appendix=appendix))
199+
200+
201+
@ExternalProvider.register_provider_type('beamJar')
def beam_jar(
    urns,
    *,
    gradle_target,
    appendix=None,
    version=beam_version,
    artifact_id=None):
  """Creates a Java provider for a jar built from a Beam gradle target.

  Registered under the `beamJar` provider type. The arguments are bound
  into a callable handed to ExternalJavaProvider, so
  `JavaJarServer.path_to_beam_jar` only runs when that callable is invoked.

  Args:
    urns: passed through unchanged to ExternalJavaProvider.
    gradle_target: forwarded to `path_to_beam_jar`.
    appendix: forwarded to `path_to_beam_jar`.
    version: forwarded to `path_to_beam_jar`; defaults to `beam_version`.
    artifact_id: forwarded to `path_to_beam_jar`.
  """
  return ExternalJavaProvider(
      urns,
      lambda: subprocess_server.JavaJarServer.path_to_beam_jar(
          gradle_target=gradle_target,
          # appendix was accepted but silently dropped; forward it so a
          # configured value actually takes effect.
          appendix=appendix,
          version=version,
          artifact_id=artifact_id))
214+
215+
216+
@ExternalProvider.register_provider_type('docker')
def docker(urns, **config):
  """Placeholder constructor for the `docker` provider type.

  Always raises NotImplementedError, whatever arguments are given.
  """
  raise NotImplementedError()
219+
220+
221+
@ExternalProvider.register_provider_type('remote')
182222
class RemoteProvider(ExternalProvider):
183223
_is_available = None
184224

225+
def __init__(self, urns, address: str):
  """Initializes the provider, using `address` as the service.

  Args:
    urns: forwarded unchanged to the base-class initializer.
    address: forwarded to the base-class initializer as `service`.
  """
  super().__init__(urns, service=address)
227+
185228
def available(self):
186229
if self._is_available is None:
187230
try:
@@ -204,6 +247,7 @@ def available(self):
204247
capture_output=True).returncode == 0
205248

206249

250+
@ExternalProvider.register_provider_type('pythonPackage')
207251
class ExternalPythonProvider(ExternalProvider):
208252
def __init__(self, urns, packages):
209253
super().__init__(urns, PypiExpansionService(packages))

0 commit comments

Comments
 (0)