
Commit 9ca3bb6

Store image manifests in containerd content store
This allows us to cache manifests and avoid extra round trips to the registry for content we already know about. dockerd currently does not support containerd on Windows, so this does not store manifests on Windows yet.

Signed-off-by: Brian Goff <[email protected]>
1 parent e4cf1c7 commit 9ca3bb6

13 files changed: 1,004 additions and 45 deletions
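
Before the per-file changes, a rough sketch of the idea: with manifests persisted in a content store, a pull can check the store by digest before going back to the registry. The helper below is a hypothetical illustration of that lookup, not code from this commit (manifestFromCache is an invented name):

	import (
		"context"

		"github.com/containerd/containerd/content"
		ocispec "github.com/opencontainers/image-spec/specs-go/v1"
	)

	// manifestFromCache returns the raw manifest bytes if the digest is
	// already present in the local content store; otherwise the caller
	// falls back to a registry round trip.
	func manifestFromCache(ctx context.Context, store content.Store, desc ocispec.Descriptor) ([]byte, bool) {
		b, err := content.ReadBlob(ctx, store, desc)
		if err != nil {
			return nil, false // not cached (or unreadable): fetch from the registry
		}
		return b, true
	}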

daemon/content.go

Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
+package daemon
+
+import (
+	"os"
+	"path/filepath"
+
+	"github.com/containerd/containerd/content"
+	"github.com/containerd/containerd/content/local"
+	"github.com/containerd/containerd/leases"
+	"github.com/containerd/containerd/metadata"
+	"github.com/pkg/errors"
+	"go.etcd.io/bbolt"
+)
+
+func (d *Daemon) configureLocalContentStore() (content.Store, leases.Manager, error) {
+	if err := os.MkdirAll(filepath.Join(d.root, "content"), 0700); err != nil {
+		return nil, nil, errors.Wrap(err, "error creating dir for content store")
+	}
+	db, err := bbolt.Open(filepath.Join(d.root, "content", "metadata.db"), 0600, nil)
+	if err != nil {
+		return nil, nil, errors.Wrap(err, "error opening bolt db for content metadata store")
+	}
+	cs, err := local.NewStore(filepath.Join(d.root, "content", "data"))
+	if err != nil {
+		return nil, nil, errors.Wrap(err, "error setting up content store")
+	}
+	md := metadata.NewDB(db, cs, nil)
+	d.mdDB = db
+	return md.ContentStore(), metadata.NewLeaseManager(md), nil
+}
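
Two details here are easy to miss: the metadata.DB returned by this function is namespaced, so callers must attach a containerd namespace to the context before using the store, and content written without a lease is a garbage-collection candidate. A minimal usage sketch under those assumptions (the namespace, lease ID, and ref are illustrative, and storeManifestExample is not part of this commit; it additionally assumes bytes, context, namespaces, leases, go-digest, and the OCI image-spec types are imported):

	// storeManifestExample writes one manifest blob into the local store
	// configured by configureLocalContentStore above.
	func (d *Daemon) storeManifestExample(manifest []byte) error {
		cs, lm, err := d.configureLocalContentStore()
		if err != nil {
			return err
		}
		ctx := namespaces.WithNamespace(context.Background(), "moby")

		// Take a lease first; unleased content may be garbage collected.
		l, err := lm.Create(ctx, leases.WithID("example-lease"))
		if err != nil {
			return err
		}
		ctx = leases.WithLease(ctx, l.ID)

		return content.WriteBlob(ctx, cs, "example-ref", bytes.NewReader(manifest), ocispec.Descriptor{
			MediaType: ocispec.MediaTypeImageManifest,
			Digest:    digest.FromBytes(manifest),
			Size:      int64(len(manifest)),
		})
	}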

daemon/daemon.go

Lines changed: 33 additions & 5 deletions
@@ -20,6 +20,7 @@ import (
 	"time"
 
 	"github.com/docker/docker/pkg/fileutils"
+	"go.etcd.io/bbolt"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/backoff"
 
@@ -129,6 +130,11 @@ type Daemon struct {
 
 	attachmentStore network.AttachmentStore
 	attachableNetworkLock *locker.Locker
+
+	// This is used for Windows which doesn't currently support running on containerd
+	// It stores metadata for the content store (used for manifest caching)
+	// This needs to be closed on daemon exit
+	mdDB *bbolt.DB
 }
 
 // StoreHosts stores the addresses the daemon is listening on
@@ -1066,10 +1072,7 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
 
 	d.linkIndex = newLinkIndex()
 
-	// TODO: imageStore, distributionMetadataStore, and ReferenceStore are only
-	// used above to run migration. They could be initialized in ImageService
-	// if migration is called from daemon/images. layerStore might move as well.
-	d.imageService = images.NewImageService(images.ImageServiceConfig{
+	imgSvcConfig := images.ImageServiceConfig{
 		ContainerStore: d.containers,
 		DistributionMetadataStore: distributionMetadataStore,
 		EventsService: d.EventsService,
@@ -1081,7 +1084,28 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
 		ReferenceStore: rs,
 		RegistryService: registryService,
 		TrustKey: trustKey,
-	})
+		ContentNamespace: config.ContainerdNamespace,
+	}
+
+	// containerd is not currently supported with Windows.
+	// So sometimes d.containerdCli will be nil
+	// In that case we'll create a local content store... but otherwise we'll use containerd
+	if d.containerdCli != nil {
+		imgSvcConfig.Leases = d.containerdCli.LeasesService()
+		imgSvcConfig.ContentStore = d.containerdCli.ContentStore()
+	} else {
+		cs, lm, err := d.configureLocalContentStore()
+		if err != nil {
+			return nil, err
+		}
+		imgSvcConfig.ContentStore = cs
+		imgSvcConfig.Leases = lm
+	}
+
+	// TODO: imageStore, distributionMetadataStore, and ReferenceStore are only
+	// used above to run migration. They could be initialized in ImageService
+	// if migration is called from daemon/images. layerStore might move as well.
+	d.imageService = images.NewImageService(imgSvcConfig)
 
 	go d.execCommandGC()
 
@@ -1246,6 +1270,10 @@ func (daemon *Daemon) Shutdown() error {
 		daemon.containerdCli.Close()
 	}
 
+	if daemon.mdDB != nil {
+		daemon.mdDB.Close()
+	}
+
 	return daemon.cleanupMounts()
 }
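
One point worth noting about the NewDaemon change: both branches hand the image service the same pair of interfaces, so containerd's client and the bolt-backed local store are interchangeable from the image service's point of view. A trivial restatement of that contract (illustrative, not part of the commit):

	// Both assignments compile regardless of which branch ran:
	var _ content.Store = imgSvcConfig.ContentStore
	var _ leases.Manager = imgSvcConfig.Leases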

daemon/images/image_pull.go

Lines changed: 50 additions & 2 deletions
@@ -6,6 +6,8 @@ import (
 	"strings"
 	"time"
 
+	"github.com/containerd/containerd/leases"
+	"github.com/containerd/containerd/namespaces"
 	dist "github.com/docker/distribution"
 	"github.com/docker/distribution/reference"
 	"github.com/docker/docker/api/types"
@@ -16,6 +18,7 @@ import (
 	"github.com/docker/docker/registry"
 	digest "github.com/opencontainers/go-digest"
 	specs "github.com/opencontainers/image-spec/specs-go/v1"
+	"github.com/pkg/errors"
 )
 
 // PullImage initiates a pull operation. image is the repository name to pull, and
@@ -65,6 +68,25 @@ func (i *ImageService) pullImageWithReference(ctx context.Context, ref reference
 		close(writesDone)
 	}()
 
+	ctx = namespaces.WithNamespace(ctx, i.contentNamespace)
+	// Take out a temporary lease for everything that gets persisted to the content store.
+	// Before the lease is cancelled, any content we want to keep should have its own lease applied.
+	ctx, done, err := tempLease(ctx, i.leases)
+	if err != nil {
+		return err
+	}
+	defer done(ctx)
+
+	cs := &contentStoreForPull{
+		ContentStore: i.content,
+		leases:       i.leases,
+	}
+	imageStore := &imageStoreForPull{
+		ImageConfigStore: distribution.NewImageConfigStoreFromStore(i.imageStore),
+		ingested:         cs,
+		leases:           i.leases,
+	}
+
 	imagePullConfig := &distribution.ImagePullConfig{
 		Config: distribution.Config{
 			MetaHeaders: metaHeaders,
@@ -73,15 +95,15 @@ func (i *ImageService) pullImageWithReference(ctx context.Context, ref reference
 			RegistryService:  i.registryService,
 			ImageEventLogger: i.LogImageEvent,
 			MetadataStore:    i.distributionMetadataStore,
-			ImageStore:       distribution.NewImageConfigStoreFromStore(i.imageStore),
+			ImageStore:       imageStore,
 			ReferenceStore:   i.referenceStore,
 		},
 		DownloadManager: i.downloadManager,
 		Schema2Types:    distribution.ImageTypes,
 		Platform:        platform,
 	}
 
-	err := distribution.Pull(ctx, ref, imagePullConfig)
+	err = distribution.Pull(ctx, ref, imagePullConfig, cs)
 	close(progressChan)
 	<-writesDone
 	return err
@@ -124,3 +146,29 @@ func (i *ImageService) GetRepository(ctx context.Context, ref reference.Named, a
 	}
 	return repository, confirmedV2, lastError
 }
+
+func tempLease(ctx context.Context, mgr leases.Manager) (context.Context, func(context.Context) error, error) {
+	nop := func(context.Context) error { return nil }
+	_, ok := leases.FromContext(ctx)
+	if ok {
+		return ctx, nop, nil
+	}
+
+	// Use an expiration that ensures the lease is cleaned up at some point if there is a crash, SIGKILL, etc.
+	opts := []leases.Opt{
+		leases.WithRandomID(),
+		leases.WithExpiration(24 * time.Hour),
+		leases.WithLabels(map[string]string{
+			"moby.lease/temporary": time.Now().UTC().Format(time.RFC3339Nano),
+		}),
+	}
+	l, err := mgr.Create(ctx, opts...)
+	if err != nil {
+		return ctx, nop, errors.Wrap(err, "error creating temporary lease")
+	}
+
+	ctx = leases.WithLease(ctx, l.ID)
+	return ctx, func(ctx context.Context) error {
+		return mgr.Delete(ctx, l)
+	}, nil
+}
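
The lease choreography is the subtle part of this file: everything written during the pull is covered by the short-lived lease from tempLease, and imageStoreForPull (defined in daemon/images/store.go below) moves the digests worth keeping onto a per-image lease before the temporary one is deleted. Condensed, using identifiers from this diff (the namespace literal is illustrative):

	ctx = namespaces.WithNamespace(ctx, "moby")
	ctx, done, err := tempLease(ctx, i.leases) // covers every blob the pull writes
	if err != nil {
		return err
	}
	defer done(ctx) // drops the temporary lease once the pull returns

	// While the pull runs, contentStoreForPull records each committed digest;
	// imageStoreForPull.Put then attaches those digests to a long-lived
	// "moby-image-<config digest>" lease, so cached manifests survive done(ctx).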

daemon/images/service.go

Lines changed: 12 additions & 1 deletion
@@ -5,6 +5,8 @@ import (
 	"os"
 	"runtime"
 
+	"github.com/containerd/containerd/content"
+	"github.com/containerd/containerd/leases"
 	"github.com/docker/docker/container"
 	daemonevents "github.com/docker/docker/daemon/events"
 	"github.com/docker/docker/distribution"
@@ -42,6 +44,9 @@ type ImageServiceConfig struct {
 	ReferenceStore dockerreference.Store
 	RegistryService registry.Service
 	TrustKey libtrust.PrivateKey
+	ContentStore content.Store
+	Leases leases.Manager
+	ContentNamespace string
 }
 
 // NewImageService returns a new ImageService from a configuration
@@ -54,12 +59,15 @@ func NewImageService(config ImageServiceConfig) *ImageService {
 		distributionMetadataStore: config.DistributionMetadataStore,
 		downloadManager: xfer.NewLayerDownloadManager(config.LayerStores, config.MaxConcurrentDownloads, xfer.WithMaxDownloadAttempts(config.MaxDownloadAttempts)),
 		eventsService: config.EventsService,
-		imageStore: config.ImageStore,
+		imageStore: &imageStoreWithLease{Store: config.ImageStore, leases: config.Leases, ns: config.ContentNamespace},
 		layerStores: config.LayerStores,
 		referenceStore: config.ReferenceStore,
 		registryService: config.RegistryService,
 		trustKey: config.TrustKey,
 		uploadManager: xfer.NewLayerUploadManager(config.MaxConcurrentUploads),
+		leases: config.Leases,
+		content: config.ContentStore,
+		contentNamespace: config.ContentNamespace,
 	}
 }
 
@@ -76,6 +84,9 @@ type ImageService struct {
 	registryService registry.Service
 	trustKey libtrust.PrivateKey
 	uploadManager *xfer.LayerUploadManager
+	leases leases.Manager
+	content content.Store
+	contentNamespace string
 }
 
 // DistributionServices provides daemon image storage services
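
Note the wrapping on the imageStore line: every ImageService now reaches the image store through imageStoreWithLease (defined in daemon/images/store.go below), so image deletion transparently releases the image's content lease. In effect (identifiers from this diff):

	imageStore := &imageStoreWithLease{
		Store:  config.ImageStore,
		leases: config.Leases,
		ns:     config.ContentNamespace,
	}
	// imageStore.Delete(id) deletes lease "moby-image-<id>" before
	// delegating to the wrapped store's Delete.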

daemon/images/store.go

Lines changed: 155 additions & 0 deletions
@@ -0,0 +1,155 @@
+package images
+
+import (
+	"context"
+	"sync"
+
+	"github.com/containerd/containerd/content"
+	c8derrdefs "github.com/containerd/containerd/errdefs"
+	"github.com/containerd/containerd/leases"
+	"github.com/containerd/containerd/log"
+	"github.com/containerd/containerd/namespaces"
+	"github.com/docker/docker/distribution"
+	"github.com/docker/docker/image"
+	"github.com/docker/docker/layer"
+	digest "github.com/opencontainers/go-digest"
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+)
+
+func imageKey(dgst digest.Digest) string {
+	return "moby-image-" + dgst.String()
+}
+
+// imageStoreWithLease wraps the configured image store with one that deletes the lease
+// registered for a given image ID, if one exists
+//
+// This is used by the main image service to wrap delete calls to the real image store.
+type imageStoreWithLease struct {
+	image.Store
+	leases leases.Manager
+
+	// Normally we'd pass namespace down through a context.Context, however...
+	// The interface for image store doesn't allow this, so we store it here.
+	ns string
+}
+
+func (s *imageStoreWithLease) Delete(id image.ID) ([]layer.Metadata, error) {
+	ctx := namespaces.WithNamespace(context.TODO(), s.ns)
+	if err := s.leases.Delete(ctx, leases.Lease{ID: imageKey(digest.Digest(id))}); err != nil && !c8derrdefs.IsNotFound(err) {
+		return nil, errors.Wrap(err, "error deleting lease")
+	}
+	return s.Store.Delete(id)
+}
+
+// imageStoreForPull is created for each pull. It wraps an underlying image store
+// to handle registering leases for content fetched in a single image pull.
+type imageStoreForPull struct {
+	distribution.ImageConfigStore
+	leases   leases.Manager
+	ingested *contentStoreForPull
+}
+
+func (s *imageStoreForPull) Put(ctx context.Context, config []byte) (digest.Digest, error) {
+	id, err := s.ImageConfigStore.Put(ctx, config)
+	if err != nil {
+		return "", err
+	}
+	return id, s.updateLease(ctx, id)
+}
+
+func (s *imageStoreForPull) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
+	id, err := s.ImageConfigStore.Get(ctx, dgst)
+	if err != nil {
+		return nil, err
+	}
+	return id, s.updateLease(ctx, dgst)
+}
+
+func (s *imageStoreForPull) updateLease(ctx context.Context, dgst digest.Digest) error {
+	leaseID := imageKey(dgst)
+	lease, err := s.leases.Create(ctx, leases.WithID(leaseID))
+	if err != nil {
+		if !c8derrdefs.IsAlreadyExists(err) {
+			return errors.Wrap(err, "error creating lease")
+		}
+		lease = leases.Lease{ID: leaseID}
+	}
+
+	digested := s.ingested.getDigested()
+	resource := leases.Resource{
+		Type: "content",
+	}
+	for _, dgst := range digested {
+		log.G(ctx).WithFields(logrus.Fields{
+			"digest": dgst,
+			"lease":  lease.ID,
+		}).Debug("Adding content digest to lease")
+
+		resource.ID = dgst.String()
+		if err := s.leases.AddResource(ctx, lease, resource); err != nil {
+			return errors.Wrapf(err, "error adding content digest to lease: %s", dgst)
+		}
+	}
+	return nil
+}
+
+// contentStoreForPull is used to wrap the configured content store to
+// add lease management for a single `pull`
+// It stores all committed digests so that `imageStoreForPull` can add
+// the digested resources to the lease for an image.
+type contentStoreForPull struct {
+	distribution.ContentStore
+	leases leases.Manager
+
+	mu       sync.Mutex
+	digested []digest.Digest
+}
+
+func (c *contentStoreForPull) addDigested(dgst digest.Digest) {
+	c.mu.Lock()
+	c.digested = append(c.digested, dgst)
+	c.mu.Unlock()
+}
+
+func (c *contentStoreForPull) getDigested() []digest.Digest {
+	c.mu.Lock()
+	digested := make([]digest.Digest, len(c.digested))
+	copy(digested, c.digested)
+	c.mu.Unlock()
+	return digested
+}
+
+func (c *contentStoreForPull) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
+	w, err := c.ContentStore.Writer(ctx, opts...)
+	if err != nil {
+		if c8derrdefs.IsAlreadyExists(err) {
+			var cfg content.WriterOpts
+			for _, o := range opts {
+				if err := o(&cfg); err != nil {
+					return nil, err
+				}
+
+			}
+			c.addDigested(cfg.Desc.Digest)
+		}
+		return nil, err
+	}
+	return &contentWriter{
+		cs:     c,
+		Writer: w,
+	}, nil
+}
+
+type contentWriter struct {
+	cs *contentStoreForPull
+	content.Writer
+}
+
+func (w *contentWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
+	err := w.Writer.Commit(ctx, size, expected, opts...)
+	if err == nil || c8derrdefs.IsAlreadyExists(err) {
+		w.cs.addDigested(expected)
+	}
+	return err
+}
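
One behavior worth spelling out: a cache hit surfaces from containerd as an "already exists" error, and both wrappers treat it as success for lease purposes, so previously fetched manifests are re-leased instead of re-downloaded. From a caller's point of view (a hypothetical sketch; the ref naming is illustrative, and c8derrdefs aliases containerd's errdefs as in the imports above):

	w, err := cs.Writer(ctx,
		content.WithRef("manifest-"+desc.Digest.String()),
		content.WithDescriptor(desc))
	if c8derrdefs.IsAlreadyExists(err) {
		// Blob already in the store: no registry fetch needed. The wrapper
		// has recorded the digest so it still gets attached to the image lease.
		return nil
	}
	// ...otherwise stream the manifest into w and Commit it, which also
	// records the digest via contentWriter above.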
