Skip to content

Commit e7bb8a7

Browse files
aditighagjoestringer
authored andcommitted
k8s/cilium Event handlers and processing logic for LRPs
- Define internal representation of LRP - Add event handlers for various resources (LRP CRs, pods, services) that update LRPs - Plumb datapath with policy configs TODO: Add unit tests Testing: Manually tested various cases and verified that loadbalancer service entries were getting created. - LRPs with address/service matchers - Delete service/backend pods - Update pod labels cilium service list ID Frontend Service Type Backend 1 172.20.0.20:5001 ClusterIP 1 => 10.16.189.7:5001 2 => 10.16.45.6:5001 2 172.20.0.1:443 ClusterIP 1 => 192.168.33.11:6443 3 172.20.0.194:80 LocalRedirect 1 => 10.16.86.228:80 6 172.20.0.10:53 ClusterIP 1 => 10.16.14.226:53 7 172.20.0.10:9153 ClusterIP 1 => 10.16.14.226:9153 kubectl exec nginx-client -- curl -s -I http://172.20.0.194/index.html HTTP/1.1 200 OK Server: nginx/1.19.2 Date: Fri, 21 Aug 2020 16:50:06 GMT Content-Type: text/html Content-Length: 612 Last-Modified: Tue, 11 Aug 2020 14:50:35 GMT Connection: keep-alive ETag: "5f32b03b-264" Accept-Ranges: bytes cilium -D monitor --related-to 2280 Listening for events on 6 CPUs with 64x4096 of shared memory Press Ctrl-C to quit level=info msg="Initializing dissection cache..." subsys=monitor -> endpoint 2280 flow 0x52c1fcd0 identity 7086->59395 state new ifindex lxca7d000aa6ecc orig-ip 10.16.224.211: 10.16.224.211:34262 -> 10.16.86.228:80 tcp SYN LRP with addressMatcher - cilium service list ID Frontend Service Type Backend 1 172.20.0.20:5001 ClusterIP 1 => 10.16.189.7:5001 2 => 10.16.45.6:5001 2 172.20.0.1:443 ClusterIP 1 => 192.168.33.11:6443 3 169.254.169.254:8080 LocalRedirect 1 => 10.16.86.228:80 6 172.20.0.10:53 ClusterIP 1 => 10.16.14.226:53 7 172.20.0.10:9153 ClusterIP 1 => 10.16.14.226:9153 kubectl exec nginx-client -- curl -I http://169.254.169.254:8080/index.html HTTP/1.1 200 OK After removing backend- cilium service list ID Frontend Service Type Backend 1 172.20.0.20:5001 ClusterIP 1 => 10.16.189.7:5001 2 => 10.16.45.6:5001 2 172.20.0.1:443 ClusterIP 1 => 192.168.33.11:6443 3 169.254.169.254:8080 LocalRedirect 6 172.20.0.10:53 ClusterIP 1 => 10.16.14.226:53 7 172.20.0.10:9153 ClusterIP 1 => 10.16.14.226:9153 Signed-off-by: Aditi Ghag <[email protected]>
1 parent 2086746 commit e7bb8a7

17 files changed

Lines changed: 1401 additions & 43 deletions

File tree

CODEOWNERS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ Jenkinsfile.nightly @cilium/ci
143143
/pkg/policy/groups/aws/ @cilium/policy @cilium/aws
144144
/pkg/proxy/ @cilium/proxy
145145
/pkg/proxy/accesslog @cilium/api
146+
/pkg/redirectpolicy @cilium/loadbalancer
146147
/pkg/serializer @cilium/agent
147148
/pkg/service @cilium/loadbalancer
148149
/pkg/sysctl @cilium/bpf

