Skip to content

Commit 9f5f4f5

Browse files
committed
Add containerd migration to daemon startup
Add layer migration on startup Use image size threshold rather than image count Add daemon integration test Add test for migrating to containerd snapshotters Add vfs migration Add tar export for containerd migration Add containerd migration test with save and load Signed-off-by: Derek McGowan <[email protected]>
1 parent 4b97831 commit 9f5f4f5

4 files changed

Lines changed: 645 additions & 51 deletions

File tree

Lines changed: 316 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
package migration
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"fmt"
8+
"io"
9+
"strings"
10+
"sync"
11+
"time"
12+
13+
"github.com/containerd/containerd/v2/core/content"
14+
"github.com/containerd/containerd/v2/core/images"
15+
"github.com/containerd/containerd/v2/core/leases"
16+
"github.com/containerd/containerd/v2/core/mount"
17+
"github.com/containerd/containerd/v2/core/snapshots"
18+
"github.com/containerd/containerd/v2/pkg/archive/compression"
19+
"github.com/containerd/continuity/fs"
20+
cerrdefs "github.com/containerd/errdefs"
21+
"github.com/containerd/log"
22+
"github.com/moby/moby/v2/daemon/internal/image"
23+
"github.com/moby/moby/v2/daemon/internal/layer"
24+
refstore "github.com/moby/moby/v2/daemon/internal/refstore"
25+
"github.com/opencontainers/go-digest"
26+
"github.com/opencontainers/image-spec/identity"
27+
"github.com/opencontainers/image-spec/specs-go"
28+
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
29+
"golang.org/x/sync/errgroup"
30+
)
31+
32+
type LayerMigrator struct {
33+
layers layer.Store
34+
refs refstore.Store
35+
dis image.Store
36+
leases leases.Manager
37+
content content.Store
38+
cis images.Store
39+
}
40+
41+
type Config struct {
42+
LayerStore layer.Store
43+
ReferenceStore refstore.Store
44+
DockerImageStore image.Store
45+
Leases leases.Manager
46+
Content content.Store
47+
ImageStore images.Store
48+
}
49+
50+
func NewLayerMigrator(config Config) *LayerMigrator {
51+
return &LayerMigrator{
52+
layers: config.LayerStore,
53+
refs: config.ReferenceStore,
54+
dis: config.DockerImageStore,
55+
leases: config.Leases,
56+
content: config.Content,
57+
cis: config.ImageStore,
58+
}
59+
}
60+
61+
// MigrateTocontainerd migrates containers from overlay2 to overlayfs or vfs to native
62+
func (lm *LayerMigrator) MigrateTocontainerd(ctx context.Context, snKey string, sn snapshots.Snapshotter) error {
63+
if sn == nil {
64+
return fmt.Errorf("no snapshotter to migrate to: %w", cerrdefs.ErrNotImplemented)
65+
}
66+
67+
switch driver := lm.layers.DriverName(); driver {
68+
case "overlay2":
69+
case "vfs":
70+
default:
71+
return fmt.Errorf("%q not supported for migration: %w", driver, cerrdefs.ErrNotImplemented)
72+
}
73+
74+
var (
75+
// Zstd makes migration 10x faster
76+
// TODO: make configurable
77+
layerMediaType = ocispec.MediaTypeImageLayerZstd
78+
layerCompression = compression.Zstd
79+
)
80+
81+
l, err := lm.leases.Create(ctx, leases.WithRandomID(), leases.WithExpiration(24*time.Hour))
82+
if err != nil {
83+
return err
84+
}
85+
defer func() {
86+
lm.leases.Delete(ctx, l)
87+
}()
88+
ctx = leases.WithLease(ctx, l.ID)
89+
90+
for imgID, img := range lm.dis.Heads() {
91+
diffids := img.RootFS.DiffIDs
92+
if len(diffids) == 0 {
93+
continue
94+
}
95+
var (
96+
parent string
97+
manifest = ocispec.Manifest{
98+
MediaType: ocispec.MediaTypeImageManifest,
99+
Versioned: specs.Versioned{
100+
SchemaVersion: 2,
101+
},
102+
Layers: make([]ocispec.Descriptor, len(diffids)),
103+
}
104+
ml sync.Mutex
105+
eg, egctx = errgroup.WithContext(ctx)
106+
)
107+
for i := range diffids {
108+
chainID := identity.ChainID(diffids[:i+1])
109+
l, err := lm.layers.Get(chainID)
110+
if err != nil {
111+
return fmt.Errorf("failed to get layer [%d] %q: %w", i, chainID, err)
112+
}
113+
layerIndex := i
114+
eg.Go(func() error {
115+
ctx := egctx
116+
t1 := time.Now()
117+
ts, err := l.TarStream()
118+
if err != nil {
119+
return err
120+
}
121+
122+
desc := ocispec.Descriptor{
123+
MediaType: layerMediaType,
124+
}
125+
126+
cw, err := lm.content.Writer(ctx,
127+
content.WithRef(fmt.Sprintf("ingest-%s", chainID)),
128+
content.WithDescriptor(desc))
129+
if err != nil {
130+
return fmt.Errorf("failed to get content writer: %w", err)
131+
}
132+
133+
dgstr := digest.Canonical.Digester()
134+
cs, _ := compression.CompressStream(io.MultiWriter(cw, dgstr.Hash()), layerCompression)
135+
_, err = io.Copy(cs, ts)
136+
if err != nil {
137+
return fmt.Errorf("failed to copy to compressed stream: %w", err)
138+
}
139+
cs.Close()
140+
141+
status, err := cw.Status()
142+
if err != nil {
143+
return err
144+
}
145+
146+
desc.Size = status.Offset
147+
desc.Digest = dgstr.Digest()
148+
149+
if err := cw.Commit(ctx, desc.Size, desc.Digest); err != nil && !cerrdefs.IsAlreadyExists(err) {
150+
return err
151+
}
152+
153+
log.G(ctx).WithFields(log.Fields{
154+
"t": time.Since(t1),
155+
"size": desc.Size,
156+
"digest": desc.Digest,
157+
}).Debug("Converted layer to content tar")
158+
159+
ml.Lock()
160+
manifest.Layers[layerIndex] = desc
161+
ml.Unlock()
162+
return nil
163+
})
164+
165+
metadata, err := l.Metadata()
166+
if err != nil {
167+
return err
168+
}
169+
src, ok := metadata["UpperDir"]
170+
if !ok {
171+
src, ok = metadata["SourceDir"]
172+
if !ok {
173+
log.G(ctx).WithField("metadata", metadata).WithField("driver", lm.layers.DriverName()).Debug("no source directory metadata")
174+
return fmt.Errorf("graphdriver not supported: %w", cerrdefs.ErrNotImplemented)
175+
}
176+
}
177+
log.G(ctx).WithField("metadata", metadata).Debugf("migrating %s from %s", chainID, src)
178+
179+
active := fmt.Sprintf("migration-%s", chainID)
180+
181+
key := chainID.String()
182+
183+
snapshotLabels := map[string]string{
184+
"containerd.io/snapshot.ref": key,
185+
}
186+
mounts, err := sn.Prepare(ctx, active, parent, snapshots.WithLabels(snapshotLabels))
187+
parent = key
188+
if err != nil {
189+
if cerrdefs.IsAlreadyExists(err) {
190+
continue
191+
}
192+
return err
193+
}
194+
195+
dst, err := extractSource(mounts)
196+
if err != nil {
197+
return err
198+
}
199+
200+
t1 := time.Now()
201+
if err := fs.CopyDir(dst, src); err != nil {
202+
return err
203+
}
204+
log.G(ctx).WithFields(log.Fields{
205+
"t": time.Since(t1),
206+
"key": key,
207+
}).Debug("Copied layer to snapshot")
208+
209+
if err := sn.Commit(ctx, key, active); err != nil && !cerrdefs.IsAlreadyExists(err) {
210+
return err
211+
}
212+
}
213+
214+
configBytes := img.RawJSON()
215+
digest.FromBytes(configBytes)
216+
manifest.Config = ocispec.Descriptor{
217+
MediaType: ocispec.MediaTypeImageConfig,
218+
Digest: digest.FromBytes(configBytes),
219+
Size: int64(len(configBytes)),
220+
}
221+
222+
configLabels := map[string]string{
223+
fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", snKey): parent,
224+
}
225+
if err = content.WriteBlob(ctx, lm.content, "config"+manifest.Config.Digest.String(), bytes.NewReader(configBytes), manifest.Config, content.WithLabels(configLabels)); err != nil && !cerrdefs.IsAlreadyExists(err) {
226+
return err
227+
}
228+
229+
if err := eg.Wait(); err != nil {
230+
return err
231+
}
232+
233+
manifestBytes, err := json.MarshalIndent(manifest, "", " ")
234+
if err != nil {
235+
return err
236+
}
237+
238+
manifestDesc := ocispec.Descriptor{
239+
MediaType: manifest.MediaType,
240+
Digest: digest.FromBytes(manifestBytes),
241+
Size: int64(len(manifestBytes)),
242+
}
243+
244+
manifestLabels := map[string]string{
245+
"containerd.io/gc.ref.content.config": manifest.Config.Digest.String(),
246+
}
247+
for i := range manifest.Layers {
248+
manifestLabels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = manifest.Layers[i].Digest.String()
249+
}
250+
251+
if err = content.WriteBlob(ctx, lm.content, "manifest"+manifestDesc.Digest.String(), bytes.NewReader(manifestBytes), manifestDesc, content.WithLabels(manifestLabels)); err != nil && !cerrdefs.IsAlreadyExists(err) {
252+
return err
253+
}
254+
255+
childrenHandler := images.ChildrenHandler(lm.content)
256+
childrenHandler = images.SetChildrenMappedLabels(lm.content, childrenHandler, nil)
257+
if err = images.Walk(ctx, childrenHandler, manifestDesc); err != nil {
258+
return err
259+
}
260+
261+
var added bool
262+
for _, named := range lm.refs.References(digest.Digest(imgID)) {
263+
img := images.Image{
264+
Name: named.String(),
265+
Target: manifestDesc,
266+
// TODO: Any labels?
267+
}
268+
img, err = lm.cis.Create(ctx, img)
269+
if err != nil && !cerrdefs.IsAlreadyExists(err) {
270+
return err
271+
} else if err != nil {
272+
log.G(ctx).Infof("Tag already exists: %s", named)
273+
continue
274+
}
275+
276+
log.G(ctx).Infof("Migrated image %s to %s", img.Name, img.Target.Digest)
277+
added = true
278+
}
279+
280+
if !added {
281+
img := images.Image{
282+
Name: "moby-dangling@" + manifestDesc.Digest.String(),
283+
Target: manifestDesc,
284+
// TODO: Any labels?
285+
}
286+
img, err = lm.cis.Create(ctx, img)
287+
if err != nil && !cerrdefs.IsAlreadyExists(err) {
288+
return err
289+
} else if err == nil {
290+
log.G(ctx).Infof("Migrated image %s to %s", img.Name, img.Target.Digest)
291+
}
292+
}
293+
}
294+
295+
return nil
296+
}
297+
298+
func extractSource(mounts []mount.Mount) (string, error) {
299+
if len(mounts) != 1 {
300+
return "", fmt.Errorf("cannot support snapshotters with multiple mount sources: %w", cerrdefs.ErrNotImplemented)
301+
}
302+
switch mounts[0].Type {
303+
case "bind":
304+
return mounts[0].Source, nil
305+
case "overlay":
306+
for _, option := range mounts[0].Options {
307+
if strings.HasPrefix(option, "upperdir=") {
308+
return option[9:], nil
309+
}
310+
}
311+
default:
312+
return "", fmt.Errorf("mount type %q not supported: %w", mounts[0].Type, cerrdefs.ErrNotImplemented)
313+
}
314+
315+
return "", fmt.Errorf("mount is missing upper option: %w", cerrdefs.ErrNotImplemented)
316+
}

0 commit comments

Comments
 (0)