Skip to content

Commit e1a586a

Browse files
committed
libnetwork/d/overlay: handle coalesced peer updates
The eventually-consistent nature of NetworkDB means we cannot depend on events being received in the same order that they were sent. Nor can we depend on receiving events for all intermediate states. It is possible for a series of entry UPDATEs, or a DELETE followed by a CREATE with the same key, to get coalesced into a single UPDATE event on the receiving node. Watchers of NetworkDB tables therefore need to be prepared to gracefully handle arbitrary UPDATEs of a key, including those where the new value may have nothing in common with the previous value. The overlay driver naively handled events for overlay_peer_table assuming that an endpoint leave followed by a rejoin of the same endpoint would always be expressed as a DELETE event followed by a CREATE. It would handle a coalesced UPDATE as a CREATE, inserting a new entry into peerDB without removing the old one. This would have various side effects, such as having the "transient state" of multiple entries in peerDB with the same peer IP never settle. Update driverapi to pass both the previous and new value of a table entry into the driver. Modify the overlay driver to handle an UPDATE by removing the previous peer entry from peerDB then adding the new one. Modify the Windows overlay driver to match. Signed-off-by: Cory Snider <[email protected]>
1 parent 8340e10 commit e1a586a

6 files changed

Lines changed: 132 additions & 107 deletions

File tree

daemon/libnetwork/agent.go

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -822,25 +822,8 @@ func (n *Network) handleDriverTableEvent(ev events.Event) {
822822
return
823823
}
824824

825-
var (
826-
etype driverapi.EventType
827-
value []byte
828-
)
829-
830825
event := ev.(networkdb.WatchEvent)
831-
switch {
832-
case event.IsCreate():
833-
value = event.Value
834-
etype = driverapi.Create
835-
case event.IsDelete():
836-
value = event.Prev
837-
etype = driverapi.Delete
838-
case event.IsUpdate():
839-
value = event.Value
840-
etype = driverapi.Update
841-
}
842-
843-
ed.EventNotify(etype, n.ID(), event.Table, event.Key, value)
826+
ed.EventNotify(n.ID(), event.Table, event.Key, event.Prev, event.Value)
844827
}
845828

