Skip to content

Commit dd7c0aa

Browse files
committed
remotes: support cross-repo-push
With distribution source label in content store, select the longest common prefix components as condidate mount blob source and try to push with mount blob. Fix #2964 Signed-off-by: Wei Fu <[email protected]>
1 parent 545e79a commit dd7c0aa

6 files changed

Lines changed: 397 additions & 87 deletions

File tree

remotes/docker/authorizer.go

Lines changed: 174 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ type dockerAuthorizer struct {
4444
ua string
4545
mu sync.Mutex
4646

47-
auth map[string]string
47+
// indexed by host name
48+
handlers map[string]*authHandler
4849
}
4950

5051
// NewAuthorizer creates a Docker authorizer using the provided function to
@@ -53,116 +54,226 @@ func NewAuthorizer(client *http.Client, f func(string) (string, string, error))
5354
if client == nil {
5455
client = http.DefaultClient
5556
}
57+
5658
return &dockerAuthorizer{
5759
credentials: f,
5860
client: client,
5961
ua: "containerd/" + version.Version,
60-
auth: map[string]string{},
62+
handlers: make(map[string]*authHandler),
6163
}
6264
}
6365

66+
// Authorize handles auth request.
6467
func (a *dockerAuthorizer) Authorize(ctx context.Context, req *http.Request) error {
65-
// TODO: Lookup matching challenge and scope rather than just host
66-
if auth := a.getAuth(req.URL.Host); auth != "" {
67-
req.Header.Set("Authorization", auth)
68+
// skip if there is no auth handler
69+
ah := a.getAuthHandler(req.URL.Host)
70+
if ah == nil {
71+
return nil
72+
}
73+
74+
auth, err := ah.authorize(ctx)
75+
if err != nil {
76+
return err
6877
}
6978

79+
req.Header.Set("Authorization", auth)
7080
return nil
7181
}
7282

83+
func (a *dockerAuthorizer) getAuthHandler(host string) *authHandler {
84+
a.mu.Lock()
85+
defer a.mu.Unlock()
86+
87+
return a.handlers[host]
88+
}
89+
7390
func (a *dockerAuthorizer) AddResponses(ctx context.Context, responses []*http.Response) error {
7491
last := responses[len(responses)-1]
7592
host := last.Request.URL.Host
93+
94+
a.mu.Lock()
95+
defer a.mu.Unlock()
7696
for _, c := range parseAuthHeader(last.Header) {
7797
if c.scheme == bearerAuth {
7898
if err := invalidAuthorization(c, responses); err != nil {
79-
// TODO: Clear token
80-
a.setAuth(host, "")
99+
delete(a.handlers, host)
81100
return err
82101
}
83102

84-
// TODO(dmcg): Store challenge, not token
85-
// Move token fetching to authorize
86-
return a.setTokenAuth(ctx, host, c.parameters)
103+
// reuse existing handler
104+
//
105+
// assume that one registry will return the common
106+
// challenge information, including realm and service.
107+
// and the resource scope is only different part
108+
// which can be provided by each request.
109+
if _, ok := a.handlers[host]; ok {
110+
return nil
111+
}
112+
113+
common, err := a.generateTokenOptions(ctx, host, c)
114+
if err != nil {
115+
return err
116+
}
117+
118+
a.handlers[host] = newAuthHandler(a.client, a.ua, c.scheme, common)
119+
return nil
87120
} else if c.scheme == basicAuth && a.credentials != nil {
88-
// TODO: Resolve credentials on authorize
89121
username, secret, err := a.credentials(host)
90122
if err != nil {
91123
return err
92124
}
125+
93126
if username != "" && secret != "" {
94-
auth := username + ":" + secret
95-
a.setAuth(host, fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte(auth))))
127+
common := tokenOptions{
128+
username: username,
129+
secret: secret,
130+
}
131+
132+
a.handlers[host] = newAuthHandler(a.client, a.ua, c.scheme, common)
96133
return nil
97134
}
98135
}
99136
}
100-
101137
return errors.Wrap(errdefs.ErrNotImplemented, "failed to find supported auth scheme")
102138
}
103139

