1
1
package resource
2
2
3
3
import (
4
- "bytes"
5
4
"context"
6
- "crypto/md5"
7
5
"os"
8
6
"path/filepath"
9
7
"time"
@@ -29,10 +27,10 @@ type Fetcher[V any] struct {
29
27
name string
30
28
vehicle types.Vehicle
31
29
updatedAt time.Time
32
- hash [ 16 ] byte
30
+ hash types. HashType
33
31
parser Parser [V ]
34
32
interval time.Duration
35
- OnUpdate func (V )
33
+ onUpdate func (V )
36
34
watcher * fswatch.Watcher
37
35
}
38
36
@@ -54,92 +52,63 @@ func (f *Fetcher[V]) UpdatedAt() time.Time {
54
52
55
53
func (f * Fetcher [V ]) Initial () (V , error ) {
56
54
var (
57
- buf []byte
58
- err error
59
- isLocal bool
60
- forceUpdate bool
55
+ buf []byte
56
+ contents V
57
+ err error
61
58
)
62
59
63
60
if stat , fErr := os .Stat (f .vehicle .Path ()); fErr == nil {
61
+ // local file exists, use it first
64
62
buf , err = os .ReadFile (f .vehicle .Path ())
65
63
modTime := stat .ModTime ()
66
- f .updatedAt = modTime
67
- isLocal = true
68
- if time .Since (modTime ) > f .interval {
69
- forceUpdate = true
64
+ contents , _ , err = f .loadBuf (buf , types .MakeHash (buf ), false )
65
+ f .updatedAt = modTime // reset updatedAt to file's modTime
66
+
67
+ if err == nil {
68
+ err = f .startPullLoop (time .Since (modTime ) > f .interval )
69
+ if err != nil {
70
+ return lo .Empty [V ](), err
71
+ }
72
+ return contents , nil
70
73
}
71
- } else {
72
- buf , err = f .vehicle .Read (f .ctx )
73
- f .updatedAt = time .Now ()
74
74
}
75
75
76
+ // parse local file error, fallback to remote
77
+ contents , _ , err = f .Update ()
78
+
76
79
if err != nil {
77
80
return lo .Empty [V ](), err
78
81
}
79
-
80
- contents , err := f .parser (buf )
82
+ err = f .startPullLoop (false )
81
83
if err != nil {
82
- if ! isLocal {
83
- return lo .Empty [V ](), err
84
- }
85
-
86
- // parse local file error, fallback to remote
87
- buf , err = f .vehicle .Read (f .ctx )
88
- if err != nil {
89
- return lo .Empty [V ](), err
90
- }
91
-
92
- contents , err = f .parser (buf )
93
- if err != nil {
94
- return lo .Empty [V ](), err
95
- }
96
-
97
- isLocal = false
98
- }
99
-
100
- if f .vehicle .Type () != types .File && ! isLocal {
101
- if err := safeWrite (f .vehicle .Path (), buf ); err != nil {
102
- return lo .Empty [V ](), err
103
- }
104
- }
105
-
106
- f .hash = md5 .Sum (buf )
107
-
108
- // pull contents automatically
109
- if f .vehicle .Type () == types .File {
110
- f .watcher , err = fswatch .NewWatcher (fswatch.Options {
111
- Path : []string {f .vehicle .Path ()},
112
- Direct : true ,
113
- Callback : f .update ,
114
- })
115
- if err != nil {
116
- return lo .Empty [V ](), err
117
- }
118
- err = f .watcher .Start ()
119
- if err != nil {
120
- return lo .Empty [V ](), err
121
- }
122
- } else if f .interval > 0 {
123
- go f .pullLoop (forceUpdate )
84
+ return lo .Empty [V ](), err
124
85
}
125
-
126
86
return contents , nil
127
87
}
128
88
129
89
func (f * Fetcher [V ]) Update () (V , bool , error ) {
130
- buf , err := f .vehicle .Read (f .ctx )
90
+ buf , hash , err := f .vehicle .Read (f .ctx , f . hash )
131
91
if err != nil {
132
92
return lo .Empty [V ](), false , err
133
93
}
134
- return f .SideUpdate (buf )
94
+ return f .loadBuf (buf , hash , f . vehicle . Type () != types . File )
135
95
}
136
96
137
97
func (f * Fetcher [V ]) SideUpdate (buf []byte ) (V , bool , error ) {
98
+ return f .loadBuf (buf , types .MakeHash (buf ), true )
99
+ }
100
+
101
+ func (f * Fetcher [V ]) loadBuf (buf []byte , hash types.HashType , updateFile bool ) (V , bool , error ) {
138
102
now := time .Now ()
139
- hash := md5 .Sum (buf )
140
- if bytes .Equal (f .hash [:], hash [:]) {
103
+ if f .hash .Equal (hash ) {
104
+ if updateFile {
105
+ _ = os .Chtimes (f .vehicle .Path (), now , now )
106
+ }
141
107
f .updatedAt = now
142
- _ = os .Chtimes (f .vehicle .Path (), now , now )
108
+ return lo .Empty [V ](), true , nil
109
+ }
110
+
111
+ if buf == nil { // f.hash has been changed between f.vehicle.Read but should not happen (cause by concurrent)
143
112
return lo .Empty [V ](), true , nil
144
113
}
145
114
@@ -148,15 +117,18 @@ func (f *Fetcher[V]) SideUpdate(buf []byte) (V, bool, error) {
148
117
return lo .Empty [V ](), false , err
149
118
}
150
119
151
- if f . vehicle . Type () != types . File {
120
+ if updateFile {
152
121
if err = safeWrite (f .vehicle .Path (), buf ); err != nil {
153
122
return lo .Empty [V ](), false , err
154
123
}
155
124
}
156
-
157
125
f .updatedAt = now
158
126
f .hash = hash
159
127
128
+ if f .onUpdate != nil {
129
+ f .onUpdate (contents )
130
+ }
131
+
160
132
return contents , false , nil
161
133
}
162
134
@@ -176,7 +148,7 @@ func (f *Fetcher[V]) pullLoop(forceUpdate bool) {
176
148
177
149
if forceUpdate {
178
150
log .Warnln ("[Provider] %s not updated for a long time, force refresh" , f .Name ())
179
- f .update ( f . vehicle . Path () )
151
+ f .updateWithLog ( )
180
152
}
181
153
182
154
timer := time .NewTimer (initialInterval )
@@ -185,15 +157,40 @@ func (f *Fetcher[V]) pullLoop(forceUpdate bool) {
185
157
select {
186
158
case <- timer .C :
187
159
timer .Reset (f .interval )
188
- f .update ( f . vehicle . Path () )
160
+ f .updateWithLog ( )
189
161
case <- f .ctx .Done ():
190
162
return
191
163
}
192
164
}
193
165
}
194
166
195
- func (f * Fetcher [V ]) update (path string ) {
196
- elm , same , err := f .Update ()
167
+ func (f * Fetcher [V ]) startPullLoop (forceUpdate bool ) (err error ) {
168
+ // pull contents automatically
169
+ if f .vehicle .Type () == types .File {
170
+ f .watcher , err = fswatch .NewWatcher (fswatch.Options {
171
+ Path : []string {f .vehicle .Path ()},
172
+ Direct : true ,
173
+ Callback : f .updateCallback ,
174
+ })
175
+ if err != nil {
176
+ return err
177
+ }
178
+ err = f .watcher .Start ()
179
+ if err != nil {
180
+ return err
181
+ }
182
+ } else if f .interval > 0 {
183
+ go f .pullLoop (forceUpdate )
184
+ }
185
+ return
186
+ }
187
+
188
+ func (f * Fetcher [V ]) updateCallback (path string ) {
189
+ f .updateWithLog ()
190
+ }
191
+
192
+ func (f * Fetcher [V ]) updateWithLog () {
193
+ _ , same , err := f .Update ()
197
194
if err != nil {
198
195
log .Errorln ("[Provider] %s pull error: %s" , f .Name (), err .Error ())
199
196
return
@@ -205,9 +202,7 @@ func (f *Fetcher[V]) update(path string) {
205
202
}
206
203
207
204
log .Infoln ("[Provider] %s's content update" , f .Name ())
208
- if f .OnUpdate != nil {
209
- f .OnUpdate (elm )
210
- }
205
+ return
211
206
}
212
207
213
208
func safeWrite (path string , buf []byte ) error {
@@ -230,7 +225,7 @@ func NewFetcher[V any](name string, interval time.Duration, vehicle types.Vehicl
230
225
name : name ,
231
226
vehicle : vehicle ,
232
227
parser : parser ,
233
- OnUpdate : onUpdate ,
228
+ onUpdate : onUpdate ,
234
229
interval : interval ,
235
230
}
236
231
}
0 commit comments