@@ -33,6 +33,8 @@ import (
33
33
"github.com/containerd/nri/pkg/net"
34
34
"github.com/containerd/nri/pkg/net/multiplex"
35
35
"github.com/containerd/ttrpc"
36
+ "google.golang.org/grpc/codes"
37
+ "google.golang.org/grpc/status"
36
38
)
37
39
38
40
const (
@@ -414,19 +416,101 @@ func (p *plugin) synchronize(ctx context.Context, pods []*PodSandbox, containers
414
416
ctx , cancel := context .WithTimeout (ctx , getPluginRequestTimeout ())
415
417
defer cancel ()
416
418
417
- req := & SynchronizeRequest {
418
- Pods : pods ,
419
- Containers : containers ,
420
- }
421
- rpl , err := p .stub .Synchronize (ctx , req )
422
- if err != nil {
423
- p .close ()
424
- return nil , err
419
+ var (
420
+ podsToSend = pods
421
+ ctrsToSend = containers
422
+ podsPerMsg = len (pods )
423
+ ctrsPerMsg = len (containers )
424
+
425
+ rpl * SynchronizeResponse
426
+ err error
427
+ )
428
+
429
+ for {
430
+ req := & SynchronizeRequest {
431
+ Pods : podsToSend [:podsPerMsg ],
432
+ Containers : ctrsToSend [:ctrsPerMsg ],
433
+ More : len (podsToSend ) > podsPerMsg || len (ctrsToSend ) > ctrsPerMsg ,
434
+ }
435
+
436
+ log .Debugf (ctx , "sending sync message, %d/%d, %d/%d (more: %v)" ,
437
+ len (req .Pods ), len (podsToSend ), len (req .Containers ), len (ctrsToSend ), req .More )
438
+
439
+ rpl , err = p .stub .Synchronize (ctx , req )
440
+ if err == nil {
441
+ if ! req .More {
442
+ break
443
+ }
444
+
445
+ if len (rpl .Update ) > 0 || rpl .More != req .More {
446
+ p .close ()
447
+ return nil , fmt .Errorf ("plugin does not handle split sync requests" )
448
+ }
449
+
450
+ podsToSend = podsToSend [podsPerMsg :]
451
+ ctrsToSend = ctrsToSend [ctrsPerMsg :]
452
+
453
+ if podsPerMsg > len (podsToSend ) {
454
+ podsPerMsg = len (podsToSend )
455
+ }
456
+ if ctrsPerMsg > len (ctrsToSend ) {
457
+ ctrsPerMsg = len (ctrsToSend )
458
+ }
459
+ } else {
460
+ podsPerMsg , ctrsPerMsg , err = recalcObjsPerSyncMsg (podsPerMsg , ctrsPerMsg , err )
461
+ if err != nil {
462
+ p .close ()
463
+ return nil , err
464
+ }
465
+
466
+ log .Debugf (ctx , "oversized message, retrying in smaller chunks" )
467
+ }
425
468
}
426
469
427
470
return rpl .Update , nil
428
471
}
429
472
473
+ func recalcObjsPerSyncMsg (pods , ctrs int , err error ) (int , int , error ) {
474
+ const (
475
+ minObjsPerMsg = 8
476
+ )
477
+
478
+ if status .Code (err ) != codes .ResourceExhausted {
479
+ return pods , ctrs , err
480
+ }
481
+
482
+ if pods + ctrs <= minObjsPerMsg {
483
+ return pods , ctrs , fmt .Errorf ("failed to synchronize plugin with split messages" )
484
+ }
485
+
486
+ var e * ttrpc.OversizedMessageErr
487
+ if ! errors .As (err , & e ) {
488
+ return pods , ctrs , fmt .Errorf ("failed to synchronize plugin with split messages" )
489
+ }
490
+
491
+ maxLen := e .MaximumLength ()
492
+ msgLen := e .RejectedLength ()
493
+
494
+ if msgLen == 0 || maxLen == 0 || msgLen <= maxLen {
495
+ return pods , ctrs , fmt .Errorf ("failed to synchronize plugin with split messages" )
496
+ }
497
+
498
+ factor := float64 (maxLen ) / float64 (msgLen )
499
+ if factor > 0.9 {
500
+ factor = 0.9
501
+ }
502
+
503
+ pods = int (float64 (pods ) * factor )
504
+ ctrs = int (float64 (ctrs ) * factor )
505
+
506
+ if pods + ctrs < minObjsPerMsg {
507
+ pods = minObjsPerMsg / 2
508
+ ctrs = minObjsPerMsg / 2
509
+ }
510
+
511
+ return pods , ctrs , nil
512
+ }
513
+
430
514
// Relay CreateContainer request to plugin.
431
515
func (p * plugin ) createContainer (ctx context.Context , req * CreateContainerRequest ) (* CreateContainerResponse , error ) {
432
516
if ! p .events .IsSet (Event_CREATE_CONTAINER ) {
0 commit comments