File tree Expand file tree Collapse file tree 1 file changed +11
-7
lines changed
sdks/go/pkg/beam/core/runtime/harness Expand file tree Collapse file tree 1 file changed +11
-7
lines changed Original file line number Diff line number Diff line change @@ -425,16 +425,20 @@ func (c *DataChannel) read(ctx context.Context) {
425425 continue // we've already closed this cached reader, skip
426426 }
427427 r .PTransformDone ()
428- if r .Closed () {
429- // Clean up local bookkeeping. We'll never see another message
430- // for it again. We have to be careful not to remove the real
431- // one, because readers may be initialized after we've seen
432- // the full stream.
433- delete (cache , id .instID )
434- }
435428 }
436429 seenLast = seenLast [:0 ] // reset for re-use
437430 c .mu .Unlock ()
431+ // Scan through the cache and check for any closed readers, and evict them from the cache.
432+ // Readers might be closed out of band from the data messages because we received all data
433+ // for all transforms in an instruction before the instruction even begun. However, we can't
434+ // know this until we received the Control instruction which knows how many transforms for which
435+ // we need to receive data. So we check the cache directly every so often and evict closed
436+ // readers. We will never recieve data for these instructions again.
437+ for instID , r := range cache {
438+ if r .Closed () {
439+ delete (cache , instID )
440+ }
441+ }
438442 }
439443 }
440444}
You can’t perform that action at this time.
0 commit comments