Skip to content

Commit 70e6b8c

Browse files
committed
Wait for caches to sync when IC starts
1 parent 177af78 commit 70e6b8c

File tree

2 files changed

+90
-133
lines changed

2 files changed

+90
-133
lines changed

internal/k8s/controller.go

Lines changed: 90 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"time"
2525

2626
"github.com/nginxinc/kubernetes-ingress/internal/k8s/appprotect"
27+
"k8s.io/client-go/informers"
2728

2829
"github.com/golang/glog"
2930
"github.com/nginxinc/kubernetes-ingress/internal/k8s/secrets"
@@ -50,6 +51,7 @@ import (
5051
conf_v1alpha1 "github.com/nginxinc/kubernetes-ingress/pkg/apis/configuration/v1alpha1"
5152
"github.com/nginxinc/kubernetes-ingress/pkg/apis/configuration/validation"
5253
k8s_nginx "github.com/nginxinc/kubernetes-ingress/pkg/client/clientset/versioned"
54+
k8s_nginx_informers "github.com/nginxinc/kubernetes-ingress/pkg/client/informers/externalversions"
5355

5456
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
5557
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -89,28 +91,19 @@ type LoadBalancerController struct {
8991
client kubernetes.Interface
9092
confClient k8s_nginx.Interface
9193
dynClient dynamic.Interface
92-
ingressController cache.Controller
93-
svcController cache.Controller
94-
endpointController cache.Controller
94+
cacheSyncs []cache.InformerSynced
95+
sharedInformerFactory informers.SharedInformerFactory
96+
confSharedInformerFactorry k8s_nginx_informers.SharedInformerFactory
9597
configMapController cache.Controller
96-
secretController cache.Controller
97-
virtualServerController cache.Controller
98-
virtualServerRouteController cache.Controller
99-
podController cache.Controller
10098
dynInformerFactory dynamicinformer.DynamicSharedInformerFactory
101-
appProtectPolicyInformer cache.SharedIndexInformer
102-
appProtectLogConfInformer cache.SharedIndexInformer
103-
appProtectUserSigInformer cache.SharedIndexInformer
10499
globalConfigurationController cache.Controller
105-
transportServerController cache.Controller
106-
policyController cache.Controller
107100
ingressLinkInformer cache.SharedIndexInformer
108101
ingressLister storeToIngressLister
109102
svcLister cache.Store
110103
endpointLister storeToEndpointLister
111104
configMapLister storeToConfigMapLister
112105
podLister indexerToPodLister
113-
secretLister storeToSecretLister
106+
secretLister cache.Store
114107
virtualServerLister cache.Store
115108
virtualServerRouteLister cache.Store
116109
appProtectPolicyLister cache.Store
@@ -243,20 +236,26 @@ func NewLoadBalancerController(input NewLoadBalancerControllerInput) *LoadBalanc
243236

244237
glog.V(3).Infof("Nginx Ingress Controller has class: %v", input.IngressClass)
245238

239+
lbc.sharedInformerFactory = informers.NewSharedInformerFactoryWithOptions(lbc.client, input.ResyncPeriod, informers.WithNamespace(lbc.namespace))
240+
246241
// create handlers for resources we care about
247242
lbc.addSecretHandler(createSecretHandlers(lbc))
248243
lbc.addIngressHandler(createIngressHandlers(lbc))
249244
lbc.addServiceHandler(createServiceHandlers(lbc))
250245
lbc.addEndpointHandler(createEndpointHandlers(lbc))
251246
lbc.addPodHandler()
247+
252248
if lbc.appProtectEnabled {
253249
lbc.dynInformerFactory = dynamicinformer.NewDynamicSharedInformerFactory(lbc.dynClient, 0)
250+
254251
lbc.addAppProtectPolicyHandler(createAppProtectPolicyHandlers(lbc))
255252
lbc.addAppProtectLogConfHandler(createAppProtectLogConfHandlers(lbc))
256253
lbc.addAppProtectUserSigHandler(createAppProtectUserSigHandlers(lbc))
257254
}
258255

259256
if lbc.areCustomResourcesEnabled {
257+
lbc.confSharedInformerFactorry = k8s_nginx_informers.NewSharedInformerFactoryWithOptions(lbc.confClient, input.ResyncPeriod, k8s_nginx_informers.WithNamespace(lbc.namespace))
258+
260259
lbc.addVirtualServerHandler(createVirtualServerHandlers(lbc))
261260
lbc.addVirtualServerRouteHandler(createVirtualServerRouteHandlers(lbc))
262261
lbc.addTransportServerHandler(createTransportServerHandlers(lbc))
@@ -337,79 +336,65 @@ func (lbc *LoadBalancerController) AddSyncQueue(item interface{}) {
337336

338337
// addappProtectPolicyHandler creates dynamic informers for custom appprotect policy resource
339338
func (lbc *LoadBalancerController) addAppProtectPolicyHandler(handlers cache.ResourceEventHandlerFuncs) {
340-
lbc.appProtectPolicyInformer = lbc.dynInformerFactory.ForResource(appprotect.PolicyGVR).Informer()
341-
lbc.appProtectPolicyLister = lbc.appProtectPolicyInformer.GetStore()
342-
lbc.appProtectPolicyInformer.AddEventHandler(handlers)
339+
informer := lbc.dynInformerFactory.ForResource(appprotect.PolicyGVR).Informer()
340+
informer.AddEventHandler(handlers)
341+
lbc.appProtectPolicyLister = informer.GetStore()
342+
343+
lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced)
343344
}
344345

345346
// addappProtectLogConfHandler creates dynamic informer for custom appprotect logging config resource
346347
func (lbc *LoadBalancerController) addAppProtectLogConfHandler(handlers cache.ResourceEventHandlerFuncs) {
347-
lbc.appProtectLogConfInformer = lbc.dynInformerFactory.ForResource(appprotect.LogConfGVR).Informer()
348-
lbc.appProtectLogConfLister = lbc.appProtectLogConfInformer.GetStore()
349-
lbc.appProtectLogConfInformer.AddEventHandler(handlers)
348+
informer := lbc.dynInformerFactory.ForResource(appprotect.LogConfGVR).Informer()
349+
informer.AddEventHandler(handlers)
350+
lbc.appProtectLogConfLister = informer.GetStore()
351+
352+
lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced)
350353
}
351354

352355
// addappProtectUserSigHandler creates dynamic informer for custom appprotect user defined signature resource
353356
func (lbc *LoadBalancerController) addAppProtectUserSigHandler(handlers cache.ResourceEventHandlerFuncs) {
354-
lbc.appProtectUserSigInformer = lbc.dynInformerFactory.ForResource(appprotect.UserSigGVR).Informer()
355-
lbc.appProtectUserSigLister = lbc.appProtectUserSigInformer.GetStore()
356-
lbc.appProtectUserSigInformer.AddEventHandler(handlers)
357+
informer := lbc.dynInformerFactory.ForResource(appprotect.UserSigGVR).Informer()
358+
informer.AddEventHandler(handlers)
359+
lbc.appProtectUserSigLister = informer.GetStore()
360+
361+
lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced)
357362
}
358363

359364
// addSecretHandler adds the handler for secrets to the controller
360365
func (lbc *LoadBalancerController) addSecretHandler(handlers cache.ResourceEventHandlerFuncs) {
361-
lbc.secretLister.Store, lbc.secretController = cache.NewInformer(
362-
cache.NewListWatchFromClient(
363-
lbc.client.CoreV1().RESTClient(),
364-
"secrets",
365-
lbc.namespace,
366-
fields.Everything()),
367-
&api_v1.Secret{},
368-
lbc.resync,
369-
handlers,
370-
)
366+
informer := lbc.sharedInformerFactory.Core().V1().Secrets().Informer()
367+
informer.AddEventHandler(handlers)
368+
lbc.secretLister = informer.GetStore()
369+
370+
lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced)
371371
}
372372

