Skip to content

Commit 57b7519

Browse files
author
Roman Volosatovs
committed
daemon,volume: migrate to x/sync/singleflight
Signed-off-by: Roman Volosatovs <[email protected]>
1 parent 9b26c2b commit 57b7519

8 files changed

Lines changed: 282 additions & 224 deletions

File tree

daemon/daemon.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ import (
4848
"github.com/docker/docker/libnetwork"
4949
"github.com/docker/docker/libnetwork/cluster"
5050
nwconfig "github.com/docker/docker/libnetwork/config"
51-
"github.com/docker/docker/pkg/compute"
5251
"github.com/docker/docker/pkg/fileutils"
5352
"github.com/docker/docker/pkg/idtools"
5453
"github.com/docker/docker/pkg/plugingetter"
@@ -66,6 +65,7 @@ import (
6665
"github.com/sirupsen/logrus"
6766
"go.etcd.io/bbolt"
6867
"golang.org/x/sync/semaphore"
68+
"golang.org/x/sync/singleflight"
6969
"google.golang.org/grpc"
7070
"google.golang.org/grpc/backoff"
7171
)
@@ -118,7 +118,7 @@ type Daemon struct {
118118
seccompProfile []byte
119119
seccompProfilePath string
120120

121-
containerDiskUsageSingleton *compute.Singleton
121+
singleflightGroup singleflight.Group
122122

123123
pruneRunning int32
124124
hosts map[string]bool // hosts stores the addresses the daemon is listening on
@@ -737,7 +737,7 @@ func (daemon *Daemon) IsSwarmCompatible() error {
737737

738738
// NewDaemon sets up everything for the daemon to be able to service
739739
// requests from the webserver.
740-
func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.Store) (d *Daemon, err error) {
740+
func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.Store) (daemon *Daemon, err error) {
741741
setDefaultMtu(config)
742742

743743
registryService, err := registry.NewService(config.ServiceOptions)
@@ -801,13 +801,10 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
801801
os.Setenv("TMPDIR", realTmp)
802802
}
803803

804-
d = &Daemon{
804+
d := &Daemon{
805805
configStore: config,
806806
PluginStore: pluginStore,
807807
startupDone: make(chan struct{}),
808-
containerDiskUsageSingleton: compute.NewSingleton(func(ctx context.Context) (interface{}, error) {
809-
return d.containerDiskUsage(ctx)
810-
}),
811808
}
812809

813810
// Ensure the daemon is properly shutdown if there is a failure during

daemon/disk_usage.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,19 @@ func (daemon *Daemon) containerDiskUsage(ctx context.Context) ([]*types.Containe
2323

2424
// ContainerDiskUsage returns information about container data disk usage.
2525
func (daemon *Daemon) ContainerDiskUsage(ctx context.Context) ([]*types.Container, error) {
26-
v, err := daemon.containerDiskUsageSingleton.Do(ctx)
27-
if err != nil {
28-
return nil, err
26+
ch := daemon.singleflightGroup.DoChan("containerDiskUsage", func() (interface{}, error) {
27+
return daemon.containerDiskUsage(ctx)
28+
})
29+
select {
30+
case <-ctx.Done():
31+
go func() { <-ch }()
32+
return nil, ctx.Err()
33+
case res := <-ch:
34+
if res.Err != nil {
35+
return nil, res.Err
36+
}
37+
return res.Val.([]*types.Container), nil
2938
}
30-
return v.([]*types.Container), nil
3139
}
3240

3341
// SystemDiskUsage returns information about the daemon data disk usage.

daemon/images/service.go

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@ import (
1616
"github.com/docker/docker/distribution/xfer"
1717
"github.com/docker/docker/image"
1818
"github.com/docker/docker/layer"
19-
"github.com/docker/docker/pkg/compute"
2019
dockerreference "github.com/docker/docker/reference"
2120
"github.com/docker/docker/registry"
2221
"github.com/docker/libtrust"
2322
digest "github.com/opencontainers/go-digest"
2423
"github.com/pkg/errors"
2524
"github.com/sirupsen/logrus"
25+
"golang.org/x/sync/singleflight"
2626
)
2727

2828
type containerStore interface {
@@ -53,7 +53,7 @@ type ImageServiceConfig struct {
5353
}
5454

5555
// NewImageService returns a new ImageService from a configuration
56-
func NewImageService(config ImageServiceConfig) (i *ImageService) {
56+
func NewImageService(config ImageServiceConfig) *ImageService {
5757
logrus.Debugf("Max Concurrent Downloads: %d", config.MaxConcurrentDownloads)
5858
logrus.Debugf("Max Concurrent Uploads: %d", config.MaxConcurrentUploads)
5959
logrus.Debugf("Max Download Attempts: %d", config.MaxDownloadAttempts)
@@ -71,12 +71,6 @@ func NewImageService(config ImageServiceConfig) (i *ImageService) {
7171
leases: config.Leases,
7272
content: config.ContentStore,
7373
contentNamespace: config.ContentNamespace,
74-
imageDiskUsageSingleton: compute.NewSingleton(func(ctx context.Context) (interface{}, error) {
75-
return i.imageDiskUsage(ctx)
76-
}),
77-
layerDiskUsageSingleton: compute.NewSingleton(func(ctx context.Context) (interface{}, error) {
78-
return i.layerDiskUsage(ctx)
79-
}),
8074
}
8175
}
8276

@@ -96,8 +90,7 @@ type ImageService struct {
9690
leases leases.Manager
9791
content content.Store
9892
contentNamespace string
99-
imageDiskUsageSingleton *compute.Singleton
100-
layerDiskUsageSingleton *compute.Singleton
93+
singleflightGroup singleflight.Group
10194
}
10295

10396
// DistributionServices provides daemon image storage services
@@ -229,11 +222,19 @@ func (i *ImageService) layerDiskUsage(ctx context.Context) (int64, error) {
229222
// LayerDiskUsage returns the number of bytes used by layer stores
230223
// called from disk_usage.go
231224
func (i *ImageService) LayerDiskUsage(ctx context.Context) (int64, error) {
232-
v, err := i.layerDiskUsageSingleton.Do(ctx)
233-
if err != nil {
234-
return 0, err
225+
ch := i.singleflightGroup.DoChan("layerDiskUsage", func() (interface{}, error) {
226+
return i.layerDiskUsage(ctx)
227+
})
228+
select {
229+
case <-ctx.Done():
230+
go func() { <-ch }()
231+
return 0, ctx.Err()
232+
case res := <-ch:
233+
if res.Err != nil {
234+
return 0, res.Err
235+
}
236+
return res.Val.(int64), nil
235237
}
236-
return v.(int64), nil
237238
}
238239

239240
func (i *ImageService) getLayerRefs() map[layer.ChainID]int {
@@ -272,11 +273,19 @@ func (i *ImageService) imageDiskUsage(ctx context.Context) ([]*types.ImageSummar
272273

273274
// ImageDiskUsage returns information about image data disk usage.
274275
func (i *ImageService) ImageDiskUsage(ctx context.Context) ([]*types.ImageSummary, error) {
275-
v, err := i.imageDiskUsageSingleton.Do(ctx)
276-
if err != nil {
277-
return nil, err
276+
ch := i.singleflightGroup.DoChan("imageDiskUsage", func() (interface{}, error) {
277+
return i.imageDiskUsage(ctx)
278+
})
279+
select {
280+
case <-ctx.Done():
281+
go func() { <-ch }()
282+
return nil, ctx.Err()
283+
case res := <-ch:
284+
if res.Err != nil {
285+
return nil, res.Err
286+
}
287+
return res.Val.([]*types.ImageSummary), nil
278288
}
279-
return v.([]*types.ImageSummary), nil
280289
}
281290

282291
// UpdateConfig values

pkg/compute/compute.go

Lines changed: 0 additions & 83 deletions
This file was deleted.

pkg/compute/compute_test.go

Lines changed: 0 additions & 80 deletions
This file was deleted.

0 commit comments

Comments
 (0)