Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/client_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ type ImageAPIClient interface {
ImageImport(ctx context.Context, source ImageImportSource, ref string, options ImageImportOptions) (io.ReadCloser, error)

ImageList(ctx context.Context, options ImageListOptions) ([]image.Summary, error)
ImagePull(ctx context.Context, ref string, options ImagePullOptions) (io.ReadCloser, error)
ImagePull(ctx context.Context, ref string, options ImagePullOptions) (ImagePullResponse, error)
ImagePush(ctx context.Context, ref string, options ImagePushOptions) (io.ReadCloser, error)
ImageRemove(ctx context.Context, image string, options ImageRemoveOptions) ([]image.DeleteResponse, error)
ImageSearch(ctx context.Context, term string, options ImageSearchOptions) ([]registry.SearchResult, error)
Expand Down
76 changes: 71 additions & 5 deletions client/image_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,84 @@ package client

import (
"context"
"encoding/json"
"errors"
"io"
"iter"
"net/url"
"strings"
"sync"

cerrdefs "github.com/containerd/errdefs"
"github.com/distribution/reference"
"github.com/moby/moby/client/pkg/jsonmessage"
)

func newImagePullResponse(rc io.ReadCloser) ImagePullResponse {
return ImagePullResponse{
rc: rc,
close: &sync.Once{},
}
}

type ImagePullResponse struct {
rc io.ReadCloser
close *sync.Once
}

// Read implements io.ReadCloser
func (r ImagePullResponse) Read(p []byte) (n int, err error) {
return r.rc.Read(p)
}

// Close implements io.ReadCloser
func (r ImagePullResponse) Close() error {
if r.close == nil {
return nil
}
var err error
r.close.Do(func() {
if r.rc != nil {
err = r.rc.Close()
}
})
return err
}
Comment on lines +35 to +47
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Close implements io.ReadCloser
func (r ImagePullResponse) Close() (err error) {
if r.rc != nil {
err = r.rc.Close()
r.rc = nil
}
return
}
// Close implements io.ReadCloser
func (r *ImagePullResponse) Close() (err error) {
if r.rc != nil {
err = r.rc.Close()
r.rc = nil
}
return
}

This needs a pointer receiver, otherwise the r.rc = nil will only be applied to the local copy

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed. But then ImagePullResponse can't be used as an io.ReadCloser (need to return *ImagePullResponse). Let me test another approach

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, do we have a strict requirement for ImagePull to not return a pointer?

cc @thaJeztah

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't looked in depth at the problem, but w.r.t. pointer vs not; I don't think we have a strict requirement; In fact for some cases, I like pointers, because they allow return nil, err, and callers should never consume returned values on error 😄. See #51076 (comment)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a quick comment; I'd like to avoid named error output variables; at least if they're named err - in this case it's a very small function, but we've had cases where errors were not handled correctly, so;

// Close implements io.ReadCloser
func (r *ImagePullResponse) Close() error {
	if r.rc != nil {
		err := r.rc.Close()
		r.rc = nil
		return err
	}
	return nil
}

Or perhaps an early return;

// Close implements io.ReadCloser
func (r *ImagePullResponse) Close() error {
	if r.rc == nil {
		return nil
	}
	err := r.rc.Close()
	r.rc = nil
	return err
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at that; could there be a race happening in the above? (can close be called by anything concurrently and panic if one wins and sets r.rc to nil?)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's indeed risks for a race condition. Use of sync.Once prevents this as well


// JSONMessages decodes the response stream as a sequence of JSONMessages.
// if stream ends or context is cancelled, the underlying [io.Reader] is closed.
func (r ImagePullResponse) JSONMessages(ctx context.Context) iter.Seq2[jsonmessage.JSONMessage, error] {
context.AfterFunc(ctx, func() {
r.Close()
})
dec := json.NewDecoder(r)
return func(yield func(jsonmessage.JSONMessage, error) bool) {
defer r.Close()
for {
var jm jsonmessage.JSONMessage
err := dec.Decode(&jm)
if errors.Is(err, io.EOF) {
break
}
if ctx.Err() != nil {
yield(jm, ctx.Err())
return
}
if !yield(jm, err) {
return
}
}
}
}

// ImagePull requests the docker host to pull an image from a remote registry.
// It executes the privileged function if the operation is unauthorized
// and it tries one more time.
// It's up to the caller to handle the [io.ReadCloser] and close it.
func (cli *Client) ImagePull(ctx context.Context, refStr string, options ImagePullOptions) (io.ReadCloser, error) {
// Callers can use [ImagePullResponse.JSONMessages] to monitor pull progress as
// a sequence of JSONMessages, [ImagePullResponse.Close] does not need to be
// called in this case. Or, use the [io.Reader] interface and call
// [ImagePullResponse.Close] after processing.
func (cli *Client) ImagePull(ctx context.Context, refStr string, options ImagePullOptions) (ImagePullResponse, error) {
// FIXME(vdemeester): there is currently used in a few way in docker/docker
// - if not in trusted content, ref is used to pass the whole reference, and tag is empty
// - if in trusted content, ref is used to pass the reference name, and tag for the digest
Expand All @@ -23,7 +88,7 @@ func (cli *Client) ImagePull(ctx context.Context, refStr string, options ImagePu

ref, err := reference.ParseNormalizedNamed(refStr)
if err != nil {
return nil, err
return ImagePullResponse{}, err
}

query := url.Values{}
Expand All @@ -40,9 +105,10 @@ func (cli *Client) ImagePull(ctx context.Context, refStr string, options ImagePu
resp, err = cli.tryImageCreate(ctx, query, options.PrivilegeFunc)
}
if err != nil {
return nil, err
return ImagePullResponse{}, err
}
return resp.Body, nil

return newImagePullResponse(resp.Body), nil
}

// getAPITagFromNamedRef returns a tag from the specified reference.
Expand Down
4 changes: 3 additions & 1 deletion client/image_pull_opts.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package client

import "context"
import (
"context"
)

// ImagePullOptions holds information to pull images.
type ImagePullOptions struct {
Expand Down
43 changes: 43 additions & 0 deletions client/image_pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"io"
"net/http"
"testing"
"time"

cerrdefs "github.com/containerd/errdefs"
"github.com/moby/moby/api/types/registry"
"github.com/moby/moby/client/pkg/jsonmessage"
"gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"
)
Expand Down Expand Up @@ -194,3 +196,44 @@ func TestImagePullWithoutErrors(t *testing.T) {
})
}
}

func TestImagePullResponse(t *testing.T) {
r, w := io.Pipe()
response := newImagePullResponse(r)
ctx, cancel := context.WithCancel(context.TODO())
messages := response.JSONMessages(ctx)
c := make(chan jsonmessage.JSONMessage)
go func() {
for message, err := range messages {
if err != nil {
close(c)
break
}
c <- message
}
}()

// Check we receive message sent to json stream
w.Write([]byte(`{"id":"test"}`))
tiemout, _ := context.WithTimeout(context.TODO(), 100*time.Millisecond)
select {
case message := <-c:
assert.Equal(t, message.ID, "test")
case <-tiemout.Done():
t.Fatal("expected message not received")
}

// Check context cancelation
cancel()
tiemout, _ = context.WithTimeout(context.TODO(), 100*time.Millisecond)
select {
case _, ok := <-c:
assert.Check(t, !ok)
case <-tiemout.Done():
t.Fatal("expected message not received")
}

// Check Close can be ran twice without error
assert.NilError(t, response.Close())
assert.NilError(t, response.Close())
}
44 changes: 31 additions & 13 deletions client/pkg/jsonmessage/jsonmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package jsonmessage

import (
"encoding/json"
"errors"
"fmt"
"io"
"iter"
"strings"
"time"

Expand Down Expand Up @@ -187,9 +189,32 @@ func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error {
return nil
}

type JSONMessagesStream iter.Seq2[JSONMessage, error]

// DisplayJSONMessagesStream reads a JSON message stream from in, and writes
// each [JSONMessage] to out. It returns an error if an invalid JSONMessage
// is received, or if a JSONMessage containers a non-zero [JSONMessage.Error].
// each [JSONMessage] to out.
// see DisplayJSONMessages for details
func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(JSONMessage)) error {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left this func for backward compatibility, maybe could ne removed ?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a "note to self", as @vvoland 's comment reminded me of that.

I recall when we moved this package that we also considered cleaning up the API; there was a change in the CLI to improve some bits (passing context etc), but also to properly take advantage of the package name, so changing these to jsonmessage.Stream and jsonmessage.Display (or jsonmessage.DisplayStream).

https://github.com/docker/cli/blob/v28.5.0/internal/jsonstream/display.go

NOT a blocker for this PR IMO, as it's some bike-shedding, but we should look at that before we release.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if this is safe to change DisplayJSONMessagesStream signature to drop support for io.Reader. If this is fine, then I can also adopt a shorted name

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👇 copy/paste from Slack

strictly speaking, moby/client is a completely new module; we have no requirement to carry anything from the old docker/docker/client module, as long as we can provide a replacement for users of it, which sometimes requires a bit of digging (I tend to use https://grep.app to get a quick "guess" what's used, and where, and if it's relevant) to see if it can reasonably be swapped.

The whole json-message was a massacre; it really had to be reverse-engineered to understand "what was the meaning here in the first place?"; fun things I discovered was that (IIRC) there were no daemon-side types corresponding with it! It just happened to be a type that could unmarshal some random other types produced by the daemon because some of the fields matched (basically it "cherry-picked" some fields produced by build, some other fields for events, and yet some other fields for image pull / push streams).

I jotted down some things in #50575 - but even with changes I made in the API, it's possible that definitions are in the wrong place, or ... shouldn't be there;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var dec = json.NewDecoder(in)
var f JSONMessagesStream = func(yield func(JSONMessage, error) bool) {
for {
var jm JSONMessage
err := dec.Decode(&jm)
if errors.Is(err, io.EOF) {
break
}
if !yield(jm, err) {
return
}
}
}

return DisplayJSONMessages(f, out, terminalFd, isTerminal, auxCallback)
}

// DisplayJSONMessages writes each [JSONMessage] from stream to out.
// It returns an error if an invalid JSONMessage is received, or if
// a JSONMessage containers a non-zero [JSONMessage.Error].
//
// Presentation of the JSONMessage depends on whether a terminal is attached,
// and on the terminal width. Progress bars ([JSONProgress]) are suppressed
Expand All @@ -203,19 +228,12 @@ func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error {
// - auxCallback allows handling the [JSONMessage.Aux] field. It is
// called if a JSONMessage contains an Aux field, in which case
// DisplayJSONMessagesStream does not present the JSONMessage.
func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(JSONMessage)) error {
var (
dec = json.NewDecoder(in)
ids = make(map[string]uint)
)
func DisplayJSONMessages(messages JSONMessagesStream, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(JSONMessage)) error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs a docstring?

var ids = make(map[string]uint)

for {
for jm, err := range messages {
var diff uint
var jm JSONMessage
if err := dec.Decode(&jm); err != nil {
if err == io.EOF {
break
}
if err != nil {
return err
}

Expand Down
4 changes: 1 addition & 3 deletions integration/image/pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,7 @@ func TestImagePullStoredDigestForOtherRepo(t *testing.T) {

// Now, pull a totally different repo with a the same digest
rdr, err = apiClient.ImagePull(ctx, path.Join(registry.DefaultURL, "other:image@"+desc.Digest.String()), client.ImagePullOptions{})
if rdr != nil {
assert.Check(t, rdr.Close())
}
assert.Check(t, rdr.Close())
assert.Assert(t, err != nil, "Expected error, got none: %v", err)
assert.Assert(t, cerrdefs.IsNotFound(err), err)
assert.Check(t, is.ErrorType(err, cerrdefs.IsNotFound))
Expand Down
2 changes: 1 addition & 1 deletion vendor/github.com/moby/moby/client/client_interfaces.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

76 changes: 71 additions & 5 deletions vendor/github.com/moby/moby/client/image_pull.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion vendor/github.com/moby/moby/client/image_pull_opts.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading