Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions core/remotes/docker/config/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func ConfigureHosts(ctx context.Context, options HostOptions) docker.RegistryHos
}
}
hosts[len(hosts)-1].path = "/v2"
hosts[len(hosts)-1].capabilities = docker.HostCapabilityPull | docker.HostCapabilityResolve | docker.HostCapabilityPush
hosts[len(hosts)-1].capabilities = docker.HostCapabilityPull | docker.HostCapabilityResolve | docker.HostCapabilityPush | docker.HostCapabilityReferrers
}

// tlsConfigured indicates that TLS was configured and HTTP endpoints should
Expand Down Expand Up @@ -458,12 +458,14 @@ func parseHostConfig(server string, baseDir string, config hostFileConfig) (host
result.capabilities |= docker.HostCapabilityResolve
case "push":
result.capabilities |= docker.HostCapabilityPush
case "referrers":
result.capabilities |= docker.HostCapabilityReferrers
default:
return hostConfig{}, fmt.Errorf("unknown capability %v", c)
}
}
} else {
result.capabilities = docker.HostCapabilityPull | docker.HostCapabilityResolve | docker.HostCapabilityPush
result.capabilities = docker.HostCapabilityPull | docker.HostCapabilityResolve | docker.HostCapabilityPush | docker.HostCapabilityReferrers
}

if config.CACert != nil {
Expand Down
18 changes: 16 additions & 2 deletions core/remotes/docker/config/hosts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/containerd/containerd/v2/core/remotes/docker"
)

const allCaps = docker.HostCapabilityPull | docker.HostCapabilityResolve | docker.HostCapabilityPush
const allCaps = docker.HostCapabilityPull | docker.HostCapabilityResolve | docker.HostCapabilityPush | docker.HostCapabilityReferrers

