Skip to content

Commit ed78ae9

Browse files
committed
adaptation: use multiple sync messages if necessary.
Try using multiple messages if we hit the maximum ttrpc payload limit for initial plugin synchronization. Signed-off-by: Krisztian Litkey <[email protected]>
1 parent 6fd59d6 commit ed78ae9

File tree

1 file changed

+92
-8
lines changed

1 file changed

+92
-8
lines changed

pkg/adaptation/plugin.go

+92-8
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ import (
3333
"github.com/containerd/nri/pkg/net"
3434
"github.com/containerd/nri/pkg/net/multiplex"
3535
"github.com/containerd/ttrpc"
36+
"google.golang.org/grpc/codes"
37+
"google.golang.org/grpc/status"
3638
)
3739

3840
const (
@@ -414,19 +416,101 @@ func (p *plugin) synchronize(ctx context.Context, pods []*PodSandbox, containers
414416
ctx, cancel := context.WithTimeout(ctx, getPluginRequestTimeout())
415417
defer cancel()
416418

417-
req := &SynchronizeRequest{
418-
Pods: pods,
419-
Containers: containers,
420-
}
421-
rpl, err := p.stub.Synchronize(ctx, req)
422-
if err != nil {
423-
p.close()
424-
return nil, err
419+
var (
420+
podsToSend = pods
421+
ctrsToSend = containers
422+
podsPerMsg = len(pods)
423+
ctrsPerMsg = len(containers)
424+
425+
rpl *SynchronizeResponse
426+
err error
427+
)
428+
429+
for {
430+
req := &SynchronizeRequest{
431+
Pods: podsToSend[:podsPerMsg],
432+
Containers: ctrsToSend[:ctrsPerMsg],
433+
More: len(podsToSend) > podsPerMsg || len(ctrsToSend) > ctrsPerMsg,
434+
}
435+
436+
log.Debugf(ctx, "sending sync message, %d/%d, %d/%d (more: %v)",
437+
len(req.Pods), len(podsToSend), len(req.Containers), len(ctrsToSend), req.More)
438+
439+
rpl, err = p.stub.Synchronize(ctx, req)
440+
if err == nil {
441+
if !req.More {
442+
break
443+
}
444+
445+
if len(rpl.Update) > 0 || rpl.More != req.More {
446+
p.close()
447+
return nil, fmt.Errorf("plugin does not handle split sync requests")
448+
}
449+
450+
podsToSend = podsToSend[podsPerMsg:]
451+
ctrsToSend = ctrsToSend[ctrsPerMsg:]
452+
453+
if podsPerMsg > len(podsToSend) {
454+
podsPerMsg = len(podsToSend)
455+
}
456+
if ctrsPerMsg > len(ctrsToSend) {
457+
ctrsPerMsg = len(ctrsToSend)
458+
}
459+
} else {
460+
podsPerMsg, ctrsPerMsg, err = recalcObjsPerSyncMsg(podsPerMsg, ctrsPerMsg, err)
461+
if err != nil {
462+
p.close()
463+
return nil, err
464+
}
465+
466+
log.Debugf(ctx, "oversized message, retrying in smaller chunks")
467+
}
425468
}
426469

427470
return rpl.Update, nil
428471
}
429472

473+
func recalcObjsPerSyncMsg(pods, ctrs int, err error) (int, int, error) {
474+
const (
475+
minObjsPerMsg = 8
476+
)
477+
478+
if status.Code(err) != codes.ResourceExhausted {
479+
return pods, ctrs, err
480+
}
481+
482+
if pods+ctrs <= minObjsPerMsg {
483+
return pods, ctrs, fmt.Errorf("failed to synchronize plugin with split messages")
484+
}
485+
486+
var e *ttrpc.OversizedMessageErr
487+
if !errors.As(err, &e) {
488+
return pods, ctrs, fmt.Errorf("failed to synchronize plugin with split messages")
489+
}
490+
491+
maxLen := e.MaximumLength()
492+
msgLen := e.RejectedLength()
493+
494+
if msgLen == 0 || maxLen == 0 || msgLen <= maxLen {
495+
return pods, ctrs, fmt.Errorf("failed to synchronize plugin with split messages")
496+
}
497+
498+
factor := float64(maxLen) / float64(msgLen)
499+
if factor > 0.9 {
500+
factor = 0.9
501+
}
502+
503+
pods = int(float64(pods) * factor)
504+
ctrs = int(float64(ctrs) * factor)
505+
506+
if pods+ctrs < minObjsPerMsg {
507+
pods = minObjsPerMsg / 2
508+
ctrs = minObjsPerMsg / 2
509+
}
510+
511+
return pods, ctrs, nil
512+
}
513+
430514
// Relay CreateContainer request to plugin.
431515
func (p *plugin) createContainer(ctx context.Context, req *CreateContainerRequest) (*CreateContainerResponse, error) {
432516
if !p.events.IsSet(Event_CREATE_CONTAINER) {

0 commit comments

Comments
 (0)