Skip to content

Commit fc415c8

Browse files
authored
Fix(multipart-put): add parts on multipart complete (#403)
fix(multipart-put): add parts on multipart complete Signed-off-by: Warh40k <[email protected]>
1 parent ec1ba4b commit fc415c8

File tree

1 file changed

+17
-6
lines changed

1 file changed

+17
-6
lines changed

pkg/bench/multipart_put.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (g *MultipartPut) Start(ctx context.Context, wait chan struct{}) error {
5050
continue
5151
}
5252

53-
err = g.uploadParts(ctx, thread, objectName, uploadID)
53+
parts, err := g.uploadParts(ctx, thread, objectName, uploadID)
5454
if errors.Is(err, context.Canceled) {
5555
return nil
5656
}
@@ -59,7 +59,7 @@ func (g *MultipartPut) Start(ctx context.Context, wait chan struct{}) error {
5959
continue
6060
}
6161

62-
err = g.completeMultipartUpload(ctx, objectName, uploadID)
62+
err = g.completeMultipartUpload(ctx, objectName, uploadID, parts)
6363
if err != nil {
6464
g.Error("complete multipart upload")
6565
}
@@ -89,13 +89,15 @@ func (g *MultipartPut) createMultupartUpload(ctx context.Context, objectName str
8989
return c.NewMultipartUpload(nonTerm, g.Bucket, objectName, g.PutOpts)
9090
}
9191

92-
func (g *MultipartPut) uploadParts(ctx context.Context, thread uint16, objectName, uploadID string) error {
92+
func (g *MultipartPut) uploadParts(ctx context.Context, thread uint16, objectName, uploadID string) ([]minio.CompletePart, error) {
9393
partIdxCh := make(chan int, g.PartsNumber)
9494
for i := 0; i < g.PartsNumber; i++ {
9595
partIdxCh <- i + 1
9696
}
9797
close(partIdxCh)
9898

99+
parts := make([]minio.CompletePart, g.PartsNumber)
100+
99101
eg, ctx := errgroup.WithContext(ctx)
100102

101103
// Non-terminating context.
@@ -157,6 +159,7 @@ func (g *MultipartPut) uploadParts(ctx context.Context, thread uint16, objectNam
157159
}
158160
g.Error(err)
159161
}
162+
parts[res.PartNumber-1] = minio.CompletePart{PartNumber: res.PartNumber, ETag: res.ETag}
160163

161164
g.Collector.Receiver() <- op
162165
}
@@ -165,16 +168,24 @@ func (g *MultipartPut) uploadParts(ctx context.Context, thread uint16, objectNam
165168
})
166169
}
167170

168-
return eg.Wait()
171+
err := eg.Wait()
172+
173+
return parts, err
169174
}
170175

171-
func (g *MultipartPut) completeMultipartUpload(_ context.Context, objectName, uploadID string) error {
176+
func (g *MultipartPut) completeMultipartUpload(ctx context.Context, objectName, uploadID string, parts []minio.CompletePart) error {
177+
select {
178+
case <-ctx.Done():
179+
return nil
180+
default:
181+
}
182+
172183
// Non-terminating context.
173184
nonTerm := context.Background()
174185

175186
cl, done := g.Client()
176187
c := minio.Core{Client: cl}
177188
defer done()
178-
_, err := c.CompleteMultipartUpload(nonTerm, g.Bucket, objectName, uploadID, nil, g.PutOpts)
189+
_, err := c.CompleteMultipartUpload(nonTerm, g.Bucket, objectName, uploadID, parts, g.PutOpts)
179190
return err
180191
}

0 commit comments

Comments
 (0)