373373
// addServiceHandler adds the handler for services to the controller
374374
func (lbc *LoadBalancerController) addServiceHandler(handlers cache.ResourceEventHandlerFuncs) {
375-
lbc.svcLister, lbc.svcController = cache.NewInformer(
376-
cache.NewListWatchFromClient(
377-
lbc.client.CoreV1().RESTClient(),
378-
"services",
379-
lbc.namespace,
380-
fields.Everything()),
381-
&api_v1.Service{},
382-
lbc.resync,
383-
handlers,
384-
)
375+
informer := lbc.sharedInformerFactory.Core().V1().Services().Informer()
376+
informer.AddEventHandler(handlers)
377+
lbc.svcLister = informer.GetStore()
378+
379+
lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced)
385380
}
386381

387382
// addIngressHandler adds the handler for ingresses to the controller
388383
func (lbc *LoadBalancerController) addIngressHandler(handlers cache.ResourceEventHandlerFuncs) {
389-
lbc.ingressLister.Store, lbc.ingressController = cache.NewInformer(
390-
cache.NewListWatchFromClient(
391-
lbc.client.NetworkingV1beta1().RESTClient(),
392-
"ingresses",
393-
lbc.namespace,
394-
fields.Everything()),
395-
&networking.Ingress{},
396-
lbc.resync,
397-
handlers,
398-
)
384+
informer := lbc.sharedInformerFactory.Networking().V1beta1().Ingresses().Informer()
385+
informer.AddEventHandler(handlers)
386+
lbc.ingressLister.Store = informer.GetStore()
387+
388+
lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced)
399389
}
400390

