Skip to content

Commit d395d73

Browse files
anubhabMajumdarjoamaki
authored andcommitted
pkg/subnet: Add subnet config watcher and manager
- Introduce subnet-map config to supply underlying subnet configuration - Implement a watcher for above config that parse and adds the entries into subnet-identities stateDB - Provide a cell for pkg/subnet Signed-off-by: Anubhab Majumdar <[email protected]>
1 parent eae7a50 commit d395d73

7 files changed

Lines changed: 222 additions & 0 deletions

File tree

CODEOWNERS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -674,6 +674,7 @@ Makefile* @cilium/build
674674
/pkg/source @cilium/ipcache
675675
/pkg/spanstat/ @cilium/sig-agent
676676
/pkg/status/ @cilium/sig-agent
677+
/pkg/subnet/ @cilium/sig-datapath
677678
/pkg/svcrouteconfig/ @cilium/sig-datapath @cilium/sig-bgp
678679
/pkg/testutils/ @cilium/ci-structure
679680
/pkg/testutils/scriptnet @cilium/sig-foundations

daemon/cmd/cells.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ import (
8484
"github.com/cilium/cilium/pkg/signal"
8585
"github.com/cilium/cilium/pkg/source"
8686
"github.com/cilium/cilium/pkg/status"
87+
"github.com/cilium/cilium/pkg/subnet"
8788
"github.com/cilium/cilium/pkg/svcrouteconfig"
8889
"github.com/cilium/cilium/pkg/ztunnel"
8990
)
@@ -366,6 +367,9 @@ var (
366367

367368
// Instantiates an xDS server used for zTunnel integration.
368369
ztunnel.Cell,
370+
371+
// Subnet topology watcher and management.
372+
subnet.Cell,
369373
)
370374
)
371375

daemon/cmd/daemon_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/cilium/hive/cell"
1515
"github.com/cilium/hive/hivetest"
16+
"github.com/cilium/statedb"
1617
statedbReconciler "github.com/cilium/statedb/reconciler"
1718
"github.com/spf13/cobra"
1819
"github.com/stretchr/testify/require"
@@ -45,6 +46,7 @@ import (
4546
"github.com/cilium/cilium/pkg/loadbalancer"
4647
"github.com/cilium/cilium/pkg/maps/ctmap"
4748
"github.com/cilium/cilium/pkg/maps/policymap"
49+
"github.com/cilium/cilium/pkg/maps/subnet"
4850
"github.com/cilium/cilium/pkg/metrics"
4951
monitorAgent "github.com/cilium/cilium/pkg/monitor/agent"
5052
"github.com/cilium/cilium/pkg/option"
@@ -140,6 +142,9 @@ func setupDaemonEtcdSuite(tb testing.TB) *DaemonSuite {
140142
return &loadbalancer.TestConfig{}
141143
},
142144
func() *server.Server { return nil },
145+
func() statedb.RWTable[subnet.SubnetTableEntry] {
146+
return nil
147+
},
143148
),
144149
fakeDatapath.Cell,
145150
neighbor.ForwardableIPCell,

