Skip to content

Commit 5d24251

Browse files
committed
chore: support ETag for providers
1 parent 223eae0 commit 5d24251

File tree

7 files changed

+201
-105
lines changed

7 files changed

+201
-105
lines changed

adapter/provider/provider.go

+2-6
Original file line numberDiff line numberDiff line change
@@ -71,19 +71,15 @@ func (pp *proxySetProvider) HealthCheck() {
7171
}
7272

7373
func (pp *proxySetProvider) Update() error {
74-
elm, same, err := pp.Fetcher.Update()
75-
if err == nil && !same {
76-
pp.OnUpdate(elm)
77-
}
74+
_, _, err := pp.Fetcher.Update()
7875
return err
7976
}
8077

8178
func (pp *proxySetProvider) Initial() error {
82-
elm, err := pp.Fetcher.Initial()
79+
_, err := pp.Fetcher.Initial()
8380
if err != nil {
8481
return err
8582
}
86-
pp.OnUpdate(elm)
8783
pp.getSubscriptionInfo()
8884
pp.closeAllConnections()
8985
return nil

component/profile/cachefile/cache.go

+55
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package cachefile
22

33
import (
4+
"math"
45
"os"
56
"sync"
67
"time"
@@ -19,6 +20,7 @@ var (
1920

2021
bucketSelected = []byte("selected")
2122
bucketFakeip = []byte("fakeip")
23+
bucketETag = []byte("etag")
2224
)
2325

2426
// CacheFile store and update the cache file
@@ -143,6 +145,59 @@ func (c *CacheFile) FlushFakeIP() error {
143145
return err
144146
}
145147

148+
func (c *CacheFile) SetETagWithHash(url string, hash []byte, etag string) {
149+
if c.DB == nil {
150+
return
151+
}
152+
153+
lenHash := len(hash)
154+
if lenHash > math.MaxUint8 {
155+
return // maybe panic is better
156+
}
157+
158+
data := make([]byte, 1, 1+lenHash+len(etag))
159+
data[0] = uint8(lenHash)
160+
data = append(data, hash...)
161+
data = append(data, etag...)
162+
163+
err := c.DB.Batch(func(t *bbolt.Tx) error {
164+
bucket, err := t.CreateBucketIfNotExists(bucketETag)
165+
if err != nil {
166+
return err
167+
}
168+
169+
return bucket.Put([]byte(url), data)
170+
})
171+
if err != nil {
172+
log.Warnln("[CacheFile] write cache to %s failed: %s", c.DB.Path(), err.Error())
173+
return
174+
}
175+
}
176+
func (c *CacheFile) GetETagWithHash(key string) (hash []byte, etag string) {
177+
if c.DB == nil {
178+
return
179+
}
180+
var value []byte
181+
c.DB.View(func(t *bbolt.Tx) error {
182+
if bucket := t.Bucket(bucketETag); bucket != nil {
183+
if v := bucket.Get([]byte(key)); v != nil {
184+
value = v
185+
}
186+
}
187+
return nil
188+
})
189+
if len(value) == 0 {
190+
return
191+
}
192+
lenHash := int(value[0])
193+
if len(value) < 1+lenHash {
194+
return
195+
}
196+
hash = value[1 : 1+lenHash]
197+
etag = string(value[1+lenHash:])
198+
return
199+
}
200+
146201
func (c *CacheFile) Close() error {
147202
return c.DB.Close()
148203
}

component/resource/fetcher.go

+70-75
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
package resource
22

33
import (
4-
"bytes"
54
"context"
6-
"crypto/md5"
75
"os"
86
"path/filepath"
97
"time"
@@ -29,10 +27,10 @@ type Fetcher[V any] struct {
2927
name string
3028
vehicle types.Vehicle
3129
updatedAt time.Time
32-
hash [16]byte
30+
hash types.HashType
3331
parser Parser[V]
3432
interval time.Duration
35-
OnUpdate func(V)
33+
onUpdate func(V)
3634
watcher *fswatch.Watcher
3735
}
3836

@@ -54,92 +52,63 @@ func (f *Fetcher[V]) UpdatedAt() time.Time {
5452

5553
func (f *Fetcher[V]) Initial() (V, error) {
5654
var (
57-
buf []byte
58-
err error
59-
isLocal bool
60-
forceUpdate bool
55+
buf []byte
56+
contents V
57+
err error
6158
)
6259

6360
if stat, fErr := os.Stat(f.vehicle.Path()); fErr == nil {
61+
// local file exists, use it first
6462
buf, err = os.ReadFile(f.vehicle.Path())
6563
modTime := stat.ModTime()
66-
f.updatedAt = modTime
67-
isLocal = true
68-
if time.Since(modTime) > f.interval {
69-
forceUpdate = true
64+
contents, _, err = f.loadBuf(buf, types.MakeHash(buf), false)
65+
f.updatedAt = modTime // reset updatedAt to file's modTime
66+
67+
if err == nil {
68+
err = f.startPullLoop(time.Since(modTime) > f.interval)
69+
if err != nil {
70+
return lo.Empty[V](), err
71+
}
72+
return contents, nil
7073
}
71-
} else {
72-
buf, err = f.vehicle.Read(f.ctx)
73-
f.updatedAt = time.Now()
7474
}
7575

76+
// parse local file error, fallback to remote
77+
contents, _, err = f.Update()
78+
7679
if err != nil {
7780
return lo.Empty[V](), err
7881
}
79-
80-
contents, err := f.parser(buf)
82+
err = f.startPullLoop(false)
8183
if err != nil {
82-
if !isLocal {
83-
return lo.Empty[V](), err
84-
}
85-
86-
// parse local file error, fallback to remote
87-
buf, err = f.vehicle.Read(f.ctx)
88-
if err != nil {
89-
return lo.Empty[V](), err
90-
}
91-
92-
contents, err = f.parser(buf)
93-
if err != nil {
94-
return lo.Empty[V](), err
95-
}
96-
97-
isLocal = false
98-
}
99-
100-
if f.vehicle.Type() != types.File && !isLocal {
101-
if err := safeWrite(f.vehicle.Path(), buf); err != nil {
102-
return lo.Empty[V](), err
103-
}
104-
}
105-
106-
f.hash = md5.Sum(buf)
107-
108-
// pull contents automatically
109-
if f.vehicle.Type() == types.File {
110-
f.watcher, err = fswatch.NewWatcher(fswatch.Options{
111-
Path: []string{f.vehicle.Path()},
112-
Direct: true,
113-
Callback: f.update,
114-
})
115-
if err != nil {
116-
return lo.Empty[V](), err
117-
}
118-
err = f.watcher.Start()
119-
if err != nil {
120-
return lo.Empty[V](), err
121-
}
122-
} else if f.interval > 0 {
123-
go f.pullLoop(forceUpdate)
84+
return lo.Empty[V](), err
12485
}
125-
12686
return contents, nil
12787
}
12888

12989
func (f *Fetcher[V]) Update() (V, bool, error) {
130-
buf, err := f.vehicle.Read(f.ctx)
90+
buf, hash, err := f.vehicle.Read(f.ctx, f.hash)
13191
if err != nil {
13292
return lo.Empty[V](), false, err
13393
}
134-
return f.SideUpdate(buf)
94+
return f.loadBuf(buf, hash, f.vehicle.Type() != types.File)
13595
}
13696

13797
func (f *Fetcher[V]) SideUpdate(buf []byte) (V, bool, error) {
98+
return f.loadBuf(buf, types.MakeHash(buf), true)
99+
}
100+
101+
func (f *Fetcher[V]) loadBuf(buf []byte, hash types.HashType, updateFile bool) (V, bool, error) {
138102
now := time.Now()
139-
hash := md5.Sum(buf)
140-
if bytes.Equal(f.hash[:], hash[:]) {
103+
if f.hash.Equal(hash) {
104+
if updateFile {
105+
_ = os.Chtimes(f.vehicle.Path(), now, now)
106+
}
141107
f.updatedAt = now
142-
_ = os.Chtimes(f.vehicle.Path(), now, now)
108+
return lo.Empty[V](), true, nil
109+
}
110+
111+
if buf == nil { // f.hash has been changed between f.vehicle.Read but should not happen (cause by concurrent)
143112
return lo.Empty[V](), true, nil
144113
}
145114

@@ -148,15 +117,18 @@ func (f *Fetcher[V]) SideUpdate(buf []byte) (V, bool, error) {
148117
return lo.Empty[V](), false, err
149118
}
150119

151-
if f.vehicle.Type() != types.File {
120+
if updateFile {
152121
if err = safeWrite(f.vehicle.Path(), buf); err != nil {
153122
return lo.Empty[V](), false, err
154123
}
155124
}
156-
157125
f.updatedAt = now
158126
f.hash = hash
159127

128+
if f.onUpdate != nil {
129+
f.onUpdate(contents)
130+
}
131+
160132
return contents, false, nil
161133
}
162134

@@ -176,7 +148,7 @@ func (f *Fetcher[V]) pullLoop(forceUpdate bool) {
176148

177149
if forceUpdate {
178150
log.Warnln("[Provider] %s not updated for a long time, force refresh", f.Name())
179-
f.update(f.vehicle.Path())
151+
f.updateWithLog()
180152
}
181153

182154
timer := time.NewTimer(initialInterval)
@@ -185,15 +157,40 @@ func (f *Fetcher[V]) pullLoop(forceUpdate bool) {
185157
select {
186158
case <-timer.C:
187159
timer.Reset(f.interval)
188-
f.update(f.vehicle.Path())
160+
f.updateWithLog()
189161
case <-f.ctx.Done():
190162
return
191163
}
192164
}
193165
}
194166

195-
func (f *Fetcher[V]) update(path string) {
196-
elm, same, err := f.Update()
167+
func (f *Fetcher[V]) startPullLoop(forceUpdate bool) (err error) {
168+
// pull contents automatically
169+
if f.vehicle.Type() == types.File {
170+
f.watcher, err = fswatch.NewWatcher(fswatch.Options{
171+
Path: []string{f.vehicle.Path()},
172+
Direct: true,
173+
Callback: f.updateCallback,
174+
})
175+
if err != nil {
176+
return err
177+
}
178+
err = f.watcher.Start()
179+
if err != nil {
180+
return err
181+
}
182+
} else if f.interval > 0 {
183+
go f.pullLoop(forceUpdate)
184+
}
185+
return
186+
}
187+
188+
func (f *Fetcher[V]) updateCallback(path string) {
189+
f.updateWithLog()
190+
}
191+
192+
func (f *Fetcher[V]) updateWithLog() {
193+
_, same, err := f.Update()
197194
if err != nil {
198195
log.Errorln("[Provider] %s pull error: %s", f.Name(), err.Error())
199196
return
@@ -205,9 +202,7 @@ func (f *Fetcher[V]) update(path string) {
205202
}
206203

207204
log.Infoln("[Provider] %s's content update", f.Name())
208-
if f.OnUpdate != nil {
209-
f.OnUpdate(elm)
210-
}
205+
return
211206
}
212207

213208
func safeWrite(path string, buf []byte) error {
@@ -230,7 +225,7 @@ func NewFetcher[V any](name string, interval time.Duration, vehicle types.Vehicl
230225
name: name,
231226
vehicle: vehicle,
232227
parser: parser,
233-
OnUpdate: onUpdate,
228+
onUpdate: onUpdate,
234229
interval: interval,
235230
}
236231
}

0 commit comments

Comments
 (0)