Skip to content

Commit ea02ce9

Browse files
zepatrikory-bot
authored andcommitted
test: deflake directory watcherx
GitOrigin-RevId: 6b1c66f96f46833273490b22179aadcb5c7ce01c
1 parent 10df7e1 commit ea02ce9

File tree

1 file changed

+62
-29
lines changed

1 file changed

+62
-29
lines changed

oryx/watcherx/directory.go

Lines changed: 62 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -33,97 +33,130 @@ func WatchDirectory(ctx context.Context, dir string, c EventChannel) (Watcher, e
3333
return nil, err
3434
}
3535

36-
d := newDispatcher()
37-
go streamDirectoryEvents(ctx, w, c, d.trigger, d.done, dir, subDirs)
38-
return d, nil
36+
dw := &directoryWatcher{
37+
dispatcher: newDispatcher(),
38+
c: c,
39+
dir: dir,
40+
subDirs: subDirs,
41+
w: w,
42+
}
43+
go dw.streamDirectoryEvents(ctx)
44+
return dw, nil
45+
}
46+
47+
type directoryWatcher struct {
48+
*dispatcher
49+
c EventChannel
50+
dir string
51+
subDirs map[string]struct{}
52+
w *fsnotify.Watcher
3953
}
4054

41-
func handleEvent(e fsnotify.Event, w *fsnotify.Watcher, c EventChannel, subDirs map[string]struct{}) {
55+
func (w *directoryWatcher) handleEvent(ctx context.Context, e fsnotify.Event) {
4256
if e.Has(fsnotify.Remove) {
43-
if _, ok := subDirs[e.Name]; ok {
57+
if _, ok := w.subDirs[e.Name]; ok {
4458
// we do not want any event on deletion of a directory
45-
delete(subDirs, e.Name)
59+
delete(w.subDirs, e.Name)
4660
return
4761
}
48-
c <- &RemoveEvent{
62+
w.maybeSend(ctx, &RemoveEvent{
4963
source: source(e.Name),
50-
}
64+
})
5165
return
5266
} else if e.Has(fsnotify.Write | fsnotify.Create) {
5367
if stats, err := os.Stat(e.Name); err != nil {
54-
c <- &ErrorEvent{
68+
w.maybeSend(ctx, &ErrorEvent{
5569
error: errors.WithStack(err),
5670
source: source(e.Name),
57-
}
71+
})
5872
return
5973
} else if stats.IsDir() {
60-
if err := w.Add(e.Name); err != nil {
61-
c <- &ErrorEvent{
74+
if err := w.w.Add(e.Name); err != nil {
75+
w.maybeSend(ctx, &ErrorEvent{
6276
error: errors.WithStack(err),
6377
source: source(e.Name),
64-
}
78+
})
6579
}
66-
subDirs[e.Name] = struct{}{}
80+
w.subDirs[e.Name] = struct{}{}
6781
return
6882
}
6983

7084
//#nosec G304 -- false positive
7185
data, err := os.ReadFile(e.Name)
7286
if err != nil {
73-
c <- &ErrorEvent{
87+
w.maybeSend(ctx, &ErrorEvent{
7488
error: err,
7589
source: source(e.Name),
76-
}
90+
})
7791
} else {
78-
c <- &ChangeEvent{
92+
w.maybeSend(ctx, &ChangeEvent{
7993
data: data,
8094
source: source(e.Name),
81-
}
95+
})
8296
}
8397
}
8498
}
8599

86-
func streamDirectoryEvents(ctx context.Context, w *fsnotify.Watcher, c EventChannel, sendNow <-chan struct{}, sendNowDone chan<- int, dir string, subDirs map[string]struct{}) {
100+
func (w *directoryWatcher) maybeSend(ctx context.Context, e Event) bool {
101+
select {
102+
case <-ctx.Done():
103+
return false
104+
case w.c <- e:
105+
return true
106+
}
107+
}
108+
109+
func (w *directoryWatcher) streamDirectoryEvents(ctx context.Context) {
110+
defer func() {
111+
close(w.done)
112+
close(w.c)
113+
_ = w.w.Close()
114+
}()
87115
for {
88116
select {
89117
case <-ctx.Done():
90-
_ = w.Close()
91118
return
92-
case e := <-w.Events:
93-
handleEvent(e, w, c, subDirs)
94-
case <-sendNow:
119+
case e := <-w.w.Events:
120+
w.handleEvent(ctx, e)
121+
case <-w.trigger:
95122
var eventsSent int
96123

97-
if err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
124+
if err := filepath.Walk(w.dir, func(path string, info os.FileInfo, err error) error {
98125
if err != nil {
99126
return err
100127
}
101128
if !info.IsDir() {
102129
//#nosec G304 -- false positive
103130
data, err := os.ReadFile(path)
104131
if err != nil {
105-
c <- &ErrorEvent{
132+
if !w.maybeSend(ctx, &ErrorEvent{
106133
error: err,
107134
source: source(path),
135+
}) {
136+
return errors.WithStack(context.Canceled)
108137
}
109138
} else {
110-
c <- &ChangeEvent{
139+
if !w.maybeSend(ctx, &ChangeEvent{
111140
data: data,
112141
source: source(path),
142+
}) {
143+
return errors.WithStack(context.Canceled)
113144
}
114145
}
115146
eventsSent++
116147
}
117148
return nil
118149
}); err != nil {
119-
c <- &ErrorEvent{
150+
if !w.maybeSend(ctx, &ErrorEvent{
120151
error: err,
121-
source: source(dir),
152+
source: source(w.dir),
153+
}) {
154+
return
122155
}
123156
eventsSent++
124157
}
125158

126-
sendNowDone <- eventsSent
159+
w.done <- eventsSent
127160
}
128161
}
129162
}

0 commit comments

Comments
 (0)