401391
// addEndpointHandler adds the handler for endpoints to the controller
402392
func (lbc *LoadBalancerController) addEndpointHandler(handlers cache.ResourceEventHandlerFuncs) {
403-
lbc.endpointLister.Store, lbc.endpointController = cache.NewInformer(
404-
cache.NewListWatchFromClient(
405-
lbc.client.CoreV1().RESTClient(),
406-
"endpoints",
407-
lbc.namespace,
408-
fields.Everything()),
409-
&api_v1.Endpoints{},
410-
lbc.resync,
411-
handlers,
412-
)
393+
informer := lbc.sharedInformerFactory.Core().V1().Endpoints().Informer()
394+
informer.AddEventHandler(handlers)
395+
lbc.endpointLister.Store = informer.GetStore()
396+
397+
lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced)
413398
}
414399

415400
// addConfigMapHandler adds the handler for config maps to the controller
@@ -424,59 +409,38 @@ func (lbc *LoadBalancerController) addConfigMapHandler(handlers cache.ResourceEv
424409
lbc.resync,
425410
handlers,
426411
)
412+
lbc.cacheSyncs = append(lbc.cacheSyncs, lbc.configMapController.HasSynced)
427413
}
428414

429415
func (lbc *LoadBalancerController) addPodHandler() {
430-
lbc.podLister.Indexer, lbc.podController = cache.NewIndexerInformer(
431-
cache.NewListWatchFromClient(
432-
lbc.client.CoreV1().RESTClient(),
433-
"pods",
434-
lbc.namespace,
435-
fields.Everything()),
436-
&api_v1.Pod{},
437-
lbc.resync,
438-
cache.ResourceEventHandlerFuncs{},
439-
cache.Indexers{},
440-
)
416+
informer := lbc.sharedInformerFactory.Core().V1().Pods().Informer()
417+
lbc.podLister.Indexer = informer.GetIndexer()
418+
419+
lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced)
441420
}
442421

