Skip to content
This repository was archived by the owner on Jan 30, 2020. It is now read-only.

Commit 1d6e74a

Browse files
author
Dongsu Park
committed
Merge pull request #1572 from endocode/dongsu/fleetd-replaces-unit
fleetd: introduce Replaces option in unit files
2 parents 6f3df2e + b58c637 commit 1d6e74a

File tree

13 files changed

+461
-4
lines changed

13 files changed

+461
-4
lines changed

Documentation/unit-files-and-scheduling.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ Note that these requirements are derived directly from systemd, with the only ex
2323
| `MachineMetadata` | Limit eligible machines to those with this specific metadata. |
2424
| `Conflicts` | Prevent a unit from being collocated with other units using glob-matching on the other unit names. |
2525
| `Global` | Schedule this unit on all agents in the cluster. A unit is considered invalid if options other than `MachineMetadata` are provided alongside `Global=true`. |
26+
| `Replaces` | Schedule a specified unit on another machine. A unit is considered invalid if options `Global` or `Conflicts` are provided alongside `Replaces=`. A circular replacement between multiple units is not allowed. |
2627

2728
See [more information][unit-scheduling] on these parameters and how they impact scheduling decisions.
2829

agent/reconcile_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,30 @@ func TestAbleToRun(t *testing.T) {
386386
job: newTestJobWithXFleetValues(t, "Conflicts=ping.service"),
387387
want: false,
388388
},
389+
390+
// no replaces found
391+
{
392+
dState: &AgentState{
393+
MState: &machine.MachineState{ID: "123"},
394+
Units: map[string]*job.Unit{
395+
"ping.service": &job.Unit{Name: "ping.service"},
396+
},
397+
},
398+
job: newTestJobWithXFleetValues(t, "Replaces=pong.service"),
399+
want: true,
400+
},
401+
402+
// replaces found
403+
{
404+
dState: &AgentState{
405+
MState: &machine.MachineState{ID: "123"},
406+
Units: map[string]*job.Unit{
407+
"ping.service": &job.Unit{Name: "ping.service"},
408+
},
409+
},
410+
job: newTestJobWithXFleetValues(t, "Replaces=ping.service"),
411+
want: false,
412+
},
389413
}
390414