846829
func (c *Controller) handleNodeTableEvent(ev events.Event) {

daemon/libnetwork/driverapi/driverapi.go

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ type TableWatcher interface {
6969
// happened on a table of its interest as soon as this node
7070
// receives such an event in the gossip layer. This method is
7171
// only invoked for the global scope driver.
72-
EventNotify(event EventType, nid string, tableName string, key string, value []byte)
72+
EventNotify(nid string, tableName string, key string, prev, value []byte)
7373

7474
// DecodeTableEntry passes the driver a key, value pair from table it registered
7575
// with libnetwork. Driver should return {object ID, map[string]string} tuple.
@@ -204,18 +204,6 @@ type IPAMData struct {
204204
AuxAddresses map[string]*net.IPNet
205205
}
206206

207-
// EventType defines a type for the CRUD event
208-
type EventType uint8
209-
210-
const (
211-
// Create event is generated when a table entry is created,
212-
Create EventType = 1 + iota
213-
// Update event is generated when a table entry is updated.
214-
Update
215-
// Delete event is generated when a table entry is deleted.
216-
Delete
217-
)
218-
219207
// ObjectType represents the type of object driver wants to store in libnetwork's networkDB
220208
type ObjectType int
221209

daemon/libnetwork/drivers/overlay/joinleave.go

Lines changed: 43 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111

1212
"github.com/containerd/log"
1313
"github.com/docker/docker/daemon/libnetwork/driverapi"
14-
"github.com/docker/docker/daemon/libnetwork/internal/hashable"
1514
"github.com/docker/docker/daemon/libnetwork/internal/netiputil"
1615
"github.com/docker/docker/daemon/libnetwork/netlabel"
1716
"github.com/docker/docker/daemon/libnetwork/ns"
@@ -161,41 +160,45 @@ func (d *driver) DecodeTableEntry(tablename string, key string, value []byte) (s
161160
}
162161
}
163162

164-
func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key string, value []byte) {
163+
func (d *driver) EventNotify(nid, tableName, key string, prev, value []byte) {
165164
if tableName != OverlayPeerTable {
166165
log.G(context.TODO()).Errorf("Unexpected table notification for table %s received", tableName)
167166
return
168167
}
169168

170169
eid := key
171170

172-
var peer PeerRecord
173-
if err := proto.Unmarshal(value, &peer); err != nil {
174-
log.G(context.TODO()).Errorf("Failed to unmarshal peer record: %v", err)
175-
return
176-
}
177-
178-
// Ignore local peers. We already know about them and they
179-
// should not be added to vxlan fdb.
180-
if addr, _ := netip.ParseAddr(peer.TunnelEndpointIP); addr == d.advertiseAddress {
181-
return
171+
var prevPeer, newPeer *Peer
172+
if prev != nil {
173+
var err error
174+
prevPeer, err = UnmarshalPeerRecord(prev)
175+
if err != nil {
176+
log.G(context.TODO()).WithError(err).Error("Failed to unmarshal previous peer record")
177+
}
178+
if prevPeer.TunnelEndpointIP == d.advertiseAddress {
179+
// Ignore local peers. We don't add them to the VXLAN
180+
// FDB so don't need to remove them.
181+
prevPeer = nil
182+
}
182183
}
183-
184-
addr, err := netip.ParsePrefix(peer.EndpointIP)
185-
if err != nil {
186-
log.G(context.TODO()).WithError(err).Errorf("Invalid peer IP %s received in event notify", peer.EndpointIP)
187-
return
184+
if value != nil {
185+
var err error
186+
newPeer, err = UnmarshalPeerRecord(value)
187+
if err != nil {
188+
log.G(context.TODO()).WithError(err).Error("Failed to unmarshal peer record")
189+
}
190+
if newPeer.TunnelEndpointIP == d.advertiseAddress {
191+
newPeer = nil
192+
}
188193
}
189194

190-
mac, err := hashable.ParseMAC(peer.EndpointMAC)
191-
if err != nil {
192-
log.G(context.TODO()).WithError(err).Errorf("Invalid mac %s received in event notify", peer.EndpointMAC)
195+
if prevPeer == nil && newPeer == nil {
196+
// Nothing to do! Either the event was for a local peer,
197+
// or unmarshaling failed.
193198
return
194199
}
195-
196-
vtep, err := netip.ParseAddr(peer.TunnelEndpointIP)
197-
if err != nil {
198-
log.G(context.TODO()).WithError(err).Errorf("Invalid VTEP %s received in event notify", peer.TunnelEndpointIP)
200+
if prevPeer != nil && newPeer != nil && *prevPeer == *newPeer {
201+
// The update did not materially change the FDB entry.
199202
return
200203
}
201204

@@ -209,21 +212,23 @@ func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key stri
209212
}
210213
defer unlock()
211214

212-
var opname string
213-
if etype == driverapi.Delete {
214-
opname = "delete"
215-
err = n.peerDelete(eid, addr, mac, vtep)
216-
} else {
217-
opname = "add"
218-
err = n.peerAdd(eid, addr, mac, vtep)
215+
if prevPeer != nil {
216+
if err := n.peerDelete(eid, prevPeer.EndpointIP, prevPeer.EndpointMAC, prevPeer.TunnelEndpointIP); err != nil {
217+
log.G(context.TODO()).WithFields(log.Fields{
218+
"error": err,
219+
"nid": n.id,
220+
"peer": prevPeer,
221+
}).Warn("overlay: failed to delete peer entry")
222+
}
219223
}
220-
if err != nil {
221-
log.G(context.TODO()).WithFields(log.Fields{
222-
"error": err,
223-
"nid": n.id,
224-
"peer": peer,
225-
"op": opname,
226-
}).Warn("Peer operation failed")
224+
if newPeer != nil {
225+
if err := n.peerAdd(eid, newPeer.EndpointIP, newPeer.EndpointMAC, newPeer.TunnelEndpointIP); err != nil {
226+
log.G(context.TODO()).WithFields(log.Fields{
227+
"error": err,
228+
"nid": n.id,
229+
"peer": newPeer,
230+
}).Warn("overlay: failed to add peer entry")
231+
}
227232
}
228233
}
229234

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,42 @@
11
package overlay
22

3+
import (
4+
"fmt"
5+
"net/netip"
6+
7+
"github.com/docker/docker/daemon/libnetwork/internal/hashable"
8+
"github.com/gogo/protobuf/proto"
9+
)
10+
311
// OverlayPeerTable is the NetworkDB table for overlay network peer discovery.
412
const OverlayPeerTable = "overlay_peer_table"
13+
14+
type Peer struct {
15+
EndpointIP netip.Prefix
16+
EndpointMAC hashable.MACAddr
17+
TunnelEndpointIP netip.Addr
18+
}
19+
20+
func UnmarshalPeerRecord(data []byte) (*Peer, error) {
21+
var pr PeerRecord
22+
if err := proto.Unmarshal(data, &pr); err != nil {
23+
return nil, fmt.Errorf("failed to unmarshal peer record: %w", err)
24+
}
25+
var (
26+
p Peer
27+
err error
28+
)
29+
p.EndpointIP, err = netip.ParsePrefix(pr.EndpointIP)
30+
if err != nil {
31+
return nil, fmt.Errorf("invalid peer IP %q received: %w", pr.EndpointIP, err)
32+
}
33+
p.EndpointMAC, err = hashable.ParseMAC(pr.EndpointMAC)
34+
if err != nil {
35+
return nil, fmt.Errorf("invalid MAC %q received: %w", pr.EndpointMAC, err)
36+
}
37+
p.TunnelEndpointIP, err = netip.ParseAddr(pr.TunnelEndpointIP)
38+
if err != nil {
39+
return nil, fmt.Errorf("invalid VTEP %q received: %w", pr.TunnelEndpointIP, err)
40+
}
41+
return &p, nil
42+
}

daemon/libnetwork/drivers/windows/overlay/joinleave_windows.go

Lines changed: 47 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,10 @@ package overlay
33
import (
44
"context"
55
"fmt"
6-
"net"
76

87
"github.com/containerd/log"
98
"github.com/docker/docker/daemon/libnetwork/driverapi"
109
"github.com/docker/docker/daemon/libnetwork/drivers/overlay"
11-
"github.com/docker/docker/daemon/libnetwork/types"
1210
"github.com/gogo/protobuf/proto"
1311
"go.opentelemetry.io/otel"
1412
"go.opentelemetry.io/otel/attribute"
@@ -57,57 +55,70 @@ func (d *driver) Join(ctx context.Context, nid, eid string, sboxKey string, jinf
5755
return nil
5856
}
5957

60-
func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key string, value []byte) {
58+
func (d *driver) EventNotify(nid, tableName, key string, prev, value []byte) {
6159
if tableName != overlay.OverlayPeerTable {
6260
log.G(context.TODO()).Errorf("Unexpected table notification for table %s received", tableName)
6361
return
6462
}
6563

6664
eid := key
6765

68-
var peer overlay.PeerRecord
69-
if err := proto.Unmarshal(value, &peer); err != nil {
70-
log.G(context.TODO()).Errorf("Failed to unmarshal peer record: %v", err)
71-
return
72-
}
73-
7466
n := d.network(nid)
7567
if n == nil {
7668
return
7769
}
7870

79-
// Ignore local peers. We already know about them and they
80-
// should not be added to vxlan fdb.
81-
if peer.TunnelEndpointIP == n.providerAddress {
82-
return
83-
}
84-
85-
addr, err := types.ParseCIDR(peer.EndpointIP)
86-
if err != nil {
87-
log.G(context.TODO()).Errorf("Invalid peer IP %s received in event notify", peer.EndpointIP)
88-
return
89-
}
90-
91-
mac, err := net.ParseMAC(peer.EndpointMAC)
92-
if err != nil {
93-
log.G(context.TODO()).Errorf("Invalid mac %s received in event notify", peer.EndpointMAC)
94-
return
95-
}
96-
97-
vtep := net.ParseIP(peer.TunnelEndpointIP)
98-
if vtep == nil {
99-
log.G(context.TODO()).Errorf("Invalid VTEP %s received in event notify", peer.TunnelEndpointIP)
71+
var prevPeer, newPeer *overlay.Peer
72+
if prev != nil {
73+
var err error
74+
prevPeer, err = overlay.UnmarshalPeerRecord(prev)
75+
if err != nil {
76+
log.G(context.TODO()).WithError(err).Error("Failed to unmarshal previous peer record")
77+
}
78+
if prevPeer.TunnelEndpointIP.String() == n.providerAddress {
79+
// Ignore local peers. We don't add them to the VXLAN
80+
// FDB so don't need to remove them.
81+
prevPeer = nil
82+
}
83+
}
84+
if value != nil {
85+
var err error
86+
newPeer, err = overlay.UnmarshalPeerRecord(value)
87+
if err != nil {
88+
log.G(context.TODO()).WithError(err).Error("Failed to unmarshal peer record")
89+
}
90+
if prevPeer.TunnelEndpointIP.String() == n.providerAddress {
91+
newPeer = nil
92+
}
93+
}
94+
95+
if prevPeer == nil && newPeer == nil {
96+
// Nothing to do! Either the event was for a local peer,
97+
// or unmarshaling failed.
10098
return
10199
}
102-
103-
if etype == driverapi.Delete {
104-
d.peerDelete(nid, eid, addr.IP, addr.Mask, mac, vtep, true)
100+
if prevPeer != nil && newPeer != nil && *prevPeer == *newPeer {
101+
// The update did not materially change the FDB entry.
105102
return
106103
}
107104

108-
err = d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true)
109-
if err != nil {
110-
log.G(context.TODO()).Errorf("peerAdd failed (%v) for ip %s with mac %s", err, addr.IP.String(), mac.String())
105+
if prevPeer != nil {
106+
if err := d.peerDelete(nid, eid, prevPeer.EndpointIP.Addr().AsSlice(), true); err != nil {
107+
log.G(context.TODO()).WithFields(log.Fields{
108+
"error": err,
109+
"nid": n.id,
110+
"peer": prevPeer,
111+
}).Warn("overlay: failed to delete peer entry")
112+
}
113+
}
114+
if newPeer != nil {
115+
if err := d.peerAdd(nid, eid, newPeer.EndpointIP.Addr().AsSlice(), newPeer.EndpointMAC.AsSlice(), newPeer.TunnelEndpointIP.AsSlice(), true); err != nil {
116+
log.G(context.TODO()).WithFields(log.Fields{
117+
"error": err,
118+
"nid": n.id,
119+
"peer": newPeer,
120+
}).Warn("overlay: failed to add peer entry")
121+
}
111122
}
112123
}
113124

daemon/libnetwork/drivers/windows/overlay/peerdb_windows.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"github.com/docker/docker/daemon/libnetwork/types"
1313
)
1414

15-
func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error {
15+
func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error {
1616
log.G(context.TODO()).Debugf("WINOVERLAY: Enter peerAdd for ca ip %s with ca mac %s", peerIP.String(), peerMac.String())
1717

1818
if err := validateID(nid, eid); err != nil {
@@ -82,7 +82,7 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
8282
return nil
8383
}
8484

85-
func (d *driver) peerDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error {
85+
func (d *driver) peerDelete(nid, eid string, peerIP net.IP, updateDb bool) error {
8686
log.G(context.TODO()).Infof("WINOVERLAY: Enter peerDelete for endpoint %s and peer ip %s", eid, peerIP.String())
8787

8888
if err := validateID(nid, eid); err != nil {

0 commit comments

Comments
 (0)