@@ -23,12 +23,17 @@ const (
2323// exist for the same service/instance or service/nodeid combination.
2424type TaskReaper struct {
2525 store * store.MemoryStore
26+
2627 // taskHistory is the number of tasks to keep
2728 taskHistory int64
28- dirty map [orchestrator.SlotTuple ]struct {}
29- orphaned []string
30- stopChan chan struct {}
31- doneChan chan struct {}
29+
30+ // List of slot tuples to be inspected for task history cleanup.
31+ dirty map [orchestrator.SlotTuple ]struct {}
32+
33+ // List of serviceless orphaned tasks collected for cleanup.
34+ orphaned []string
35+ stopChan chan struct {}
36+ doneChan chan struct {}
3237}
3338
3439// New creates a new TaskReaper.
@@ -41,7 +46,11 @@ func New(store *store.MemoryStore) *TaskReaper {
4146 }
4247}
4348
44- // Run is the TaskReaper's main loop.
49+ // Run is the TaskReaper's watch loop which collects candidates for cleanup.
50+ // Task history is mainly used in task restarts but is also available for administrative purposes.
51+ // Note that the task history is stored per-slot-per-service for replicated services
52+ // and per-node-per-service for global services. History does not apply to serviceless
53+ // tasks since they are not attached to a service.
4554func (tr * TaskReaper ) Run (ctx context.Context ) {
4655 watcher , watchCancel := state .Watch (tr .store .WatchQueue (), api.EventCreateTask {}, api.EventUpdateTask {}, api.EventUpdateCluster {})
4756
@@ -59,6 +68,7 @@ func (tr *TaskReaper) Run(ctx context.Context) {
5968 tr .taskHistory = clusters [0 ].Spec .Orchestration .TaskHistoryRetentionLimit
6069 }
6170
71+ // On startup, scan the entire store and inspect orphaned tasks from a previous life.
6272 tasks , err = store .FindTasks (readTx , store .ByTaskState (api .TaskStateOrphaned ))
6373 if err != nil {
6474 log .G (ctx ).WithError (err ).Error ("failed to find Orphaned tasks in task reaper init" )
@@ -67,21 +77,31 @@ func (tr *TaskReaper) Run(ctx context.Context) {
6777
6878 if len (tasks ) > 0 {
6979 for _ , t := range tasks {
70- // Do not reap service tasks immediately
80+ // Do not reap service tasks immediately.
81+ // Let them go through the regular history cleanup process
82+ // of checking TaskHistoryRetentionLimit.
7183 if t .ServiceID != "" {
7284 continue
7385 }
7486
87+ // Serviceless tasks can be cleaned up right away since they are not attached to a service.
7588 tr .orphaned = append (tr .orphaned , t .ID )
7689 }
7790
91+ // Clean up orphaned serviceless tasks right away.
7892 if len (tr .orphaned ) > 0 {
7993 tr .tick ()
8094 }
8195 }
8296
97+ // Clean up when we hit TaskHistoryRetentionLimit or when the timer expires,
98+ // whichever happens first.
8399 timer := time .NewTimer (reaperBatchingInterval )
84100
101+ // Watch for:
102+ // 1. EventCreateTask for cleaning slots, which is the best time to cleanup that node/slot.
103+ // 2. EventUpdateTask for cleaning serviceless orphaned tasks (when orchestrator updates the task status to ORPHANED).
104+ // 3. EventUpdateCluster for TaskHistoryRetentionLimit update.
85105 for {
86106 select {
87107 case event := <- watcher :
@@ -118,6 +138,7 @@ func (tr *TaskReaper) Run(ctx context.Context) {
118138 }
119139}
120140
141+ // tick performs task history cleanup.
121142func (tr * TaskReaper ) tick () {
122143 if len (tr .dirty ) == 0 && len (tr .orphaned ) == 0 {
123144 return
@@ -131,6 +152,8 @@ func (tr *TaskReaper) tick() {
131152 for _ , tID := range tr .orphaned {
132153 deleteTasks [tID ] = struct {}{}
133154 }
155+
156+ // Check history of dirty tasks for cleanup.
134157 tr .store .View (func (tx store.ReadTx ) {
135158 for dirty := range tr .dirty {
136159 service := store .GetService (tx , dirty .ServiceID )
@@ -141,8 +164,8 @@ func (tr *TaskReaper) tick() {
141164 taskHistory := tr .taskHistory
142165
143166 // If MaxAttempts is set, keep at least one more than
144- // that number of tasks. This is necessary reconstruct
145- // restart history when the orchestrator starts up.
167+ // that number of tasks (this overrides TaskHistoryRetentionLimit).
168+ // This is necessary to reconstruct restart history when the orchestrator starts up.
146169 // TODO(aaronl): Consider hiding tasks beyond the normal
147170 // retention limit in the UI.
148171 // TODO(aaronl): There are some ways to cut down the
@@ -156,6 +179,7 @@ func (tr *TaskReaper) tick() {
156179 taskHistory = int64 (service .Spec .Task .Restart .MaxAttempts ) + 1
157180 }
158181
182+ // Negative value for TaskHistoryRetentionLimit is an indication to never clean up task history.
159183 if taskHistory < 0 {
160184 continue
161185 }
@@ -164,13 +188,15 @@ func (tr *TaskReaper) tick() {
164188
165189 switch service .Spec .GetMode ().(type ) {
166190 case * api.ServiceSpec_Replicated :
191+ // Clean out the slot for which we received EventCreateTask.
167192 var err error
168193 historicTasks , err = store .FindTasks (tx , store .BySlot (dirty .ServiceID , dirty .Slot ))
169194 if err != nil {
170195 continue
171196 }
172197
173198 case * api.ServiceSpec_Global :
199+ // Clean out the node history in case of global services.
174200 tasksByNode , err := store .FindTasks (tx , store .ByNodeID (dirty .NodeID ))
175201 if err != nil {
176202 continue
@@ -215,6 +241,7 @@ func (tr *TaskReaper) tick() {
215241 }
216242 })
217243
244+ // Perform cleanup.
218245 if len (deleteTasks ) > 0 {
219246 tr .store .Batch (func (batch * store.Batch ) error {
220247 for taskID := range deleteTasks {
0 commit comments