@@ -2,6 +2,7 @@ package resource
2
2
3
3
import (
4
4
"bytes"
5
+ "context"
5
6
"crypto/md5"
6
7
"os"
7
8
"path/filepath"
@@ -22,11 +23,12 @@ var (
22
23
type Parser [V any ] func ([]byte ) (V , error )
23
24
24
25
type Fetcher [V any ] struct {
26
+ ctx context.Context
27
+ ctxCancel context.CancelFunc
25
28
resourceType string
26
29
name string
27
30
vehicle types.Vehicle
28
- UpdatedAt time.Time
29
- done chan struct {}
31
+ updatedAt time.Time
30
32
hash [16 ]byte
31
33
parser Parser [V ]
32
34
interval time.Duration
@@ -46,6 +48,10 @@ func (f *Fetcher[V]) VehicleType() types.VehicleType {
46
48
return f .vehicle .Type ()
47
49
}
48
50
51
+ func (f * Fetcher [V ]) UpdatedAt () time.Time {
52
+ return f .updatedAt
53
+ }
54
+
49
55
func (f * Fetcher [V ]) Initial () (V , error ) {
50
56
var (
51
57
buf []byte
@@ -57,15 +63,15 @@ func (f *Fetcher[V]) Initial() (V, error) {
57
63
if stat , fErr := os .Stat (f .vehicle .Path ()); fErr == nil {
58
64
buf , err = os .ReadFile (f .vehicle .Path ())
59
65
modTime := stat .ModTime ()
60
- f .UpdatedAt = modTime
66
+ f .updatedAt = modTime
61
67
isLocal = true
62
68
if f .interval != 0 && modTime .Add (f .interval ).Before (time .Now ()) {
63
69
log .Warnln ("[Provider] %s not updated for a long time, force refresh" , f .Name ())
64
70
forceUpdate = true
65
71
}
66
72
} else {
67
- buf , err = f .vehicle .Read ()
68
- f .UpdatedAt = time .Now ()
73
+ buf , err = f .vehicle .Read (f . ctx )
74
+ f .updatedAt = time .Now ()
69
75
}
70
76
71
77
if err != nil {
@@ -75,7 +81,7 @@ func (f *Fetcher[V]) Initial() (V, error) {
75
81
var contents V
76
82
if forceUpdate {
77
83
var forceBuf []byte
78
- if forceBuf , err = f .vehicle .Read (); err == nil {
84
+ if forceBuf , err = f .vehicle .Read (f . ctx ); err == nil {
79
85
if contents , err = f .parser (forceBuf ); err == nil {
80
86
isLocal = false
81
87
buf = forceBuf
@@ -93,7 +99,7 @@ func (f *Fetcher[V]) Initial() (V, error) {
93
99
}
94
100
95
101
// parse local file error, fallback to remote
96
- buf , err = f .vehicle .Read ()
102
+ buf , err = f .vehicle .Read (f . ctx )
97
103
if err != nil {
98
104
return lo .Empty [V ](), err
99
105
}
@@ -136,15 +142,18 @@ func (f *Fetcher[V]) Initial() (V, error) {
136
142
}
137
143
138
144
func (f * Fetcher [V ]) Update () (V , bool , error ) {
139
- buf , err := f .vehicle .Read ()
145
+ buf , err := f .vehicle .Read (f . ctx )
140
146
if err != nil {
141
147
return lo .Empty [V ](), false , err
142
148
}
149
+ return f .SideUpdate (buf )
150
+ }
143
151
152
+ func (f * Fetcher [V ]) SideUpdate (buf []byte ) (V , bool , error ) {
144
153
now := time .Now ()
145
154
hash := md5 .Sum (buf )
146
155
if bytes .Equal (f .hash [:], hash [:]) {
147
- f .UpdatedAt = now
156
+ f .updatedAt = now
148
157
_ = os .Chtimes (f .vehicle .Path (), now , now )
149
158
return lo .Empty [V ](), true , nil
150
159
}
@@ -160,24 +169,22 @@ func (f *Fetcher[V]) Update() (V, bool, error) {
160
169
}
161
170
}
162
171
163
- f .UpdatedAt = now
172
+ f .updatedAt = now
164
173
f .hash = hash
165
174
166
175
return contents , false , nil
167
176
}
168
177
169
- func (f * Fetcher [V ]) Destroy () error {
170
- if f .interval > 0 {
171
- f .done <- struct {}{}
172
- }
178
+ func (f * Fetcher [V ]) Close () error {
179
+ f .ctxCancel ()
173
180
if f .watcher != nil {
174
181
_ = f .watcher .Close ()
175
182
}
176
183
return nil
177
184
}
178
185
179
186
func (f * Fetcher [V ]) pullLoop () {
180
- initialInterval := f .interval - time .Since (f .UpdatedAt )
187
+ initialInterval := f .interval - time .Since (f .updatedAt )
181
188
if initialInterval > f .interval {
182
189
initialInterval = f .interval
183
190
}
@@ -189,7 +196,7 @@ func (f *Fetcher[V]) pullLoop() {
189
196
case <- timer .C :
190
197
timer .Reset (f .interval )
191
198
f .update (f .vehicle .Path ())
192
- case <- f .done :
199
+ case <- f .ctx . Done () :
193
200
return
194
201
}
195
202
}
@@ -226,13 +233,14 @@ func safeWrite(path string, buf []byte) error {
226
233
}
227
234
228
235
func NewFetcher [V any ](name string , interval time.Duration , vehicle types.Vehicle , parser Parser [V ], onUpdate func (V )) * Fetcher [V ] {
229
-
236
+ ctx , cancel := context . WithCancel ( context . Background ())
230
237
return & Fetcher [V ]{
231
- name : name ,
232
- vehicle : vehicle ,
233
- parser : parser ,
234
- done : make (chan struct {}, 8 ),
235
- OnUpdate : onUpdate ,
236
- interval : interval ,
238
+ ctx : ctx ,
239
+ ctxCancel : cancel ,
240
+ name : name ,
241
+ vehicle : vehicle ,
242
+ parser : parser ,
243
+ OnUpdate : onUpdate ,
244
+ interval : interval ,
237
245
}
238
246
}
0 commit comments