Skip to content

Commit 7bbf6ed

Browse files
committed
chunked: generate tar-split as part of zstd:chunked
Change the file format to store the tar-split as part of the zstd:chunked image. This will allow clients to rebuild the entire tarball without having to download it fully. Also store the uncompressed digest for the tarball, so that it can be stored in the storage database. Needs: containers/image#1976 Signed-off-by: Giuseppe Scrivano <[email protected]>
1 parent 7846152 commit 7bbf6ed

File tree

7 files changed

+253
-65
lines changed

7 files changed

+253
-65
lines changed

drivers/driver.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ type DriverWithDifferOutput struct {
187187
UncompressedDigest digest.Digest
188188
Metadata string
189189
BigData map[string][]byte
190+
TarSplit []byte
190191
}
191192

192193
// Differ defines the interface for using a custom differ.

layers.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2392,8 +2392,26 @@ func (r *layerStore) ApplyDiffFromStagingDirectory(id, stagingDirectory string,
23922392
layer.UncompressedDigest = diffOutput.UncompressedDigest
23932393
layer.UncompressedSize = diffOutput.Size
23942394
layer.Metadata = diffOutput.Metadata
2395-
if err = r.saveFor(layer); err != nil {
2396-
return err
2395+
if len(diffOutput.TarSplit) != 0 {
2396+
tsdata := bytes.Buffer{}
2397+
compressor, err := pgzip.NewWriterLevel(&tsdata, pgzip.BestSpeed)
2398+
if err != nil {
2399+
compressor = pgzip.NewWriter(&tsdata)
2400+
}
2401+
if err := compressor.SetConcurrency(1024*1024, 1); err != nil { // 1024*1024 is the hard-coded default; we're not changing that
2402+
logrus.Infof("setting compression concurrency threads to 1: %v; ignoring", err)
2403+
}
2404+
if _, err := compressor.Write(diffOutput.TarSplit); err != nil {
2405+
compressor.Close()
2406+
return err
2407+
}
2408+
compressor.Close()
2409+
if err := os.MkdirAll(filepath.Dir(r.tspath(layer.ID)), 0o700); err != nil {
2410+
return err
2411+
}
2412+
if err := ioutils.AtomicWriteFile(r.tspath(layer.ID), tsdata.Bytes(), 0o600); err != nil {
2413+
return err
2414+
}
23972415
}
23982416
for k, v := range diffOutput.BigData {
23992417
if err := r.SetBigData(id, k, bytes.NewReader(v)); err != nil {
@@ -2403,6 +2421,9 @@ func (r *layerStore) ApplyDiffFromStagingDirectory(id, stagingDirectory string,
24032421
return err
24042422
}
24052423
}
2424+
if err = r.saveFor(layer); err != nil {
2425+
return err
2426+
}
24062427
return err
24072428
}
24082429

pkg/chunked/compression_linux.go

Lines changed: 88 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -150,22 +150,32 @@ func readEstargzChunkedManifest(blobStream ImageSourceSeekable, blobSize int64,
150150
// readZstdChunkedManifest reads the zstd:chunked manifest from the seekable stream blobStream. The blob total size must
151151
// be specified.
152152
// This function uses the io.github.containers.zstd-chunked. annotations when specified.
153-
func readZstdChunkedManifest(ctx context.Context, blobStream ImageSourceSeekable, blobSize int64, annotations map[string]string) ([]byte, int64, error) {
153+
func readZstdChunkedManifest(ctx context.Context, blobStream ImageSourceSeekable, blobSize int64, annotations map[string]string) ([]byte, []byte, int64, error) {
154154
footerSize := int64(internal.FooterSizeSupported)
155155
if blobSize <= footerSize {
156-
return nil, 0, errors.New("blob too small")
156+
return nil, nil, 0, errors.New("blob too small")
157157
}
158158

159159
manifestChecksumAnnotation := annotations[internal.ManifestChecksumKey]
160160
if manifestChecksumAnnotation == "" {
161-
return nil, 0, fmt.Errorf("manifest checksum annotation %q not found", internal.ManifestChecksumKey)
161+
return nil, nil, 0, fmt.Errorf("manifest checksum annotation %q not found", internal.ManifestChecksumKey)
162162
}
163163

164164
var offset, length, lengthUncompressed, manifestType uint64
165165

166+
var offsetTarSplit, lengthTarSplit, lengthUncompressedTarSplit uint64
167+
tarSplitChecksumAnnotation := ""
168+
166169
if offsetMetadata := annotations[internal.ManifestInfoKey]; offsetMetadata != "" {
167170
if _, err := fmt.Sscanf(offsetMetadata, "%d:%d:%d:%d", &offset, &length, &lengthUncompressed, &manifestType); err != nil {
168-
return nil, 0, err
171+
return nil, nil, 0, err
172+
}
173+
174+
if tarSplitInfoKeyAnnotation, found := annotations[internal.TarSplitInfoKey]; found {
175+
if _, err := fmt.Sscanf(tarSplitInfoKeyAnnotation, "%d:%d:%d", &offsetTarSplit, &lengthTarSplit, &lengthUncompressedTarSplit); err != nil {
176+
return nil, nil, 0, err
177+
}
178+
tarSplitChecksumAnnotation = annotations[internal.TarSplitChecksumKey]
169179
}
170180
} else {
171181
chunk := ImageSourceChunk{
@@ -174,87 +184,129 @@ func readZstdChunkedManifest(ctx context.Context, blobStream ImageSourceSeekable
174184
}
175185
parts, errs, err := blobStream.GetBlobAt([]ImageSourceChunk{chunk})
176186
if err != nil {
177-
return nil, 0, err
187+
return nil, nil, 0, err
178188
}
179189
var reader io.ReadCloser
180190
select {
181191
case r := <-parts:
182192
reader = r
183193
case err := <-errs:
184-
return nil, 0, err
194+
return nil, nil, 0, err
185195
}
186196
footer := make([]byte, footerSize)
187197
if _, err := io.ReadFull(reader, footer); err != nil {
188-
return nil, 0, err
198+
return nil, nil, 0, err
189199
}
190200

191201
offset = binary.LittleEndian.Uint64(footer[0:8])
192202
length = binary.LittleEndian.Uint64(footer[8:16])
193203
lengthUncompressed = binary.LittleEndian.Uint64(footer[16:24])
194204
manifestType = binary.LittleEndian.Uint64(footer[24:32])
195-
if !isZstdChunkedFrameMagic(footer[32:40]) {
196-
return nil, 0, errors.New("invalid magic number")
205+
if !isZstdChunkedFrameMagic(footer[48:56]) {
206+
return nil, nil, 0, errors.New("invalid magic number")
197207
}
198208
}
199209

200210
if manifestType != internal.ManifestTypeCRFS {
201-
return nil, 0, errors.New("invalid manifest type")
211+
return nil, nil, 0, errors.New("invalid manifest type")
202212
}
203213

204214
// set a reasonable limit
205215
if length > (1<<20)*50 {
206-
return nil, 0, errors.New("manifest too big")
216+
return nil, nil, 0, errors.New("manifest too big")
207217
}
208218
if lengthUncompressed > (1<<20)*50 {
209-
return nil, 0, errors.New("manifest too big")
219+
return nil, nil, 0, errors.New("manifest too big")
210220
}
211221

212222
chunk := ImageSourceChunk{
213223
Offset: offset,
214224
Length: length,
215225
}
216226

217-
parts, errs, err := blobStream.GetBlobAt([]ImageSourceChunk{chunk})
227+
chunks := []ImageSourceChunk{chunk}
228+
229+
if offsetTarSplit > 0 {
230+
chunkTarSplit := ImageSourceChunk{
231+
Offset: offsetTarSplit,
232+
Length: lengthTarSplit,
233+
}
234+
chunks = append(chunks, chunkTarSplit)
235+
}
236+
237+
parts, errs, err := blobStream.GetBlobAt(chunks)
218238
if err != nil {
219-
return nil, 0, err
239+
return nil, nil, 0, err
220240
}
221-
var reader io.ReadCloser
222-
select {
223-
case r := <-parts:
224-
reader = r
225-
case err := <-errs:
226-
return nil, 0, err
241+
242+
readBlob := func(len uint64) ([]byte, error) {
243+
var reader io.ReadCloser
244+
select {
245+
case r := <-parts:
246+
reader = r
247+
case err := <-errs:
248+
return nil, err
249+
}
250+
251+
blob := make([]byte, len)
252+
if _, err := io.ReadFull(reader, blob); err != nil {
253+
reader.Close()
254+
return nil, err
255+
}
256+
if err := reader.Close(); err != nil {
257+
return nil, err
258+
}
259+
return blob, nil
227260
}
228261

229-
manifest := make([]byte, length)
230-
if _, err := io.ReadFull(reader, manifest); err != nil {
231-
return nil, 0, err
262+
manifest, err := readBlob(length)
263+
if err != nil {
264+
return nil, nil, 0, err
232265
}
233266

234-
manifestDigester := digest.Canonical.Digester()
235-
manifestChecksum := manifestDigester.Hash()
236-
if _, err := manifestChecksum.Write(manifest); err != nil {
237-
return nil, 0, err
267+
decodedBlob, err := decodeAndValidateBlob(manifest, lengthUncompressed, manifestChecksumAnnotation)
268+
if err != nil {
269+
return nil, nil, 0, err
270+
}
271+
decodedTarSplit := []byte{}
272+
if offsetTarSplit > 0 {
273+
tarSplit, err := readBlob(lengthTarSplit)
274+
if err != nil {
275+
return nil, nil, 0, err
276+
}
277+
278+
decodedTarSplit, err = decodeAndValidateBlob(tarSplit, lengthUncompressedTarSplit, tarSplitChecksumAnnotation)
279+
if err != nil {
280+
return nil, nil, 0, err
281+
}
238282
}
283+
return decodedBlob, decodedTarSplit, int64(offset), err
284+
}
239285

240-
d, err := digest.Parse(manifestChecksumAnnotation)
286+
func decodeAndValidateBlob(blob []byte, lengthUncompressed uint64, expectedUncompressedChecksum string) ([]byte, error) {
287+
d, err := digest.Parse(expectedUncompressedChecksum)
241288
if err != nil {
242-
return nil, 0, err
289+
return nil, err
243290
}
244-
if manifestDigester.Digest() != d {
245-
return nil, 0, errors.New("invalid manifest checksum")
291+
292+
blobDigester := d.Algorithm().Digester()
293+
blobChecksum := blobDigester.Hash()
294+
if _, err := blobChecksum.Write(blob); err != nil {
295+
return nil, err
296+
}
297+
if blobDigester.Digest() != d {
298+
return nil, fmt.Errorf("invalid blob checksum, expected checksum %s, got %s", d, blobDigester.Digest())
246299
}
247300

248301
decoder, err := zstd.NewReader(nil) //nolint:contextcheck
249302
if err != nil {
250-
return nil, 0, err
303+
return nil, err
251304
}
252305
defer decoder.Close()
253306

254307
b := make([]byte, 0, lengthUncompressed)
255-
if decoded, err := decoder.DecodeAll(manifest, b); err == nil {
256-
return decoded, int64(offset), nil
308+
if decoded, err := decoder.DecodeAll(blob, b); err == nil {
309+
return decoded, nil
257310
}
258-
259-
return manifest, int64(offset), nil
311+
return blob, nil
260312
}

pkg/chunked/compressor/compressor.go

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,17 @@ package compressor
66

77
import (
88
"bufio"
9+
"bytes"
910
"encoding/base64"
1011
"io"
1112

1213
"github.com/containers/storage/pkg/chunked/internal"
1314
"github.com/containers/storage/pkg/ioutils"
15+
"github.com/klauspost/compress/zstd"
1416
"github.com/opencontainers/go-digest"
1517
"github.com/vbatts/tar-split/archive/tar"
18+
"github.com/vbatts/tar-split/tar/asm"
19+
"github.com/vbatts/tar-split/tar/storage"
1620
)
1721

1822
const (
@@ -198,11 +202,55 @@ type chunk struct {
198202
ChunkType string
199203
}
200204

205+
type tarSplitData struct {
206+
compressed *bytes.Buffer
207+
digester digest.Digester
208+
uncompressedCounter *ioutils.WriteCounter
209+
zstd *zstd.Encoder
210+
packer storage.Packer
211+
}
212+
213+
func newTarSplitData(level int) (*tarSplitData, error) {
214+
compressed := bytes.NewBuffer(nil)
215+
digester := digest.Canonical.Digester()
216+
217+
zstdWriter, err := internal.ZstdWriterWithLevel(io.MultiWriter(compressed, digester.Hash()), level)
218+
if err != nil {
219+
return nil, err
220+
}
221+
222+
uncompressedCounter := ioutils.NewWriteCounter(zstdWriter)
223+
metaPacker := storage.NewJSONPacker(uncompressedCounter)
224+
225+
return &tarSplitData{
226+
compressed: compressed,
227+
digester: digester,
228+
uncompressedCounter: uncompressedCounter,
229+
zstd: zstdWriter,
230+
packer: metaPacker,
231+
}, nil
232+
}
233+
201234
func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, reader io.Reader, level int) error {
202235
// total written so far. Used to retrieve partial offsets in the file
203236
dest := ioutils.NewWriteCounter(destFile)
204237

205-
tr := tar.NewReader(reader)
238+
tarSplitData, err := newTarSplitData(level)
239+
if err != nil {
240+
return err
241+
}
242+
defer func() {
243+
if tarSplitData.zstd != nil {
244+
tarSplitData.zstd.Close()
245+
}
246+
}()
247+
248+
its, err := asm.NewInputTarStream(reader, tarSplitData.packer, nil)
249+
if err != nil {
250+
return err
251+
}
252+
253+
tr := tar.NewReader(its)
206254
tr.RawAccounting = true
207255

208256
buf := make([]byte, 4096)
@@ -369,17 +417,33 @@ func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, r
369417

370418
rawBytes := tr.RawBytes()
371419
if _, err := zstdWriter.Write(rawBytes); err != nil {
420+
zstdWriter.Close()
372421
return err
373422
}
374423
if err := zstdWriter.Flush(); err != nil {
424+
zstdWriter.Close()
375425
return err
376426
}
377427
if err := zstdWriter.Close(); err != nil {
378428
return err
379429
}
380430
zstdWriter = nil
381431

382-
return internal.WriteZstdChunkedManifest(dest, outMetadata, uint64(dest.Count), metadata, level)
432+
if err := tarSplitData.zstd.Flush(); err != nil {
433+
return err
434+
}
435+
if err := tarSplitData.zstd.Close(); err != nil {
436+
return err
437+
}
438+
tarSplitData.zstd = nil
439+
440+
ts := internal.TarSplitData{
441+
Data: tarSplitData.compressed.Bytes(),
442+
Digest: tarSplitData.digester.Digest(),
443+
UncompressedSize: tarSplitData.uncompressedCounter.Count,
444+
}
445+
446+
return internal.WriteZstdChunkedManifest(dest, outMetadata, uint64(dest.Count), &ts, metadata, level)
383447
}
384448

385449
type zstdChunkedWriter struct {

0 commit comments

Comments (0)