Skip to content

Commit 0769c60

Browse files
authored
Comments for orphaned state/task reaper. (#2421)
Signed-off-by: Anshul Pundir <[email protected]>
1 parent de950a7 commit 0769c60

File tree

3 files changed

+43
-10
lines changed

3 files changed

+43
-10
lines changed

api/types.pb.go

Lines changed: 4 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/types.proto

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,10 @@ enum TaskState {
458458
SHUTDOWN = 640 [(gogoproto.enumvalue_customname)="TaskStateShutdown"]; // orchestrator requested shutdown
459459
FAILED = 704 [(gogoproto.enumvalue_customname)="TaskStateFailed"]; // task execution failed with error
460460
REJECTED = 768 [(gogoproto.enumvalue_customname)="TaskStateRejected"]; // task could not be executed here.
461-
ORPHANED = 832 [(gogoproto.enumvalue_customname)="TaskStateOrphaned"]; // The node on which this task is scheduled is Down for too long
461+
// The main purpose of this state is to free up resources associated with service tasks on
462+
// unresponsive nodes without having to delete those tasks. This state is directly assigned
463+
// to the task by the orchestrator.
464+
ORPHANED = 832 [(gogoproto.enumvalue_customname)="TaskStateOrphaned"];
462465

463466
// NOTE(stevvooe): The state of a task is actually a lamport clock, in that
464467
// given two observations, the greater of the two can be considered

manager/orchestrator/taskreaper/task_reaper.go

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,17 @@ const (
2323
// exist for the same service/instance or service/nodeid combination.
2424
type TaskReaper struct {
2525
store *store.MemoryStore
26+
2627
// taskHistory is the number of tasks to keep
2728
taskHistory int64
28-
dirty map[orchestrator.SlotTuple]struct{}
29-
orphaned []string
30-
stopChan chan struct{}
31-
doneChan chan struct{}
29+
30+
// List of slot tuples to be inspected for task history cleanup.
31+
dirty map[orchestrator.SlotTuple]struct{}
32+
33+
// List of serviceless orphaned tasks collected for cleanup.
34+
orphaned []string
35+
stopChan chan struct{}
36+
doneChan chan struct{}
3237
}
3338

3439
// New creates a new TaskReaper.
@@ -41,7 +46,11 @@ func New(store *store.MemoryStore) *TaskReaper {
4146
}
4247
}
4348

44-
// Run is the TaskReaper's main loop.
49+
// Run is the TaskReaper's watch loop which collects candidates for cleanup.
50+
// Task history is mainly used in task restarts but is also available for administrative purposes.
51+
// Note that the task history is stored per-slot-per-service for replicated services
52+
// and per-node-per-service for global services. History does not apply to serviceless tasks
53+
// since they are not attached to a service.
4554
func (tr *TaskReaper) Run(ctx context.Context) {
4655
watcher, watchCancel := state.Watch(tr.store.WatchQueue(), api.EventCreateTask{}, api.EventUpdateTask{}, api.EventUpdateCluster{})
4756

@@ -59,6 +68,7 @@ func (tr *TaskReaper) Run(ctx context.Context) {
5968
tr.taskHistory = clusters[0].Spec.Orchestration.TaskHistoryRetentionLimit
6069
}
6170

71+
// On startup, scan the entire store and inspect orphaned tasks from a previous life.
6272
tasks, err = store.FindTasks(readTx, store.ByTaskState(api.TaskStateOrphaned))
6373
if err != nil {
6474
log.G(ctx).WithError(err).Error("failed to find Orphaned tasks in task reaper init")
@@ -67,21 +77,31 @@ func (tr *TaskReaper) Run(ctx context.Context) {
6777

6878
if len(tasks) > 0 {
6979
for _, t := range tasks {
70-
// Do not reap service tasks immediately
80+
// Do not reap service tasks immediately.
81+
// Let them go through the regular history cleanup process
82+
// of checking TaskHistoryRetentionLimit.
7183
if t.ServiceID != "" {
7284
continue
7385
}
7486

87+
// Serviceless tasks can be cleaned up right away since they are not attached to a service.
7588
tr.orphaned = append(tr.orphaned, t.ID)
7689
}
7790

91+
// Clean up orphaned serviceless tasks right away.
7892
if len(tr.orphaned) > 0 {
7993
tr.tick()
8094
}
8195
}
8296

97+
// Clean up when we hit TaskHistoryRetentionLimit or when the timer expires,
98+
// whichever happens first.
8399
timer := time.NewTimer(reaperBatchingInterval)
84100

101+
// Watch for:
102+
// 1. EventCreateTask for cleaning slots, which is the best time to clean up that node/slot.
103+
// 2. EventUpdateTask for cleaning serviceless orphaned tasks (when orchestrator updates the task status to ORPHANED).
104+
// 3. EventUpdateCluster for TaskHistoryRetentionLimit update.
85105
for {
86106
select {
87107
case event := <-watcher:
@@ -118,6 +138,7 @@ func (tr *TaskReaper) Run(ctx context.Context) {
118138
}
119139
}
120140

141+
// tick performs task history cleanup.
121142
func (tr *TaskReaper) tick() {
122143
if len(tr.dirty) == 0 && len(tr.orphaned) == 0 {
123144
return
@@ -131,6 +152,8 @@ func (tr *TaskReaper) tick() {
131152
for _, tID := range tr.orphaned {
132153
deleteTasks[tID] = struct{}{}
133154
}
155+
156+
// Check history of dirty tasks for cleanup.
134157
tr.store.View(func(tx store.ReadTx) {
135158
for dirty := range tr.dirty {
136159
service := store.GetService(tx, dirty.ServiceID)
@@ -141,8 +164,8 @@ func (tr *TaskReaper) tick() {
141164
taskHistory := tr.taskHistory
142165

143166
// If MaxAttempts is set, keep at least one more than
144-
// that number of tasks. This is necessary reconstruct
145-
// restart history when the orchestrator starts up.
167+
// that number of tasks (this overrides TaskHistoryRetentionLimit).
168+
// This is necessary to reconstruct restart history when the orchestrator starts up.
146169
// TODO(aaronl): Consider hiding tasks beyond the normal
147170
// retention limit in the UI.
148171
// TODO(aaronl): There are some ways to cut down the
@@ -156,6 +179,7 @@ func (tr *TaskReaper) tick() {
156179
taskHistory = int64(service.Spec.Task.Restart.MaxAttempts) + 1
157180
}
158181

182+
// Negative value for TaskHistoryRetentionLimit is an indication to never clean up task history.
159183
if taskHistory < 0 {
160184
continue
161185
}
@@ -164,13 +188,15 @@ func (tr *TaskReaper) tick() {
164188

165189
switch service.Spec.GetMode().(type) {
166190
case *api.ServiceSpec_Replicated:
191+
// Clean out the slot for which we received EventCreateTask.
167192
var err error
168193
historicTasks, err = store.FindTasks(tx, store.BySlot(dirty.ServiceID, dirty.Slot))
169194
if err != nil {
170195
continue
171196
}
172197

173198
case *api.ServiceSpec_Global:
199+
// Clean out the node history in case of global services.
174200
tasksByNode, err := store.FindTasks(tx, store.ByNodeID(dirty.NodeID))
175201
if err != nil {
176202
continue
@@ -215,6 +241,7 @@ func (tr *TaskReaper) tick() {
215241
}
216242
})
217243

244+
// Perform cleanup.
218245
if len(deleteTasks) > 0 {
219246
tr.store.Batch(func(batch *store.Batch) error {
220247
for taskID := range deleteTasks {

0 commit comments

Comments
 (0)