Skip to content

Commit d1d3457

Browse files
authored
Evict closed readers from the cache. (#30133)
Co-authored-by: lostluck <[email protected]>
1 parent 9cfc99b commit d1d3457

File tree

1 file changed

+11
-7
lines changed

1 file changed

+11
-7
lines changed

sdks/go/pkg/beam/core/runtime/harness/datamgr.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -425,16 +425,20 @@ func (c *DataChannel) read(ctx context.Context) {
425425
continue // we've already closed this cached reader, skip
426426
}
427427
r.PTransformDone()
428-
if r.Closed() {
429-
// Clean up local bookkeeping. We'll never see another message
430-
// for it again. We have to be careful not to remove the real
431-
// one, because readers may be initialized after we've seen
432-
// the full stream.
433-
delete(cache, id.instID)
434-
}
435428
}
436429
seenLast = seenLast[:0] // reset for re-use
437430
c.mu.Unlock()
431+
// Scan through the cache and check for any closed readers, and evict them from the cache.
432+
// Readers might be closed out of band from the data messages because we received all data
433+
// for all transforms in an instruction before the instruction has even begun. However, we can't
434+
// know this until we receive the Control instruction, which knows how many transforms for which
435+
// we need to receive data. So we check the cache directly every so often and evict closed
436+
// readers. We will never receive data for these instructions again.
437+
for instID, r := range cache {
438+
if r.Closed() {
439+
delete(cache, instID)
440+
}
441+
}
438442
}
439443
}
440444
}

0 commit comments

Comments
 (0)