Skip to content

Commit f099e91

Browse files
committed
libnetwork: handle coalesced endpoint events
The eventually-consistent nature of NetworkDB means we cannot depend on events being received in the same order that they were sent. Nor can we depend on receiving events for all intermediate states. It is possible for a series of entry UPDATEs, or a DELETE followed by a CREATE with the same key, to get coalesced into a single UPDATE event on the receiving node. Watchers of NetworkDB tables therefore need to be prepared to gracefully handle arbitrary UPDATEs of a key, including those where the new value may have nothing in common with the previous value. The libnetwork controller naively handled events for endpoint_table assuming that an endpoint leave followed by a rejoin of the same endpoint would always be expressed as a DELETE event followed by a CREATE. It would handle a coalesced UPDATE as a CREATE, adding a new service binding without removing the old one. This would have various side effects, such as the "transient state" of multiple conflicting service bindings, in which more than one endpoint is assigned the same IP address, never settling. Modify the libnetwork controller to handle an UPDATE by removing the previous service binding then adding the new one. Signed-off-by: Cory Snider <[email protected]> (cherry picked from commit 4538a1d) Signed-off-by: Cory Snider <[email protected]>
1 parent bace1b8 commit f099e91

4 files changed

Lines changed: 271 additions & 53 deletions

File tree

internal/iterutil/iterutil.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16:
2+
//go:build go1.23
3+
4+
package iterutil
5+
6+
import (
7+
"iter"
8+
"maps"
9+
)
10+
11+
// SameValues checks if a and b yield the same values, independent of order.
12+
func SameValues[T comparable](a, b iter.Seq[T]) bool {
13+
m, n := make(map[T]int), make(map[T]int)
14+
for v := range a {
15+
m[v]++
16+
}
17+
for v := range b {
18+
n[v]++
19+
}
20+
return maps.Equal(m, n)
21+
}
22+
23+
// Deref adapts an iterator of pointers to an iterator of values.
24+
func Deref[T any, P *T](s iter.Seq[P]) iter.Seq[T] {
25+
return func(yield func(T) bool) {
26+
for p := range s {
27+
if !yield(*p) {
28+
return
29+
}
30+
}
31+
}
32+
}

internal/iterutil/iterutil_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16:
2+
//go:build go1.23
3+
4+
package iterutil
5+
6+
import (
7+
"slices"
8+
"testing"
9+
10+
"gotest.tools/v3/assert"
11+
)
12+
13+
func TestSameValues(t *testing.T) {
14+
a := []int{1, 2, 3, 4, 3}
15+
b := []int{3, 4, 3, 2, 1}
16+
c := []int{1, 2, 3, 4}
17+
18+
assert.Check(t, SameValues(slices.Values(a), slices.Values(a)))
19+
assert.Check(t, SameValues(slices.Values(c), slices.Values(c)))
20+
assert.Check(t, SameValues(slices.Values(a), slices.Values(b)))
21+
assert.Check(t, !SameValues(slices.Values(a), slices.Values(c)))
22+
}
23+
24+
func TestDeref(t *testing.T) {
25+
a := make([]*int, 3)
26+
for i := range a {
27+
a[i] = &i
28+
}
29+
b := slices.Collect(Deref(slices.Values(a)))
30+
assert.DeepEqual(t, b, []int{0, 1, 2})
31+
}

libnetwork/agent.go

Lines changed: 115 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16:
2+
//go:build go1.23
3+
14
package libnetwork
25

