2525import os
2626import subprocess
2727import sys
28+ import urllib .parse
2829import uuid
2930from typing import Any
3031from typing import Callable
32+ from typing import Dict
3133from typing import Iterable
3234from typing import Mapping
3335
@@ -105,6 +107,8 @@ def as_provider_list(name, lst):
105107
106108class ExternalProvider (Provider ):
107109 """A Provider implemented via the cross language transform service."""
110+ _provider_types : Dict [str , Callable [..., Provider ]] = {}
111+
108112 def __init__ (self , urns , service ):
109113 self ._urns = urns
110114 self ._service = service
@@ -137,51 +141,90 @@ def create_external_transform(self, urn, args):
137141 external .ImplicitSchemaPayloadBuilder (args ).payload (),
138142 self ._service )
139143
140- @staticmethod
141- def provider_from_spec (spec ):
144+ @classmethod
145+ def provider_from_spec (cls , spec ):
142146 urns = spec ['transforms' ]
143147 type = spec ['type' ]
144- if spec .get ('version' , None ) == 'BEAM_VERSION' :
145- spec ['version' ] = beam_version
146- if type == 'javaJar' :
147- return ExternalJavaProvider (urns , lambda : spec ['jar' ])
148- elif type == 'mavenJar' :
149- return ExternalJavaProvider (
150- urns ,
151- lambda : subprocess_server .JavaJarServer .path_to_maven_jar (
152- ** {
153- key : value
154- for (key , value ) in spec .items () if key in [
155- 'artifact_id' ,
156- 'group_id' ,
157- 'version' ,
158- 'repository' ,
159- 'classifier' ,
160- 'appendix'
161- ]
162- }))
163- elif type == 'beamJar' :
164- return ExternalJavaProvider (
165- urns ,
166- lambda : subprocess_server .JavaJarServer .path_to_beam_jar (
167- ** {
168- key : value
169- for (key , value ) in spec .items () if key in
170- ['gradle_target' , 'version' , 'appendix' , 'artifact_id' ]
171- }))
172- elif type == 'pythonPackage' :
173- return ExternalPythonProvider (urns , spec ['packages' ])
174- elif type == 'remote' :
175- return RemoteProvider (spec ['address' ])
176- elif type == 'docker' :
177- raise NotImplementedError ()
148+ from apache_beam .yaml .yaml_transform import SafeLineLoader
149+ config = SafeLineLoader .strip_metadata (spec .get ('config' , {}))
150+ if config .get ('version' , None ) == 'BEAM_VERSION' :
151+ config ['version' ] = beam_version
152+ if type in cls ._provider_types :
153+ try :
154+ return cls ._provider_types [type ](urns , ** config )
155+ except Exception as exn :
156+ raise ValueError (
157+ f'Unable to instantiate provider of type { type } '
158+ f'at line { SafeLineLoader .get_line (spec )} : { exn } ' ) from exn
178159 else :
179- raise NotImplementedError (f'Unknown provider type: { type } ' )
180-
181-
160+ raise NotImplementedError (
161+ f'Unknown provider type: { type } '
162+ f'at line { SafeLineLoader .get_line (spec )} .' )
163+
164+ @classmethod
165+ def register_provider_type (cls , type_name ):
166+ def apply (constructor ):
167+ cls ._provider_types [type_name ] = constructor
168+
169+ return apply
170+
171+
172+ @ExternalProvider .register_provider_type ('javaJar' )
173+ def java_jar (urns , jar : str ):
174+ if not os .path .exists (jar ):
175+ parsed = urllib .parse .urlparse (jar )
176+ if not parsed .scheme or not parsed .netloc :
177+ raise ValueError (f'Invalid path or url: { jar } ' )
178+ return ExternalJavaProvider (urns , lambda : jar )
179+
180+
181+ @ExternalProvider .register_provider_type ('mavenJar' )
182+ def maven_jar (
183+ urns ,
184+ * ,
185+ artifact_id ,
186+ group_id ,
187+ version ,
188+ repository = subprocess_server .JavaJarServer .MAVEN_CENTRAL_REPOSITORY ,
189+ classifier = None ,
190+ appendix = None ):
191+ return ExternalJavaProvider (
192+ urns ,
193+ lambda : subprocess_server .JavaJarServer .path_to_maven_jar (
194+ artifact_id = artifact_id ,
195+ version = version ,
196+ repository = repository ,
197+ classifier = classifier ,
198+ appendix = appendix ))
199+
200+
201+ @ExternalProvider .register_provider_type ('beamJar' )
202+ def beam_jar (
203+ urns ,
204+ * ,
205+ gradle_target ,
206+ appendix = None ,
207+ version = beam_version ,
208+ artifact_id = None ):
209+ return ExternalJavaProvider (
210+ urns ,
211+ lambda : subprocess_server .JavaJarServer .path_to_beam_jar (
212+ gradle_target = gradle_target , version = version , artifact_id = artifact_id )
213+ )
214+
215+
216+ @ExternalProvider .register_provider_type ('docker' )
217+ def docker (urns , ** config ):
218+ raise NotImplementedError ()
219+
220+
221+ @ExternalProvider .register_provider_type ('remote' )
182222class RemoteProvider (ExternalProvider ):
183223 _is_available = None
184224
225+ def __init__ (self , urns , address : str ):
226+ super ().__init__ (urns , service = address )
227+
185228 def available (self ):
186229 if self ._is_available is None :
187230 try :
@@ -204,6 +247,7 @@ def available(self):
204247 capture_output = True ).returncode == 0
205248
206249
250+ @ExternalProvider .register_provider_type ('pythonPackage' )
207251class ExternalPythonProvider (ExternalProvider ):
208252 def __init__ (self , urns , packages ):
209253 super ().__init__ (urns , PypiExpansionService (packages ))
0 commit comments