1+ // FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16:
2+ //go:build go1.23
3+
14package libnetwork
25
36//go:generate protoc -I=. -I=../vendor/ --gogofaster_out=import_path=github.com/docker/docker/libnetwork:. agent.proto
@@ -7,10 +10,13 @@ import (
710 "encoding/json"
811 "fmt"
912 "net"
13+ "net/netip"
14+ "slices"
1015 "sort"
1116 "sync"
1217
1318 "github.com/containerd/log"
19+ "github.com/docker/docker/internal/iterutil"
1420 "github.com/docker/docker/libnetwork/cluster"
1521 "github.com/docker/docker/libnetwork/discoverapi"
1622 "github.com/docker/docker/libnetwork/driverapi"
@@ -490,17 +496,19 @@ func (n *Network) Services() map[string]ServiceInfo {
490496 // Walk through the driver's tables, have the driver decode the entries
491497 // and return the tuple {ep ID, value}. value is a string that coveys
492498 // relevant info about the endpoint.
493- for _ , table := range n .driverTables {
494- if table .objType != driverapi .EndpointObject {
495- continue
496- }
497- for key , value := range agent .networkDB .GetTableByNetwork (table .name , nwID ) {
498- epID , info := d .DecodeTableEntry (table .name , key , value .Value )
499- if ep , ok := eps [epID ]; ! ok {
500- log .G (context .TODO ()).Errorf ("Inconsistent driver and libnetwork state for endpoint %s" , epID )
501- } else {
502- ep .info = info
503- eps [epID ] = ep
499+ if d , ok := d .(driverapi.TableWatcher ); ok {
500+ for _ , table := range n .driverTables {
501+ if table .objType != driverapi .EndpointObject {
502+ continue
503+ }
504+ for key , value := range agent .networkDB .GetTableByNetwork (table .name , nwID ) {
505+ epID , info := d .DecodeTableEntry (table .name , key , value .Value )
506+ if ep , ok := eps [epID ]; ! ok {
507+ log .G (context .TODO ()).Errorf ("Inconsistent driver and libnetwork state for endpoint %s" , epID )
508+ } else {
509+ ep .info = info
510+ eps [epID ] = ep
511+ }
504512 }
505513 }
506514 }
@@ -813,33 +821,14 @@ func (n *Network) handleDriverTableEvent(ev events.Event) {
813821 log .G (context .TODO ()).Errorf ("Could not resolve driver %s while handling driver table event: %v" , n .networkType , err )
814822 return
815823 }
816-
817- var (
818- etype driverapi.EventType
819- tname string
820- key string
821- value []byte
822- )
823-
824- switch event := ev .(type ) {
825- case networkdb.CreateEvent :
826- tname = event .Table
827- key = event .Key
828- value = event .Value
829- etype = driverapi .Create
830- case networkdb.DeleteEvent :
831- tname = event .Table
832- key = event .Key
833- value = event .Value
834- etype = driverapi .Delete
835- case networkdb.UpdateEvent :
836- tname = event .Table
837- key = event .Key
838- value = event .Value
839- etype = driverapi .Update
824+ ed , ok := d .(driverapi.TableWatcher )
825+ if ! ok {
826+ log .G (context .TODO ()).Errorf ("Could not notify driver %s about table event: driver does not implement TableWatcher interface" , n .networkType )
827+ return
840828 }
841829
842- d .EventNotify (etype , n .ID (), tname , key , value )
830+ event := ev .(networkdb.WatchEvent )
831+ ed .EventNotify (n .ID (), event .Table , event .Key , event .Prev , event .Value )
843832}
844833
845834func (c * Controller ) handleNodeTableEvent (ev events.Event ) {
@@ -848,13 +837,14 @@ func (c *Controller) handleNodeTableEvent(ev events.Event) {
848837 isAdd bool
849838 nodeAddr networkdb.NodeAddr
850839 )
851- switch event := ev .(type ) {
852- case networkdb.CreateEvent :
840+ event := ev .(networkdb.WatchEvent )
841+ switch {
842+ case event .IsCreate ():
853843 value = event .Value
854844 isAdd = true
855- case networkdb. DeleteEvent :
856- value = event .Value
857- case networkdb. UpdateEvent :
845+ case event . IsDelete () :
846+ value = event .Prev
847+ case event . IsUpdate () :
858848 log .G (context .TODO ()).Errorf ("Unexpected update node table event = %#v" , event )
859849 }
860850
@@ -866,94 +856,138 @@ func (c *Controller) handleNodeTableEvent(ev events.Event) {
866856 c .processNodeDiscovery ([]net.IP {nodeAddr .Addr }, isAdd )
867857}
868858
869- func ( c * Controller ) handleEpTableEvent ( ev events. Event ) {
870- var (
871- nid string
872- eid string
873- value [] byte
874- epRec EndpointRecord
875- )
859+ type endpointEvent struct {
860+ EndpointRecord
861+ // Virtual IP of the service to which this endpoint belongs.
862+ VirtualIP netip. Addr
863+ // IP assigned to this endpoint.
864+ EndpointIP netip. Addr
865+ }
876866
877- switch event := ev .(type ) {
878- case networkdb.CreateEvent :
879- nid = event .NetworkID
880- eid = event .Key
881- value = event .Value
882- case networkdb.DeleteEvent :
883- nid = event .NetworkID
884- eid = event .Key
885- value = event .Value
886- case networkdb.UpdateEvent :
887- nid = event .NetworkID
888- eid = event .Key
889- value = event .Value
890- default :
891- log .G (context .TODO ()).Errorf ("Unexpected update service table event = %#v" , event )
892- return
867+ func unmarshalEndpointRecord (data []byte ) (* endpointEvent , error ) {
868+ var epRec EndpointRecord
869+ if err := proto .Unmarshal (data , & epRec ); err != nil {
870+ return nil , fmt .Errorf ("failed to unmarshal endpoint record: %w" , err )
893871 }
894872
895- err := proto .Unmarshal (value , & epRec )
896- if err != nil {
897- log .G (context .TODO ()).WithError (err ).Error ("Failed to unmarshal service table value" )
898- return
873+ vip , _ := netip .ParseAddr (epRec .VirtualIP )
874+ eip , _ := netip .ParseAddr (epRec .EndpointIP )
875+
876+ if epRec .Name == "" || ! eip .IsValid () {
877+ return nil , fmt .Errorf ("invalid endpoint name/ip in service table event %s" , data )
899878 }
900879
901- containerName := epRec .Name
902- svcName := epRec .ServiceName
903- svcID := epRec .ServiceID
904- vip := net .ParseIP (epRec .VirtualIP )
905- ip := net .ParseIP (epRec .EndpointIP )
906- ingressPorts := epRec .IngressPorts
907- serviceAliases := epRec .Aliases
908- taskAliases := epRec .TaskAliases
880+ return & endpointEvent {
881+ EndpointRecord : epRec ,
882+ VirtualIP : vip ,
883+ EndpointIP : eip ,
884+ }, nil
885+ }
909886
910- logger := log .G (context .TODO ()).WithFields (log.Fields {
911- "nid" : nid ,
912- "eid" : eid ,
913- "T" : fmt .Sprintf ("%T" , ev ),
914- "R" : epRec ,
915- })
887+ // EquivalentTo returns true if ev is semantically equivalent to other.
888+ func (ev * endpointEvent ) EquivalentTo (other * endpointEvent ) bool {
889+ return ev .Name == other .Name &&
890+ ev .ServiceName == other .ServiceName &&
891+ ev .ServiceID == other .ServiceID &&
892+ ev .VirtualIP == other .VirtualIP &&
893+ ev .EndpointIP == other .EndpointIP &&
894+ ev .ServiceDisabled == other .ServiceDisabled &&
895+ iterutil .SameValues (
896+ iterutil .Deref (slices .Values (ev .IngressPorts )),
897+ iterutil .Deref (slices .Values (other .IngressPorts ))) &&
898+ iterutil .SameValues (slices .Values (ev .Aliases ), slices .Values (other .Aliases )) &&
899+ iterutil .SameValues (slices .Values (ev .TaskAliases ), slices .Values (other .TaskAliases ))
900+ }
916901
917- if containerName == "" || ip == nil {
918- logger .Errorf ("Invalid endpoint name/ip received while handling service table event %s" , value )
919- return
902+ func (c * Controller ) handleEpTableEvent (ev events.Event ) {
903+ event := ev .(networkdb.WatchEvent )
904+ nid := event .NetworkID
905+ eid := event .Key
906+
907+ var prev , epRec * endpointEvent
908+ if event .Prev != nil {
909+ var err error
910+ prev , err = unmarshalEndpointRecord (event .Prev )
911+ if err != nil {
912+ log .G (context .TODO ()).WithError (err ).Error ("error unmarshaling previous value from service table event" )
913+ return
914+ }
915+ }
916+ if event .Value != nil {
917+ var err error
918+ epRec , err = unmarshalEndpointRecord (event .Value )
919+ if err != nil {
920+ log .G (context .TODO ()).WithError (err ).Error ("error unmarshaling service table event" )
921+ return
922+ }
920923 }
921924
925+ logger := log .G (context .TODO ()).WithFields (log.Fields {
926+ "evt" : event ,
927+ "R" : epRec ,
928+ "prev" : prev ,
929+ })
922930 logger .Debug ("handleEpTableEvent" )
923931
924- switch ev .(type ) {
925- case networkdb.CreateEvent , networkdb.UpdateEvent :
926- if svcID != "" {
932+ if prev != nil {
933+ if epRec != nil && prev .EquivalentTo (epRec ) {
934+ // Avoid flapping if we would otherwise remove a service
935+ // binding then immediately replace it with an equivalent one.
936+ return
937+ }
938+
939+ if prev .ServiceID != "" {
927940 // This is a remote task part of a service
928- if epRec .ServiceDisabled {
929- if err := c .rmServiceBinding (svcName , svcID , nid , eid , containerName , vip , ingressPorts , serviceAliases , taskAliases , ip , "handleEpTableEvent" , true , false ); err != nil {
930- logger .WithError (err ).Error ("failed disabling service binding" )
931- return
932- }
933- } else {
934- if err := c .addServiceBinding (svcName , svcID , nid , eid , containerName , vip , ingressPorts , serviceAliases , taskAliases , ip , "handleEpTableEvent" ); err != nil {
935- logger .WithError (err ).Error ("failed adding service binding" )
936- return
941+ if ! prev .ServiceDisabled {
942+ err := c .rmServiceBinding (prev .ServiceName , prev .ServiceID , nid , eid ,
943+ prev .Name , prev .VirtualIP .AsSlice (), prev .IngressPorts ,
944+ prev .Aliases , prev .TaskAliases , prev .EndpointIP .AsSlice (),
945+ "handleEpTableEvent" , true , true )
946+ if err != nil {
947+ logger .WithError (err ).Error ("failed removing service binding" )
937948 }
938949 }
939950 } else {
940951 // This is a remote container simply attached to an attachable network
941- if err := c .addContainerNameResolution (nid , eid , containerName , taskAliases , ip , "handleEpTableEvent" ); err != nil {
942- logger .WithError (err ).Errorf ("failed adding container name resolution" )
952+ err := c .delContainerNameResolution (nid , eid , prev .Name , prev .TaskAliases ,
953+ prev .EndpointIP .AsSlice (), "handleEpTableEvent" )
954+ if err != nil {
955+ logger .WithError (err ).Errorf ("failed removing container name resolution" )
943956 }
944957 }
958+ }
945959
946- case networkdb. DeleteEvent :
947- if svcID != "" {
960+ if epRec != nil {
961+ if epRec . ServiceID != "" {
948962 // This is a remote task part of a service
949- if err := c .rmServiceBinding (svcName , svcID , nid , eid , containerName , vip , ingressPorts , serviceAliases , taskAliases , ip , "handleEpTableEvent" , true , true ); err != nil {
950- logger .WithError (err ).Error ("failed removing service binding" )
951- return
963+ if epRec .ServiceDisabled {
964+ // Don't double-remove a service binding
965+ if prev == nil || prev .ServiceID != epRec .ServiceID || ! prev .ServiceDisabled {
966+ err := c .rmServiceBinding (epRec .ServiceName , epRec .ServiceID ,
967+ nid , eid , epRec .Name , epRec .VirtualIP .AsSlice (),
968+ epRec .IngressPorts , epRec .Aliases , epRec .TaskAliases ,
969+ epRec .EndpointIP .AsSlice (), "handleEpTableEvent" , true , false )
970+ if err != nil {
971+ logger .WithError (err ).Error ("failed disabling service binding" )
972+ return
973+ }
974+ }
975+ } else {
976+ err := c .addServiceBinding (epRec .ServiceName , epRec .ServiceID , nid , eid ,
977+ epRec .Name , epRec .VirtualIP .AsSlice (), epRec .IngressPorts ,
978+ epRec .Aliases , epRec .TaskAliases , epRec .EndpointIP .AsSlice (),
979+ "handleEpTableEvent" )
980+ if err != nil {
981+ logger .WithError (err ).Error ("failed adding service binding" )
982+ return
983+ }
952984 }
953985 } else {
954986 // This is a remote container simply attached to an attachable network
955- if err := c .delContainerNameResolution (nid , eid , containerName , taskAliases , ip , "handleEpTableEvent" ); err != nil {
956- logger .WithError (err ).Errorf ("failed removing container name resolution" )
987+ err := c .addContainerNameResolution (nid , eid , epRec .Name , epRec .TaskAliases ,
988+ epRec .EndpointIP .AsSlice (), "handleEpTableEvent" )
989+ if err != nil {
990+ logger .WithError (err ).Errorf ("failed adding container name resolution" )
957991 }
958992 }
959993 }
0 commit comments