Skip to content

Commit 8293bbd

Browse files
authored
[Cherry Pick #28083][prism] support single external env pipelines. (#28111)
* support single external env pipelines. * provide clear error message --------- Co-authored-by: lostluck <[email protected]>
1 parent c42daee commit 8293bbd

File tree

1 file changed

+14
-4
lines changed

1 file changed

+14
-4
lines changed

sdks/go/pkg/beam/runners/prism/internal/execute.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,12 @@ func RunPipeline(j *jobservices.Job) {
4747
// environments, and start up docker containers, but
4848
// here, we only want and need the go one, operating
4949
// in loopback mode.
50-
env := "go"
50+
envs := j.Pipeline.GetComponents().GetEnvironments()
51+
if len(envs) != 1 {
52+
j.Failed(fmt.Errorf("unable to execute multi-environment pipelines;\npipeline has environments: %+v", envs))
53+
return
54+
}
55+
env, _ := getOnlyPair(envs)
5156
wk := worker.New(env) // Cheating by having the worker id match the environment id.
5257
go wk.Serve()
5358

@@ -302,12 +307,17 @@ func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, cod
302307
return makeWindowCoders(coders[wcID])
303308
}
304309

305-
func getOnlyValue[K comparable, V any](in map[K]V) V {
310+
func getOnlyPair[K comparable, V any](in map[K]V) (K, V) {
306311
if len(in) != 1 {
307312
panic(fmt.Sprintf("expected single value map, had %v - %v", len(in), in))
308313
}
309-
for _, v := range in {
310-
return v
314+
for k, v := range in {
315+
return k, v
311316
}
312317
panic("unreachable")
313318
}
319+
320+
func getOnlyValue[K comparable, V any](in map[K]V) V {
321+
_, v := getOnlyPair(in)
322+
return v
323+
}

0 commit comments

Comments
 (0)