36
//go:generate protoc -I=. -I=../vendor/ --gogofaster_out=import_path=github.com/docker/docker/libnetwork:. agent.proto
@@ -7,10 +10,13 @@ import (
710
"encoding/json"
811
"fmt"
912
"net"
13+
"net/netip"
14+
"slices"
1015
"sort"
1116
"sync"
1217

1318
"github.com/containerd/log"
19+
"github.com/docker/docker/internal/iterutil"
1420
"github.com/docker/docker/libnetwork/cluster"
1521
"github.com/docker/docker/libnetwork/discoverapi"
1622
"github.com/docker/docker/libnetwork/driverapi"
@@ -850,82 +856,138 @@ func (c *Controller) handleNodeTableEvent(ev events.Event) {
850856
c.processNodeDiscovery([]net.IP{nodeAddr.Addr}, isAdd)
851857
}
852858

853-
func (c *Controller) handleEpTableEvent(ev events.Event) {
854-
var value []byte
855-
event := ev.(networkdb.WatchEvent)
856-
switch {
857-
case event.IsCreate(), event.IsUpdate():
858-
value = event.Value
859-
case event.IsDelete():
860-
value = event.Prev
861-
default:
862-
log.G(context.TODO()).Errorf("Unexpected update service table event = %#v", event)
863-
return
864-
}
859+
// endpointEvent is an EndpointRecord together with its VirtualIP and
// EndpointIP parsed into netip.Addr values (see unmarshalEndpointRecord).
// The VirtualIP and EndpointIP fields declared here shadow the string fields
// of the same names promoted from the embedded EndpointRecord.
type endpointEvent struct {
860+
EndpointRecord
861+
// Virtual IP of the service to which this endpoint belongs.
// unmarshalEndpointRecord discards the parse error for this field, so it is
// the zero (invalid) netip.Addr when the record's VirtualIP does not parse.
862+
VirtualIP netip.Addr
863+
// IP assigned to this endpoint.
// unmarshalEndpointRecord rejects records for which this is not valid.
864+
EndpointIP netip.Addr
865+
}
865866

867+
func unmarshalEndpointRecord(data []byte) (*endpointEvent, error) {
866868
var epRec EndpointRecord
867-
err := proto.Unmarshal(value, &epRec)
868-
if err != nil {
869-
log.G(context.TODO()).WithError(err).Error("Failed to unmarshal service table value")
870-
return
869+
if err := proto.Unmarshal(data, &epRec); err != nil {
870+
return nil, fmt.Errorf("failed to unmarshal endpoint record: %w", err)
871+
}
872+
873+
vip, _ := netip.ParseAddr(epRec.VirtualIP)
874+
eip, _ := netip.ParseAddr(epRec.EndpointIP)
875+
876+
if epRec.Name == "" || !eip.IsValid() {
877+
return nil, fmt.Errorf("invalid endpoint name/ip in service table event %s", data)
871878
}
872879

880+
return &endpointEvent{
881+
EndpointRecord: epRec,
882+
VirtualIP: vip,
883+
EndpointIP: eip,
884+
}, nil
885+
}
886+
887+
// EquivalentTo returns true if ev is semantically equivalent to other.
888+
func (ev *endpointEvent) EquivalentTo(other *endpointEvent) bool {
889+
return ev.Name == other.Name &&
890+
ev.ServiceName == other.ServiceName &&
891+
ev.ServiceID == other.ServiceID &&
892+
ev.VirtualIP == other.VirtualIP &&
893+
ev.EndpointIP == other.EndpointIP &&
894+
ev.ServiceDisabled == other.ServiceDisabled &&
895+
iterutil.SameValues(
896+
iterutil.Deref(slices.Values(ev.IngressPorts)),
897+
iterutil.Deref(slices.Values(other.IngressPorts))) &&
898+
iterutil.SameValues(slices.Values(ev.Aliases), slices.Values(other.Aliases)) &&
899+
iterutil.SameValues(slices.Values(ev.TaskAliases), slices.Values(other.TaskAliases))
900+
}
901+
902+
func (c *Controller) handleEpTableEvent(ev events.Event) {
903+
event := ev.(networkdb.WatchEvent)
873904
nid := event.NetworkID
874905
eid := event.Key
875-
containerName := epRec.Name
876-
svcName := epRec.ServiceName
877-
svcID := epRec.ServiceID
878-
vip := net.ParseIP(epRec.VirtualIP)
879-
ip := net.ParseIP(epRec.EndpointIP)
880-
ingressPorts := epRec.IngressPorts
881-
serviceAliases := epRec.Aliases
882-
taskAliases := epRec.TaskAliases
883906

884-
logger := log.G(context.TODO()).WithFields(log.Fields{
885-
"evt": event,
886-
"R": epRec,
887-
})
888-
889-
if containerName == "" || ip == nil {
890-
logger.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
891-
return
907+
var prev, epRec *endpointEvent
908+
if event.Prev != nil {
909+
var err error
910+
prev, err = unmarshalEndpointRecord(event.Prev)
911+
if err != nil {
912+
log.G(context.TODO()).WithError(err).Error("error unmarshaling previous value from service table event")
913+
return
914+
}
915+
}
916+
if event.Value != nil {
917+
var err error
918+
epRec, err = unmarshalEndpointRecord(event.Value)
919+
if err != nil {
920+
log.G(context.TODO()).WithError(err).Error("error unmarshaling service table event")
921+
return
922+
}
892923
}
893924

925+
logger := log.G(context.TODO()).WithFields(log.Fields{
926+
"evt": event,
927+
"R": epRec,
928+
"prev": prev,
929+
})
894930
logger.Debug("handleEpTableEvent")
895931

896-
switch {
897-
case event.IsCreate(), event.IsUpdate():
898-
if svcID != "" {
932+
if prev != nil {
933+
if epRec != nil && prev.EquivalentTo(epRec) {
934+
// Avoid flapping if we would otherwise remove a service
935+
// binding then immediately replace it with an equivalent one.
936+
return
937+
}
938+
939+
if prev.ServiceID != "" {
899940
// This is a remote task part of a service
900-
if epRec.ServiceDisabled {
901-
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, false); err != nil {
902-
logger.WithError(err).Error("failed disabling service binding")
903-
return
904-
}
905-
} else {
906-
if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
907-
logger.WithError(err).Error("failed adding service binding")
908-
return
941+
if !prev.ServiceDisabled {
942+
err := c.rmServiceBinding(prev.ServiceName, prev.ServiceID, nid, eid,
943+
prev.Name, prev.VirtualIP.AsSlice(), prev.IngressPorts,
944+
prev.Aliases, prev.TaskAliases, prev.EndpointIP.AsSlice(),
945+
"handleEpTableEvent", true, true)
946+
if err != nil {
947+
logger.WithError(err).Error("failed removing service binding")
909948
}
910949
}
911950
} else {
912951
// This is a remote container simply attached to an attachable network
913-
if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
914-
logger.WithError(err).Errorf("failed adding container name resolution")
952+
err := c.delContainerNameResolution(nid, eid, prev.Name, prev.TaskAliases,
953+
prev.EndpointIP.AsSlice(), "handleEpTableEvent")
954+
if err != nil {
955+
logger.WithError(err).Errorf("failed removing container name resolution")
915956
}
916957
}
958+
}
917959

918-
case event.IsDelete():
919-
if svcID != "" {
960+
if epRec != nil {
961+
if epRec.ServiceID != "" {
920962
// This is a remote task part of a service
921-
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, true); err != nil {
922-
logger.WithError(err).Error("failed removing service binding")
923-
return
963+
if epRec.ServiceDisabled {
964+
// Don't double-remove a service binding
965+
if prev == nil || prev.ServiceID != epRec.ServiceID || !prev.ServiceDisabled {
966+
err := c.rmServiceBinding(epRec.ServiceName, epRec.ServiceID,
967+
nid, eid, epRec.Name, epRec.VirtualIP.AsSlice(),
968+
epRec.IngressPorts, epRec.Aliases, epRec.TaskAliases,
969+
epRec.EndpointIP.AsSlice(), "handleEpTableEvent", true, false)
970+
if err != nil {
971+
logger.WithError(err).Error("failed disabling service binding")
972+
return
973+
}
974+
}
975+
} else {
976+
err := c.addServiceBinding(epRec.ServiceName, epRec.ServiceID, nid, eid,
977+
epRec.Name, epRec.VirtualIP.AsSlice(), epRec.IngressPorts,
978+
epRec.Aliases, epRec.TaskAliases, epRec.EndpointIP.AsSlice(),
979+
"handleEpTableEvent")
980+
if err != nil {
981+
logger.WithError(err).Error("failed adding service binding")
982+
return
983+
}
924984
}
925985
} else {
926986
// This is a remote container simply attached to an attachable network
927-
if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
928-
logger.WithError(err).Errorf("failed removing container name resolution")
987+
err := c.addContainerNameResolution(nid, eid, epRec.Name, epRec.TaskAliases,
988+
epRec.EndpointIP.AsSlice(), "handleEpTableEvent")
989+
if err != nil {
990+
logger.WithError(err).Errorf("failed adding container name resolution")
929991
}
930992
}
931993
}

libnetwork/agent_test.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16:
2+
//go:build go1.23
3+
4+
package libnetwork
5+
6+
import (
7+
"net/netip"
8+
"slices"
9+
"testing"
10+
11+
"gotest.tools/v3/assert"
12+
)
13+
14+
func TestEndpointEvent_EquivalentTo(t *testing.T) {
15+
assert.Check(t, (&endpointEvent{}).EquivalentTo(&endpointEvent{}))
16+
17+
a := endpointEvent{
18+
EndpointRecord: EndpointRecord{
19+
Name: "foo",
20+
ServiceName: "bar",
21+
ServiceID: "baz",
22+
IngressPorts: []*PortConfig{
23+
{
24+
Protocol: ProtocolTCP,
25+
TargetPort: 80,
26+
},
27+
{
28+
Name: "dns",
29+
Protocol: ProtocolUDP,
30+
TargetPort: 5353,
31+
PublishedPort: 53,
32+
},
33+
},
34+
},
35+
VirtualIP: netip.MustParseAddr("10.0.0.42"),
36+
EndpointIP: netip.MustParseAddr("192.168.69.42"),
37+
}
38+
assert.Check(t, a.EquivalentTo(&a))
39+
40+
reflexiveEquiv := func(a, b *endpointEvent) bool {
41+
t.Helper()
42+
assert.Check(t, a.EquivalentTo(b) == b.EquivalentTo(a), "reflexive equivalence")
43+
return a.EquivalentTo(b)
44+
}
45+
46+
b := a
47+
b.ServiceDisabled = true
48+
assert.Check(t, !reflexiveEquiv(&a, &b), "differing by ServiceDisabled")
49+
50+
c := a
51+
c.IngressPorts = slices.Clone(a.IngressPorts)
52+
slices.Reverse(c.IngressPorts)
53+
assert.Check(t, reflexiveEquiv(&a, &c), "IngressPorts order should not matter")
54+
55+
d := a
56+
d.IngressPorts = append(d.IngressPorts, a.IngressPorts[0])
57+
assert.Check(t, !reflexiveEquiv(&a, &d), "Differing number of copies of IngressPort entries should not be equivalent")
58+
d.IngressPorts = a.IngressPorts[:1]
59+
assert.Check(t, !reflexiveEquiv(&a, &d), "Removing an IngressPort entry should not be equivalent")
60+
61+
e := a
62+
e.Aliases = []string{"alias1", "alias2"}
63+
assert.Check(t, !reflexiveEquiv(&a, &e), "Differing Aliases should not be equivalent")
64+
65+
f := a
66+
f.TaskAliases = []string{"taskalias1", "taskalias2"}
67+
assert.Check(t, !reflexiveEquiv(&a, &f), "Adding TaskAliases should not be equivalent")
68+
g := a
69+
g.TaskAliases = []string{"taskalias2", "taskalias1"}
70+
assert.Check(t, reflexiveEquiv(&f, &g), "TaskAliases order should not matter")
71+
g.TaskAliases = g.TaskAliases[:1]
72+
assert.Check(t, !reflexiveEquiv(&f, &g), "Differing number of TaskAliases should not be equivalent")
73+
74+
h := a
75+
h.EndpointIP = netip.MustParseAddr("192.168.69.43")
76+
assert.Check(t, !reflexiveEquiv(&a, &h), "Differing EndpointIP should not be equivalent")
77+
78+
i := a
79+
i.VirtualIP = netip.MustParseAddr("10.0.0.69")
80+
assert.Check(t, !reflexiveEquiv(&a, &i), "Differing VirtualIP should not be equivalent")
81+
82+
j := a
83+
j.ServiceID = "qux"
84+
assert.Check(t, !reflexiveEquiv(&a, &j), "Differing ServiceID should not be equivalent")
85+
86+
k := a
87+
k.ServiceName = "quux"
88+
assert.Check(t, !reflexiveEquiv(&a, &k), "Differing ServiceName should not be equivalent")
89+
90+
l := a
91+
l.Name = "aaaaa"
92+
assert.Check(t, !reflexiveEquiv(&a, &l), "Differing Name should not be equivalent")
93+
}

0 commit comments

Comments
 (0)