Skip to content

Commit 6826639

Browse files
authored
Merge pull request moby#2706 from dani-docker/task_leak
address unassigned task leak when service is removed
2 parents 231d181 + 9d977ce commit 6826639

2 files changed

Lines changed: 140 additions & 6 deletions

File tree

manager/scheduler/scheduler.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -702,9 +702,20 @@ func (s *Scheduler) scheduleNTasksOnNodes(ctx context.Context, n int, taskGroup
702702
return tasksScheduled
703703
}
704704

705+
// noSuitableNode checks unassigned tasks and make sure they have an existing service in the store before
706+
// updating the task status and adding it back to: schedulingDecisions, unassignedTasks and allTasks
705707
func (s *Scheduler) noSuitableNode(ctx context.Context, taskGroup map[string]*api.Task, schedulingDecisions map[string]schedulingDecision) {
706708
explanation := s.pipeline.Explain()
707709
for _, t := range taskGroup {
710+
var service *api.Service
711+
s.store.View(func(tx store.ReadTx) {
712+
service = store.GetService(tx, t.ServiceID)
713+
})
714+
if service == nil {
715+
log.G(ctx).WithField("task.id", t.ID).Debug("removing task from the scheduler")
716+
continue
717+
}
718+
708719
log.G(ctx).WithField("task.id", t.ID).Debug("no suitable node available for task")
709720

710721
newT := *t

manager/scheduler/scheduler_test.go

Lines changed: 129 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1114,6 +1114,7 @@ func TestSchedulerNoReadyNodes(t *testing.T) {
11141114
ctx := context.Background()
11151115
initialTask := &api.Task{
11161116
ID: "id1",
1117+
ServiceID: "serviceID1",
11171118
DesiredState: api.TaskStateRunning,
11181119
ServiceAnnotations: api.Annotations{
11191120
Name: "name1",
@@ -1128,7 +1129,8 @@ func TestSchedulerNoReadyNodes(t *testing.T) {
11281129
defer s.Close()
11291130

11301131
err := s.Update(func(tx store.Tx) error {
1131-
// Add initial task
1132+
// Add initial service and task
1133+
assert.NoError(t, store.CreateService(tx, &api.Service{ID: "serviceID1"}))
11321134
assert.NoError(t, store.CreateTask(tx, initialTask))
11331135
return nil
11341136
})
@@ -1536,6 +1538,7 @@ func TestSchedulerResourceConstraint(t *testing.T) {
15361538

15371539
initialTask := &api.Task{
15381540
ID: "id1",
1541+
ServiceID: "serviceID1",
15391542
DesiredState: api.TaskStateRunning,
15401543
Spec: api.TaskSpec{
15411544
Runtime: &api.TaskSpec_Container{
@@ -1559,12 +1562,17 @@ func TestSchedulerResourceConstraint(t *testing.T) {
15591562
},
15601563
}
15611564

1565+
initialService := &api.Service{
1566+
ID: "serviceID1",
1567+
}
1568+
15621569
s := store.NewMemoryStore(nil)
15631570
assert.NotNil(t, s)
15641571
defer s.Close()
15651572

15661573
err := s.Update(func(tx store.Tx) error {
1567-
// Add initial node and task
1574+
// Add initial node, service and task
1575+
assert.NoError(t, store.CreateService(tx, initialService))
15681576
assert.NoError(t, store.CreateTask(tx, initialTask))
15691577
assert.NoError(t, store.CreateNode(tx, underprovisionedNode))
15701578
assert.NoError(t, store.CreateNode(tx, nonready1))
@@ -1788,6 +1796,7 @@ func TestSchedulerResourceConstraintDeadTask(t *testing.T) {
17881796
bigTask1 := &api.Task{
17891797
DesiredState: api.TaskStateRunning,
17901798
ID: "id1",
1799+
ServiceID: "serviceID1",
17911800
Spec: api.TaskSpec{
17921801
Resources: &api.ResourceRequirements{
17931802
Reservations: &api.Resources{
@@ -1809,12 +1818,17 @@ func TestSchedulerResourceConstraintDeadTask(t *testing.T) {
18091818
bigTask2 := bigTask1.Copy()
18101819
bigTask2.ID = "id2"
18111820

1821+
bigService := &api.Service{
1822+
ID: "serviceID1",
1823+
}
1824+
18121825
s := store.NewMemoryStore(nil)
18131826
assert.NotNil(t, s)
18141827
defer s.Close()
18151828

18161829
err := s.Update(func(tx store.Tx) error {
1817-
// Add initial node and task
1830+
// Add initial node, service and task
1831+
assert.NoError(t, store.CreateService(tx, bigService))
18181832
assert.NoError(t, store.CreateNode(tx, node))
18191833
assert.NoError(t, store.CreateTask(tx, bigTask1))
18201834
return nil
@@ -1951,6 +1965,7 @@ func TestSchedulerCompatiblePlatform(t *testing.T) {
19511965
// task1 - has a node it can run on
19521966
task1 := &api.Task{
19531967
ID: "id1",
1968+
ServiceID: "serviceID1",
19541969
DesiredState: api.TaskStateRunning,
19551970
ServiceAnnotations: api.Annotations{
19561971
Name: "name1",
@@ -1973,6 +1988,7 @@ func TestSchedulerCompatiblePlatform(t *testing.T) {
19731988
// task2 - has no node it can run on
19741989
task2 := &api.Task{
19751990
ID: "id2",
1991+
ServiceID: "serviceID1",
19761992
DesiredState: api.TaskStateRunning,
19771993
ServiceAnnotations: api.Annotations{
19781994
Name: "name2",
@@ -1995,6 +2011,7 @@ func TestSchedulerCompatiblePlatform(t *testing.T) {
19952011
// task3 - no platform constraints, should run on any node
19962012
task3 := &api.Task{
19972013
ID: "id3",
2014+
ServiceID: "serviceID1",
19982015
DesiredState: api.TaskStateRunning,
19992016
ServiceAnnotations: api.Annotations{
20002017
Name: "name3",
@@ -2007,6 +2024,7 @@ func TestSchedulerCompatiblePlatform(t *testing.T) {
20072024
// task4 - only OS constraint, is runnable on any linux node
20082025
task4 := &api.Task{
20092026
ID: "id4",
2027+
ServiceID: "serviceID1",
20102028
DesiredState: api.TaskStateRunning,
20112029
ServiceAnnotations: api.Annotations{
20122030
Name: "name4",
@@ -2029,6 +2047,7 @@ func TestSchedulerCompatiblePlatform(t *testing.T) {
20292047
// task5 - supported on multiple platforms
20302048
task5 := &api.Task{
20312049
ID: "id5",
2050+
ServiceID: "serviceID1",
20322051
DesiredState: api.TaskStateRunning,
20332052
ServiceAnnotations: api.Annotations{
20342053
Name: "name5",
@@ -2103,12 +2122,16 @@ func TestSchedulerCompatiblePlatform(t *testing.T) {
21032122
Description: &api.NodeDescription{},
21042123
}
21052124

2125+
service1 := &api.Service{
2126+
ID: "serviceID1",
2127+
}
21062128
s := store.NewMemoryStore(nil)
21072129
assert.NotNil(t, s)
21082130
defer s.Close()
21092131

21102132
err := s.Update(func(tx store.Tx) error {
2111-
// Add initial task and nodes to the store
2133+
// Add initial task, service and nodes to the store
2134+
assert.NoError(t, store.CreateService(tx, service1))
21122135
assert.NoError(t, store.CreateTask(tx, task1))
21132136
assert.NoError(t, store.CreateNode(tx, node1))
21142137
assert.NoError(t, store.CreateNode(tx, node2))
@@ -2168,6 +2191,86 @@ func TestSchedulerCompatiblePlatform(t *testing.T) {
21682191
assert.Regexp(t, assignment4.NodeID, "(node1|node2)")
21692192
}
21702193

2194+
// TestSchedulerUnassignedMap tests that unassigned tasks are deleted from unassignedTasks when the service is removed
2195+
func TestSchedulerUnassignedMap(t *testing.T) {
2196+
ctx := context.Background()
2197+
// create a service and a task with OS constraint that is not met
2198+
task1 := &api.Task{
2199+
ID: "id1",
2200+
ServiceID: "serviceID1",
2201+
DesiredState: api.TaskStateRunning,
2202+
ServiceAnnotations: api.Annotations{
2203+
Name: "name1",
2204+
},
2205+
Status: api.TaskStatus{
2206+
State: api.TaskStatePending,
2207+
},
2208+
Spec: api.TaskSpec{
2209+
Placement: &api.Placement{
2210+
Platforms: []*api.Platform{
2211+
{
2212+
Architecture: "amd64",
2213+
OS: "windows",
2214+
},
2215+
},
2216+
},
2217+
},
2218+
}
2219+
2220+
node1 := &api.Node{
2221+
ID: "node1",
2222+
Spec: api.NodeSpec{
2223+
Annotations: api.Annotations{
2224+
Name: "node1",
2225+
},
2226+
},
2227+
Status: api.NodeStatus{
2228+
State: api.NodeStatus_READY,
2229+
},
2230+
Description: &api.NodeDescription{
2231+
Platform: &api.Platform{
2232+
Architecture: "x86_64",
2233+
OS: "linux",
2234+
},
2235+
},
2236+
}
2237+
2238+
service1 := &api.Service{
2239+
ID: "serviceID1",
2240+
}
2241+
2242+
s := store.NewMemoryStore(nil)
2243+
assert.NotNil(t, s)
2244+
defer s.Close()
2245+
2246+
err := s.Update(func(tx store.Tx) error {
2247+
// Add initial task, service and nodes to the store
2248+
assert.NoError(t, store.CreateService(tx, service1))
2249+
assert.NoError(t, store.CreateTask(tx, task1))
2250+
assert.NoError(t, store.CreateNode(tx, node1))
2251+
return nil
2252+
})
2253+
assert.NoError(t, err)
2254+
2255+
scheduler := New(s)
2256+
scheduler.unassignedTasks["id1"] = task1
2257+
2258+
scheduler.tick(ctx)
2259+
// task1 is in the unassigned map
2260+
assert.Contains(t, scheduler.unassignedTasks, task1.ID)
2261+
2262+
// delete the service of an unassigned task
2263+
err = s.Update(func(tx store.Tx) error {
2264+
assert.NoError(t, store.DeleteService(tx, service1.ID))
2265+
return nil
2266+
})
2267+
assert.NoError(t, err)
2268+
2269+
scheduler.tick(ctx)
2270+
// task1 is removed from the unassigned map
2271+
assert.NotContains(t, scheduler.unassignedTasks, task1.ID)
2272+
}
2273+
21712274
func TestPreassignedTasks(t *testing.T) {
21722275
ctx := context.Background()
21732276
initialNodeSet := []*api.Node{
@@ -2523,6 +2626,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
25232626
// Task0: bind mount
25242627
t0 := &api.Task{
25252628
ID: "task0_ID",
2629+
ServiceID: "serviceID1",
25262630
DesiredState: api.TaskStateRunning,
25272631
Spec: api.TaskSpec{
25282632
Runtime: &api.TaskSpec_Container{
@@ -2548,6 +2652,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
25482652
// Task1: vol plugin1
25492653
t1 := &api.Task{
25502654
ID: "task1_ID",
2655+
ServiceID: "serviceID1",
25512656
DesiredState: api.TaskStateRunning,
25522657
Spec: api.TaskSpec{
25532658
Runtime: &api.TaskSpec_Container{
@@ -2574,6 +2679,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
25742679
// Task2: vol plugin1, vol plugin2
25752680
t2 := &api.Task{
25762681
ID: "task2_ID",
2682+
ServiceID: "serviceID1",
25772683
DesiredState: api.TaskStateRunning,
25782684
Spec: api.TaskSpec{
25792685
Runtime: &api.TaskSpec_Container{
@@ -2606,6 +2712,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
26062712
// Task3: vol plugin1, network plugin1
26072713
t3 := &api.Task{
26082714
ID: "task3_ID",
2715+
ServiceID: "serviceID1",
26092716
DesiredState: api.TaskStateRunning,
26102717
Networks: []*api.NetworkAttachment{
26112718
{
@@ -2646,6 +2753,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
26462753
// Task4: log plugin1
26472754
t4 := &api.Task{
26482755
ID: "task4_ID",
2756+
ServiceID: "serviceID1",
26492757
DesiredState: api.TaskStateRunning,
26502758
Spec: api.TaskSpec{
26512759
Runtime: &api.TaskSpec_Container{
@@ -2663,6 +2771,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
26632771
// Task5: log plugin1
26642772
t5 := &api.Task{
26652773
ID: "task5_ID",
2774+
ServiceID: "serviceID1",
26662775
DesiredState: api.TaskStateRunning,
26672776
Spec: api.TaskSpec{
26682777
Runtime: &api.TaskSpec_Container{
@@ -2681,6 +2790,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
26812790
// no logging
26822791
t6 := &api.Task{
26832792
ID: "task6_ID",
2793+
ServiceID: "serviceID1",
26842794
DesiredState: api.TaskStateRunning,
26852795
Spec: api.TaskSpec{
26862796
Runtime: &api.TaskSpec_Container{
@@ -2699,6 +2809,7 @@ func TestSchedulerPluginConstraint(t *testing.T) {
26992809
// log driver with no name
27002810
t7 := &api.Task{
27012811
ID: "task7_ID",
2812+
ServiceID: "serviceID1",
27022813
DesiredState: api.TaskStateRunning,
27032814
Spec: api.TaskSpec{
27042815
Runtime: &api.TaskSpec_Container{
@@ -2718,12 +2829,16 @@ func TestSchedulerPluginConstraint(t *testing.T) {
27182829
},
27192830
}
27202831

2832+
s1 := &api.Service{
2833+
ID: "serviceID1",
2834+
}
27212835
s := store.NewMemoryStore(nil)
27222836
assert.NotNil(t, s)
27232837
defer s.Close()
27242838

2725-
// Add initial node and task
2839+
// Add initial node, service and task
27262840
err := s.Update(func(tx store.Tx) error {
2841+
assert.NoError(t, store.CreateService(tx, s1))
27272842
assert.NoError(t, store.CreateTask(tx, t1))
27282843
assert.NoError(t, store.CreateNode(tx, n1))
27292844
return nil
@@ -3014,6 +3129,7 @@ func TestSchedulerHostPort(t *testing.T) {
30143129

30153130
task1 := &api.Task{
30163131
ID: "id1",
3132+
ServiceID: "serviceID1",
30173133
DesiredState: api.TaskStateRunning,
30183134
Spec: api.TaskSpec{
30193135
Runtime: &api.TaskSpec_Container{
@@ -3038,6 +3154,7 @@ func TestSchedulerHostPort(t *testing.T) {
30383154
}
30393155
task2 := &api.Task{
30403156
ID: "id2",
3157+
ServiceID: "serviceID1",
30413158
DesiredState: api.TaskStateRunning,
30423159
Spec: api.TaskSpec{
30433160
Runtime: &api.TaskSpec_Container{
@@ -3062,6 +3179,7 @@ func TestSchedulerHostPort(t *testing.T) {
30623179
}
30633180
task3 := &api.Task{
30643181
ID: "id3",
3182+
ServiceID: "serviceID1",
30653183
DesiredState: api.TaskStateRunning,
30663184
Spec: api.TaskSpec{
30673185
Runtime: &api.TaskSpec_Container{
@@ -3090,12 +3208,17 @@ func TestSchedulerHostPort(t *testing.T) {
30903208
},
30913209
}
30923210

3211+
service1 := &api.Service{
3212+
ID: "serviceID1",
3213+
}
3214+
30933215
s := store.NewMemoryStore(nil)
30943216
assert.NotNil(t, s)
30953217
defer s.Close()
30963218

30973219
err := s.Update(func(tx store.Tx) error {
3098-
// Add initial node and task
3220+
// Add initial node, service and task
3221+
assert.NoError(t, store.CreateService(tx, service1))
30993222
assert.NoError(t, store.CreateTask(tx, task1))
31003223
assert.NoError(t, store.CreateTask(tx, task2))
31013224
return nil

0 commit comments

Comments
 (0)