Commit 485331f

experimental: Extend LBMaps implementation
Signed-off-by: Jussi Maki <[email protected]>
1 parent acdc24a commit 485331f

10 files changed: 528 additions & 132 deletions

pkg/loadbalancer/experimental/README.md

Lines changed: 5 additions & 0 deletions
@@ -137,6 +137,11 @@ ClusterMesh is implemented by merging the external services and endpoints in Ser
 ClusterMesh services and backends are essentially the same as those coming from Kubernetes
 and do not require any special handling.
 
+One notable requirement for ClusterMesh is the need to prune non-global services before
+ClusterMesh-sourced services are initialized. See cf4279c68202bae83917b65b8e7da21e20869def
+for context. It is not yet clear how to cleanly implement this. The reconciler currently won't
+perform the Prune() operation if there are any pending initializers.
+
 ### ServiceCache replacement
 
 ServiceCache in addition to merging Services with Endpoints and forwarding as events to a
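
A note on the prune-gating described in the README addition above: since the reconciler skips Prune() while any initializer is pending, the ordering problem reduces to tracking outstanding initializers. The sketch below only illustrates that idea; every name in it (pruner, registerInitializer, and so on) is hypothetical and not the reconciler's actual API.

// Sketch only: gating Prune() on pending initializers.
// All identifiers below are hypothetical.
package main

import (
	"fmt"
	"sync"
)

type pruner struct {
	mu           sync.Mutex
	initializers map[string]struct{} // data sources that have not finished initializing
}

// registerInitializer marks a data source (e.g. "k8s", "clustermesh")
// as still initializing; prune() stays disabled until all have finished.
func (p *pruner) registerInitializer(name string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.initializers[name] = struct{}{}
}

func (p *pruner) initializerDone(name string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.initializers, name)
}

// prune sweeps entries not referenced by the desired state, but only once
// no initializer is pending, so that e.g. ClusterMesh-sourced services are
// present before non-global services would be swept away.
func (p *pruner) prune() {
	p.mu.Lock()
	pending := len(p.initializers)
	p.mu.Unlock()
	if pending > 0 {
		return // skip pruning; a later round retries
	}
	fmt.Println("pruning orphaned entries")
}

func main() {
	p := &pruner{initializers: map[string]struct{}{}}
	p.registerInitializer("clustermesh")
	p.prune() // skipped: clustermesh still initializing
	p.initializerDone("clustermesh")
	p.prune() // now performs the sweep
}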

pkg/loadbalancer/experimental/bpf_reconciler.go

Lines changed: 36 additions & 35 deletions
@@ -17,7 +17,6 @@ import (
 	"golang.org/x/sys/unix"
 	"k8s.io/apimachinery/pkg/util/sets"
 
-	"github.com/cilium/cilium/pkg/bpf"
 	cmtypes "github.com/cilium/cilium/pkg/clustermesh/types"
 	"github.com/cilium/cilium/pkg/loadbalancer"
 	"github.com/cilium/cilium/pkg/maps/lbmap"
@@ -69,7 +68,13 @@ type bpfOps struct {
 	backendIDAlloc     idAllocator
 	restoredBackendIDs sets.Set[loadbalancer.BackendID]
 
-	backendStates     map[loadbalancer.L3n4Addr]backendState
+	// backendStates maps from backend address to associated state.
+	// This is used to track which frontends reference a specific backend
+	// in order to delete orphaned backends.
+	backendStates map[loadbalancer.L3n4Addr]backendState
+
+	// backendReferences maps from frontend address to the set of referenced
+	// backends.
 	backendReferences map[loadbalancer.L3n4Addr]sets.Set[loadbalancer.L3n4Addr]
 
 	// nodePortAddrs are the last used NodePort addresses for a given NodePort
@@ -107,6 +112,9 @@ func (ops *bpfOps) start(_ cell.HookContext) error {
 	// Restore the ID allocations from the BPF maps in order to reuse
 	// them and thus avoiding traffic disruptions.
 	err := ops.lbmaps.DumpService(func(key lbmap.ServiceKey, value lbmap.ServiceValue) {
+		if key.GetBackendSlot() != 0 {
+			return
+		}
 		id := loadbalancer.ID(value.GetRevNat())
 		ops.serviceIDAlloc.addID(svcKeyToAddr(key), id)
 		ops.restoredServiceIDs.Insert(id)
@@ -134,10 +142,10 @@ func svcKeyToAddr(svcKey lbmap.ServiceKey) loadbalancer.L3n4Addr {
 }
 
 func beValueToAddr(beValue lbmap.BackendValue) loadbalancer.L3n4Addr {
-	feIP := beValue.GetAddress()
-	feAddrCluster := cmtypes.MustAddrClusterFromIP(feIP)
-	feL3n4Addr := loadbalancer.NewL3n4Addr(loadbalancer.TCP /* FIXME */, feAddrCluster, beValue.GetPort(), 0)
-	return *feL3n4Addr
+	beIP := beValue.GetAddress()
+	beAddrCluster := cmtypes.MustAddrClusterFromIP(beIP)
+	beL3n4Addr := loadbalancer.NewL3n4Addr(loadbalancer.TCP /* FIXME */, beAddrCluster, beValue.GetPort(), 0)
+	return *beL3n4Addr
 }
 
 // Delete implements reconciler.Operations.
@@ -233,11 +241,10 @@ func (ops *bpfOps) deleteFrontend(fe *Frontend) error {
 
 func (ops *bpfOps) pruneServiceMaps() error {
 	toDelete := []lbmap.ServiceKey{}
-	svcCB := func(key bpf.MapKey, value bpf.MapValue) {
-		svcKey := key.(lbmap.ServiceKey).ToHost()
+	svcCB := func(svcKey lbmap.ServiceKey, _ lbmap.ServiceValue) {
 		ac, ok := cmtypes.AddrClusterFromIP(svcKey.GetAddress())
 		if !ok {
-			ops.log.Warn("Prune: bad address in service key", "key", key)
+			ops.log.Warn("Prune: bad address in service key", "key", svcKey)
 			return
 		}
 		addr := loadbalancer.L3n4Addr{
@@ -246,34 +253,34 @@ func (ops *bpfOps) pruneServiceMaps() error {
 			Scope: svcKey.GetScope(),
 		}
 		if _, ok := ops.backendReferences[addr]; !ok {
-			toDelete = append(toDelete, svcKey)
+			toDelete = append(toDelete, svcKey.ToNetwork())
 		}
 	}
-	lbmap.Service4MapV2.DumpWithCallback(svcCB)
-	lbmap.Service6MapV2.DumpWithCallback(svcCB)
+	if err := ops.lbmaps.DumpService(svcCB); err != nil {
+		ops.log.Warn("Failed to prune service maps", "error", err)
+	}
 
 	for _, key := range toDelete {
-		if err := key.MapDelete(); err != nil {
-			ops.log.Warn("Failed to delete from service map", "error", err)
+		if err := ops.lbmaps.DeleteService(key); err != nil {
+			ops.log.Warn("Failed to delete from service map while pruning", "error", err)
		}
	}
	return nil
 }
 
 func (ops *bpfOps) pruneBackendMaps() error {
 	toDelete := []lbmap.BackendKey{}
-	beCB := func(key bpf.MapKey, value bpf.MapValue) {
-		beKey := key.(lbmap.BackendKey)
-		beValue := value.(lbmap.BackendValue).ToHost()
+	beCB := func(beKey lbmap.BackendKey, beValue lbmap.BackendValue) {
 		if _, ok := ops.backendStates[beValueToAddr(beValue)]; !ok {
 			ops.log.Info("pruneBackendMaps: deleting", "id", beKey.GetID(), "addr", beValueToAddr(beValue))
 			toDelete = append(toDelete, beKey)
 		}
 	}
-	lbmap.Backend4MapV3.DumpWithCallback(beCB)
-	lbmap.Backend6MapV3.DumpWithCallback(beCB)
+	if err := ops.lbmaps.DumpBackend(beCB); err != nil {
+		ops.log.Warn("Failed to prune backend maps", "error", err)
+	}
 	for _, key := range toDelete {
-		if err := key.Map().Delete(key); err != nil {
+		if err := ops.lbmaps.DeleteBackend(key); err != nil {
 			ops.log.Warn("Failed to delete from backend map", "error", err)
 		}
 	}
@@ -298,8 +305,8 @@ func (ops *bpfOps) pruneRestoredIDs() error {
 		}
 	}
 
-	ops.restoredServiceIDs.Clear()
-	ops.restoredBackendIDs.Clear()
+	ops.restoredServiceIDs = nil
+	ops.restoredBackendIDs = nil
 
 	return nil
 }
@@ -348,10 +355,6 @@ func (ops *bpfOps) Update(_ context.Context, _ statedb.ReadTxn, fe *Frontend) er
 		fe.Type == loadbalancer.SVCTypeHostPort && fe.Address.AddrCluster.IsUnspecified() {
 		// For NodePort create entries for each node address.
 		// For HostPort only create them if the address was not specified (HostIP is unset).
-		// TODO: HostPort loopback?
-		// TODO: When the nodeport addresses change trigger a full refresh by marking everything as
-		// pending?
-
 		old := sets.New(ops.nodePortAddrs[fe.Address.Port]...)
 		for _, addr := range fe.nodePortAddrs {
 			if fe.Address.IsIPv6() != addr.Is6() {
@@ -414,16 +417,15 @@ func (ops *bpfOps) updateFrontend(fe *Frontend) error {
 	svc := fe.Service()
 	flag := loadbalancer.NewSvcFlag(&loadbalancer.SvcFlagParam{
 		SvcType:          fe.Type,
+		SvcNatPolicy:     svc.NatPolicy,
 		SvcExtLocal:      svc.ExtTrafficPolicy == loadbalancer.SVCTrafficPolicyLocal,
 		SvcIntLocal:      svc.IntTrafficPolicy == loadbalancer.SVCTrafficPolicyLocal,
-		SvcNatPolicy:     svc.NatPolicy,
 		SessionAffinity:  svc.SessionAffinity,
 		IsRoutable:       isRoutable,
+		CheckSourceRange: len(svc.SourceRanges) > 0,
 		L7LoadBalancer:   svc.L7ProxyPort != 0,
 		LoopbackHostport: svc.LoopbackHostPort,
-
-		// TODO:
-		//CheckSourceRange: checkSourceRange,
+		Quarantined:      false,
 	})
 	svcVal.SetFlags(flag.UInt16())
 	svcVal.SetRevNat(int(feID))
@@ -568,7 +570,7 @@ func (ops *bpfOps) upsertMaster(svcKey lbmap.ServiceKey, svcVal lbmap.ServiceVal
 func (ops *bpfOps) cleanupSlots(svcKey lbmap.ServiceKey, oldCount, newCount int) error {
 	for i := newCount; i < oldCount; i++ {
 		svcKey.SetBackendSlot(i + 1)
-		_, err := svcKey.Map().SilentDelete(svcKey.ToNetwork())
+		err := ops.lbmaps.DeleteService(svcKey.ToNetwork())
 		if err != nil {
 			return fmt.Errorf("cleanup service slot %q: %w", svcKey.String(), err)
 		}
@@ -604,7 +606,7 @@ func (ops *bpfOps) deleteBackend(ipv6 bool, id loadbalancer.BackendID) error {
 	} else {
 		key = lbmap.NewBackend4KeyV3(id)
 	}
-	_, err := key.Map().SilentDelete(key)
+	err := ops.lbmaps.DeleteBackend(key)
 	if err != nil {
 		return fmt.Errorf("delete backend %d: %w", id, err)
 	}
@@ -622,7 +624,7 @@ func (ops *bpfOps) upsertAffinityMatch(id loadbalancer.ID, beID loadbalancer.Bac
 	}
 	var value lbmap.AffinityMatchValue
 	ops.log.Info("upsertAffinityMatch", "key", key)
-	return lbmap.AffinityMatchMap.Update(key.ToNetwork(), &value)
+	return ops.lbmaps.UpdateAffinityMatch(key.ToNetwork(), &value)
 }
 
 func (ops *bpfOps) deleteAffinityMatch(id loadbalancer.ID, beID loadbalancer.BackendID) error {
@@ -635,8 +637,7 @@ func (ops *bpfOps) deleteAffinityMatch(id loadbalancer.ID, beID loadbalancer.Bac
 		RevNATID:  uint16(id),
 	}
 	ops.log.Info("deleteAffinityMatch", "serviceID", id, "backendID", beID)
-	_, err := lbmap.AffinityMatchMap.SilentDelete(key.ToNetwork())
-	return err
+	return ops.lbmaps.DeleteAffinityMatch(key.ToNetwork())
 }
 
 func (ops *bpfOps) upsertRevNat(id loadbalancer.ID, svcKey lbmap.ServiceKey, svcVal lbmap.ServiceValue) error {
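
The reconciler above now goes exclusively through the lbmaps abstraction; the interface definition itself lives in a part of the commit not shown in this excerpt. Inferred purely from the call sites in the diff, it plausibly resembles the following sketch: the method names match the calls, but the exact signatures are assumptions.

// Sketch of the lbmaps interface as inferred from the call sites in
// bpf_reconciler.go; the committed definition may differ in detail.
package experimental

import "github.com/cilium/cilium/pkg/maps/lbmap"

type lbmapsSketch interface {
	// Services: dump both IPv4 and IPv6 maps with typed callbacks;
	// delete by key (callers pass keys in network byte order).
	DumpService(cb func(lbmap.ServiceKey, lbmap.ServiceValue)) error
	DeleteService(key lbmap.ServiceKey) error

	// Backends, keyed by backend ID.
	DumpBackend(cb func(lbmap.BackendKey, lbmap.BackendValue)) error
	DeleteBackend(key lbmap.BackendKey) error

	// Reverse NAT entries mapping service IDs back to frontends.
	DumpRevNat(cb func(lbmap.RevNatKey, lbmap.RevNatValue)) error

	// Session affinity match entries.
	UpdateAffinityMatch(key *lbmap.AffinityMatchKey, value *lbmap.AffinityMatchValue) error
	DeleteAffinityMatch(key *lbmap.AffinityMatchKey) error
	DumpAffinityMatch(cb func(*lbmap.AffinityMatchKey, *lbmap.AffinityMatchValue)) error
}

The effect is visible in the diff itself: the per-family map pairs (Service4MapV2/Service6MapV2 and friends) and the untyped bpf.MapKey/bpf.MapValue callbacks disappear from the reconciler, which no longer needs to know whether it is talking to pinned or unpinned maps.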

pkg/loadbalancer/experimental/bpf_reconciler_test.go

Lines changed: 22 additions & 42 deletions
@@ -21,7 +21,6 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
-	"github.com/cilium/cilium/pkg/bpf"
 	"github.com/cilium/cilium/pkg/clustermesh/types"
 	"github.com/cilium/cilium/pkg/loadbalancer"
 	"github.com/cilium/cilium/pkg/logging"
@@ -821,28 +820,12 @@ func parseAddrPort(s string) loadbalancer.L3n4Addr {
 func TestBPFOps(t *testing.T) {
 	testutils.PrivilegedTest(t)
 
-	lbmap.Init(lbmap.InitParams{
-		IPv4:                     true,
-		IPv6:                     true,
-		MaxSockRevNatMapEntries:  1000,
-		ServiceMapMaxEntries:     1000,
-		BackEndMapMaxEntries:     1000,
-		RevNatMapMaxEntries:      1000,
-		AffinityMapMaxEntries:    1000,
-		SourceRangeMapMaxEntries: 1000,
-		MaglevMapMaxEntries:      1000,
-	})
-	require.NoError(t, lbmap.Service4MapV2.CreateUnpinned())
-	require.NoError(t, lbmap.Service6MapV2.CreateUnpinned())
-	require.NoError(t, lbmap.Backend4MapV3.CreateUnpinned())
-	require.NoError(t, lbmap.Backend6MapV3.CreateUnpinned())
-	require.NoError(t, lbmap.RevNat4Map.CreateUnpinned())
-	require.NoError(t, lbmap.RevNat6Map.CreateUnpinned())
-	require.NoError(t, lbmap.AffinityMatchMap.CreateUnpinned())
-
 	lc := hivetest.Lifecycle(t)
 	log := hivetest.Logger(t)
 
+	lbmaps := &realLBMaps{pinned: false}
+	lc.Append(lbmaps)
+
 	// Initialize the metrics registry. Otherwise the bpf.Map ops will incur a 1 second
 	// delay as they try to update the pressure gauge.
 	metrics.NewRegistry(metrics.RegistryParams{
@@ -865,7 +848,7 @@ func TestBPFOps(t *testing.T) {
 	for _, addr := range frontendAddrs {
 		// For each set of test cases, use a fresh instance so each set gets
 		// fresh IDs.
-		ops := newBPFOps(lc, log, cfg, &realLBMaps{})
+		ops := newBPFOps(lc, log, cfg, lbmaps)
 		for _, testCase := range testCaseSet {
 			t.Run(testCase.name, func(t *testing.T) {
 				frontend := testCase.frontend
@@ -906,15 +889,15 @@ func TestBPFOps(t *testing.T) {
 				),
 				"Prune")
 
-				out := dump(addr, false)
+				out := dump(lbmaps, addr, false)
 				if !slices.Equal(out, testCase.maps) {
 					t.Fatalf("BPF map contents differ!\nexpected:\n%s\nactual:\n%s", showMaps(testCase.maps), showMaps(out))
 				}
 			})
 		}
 
 		// Verify that the BPF maps are empty after the test set.
-		assert.Empty(t, dump(addr, false), "BPF maps not empty")
+		assert.Empty(t, dump(lbmaps, addr, false), "BPF maps not empty")
 
 		// Verify that all internal state has been cleaned up.
 		assert.Empty(t, ops.backendIDAlloc.entities, "Backend ID allocations remain")
@@ -956,7 +939,7 @@ func sanitizeID[Num numeric](n Num, sanitize bool) string {
 }
 
 // dump the load-balancing maps into a concise format for assertions in tests.
-func dump(feAddr loadbalancer.L3n4Addr, sanitizeIDs bool) (out []mapDump) {
+func dump(lbmaps lbmaps, feAddr loadbalancer.L3n4Addr, sanitizeIDs bool) (out []mapDump) {
 	out = []string{}
 
 	replaceAddr := func(addr net.IP, port uint16) (s string) {
@@ -981,9 +964,7 @@ func dump(feAddr loadbalancer.L3n4Addr, sanitizeIDs bool) (out []mapDump) {
 		return
 	}
 
-	svcCB := func(key bpf.MapKey, value bpf.MapValue) {
-		svcKey := key.(lbmap.ServiceKey).ToHost()
-		svcValue := value.(lbmap.ServiceValue).ToHost()
+	svcCB := func(svcKey lbmap.ServiceKey, svcValue lbmap.ServiceValue) {
 		addr := svcKey.GetAddress()
 		addrS := replaceAddr(addr, svcKey.GetPort())
 		if svcKey.GetScope() == loadbalancer.ScopeInternal {
@@ -1000,12 +981,11 @@ func dump(feAddr loadbalancer.L3n4Addr, sanitizeIDs bool) (out []mapDump) {
 			", ", "+"),
 		))
 	}
-	lbmap.Service4MapV2.DumpWithCallback(svcCB)
-	lbmap.Service6MapV2.DumpWithCallback(svcCB)
+	if err := lbmaps.DumpService(svcCB); err != nil {
+		panic(err)
+	}
 
-	beCB := func(key bpf.MapKey, value bpf.MapValue) {
-		beKey := key.(lbmap.BackendKey)
-		beValue := value.(lbmap.BackendValue).ToHost()
+	beCB := func(beKey lbmap.BackendKey, beValue lbmap.BackendValue) {
 		addr := beValue.GetAddress()
 		addrS := addr.String()
 		if addr.To4() == nil {
@@ -1019,12 +999,11 @@ func dump(feAddr loadbalancer.L3n4Addr, sanitizeIDs bool) (out []mapDump) {
 			stateS,
 		))
 	}
-	lbmap.Backend4MapV3.DumpWithCallback(beCB)
-	lbmap.Backend6MapV3.DumpWithCallback(beCB)
+	if err := lbmaps.DumpBackend(beCB); err != nil {
+		panic(err)
+	}
 
-	revCB := func(key bpf.MapKey, value bpf.MapValue) {
-		revKey := key.(lbmap.RevNatKey).ToHost()
-		revValue := value.(lbmap.RevNatValue).ToHost()
+	revCB := func(revKey lbmap.RevNatKey, revValue lbmap.RevNatValue) {
 		var addr string
 
 		switch v := revValue.(type) {
@@ -1040,17 +1019,18 @@ func dump(feAddr loadbalancer.L3n4Addr, sanitizeIDs bool) (out []mapDump) {
 			addr,
 		))
 	}
-	lbmap.RevNat4Map.DumpWithCallback(revCB)
-	lbmap.RevNat6Map.DumpWithCallback(revCB)
+	if err := lbmaps.DumpRevNat(revCB); err != nil {
+		panic(err)
+	}
 
-	affCB := func(key bpf.MapKey, value bpf.MapValue) {
-		affKey := key.(*lbmap.AffinityMatchKey).ToHost()
+	affCB := func(affKey *lbmap.AffinityMatchKey, _ *lbmap.AffinityMatchValue) {
 		out = append(out, fmt.Sprintf("AFF: ID=%s BEID=%d",
 			sanitizeID(affKey.RevNATID, sanitizeIDs),
 			affKey.BackendID,
 		))
 	}
-	if err := lbmap.AffinityMatchMap.DumpWithCallback(affCB); err != nil {
+
+	if err := lbmaps.DumpAffinityMatch(affCB); err != nil {
 		panic(err)
 	}
 
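Worth noting in the test diff: the old callbacks received untyped bpf.MapKey/bpf.MapValue, so every call site repeated a type assertion plus a ToHost() byte-order conversion, while the new lbmaps dump methods hand the callback typed, host-order values. How realLBMaps achieves this is not shown in this excerpt, but a wrapper along the following lines would suffice; the helper name is hypothetical, while DumpWithCallback and ToHost are the existing APIs already used elsewhere in this diff.

package experimental // illustrative sketch only

import (
	"github.com/cilium/cilium/pkg/bpf"
	"github.com/cilium/cilium/pkg/maps/lbmap"
)

// dumpServiceTyped shows how a typed, host-byte-order dump can be layered
// over the untyped bpf.Map callback API. Hypothetical helper, not the
// commit's actual realLBMaps implementation.
func dumpServiceTyped(m *bpf.Map, cb func(lbmap.ServiceKey, lbmap.ServiceValue)) error {
	return m.DumpWithCallback(func(key bpf.MapKey, value bpf.MapValue) {
		// The type assertion and ToHost conversion live here, in one
		// place, instead of in every caller's callback.
		cb(key.(lbmap.ServiceKey).ToHost(), value.(lbmap.ServiceValue).ToHost())
	})
}

With such a helper, covering both address families is two calls, dumpServiceTyped over the IPv4 map and then over the IPv6 map, which is presumably roughly what DumpService does internally.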
pkg/loadbalancer/experimental/cell.go

Lines changed: 5 additions & 3 deletions
@@ -35,7 +35,7 @@ var Cell = cell.Module(
 	ReconcilerCell,
 
 	// Provide [lbmaps], abstraction for the load-balancing BPF map access.
-	cell.ProvidePrivate(newLBMaps),
+	cell.ProvidePrivate(newLBMaps, newLBMapsConfig),
 )
 
 // TablesCell provides the [Writer] API for configuring load-balancing and the
@@ -64,11 +64,13 @@ var TablesCell = cell.Module(
 	),
 )
 
-func newLBMaps(w *Writer) lbmaps {
+func newLBMaps(lc cell.Lifecycle, cfg LBMapsConfig, w *Writer) lbmaps {
 	if !w.IsEnabled() {
 		return nil
 	}
-	return &realLBMaps{}
+	r := &realLBMaps{pinned: true, cfg: cfg}
+	lc.Append(r)
+	return r
 }
 
 type resourceIn struct {
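
Both call sites now hand realLBMaps to the hive lifecycle via lc.Append, with pinned: true in the agent and pinned: false in the test. The hook implementation is in a file outside this excerpt; the sketch below only illustrates the shape such a lifecycle-managed map owner takes, and everything beyond the pinned and cfg fields is an assumption.

package experimental // sketch only; not the committed file

import "github.com/cilium/cilium/pkg/hive/cell"

// realLBMapsSketch mirrors what realLBMaps plausibly looks like as a
// lifecycle hook. Only the pinned and cfg fields appear in this diff;
// the Start/Stop bodies here are assumptions.
type realLBMapsSketch struct {
	pinned bool         // agent: maps pinned to bpffs; tests: unpinned
	cfg    LBMapsConfig // map sizing, provided by newLBMapsConfig
}

func (r *realLBMapsSketch) Start(cell.HookContext) error {
	// Create or open the service/backend/revnat/affinity maps here,
	// pinned when r.pinned is true (agent), unpinned otherwise (tests).
	return nil
}

func (r *realLBMapsSketch) Stop(cell.HookContext) error {
	// Release the maps; unpinned test maps simply get closed.
	return nil
}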

pkg/loadbalancer/experimental/config.go

Lines changed: 2 additions & 3 deletions
@@ -11,9 +11,8 @@ import (
 
 type Config struct {
 	EnableExperimentalLB bool
-
-	RetryBackoffMin time.Duration `mapstructure:"lb-retry-backoff-min"`
-	RetryBackoffMax time.Duration `mapstructure:"lb-retry-backoff-max"`
+	RetryBackoffMin      time.Duration `mapstructure:"lb-retry-backoff-min"`
+	RetryBackoffMax      time.Duration `mapstructure:"lb-retry-backoff-max"`
 }
 
 func (def Config) Flags(flags *pflag.FlagSet) {
0 commit comments

Comments
 (0)