Skip to content

Commit 597e0e6

Browse files
committed
split in 3 files
1 parent 377817d commit 597e0e6

7 files changed

Lines changed: 260 additions & 211 deletions

File tree

commands.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ func (cli *DockerCli) CmdBuild(args ...string) error {
202202
// FIXME: ProgressReader shouldn't be this annoying to use
203203
if context != nil {
204204
sf := utils.NewStreamFormatter(false)
205-
body = utils.ProgressReader(ioutil.NopCloser(context), 0, cli.err, sf.FormatProgress("", "Uploading context", "%v bytes%0.0s%0.0s"), sf, true)
205+
body = utils.ProgressReader(ioutil.NopCloser(context), 0, cli.err, sf, true, "", "Uploading context")
206206
}
207207
// Upload the build context
208208
v := &url.Values{}

graph.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ func (graph *Graph) TempLayerArchive(id string, compression archive.Compression,
205205
if err != nil {
206206
return nil, err
207207
}
208-
return archive.NewTempArchive(utils.ProgressReader(ioutil.NopCloser(a), 0, output, sf.FormatProgress("", "Buffering to disk", "%v/%v (%v)"), sf, true), tmp)
208+
return archive.NewTempArchive(utils.ProgressReader(ioutil.NopCloser(a), 0, output, sf, true, "", "Buffering to disk"), tmp)
209209
}
210210

211211
// Mktemp creates a temporary sub-directory inside the graph's filesystem.

server.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ func (srv *Server) ImageInsert(name, url, path string, out io.Writer, sf *utils.
451451
return err
452452
}
453453

454-
if err := c.Inject(utils.ProgressReader(file.Body, int(file.ContentLength), out, sf.FormatProgress("", "Downloading", "%8v/%v (%v)"), sf, false), path); err != nil {
454+
if err := c.Inject(utils.ProgressReader(file.Body, int(file.ContentLength), out, sf, false, "", "Downloading"), path); err != nil {
455455
return err
456456
}
457457
// FIXME: Handle custom repo, tag comment, author
@@ -761,7 +761,7 @@ func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoin
761761
if err != nil {
762762
return err
763763
}
764-
out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pulling", "dependent layers"))
764+
out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pulling dependent layers", nil))
765765
// FIXME: Try to stream the images?
766766
// FIXME: Launch the getRemoteImage() in goroutines
767767

@@ -776,33 +776,33 @@ func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoin
776776
defer srv.poolRemove("pull", "layer:"+id)
777777

778778
if !srv.runtime.graph.Exists(id) {
779-
out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling", "metadata"))
779+
out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling metadata", nil))
780780
imgJSON, imgSize, err := r.GetRemoteImageJSON(id, endpoint, token)
781781
if err != nil {
782-
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error", "pulling dependent layers"))
782+
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
783783
// FIXME: Keep going in case of error?
784784
return err
785785
}
786786
img, err := NewImgJSON(imgJSON)
787787
if err != nil {
788-
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error", "pulling dependent layers"))
788+
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
789789
return fmt.Errorf("Failed to parse json: %s", err)
790790
}
791791

792792
// Get the layer
793-
out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling", "fs layer"))
793+
out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling fs layer", nil))
794794
layer, err := r.GetRemoteImageLayer(img.ID, endpoint, token)
795795
if err != nil {
796-
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error", "pulling dependent layers"))
796+
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
797797
return err
798798
}
799799
defer layer.Close()
800-
if err := srv.runtime.graph.Register(imgJSON, utils.ProgressReader(layer, imgSize, out, sf.FormatProgress(utils.TruncateID(id), "Downloading", "%8v/%v (%v)"), sf, false), img); err != nil {
801-
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error", "downloading dependent layers"))
800+
if err := srv.runtime.graph.Register(imgJSON, utils.ProgressReader(layer, imgSize, out, sf, false, utils.TruncateID(id), "Downloading"), img); err != nil {
801+
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error downloading dependent layers", nil))
802802
return err
803803
}
804804
}
805-
out.Write(sf.FormatProgress(utils.TruncateID(id), "Download", "complete"))
805+
out.Write(sf.FormatProgress(utils.TruncateID(id), "Download complete", nil))
806806

807807
}
808808
return nil
@@ -875,29 +875,29 @@ func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, localName
875875
}
876876
defer srv.poolRemove("pull", "img:"+img.ID)
877877

878-
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Pulling", fmt.Sprintf("image (%s) from %s", img.Tag, localName)))
878+
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, localName), nil))
879879
success := false
880880
var lastErr error
881881
for _, ep := range repoData.Endpoints {
882-
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Pulling", fmt.Sprintf("image (%s) from %s, endpoint: %s", img.Tag, localName, ep)))
882+
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, localName, ep), nil))
883883
if err := srv.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
884884
// Its not ideal that only the last error is returned, it would be better to concatenate the errors.
885885
// As the error is also given to the output stream the user will see the error.
886886
lastErr = err
887-
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Error pulling", fmt.Sprintf("image (%s) from %s, endpoint: %s, %s", img.Tag, localName, ep, err)))
887+
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, localName, ep, err), nil))
888888
continue
889889
}
890890
success = true
891891
break
892892
}
893893
if !success {
894-
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Error pulling", fmt.Sprintf("image (%s) from %s, %s", img.Tag, localName, lastErr)))
894+
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, %s", img.Tag, localName, lastErr), nil))
895895
if parallel {
896896
errors <- fmt.Errorf("Could not find repository on any of the indexed registries.")
897897
return
898898
}
899899
}
900-
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download", "complete"))
900+
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
901901

