Skip to content

Commit f8c2f04

Browse files
committed
remotes/ctr: allow to limit max concurrent uploads like downloads
Also add flags for push/pull subcommand to limit max concurrent uploads/downloads. Signed-off-by: Wei Fu <[email protected]>
1 parent 548d984 commit f8c2f04

6 files changed

Lines changed: 49 additions & 6 deletions

File tree

client.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ import (
6363
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
6464
specs "github.com/opencontainers/runtime-spec/specs-go"
6565
"github.com/pkg/errors"
66+
"golang.org/x/sync/semaphore"
6667
"google.golang.org/grpc"
6768
"google.golang.org/grpc/backoff"
6869
"google.golang.org/grpc/health/grpc_health_v1"
@@ -355,6 +356,9 @@ type RemoteContext struct {
355356
// MaxConcurrentDownloads is the max concurrent content downloads for each pull.
356357
MaxConcurrentDownloads int
357358

359+
// MaxConcurrentUploadedLayers is the max concurrent uploaded layers for each push.
360+
MaxConcurrentUploadedLayers int
361+
358362
// AllMetadata downloads all manifests and known-configuration files
359363
AllMetadata bool
360364

@@ -463,7 +467,12 @@ func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor,
463467
wrapper = pushCtx.HandlerWrapper
464468
}
465469

466-
return remotes.PushContent(ctx, pusher, desc, c.ContentStore(), pushCtx.PlatformMatcher, wrapper)
470+
var limiter *semaphore.Weighted
471+
if pushCtx.MaxConcurrentUploadedLayers > 0 {
472+
limiter = semaphore.NewWeighted(int64(pushCtx.MaxConcurrentUploadedLayers))
473+
}
474+
475+
return remotes.PushContent(ctx, pusher, desc, c.ContentStore(), limiter, pushCtx.PlatformMatcher, wrapper)
467476
}
468477

469478
// GetImage returns an existing image

client_opts.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,14 @@ func WithMaxConcurrentDownloads(max int) RemoteOpt {
228228
}
229229
}
230230

231+
// WithMaxConcurrentUploadedLayers sets max concurrent uploaded layer limit.
232+
func WithMaxConcurrentUploadedLayers(max int) RemoteOpt {
233+
return func(client *Client, c *RemoteContext) error {
234+
c.MaxConcurrentUploadedLayers = max
235+
return nil
236+
}
237+
}
238+
231239
// WithAllMetadata downloads all manifests and known-configuration files
232240
func WithAllMetadata() RemoteOpt {
233241
return func(_ *Client, c *RemoteContext) error {

cmd/ctr/commands/content/fetch.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ type FetchConfig struct {
109109
Platforms []string
110110
// Whether or not download all metadata
111111
AllMetadata bool
112-
// RemoteOpts is not used by ctr, but can be used by other CLI tools
112+
// RemoteOpts to configure object resolutions and transfers with remote content providers
113113
RemoteOpts []containerd.RemoteOpt
114114
// TraceHTTP writes DNS and connection information to the log when dealing with a container registry
115115
TraceHTTP bool
@@ -145,6 +145,16 @@ func NewFetchConfig(ctx context.Context, clicontext *cli.Context) (*FetchConfig,
145145
config.AllMetadata = true
146146
}
147147

148+
if clicontext.IsSet("max-concurrent-downloads") {
149+
mcd := clicontext.Int("max-concurrent-downloads")
150+
config.RemoteOpts = append(config.RemoteOpts, containerd.WithMaxConcurrentDownloads(mcd))
151+
}
152+
153+
if clicontext.IsSet("max-concurrent-uploaded-layers") {
154+
mcu := clicontext.Int("max-concurrent-uploaded-layers")
155+
config.RemoteOpts = append(config.RemoteOpts, containerd.WithMaxConcurrentUploadedLayers(mcu))
156+
}
157+
148158
return config, nil
149159
}
150160

cmd/ctr/commands/images/pull.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ command. As part of this process, we do the following:
6363
Name: "print-chainid",
6464
Usage: "Print the resulting image's chain ID",
6565
},
66+
cli.IntFlag{
67+
Name: "max-concurrent-downloads",
68+
Usage: "Set the max concurrent downloads for each pull",
69+
},
6670
),
6771
Action: func(context *cli.Context) error {
6872
var (

cmd/ctr/commands/images/push.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ var pushCommand = cli.Command{
6464
Name: "platform",
6565
Usage: "push content from a specific platform",
6666
Value: &cli.StringSlice{},
67+
}, cli.IntFlag{
68+
Name: "max-concurrent-uploaded-layers",
69+
Usage: "Set the max concurrent uploaded layers for each push",
6770
}),
6871
Action: func(context *cli.Context) error {
6972
var (
@@ -144,10 +147,17 @@ var pushCommand = cli.Command{
144147
return nil, nil
145148
})
146149

147-
return client.Push(ctx, ref, desc,
150+
ropts := []containerd.RemoteOpt{
148151
containerd.WithResolver(resolver),
149152
containerd.WithImageHandler(jobHandler),
150-
)
153+
}
154+
155+
if context.IsSet("max-concurrent-uploaded-layers") {
156+
mcu := context.Int("max-concurrent-uploaded-layers")
157+
ropts = append(ropts, containerd.WithMaxConcurrentUploadedLayers(mcu))
158+
}
159+
160+
return client.Push(ctx, ref, desc, ropts...)
151161
})
152162

153163
// don't show progress if debug mode is set

remotes/handlers.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
3232
"github.com/pkg/errors"
3333
"github.com/sirupsen/logrus"
34+
"golang.org/x/sync/semaphore"
3435
)
3536

3637
type refKeyPrefix struct{}
@@ -181,7 +182,8 @@ func push(ctx context.Context, provider content.Provider, pusher Pusher, desc oc
181182
//
182183
// Base handlers can be provided which will be called before any push specific
183184
// handlers.
184-
func PushContent(ctx context.Context, pusher Pusher, desc ocispec.Descriptor, store content.Store, platform platforms.MatchComparer, wrapper func(h images.Handler) images.Handler) error {
185+
func PushContent(ctx context.Context, pusher Pusher, desc ocispec.Descriptor, store content.Store, limiter *semaphore.Weighted, platform platforms.MatchComparer, wrapper func(h images.Handler) images.Handler) error {
186+
185187
var m sync.Mutex
186188
manifestStack := []ocispec.Descriptor{}
187189

@@ -213,7 +215,7 @@ func PushContent(ctx context.Context, pusher Pusher, desc ocispec.Descriptor, st
213215
handler = wrapper(handler)
214216
}
215217

216-
if err := images.Dispatch(ctx, handler, nil, desc); err != nil {
218+
if err := images.Dispatch(ctx, handler, limiter, desc); err != nil {
217219
return err
218220
}
219221

0 commit comments

Comments
 (0)