Skip to content

Commit 8732fbd

Browse files
committed
Remap expanded outputs after merging.
1 parent 62b33a5 commit 8732fbd

1 file changed

Lines changed: 4 additions & 2 deletions

File tree

sdks/go/pkg/beam/core/runtime/graphx/translate.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,10 +156,12 @@ func Marshal(edges []*graph.MultiEdge, opt *Options) (*pipepb.Pipeline, error) {
156156

157157
// If there are external transforms that need expanding, do it now.
158158
if m.needsExpansion {
159-
// Remap outputs of expanded external transforms to be the inputs for all downstream consumers
160-
purgeOutputInput(edges, p)
161159
// Merge the expanded components into the existing pipeline
162160
mergeExpandedWithPipeline(edges, p)
161+
162+
// Remap outputs of expanded external transforms to be the inputs for all downstream consumers
163+
// Must happen after merging, so that the inputs in the expanded transforms are also updated.
164+
purgeOutputInput(edges, p)
163165
}
164166

165167
return p, nil

0 commit comments

Comments
 (0)