443422
func (lbc *LoadBalancerController) addVirtualServerHandler(handlers cache.ResourceEventHandlerFuncs) {
444-
lbc.virtualServerLister, lbc.virtualServerController = cache.NewInformer(
445-
cache.NewListWatchFromClient(
446-
lbc.confClient.K8sV1().RESTClient(),
447-
"virtualservers",
448-
lbc.namespace,
449-
fields.Everything()),
450-
&conf_v1.VirtualServer{},
451-
lbc.resync,
452-
handlers,
453-
)
423+
informer := lbc.confSharedInformerFactorry.K8s().V1().VirtualServers().Informer()
424+
informer.AddEventHandler(handlers)
425+
lbc.virtualServerLister = informer.GetStore()
426+
427+
lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced)
454428
}
455429

456430
func (lbc *LoadBalancerController) addVirtualServerRouteHandler(handlers cache.ResourceEventHandlerFuncs) {
457-
lbc.virtualServerRouteLister, lbc.virtualServerRouteController = cache.NewInformer(
458-
cache.NewListWatchFromClient(
459-
lbc.confClient.K8sV1().RESTClient(),
460-
"virtualserverroutes",
461-
lbc.namespace,
462-
fields.Everything()),
463-
&conf_v1.VirtualServerRoute{},
464-
lbc.resync,
465-
handlers,
466-
)
431+
informer := lbc.confSharedInformerFactorry.K8s().V1().VirtualServerRoutes().Informer()
432+
informer.AddEventHandler(handlers)
433+
lbc.virtualServerRouteLister = informer.GetStore()
434+
435+
lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced)
467436
}
468437

469438
func (lbc *LoadBalancerController) addPolicyHandler(handlers cache.ResourceEventHandlerFuncs) {
470-
lbc.policyLister, lbc.policyController = cache.NewInformer(
471-
cache.NewListWatchFromClient(
472-
lbc.confClient.K8sV1().RESTClient(),
473-
"policies",
474-
lbc.namespace,
475-
fields.Everything()),
476-
&conf_v1.Policy{},
477-
lbc.resync,
478-
handlers,
479-
)
439+
informer := lbc.confSharedInformerFactorry.K8s().V1().Policies().Informer()
440+
informer.AddEventHandler(handlers)
441+
lbc.policyLister = informer.GetStore()
442+
443+
lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced)
480444
}
481445

482446
func (lbc *LoadBalancerController) addGlobalConfigurationHandler(handlers cache.ResourceEventHandlerFuncs, namespace string, name string) {
@@ -490,19 +454,15 @@ func (lbc *LoadBalancerController) addGlobalConfigurationHandler(handlers cache.
490454
lbc.resync,
491455
handlers,
492456
)
457+
lbc.cacheSyncs = append(lbc.cacheSyncs, lbc.globalConfigurationController.HasSynced)
493458
}
494459

495460
func (lbc *LoadBalancerController) addTransportServerHandler(handlers cache.ResourceEventHandlerFuncs) {
496-
lbc.transportServerLister, lbc.transportServerController = cache.NewInformer(
497-
cache.NewListWatchFromClient(
498-
lbc.confClient.K8sV1alpha1().RESTClient(),
499-
"transportservers",
500-
lbc.namespace,
501-
fields.Everything()),
502-
&conf_v1alpha1.TransportServer{},
503-
lbc.resync,
504-
handlers,
505-
)
461+
informer := lbc.confSharedInformerFactorry.K8s().V1alpha1().TransportServers().Informer()
462+
informer.AddEventHandler(handlers)
463+
lbc.transportServerLister = informer.GetStore()
464+
465+
lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced)
506466
}
507467

