Skip to content

Commit 165516e

Browse files
authored
Merge pull request #50551 from corhere/backport-25.0/libn/all-the-overlay-fixes
[25.0] libnetwork/overlay: backport all the fixes
2 parents c447682 + f099e91 commit 165516e

50 files changed

Lines changed: 1322 additions & 1698 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

internal/iterutil/iterutil.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16:
2+
//go:build go1.23
3+
4+
package iterutil
5+
6+
import (
7+
"iter"
8+
"maps"
9+
)
10+
11+
// SameValues reports whether a and b yield the same multiset of values: order is ignored, but each value must occur the same number of times in both.
12+
func SameValues[T comparable](a, b iter.Seq[T]) bool {
13+
m, n := make(map[T]int), make(map[T]int)
14+
for v := range a {
15+
m[v]++
16+
}
17+
for v := range b {
18+
n[v]++
19+
}
20+
return maps.Equal(m, n)
21+
}
22+
23+
// Deref adapts an iterator of pointers to an iterator of the values they point to. Every pointer is dereferenced, so s must not yield nil.
24+
func Deref[T any, P *T](s iter.Seq[P]) iter.Seq[T] {
25+
return func(yield func(T) bool) {
26+
for p := range s {
27+
if !yield(*p) {
28+
return
29+
}
30+
}
31+
}
32+
}

internal/iterutil/iterutil_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16:
2+
//go:build go1.23
3+
4+
package iterutil
5+
6+
import (
7+
"slices"
8+
"testing"
9+
10+
"gotest.tools/v3/assert"
11+
)
12+
13+
func TestSameValues(t *testing.T) {
14+
a := []int{1, 2, 3, 4, 3}
15+
b := []int{3, 4, 3, 2, 1}
16+
c := []int{1, 2, 3, 4}
17+
18+
assert.Check(t, SameValues(slices.Values(a), slices.Values(a)))
19+
assert.Check(t, SameValues(slices.Values(c), slices.Values(c)))
20+
assert.Check(t, SameValues(slices.Values(a), slices.Values(b)))
21+
assert.Check(t, !SameValues(slices.Values(a), slices.Values(c)))
22+
}
23+
24+
func TestDeref(t *testing.T) {
25+
a := make([]*int, 3)
26+
for i := range a {
27+
a[i] = &i
28+
}
29+
b := slices.Collect(Deref(slices.Values(a)))
30+
assert.DeepEqual(t, b, []int{0, 1, 2})
31+
}

libnetwork/agent.go

Lines changed: 140 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16:
2+
//go:build go1.23
3+
14
package libnetwork
25

36
//go:generate protoc -I=. -I=../vendor/ --gogofaster_out=import_path=github.com/docker/docker/libnetwork:. agent.proto
@@ -7,10 +10,13 @@ import (
710
"encoding/json"
811
"fmt"
912
"net"
13+
"net/netip"
14+
"slices"
1015
"sort"
1116
"sync"
1217

1318
"github.com/containerd/log"
19+
"github.com/docker/docker/internal/iterutil"
1420
"github.com/docker/docker/libnetwork/cluster"
1521
"github.com/docker/docker/libnetwork/discoverapi"
1622
"github.com/docker/docker/libnetwork/driverapi"
@@ -490,17 +496,19 @@ func (n *Network) Services() map[string]ServiceInfo {
490496
// Walk through the driver's tables, have the driver decode the entries
491497
// and return the tuple {ep ID, value}. value is a string that conveys
492498
// relevant info about the endpoint.
493-
for _, table := range n.driverTables {
494-
if table.objType != driverapi.EndpointObject {
495-
continue
496-
}
497-
for key, value := range agent.networkDB.GetTableByNetwork(table.name, nwID) {
498-
epID, info := d.DecodeTableEntry(table.name, key, value.Value)
499-
if ep, ok := eps[epID]; !ok {
500-
log.G(context.TODO()).Errorf("Inconsistent driver and libnetwork state for endpoint %s", epID)
501-
} else {
502-
ep.info = info
503-
eps[epID] = ep
499+
if d, ok := d.(driverapi.TableWatcher); ok {
500+
for _, table := range n.driverTables {
501+
if table.objType != driverapi.EndpointObject {
502+
continue
503+
}
504+
for key, value := range agent.networkDB.GetTableByNetwork(table.name, nwID) {
505+
epID, info := d.DecodeTableEntry(table.name, key, value.Value)
506+
if ep, ok := eps[epID]; !ok {
507+
log.G(context.TODO()).Errorf("Inconsistent driver and libnetwork state for endpoint %s", epID)
508+
} else {
509+
ep.info = info
510+
eps[epID] = ep
511+
}
504512
}
505513
}
506514
}
@@ -813,33 +821,14 @@ func (n *Network) handleDriverTableEvent(ev events.Event) {
813821
log.G(context.TODO()).Errorf("Could not resolve driver %s while handling driver table event: %v", n.networkType, err)
814822
return
815823
}
816-
817-
var (
818-
etype driverapi.EventType
819-
tname string
820-
key string
821-
value []byte
822-
)
823-
824-
switch event := ev.(type) {
825-
case networkdb.CreateEvent:
826-
tname = event.Table
827-
key = event.Key
828-
value = event.Value
829-
etype = driverapi.Create
830-
case networkdb.DeleteEvent:
831-
tname = event.Table
832-
key = event.Key
833-
value = event.Value
834-
etype = driverapi.Delete
835-
case networkdb.UpdateEvent:
836-
tname = event.Table
837-
key = event.Key
838-
value = event.Value
839-
etype = driverapi.Update
824+
ed, ok := d.(driverapi.TableWatcher)
825+
if !ok {
826+
log.G(context.TODO()).Errorf("Could not notify driver %s about table event: driver does not implement TableWatcher interface", n.networkType)
827+
return
840828
}
841829

842-
d.EventNotify(etype, n.ID(), tname, key, value)
830+
event := ev.(networkdb.WatchEvent)
831+
ed.EventNotify(n.ID(), event.Table, event.Key, event.Prev, event.Value)
843832
}
844833

845834
func (c *Controller) handleNodeTableEvent(ev events.Event) {
@@ -848,13 +837,14 @@ func (c *Controller) handleNodeTableEvent(ev events.Event) {
848837
isAdd bool
849838
nodeAddr networkdb.NodeAddr
850839
)
851-
switch event := ev.(type) {
852-
case networkdb.CreateEvent:
840+
event := ev.(networkdb.WatchEvent)
841+
switch {
842+
case event.IsCreate():
853843
value = event.Value
854844
isAdd = true
855-
case networkdb.DeleteEvent:
856-
value = event.Value
857-
case networkdb.UpdateEvent:
845+
case event.IsDelete():
846+
value = event.Prev
847+
case event.IsUpdate():
858848
log.G(context.TODO()).Errorf("Unexpected update node table event = %#v", event)
859849
}
860850

@@ -866,94 +856,138 @@ func (c *Controller) handleNodeTableEvent(ev events.Event) {
866856
c.processNodeDiscovery([]net.IP{nodeAddr.Addr}, isAdd)
867857
}
868858

869-
func (c *Controller) handleEpTableEvent(ev events.Event) {
870-
var (
871-
nid string
872-
eid string
873-
value []byte
874-
epRec EndpointRecord
875-
)
859+
type endpointEvent struct {
860+
EndpointRecord
861+
// Virtual IP of the service to which this endpoint belongs.
862+
VirtualIP netip.Addr
863+
// IP assigned to this endpoint.
864+
EndpointIP netip.Addr
865+
}
876866

877-
switch event := ev.(type) {
878-
case networkdb.CreateEvent:
879-
nid = event.NetworkID
880-
eid = event.Key
881-
value = event.Value
882-
case networkdb.DeleteEvent:
883-
nid = event.NetworkID
884-
eid = event.Key
885-
value = event.Value
886-
case networkdb.UpdateEvent:
887-
nid = event.NetworkID
888-
eid = event.Key
889-
value = event.Value
890-
default:
891-
log.G(context.TODO()).Errorf("Unexpected update service table event = %#v", event)
892-
return
867+
func unmarshalEndpointRecord(data []byte) (*endpointEvent, error) {
868+
var epRec EndpointRecord
869+
if err := proto.Unmarshal(data, &epRec); err != nil {
870+
return nil, fmt.Errorf("failed to unmarshal endpoint record: %w", err)
893871
}
894872

895-
err := proto.Unmarshal(value, &epRec)
896-
if err != nil {
897-
log.G(context.TODO()).WithError(err).Error("Failed to unmarshal service table value")
898-
return
873+
vip, _ := netip.ParseAddr(epRec.VirtualIP)
874+
eip, _ := netip.ParseAddr(epRec.EndpointIP)
875+
876+
if epRec.Name == "" || !eip.IsValid() {
877+
return nil, fmt.Errorf("invalid endpoint name/ip in service table event %s", data)
899878
}
900879

901-
containerName := epRec.Name
902-
svcName := epRec.ServiceName
903-
svcID := epRec.ServiceID
904-
vip := net.ParseIP(epRec.VirtualIP)
905-
ip := net.ParseIP(epRec.EndpointIP)
906-
ingressPorts := epRec.IngressPorts
907-
serviceAliases := epRec.Aliases
908-
taskAliases := epRec.TaskAliases
880+
return &endpointEvent{
881+
EndpointRecord: epRec,
882+
VirtualIP: vip,
883+
EndpointIP: eip,
884+
}, nil
885+
}
909886

910-
logger := log.G(context.TODO()).WithFields(log.Fields{
911-
"nid": nid,
912-
"eid": eid,
913-
"T": fmt.Sprintf("%T", ev),
914-
"R": epRec,
915-
})
887+
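// EquivalentTo reports whether ev is semantically equivalent to other: the names, service identity, IPs and ServiceDisabled flag must match exactly, while IngressPorts, Aliases and TaskAliases are compared without regard to order.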
888+
func (ev *endpointEvent) EquivalentTo(other *endpointEvent) bool {
889+
return ev.Name == other.Name &&
890+
ev.ServiceName == other.ServiceName &&
891+
ev.ServiceID == other.ServiceID &&
892+
ev.VirtualIP == other.VirtualIP &&
893+
ev.EndpointIP == other.EndpointIP &&
894+
ev.ServiceDisabled == other.ServiceDisabled &&
895+
iterutil.SameValues(
896+
iterutil.Deref(slices.Values(ev.IngressPorts)),
897+
iterutil.Deref(slices.Values(other.IngressPorts))) &&
898+
iterutil.SameValues(slices.Values(ev.Aliases), slices.Values(other.Aliases)) &&
899+
iterutil.SameValues(slices.Values(ev.TaskAliases), slices.Values(other.TaskAliases))
900+
}
916901

917-
if containerName == "" || ip == nil {
918-
logger.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
919-
return
902+
func (c *Controller) handleEpTableEvent(ev events.Event) {
903+
event := ev.(networkdb.WatchEvent)
904+
nid := event.NetworkID
905+
eid := event.Key
906+
907+
var prev, epRec *endpointEvent
908+
if event.Prev != nil {
909+
var err error
910+
prev, err = unmarshalEndpointRecord(event.Prev)
911+
if err != nil {
912+
log.G(context.TODO()).WithError(err).Error("error unmarshaling previous value from service table event")
913+
return
914+
}
915+
}
916+
if event.Value != nil {
917+
var err error
918+
epRec, err = unmarshalEndpointRecord(event.Value)
919+
if err != nil {
920+
log.G(context.TODO()).WithError(err).Error("error unmarshaling service table event")
921+
return
922+
}
920923
}
921924

925+
logger := log.G(context.TODO()).WithFields(log.Fields{
926+
"evt": event,
927+
"R": epRec,
928+
"prev": prev,
929+
})
922930
logger.Debug("handleEpTableEvent")
923931

924-
switch ev.(type) {
925-
case networkdb.CreateEvent, networkdb.UpdateEvent:
926-
if svcID != "" {
932+
if prev != nil {
933+
if epRec != nil && prev.EquivalentTo(epRec) {
934+
// Avoid flapping if we would otherwise remove a service
935+
// binding then immediately replace it with an equivalent one.
936+
return
937+
}
938+
939+
if prev.ServiceID != "" {
927940
// This is a remote task that is part of a service
928-
if epRec.ServiceDisabled {
929-
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, false); err != nil {
930-
logger.WithError(err).Error("failed disabling service binding")
931-
return
932-
}
933-
} else {
934-
if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
935-
logger.WithError(err).Error("failed adding service binding")
936-
return
941+
if !prev.ServiceDisabled {
942+
err := c.rmServiceBinding(prev.ServiceName, prev.ServiceID, nid, eid,
943+
prev.Name, prev.VirtualIP.AsSlice(), prev.IngressPorts,
944+
prev.Aliases, prev.TaskAliases, prev.EndpointIP.AsSlice(),
945+
"handleEpTableEvent", true, true)
946+
if err != nil {
947+
logger.WithError(err).Error("failed removing service binding")
937948
}
938949
}
939950
} else {
940951
// This is a remote container simply attached to an attachable network
941-
if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
942-
logger.WithError(err).Errorf("failed adding container name resolution")
952+
err := c.delContainerNameResolution(nid, eid, prev.Name, prev.TaskAliases,
953+
prev.EndpointIP.AsSlice(), "handleEpTableEvent")
954+
if err != nil {
955+
logger.WithError(err).Errorf("failed removing container name resolution")
943956
}
944957
}
958+
}
945959

946-
case networkdb.DeleteEvent:
947-
if svcID != "" {
960+
if epRec != nil {
961+
if epRec.ServiceID != "" {
948962
// This is a remote task that is part of a service
949-
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, true); err != nil {
950-
logger.WithError(err).Error("failed removing service binding")
951-
return
963+
if epRec.ServiceDisabled {
964+
// Don't double-remove a service binding
965+
if prev == nil || prev.ServiceID != epRec.ServiceID || !prev.ServiceDisabled {
966+
err := c.rmServiceBinding(epRec.ServiceName, epRec.ServiceID,
967+
nid, eid, epRec.Name, epRec.VirtualIP.AsSlice(),
968+
epRec.IngressPorts, epRec.Aliases, epRec.TaskAliases,
969+
epRec.EndpointIP.AsSlice(), "handleEpTableEvent", true, false)
970+
if err != nil {
971+
logger.WithError(err).Error("failed disabling service binding")
972+
return
973+
}
974+
}
975+
} else {
976+
err := c.addServiceBinding(epRec.ServiceName, epRec.ServiceID, nid, eid,
977+
epRec.Name, epRec.VirtualIP.AsSlice(), epRec.IngressPorts,
978+
epRec.Aliases, epRec.TaskAliases, epRec.EndpointIP.AsSlice(),
979+
"handleEpTableEvent")
980+
if err != nil {
981+
logger.WithError(err).Error("failed adding service binding")
982+
return
983+
}
952984
}
953985
} else {
954986
// This is a remote container simply attached to an attachable network
955-
if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
956-
logger.WithError(err).Errorf("failed removing container name resolution")
987+
err := c.addContainerNameResolution(nid, eid, epRec.Name, epRec.TaskAliases,
988+
epRec.EndpointIP.AsSlice(), "handleEpTableEvent")
989+
if err != nil {
990+
logger.WithError(err).Errorf("failed adding container name resolution")
957991
}
958992
}
959993
}

0 commit comments

Comments
 (0)