Skip to content

Commit 5df6f52

Browse files
ritwikranjandylandreimerink
authored andcommitted
feat(hubble): decouple the payloadparser from hubble control plane.
This pull request introduces important changes to integrate a payload parser for Hubble. Currently, the parser (or Decoder) is tightly coupled with the launch code for Hubble, making it impossible to run Hubble without the Cilium dataplane. While we expose Hubble as a cell, we do not explicitly expose the parser as a dependency. In this update, I have extracted the parser initiation and exposed it as a cell. This change allows Hubble to operate as a standalone component, fully decoupled from the Cilium dataplane. Signed-off-by: Ritwik Ranjan <[email protected]>
1 parent 8effc69 commit 5df6f52

File tree

6 files changed

+317
-171
lines changed

6 files changed

+317
-171
lines changed

pkg/datapath/link/link.go

Lines changed: 63 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"strconv"
1111
"sync"
1212

13+
"github.com/cilium/hive/cell"
1314
"github.com/vishvananda/netlink"
1415

1516
"github.com/cilium/cilium/pkg/controller"
@@ -78,27 +79,76 @@ func GetIfIndex(ifName string) (uint32, error) {
7879
type LinkCache struct {
7980
mu lock.RWMutex
8081
indexToName map[int]string
82+
manager *controller.Manager
8183
}
8284

83-
// NewLinkCache begins monitoring local interfaces for changes in order to
84-
// track local link information.
85-
func NewLinkCache() *LinkCache {
86-
once.Do(func() {
87-
linkCache = LinkCache{}
88-
controller.NewManager().UpdateController("link-cache",
89-
controller.ControllerParams{
90-
Group: linkCacheControllerGroup,
91-
RunInterval: 15 * time.Second,
92-
DoFunc: func(ctx context.Context) error {
93-
return linkCache.syncCache()
85+
var Cell = cell.Module(
86+
"link-cache",
87+
"Provides a cache of link names to ifindex mappings",
88+
89+
cell.Provide(newLinkCache),
90+
cell.Invoke(registerLinkCacheHooks),
91+
)
92+
93+
type linkCacheParams struct {
94+
cell.In
95+
Lifecycle cell.Lifecycle
96+
}
97+
98+
func registerLinkCacheHooks(params linkCacheParams, cache *LinkCache) {
99+
params.Lifecycle.Append(cell.Hook{
100+
OnStart: func(ctx cell.HookContext) error {
101+
// Start the controller when the application starts
102+
cache.manager.UpdateController("link-cache",
103+
controller.ControllerParams{
104+
Group: linkCacheControllerGroup,
105+
RunInterval: 15 * time.Second,
106+
DoFunc: func(ctx context.Context) error {
107+
return cache.syncCache()
108+
},
94109
},
95-
},
96-
)
110+
)
111+
return nil
112+
},
113+
OnStop: func(ctx cell.HookContext) error {
114+
cache.Stop()
115+
return nil
116+
},
117+
})
118+
}
119+
120+
func newLinkCache() *LinkCache {
121+
once.Do(func() {
122+
linkCache = LinkCache{
123+
indexToName: make(map[int]string),
124+
manager: controller.NewManager(),
125+
}
97126
})
98127

99128
return &linkCache
100129
}
101130

131+
func NewLinkCache() *LinkCache {
132+
lc := newLinkCache()
133+
134+
lc.manager.UpdateController("link-cache",
135+
controller.ControllerParams{
136+
Group: linkCacheControllerGroup,
137+
RunInterval: 15 * time.Second,
138+
DoFunc: func(ctx context.Context) error {
139+
return linkCache.syncCache()
140+
},
141+
},
142+
)
143+
144+
return lc
145+
}
146+
147+
// Stop terminates the link cache controller
148+
func (c *LinkCache) Stop() {
149+
c.manager.RemoveController("link-cache")
150+
}
151+
102152
func (c *LinkCache) syncCache() error {
103153
links, err := safenetlink.LinkList()
104154
if err != nil {

pkg/hubble/cell/cell.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import (
1616
"github.com/cilium/cilium/pkg/endpointmanager"
1717
exportercell "github.com/cilium/cilium/pkg/hubble/exporter/cell"
1818
"github.com/cilium/cilium/pkg/hubble/observer/observeroption"
19+
"github.com/cilium/cilium/pkg/hubble/parser"
20+
parsercell "github.com/cilium/cilium/pkg/hubble/parser/cell"
1921
identitycell "github.com/cilium/cilium/pkg/identity/cache/cell"
2022
"github.com/cilium/cilium/pkg/ipcache"
2123
k8sClient "github.com/cilium/cilium/pkg/k8s/client"
@@ -35,11 +37,21 @@ var Cell = cell.Module(
3537
"hubble",
3638
"Exposes the Observer gRPC API and Hubble metrics",
3739

38-
cell.Provide(newHubbleIntegration),
39-
cell.Config(defaultConfig),
40+
Core,
4041

4142
// Hubble flow log exporters
4243
exportercell.Cell,
44+
45+
// Parser for Hubble flows
46+
parsercell.Cell,
47+
)
48+
49+
// The core cell group, which contains the Hubble integration and the
50+
// Hubble integration configuration isolated from the dependency graph
51+
// will enable us to run hubble with a different dataplane
52+
var Core = cell.Group(
53+
cell.Provide(newHubbleIntegration),
54+
cell.Config(defaultConfig),
4355
)
4456

4557
type hubbleParams struct {
@@ -65,6 +77,8 @@ type hubbleParams struct {
6577
ObserverOptions []observeroption.Option `group:"hubble-observer-options"`
6678
ExporterBuilders []*exportercell.FlowLogExporterBuilder `group:"hubble-exporter-builders"`
6779

80+
PayloadParser parser.Decoder
81+
6882
// NOTE: we still need DaemonConfig for the shared EnableRecorder flag.
6983
AgentConfig *option.DaemonConfig
7084
Config config
@@ -90,6 +104,7 @@ func newHubbleIntegration(params hubbleParams) (HubbleIntegration, error) {
90104
params.Recorder,
91105
params.ObserverOptions,
92106
params.ExporterBuilders,
107+
params.PayloadParser,
93108
params.AgentConfig,
94109
params.Config,
95110
params.Logger,

pkg/hubble/cell/config.go

Lines changed: 3 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,6 @@ type config struct {
2626
// EventQueueSize specifies the buffer size of the channel to receive
2727
// monitor events.
2828
EventQueueSize int `mapstructure:"hubble-event-queue-size"`
29-
// SkipUnknownCGroupIDs specifies if events with unknown cgroup ids should
30-
// be skipped.
31-
SkipUnknownCGroupIDs bool `mapstructure:"hubble-skip-unknown-cgroup-ids"`
3229
// MonitorEvents specifies Cilium monitor events for Hubble to observe. By
3330
// default, Hubble observes all monitor events.
3431
MonitorEvents []string `mapstructure:"hubble-monitor-events"`
@@ -89,22 +86,6 @@ type config struct {
8986
// RecorderSinkQueueSize is the queue size for each recorder sink.
9087
RecorderSinkQueueSize int `mapstructure:"hubble-recorder-sink-queue-size"`
9188

92-
// EnableRedact controls if sensitive information will be redacted from L7
93-
// flows.
94-
EnableRedact bool `mapstructure:"hubble-redact-enabled"`
95-
// RedactHttpURLQuery controls if the URL query will be redacted from flows.
96-
RedactHttpURLQuery bool `mapstructure:"hubble-redact-http-urlquery"`
97-
// RedactHttpUserInfo controls if the user info will be redacted from flows.
98-
RedactHttpUserInfo bool `mapstructure:"hubble-redact-http-userinfo"`
99-
// RedactHttpHeadersAllow controls which http headers will not be redacted
100-
// from flows.
101-
RedactHttpHeadersAllow []string `mapstructure:"hubble-redact-http-headers-allow"`
102-
// RedactHttpHeadersDeny controls which http headers will be redacted from
103-
// flows.
104-
RedactHttpHeadersDeny []string `mapstructure:"hubble-redact-http-headers-deny"`
105-
// RedactKafkaAPIKey controls if Kafka API key will be redacted from flows.
106-
RedactKafkaAPIKey bool `mapstructure:"hubble-redact-kafka-apikey"`
107-
10889
// EnableK8sDropEvents controls whether Hubble should create v1.Events for
10990
// packet drops related to pods.
11091
EnableK8sDropEvents bool `mapstructure:"hubble-drop-events"`
@@ -120,10 +101,9 @@ type config struct {
120101
var defaultConfig = config{
121102
EnableHubble: false,
122103
// Hubble internals (parser, ringbuffer) configuration
123-
EventBufferCapacity: observeroption.Default.MaxFlows.AsInt(),
124-
EventQueueSize: 0, // see getDefaultMonitorQueueSize()
125-
SkipUnknownCGroupIDs: true,
126-
MonitorEvents: []string{},
104+
EventBufferCapacity: observeroption.Default.MaxFlows.AsInt(),
105+
EventQueueSize: 0, // see getDefaultMonitorQueueSize()
106+
MonitorEvents: []string{},
127107
// Hubble local server configuration
128108
SocketPath: hubbleDefaults.SocketPath,
129109
// Hubble TCP server configuration
@@ -148,13 +128,6 @@ var defaultConfig = config{
148128
EnableRecorderAPI: true,
149129
RecorderStoragePath: hubbleDefaults.RecorderStoragePath,
150130
RecorderSinkQueueSize: 1024,
151-
// Hubble field redaction configuration
152-
EnableRedact: false,
153-
RedactHttpURLQuery: false,
154-
RedactHttpUserInfo: true,
155-
RedactHttpHeadersAllow: []string{},
156-
RedactHttpHeadersDeny: []string{},
157-
RedactKafkaAPIKey: false,
158131
// Hubble k8s v1.Events integration configuration.
159132
EnableK8sDropEvents: false,
160133
K8sDropEventsInterval: 2 * time.Minute,
@@ -167,7 +140,6 @@ func (def config) Flags(flags *pflag.FlagSet) {
167140
// Hubble internals (parser, ringbuffer) configuration
168141
flags.Int("hubble-event-buffer-capacity", def.EventBufferCapacity, "Capacity of Hubble events buffer. The provided value must be one less than an integer power of two and no larger than 65535 (ie: 1, 3, ..., 2047, 4095, ..., 65535)")
169142
flags.Int("hubble-event-queue-size", def.EventQueueSize, "Buffer size of the channel to receive monitor events.")
170-
flags.Bool("hubble-skip-unknown-cgroup-ids", def.SkipUnknownCGroupIDs, "Skip Hubble events with unknown cgroup ids")
171143
flags.StringSlice("hubble-monitor-events", def.MonitorEvents,
172144
fmt.Sprintf(
173145
"Cilium monitor events for Hubble to observe: [%s]. By default, Hubble observes all monitor events.",
@@ -197,13 +169,6 @@ func (def config) Flags(flags *pflag.FlagSet) {
197169
flags.Bool("enable-hubble-recorder-api", def.EnableRecorderAPI, "Enable the Hubble recorder API")
198170
flags.String("hubble-recorder-storage-path", def.RecorderStoragePath, "Directory in which pcap files created via the Hubble Recorder API are stored")
199171
flags.Int("hubble-recorder-sink-queue-size", def.RecorderSinkQueueSize, "Queue size of each Hubble recorder sink")
200-
// Hubble field redaction configuration
201-
flags.Bool("hubble-redact-enabled", def.EnableRedact, "Hubble redact sensitive information from flows")
202-
flags.Bool("hubble-redact-http-urlquery", def.RedactHttpURLQuery, "Hubble redact http URL query from flows")
203-
flags.Bool("hubble-redact-http-userinfo", def.RedactHttpUserInfo, "Hubble redact http user info from flows")
204-
flags.StringSlice("hubble-redact-http-headers-allow", def.RedactHttpHeadersAllow, "HTTP headers to keep visible in flows")
205-
flags.StringSlice("hubble-redact-http-headers-deny", def.RedactHttpHeadersDeny, "HTTP headers to redact from flows")
206-
flags.Bool("hubble-redact-kafka-apikey", def.RedactKafkaAPIKey, "Hubble redact Kafka API key from flows")
207172
// Hubble k8s v1.Events integration configuration.
208173
flags.Bool("hubble-drop-events", def.EnableK8sDropEvents, "Emit packet drop Events related to pods (alpha)")
209174
flags.Duration("hubble-drop-events-interval", def.K8sDropEventsInterval, "Minimum time between emitting same events")
@@ -235,13 +200,6 @@ func (cfg *config) normalize() {
235200
}
236201
}
237202

238-
func (cfg config) validate() error {
239-
if len(cfg.RedactHttpHeadersAllow) > 0 && len(cfg.RedactHttpHeadersDeny) > 0 {
240-
return fmt.Errorf("Only one of --hubble-redact-http-headers-allow and --hubble-redact-http-headers-deny can be specified, not both")
241-
}
242-
return nil
243-
}
244-
245203
func getDefaultMonitorQueueSize(numCPU int) int {
246204
monitorQueueSize := numCPU * ciliumDefaults.MonitorQueueSizePerCPU
247205
if monitorQueueSize > ciliumDefaults.MonitorQueueSizePerCPUMaximum {

0 commit comments

Comments
 (0)