Skip to content

Commit 4538a1d

Browse files
committed
libnetwork: handle coalesced endpoint events
The eventually-consistent nature of NetworkDB means we cannot depend on events being received in the same order that they were sent. Nor can we depend on receiving events for all intermediate states. It is possible for a series of entry UPDATEs, or a DELETE followed by a CREATE with the same key, to get coalesced into a single UPDATE event on the receiving node. Watchers of NetworkDB tables therefore need to be prepared to gracefully handle arbitrary UPDATEs of a key, including those where the new value may have nothing in common with the previous value. The libnetwork controller naively handled events for endpoint_table, assuming that an endpoint leave followed by a rejoin of the same endpoint would always be expressed as a DELETE event followed by a CREATE. It would handle a coalesced UPDATE as a CREATE, adding a new service binding without removing the old one. This would have various side effects, such as the "transient state" of multiple conflicting service bindings, in which more than one endpoint is assigned an IP address, never settling. Modify the libnetwork controller to handle an UPDATE by removing the previous service binding then adding the new one. Signed-off-by: Cory Snider <[email protected]>
1 parent e1a586a commit 4538a1d

4 files changed

Lines changed: 271 additions & 53 deletions

File tree

daemon/libnetwork/agent.go

Lines changed: 115 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16:
2+
//go:build go1.23
3+
14
package libnetwork
25