508468
func (lbc *LoadBalancerController) addIngressLinkHandler(handlers cache.ResourceEventHandlerFuncs, name string) {
@@ -517,46 +477,48 @@ func (lbc *LoadBalancerController) addIngressLinkHandler(handlers cache.Resource
517477

518478
lbc.ingressLinkInformer = informer.Informer()
519479
lbc.ingressLinkLister = informer.Informer().GetStore()
480+
481+
lbc.cacheSyncs = append(lbc.cacheSyncs, lbc.ingressLinkInformer.HasSynced)
520482
}
521483

522484
// Run starts the loadbalancer controller
523485
func (lbc *LoadBalancerController) Run() {
524486
lbc.ctx, lbc.cancel = context.WithCancel(context.Background())
525-
if lbc.appProtectEnabled {
526-
go lbc.dynInformerFactory.Start(lbc.ctx.Done())
527-
}
528487

529488
if lbc.spiffeController != nil {
530489
err := lbc.spiffeController.Start(lbc.ctx.Done(), lbc.addInternalRouteServer)
531490
if err != nil {
532491
glog.Fatal(err)
533492
}
534493
}
535-
536494
if lbc.leaderElector != nil {
537495
go lbc.leaderElector.Run(lbc.ctx)
538496
}
539-
go lbc.svcController.Run(lbc.ctx.Done())
540-
go lbc.podController.Run(lbc.ctx.Done())
541-
go lbc.endpointController.Run(lbc.ctx.Done())
542-
go lbc.secretController.Run(lbc.ctx.Done())
497+
498+
go lbc.sharedInformerFactory.Start(lbc.ctx.Done())
543499
if lbc.watchNginxConfigMaps {
544500
go lbc.configMapController.Run(lbc.ctx.Done())
545501
}
546-
go lbc.ingressController.Run(lbc.ctx.Done())
547-
548502
if lbc.areCustomResourcesEnabled {
549-
go lbc.virtualServerController.Run(lbc.ctx.Done())
550-
go lbc.virtualServerRouteController.Run(lbc.ctx.Done())
551-
go lbc.transportServerController.Run(lbc.ctx.Done())
552-
go lbc.policyController.Run(lbc.ctx.Done())
503+
go lbc.confSharedInformerFactorry.Start(lbc.ctx.Done())
553504
}
554505
if lbc.watchGlobalConfiguration {
555506
go lbc.globalConfigurationController.Run(lbc.ctx.Done())
556507
}
557508
if lbc.watchIngressLink {
558509
go lbc.ingressLinkInformer.Run(lbc.ctx.Done())
559510
}
511+
if lbc.appProtectEnabled {
512+
go lbc.dynInformerFactory.Start(lbc.ctx.Done())
513+
}
514+
515+
glog.V(3).Infof("Waiting for %d caches to sync", len(lbc.cacheSyncs))
516+
517+
if !cache.WaitForCacheSync(lbc.ctx.Done(), lbc.cacheSyncs...) {
518+
return
519+
}
520+
521+
glog.V(3).Infof("Starting the queue with %d initial elements", lbc.syncQueue.Len())
560522

561523
go lbc.syncQueue.Run(time.Second, lbc.ctx.Done())
562524
<-lbc.ctx.Done()
@@ -1731,7 +1693,7 @@ func (lbc *LoadBalancerController) reportCustomResourceStatusEnabled() bool {
17311693

17321694
func (lbc *LoadBalancerController) syncSecret(task task) {
17331695
key := task.Key
1734-
obj, secrExists, err := lbc.secretLister.Store.GetByKey(key)
1696+
obj, secrExists, err := lbc.secretLister.GetByKey(key)
17351697
if err != nil {
17361698
lbc.syncQueue.Requeue(task, err)
17371699
return

internal/k8s/utils.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,6 @@ func findPort(pod *v1.Pod, svcPort *v1.ServicePort) (int32, error) {
152152
return 0, fmt.Errorf("no suitable port for manifest: %s", pod.UID)
153153
}
154154

155-
// storeToSecretLister makes a Store that lists Secrets
156-
type storeToSecretLister struct {
157-
cache.Store
158-
}
159-
160155
// isMinion determines is an ingress is a minion or not
161156
func isMinion(ing *networking.Ingress) bool {
162157
return ing.Annotations["nginx.org/mergeable-ingress-type"] == "minion"

0 commit comments

Comments
 (0)