902902
if parallel {
903903
errors <- nil
@@ -1171,7 +1171,7 @@ func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgID,
11711171
defer os.RemoveAll(layerData.Name())
11721172

11731173
// Send the layer
1174-
checksum, err = r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf.FormatProgress("", "Pushing", "%8v/%v (%v)"), sf, false), ep, token, jsonRaw)
1174+
checksum, err = r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf, false, "", "Pushing"), ep, token, jsonRaw)
11751175
if err != nil {
11761176
return "", err
11771177
}
@@ -1251,7 +1251,7 @@ func (srv *Server) ImageImport(src, repo, tag string, in io.Reader, out io.Write
12511251
if err != nil {
12521252
return err
12531253
}
1254-
archive = utils.ProgressReader(resp.Body, int(resp.ContentLength), out, sf.FormatProgress("", "Importing", "%8v/%v (%v)"), sf, true)
1254+
archive = utils.ProgressReader(resp.Body, int(resp.ContentLength), out, sf, true, "", "Importing")
12551255
}
12561256
img, err := srv.runtime.graph.Create(archive, nil, "Imported from "+src, "", nil)
12571257
if err != nil {

utils/jsonmessage.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package utils
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"io"
7+
"time"
8+
)
9+
10+
type JSONError struct {
11+
Code int `json:"code,omitempty"`
12+
Message string `json:"message,omitempty"`
13+
}
14+
15+
func (e *JSONError) Error() string {
16+
return e.Message
17+
}
18+
19+
type JSONProgress struct {
20+
Current int `json:"current,omitempty"`
21+
Total int `json:"total,omitempty"`
22+
}
23+
24+
func (p *JSONProgress) String() string {
25+
if p.Current == 0 && p.Total == 0 {
26+
return ""
27+
}
28+
current := HumanSize(int64(p.Current))
29+
if p.Total == 0 {
30+
return fmt.Sprintf("%8v/?", current)
31+
}
32+
total := HumanSize(int64(p.Total))
33+
percentage := float64(p.Current) / float64(p.Total) * 100
34+
return fmt.Sprintf("%8v/%v (%.0f%%)", current, total, percentage)
35+
}
36+
37+
type JSONMessage struct {
38+
Status string `json:"status,omitempty"`
39+
Progress *JSONProgress `json:"progressDetail,omitempty"`
40+
ProgressMessage string `json:"progress,omitempty"` //deprecated
41+
ID string `json:"id,omitempty"`
42+
From string `json:"from,omitempty"`
43+
Time int64 `json:"time,omitempty"`
44+
Error *JSONError `json:"errorDetail,omitempty"`
45+
ErrorMessage string `json:"error,omitempty"` //deprecated
46+
}
47+
48+
func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error {
49+
if jm.Error != nil {
50+
if jm.Error.Code == 401 {
51+
return fmt.Errorf("Authentication is required.")
52+
}
53+
return jm.Error
54+
}
55+
endl := ""
56+
if isTerminal {
57+
// <ESC>[2K = erase entire current line
58+
fmt.Fprintf(out, "%c[2K\r", 27)
59+
endl = "\r"
60+
}
61+
if jm.Time != 0 {
62+
fmt.Fprintf(out, "[%s] ", time.Unix(jm.Time, 0))
63+
}
64+
if jm.ID != "" {
65+
fmt.Fprintf(out, "%s: ", jm.ID)
66+
}
67+
if jm.From != "" {
68+
fmt.Fprintf(out, "(from %s) ", jm.From)
69+
}
70+
if jm.Progress != nil {
71+
fmt.Fprintf(out, "%s %s%s", jm.Status, jm.Progress.String(), endl)
72+
} else if jm.ProgressMessage != "" { //deprecated
73+
fmt.Fprintf(out, "%s %s%s", jm.Status, jm.ProgressMessage, endl)
74+
} else {
75+
fmt.Fprintf(out, "%s%s\n", jm.Status, endl)
76+
}
77+
return nil
78+
}
79+
80+
func DisplayJSONMessagesStream(in io.Reader, out io.Writer, isTerminal bool) error {
81+
dec := json.NewDecoder(in)
82+
ids := make(map[string]int)
83+
diff := 0
84+
for {
85+
jm := JSONMessage{}
86+
if err := dec.Decode(&jm); err == io.EOF {
87+
break
88+
} else if err != nil {
89+
return err
90+
}
91+
if (jm.Progress != nil || jm.ProgressMessage != "") && jm.ID != "" {
92+
line, ok := ids[jm.ID]
93+
if !ok {
94+
line = len(ids)
95+
ids[jm.ID] = line
96+
fmt.Fprintf(out, "\n")
97+
diff = 0
98+
} else {
99+
diff = len(ids) - line
100+
}
101+
if isTerminal {
102+
// <ESC>[{diff}A = move cursor up diff rows
103+
fmt.Fprintf(out, "%c[%dA", 27, diff)
104+
}
105+
}
106+
err := jm.Display(out, isTerminal)
107+
if jm.ID != "" {
108+
if isTerminal {
109+
// <ESC>[{diff}B = move cursor down diff rows
110+
fmt.Fprintf(out, "%c[%dB", 27, diff)
111+
}
112+
}
113+
if err != nil {
114+
return err
115+
}
116+
}
117+
return nil
118+
}

