
Commit 1764ea9

fuweiddims authored and committed
CRI: improve image pulling performance
Background:

With the current design, the content backend uses a key-lock for long-lived write transactions. Once a content reference has been marked for a write transaction, other requests on the same reference fail fast with an unavailable error. Since the metadata plugin is based on boltdb, which only supports a single writer, the content backend cannot block or hold a request for long. It requires the client to retry by itself, as the OpenWriter backoff-retry helper does. But the maximum retry interval can be up to 2 seconds. If there are several concurrent requests for the same image, the waiters may wake up at the same time, yet only one of them can continue. Many waiters go back to sleep, so it takes a long time to finish all the pulling jobs, and it gets worse when the image has many layers, as described in issue containerd#4937.

After fetching, the containerd.Pull API allows several handlers to commit the same ChainID to the snapshotter, but only one can succeed. Since unpacking a tar.gz is a time-consuming job, unpacking the same ChainID in parallel hurts performance. For instance, Request 2 below does not need to prepare and commit at all; it should just wait for Request 1 to finish, as described in pull request containerd#6318.

```text
Request 1        Request 2

Prepare
   |
   |             Prepare
   |
Commit
   |
   |             Commit (failed on exist)
```

Both the content backoff retry and the unnecessary unpack hurt performance.

Solution:

Introduce duplicate suppression in the fetch and unpack contexts. The duplicate suppression uses a key-mutex and single-waiter notification to provide singleflight semantics. Callers can use the duplicate suppression across different PullImage handlers, avoiding both the unnecessary unpack and the spin-lock in OpenWriter.

Test Result:

Before the enhancement:

```bash
➜  /tmp sudo bash testing.sh "localhost:5000/redis:latest" 20
crictl pull localhost:5000/redis:latest (x20) takes ...

real	1m6.172s
user	0m0.268s
sys	0m0.193s

docker pull localhost:5000/redis:latest (x20) takes ...

real	0m1.324s
user	0m0.441s
sys	0m0.316s

➜  /tmp sudo bash testing.sh "localhost:5000/golang:latest" 20
crictl pull localhost:5000/golang:latest (x20) takes ...

real	1m47.657s
user	0m0.284s
sys	0m0.224s

docker pull localhost:5000/golang:latest (x20) takes ...

real	0m6.381s
user	0m0.488s
sys	0m0.358s
```

With this enhancement:

```bash
➜  /tmp sudo bash testing.sh "localhost:5000/redis:latest" 20
crictl pull localhost:5000/redis:latest (x20) takes ...

real	0m1.140s
user	0m0.243s
sys	0m0.178s

docker pull localhost:5000/redis:latest (x20) takes ...

real	0m1.239s
user	0m0.463s
sys	0m0.275s

➜  /tmp sudo bash testing.sh "localhost:5000/golang:latest" 20
crictl pull localhost:5000/golang:latest (x20) takes ...

real	0m5.546s
user	0m0.217s
sys	0m0.219s

docker pull localhost:5000/golang:latest (x20) takes ...

real	0m6.090s
user	0m0.501s
sys	0m0.331s
```

Test Script:

localhost:5000/{redis|golang}:latest is equivalent to docker.io/library/{redis|golang}:latest. The image is served by a local registry started with `docker run -d -p 5000:5000 --name registry registry:2`.

```bash
image_name="${1}"
pull_times="${2:-10}"

cleanup() {
  ctr image rmi "${image_name}"
  ctr -n k8s.io image rmi "${image_name}"
  crictl rmi "${image_name}"
  docker rmi "${image_name}"
  sleep 2
}

crictl_testing() {
  for idx in $(seq 1 ${pull_times}); do
    crictl pull "${image_name}" > /dev/null 2>&1 &
  done
  wait
}

docker_testing() {
  for idx in $(seq 1 ${pull_times}); do
    docker pull "${image_name}" > /dev/null 2>&1 &
  done
  wait
}

cleanup > /dev/null 2>&1

echo 3 > /proc/sys/vm/drop_caches
sleep 3
echo "crictl pull $image_name (x${pull_times}) takes ..."
time crictl_testing
echo

echo 3 > /proc/sys/vm/drop_caches
sleep 3
echo "docker pull $image_name (x${pull_times}) takes ..."
time docker_testing
```

Fixes: containerd#4937
Close: containerd#4985
Close: containerd#6318

Signed-off-by: Wei Fu <[email protected]>
(cherry picked from commit 8113758)
Signed-off-by: Davanum Srinivas <[email protected]>
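The singleflight idea behind the duplicate suppression can be sketched with a small, self-contained keyed mutex. This is an illustrative stand-in for the `pkg/kmutex` package introduced by this commit, not the real implementation: names like `keyedMutex` and the `sha256:layer` key are invented for the example, and context cancellation is omitted for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// keyedMutex is a minimal version of the key-lock idea: one lock per
// key, created on first use and dropped once the last holder or
// waiter releases it.
type keyedMutex struct {
	mu    sync.Mutex
	locks map[string]*refLock
}

type refLock struct {
	ch  chan struct{} // holds one token; an empty channel means "locked"
	ref int           // holders + waiters for this key
}

func newKeyedMutex() *keyedMutex {
	return &keyedMutex{locks: make(map[string]*refLock)}
}

func (km *keyedMutex) Lock(key string) {
	km.mu.Lock()
	l, ok := km.locks[key]
	if !ok {
		l = &refLock{ch: make(chan struct{}, 1)}
		l.ch <- struct{}{} // seed the token: a fresh lock starts free
		km.locks[key] = l
	}
	l.ref++
	km.mu.Unlock()

	<-l.ch // take the token; blocks while another goroutine holds the key
}

func (km *keyedMutex) Unlock(key string) {
	km.mu.Lock()
	defer km.mu.Unlock()
	l := km.locks[key]
	l.ch <- struct{}{} // return the token, waking exactly one waiter
	l.ref--
	if l.ref == 0 {
		delete(km.locks, key) // no holders or waiters left: drop the entry
	}
}

func main() {
	km := newKeyedMutex()
	done := make(map[string]bool)
	works := 0

	// 20 concurrent "pulls" of the same layer: only the first goroutine
	// to win the key does the expensive work, the rest see its result.
	var wg sync.WaitGroup
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			km.Lock("sha256:layer")
			defer km.Unlock("sha256:layer")
			if done["sha256:layer"] {
				return // already fetched and unpacked by an earlier winner
			}
			works++ // ... fetch + unpack would happen here ...
			done["sha256:layer"] = true
		}()
	}
	wg.Wait()

	if works != 1 {
		panic("duplicate work was not suppressed")
	}
	fmt.Println("unpack executed", works, "time(s)")
}
```

Unlike the backoff retry in OpenWriter, a waiter here is woken exactly once, as soon as the holder releases the key, which is why the concurrent crictl pulls above collapse to roughly a single pull's latency.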
1 parent 64d2cf4 commit 1764ea9

7 files changed: 429 additions & 25 deletions


image.go

Lines changed: 13 additions & 0 deletions
```diff
@@ -28,6 +28,7 @@ import (
 	"github.com/containerd/containerd/diff"
 	"github.com/containerd/containerd/errdefs"
 	"github.com/containerd/containerd/images"
+	"github.com/containerd/containerd/pkg/kmutex"
 	"github.com/containerd/containerd/platforms"
 	"github.com/containerd/containerd/rootfs"
 	"github.com/containerd/containerd/snapshots"
@@ -287,6 +288,10 @@ type UnpackConfig struct {
 	// CheckPlatformSupported is whether to validate that a snapshotter
 	// supports an image's platform before unpacking
 	CheckPlatformSupported bool
+	// DuplicationSuppressor is used to make sure that there is only one
+	// in-flight fetch request or unpack handler for a given descriptor's
+	// digest or chain ID.
+	DuplicationSuppressor kmutex.KeyedLocker
 }
 
 // UnpackOpt provides configuration for unpack
@@ -300,6 +305,14 @@ func WithSnapshotterPlatformCheck() UnpackOpt {
 	}
 }
 
+// WithUnpackDuplicationSuppressor sets `DuplicationSuppressor` on the UnpackConfig.
+func WithUnpackDuplicationSuppressor(suppressor kmutex.KeyedLocker) UnpackOpt {
+	return func(ctx context.Context, uc *UnpackConfig) error {
+		uc.DuplicationSuppressor = suppressor
+		return nil
+	}
+}
+
 func (i *image) Unpack(ctx context.Context, snapshotterName string, opts ...UnpackOpt) error {
 	ctx, done, err := i.client.WithLease(ctx)
 	if err != nil {
```
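WithUnpackDuplicationSuppressor follows containerd's functional-option pattern: an `UnpackOpt` is a closure that mutates the config. A self-contained sketch of how such an option is defined and applied, using simplified stand-in types (`UnpackConfig` here is a one-field dummy, and `noopLocker` is an invented stub, not containerd code):

```go
package main

import (
	"context"
	"fmt"
)

// KeyedLocker mirrors the interface shape from the diff above.
type KeyedLocker interface {
	Lock(ctx context.Context, key string) error
	Unlock(key string)
}

// UnpackConfig is a stripped-down stand-in for the real config struct.
type UnpackConfig struct {
	DuplicationSuppressor KeyedLocker
}

// UnpackOpt is a functional option mutating an UnpackConfig.
type UnpackOpt func(context.Context, *UnpackConfig) error

// WithUnpackDuplicationSuppressor sets DuplicationSuppressor on the config,
// exactly as the option in the diff does.
func WithUnpackDuplicationSuppressor(s KeyedLocker) UnpackOpt {
	return func(ctx context.Context, uc *UnpackConfig) error {
		uc.DuplicationSuppressor = s
		return nil
	}
}

// noopLocker is a trivial KeyedLocker for demonstration only.
type noopLocker struct{}

func (noopLocker) Lock(ctx context.Context, key string) error { return nil }
func (noopLocker) Unlock(key string)                          {}

func main() {
	var cfg UnpackConfig
	opts := []UnpackOpt{WithUnpackDuplicationSuppressor(noopLocker{})}

	// Unpack implementations fold each option into the config like this.
	for _, o := range opts {
		if err := o(context.Background(), &cfg); err != nil {
			panic(err)
		}
	}

	if cfg.DuplicationSuppressor == nil {
		panic("option was not applied")
	}
	fmt.Println("suppressor set:", cfg.DuplicationSuppressor != nil)
}
```

Because unset options leave the field nil, callers that don't opt in keep the old behavior, which is why the CRI plugin below wires the suppressor in explicitly.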

pkg/cri/server/image_pull.go

Lines changed: 3 additions & 0 deletions
```diff
@@ -121,6 +121,9 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest)
 		containerd.WithPullLabel(imageLabelKey, imageLabelValue),
 		containerd.WithMaxConcurrentDownloads(c.config.MaxConcurrentDownloads),
 		containerd.WithImageHandler(imageHandler),
+		containerd.WithUnpackOpts([]containerd.UnpackOpt{
+			containerd.WithUnpackDuplicationSuppressor(c.unpackDuplicationSuppressor),
+		}),
 	}
 
 	pullOpts = append(pullOpts, c.encryptedImagesPullOpts()...)
```

pkg/cri/server/service.go

Lines changed: 17 additions & 11 deletions
```diff
@@ -29,6 +29,7 @@ import (
 	"github.com/containerd/containerd"
 	"github.com/containerd/containerd/oci"
 	"github.com/containerd/containerd/pkg/cri/streaming"
+	"github.com/containerd/containerd/pkg/kmutex"
 	"github.com/containerd/containerd/plugin"
 	cni "github.com/containerd/go-cni"
 	"github.com/sirupsen/logrus"
@@ -113,24 +114,29 @@ type criService struct {
 	// allCaps is the list of the capabilities.
 	// When nil, parsed from CapEff of /proc/self/status.
 	allCaps []string // nolint
+	// unpackDuplicationSuppressor is used to make sure that there is only
+	// one in-flight fetch request or unpack handler for a given descriptor's
+	// digest or chain ID.
+	unpackDuplicationSuppressor kmutex.KeyedLocker
 }
 
 // NewCRIService returns a new instance of CRIService
 func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIService, error) {
 	var err error
 	labels := label.NewStore()
 	c := &criService{
-		config:             config,
-		client:             client,
-		os:                 osinterface.RealOS{},
-		sandboxStore:       sandboxstore.NewStore(labels),
-		containerStore:     containerstore.NewStore(labels),
-		imageStore:         imagestore.NewStore(client),
-		snapshotStore:      snapshotstore.NewStore(),
-		sandboxNameIndex:   registrar.NewRegistrar(),
-		containerNameIndex: registrar.NewRegistrar(),
-		initialized:        atomic.NewBool(false),
-		netPlugin:          make(map[string]cni.CNI),
+		config:                      config,
+		client:                      client,
+		os:                          osinterface.RealOS{},
+		sandboxStore:                sandboxstore.NewStore(labels),
+		containerStore:              containerstore.NewStore(labels),
+		imageStore:                  imagestore.NewStore(client),
+		snapshotStore:               snapshotstore.NewStore(),
+		sandboxNameIndex:            registrar.NewRegistrar(),
+		containerNameIndex:          registrar.NewRegistrar(),
+		initialized:                 atomic.NewBool(false),
+		netPlugin:                   make(map[string]cni.CNI),
+		unpackDuplicationSuppressor: kmutex.New(),
 	}
 
 	if client.SnapshotService(c.config.ContainerdConfig.Snapshotter) == nil {
```

pkg/kmutex/kmutex.go

Lines changed: 105 additions & 0 deletions
```go
/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

// Package kmutex provides synchronization primitives to lock/unlock a resource by unique key.
package kmutex

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

// KeyedLocker is the interface for acquiring locks based on string keys.
type KeyedLocker interface {
	Lock(ctx context.Context, key string) error
	Unlock(key string)
}

func New() KeyedLocker {
	return newKeyMutex()
}

func newKeyMutex() *keyMutex {
	return &keyMutex{
		locks: make(map[string]*klock),
	}
}

type keyMutex struct {
	mu sync.Mutex

	locks map[string]*klock
}

type klock struct {
	*semaphore.Weighted
	ref int
}

func (km *keyMutex) Lock(ctx context.Context, key string) error {
	km.mu.Lock()

	l, ok := km.locks[key]
	if !ok {
		km.locks[key] = &klock{
			Weighted: semaphore.NewWeighted(1),
		}
		l = km.locks[key]
	}
	l.ref++
	km.mu.Unlock()

	if err := l.Acquire(ctx, 1); err != nil {
		km.mu.Lock()
		defer km.mu.Unlock()

		l.ref--

		if l.ref < 0 {
			panic(fmt.Errorf("kmutex: release of unlocked key %v", key))
		}

		if l.ref == 0 {
			delete(km.locks, key)
		}
		return err
	}
	return nil
}

func (km *keyMutex) Unlock(key string) {
	km.mu.Lock()
	defer km.mu.Unlock()

	l, ok := km.locks[key]
	if !ok {
		panic(fmt.Errorf("kmutex: unlock of unlocked key %v", key))
	}
	l.Release(1)

	l.ref--

	if l.ref < 0 {
		panic(fmt.Errorf("kmutex: release of unlocked key %v", key))
	}

	if l.ref == 0 {
		delete(km.locks, key)
	}
}
```
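The context-aware acquire is what distinguishes kmutex from a plain mutex: a blocked waiter can give up when its context is cancelled instead of sleeping forever. The core select can be sketched with a one-token channel — a stdlib-only stand-in for the `semaphore.Weighted` used above, with an invented `ctxLock` type:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// ctxLock is a one-token lock whose acquire respects context
// cancellation, mirroring the behavior of semaphore.Weighted.Acquire.
type ctxLock struct {
	ch chan struct{}
}

func newCtxLock() *ctxLock {
	l := &ctxLock{ch: make(chan struct{}, 1)}
	l.ch <- struct{}{} // seed the token: the lock starts free
	return l
}

func (l *ctxLock) Lock(ctx context.Context) error {
	select {
	case <-l.ch: // took the token: lock acquired
		return nil
	case <-ctx.Done(): // waiter gives up instead of blocking forever
		return ctx.Err()
	}
}

func (l *ctxLock) Unlock() {
	l.ch <- struct{}{} // return the token, waking one waiter
}

func main() {
	l := newCtxLock()

	// First caller holds the lock.
	if err := l.Lock(context.Background()); err != nil {
		panic(err)
	}

	// Second caller times out instead of blocking indefinitely.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	err := l.Lock(ctx)
	fmt.Println("second acquire:", err)

	l.Unlock()

	// After release the lock can be taken again.
	if err := l.Lock(context.Background()); err != nil {
		panic(err)
	}
	fmt.Println("reacquired after release")
}
```

This cancellation path is why keyMutex.Lock above must decrement the reference count on error: a cancelled waiter still counted itself when it queued up, and the entry can only be deleted once every holder and waiter is accounted for.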

pkg/kmutex/kmutex_test.go

Lines changed: 175 additions & 0 deletions
```go
/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package kmutex

import (
	"context"
	"math/rand"
	"runtime"
	"strconv"
	"sync"
	"testing"
	"time"

	"github.com/containerd/containerd/pkg/seed"
	"github.com/stretchr/testify/assert"
)

func init() {
	seed.WithTimeAndRand()
}

func TestBasic(t *testing.T) {
	t.Parallel()

	km := newKeyMutex()
	ctx := context.Background()

	km.Lock(ctx, "c1")
	km.Lock(ctx, "c2")

	assert.Equal(t, len(km.locks), 2)
	assert.Equal(t, km.locks["c1"].ref, 1)
	assert.Equal(t, km.locks["c2"].ref, 1)

	checkWaitFn := func(key string, num int) {
		retries := 100
		waitLock := false

		for i := 0; i < retries; i++ {
			// prevent data race
			km.mu.Lock()
			ref := km.locks[key].ref
			km.mu.Unlock()

			if ref == num {
				waitLock = true
				break
			}
			time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
		}
		assert.Equal(t, waitLock, true)
	}

	// should acquire successfully after release
	{
		waitCh := make(chan struct{})
		go func() {
			defer close(waitCh)

			km.Lock(ctx, "c1")
		}()
		checkWaitFn("c1", 2)

		km.Unlock("c1")

		<-waitCh
		assert.Equal(t, km.locks["c1"].ref, 1)
	}

	// fails to acquire if the context is cancelled
	{
		var errCh = make(chan error, 1)

		ctx, cancel := context.WithCancel(context.Background())
		go func() {
			errCh <- km.Lock(ctx, "c1")
		}()

		checkWaitFn("c1", 2)

		cancel()
		assert.Equal(t, <-errCh, context.Canceled)
		assert.Equal(t, km.locks["c1"].ref, 1)
	}
}

func TestReleasePanic(t *testing.T) {
	t.Parallel()

	km := newKeyMutex()

	defer func() {
		if recover() == nil {
			t.Fatal("release of unlocked key did not panic")
		}
	}()

	km.Unlock(t.Name())
}

func TestMultileAcquireOnKeys(t *testing.T) {
	t.Parallel()

	km := newKeyMutex()
	nloops := 10000
	nproc := runtime.GOMAXPROCS(0)
	ctx := context.Background()

	var wg sync.WaitGroup
	for i := 0; i < nproc; i++ {
		wg.Add(1)

		go func(key string) {
			defer wg.Done()

			for i := 0; i < nloops; i++ {
				km.Lock(ctx, key)

				time.Sleep(time.Duration(rand.Int63n(100)) * time.Nanosecond)

				km.Unlock(key)
			}
		}("key-" + strconv.Itoa(i))
	}
	wg.Wait()
}

func TestMultiAcquireOnSameKey(t *testing.T) {
	t.Parallel()

	km := newKeyMutex()
	key := "c1"
	ctx := context.Background()

	assert.Nil(t, km.Lock(ctx, key))

	nproc := runtime.GOMAXPROCS(0)
	nloops := 10000

	var wg sync.WaitGroup
	for i := 0; i < nproc; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()

			for i := 0; i < nloops; i++ {
				km.Lock(ctx, key)

				time.Sleep(time.Duration(rand.Int63n(100)) * time.Nanosecond)

				km.Unlock(key)
			}
		}()
	}
	km.Unlock(key)
	wg.Wait()

	// The c1 key has been released, so no klock entry should remain.
	assert.Equal(t, len(km.locks), 0)
}
```