104-
func (a *dockerAuthorizer) getAuth(host string) string {
105-
a.mu.Lock()
106-
defer a.mu.Unlock()
107-
108-
return a.auth[host]
109-
}
110-
111-
func (a *dockerAuthorizer) setAuth(host string, auth string) bool {
112-
a.mu.Lock()
113-
defer a.mu.Unlock()
114-
115-
changed := a.auth[host] != auth
116-
a.auth[host] = auth
117-
118-
return changed
119-
}
120-
121-
func (a *dockerAuthorizer) setTokenAuth(ctx context.Context, host string, params map[string]string) error {
122-
realm, ok := params["realm"]
140+
func (a *dockerAuthorizer) generateTokenOptions(ctx context.Context, host string, c challenge) (tokenOptions, error) {
141+
realm, ok := c.parameters["realm"]
123142
if !ok {
124-
return errors.New("no realm specified for token auth challenge")
143+
return tokenOptions{}, errors.New("no realm specified for token auth challenge")
125144
}
126145

127146
realmURL, err := url.Parse(realm)
128147
if err != nil {
129-
return errors.Wrap(err, "invalid token auth challenge realm")
148+
return tokenOptions{}, errors.Wrap(err, "invalid token auth challenge realm")
130149
}
131150

132151
to := tokenOptions{
133152
realm: realmURL.String(),
134-
service: params["service"],
153+
service: c.parameters["service"],
135154
}
136155

137-
to.scopes = getTokenScopes(ctx, params)
138-
if len(to.scopes) == 0 {
139-
return errors.Errorf("no scope specified for token auth challenge")
156+
scope, ok := c.parameters["scope"]
157+
if !ok {
158+
return tokenOptions{}, errors.Errorf("no scope specified for token auth challenge")
140159
}
160+
to.scopes = append(to.scopes, scope)
141161

142162
if a.credentials != nil {
143163
to.username, to.secret, err = a.credentials(host)
144164
if err != nil {
145-
return err
165+
return tokenOptions{}, err
146166
}
147167
}
168+
return to, nil
169+
}
170+
171+
// authResult is used to control limit rate.
172+
type authResult struct {
173+
sync.WaitGroup
174+
token string
175+
err error
176+
}
177+
178+
// authHandler is used to handle auth request per registry server.
179+
type authHandler struct {
180+
sync.Mutex
181+
182+
ua string
183+
184+
client *http.Client
185+
186+
// only support basic and bearer schemes
187+
scheme authenticationScheme
188+
189+
// common contains common challenge answer
190+
common tokenOptions
191+
192+
// scopedTokens caches token indexed by scopes, which used in
193+
// bearer auth case
194+
scopedTokens map[string]*authResult
195+
}
196+
197+
func newAuthHandler(client *http.Client, ua string, scheme authenticationScheme, opts tokenOptions) *authHandler {
198+
if client == nil {
199+
client = http.DefaultClient
200+
}
148201

149-
var token string
202+
return &authHandler{
203+
ua: ua,
204+
client: client,
205+
scheme: scheme,
206+
common: opts,
207+
scopedTokens: map[string]*authResult{},
208+
}
209+
}
210+
211+
func (ah *authHandler) authorize(ctx context.Context) (string, error) {
212+
switch ah.scheme {
213+
case basicAuth:
214+
return ah.doBasicAuth(ctx)
215+
case bearerAuth:
216+
return ah.doBearerAuth(ctx)
217+
default:
218+
return "", errors.Wrap(errdefs.ErrNotImplemented, "failed to find supported auth scheme")
219+
}
220+
}
221+
222+
func (ah *authHandler) doBasicAuth(ctx context.Context) (string, error) {
223+
username, secret := ah.common.username, ah.common.secret
224+
225+
if username == "" || secret == "" {
226+
return "", fmt.Errorf("failed to handle basic auth because missing username or secret")
227+
}
228+
229+
auth := base64.StdEncoding.EncodeToString([]byte(username + ":" + secret))
230+
return fmt.Sprintf("%s %s", "Basic", auth), nil
231+
}
232+
233+
func (ah *authHandler) doBearerAuth(ctx context.Context) (string, error) {
234+
// copy common tokenOptions
235+
to := ah.common
236+
237+
to.scopes = getTokenScopes(ctx, to.scopes)
238+
if len(to.scopes) == 0 {
239+
return "", errors.Errorf("no scope specified for token auth challenge")
240+
}
241+
242+
// Docs: https://docs.docker.com/registry/spec/auth/scope
243+
scoped := strings.Join(to.scopes, " ")
244+
245+
ah.Lock()
246+
if r, exist := ah.scopedTokens[scoped]; exist {
247+
ah.Unlock()
248+
r.Wait()
249+
return r.token, r.err
250+
}
251+
252+
// only one fetch token job
253+
r := new(authResult)
254+
r.Add(1)
255+
ah.scopedTokens[scoped] = r
256+
ah.Unlock()
257+
258+
// fetch token for the resource scope
259+
var (
260+
token string
261+
err error
262+
)
150263
if to.secret != "" {
151-
// Credential information is provided, use oauth POST endpoint
152-
token, err = a.fetchTokenWithOAuth(ctx, to)
153-
if err != nil {
154-
return errors.Wrap(err, "failed to fetch oauth token")
155-
}
264+
// credential information is provided, use oauth POST endpoint
265+
token, err = ah.fetchTokenWithOAuth(ctx, to)
266+
err = errors.Wrap(err, "failed to fetch oauth token")
156267
} else {
157-
// Do request anonymously
158-
token, err = a.fetchToken(ctx, to)
159-
if err != nil {
160-
return errors.Wrap(err, "failed to fetch anonymous token")
161-
}
268+
// do request anonymously
269+
token, err = ah.fetchToken(ctx, to)
270+
err = errors.Wrap(err, "failed to fetch anonymous token")
162271
}
163-
a.setAuth(host, fmt.Sprintf("Bearer %s", token))
272+
token = fmt.Sprintf("%s %s", "Bearer", token)
164273

165-
return nil
274+
r.token, r.err = token, err
275+
r.Done()
276+
return r.token, r.err
166277
}
167278

168279
type tokenOptions struct {
@@ -181,7 +292,7 @@ type postTokenResponse struct {
181292
Scope string `json:"scope"`
182293
}
183294

184-
func (a *dockerAuthorizer) fetchTokenWithOAuth(ctx context.Context, to tokenOptions) (string, error) {
295+
func (ah *authHandler) fetchTokenWithOAuth(ctx context.Context, to tokenOptions) (string, error) {
185296
form := url.Values{}
186297
form.Set("scope", strings.Join(to.scopes, " "))
187298
form.Set("service", to.service)
@@ -202,11 +313,11 @@ func (a *dockerAuthorizer) fetchTokenWithOAuth(ctx context.Context, to tokenOpti
202313
return "", err
203314
}
204315
req.Header.Set("Content-Type", "application/x-www-form-urlencoded; charset=utf-8")
205-
if a.ua != "" {
206-
req.Header.Set("User-Agent", a.ua)
316+
if ah.ua != "" {
317+
req.Header.Set("User-Agent", ah.ua)
207318
}
208319

209-
resp, err := ctxhttp.Do(ctx, a.client, req)
320+
resp, err := ctxhttp.Do(ctx, ah.client, req)
210321
if err != nil {
211322
return "", err
212323
}
@@ -216,7 +327,7 @@ func (a *dockerAuthorizer) fetchTokenWithOAuth(ctx context.Context, to tokenOpti
216327
// As of September 2017, GCR is known to return 404.
217328
// As of February 2018, JFrog Artifactory is known to return 401.
218329
if (resp.StatusCode == 405 && to.username != "") || resp.StatusCode == 404 || resp.StatusCode == 401 {
219-
return a.fetchToken(ctx, to)
330+
return ah.fetchToken(ctx, to)
220331
} else if resp.StatusCode < 200 || resp.StatusCode >= 400 {
221332
b, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 64000)) // 64KB
222333
log.G(ctx).WithFields(logrus.Fields{
@@ -245,15 +356,15 @@ type getTokenResponse struct {
245356
RefreshToken string `json:"refresh_token"`
246357
}
247358

248-
// getToken fetches a token using a GET request
249-
func (a *dockerAuthorizer) fetchToken(ctx context.Context, to tokenOptions) (string, error) {
359+
// fetchToken fetches a token using a GET request
360+
func (ah *authHandler) fetchToken(ctx context.Context, to tokenOptions) (string, error) {
250361
req, err := http.NewRequest("GET", to.realm, nil)
251362
if err != nil {
252363
return "", err
253364
}
254365

255-
if a.ua != "" {
256-
req.Header.Set("User-Agent", a.ua)
366+
if ah.ua != "" {
367+
req.Header.Set("User-Agent", ah.ua)
257368
}
258369

259370
reqParams := req.URL.Query()
@@ -272,7 +383,7 @@ func (a *dockerAuthorizer) fetchToken(ctx context.Context, to tokenOptions) (str
272383

273384
req.URL.RawQuery = reqParams.Encode()
274385

275-
resp, err := ctxhttp.Do(ctx, a.client, req)
386+
resp, err := ctxhttp.Do(ctx, ah.client, req)
276387
if err != nil {
277388
return "", err
278389
}

remotes/docker/handler.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,3 +110,45 @@ func appendDistributionSourceLabel(originLabel, repo string) string {
110110
func distributionSourceLabelKey(source string) string {
111111
return fmt.Sprintf("%s.%s", labelDistributionSource, source)
112112
}
113+
114+
// selectRepositoryMountCandidate will select the repo which has longest
115+
// common prefix components as the candidate.
116+
func selectRepositoryMountCandidate(refspec reference.Spec, sources map[string]string) string {
117+
u, err := url.Parse("dummy://" + refspec.Locator)
118+
if err != nil {
119+
// NOTE: basically, it won't be error here
120+
return ""
121+
}
122+
123+
source, target := u.Hostname(), strings.TrimPrefix(u.Path, "/")
124+
repoLabel, ok := sources[distributionSourceLabelKey(source)]
125+
if !ok || repoLabel == "" {
126+
return ""
127+
}
128+
129+
n, match := 0, ""
130+
components := strings.Split(target, "/")
131+
for _, repo := range strings.Split(repoLabel, ",") {
132+
// the target repo is not a candidate
133+
if repo == target {
134+
continue
135+
}
136+
137+
if l := commonPrefixComponents(components, repo); l >= n {
138+
n, match = l, repo
139+
}
140+
}
141+
return match
142+
}
143+
144+
func commonPrefixComponents(components []string, target string) int {
145+
targetComponents := strings.Split(target, "/")
146+
147+
i := 0
148+
for ; i < len(components) && i < len(targetComponents); i++ {
149+
if components[i] != targetComponents[i] {
150+
break
151+
}
152+
}
153+
return i
154+
}

0 commit comments

Comments
 (0)