391415
for i, tt := range tests {

agent/state.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,48 @@ func (as *AgentState) hasConflict(pUnitName string, pConflicts []string) (found
6666
return
6767
}
6868

69+
// hasReplace determines whether there are any known replaces with the given Unit
70+
func (as *AgentState) hasReplace(pUnitName string, pReplaces []string) (found bool, replace string) {
71+
for _, eUnit := range as.Units {
72+
foundPrepl := false
73+
foundErepl := false
74+
retStr := ""
75+
76+
if pUnitName == eUnit.Name {
77+
continue
78+
}
79+
80+
for _, pReplace := range pReplaces {
81+
if globMatches(pReplace, eUnit.Name) {
82+
foundPrepl = true
83+
retStr = eUnit.Name
84+
break
85+
}
86+
}
87+
88+
for _, eReplace := range eUnit.Replaces() {
89+
if globMatches(eReplace, pUnitName) {
90+
foundErepl = true
91+
retStr = eUnit.Name
92+
break
93+
}
94+
}
95+
96+
// Only 1 of 2 matches must be found. If both matches are found,
97+
// it means it's a circular replace situation, which could result in
98+
// an infinite loop. So ignore such replace options.
99+
if (foundPrepl && foundErepl) || (!foundPrepl && !foundErepl) {
100+
continue
101+
} else {
102+
found = true
103+
replace = retStr
104+
return
105+
}
106+
}
107+
108+
return
109+
}
110+
69111
func globMatches(pattern, target string) bool {
70112
matched, err := path.Match(pattern, target)
71113
if err != nil {
@@ -81,6 +123,7 @@ func globMatches(pattern, target string) bool {
81123
// - Agent must have all of the Job's required metadata (if any)
82124
// - Agent must have all required Peers of the Job scheduled locally (if any)
83125
// - Job must not conflict with any other Units scheduled to the agent
126+
// - Job must specially handle replaced units to be rescheduled
84127
func (as *AgentState) AbleToRun(j *job.Job) (bool, string) {
85128
if tgt, ok := j.RequiredTarget(); ok && !as.MState.MatchID(tgt) {
86129
return false, fmt.Sprintf("agent ID %q does not match required %q", as.MState.ID, tgt)
@@ -106,5 +149,11 @@ func (as *AgentState) AbleToRun(j *job.Job) (bool, string) {
106149
return false, fmt.Sprintf("found conflict with locally-scheduled Unit(%s)", cJobName)
107150
}
108151

152+
// Handle Replace option specially, by returning a special string
153+
// "jobreschedule" as reason.
154+
if cExists, _ := as.hasReplace(j.Name, j.Replaces()); cExists {
155+
return false, job.JobReschedule
156+
}
157+
109158
return true, ""
110159
}

agent/state_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,83 @@ func TestHasConflicts(t *testing.T) {
9898
}
9999
}
100100

101+
func TestHasReplaces(t *testing.T) {
102+
tests := []struct {
103+
cState *AgentState
104+
job *job.Job
105+
want bool
106+
replace string
107+
}{
108+
// empty current state causes no replaces
109+
{
110+
cState: NewAgentState(&machine.MachineState{ID: "XXX"}),
111+
job: &job.Job{Name: "foo.service", Unit: fleetUnit(t, "Replaces=bar.service")},
112+
want: false,
113+
},
114+
115+
// existing Job has replace with new Job
116+
{
117+
cState: &AgentState{
118+
MState: &machine.MachineState{ID: "XXX"},
119+
Units: map[string]*job.Unit{
120+
"bar.service": &job.Unit{
121+
Name: "bar.service",
122+
Unit: fleetUnit(t, "Replaces=foo.service"),
123+
},
124+
},
125+
},
126+
job: &job.Job{Name: "foo.service", Unit: unit.UnitFile{}},
127+
want: true,
128+
replace: "bar.service",
129+
},
130+
131+
// new Job has replace with existing job
132+
{
133+
cState: &AgentState{
134+
MState: &machine.MachineState{ID: "XXX"},
135+
Units: map[string]*job.Unit{
136+
"bar.service": &job.Unit{
137+
Name: "bar.service",
138+
Unit: unit.UnitFile{},
139+
},
140+
},
141+
},
142+
job: &job.Job{Name: "foo.service", Unit: fleetUnit(t, "Replaces=bar.service")},
143+
want: true,
144+
replace: "bar.service",
145+
},
146+
147+
// both jobs have replace with each other: it should fail
148+
{
149+
cState: &AgentState{
150+
MState: &machine.MachineState{ID: "XXX"},
151+
Units: map[string]*job.Unit{
152+
"bar.service": &job.Unit{
153+
Name: "bar.service",
154+
Unit: fleetUnit(t, "Replaces=foo.service"),
155+
},
156+
},
157+
},
158+
job: &job.Job{Name: "foo.service", Unit: fleetUnit(t, "Replaces=bar.service")},
159+
want: false,
160+
replace: "bar.service",
161+
},
162+
}
163+
164+
for i, tt := range tests {
165+
got, replace := tt.cState.hasReplace(tt.job.Name, tt.job.Replaces())
166+
if got != tt.want {
167+
var msg string
168+
if tt.want == true {
169+
msg = fmt.Sprintf("expected no replace, found replace with Job %q", replace)
170+
} else {
171+
msg = fmt.Sprintf("expected replace with Job %q, got none", replace)
172+
}
173+
t.Errorf("case %d: %s", i, msg)
174+
}
175+
}
176+
}
177+
101178
func TestGlobMatches(t *testing.T) {
102179
tests := []struct {
103180
pattern string

api/units.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ func ValidateOptions(opts []*schema.UnitOption) error {
223223
Unit: *uf,
224224
}
225225
conflicts := pkg.NewUnsafeSet(j.Conflicts()...)
226+
replaces := pkg.NewUnsafeSet(j.Replaces()...)
226227
peers := pkg.NewUnsafeSet(j.Peers()...)
227228
for _, peer := range peers.Values() {
228229
for _, conflict := range conflicts.Values() {
@@ -231,9 +232,16 @@ func ValidateOptions(opts []*schema.UnitOption) error {
231232
return fmt.Errorf("unresolvable requirements: peer %q matches conflict %q", peer, conflict)
232233
}
233234
}
235+
for _, replace := range replaces.Values() {
236+
matched, _ := path.Match(replace, peer)
237+
if matched {
238+
return fmt.Errorf("unresolvable requirements: peer %q matches replace %q", peer, replace)
239+
}
240+
}
234241
}
235242
hasPeers := peers.Length() != 0
236243
hasConflicts := conflicts.Length() != 0
244+
hasReplaces := replaces.Length() != 0
237245
_, hasReqTarget := j.RequiredTarget()
238246
u := &job.Unit{
239247
Unit: *uf,
@@ -247,10 +255,16 @@ func ValidateOptions(opts []*schema.UnitOption) error {
247255
return errors.New("MachineID cannot be used with Conflicts")
248256
case hasReqTarget && isGlobal:
249257
return errors.New("MachineID cannot be used with Global")
258+
case hasReqTarget && hasReplaces:
259+
return errors.New("MachineID cannot be used with Replaces")
250260
case isGlobal && hasPeers:
251261
return errors.New("Global cannot be used with Peers")
252262
case isGlobal && hasConflicts:
253263
return errors.New("Global cannot be used with Conflicts")
264+
case isGlobal && hasReplaces:
265+
return errors.New("Global cannot be used with Replaces")
266+
case hasConflicts && hasReplaces:
267+
return errors.New("Conflicts cannot be used with Replaces")
254268
}
255269

256270
return nil

api/units_test.go

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,14 @@ func makeConflictUO(name string) *schema.UnitOption {
454454
}
455455
}
456456

457+
func makeReplaceUO(name string) *schema.UnitOption {
458+
return &schema.UnitOption{
459+
Section: "X-Fleet",
460+
Name: "Replaces",
461+
Value: name,
462+
}
463+
}
464+
457465
func makePeerUO(name string) *schema.UnitOption {
458466
return &schema.UnitOption{
459467
Section: "X-Fleet",
@@ -543,6 +551,39 @@ func TestValidateOptions(t *testing.T) {
543551
},
544552
false,
545553
},
554+
// Replaces
555+
// non-overlapping replace is fine
556+
{
557+
[]*schema.UnitOption{
558+
makeReplaceUO("foo.service"),
559+
makeReplaceUO("bar.service"),
560+
},
561+
true,
562+
},
563+
{
564+
[]*schema.UnitOption{
565+
makeReplaceUO("foo.service"),
566+
makePeerUO("bar.service"),
567+
},
568+
true,
569+
},
570+
// replace + conflict is not good
571+
{
572+
[]*schema.UnitOption{
573+
makeReplaceUO("foo.service"),
574+
makeConflictUO("bar.service"),
575+
},
576+
false,
577+
},
578+
// circular replaces are not good
579+
{
580+
[]*schema.UnitOption{
581+
makeReplaceUO("foo.service"),
582+
makeReplaceUO("bar.service"),
583+
makePeerUO("bar.service"),
584+
},
585+
false,
586+
},
546587
// MachineID is fine by itself
547588
{
548589
[]*schema.UnitOption{
@@ -576,7 +617,25 @@ func TestValidateOptions(t *testing.T) {
576617
},
577618
{
578619
[]*schema.UnitOption{
579-
makeIDUO("zyxwvutsr"), makeConflictUO("foo.service"), makeConflictUO("bar.service"),
620+
makeIDUO("zyxwvutsr"),
621+
makeConflictUO("foo.service"),
622+
makeConflictUO("bar.service"),
623+
},
624+
false,
625+
},
626+
// MachineID with Replaces no good
627+
{
628+
[]*schema.UnitOption{
629+
makeIDUO("abcdefghi"),
630+
makeReplaceUO("bar.service"),
631+
},
632+
false,
633+
},
634+
{
635+
[]*schema.UnitOption{
636+
makeIDUO("zyxwvutsr"),
637+
makeReplaceUO("foo.service"),
638+
makeReplaceUO("bar.service"),
580639
},
581640
false,
582641
},
@@ -649,6 +708,18 @@ func TestValidateOptions(t *testing.T) {
649708
},
650709
false,
651710
},
711+
// Global with Replaces no good
712+
{
713+
[]*schema.UnitOption{
714+
&schema.UnitOption{
715+
Section: "X-Fleet",
716+
Name: "Global",
717+
Value: "true",
718+
},
719+
makeReplaceUO("foo.service"),
720+
},
721+
false,
722+
},
652723
}
653724
for i, tt := range testCases {
654725
err := ValidateOptions(tt.opts)

engine/reconciler.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,15 @@ func (r *Reconciler) calculateClusterTasks(clust *clusterState, stopchan chan st
110110
}
111111

112112
var able bool
113-
if able, reason = as.AbleToRun(j); !able {
113+
var ableReason string
114+
if able, ableReason = as.AbleToRun(j); !able {
114115
unschedule = true
115-
reason = fmt.Sprintf("target Machine(%s) unable to run unit", j.TargetMachineID)
116-
metrics.ReportEngineReconcileFailure(metrics.RunFailure)
116+
if ableReason == job.JobReschedule {
117+
reason = ableReason
118+
} else {
119+
reason = fmt.Sprintf("target Machine(%s) unable to run unit", j.TargetMachineID)
120+
metrics.ReportEngineReconcileFailure(metrics.RunFailure)
121+
}
117122
return
118123
}
119124

fleetctl/fleetctl_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,24 @@ MachineOf=zxcvq`),
327327
Global=true
328328
Conflicts=bar`),
329329
},
330+
{
331+
"foo.service",
332+
newUnitFile(t, `[X-Fleet]
333+
Global=true
334+
Replaces=bar`),
335+
},
336+
{
337+
"foo.service",
338+
newUnitFile(t, `[X-Fleet]
339+
Conflicts=bar
340+
Replaces=bar`),
341+
},
342+
{
343+
"foo.service",
344+
newUnitFile(t, `[X-Fleet]
345+
MachineOf=abcd
346+
Replaces=abcd`),
347+
},
330348
}
331349
for i, tt = range testCases {
332350
un = tt.name
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
[Unit]
2+
Description=Test Unit
3+
4+
[Service]
5+
ExecStart=/bin/bash -c "while true; do echo Hello, World!; sleep 1; done"
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
[Unit]
2+
Description=Test Unit
3+
4+
[Service]
5+
ExecStart=/bin/bash -c "while true; do echo Hello, World!; sleep 1; done"
6+
7+
[X-Fleet]
8+
Replaces=replace.0.service

0 commit comments

Comments
 (0)