36
//go:generate protoc -I=. -I=../../vendor/ --gogofaster_out=import_path=github.com/docker/docker/daemon/libnetwork:. agent.proto
@@ -8,6 +11,8 @@ import (
811
"errors"
912
"fmt"
1013
"net"
14+
"net/netip"
15+
"slices"
1116
"sort"
1217
"sync"
1318

@@ -18,6 +23,7 @@ import (
1823
"github.com/docker/docker/daemon/libnetwork/networkdb"
1924
"github.com/docker/docker/daemon/libnetwork/scope"
2025
"github.com/docker/docker/daemon/libnetwork/types"
26+
"github.com/docker/docker/internal/iterutil"
2127
"github.com/docker/go-events"
2228
"github.com/gogo/protobuf/proto"
2329
)
@@ -851,82 +857,138 @@ func (c *Controller) handleNodeTableEvent(ev events.Event) {
851857
c.processNodeDiscovery([]net.IP{nodeAddr.Addr}, isAdd)
852858
}
853859

854-
func (c *Controller) handleEpTableEvent(ev events.Event) {
855-
var value []byte
856-
event := ev.(networkdb.WatchEvent)
857-
switch {
858-
case event.IsCreate(), event.IsUpdate():
859-
value = event.Value
860-
case event.IsDelete():
861-
value = event.Prev
862-
default:
863-
log.G(context.TODO()).Errorf("Unexpected update service table event = %#v", event)
864-
return
865-
}
860+
type endpointEvent struct {
861+
EndpointRecord
862+
// Virtual IP of the service to which this endpoint belongs.
863+
VirtualIP netip.Addr
864+
// IP assigned to this endpoint.
865+
EndpointIP netip.Addr
866+
}
866867

868+
func unmarshalEndpointRecord(data []byte) (*endpointEvent, error) {
867869
var epRec EndpointRecord
868-
err := proto.Unmarshal(value, &epRec)
869-
if err != nil {
870-
log.G(context.TODO()).WithError(err).Error("Failed to unmarshal service table value")
871-
return
870+
if err := proto.Unmarshal(data, &epRec); err != nil {
871+
return nil, fmt.Errorf("failed to unmarshal endpoint record: %w", err)
872+
}
873+
874+
vip, _ := netip.ParseAddr(epRec.VirtualIP)
875+
eip, _ := netip.ParseAddr(epRec.EndpointIP)
876+
877+
if epRec.Name == "" || !eip.IsValid() {
878+
return nil, fmt.Errorf("invalid endpoint name/ip in service table event %s", data)
872879
}
873880

881+
return &endpointEvent{
882+
EndpointRecord: epRec,
883+
VirtualIP: vip,
884+
EndpointIP: eip,
885+
}, nil
886+
}
887+
888+
// EquivalentTo returns true if ev is semantically equivalent to other.
889+
func (ev *endpointEvent) EquivalentTo(other *endpointEvent) bool {
890+
return ev.Name == other.Name &&
891+
ev.ServiceName == other.ServiceName &&
892+
ev.ServiceID == other.ServiceID &&
893+
ev.VirtualIP == other.VirtualIP &&
894+
ev.EndpointIP == other.EndpointIP &&
895+
ev.ServiceDisabled == other.ServiceDisabled &&
896+
iterutil.SameValues(
897+
iterutil.Deref(slices.Values(ev.IngressPorts)),
898+
iterutil.Deref(slices.Values(other.IngressPorts))) &&
899+
iterutil.SameValues(slices.Values(ev.Aliases), slices.Values(other.Aliases)) &&
900+
iterutil.SameValues(slices.Values(ev.TaskAliases), slices.Values(other.TaskAliases))
901+
}
902+
903+
func (c *Controller) handleEpTableEvent(ev events.Event) {
904+
event := ev.(networkdb.WatchEvent)
874905
nid := event.NetworkID
875906
eid := event.Key
876-
containerName := epRec.Name
877-
svcName := epRec.ServiceName
878-
svcID := epRec.ServiceID
879-
vip := net.ParseIP(epRec.VirtualIP)
880-
ip := net.ParseIP(epRec.EndpointIP)
881-
ingressPorts := epRec.IngressPorts
882-
serviceAliases := epRec.Aliases
883-
taskAliases := epRec.TaskAliases
884907

885-
logger := log.G(context.TODO()).WithFields(log.Fields{
886-
"evt": event,
887-
"R": epRec,
888-
})
889-
890-
if containerName == "" || ip == nil {
891-
logger.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
892-
return
908+
var prev, epRec *endpointEvent
909+
if event.Prev != nil {
910+
var err error
911+
prev, err = unmarshalEndpointRecord(event.Prev)
912+
if err != nil {
913+
log.G(context.TODO()).WithError(err).Error("error unmarshaling previous value from service table event")
914+
return
915+
}
916+
}
917+
if event.Value != nil {
918+
var err error
919+
epRec, err = unmarshalEndpointRecord(event.Value)
920+
if err != nil {
921+
log.G(context.TODO()).WithError(err).Error("error unmarshaling service table event")
922+
return
923+
}
893924
}
894925

926+
logger := log.G(context.TODO()).WithFields(log.Fields{
927+
"evt": event,
928+
"R": epRec,
929+
"prev": prev,
930+
})
895931
logger.Debug("handleEpTableEvent")
896932

897-
switch {
898-
case event.IsCreate(), event.IsUpdate():
899-
if svcID != "" {
933+
if prev != nil {
934+
if epRec != nil && prev.EquivalentTo(epRec) {
935+
// Avoid flapping if we would otherwise remove a service
936+
// binding then immediately replace it with an equivalent one.
937+
return
938+
}
939+
940+
if prev.ServiceID != "" {
900941
// This is a remote task part of a service
901-
if epRec.ServiceDisabled {
902-
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, false); err != nil {
903-
logger.WithError(err).Error("failed disabling service binding")
904-
return
905-
}
906-
} else {
907-
if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
908-
logger.WithError(err).Error("failed adding service binding")
909-
return
942+
if !prev.ServiceDisabled {
943+
err := c.rmServiceBinding(prev.ServiceName, prev.ServiceID, nid, eid,
944+
prev.Name, prev.VirtualIP.AsSlice(), prev.IngressPorts,
945+
prev.Aliases, prev.TaskAliases, prev.EndpointIP.AsSlice(),
946+
"handleEpTableEvent", true, true)
947+
if err != nil {
948+
logger.WithError(err).Error("failed removing service binding")
910949
}
911950
}
912951
} else {
913952
// This is a remote container simply attached to an attachable network
914-
if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
915-
logger.WithError(err).Errorf("failed adding container name resolution")
953+
err := c.delContainerNameResolution(nid, eid, prev.Name, prev.TaskAliases,
954+
prev.EndpointIP.AsSlice(), "handleEpTableEvent")
955+
if err != nil {
956+
logger.WithError(err).Errorf("failed removing container name resolution")
916957
}
917958
}
959+
}
918960

919-
case event.IsDelete():
920-
if svcID != "" {
961+
if epRec != nil {
962+
if epRec.ServiceID != "" {
921963
// This is a remote task part of a service
922-
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, true); err != nil {
923-
logger.WithError(err).Error("failed removing service binding")
924-
return
964+
if epRec.ServiceDisabled {
965+
// Don't double-remove a service binding
966+
if prev == nil || prev.ServiceID != epRec.ServiceID || !prev.ServiceDisabled {
967+
err := c.rmServiceBinding(epRec.ServiceName, epRec.ServiceID,
968+
nid, eid, epRec.Name, epRec.VirtualIP.AsSlice(),
969+
epRec.IngressPorts, epRec.Aliases, epRec.TaskAliases,
970+
epRec.EndpointIP.AsSlice(), "handleEpTableEvent", true, false)
971+
if err != nil {
972+
logger.WithError(err).Error("failed disabling service binding")
973+
return
974+
}
975+
}
976+
} else {
977+
err := c.addServiceBinding(epRec.ServiceName, epRec.ServiceID, nid, eid,
978+
epRec.Name, epRec.VirtualIP.AsSlice(), epRec.IngressPorts,
979+
epRec.Aliases, epRec.TaskAliases, epRec.EndpointIP.AsSlice(),
980+
"handleEpTableEvent")
981+
if err != nil {
982+
logger.WithError(err).Error("failed adding service binding")
983+
return
984+
}
925985
}
926986
} else {
927987
// This is a remote container simply attached to an attachable network
928-
if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
929-
logger.WithError(err).Errorf("failed removing container name resolution")
988+
err := c.addContainerNameResolution(nid, eid, epRec.Name, epRec.TaskAliases,
989+
epRec.EndpointIP.AsSlice(), "handleEpTableEvent")
990+
if err != nil {
991+
logger.WithError(err).Errorf("failed adding container name resolution")
930992
}
931993
}
932994
}

daemon/libnetwork/agent_test.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16:
2+
//go:build go1.23
3+
4+
package libnetwork
5+
6+
import (
7+
"net/netip"
8+
"slices"
9+
"testing"
10+
11+
"gotest.tools/v3/assert"
12+
)
13+
14+
func TestEndpointEvent_EquivalentTo(t *testing.T) {
15+
assert.Check(t, (&endpointEvent{}).EquivalentTo(&endpointEvent{}))
16+
17+
a := endpointEvent{
18+
EndpointRecord: EndpointRecord{
19+
Name: "foo",
20+
ServiceName: "bar",
21+
ServiceID: "baz",
22+
IngressPorts: []*PortConfig{
23+
{
24+
Protocol: ProtocolTCP,
25+
TargetPort: 80,
26+
},
27+
{
28+
Name: "dns",
29+
Protocol: ProtocolUDP,
30+
TargetPort: 5353,
31+
PublishedPort: 53,
32+
},
33+
},
34+
},
35+
VirtualIP: netip.MustParseAddr("10.0.0.42"),
36+
EndpointIP: netip.MustParseAddr("192.168.69.42"),
37+
}
38+
assert.Check(t, a.EquivalentTo(&a))
39+
40+
reflexiveEquiv := func(a, b *endpointEvent) bool {
41+
t.Helper()
42+
assert.Check(t, a.EquivalentTo(b) == b.EquivalentTo(a), "reflexive equivalence")
43+
return a.EquivalentTo(b)
44+
}
45+
46+
b := a
47+
b.ServiceDisabled = true
48+
assert.Check(t, !reflexiveEquiv(&a, &b), "differing by ServiceDisabled")
49+
50+
c := a
51+
c.IngressPorts = slices.Clone(a.IngressPorts)
52+
slices.Reverse(c.IngressPorts)
53+
assert.Check(t, reflexiveEquiv(&a, &c), "IngressPorts order should not matter")
54+
55+
d := a
56+
d.IngressPorts = append(d.IngressPorts, a.IngressPorts[0])
57+
assert.Check(t, !reflexiveEquiv(&a, &d), "Differing number of copies of IngressPort entries should not be equivalent")
58+
d.IngressPorts = a.IngressPorts[:1]
59+
assert.Check(t, !reflexiveEquiv(&a, &d), "Removing an IngressPort entry should not be equivalent")
60+
61+
e := a
62+
e.Aliases = []string{"alias1", "alias2"}
63+
assert.Check(t, !reflexiveEquiv(&a, &e), "Differing Aliases should not be equivalent")
64+
65+
f := a
66+
f.TaskAliases = []string{"taskalias1", "taskalias2"}
67+
assert.Check(t, !reflexiveEquiv(&a, &f), "Adding TaskAliases should not be equivalent")
68+
g := a
69+
g.TaskAliases = []string{"taskalias2", "taskalias1"}
70+
assert.Check(t, reflexiveEquiv(&f, &g), "TaskAliases order should not matter")
71+
g.TaskAliases = g.TaskAliases[:1]
72+
assert.Check(t, !reflexiveEquiv(&f, &g), "Differing number of TaskAliases should not be equivalent")
73+
74+
h := a
75+
h.EndpointIP = netip.MustParseAddr("192.168.69.43")
76+
assert.Check(t, !reflexiveEquiv(&a, &h), "Differing EndpointIP should not be equivalent")
77+
78+
i := a
79+
i.VirtualIP = netip.MustParseAddr("10.0.0.69")
80+
assert.Check(t, !reflexiveEquiv(&a, &i), "Differing VirtualIP should not be equivalent")
81+
82+
j := a
83+
j.ServiceID = "qux"
84+
assert.Check(t, !reflexiveEquiv(&a, &j), "Differing ServiceID should not be equivalent")
85+
86+
k := a
87+
k.ServiceName = "quux"
88+
assert.Check(t, !reflexiveEquiv(&a, &k), "Differing ServiceName should not be equivalent")
89+
90+
l := a
91+
l.Name = "aaaaa"
92+
assert.Check(t, !reflexiveEquiv(&a, &l), "Differing Name should not be equivalent")
93+
}

internal/iterutil/iterutil.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16:
2+
//go:build go1.23
3+
4+
package iterutil
5+
6+
import (
7+
"iter"
8+
"maps"
9+
)
10+
11+
// SameValues checks if a and b yield the same values, independent of order.
12+
func SameValues[T comparable](a, b iter.Seq[T]) bool {
13+
m, n := make(map[T]int), make(map[T]int)
14+
for v := range a {
15+
m[v]++
16+
}
17+
for v := range b {
18+
n[v]++
19+
}
20+
return maps.Equal(m, n)
21+
}
22+
23+
// Deref adapts an iterator of pointers to an iterator of values.
24+
func Deref[T any, P *T](s iter.Seq[P]) iter.Seq[T] {
25+
return func(yield func(T) bool) {
26+
for p := range s {
27+
if !yield(*p) {
28+
return
29+
}
30+
}
31+
}
32+
}

internal/iterutil/iterutil_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16:
2+
//go:build go1.23
3+
4+
package iterutil
5+
6+
import (
7+
"slices"
8+
"testing"
9+
10+
"gotest.tools/v3/assert"
11+
)
12+
13+
func TestSameValues(t *testing.T) {
14+
a := []int{1, 2, 3, 4, 3}
15+
b := []int{3, 4, 3, 2, 1}
16+
c := []int{1, 2, 3, 4}
17+
18+
assert.Check(t, SameValues(slices.Values(a), slices.Values(a)))
19+
assert.Check(t, SameValues(slices.Values(c), slices.Values(c)))
20+
assert.Check(t, SameValues(slices.Values(a), slices.Values(b)))
21+
assert.Check(t, !SameValues(slices.Values(a), slices.Values(c)))
22+
}
23+
24+
func TestDeref(t *testing.T) {
25+
a := make([]*int, 3)
26+
for i := range a {
27+
a[i] = &i
28+
}
29+
b := slices.Collect(Deref(slices.Values(a)))
30+
assert.DeepEqual(t, b, []int{0, 1, 2})
31+
}

0 commit comments

Comments
 (0)