daemon/cmd/daemon.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ import (
7474
policyApi "github.com/cilium/cilium/pkg/policy/api"
7575
"github.com/cilium/cilium/pkg/probe"
7676
"github.com/cilium/cilium/pkg/proxy"
77+
"github.com/cilium/cilium/pkg/redirectpolicy"
7778
"github.com/cilium/cilium/pkg/service"
7879
"github.com/cilium/cilium/pkg/sockops"
7980
"github.com/cilium/cilium/pkg/status"
@@ -155,6 +156,8 @@ type Daemon struct {
155156
// endpointCreations is a map of all currently ongoing endpoint
156157
// creation events
157158
endpointCreations *endpointCreationManager
159+
160+
redirectPolicyManager *redirectpolicy.Manager
158161
}
159162

160163
// GetPolicyRepository returns the policy repository of the daemon
@@ -338,13 +341,16 @@ func NewDaemon(ctx context.Context, epMgr *endpointmanager.EndpointManager, dp d
338341
d.endpointManager = epMgr
339342
d.endpointManager.InitMetrics()
340343

344+
d.redirectPolicyManager = redirectpolicy.NewRedirectPolicyManager(d.svc)
345+
341346
d.k8sWatcher = watchers.NewK8sWatcher(
342347
d.endpointManager,
343348
d.nodeDiscovery.Manager,
344349
&d,
345350
d.policy,
346351
d.svc,
347352
d.datapath,
353+
d.redirectPolicyManager,
348354
)
349355

350356
bootstrapStats.daemonInit.End(true)

pkg/k8s/apis/cilium.io/v2/clrp_types.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,13 @@
1515
package v2
1616

1717
import (
18+
"fmt"
19+
"strconv"
20+
"strings"
21+
22+
"github.com/cilium/cilium/pkg/iana"
1823
slim_metav1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/meta/v1"
24+
lb "github.com/cilium/cilium/pkg/loadbalancer"
1925
"github.com/cilium/cilium/pkg/policy/api"
2026

2127
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -198,3 +204,41 @@ type CiliumLocalRedirectPolicyList struct {
198204
// Items is a list of CiliumLocalRedirectPolicy
199205
Items []CiliumLocalRedirectPolicy `json:"items"`
200206
}
207+
208+
// SanitizePortInfo sanitizes all the fields in the PortInfo.
209+
// It returns port number, name, and protocol derived from the given input and error (failure cases).
210+
func (pInfo *PortInfo) SanitizePortInfo(checkNamedPort bool) (uint16, string, lb.L4Type, error) {
211+
var (
212+
pInt uint16
213+
pName string
214+
protocol lb.L4Type
215+
)
216+
// Sanitize port
217+
if pInfo.Port == "" {
218+
return pInt, pName, protocol, fmt.Errorf("port must be specified")
219+
} else {
220+
p, err := strconv.ParseUint(pInfo.Port, 0, 16)
221+
if err != nil {
222+
return pInt, pName, protocol, fmt.Errorf("unable to parse port: %v", err)
223+
}
224+
if p == 0 {
225+
return pInt, pName, protocol, fmt.Errorf("port cannot be 0")
226+
}
227+
pInt = uint16(p)
228+
}
229+
// Sanitize name
230+
if checkNamedPort {
231+
if !iana.IsSvcName(pInfo.Name) {
232+
return pInt, pName, protocol, fmt.Errorf("valid port name is not present")
233+
}
234+
}
235+
pName = strings.ToLower(pInfo.Name) // Normalize for case insensitive comparison
236+
237+
// Sanitize protocol
238+
var err error
239+
protocol, err = lb.NewL4Type(string(pInfo.Protocol))
240+
if err != nil {
241+
return pInt, pName, protocol, err
242+
}
243+
return pInt, pName, protocol, nil
244+
}

pkg/k8s/factory_functions.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -655,6 +655,30 @@ func ConvertToCiliumNode(obj interface{}) interface{} {
655655
}
656656
}
657657

658+
// ConvertToCiliumLocalRedirectPolicy converts a *cilium_v2.CiliumLocalRedirectPolicy into a
659+
// *cilium_v2.CiliumLocalRedirectPolicy or a cache.DeletedFinalStateUnknown into
660+
// a cache.DeletedFinalStateUnknown with a *cilium_v2.CiliumLocalRedirectPolicy in its Obj.
661+
// If the given obj can't be cast into either *cilium_v2.CiliumLocalRedirectPolicy
662+
// nor cache.DeletedFinalStateUnknown, the original obj is returned.
663+
func ConvertToCiliumLocalRedirectPolicy(obj interface{}) interface{} {
664+
// TODO create a slim type of the CiliumLocalRedirectPolicy
665+
switch concreteObj := obj.(type) {
666+
case *cilium_v2.CiliumLocalRedirectPolicy:
667+
return concreteObj
668+
case cache.DeletedFinalStateUnknown:
669+
ciliumLocalRedirectPolicy, ok := concreteObj.Obj.(*cilium_v2.CiliumLocalRedirectPolicy)
670+
if !ok {
671+
return obj
672+
}
673+
return cache.DeletedFinalStateUnknown{
674+
Key: concreteObj.Key,
675+
Obj: ciliumLocalRedirectPolicy,
676+
}
677+
default:
678+
return obj
679+
}
680+
}
681+
658682
// ObjToCiliumNode attempts to cast object to a CiliumNode object and
659683
// returns a deep copy if the castin succeeds. Otherwise, nil is returned.
660684
func ObjToCiliumNode(obj interface{}) *cilium_v2.CiliumNode {
@@ -769,3 +793,25 @@ func ObjToCiliumEndpoint(obj interface{}) *types.CiliumEndpoint {
769793
Warn("Ignoring invalid v2 CiliumEndpoint")
770794
return nil
771795
}
796+
797+
// ObjToCLRP attempts to cast object to a CLRP object and
798+
// returns a deep copy if the castin succeeds. Otherwise, nil is returned.
799+
func ObjToCLRP(obj interface{}) *cilium_v2.CiliumLocalRedirectPolicy {
800+
cLRP, ok := obj.(*cilium_v2.CiliumLocalRedirectPolicy)
801+
if ok {
802+
return cLRP
803+
}
804+
deletedObj, ok := obj.(cache.DeletedFinalStateUnknown)
805+
if ok {
806+
// Delete was not observed by the watcher but is
807+
// removed from kube-apiserver. This is the last
808+
// known state and the object no longer exists.
809+
cn, ok := deletedObj.Obj.(*cilium_v2.CiliumLocalRedirectPolicy)
810+
if ok {
811+
return cn
812+
}
813+
}
814+
log.WithField(logfields.Object, logfields.Repr(obj)).
815+
Warn("Ignoring invalid v2 Cilium Local Redirect Policy")
816+
return nil
817+
}

pkg/k8s/service_cache.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
package k8s
1616

1717
import (
18+
"net"
19+
1820
"github.com/cilium/cilium/pkg/datapath"
1921
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/core/v1"
2022
slim_discovery_v1beta1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/discovery/v1beta1"
@@ -122,6 +124,36 @@ func (s *ServiceCache) GetServiceIP(svcID ServiceID) *loadbalancer.L3n4Addr {
122124
return nil
123125
}
124126

127+
// GetServiceFrontendIP returns the frontend IP (aka clusterIP) for the given service with type.
128+
func (s *ServiceCache) GetServiceFrontendIP(svcID ServiceID, svcType loadbalancer.SVCType) net.IP {
129+
s.mutex.RLock()
130+
defer s.mutex.RUnlock()
131+
svc := s.services[svcID]
132+
if svc == nil || svc.Type != svcType {
133+
return nil
134+
}
135+
136+
return svc.FrontendIP
137+
}
138+
139+
// GetServiceAddrWithPortsAndType returns a slice of all the L3n4Addr that are backing the
140+
// given Service ID with given type.
141+
// Note: The returned IPs are with External scope.
142+
func (s *ServiceCache) GetServiceAddrsWithType(svcID ServiceID, svcType loadbalancer.SVCType) map[loadbalancer.FEPortName]*loadbalancer.L3n4Addr {
143+
s.mutex.RLock()
144+
defer s.mutex.RUnlock()
145+
svc := s.services[svcID]
146+
if svc == nil || svc.Type != svcType {
147+
return nil
148+
}
149+
addrsByPort := make(map[loadbalancer.FEPortName]*loadbalancer.L3n4Addr)
150+
for pName, l4Addr := range svc.Ports {
151+
addrsByPort[pName] = loadbalancer.NewL3n4Addr(l4Addr.Protocol, svc.FrontendIP,
152+
l4Addr.Port, loadbalancer.ScopeExternal)
153+
}
154+
return addrsByPort
155+
}
156+
125157
// GetNodeAddressing returns the registered node addresses to this service cache.
126158
func (s *ServiceCache) GetNodeAddressing() datapath.NodeAddressing {
127159
return s.nodeAddressing

pkg/k8s/utils/utils.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,14 @@
1515
package utils
1616

1717
import (
18+
"fmt"
19+
"sort"
20+
21+
slimcorev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/core/v1"
1822
"github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/labels"
1923
"github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/selection"
2024
"github.com/cilium/cilium/pkg/option"
25+
2126
v1 "k8s.io/api/core/v1"
2227
v1meta "k8s.io/apimachinery/pkg/apis/meta/v1"
2328
)
@@ -103,3 +108,29 @@ func GetServiceListOptionsModifier() (func(options *v1meta.ListOptions), error)
103108
options.LabelSelector = labelSelector.String()
104109
}, nil
105110
}
111+
112+
// ValidIPs return a sorted slice of unique IP addresses retrieved from the given PodStatus.
113+
// Returns an error when no IPs are found.
114+
func ValidIPs(podStatus slimcorev1.PodStatus) ([]string, error) {
115+
if len(podStatus.PodIPs) == 0 && len(podStatus.PodIP) == 0 {
116+
return nil, fmt.Errorf("empty PodIPs")
117+
}
118+
119+
// make it a set first to avoid repeated IP addresses
120+
ipsMap := make(map[string]struct{}, 1+len(podStatus.PodIPs))
121+
if podStatus.PodIP != "" {
122+
ipsMap[podStatus.PodIP] = struct{}{}
123+
}
124+
for _, podIP := range podStatus.PodIPs {
125+
if podIP.IP != "" {
126+
ipsMap[podIP.IP] = struct{}{}
127+
}
128+
}
129+
130+
ips := make([]string, 0, len(ipsMap))
131+
for ipStr := range ipsMap {
132+
ips = append(ips, ipStr)
133+
}
134+
sort.Strings(ips)
135+
return ips, nil
136+
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
// Copyright 2020 Authors of Cilium
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package watchers
16+
17+
import (
18+
"github.com/cilium/cilium/pkg/k8s"
19+
cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
20+
"github.com/cilium/cilium/pkg/k8s/informer"
21+
"github.com/cilium/cilium/pkg/logging/logfields"
22+
"github.com/cilium/cilium/pkg/redirectpolicy"
23+
24+
"github.com/sirupsen/logrus"
25+
v1 "k8s.io/api/core/v1"
26+
"k8s.io/apimachinery/pkg/fields"
27+
"k8s.io/apimachinery/pkg/util/wait"
28+
"k8s.io/client-go/tools/cache"
29+
)
30+
31+
func (k *K8sWatcher) ciliumLocalRedirectPolicyInit(ciliumLRPClient *k8s.K8sCiliumClient) {
32+
33+
_, lrpController := informer.NewInformer(
34+
cache.NewListWatchFromClient(ciliumLRPClient.CiliumV2().RESTClient(),
35+
"ciliumlocalredirectpolicies", v1.NamespaceAll, fields.Everything()),
36+
&cilium_v2.CiliumLocalRedirectPolicy{},
37+
0,
38+
cache.ResourceEventHandlerFuncs{
39+
AddFunc: func(obj interface{}) {
40+
var valid, equal bool
41+
defer func() { k.K8sEventReceived(metricCLRP, metricCreate, valid, equal) }()
42+
if cLRP := k8s.ObjToCLRP(obj); cLRP != nil {
43+
valid = true
44+
err := k.addCiliumLocalRedirectPolicy(cLRP)
45+
k.K8sEventProcessed(metricCLRP, metricCreate, err == nil)
46+
}
47+
48+
},
49+
UpdateFunc: func(oldObj, newObj interface{}) {
50+
log.Info("Local Redirect Policy updates are not handled")
51+
52+
},
53+
DeleteFunc: func(obj interface{}) {
54+
var valid, equal bool
55+
defer func() { k.K8sEventReceived(metricCLRP, metricDelete, valid, equal) }()
56+
cLRP := k8s.ObjToCLRP(obj)
57+
if cLRP == nil {
58+
return
59+
}
60+
valid = true
61+
err := k.deleteCiliumLocalRedirectPolicy(cLRP)
62+
k.K8sEventProcessed(metricCLRP, metricDelete, err == nil)
63+
},
64+
},
65+
k8s.ConvertToCiliumLocalRedirectPolicy,
66+
)
67+
k.blockWaitGroupToSyncResources(
68+
wait.NeverStop,
69+
nil,
70+
lrpController,
71+
k8sAPIGroupCiliumLocalRedirectPolicyV2,
72+
)
73+
74+
go lrpController.Run(wait.NeverStop)
75+
k.k8sAPIGroups.addAPI(k8sAPIGroupCiliumClusterwideNetworkPolicyV2)
76+
}
77+
78+
func (k *K8sWatcher) addCiliumLocalRedirectPolicy(clrp *cilium_v2.CiliumLocalRedirectPolicy) error {
79+
scopedLog := log.WithFields(logrus.Fields{
80+
logfields.CiliumLocalRedirectName: clrp.ObjectMeta.Name,
81+
logfields.K8sUID: clrp.ObjectMeta.UID,
82+
logfields.K8sAPIVersion: clrp.TypeMeta.APIVersion,
83+
logfields.K8sNamespace: clrp.ObjectMeta.Namespace,
84+
})
85+
86+
scopedLog.Debug("Add CiliumLocalRedirectPolicy")
87+
88+
rp, policyAddErr := redirectpolicy.Parse(clrp, true)
89+
if policyAddErr == nil {
90+
_, policyAddErr = k.redirectPolicyManager.AddRedirectPolicy(*rp, &k.K8sSvcCache,
91+
k.podStore)
92+
}
93+
94+
if policyAddErr != nil {
95+
scopedLog.WithError(policyAddErr).Warn("Failed to add CiliumLocalRedirectPolicy")
96+
} else {
97+
scopedLog.Info("Added CiliumLocalRedirectPolicy")
98+
}
99+
100+
//TODO update status
101+
102+
return policyAddErr
103+
}
104+
105+
func (k *K8sWatcher) deleteCiliumLocalRedirectPolicy(clrp *cilium_v2.CiliumLocalRedirectPolicy) error {
106+
scopedLog := log.WithFields(logrus.Fields{
107+
logfields.CiliumLocalRedirectName: clrp.ObjectMeta.Name,
108+
logfields.K8sUID: clrp.ObjectMeta.UID,
109+
logfields.K8sAPIVersion: clrp.TypeMeta.APIVersion,
110+
logfields.K8sNamespace: clrp.ObjectMeta.Namespace,
111+
})
112+
113+
scopedLog.Debug("Delete CiliumLocalRedirectPolicy")
114+
115+
rp, policyDelErr := redirectpolicy.Parse(clrp, false)
116+
if policyDelErr == nil {
117+
policyDelErr = k.redirectPolicyManager.DeleteRedirectPolicy(*rp)
118+
}
119+
120+
if policyDelErr != nil {
121+
scopedLog.WithError(policyDelErr).Warn("Failed to delete CiliumLocalRedirectPolicy")
122+
} else {
123+
scopedLog.Info("Deleted CiliumLocalRedirectPolicy")
124+
}
125+
126+
return policyDelErr
127+
}

0 commit comments

Comments
 (0)