Skip to content

Commit 8cd2d33

Browse files
committed
[release/1.7] remotes/docker: Add MountedFrom and Exists push status
This makes it possible to check whether content didn't actually need to be pushed to the remote registry and was cross-repo mounted or already existed. Signed-off-by: Paweł Gronowski <[email protected]> (cherry picked from commit dfc7590) Signed-off-by: Paweł Gronowski <[email protected]>
1 parent 091922f commit 8cd2d33

3 files changed

Lines changed: 124 additions & 10 deletions

File tree

remotes/docker/pusher.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"io"
2424
"net/http"
2525
"net/url"
26+
"path"
2627
"strings"
2728
"sync"
2829
"time"
@@ -137,6 +138,9 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
137138
if exists {
138139
p.tracker.SetStatus(ref, Status{
139140
Committed: true,
141+
PushStatus: PushStatus{
142+
Exists: true,
143+
},
140144
Status: content.Status{
141145
Ref: ref,
142146
Total: desc.Size,
@@ -164,6 +168,7 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
164168
// Start upload request
165169
req = p.request(host, http.MethodPost, "blobs", "uploads/")
166170

171+
mountedFrom := ""
167172
var resp *http.Response
168173
if fromRepo := selectRepositoryMountCandidate(p.refspec, desc.Annotations); fromRepo != "" {
169174
preq := requestWithMountFrom(req, desc.Digest.String(), fromRepo)
@@ -180,11 +185,14 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
180185
return nil, err
181186
}
182187

183-
if resp.StatusCode == http.StatusUnauthorized {
188+
switch resp.StatusCode {
189+
case http.StatusUnauthorized:
184190
log.G(ctx).Debugf("failed to mount from repository %s", fromRepo)
185191

186192
resp.Body.Close()
187193
resp = nil
194+
case http.StatusCreated:
195+
mountedFrom = path.Join(p.refspec.Locator, fromRepo)
188196
}
189197
}
190198

@@ -204,6 +212,9 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
204212
case http.StatusCreated:
205213
p.tracker.SetStatus(ref, Status{
206214
Committed: true,
215+
PushStatus: PushStatus{
216+
MountedFrom: mountedFrom,
217+
},
207218
Status: content.Status{
208219
Ref: ref,
209220
Total: desc.Size,

remotes/docker/pusher_test.go

Lines changed: 101 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131

3232
"github.com/containerd/containerd/content"
3333
"github.com/containerd/containerd/errdefs"
34+
"github.com/containerd/containerd/reference"
3435
"github.com/containerd/containerd/remotes"
3536
"github.com/opencontainers/go-digest"
3637
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@@ -69,7 +70,7 @@ func TestGetManifestPath(t *testing.T) {
6970
func TestPusherErrClosedRetry(t *testing.T) {
7071
ctx := context.Background()
7172

72-
p, reg, done := samplePusher(t)
73+
p, reg, _, done := samplePusher(t)
7374
defer done()
7475

7576
layerContent := []byte("test")
@@ -88,7 +89,7 @@ func TestPusherErrClosedRetry(t *testing.T) {
8889
// TestPusherErrReset tests the push method if the request needs to be retried
8990
// i.e when ErrReset occurs
9091
func TestPusherErrReset(t *testing.T) {
91-
p, reg, done := samplePusher(t)
92+
p, reg, _, done := samplePusher(t)
9293
defer done()
9394

9495
p.object = "latest@sha256:55d31f3af94c797b65b310569803cacc1c9f4a34bf61afcdc8138f89345c8308"
@@ -149,7 +150,7 @@ func tryUpload(ctx context.Context, t *testing.T, p dockerPusher, layerContent [
149150
return cw.Commit(ctx, 0, "")
150151
}
151152

152-
func samplePusher(t *testing.T) (dockerPusher, *uploadableMockRegistry, func()) {
153+
func samplePusher(t *testing.T) (dockerPusher, *uploadableMockRegistry, StatusTrackLocker, func()) {
153154
reg := &uploadableMockRegistry{
154155
availableContents: make([]string, 0),
155156
}
@@ -158,8 +159,12 @@ func samplePusher(t *testing.T) (dockerPusher, *uploadableMockRegistry, func())
158159
if err != nil {
159160
t.Fatal(err)
160161
}
162+
tracker := NewInMemoryTracker()
161163
return dockerPusher{
162164
dockerBase: &dockerBase{
165+
refspec: reference.Spec{
166+
Locator: "sample",
167+
},
163168
repository: "sample",
164169
hosts: []RegistryHost{
165170
{
@@ -172,8 +177,8 @@ func samplePusher(t *testing.T) (dockerPusher, *uploadableMockRegistry, func())
172177
},
173178
},
174179
object: "sample",
175-
tracker: NewInMemoryTracker(),
176-
}, reg, s.Close
180+
tracker: tracker,
181+
}, reg, tracker, s.Close
177182
}
178183

179184
var manifestRegexp = regexp.MustCompile(`/([a-z0-9]+)/manifests/(.*)`)
@@ -204,11 +209,21 @@ func (u *uploadableMockRegistry) defaultHandler(w http.ResponseWriter, r *http.R
204209
} else {
205210
w.Header().Set("Location", "/cannotupload")
206211
}
212+
207213
dgstr := digest.Canonical.Digester()
214+
208215
if _, err := io.Copy(dgstr.Hash(), r.Body); err != nil {
209216
w.WriteHeader(http.StatusInternalServerError)
210217
return
211218
}
219+
220+
query := r.URL.Query()
221+
if query.Has("mount") && query.Get("from") == "always-mount" {
222+
w.Header().Set("Docker-Content-Digest", dgstr.Digest().String())
223+
w.WriteHeader(http.StatusCreated)
224+
return
225+
}
226+
212227
u.availableContents = append(u.availableContents, dgstr.Digest().String())
213228
w.WriteHeader(http.StatusAccepted)
214229
return
@@ -262,7 +277,7 @@ func (u *uploadableMockRegistry) isContentAlreadyExist(c string) bool {
262277

263278
func Test_dockerPusher_push(t *testing.T) {
264279

265-
p, reg, done := samplePusher(t)
280+
p, reg, tracker, done := samplePusher(t)
266281
defer done()
267282

268283
reg.uploadable = true
@@ -280,6 +295,7 @@ func Test_dockerPusher_push(t *testing.T) {
280295
mediatype string
281296
ref string
282297
unavailableOnFail bool
298+
annotations map[string]string
283299
}
284300
tests := []struct {
285301
name string
@@ -288,6 +304,7 @@ func Test_dockerPusher_push(t *testing.T) {
288304
args args
289305
checkerFunc func(writer *pushWriter) bool
290306
wantErr error
307+
wantStatus *PushStatus
291308
}{
292309
{
293310
name: "when a manifest is pushed",
@@ -321,6 +338,68 @@ func Test_dockerPusher_push(t *testing.T) {
321338
unavailableOnFail: false,
322339
},
323340
wantErr: fmt.Errorf("content %v on remote: %w", digest.FromBytes(manifestContent), errdefs.ErrAlreadyExists),
341+
wantStatus: &PushStatus{
342+
Exists: true,
343+
MountedFrom: "",
344+
},
345+
},
346+
{
347+
name: "success cross-repo mount a blob layer",
348+
dp: p,
349+
// Not needed to set the base object as it is used to generate path only in case of manifests
350+
// dockerBaseObject:
351+
args: args{
352+
content: layerContent,
353+
mediatype: ocispec.MediaTypeImageLayer,
354+
ref: fmt.Sprintf("layer2-%s", layerContentDigest.String()),
355+
unavailableOnFail: false,
356+
annotations: map[string]string{
357+
distributionSourceLabelKey("sample"): "always-mount",
358+
},
359+
},
360+
checkerFunc: func(writer *pushWriter) bool {
361+
select {
362+
case resp := <-writer.respC:
363+
// 201 should be the response code when uploading a new blob
364+
return resp.StatusCode == http.StatusCreated
365+
case <-writer.errC:
366+
return false
367+
}
368+
},
369+
wantErr: fmt.Errorf("content %v on remote: %w", digest.FromBytes(layerContent), errdefs.ErrAlreadyExists),
370+
wantStatus: &PushStatus{
371+
MountedFrom: "sample/always-mount",
372+
Exists: false,
373+
},
374+
},
375+
{
376+
name: "failed to cross-repo mount a blob layer",
377+
dp: p,
378+
// Not needed to set the base object as it is used to generate path only in case of manifests
379+
// dockerBaseObject:
380+
args: args{
381+
content: layerContent,
382+
mediatype: ocispec.MediaTypeImageLayer,
383+
ref: fmt.Sprintf("layer3-%s", layerContentDigest.String()),
384+
unavailableOnFail: false,
385+
annotations: map[string]string{
386+
distributionSourceLabelKey("sample"): "never-mount",
387+
},
388+
},
389+
checkerFunc: func(writer *pushWriter) bool {
390+
select {
391+
case resp := <-writer.respC:
392+
// 201 should be the response code when uploading a new blob
393+
return resp.StatusCode == http.StatusCreated
394+
case <-writer.errC:
395+
return false
396+
}
397+
},
398+
wantErr: nil,
399+
wantStatus: &PushStatus{
400+
MountedFrom: "",
401+
Exists: false,
402+
},
324403
},
325404
{
326405
name: "trying to push a blob layer",
@@ -343,21 +422,34 @@ func Test_dockerPusher_push(t *testing.T) {
343422
}
344423
},
345424
wantErr: nil,
425+
wantStatus: &PushStatus{
426+
MountedFrom: "",
427+
Exists: false,
428+
},
346429
},
347430
}
348431
for _, test := range tests {
432+
test := test
349433
t.Run(test.name, func(t *testing.T) {
350434
desc := ocispec.Descriptor{
351-
MediaType: test.args.mediatype,
352-
Digest: digest.FromBytes(test.args.content),
353-
Size: int64(len(test.args.content)),
435+
MediaType: test.args.mediatype,
436+
Digest: digest.FromBytes(test.args.content),
437+
Size: int64(len(test.args.content)),
438+
Annotations: test.args.annotations,
354439
}
355440

356441
test.dp.object = test.dockerBaseObject
357442

358443
got, err := test.dp.push(context.Background(), desc, test.args.ref, test.args.unavailableOnFail)
359444

360445
assert.Equal(t, test.wantErr, err)
446+
447+
if test.wantStatus != nil {
448+
status, err := tracker.GetStatus(test.args.ref)
449+
assert.NoError(t, err)
450+
assert.Equal(t, *test.wantStatus, status.PushStatus)
451+
}
452+
361453
// if an error is expected, further comparisons are not required.
362454
if test.wantErr != nil {
363455
return

remotes/docker/status.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,17 @@ type Status struct {
3636

3737
// UploadUUID is used by the Docker registry to reference blob uploads
3838
UploadUUID string
39+
40+
// PushStatus contains status related to push.
41+
PushStatus
42+
}
43+
44+
type PushStatus struct {
45+
// MountedFrom is the source content was cross-repo mounted from (empty if no cross-repo mount was performed).
46+
MountedFrom string
47+
48+
// Exists indicates whether content already exists in the repository and wasn't uploaded.
49+
Exists bool
3950
}
4051

4152
// StatusTracker to track status of operations

0 commit comments

Comments
 (0)