|
| 1 | +package migration |
| 2 | + |
| 3 | +import ( |
| 4 | + "bytes" |
| 5 | + "context" |
| 6 | + "encoding/json" |
| 7 | + "fmt" |
| 8 | + "io" |
| 9 | + "strings" |
| 10 | + "sync" |
| 11 | + "time" |
| 12 | + |
| 13 | + "github.com/containerd/containerd/v2/core/content" |
| 14 | + "github.com/containerd/containerd/v2/core/images" |
| 15 | + "github.com/containerd/containerd/v2/core/leases" |
| 16 | + "github.com/containerd/containerd/v2/core/mount" |
| 17 | + "github.com/containerd/containerd/v2/core/snapshots" |
| 18 | + "github.com/containerd/containerd/v2/pkg/archive/compression" |
| 19 | + "github.com/containerd/continuity/fs" |
| 20 | + cerrdefs "github.com/containerd/errdefs" |
| 21 | + "github.com/containerd/log" |
| 22 | + "github.com/moby/moby/v2/daemon/internal/image" |
| 23 | + "github.com/moby/moby/v2/daemon/internal/layer" |
| 24 | + refstore "github.com/moby/moby/v2/daemon/internal/refstore" |
| 25 | + "github.com/opencontainers/go-digest" |
| 26 | + "github.com/opencontainers/image-spec/identity" |
| 27 | + "github.com/opencontainers/image-spec/specs-go" |
| 28 | + ocispec "github.com/opencontainers/image-spec/specs-go/v1" |
| 29 | + "golang.org/x/sync/errgroup" |
| 30 | +) |
| 31 | + |
| 32 | +type LayerMigrator struct { |
| 33 | + layers layer.Store |
| 34 | + refs refstore.Store |
| 35 | + dis image.Store |
| 36 | + leases leases.Manager |
| 37 | + content content.Store |
| 38 | + cis images.Store |
| 39 | +} |
| 40 | + |
| 41 | +type Config struct { |
| 42 | + LayerStore layer.Store |
| 43 | + ReferenceStore refstore.Store |
| 44 | + DockerImageStore image.Store |
| 45 | + Leases leases.Manager |
| 46 | + Content content.Store |
| 47 | + ImageStore images.Store |
| 48 | +} |
| 49 | + |
| 50 | +func NewLayerMigrator(config Config) *LayerMigrator { |
| 51 | + return &LayerMigrator{ |
| 52 | + layers: config.LayerStore, |
| 53 | + refs: config.ReferenceStore, |
| 54 | + dis: config.DockerImageStore, |
| 55 | + leases: config.Leases, |
| 56 | + content: config.Content, |
| 57 | + cis: config.ImageStore, |
| 58 | + } |
| 59 | +} |
| 60 | + |
| 61 | +// MigrateTocontainerd migrates containers from overlay2 to overlayfs or vfs to native |
| 62 | +func (lm *LayerMigrator) MigrateTocontainerd(ctx context.Context, snKey string, sn snapshots.Snapshotter) error { |
| 63 | + if sn == nil { |
| 64 | + return fmt.Errorf("no snapshotter to migrate to: %w", cerrdefs.ErrNotImplemented) |
| 65 | + } |
| 66 | + |
| 67 | + switch driver := lm.layers.DriverName(); driver { |
| 68 | + case "overlay2": |
| 69 | + case "vfs": |
| 70 | + default: |
| 71 | + return fmt.Errorf("%q not supported for migration: %w", driver, cerrdefs.ErrNotImplemented) |
| 72 | + } |
| 73 | + |
| 74 | + var ( |
| 75 | + // Zstd makes migration 10x faster |
| 76 | + // TODO: make configurable |
| 77 | + layerMediaType = ocispec.MediaTypeImageLayerZstd |
| 78 | + layerCompression = compression.Zstd |
| 79 | + ) |
| 80 | + |
| 81 | + l, err := lm.leases.Create(ctx, leases.WithRandomID(), leases.WithExpiration(24*time.Hour)) |
| 82 | + if err != nil { |
| 83 | + return err |
| 84 | + } |
| 85 | + defer func() { |
| 86 | + lm.leases.Delete(ctx, l) |
| 87 | + }() |
| 88 | + ctx = leases.WithLease(ctx, l.ID) |
| 89 | + |
| 90 | + for imgID, img := range lm.dis.Heads() { |
| 91 | + diffids := img.RootFS.DiffIDs |
| 92 | + if len(diffids) == 0 { |
| 93 | + continue |
| 94 | + } |
| 95 | + var ( |
| 96 | + parent string |
| 97 | + manifest = ocispec.Manifest{ |
| 98 | + MediaType: ocispec.MediaTypeImageManifest, |
| 99 | + Versioned: specs.Versioned{ |
| 100 | + SchemaVersion: 2, |
| 101 | + }, |
| 102 | + Layers: make([]ocispec.Descriptor, len(diffids)), |
| 103 | + } |
| 104 | + ml sync.Mutex |
| 105 | + eg, egctx = errgroup.WithContext(ctx) |
| 106 | + ) |
| 107 | + for i := range diffids { |
| 108 | + chainID := identity.ChainID(diffids[:i+1]) |
| 109 | + l, err := lm.layers.Get(chainID) |
| 110 | + if err != nil { |
| 111 | + return fmt.Errorf("failed to get layer [%d] %q: %w", i, chainID, err) |
| 112 | + } |
| 113 | + layerIndex := i |
| 114 | + eg.Go(func() error { |
| 115 | + ctx := egctx |
| 116 | + t1 := time.Now() |
| 117 | + ts, err := l.TarStream() |
| 118 | + if err != nil { |
| 119 | + return err |
| 120 | + } |
| 121 | + |
| 122 | + desc := ocispec.Descriptor{ |
| 123 | + MediaType: layerMediaType, |
| 124 | + } |
| 125 | + |
| 126 | + cw, err := lm.content.Writer(ctx, |
| 127 | + content.WithRef(fmt.Sprintf("ingest-%s", chainID)), |
| 128 | + content.WithDescriptor(desc)) |
| 129 | + if err != nil { |
| 130 | + return fmt.Errorf("failed to get content writer: %w", err) |
| 131 | + } |
| 132 | + |
| 133 | + dgstr := digest.Canonical.Digester() |
| 134 | + cs, _ := compression.CompressStream(io.MultiWriter(cw, dgstr.Hash()), layerCompression) |
| 135 | + _, err = io.Copy(cs, ts) |
| 136 | + if err != nil { |
| 137 | + return fmt.Errorf("failed to copy to compressed stream: %w", err) |
| 138 | + } |
| 139 | + cs.Close() |
| 140 | + |
| 141 | + status, err := cw.Status() |
| 142 | + if err != nil { |
| 143 | + return err |
| 144 | + } |
| 145 | + |
| 146 | + desc.Size = status.Offset |
| 147 | + desc.Digest = dgstr.Digest() |
| 148 | + |
| 149 | + if err := cw.Commit(ctx, desc.Size, desc.Digest); err != nil && !cerrdefs.IsAlreadyExists(err) { |
| 150 | + return err |
| 151 | + } |
| 152 | + |
| 153 | + log.G(ctx).WithFields(log.Fields{ |
| 154 | + "t": time.Since(t1), |
| 155 | + "size": desc.Size, |
| 156 | + "digest": desc.Digest, |
| 157 | + }).Debug("Converted layer to content tar") |
| 158 | + |
| 159 | + ml.Lock() |
| 160 | + manifest.Layers[layerIndex] = desc |
| 161 | + ml.Unlock() |
| 162 | + return nil |
| 163 | + }) |
| 164 | + |
| 165 | + metadata, err := l.Metadata() |
| 166 | + if err != nil { |
| 167 | + return err |
| 168 | + } |
| 169 | + src, ok := metadata["UpperDir"] |
| 170 | + if !ok { |
| 171 | + src, ok = metadata["SourceDir"] |
| 172 | + if !ok { |
| 173 | + log.G(ctx).WithField("metadata", metadata).WithField("driver", lm.layers.DriverName()).Debug("no source directory metadata") |
| 174 | + return fmt.Errorf("graphdriver not supported: %w", cerrdefs.ErrNotImplemented) |
| 175 | + } |
| 176 | + } |
| 177 | + log.G(ctx).WithField("metadata", metadata).Debugf("migrating %s from %s", chainID, src) |
| 178 | + |
| 179 | + active := fmt.Sprintf("migration-%s", chainID) |
| 180 | + |
| 181 | + key := chainID.String() |
| 182 | + |
| 183 | + snapshotLabels := map[string]string{ |
| 184 | + "containerd.io/snapshot.ref": key, |
| 185 | + } |
| 186 | + mounts, err := sn.Prepare(ctx, active, parent, snapshots.WithLabels(snapshotLabels)) |
| 187 | + parent = key |
| 188 | + if err != nil { |
| 189 | + if cerrdefs.IsAlreadyExists(err) { |
| 190 | + continue |
| 191 | + } |
| 192 | + return err |
| 193 | + } |
| 194 | + |
| 195 | + dst, err := extractSource(mounts) |
| 196 | + if err != nil { |
| 197 | + return err |
| 198 | + } |
| 199 | + |
| 200 | + t1 := time.Now() |
| 201 | + if err := fs.CopyDir(dst, src); err != nil { |
| 202 | + return err |
| 203 | + } |
| 204 | + log.G(ctx).WithFields(log.Fields{ |
| 205 | + "t": time.Since(t1), |
| 206 | + "key": key, |
| 207 | + }).Debug("Copied layer to snapshot") |
| 208 | + |
| 209 | + if err := sn.Commit(ctx, key, active); err != nil && !cerrdefs.IsAlreadyExists(err) { |
| 210 | + return err |
| 211 | + } |
| 212 | + } |
| 213 | + |
| 214 | + configBytes := img.RawJSON() |
| 215 | + digest.FromBytes(configBytes) |
| 216 | + manifest.Config = ocispec.Descriptor{ |
| 217 | + MediaType: ocispec.MediaTypeImageConfig, |
| 218 | + Digest: digest.FromBytes(configBytes), |
| 219 | + Size: int64(len(configBytes)), |
| 220 | + } |
| 221 | + |
| 222 | + configLabels := map[string]string{ |
| 223 | + fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", snKey): parent, |
| 224 | + } |
| 225 | + if err = content.WriteBlob(ctx, lm.content, "config"+manifest.Config.Digest.String(), bytes.NewReader(configBytes), manifest.Config, content.WithLabels(configLabels)); err != nil && !cerrdefs.IsAlreadyExists(err) { |
| 226 | + return err |
| 227 | + } |
| 228 | + |
| 229 | + if err := eg.Wait(); err != nil { |
| 230 | + return err |
| 231 | + } |
| 232 | + |
| 233 | + manifestBytes, err := json.MarshalIndent(manifest, "", " ") |
| 234 | + if err != nil { |
| 235 | + return err |
| 236 | + } |
| 237 | + |
| 238 | + manifestDesc := ocispec.Descriptor{ |
| 239 | + MediaType: manifest.MediaType, |
| 240 | + Digest: digest.FromBytes(manifestBytes), |
| 241 | + Size: int64(len(manifestBytes)), |
| 242 | + } |
| 243 | + |
| 244 | + manifestLabels := map[string]string{ |
| 245 | + "containerd.io/gc.ref.content.config": manifest.Config.Digest.String(), |
| 246 | + } |
| 247 | + for i := range manifest.Layers { |
| 248 | + manifestLabels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = manifest.Layers[i].Digest.String() |
| 249 | + } |
| 250 | + |
| 251 | + if err = content.WriteBlob(ctx, lm.content, "manifest"+manifestDesc.Digest.String(), bytes.NewReader(manifestBytes), manifestDesc, content.WithLabels(manifestLabels)); err != nil && !cerrdefs.IsAlreadyExists(err) { |
| 252 | + return err |
| 253 | + } |
| 254 | + |
| 255 | + childrenHandler := images.ChildrenHandler(lm.content) |
| 256 | + childrenHandler = images.SetChildrenMappedLabels(lm.content, childrenHandler, nil) |
| 257 | + if err = images.Walk(ctx, childrenHandler, manifestDesc); err != nil { |
| 258 | + return err |
| 259 | + } |
| 260 | + |
| 261 | + var added bool |
| 262 | + for _, named := range lm.refs.References(digest.Digest(imgID)) { |
| 263 | + img := images.Image{ |
| 264 | + Name: named.String(), |
| 265 | + Target: manifestDesc, |
| 266 | + // TODO: Any labels? |
| 267 | + } |
| 268 | + img, err = lm.cis.Create(ctx, img) |
| 269 | + if err != nil && !cerrdefs.IsAlreadyExists(err) { |
| 270 | + return err |
| 271 | + } else if err != nil { |
| 272 | + log.G(ctx).Infof("Tag already exists: %s", named) |
| 273 | + continue |
| 274 | + } |
| 275 | + |
| 276 | + log.G(ctx).Infof("Migrated image %s to %s", img.Name, img.Target.Digest) |
| 277 | + added = true |
| 278 | + } |
| 279 | + |
| 280 | + if !added { |
| 281 | + img := images.Image{ |
| 282 | + Name: "moby-dangling@" + manifestDesc.Digest.String(), |
| 283 | + Target: manifestDesc, |
| 284 | + // TODO: Any labels? |
| 285 | + } |
| 286 | + img, err = lm.cis.Create(ctx, img) |
| 287 | + if err != nil && !cerrdefs.IsAlreadyExists(err) { |
| 288 | + return err |
| 289 | + } else if err == nil { |
| 290 | + log.G(ctx).Infof("Migrated image %s to %s", img.Name, img.Target.Digest) |
| 291 | + } |
| 292 | + } |
| 293 | + } |
| 294 | + |
| 295 | + return nil |
| 296 | +} |
| 297 | + |
| 298 | +func extractSource(mounts []mount.Mount) (string, error) { |
| 299 | + if len(mounts) != 1 { |
| 300 | + return "", fmt.Errorf("cannot support snapshotters with multiple mount sources: %w", cerrdefs.ErrNotImplemented) |
| 301 | + } |
| 302 | + switch mounts[0].Type { |
| 303 | + case "bind": |
| 304 | + return mounts[0].Source, nil |
| 305 | + case "overlay": |
| 306 | + for _, option := range mounts[0].Options { |
| 307 | + if strings.HasPrefix(option, "upperdir=") { |
| 308 | + return option[9:], nil |
| 309 | + } |
| 310 | + } |
| 311 | + default: |
| 312 | + return "", fmt.Errorf("mount type %q not supported: %w", mounts[0].Type, cerrdefs.ErrNotImplemented) |
| 313 | + } |
| 314 | + |
| 315 | + return "", fmt.Errorf("mount is missing upper option: %w", cerrdefs.ErrNotImplemented) |
| 316 | +} |
0 commit comments