pkg/subnet/cell.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// Copyright Authors of Cilium
3+
4+
package subnet
5+
6+
import (
7+
"context"
8+
9+
"github.com/cilium/hive/cell"
10+
"github.com/cilium/hive/job"
11+
12+
"github.com/cilium/cilium/pkg/dynamicconfig"
13+
"github.com/cilium/cilium/pkg/logging/logfields"
14+
"github.com/cilium/cilium/pkg/option"
15+
)
16+
17+
// Cell provides the subnet watcher functionality
18+
var Cell = cell.Module(
19+
"subnet",
20+
"Subnet watcher and management",
21+
22+
cell.Config(DefaultConfig),
23+
24+
cell.Provide(
25+
newSubnetWatcher,
26+
),
27+
28+
cell.Invoke(
29+
registerSubnetWatcher,
30+
),
31+
)
32+
33+
func registerSubnetWatcher(cfg *option.DaemonConfig, sw *SubnetWatcher) {
34+
if cfg.RoutingMode != option.RoutingModeHybrid {
35+
sw.logger.Debug("Routing mode is not hybrid, skipping subnet watcher")
36+
return
37+
}
38+
sw.jobGroup.Add(job.OneShot("subnet-watcher", func(ctx context.Context, health cell.Health) error {
39+
sw.logger.Info("Starting subnet topology dynamic config watcher")
40+
for {
41+
entry, found, w := dynamicconfig.WatchKey(sw.db.ReadTxn(), sw.dynamicConfigTable, SubnetTopologyConfigKey)
42+
if found {
43+
sw.logger.Info("Detected change in subnet-topology dynamic config")
44+
if err := sw.processSubnetConfigEntry(entry); err != nil {
45+
sw.logger.Error("Failed to process subnet-topology dynamic config", logfields.Error, err)
46+
health.Degraded("Failed to process subnet-topology dynamic config", err)
47+
} else {
48+
health.OK("subnet-topology dynamic config processed successfully")
49+
}
50+
}
51+
select {
52+
case <-ctx.Done():
53+
return ctx.Err()
54+
case <-w:
55+
continue
56+
}
57+
}
58+
}))
59+
}

pkg/subnet/config.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// Copyright Authors of Cilium
3+
4+
package subnet
5+
6+
import "github.com/spf13/pflag"
7+
8+
const (
9+
SubnetTopologyConfigKey = "subnet-topology"
10+
)
11+
12+
var DefaultConfig = Config{
13+
Subnets: "",
14+
}
15+
16+
type Config struct {
17+
Subnets string `json:"subnet-topology,omitempty"`
18+
}
19+
20+
func (cfg Config) Flags(flags *pflag.FlagSet) {
21+
flags.String(
22+
SubnetTopologyConfigKey,
23+
cfg.Subnets,
24+
"Comma and/or semicolon separated list of subnets in CIDR notation representing the subnet topology.",
25+
)
26+
flags.MarkHidden(SubnetTopologyConfigKey)
27+
}