func TestDefaultHosts(t *testing.T) {
ctx := logtest.WithT(context.Background(), t)
Expand Down Expand Up @@ -97,7 +97,7 @@ ca = "/etc/path/default"
capabilities = ["pull"]

[host."https://test-1.registry"]
capabilities = ["pull", "resolve", "push"]
capabilities = ["pull", "resolve", "push", "referrers"]
ca = ["/etc/certs/test-1-ca.pem", "/etc/certs/special.pem"]
client = [["/etc/certs/client.cert", "/etc/certs/client.key"],["/etc/certs/client.pem", ""]]

Expand All @@ -107,6 +107,10 @@ ca = "/etc/path/default"
[host."https://test-3.registry"]
client = ["/etc/certs/client-1.pem", "/etc/certs/client-2.pem"]

[host."https://no-referrers.registry"]
capabilities = ["pull", "resolve", "push"]
client = ["/etc/certs/client-1.pem", "/etc/certs/client-2.pem"]

[host."https://noncompliantmirror.registry/v2/namespaceprefix"]
capabilities = ["pull"]
override_path = true
Expand Down Expand Up @@ -176,6 +180,16 @@ ca = "/etc/path/default"
{filepath.FromSlash("/etc/certs/client-2.pem")},
},
},
{
scheme: "https",
host: "no-referrers.registry",
path: "/v2",
capabilities: docker.HostCapabilityPull | docker.HostCapabilityResolve | docker.HostCapabilityPush,
clientPairs: [][2]string{
{filepath.FromSlash("/etc/certs/client-1.pem")},
{filepath.FromSlash("/etc/certs/client-2.pem")},
},
},
{
scheme: "https",
host: "noncompliantmirror.registry",
Expand Down
30 changes: 16 additions & 14 deletions core/remotes/docker/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"io"
"net/http"
"net/url"
"strconv"
"strings"
"sync"

Expand Down Expand Up @@ -260,7 +259,7 @@ func (r dockerFetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.R
req.path = req.path + "?" + u.RawQuery
}

rc, err := r.open(ctx, req, desc.MediaType, offset, false)
rc, _, err := r.open(ctx, req, desc.MediaType, offset, false)
if err != nil {
if errdefs.IsNotFound(err) {
continue // try one of the other urls.
Expand All @@ -282,7 +281,7 @@ func (r dockerFetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.R
return nil, err
}

rc, err := r.open(ctx, req, desc.MediaType, offset, i == len(r.hosts)-1)
rc, _, err := r.open(ctx, req, desc.MediaType, offset, i == len(r.hosts)-1)
if err != nil {
// Store the error for referencing later
if firstErr == nil {
Expand All @@ -305,7 +304,7 @@ func (r dockerFetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.R
return nil, err
}

rc, err := r.open(ctx, req, desc.MediaType, offset, i == len(r.hosts)-1)
rc, _, err := r.open(ctx, req, desc.MediaType, offset, i == len(r.hosts)-1)
if err != nil {
// Store the error for referencing later
if firstErr == nil {
Expand Down Expand Up @@ -419,8 +418,9 @@ func (r dockerFetcher) FetchByDigest(ctx context.Context, dgst digest.Digest, op
return nil, desc, firstErr
}

seeker, err := newHTTPReadSeeker(sz, func(offset int64) (io.ReadCloser, error) {
return r.open(ctx, getReq, config.Mediatype, offset, true)
seeker, err := newHTTPReadSeeker(sz, func(offset int64) (rc io.ReadCloser, err error) {
rc, _, err = r.open(ctx, getReq, config.Mediatype, offset, true)
return
})
if err != nil {
return nil, desc, err
Expand All @@ -437,7 +437,7 @@ func (r dockerFetcher) FetchByDigest(ctx context.Context, dgst digest.Digest, op
return seeker, desc, nil
}

func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string, offset int64, lastHost bool) (_ io.ReadCloser, retErr error) {
func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string, offset int64, lastHost bool) (_ io.ReadCloser, _ int64, retErr error) {
const minChunkSize = 512

chunkSize := int64(r.performances.ConcurrentLayerFetchBuffer)
Expand All @@ -457,21 +457,24 @@ func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string,
}

if err := r.Acquire(ctx, 1); err != nil {
return nil, err
return nil, 0, err
}
var remaining int64
resp, err := req.doWithRetries(ctx, lastHost, withErrorCheck, withOffsetCheck(offset, parallelism))
switch err {
case nil:
// all good
remaining = resp.ContentLength
case errContentRangeIgnored:
if parallelism != 1 {
log.G(ctx).WithError(err).Info("remote host ignored content range, forcing parallelism to 1")
parallelism = 1
}
remaining = resp.ContentLength - offset
default:
log.G(ctx).WithError(err).Debug("fetch failed")
r.Release(1)
return nil, err
return nil, 0, err
}

body := &fnOnClose{
Expand All @@ -490,7 +493,6 @@ func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string,
return r == ' ' || r == '\t' || r == ','
})

remaining, _ := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 0)
if remaining <= chunkSize {
parallelism = 1
}
Expand Down Expand Up @@ -589,25 +591,25 @@ func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string,
zstd.WithDecoderLowmem(false),
)
if err != nil {
return nil, err
return nil, 0, err
}
body.ReadCloser = r.IOReadCloser()
case "gzip":
r, err := gzip.NewReader(body.ReadCloser)
if err != nil {
return nil, err
return nil, 0, err
}
body.ReadCloser = r
case "deflate":
body.ReadCloser = flate.NewReader(body.ReadCloser)
case "identity", "":
// no content-encoding applied, use raw body
default:
return nil, errors.New("unsupported Content-Encoding algorithm: " + algorithm)
return nil, 0, errors.New("unsupported Content-Encoding algorithm: " + algorithm)
}
}

return body, nil
return body, remaining, nil
}

type fnOnClose struct {
Expand Down
2 changes: 1 addition & 1 deletion core/remotes/docker/fetcher_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func FuzzFetcher(f *testing.F) {

ctx := context.Background()
req := f.request(host, http.MethodGet)
rc, err := f.open(ctx, req, "", 0, true)
rc, _, err := f.open(ctx, req, "", 0, true)
if err != nil {
return
}
Expand Down
20 changes: 10 additions & 10 deletions core/remotes/docker/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestFetcherOpen(t *testing.T) {
checkReader := func(o int64) {
t.Helper()

rc, err := f.open(ctx, req, "", o, true)
rc, _, err := f.open(ctx, req, "", o, true)
if err != nil {
t.Fatalf("failed to open: %+v", err)
}
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestFetcherOpen(t *testing.T) {
// Check that server returning a different content range
// then requested errors
start = 30
_, err = f.open(ctx, req, "", 20, true)
_, _, err = f.open(ctx, req, "", 20, true)
if err == nil {
t.Fatal("expected error opening with invalid server response")
}
Expand Down Expand Up @@ -215,7 +215,7 @@ func TestFetcherOpenParallel(t *testing.T) {
checkReader := func(offset int64) {
t.Helper()

rc, err := f.open(ctx, req, "", offset, true)
rc, _, err := f.open(ctx, req, "", offset, true)
if err != nil {
t.Fatalf("failed to open: %+v", err)
}
Expand Down Expand Up @@ -261,7 +261,7 @@ func TestFetcherOpenParallel(t *testing.T) {
// Check that server returning a different content range
// than requested errors
forceRange = []httpRange{{start: 10, length: size - 20}}
_, err = f.open(ctx, req, "", 20, true)
_, _, err = f.open(ctx, req, "", 20, true)
if err == nil {
t.Fatal("expected error opening with invalid server response")
}
Expand All @@ -273,14 +273,14 @@ func TestFetcherOpenParallel(t *testing.T) {

failAfter = 1
forceRange = []httpRange{{start: 20}}
_, err = f.open(ctx, req, "", 20, true)
_, _, err = f.open(ctx, req, "", 20, true)
assert.ErrorContains(t, err, "unexpected status")
forceRange = nil
failAfter = 0

// test a case when a subsequent request fails and shouldn't have
failAfter = 1 * 1024 * 1024
body, err := f.open(ctx, req, "", 0, true)
body, _, err := f.open(ctx, req, "", 0, true)
assert.NoError(t, err)
_, err = io.ReadAll(body)
assert.Error(t, err, "this should have failed")
Expand Down Expand Up @@ -408,7 +408,7 @@ func TestContentEncoding(t *testing.T) {

req := f.request(host, http.MethodGet)

rc, err := f.open(context.Background(), req, "", 0, true)
rc, _, err := f.open(context.Background(), req, "", 0, true)
if err != nil {
t.Fatalf("failed to open for encoding %s: %+v", tc.encodingHeader, err)
}
Expand Down Expand Up @@ -543,7 +543,7 @@ func TestDockerFetcherOpen(t *testing.T) {

req := f.request(host, http.MethodGet)

got, err := f.open(context.TODO(), req, "", 0, tt.lastHost)
got, _, err := f.open(context.TODO(), req, "", 0, tt.lastHost)
assert.Equal(t, tt.wantErr, err != nil)
assert.Equal(t, tt.want, got)
assert.Equal(t, 0, tt.retries)
Expand Down Expand Up @@ -586,11 +586,11 @@ func TestDockerFetcherOpenLimiterDeadlock(t *testing.T) {
}

req := f.request(host, http.MethodGet)
_, err = f.open(context.Background(), req, "", 0, true)
_, _, err = f.open(context.Background(), req, "", 0, true)
assert.Error(t, err)

// verify that the limiter Release has been successfully called when the last open error occurred
_, err = f.open(context.Background(), req, "", 0, true)
_, _, err = f.open(context.Background(), req, "", 0, true)
assert.Error(t, err)
}

Expand Down
Loading