utils/progressreader.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package utils
2+
3+
import (
4+
"io"
5+
)
6+
7+
// Reader with progress bar
8+
type progressReader struct {
9+
reader io.ReadCloser // Stream to read from
10+
output io.Writer // Where to send progress bar to
11+
progress JSONProgress
12+
// readTotal int // Expected stream length (bytes)
13+
// readProgress int // How much has been read so far (bytes)
14+
lastUpdate int // How many bytes read at least update
15+
ID string
16+
action string
17+
// template string // Template to print. Default "%v/%v (%v)"
18+
sf *StreamFormatter
19+
newLine bool
20+
}
21+
22+
func (r *progressReader) Read(p []byte) (n int, err error) {
23+
read, err := io.ReadCloser(r.reader).Read(p)
24+
r.progress.Current += read
25+
updateEvery := 1024 * 512 //512kB
26+
if r.progress.Total > 0 {
27+
// Update progress for every 1% read if 1% < 512kB
28+
if increment := int(0.01 * float64(r.progress.Total)); increment < updateEvery {
29+
updateEvery = increment
30+
}
31+
}
32+
if r.progress.Current-r.lastUpdate > updateEvery || err != nil {
33+
r.output.Write(r.sf.FormatProgress(r.ID, r.action, &r.progress))
34+
r.lastUpdate = r.progress.Current
35+
}
36+
// Send newline when complete
37+
if r.newLine && err != nil {
38+
r.output.Write(r.sf.FormatStatus("", ""))
39+
}
40+
return read, err
41+
}
42+
func (r *progressReader) Close() error {
43+
return io.ReadCloser(r.reader).Close()
44+
}
45+
func ProgressReader(r io.ReadCloser, size int, output io.Writer, sf *StreamFormatter, newline bool, ID, action string) *progressReader {
46+
return &progressReader{
47+
reader: r,
48+
output: NewWriteFlusher(output),
49+
ID: ID,
50+
action: action,
51+
progress: JSONProgress{Total: size},
52+
sf: sf,
53+
newLine: newline,
54+
}
55+
}

utils/streamformatter.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package utils
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
)
7+
8+
type StreamFormatter struct {
9+
json bool
10+
used bool
11+
}
12+
13+
func NewStreamFormatter(json bool) *StreamFormatter {
14+
return &StreamFormatter{json, false}
15+
}
16+
17+
func (sf *StreamFormatter) FormatStatus(id, format string, a ...interface{}) []byte {
18+
sf.used = true
19+
str := fmt.Sprintf(format, a...)
20+
if sf.json {
21+
b, err := json.Marshal(&JSONMessage{ID: id, Status: str})
22+
if err != nil {
23+
return sf.FormatError(err)
24+
}
25+
return b
26+
}
27+
return []byte(str + "\r\n")
28+
}
29+
30+
func (sf *StreamFormatter) FormatError(err error) []byte {
31+
sf.used = true
32+
if sf.json {
33+
jsonError, ok := err.(*JSONError)
34+
if !ok {
35+
jsonError = &JSONError{Message: err.Error()}
36+
}
37+
if b, err := json.Marshal(&JSONMessage{Error: jsonError, ErrorMessage: err.Error()}); err == nil {
38+
return b
39+
}
40+
return []byte("{\"error\":\"format error\"}")
41+
}
42+
return []byte("Error: " + err.Error() + "\r\n")
43+
}
44+
45+
func (sf *StreamFormatter) FormatProgress(id, action string, progress *JSONProgress) []byte {
46+
if progress == nil {
47+
progress = &JSONProgress{}
48+
}
49+
sf.used = true
50+
if sf.json {
51+
52+
b, err := json.Marshal(&JSONMessage{
53+
Status: action,
54+
ProgressMessage: progress.String(),
55+
Progress: progress,
56+
ID: id,
57+
})
58+
if err != nil {
59+
return nil
60+
}
61+
return b
62+
}
63+
return []byte(action + " " + progress.String() + "\r")
64+
}
65+
66+
func (sf *StreamFormatter) Used() bool {
67+
return sf.used
68+
}

0 commit comments

Comments
 (0)