pkg/subnet/watcher.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// Copyright Authors of Cilium
3+
4+
package subnet
5+
6+
import (
7+
"fmt"
8+
"log/slog"
9+
"net/netip"
10+
"strings"
11+
12+
"github.com/cilium/hive/cell"
13+
"github.com/cilium/hive/job"
14+
"github.com/cilium/statedb"
15+
16+
"github.com/cilium/cilium/pkg/dynamicconfig"
17+
subnetTable "github.com/cilium/cilium/pkg/maps/subnet"
18+
)
19+
20+
type watcherParams struct {
21+
cell.In
22+
23+
Logger *slog.Logger
24+
DynamicConfigTable statedb.Table[dynamicconfig.DynamicConfig]
25+
SubnetTable statedb.RWTable[subnetTable.SubnetTableEntry]
26+
DB *statedb.DB
27+
JobGroup job.Group
28+
}
29+
30+
type SubnetWatcher struct {
31+
logger *slog.Logger
32+
dynamicConfigTable statedb.Table[dynamicconfig.DynamicConfig]
33+
subnetTable statedb.RWTable[subnetTable.SubnetTableEntry]
34+
db *statedb.DB
35+
jobGroup job.Group
36+
}
37+
38+
func newSubnetWatcher(params watcherParams) *SubnetWatcher {
39+
return &SubnetWatcher{
40+
logger: params.Logger,
41+
dynamicConfigTable: params.DynamicConfigTable,
42+
subnetTable: params.SubnetTable,
43+
db: params.DB,
44+
jobGroup: params.JobGroup,
45+
}
46+
}
47+
48+
func (w *SubnetWatcher) processSubnetConfigEntry(entry dynamicconfig.DynamicConfig) error {
49+
subnetEntries, err := decodeJson(entry.Value)
50+
if err != nil {
51+
return fmt.Errorf("failed to decode subnet-topology dynamic config value: %w", err)
52+
}
53+
54+
// Write to the subnet table.
55+
// Reset the table and write all entries afresh.
56+
wTx := w.db.WriteTxn(w.subnetTable)
57+
defer wTx.Abort()
58+
59+
if err := w.subnetTable.DeleteAll(wTx); err != nil {
60+
return fmt.Errorf("failed to reset subnet table: %w", err)
61+
}
62+
for _, entry := range subnetEntries {
63+
if _, _, err := w.subnetTable.Insert(wTx, entry); err != nil {
64+
return fmt.Errorf("failed to upsert subnet entry %v: %w", entry, err)
65+
}
66+
}
67+
wTx.Commit()
68+
return nil
69+
}
70+
71+
// decodeJson decodes a JSON string into a slice of SubnetTableEntry.
72+
// Ex: data=10.0.0.1/24,10.10.0.1/24;10.20.0.1/24;2001:0db8:85a3::/64
73+
// would decode into four SubnetTableEntry objects.
74+
// | Key | Value |
75+
// |------|-----------|
76+
// | 10.0.0.1/24 | 1 |
77+
// | 10.10.0.1/24 | 1 |
78+
// | 10.20.0.1/24 | 2 |
79+
// | 2001:0db8:85a3::/64 | 3 |
80+
func decodeJson(data string) ([]subnetTable.SubnetTableEntry, error) {
81+
data = strings.TrimSpace(data)
82+
if data == "" {
83+
return []subnetTable.SubnetTableEntry{}, nil
84+
}
85+
86+
var entries []subnetTable.SubnetTableEntry
87+
88+
// Split by semicolons to get groups
89+
groups := strings.Split(data, ";")
90+
91+
for groupID, group := range groups {
92+
group = strings.TrimSpace(group)
93+
if group == "" {
94+
continue
95+
}
96+
97+
// Split by commas to get individual subnets within a group
98+
subnets := strings.SplitSeq(group, ",")
99+
100+
for subnet := range subnets {
101+
subnet = strings.TrimSpace(subnet)
102+
if subnet == "" {
103+
continue
104+
}
105+
106+
// Validate CIDR format
107+
prefix, err := netip.ParsePrefix(subnet)
108+
if err != nil {
109+
return nil, fmt.Errorf("invalid CIDR %q: %w", subnet, err)
110+
}
111+
112+
// Identity is groupID + 1 to avoid using identity 0.
113+
entries = append(entries, subnetTable.NewSubnetEntry(prefix, uint32(groupID+1)))
114+
}
115+
}
116+
117+
if len(entries) == 0 {
118+
return nil, fmt.Errorf("no valid subnets found in data")
119+
}
120+
121+
return entries, nil
122+
}

test/controlplane/suite/agent.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/cilium/cilium/pkg/loadbalancer"
3636
"github.com/cilium/cilium/pkg/maps/ctmap"
3737
"github.com/cilium/cilium/pkg/maps/policymap"
38+
"github.com/cilium/cilium/pkg/maps/subnet"
3839
"github.com/cilium/cilium/pkg/metrics"
3940
monitorAgent "github.com/cilium/cilium/pkg/monitor/agent"
4041
"github.com/cilium/cilium/pkg/option"
@@ -79,6 +80,9 @@ func (h *agentHandle) setupCiliumAgentHive(clientset k8sClient.Clientset, extraC
7980
func() policymap.Factory { return nil },
8081
func() *server.Server { return nil },
8182
func() *loadbalancer.TestConfig { return &loadbalancer.TestConfig{} },
83+
func() statedb.RWTable[subnet.SubnetTableEntry] {
84+
return nil
85+
},
8286
k8sSynced.RejectedCRDSyncPromise,
8387
),
8488
kvstore.Cell(kvstore.DisabledBackendName),

0 commit comments

Comments
 (0)