Skip to content

Commit cf2453f

Browse files
Minor enhancements to the worker pool
1 parent f59675d commit cf2453f

File tree

9 files changed

+117
-95
lines changed

9 files changed

+117
-95
lines changed

pooling/pooling.go

+43-48
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"sync/atomic"
1111
"time"
1212

13-
"github.com/cenkalti/backoff/v4"
1413
"github.com/phf/go-queue/queue"
1514
"github.com/wabarc/wayback/config"
1615
"github.com/wabarc/wayback/errors"
@@ -55,8 +54,8 @@ type Bucket struct {
5554
// Count of retried attempts
5655
elapsed uint64
5756

58-
// Reports whether it is process has been completed.
59-
// processed chan bool
57+
// An object that will perform exactly one action.
58+
once sync.Once
6059
}
6160

6261
func newResource(id int) *resource {
@@ -80,7 +79,7 @@ func New(ctx context.Context, capacity int) *Pool {
8079

8180
p.closed = make(chan bool, 1)
8281
p.timeout = config.Opts.WaybackTimeout()
83-
p.maxRetries = config.Opts.WaybackMaxRetries()
82+
p.maxRetries = config.Opts.WaybackMaxRetries() + 1
8483
p.multiplier = 0.75
8584
p.context = ctx
8685

@@ -111,6 +110,7 @@ func (p *Pool) Roll() {
111110
select {
112111
default:
113112
case <-p.closed:
113+
close(p.closed)
114114
return
115115
}
116116

@@ -119,11 +119,11 @@ func (p *Pool) Roll() {
119119
continue
120120
}
121121

122-
b := p.bucket()
123-
if b == nil {
124-
continue
122+
if b := p.bucket(); b != nil {
123+
go b.once.Do(func() {
124+
p.do(b)
125+
})
125126
}
126-
go p.do(b, b.Request, b.Fallback)
127127
}
128128
}
129129

@@ -144,7 +144,6 @@ func (p *Pool) Close() {
144144
processing := atomic.LoadInt32(&p.processing)
145145
if p.resource != nil && waiting == 0 && processing == 0 {
146146
once.Do(func() {
147-
close(p.resource)
148147
p.closed <- true
149148
})
150149
return
@@ -169,74 +168,70 @@ func (p *Pool) push(r *resource) error {
169168
return nil
170169
}
171170

172-
func (p *Pool) do(b *Bucket, request, fallback func(context.Context) error) error {
171+
func (p *Pool) do(b *Bucket) error {
173172
atomic.AddInt32(&p.processing, 1)
174173
defer func() {
175174
atomic.AddInt32(&p.waiting, -1)
176175
atomic.AddInt32(&p.processing, -1)
177176
}()
178177

179178
action := func() error {
180-
ctx, cancel := context.WithCancel(p.context)
179+
interval := float64(b.elapsed) * p.multiplier
180+
timeout := p.timeout + p.timeout*time.Duration(interval)
181+
ctx, cancel := context.WithTimeout(p.context, timeout)
181182
defer cancel()
182183

183-
if b.elapsed > p.maxRetries {
184-
if fallback != nil {
185-
fallback(ctx)
186-
}
187-
return errElapsed
188-
}
189-
190184
r := p.pull()
191185
defer func() {
192186
p.push(r)
187+
if b.elapsed >= p.maxRetries {
188+
if b.Fallback != nil {
189+
b.Fallback(ctx)
190+
}
191+
}
193192
}()
194193

195-
res := make(chan error, 1)
194+
ch := make(chan error, 1)
196195
go func() {
197-
if request != nil {
198-
res <- request(ctx)
199-
return
196+
if b.Request != nil {
197+
ch <- b.Request(ctx)
200198
}
201-
res <- nil
202-
return
203199
}()
204200

205-
interval := float64(b.elapsed) * p.multiplier
206-
timeout := p.timeout + p.timeout*time.Duration(interval)
207-
for {
208-
select {
209-
case err := <-res:
210-
if err != nil {
211-
atomic.AddUint64(&b.elapsed, 1)
212-
}
213-
return err
214-
case <-time.After(timeout):
201+
select {
202+
case err := <-ch:
203+
if err != nil {
215204
atomic.AddUint64(&b.elapsed, 1)
216-
return errRollTimeout
217205
}
206+
close(ch)
207+
return err
208+
case <-ctx.Done():
209+
atomic.AddUint64(&b.elapsed, 1)
210+
return errRollTimeout
211+
}
212+
}
213+
214+
ran := uint64(1)
215+
max := p.maxRetries
216+
for ; ran <= max; ran++ {
217+
err := action()
218+
switch err {
219+
case nil, errElapsed:
220+
return err
221+
case errRollTimeout:
218222
}
219223
}
220224

221-
return p.doRetry(action)
225+
return nil
222226
}
223227

224228
func (p *Pool) bucket() *Bucket {
225229
p.mutex.Lock()
226230
defer p.mutex.Unlock()
227231

228-
b, ok := p.staging.PopBack().(*Bucket)
229-
if !ok {
230-
return nil
232+
if b, ok := p.staging.PopBack().(*Bucket); ok {
233+
return b
231234
}
232235

233-
return b
234-
}
235-
236-
func (p *Pool) doRetry(o backoff.Operation) error {
237-
exp := backoff.NewExponentialBackOff()
238-
exp.Reset()
239-
b := backoff.WithMaxRetries(exp, p.maxRetries+1) // One more retry for fallback to be triggered
240-
241-
return backoff.Retry(o, b)
236+
return nil
242237
}

pooling/pooling_test.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,7 @@ func TestMaxRetries(t *testing.T) {
116116

117117
bucket := &Bucket{
118118
Request: func(_ context.Context) error {
119-
time.Sleep(100 * time.Microsecond)
120-
return nil
119+
return errors.New("process request failed")
121120
},
122121
Fallback: func(_ context.Context) error {
123122
return nil
@@ -131,7 +130,7 @@ func TestMaxRetries(t *testing.T) {
131130
go p.Roll()
132131
p.Put(bucket)
133132
p.Close()
134-
if bucket.elapsed-1 != maxRetries {
133+
if bucket.elapsed != maxRetries {
135134
t.Fatalf("Unexpected max retries got %d instead of %d", bucket.elapsed, maxRetries)
136135
}
137136
}
@@ -148,7 +147,6 @@ func TestFallback(t *testing.T) {
148147
fall := ""
149148
bucket := &Bucket{
150149
Request: func(_ context.Context) error {
151-
time.Sleep(100 * time.Microsecond)
152150
return errors.New("some error")
153151
},
154152
Fallback: func(_ context.Context) error {

reduxer/reduxer.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -309,9 +309,11 @@ func remoteHeadless(addr string) net.Addr {
309309

310310
func createDir(baseDir string) (dir string, err error) {
311311
dir = filepath.Join(baseDir, time.Now().Format("200601"))
312+
if helper.Exists(dir) {
313+
return
314+
}
312315
if err := os.MkdirAll(dir, 0o755); err != nil {
313-
logger.Error("mkdir failed: %v", err)
314-
return "", err
316+
return "", errors.Wrap(err, "mkdir failed: "+dir)
315317
}
316318
return dir, nil
317319
}

service/service.go

+24-19
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package service // import "github.com/wabarc/wayback/service"
77
import (
88
"context"
99
"net/url"
10-
"sync"
1110

1211
"github.com/wabarc/wayback"
1312
"github.com/wabarc/wayback/errors"
@@ -21,28 +20,34 @@ const (
2120

2221
// Wayback in a separate goroutine.
2322
func Wayback(ctx context.Context, urls []*url.URL, do func(cols []wayback.Collect, rdx reduxer.Reduxer) error) error {
24-
var once sync.Once
25-
var done = make(chan bool, 1)
23+
var done = make(chan error, 1)
2624
var cols []wayback.Collect
2725
var rdx reduxer.Reduxer
28-
var err error
29-
for {
26+
27+
go func() {
28+
go func() {
29+
var err error
30+
cols, rdx, err = wayback.Wayback(ctx, urls...)
31+
if err != nil {
32+
done <- errors.Wrap(err, "wayback failed")
33+
return
34+
}
35+
defer rdx.Flush()
36+
done <- do(cols, rdx)
37+
}()
38+
39+
// Block until context is finished.
3040
select {
3141
case <-ctx.Done():
32-
return ctx.Err()
33-
case <-done:
34-
err = do(cols, rdx)
35-
rdx.Flush()
36-
return err
37-
default:
38-
once.Do(func() {
39-
cols, rdx, err = wayback.Wayback(ctx, urls...)
40-
if err != nil {
41-
err = errors.Wrap(err, "wayback failed")
42-
} else {
43-
done <- true
44-
}
45-
})
42+
return
4643
}
44+
}()
45+
46+
select {
47+
case <-ctx.Done():
48+
return ctx.Err()
49+
case err := <-done:
50+
close(done)
51+
return err
4752
}
4853
}

service/telegram/telegram.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -223,26 +223,26 @@ func (t *Telegram) process(message *telegram.Message) (err error) {
223223
t.reply(message, "URL no found.")
224224
default:
225225
metrics.IncrementWayback(metrics.ServiceTelegram, metrics.StatusRequest)
226-
queue, err := t.reply(message, "Queue...")
226+
request, err := t.reply(message, "Queue...")
227227
if err != nil {
228228
return errors.Wrap(err, "reply message failed")
229229
}
230230
bucket := &pooling.Bucket{
231231
Request: func(ctx context.Context) error {
232-
onhold, err := t.bot.Edit(queue, "Archiving...")
233-
if err != nil {
232+
_, err := t.bot.Edit(request, "Archiving...")
233+
if err != nil && err != telegram.ErrSameMessageContent {
234234
return errors.Wrap(err, "telegram: send archiving message failed")
235235
}
236236

237-
if err := t.wayback(ctx, onhold, urls); err != nil {
238-
t.bot.Edit(onhold, service.MsgWaybackRetrying)
237+
if err := t.wayback(ctx, request, urls); err != nil {
238+
t.bot.Edit(request, service.MsgWaybackRetrying)
239239
return errors.Wrap(err, "archives failed")
240240
}
241241
metrics.IncrementWayback(metrics.ServiceTelegram, metrics.StatusSuccess)
242242
return nil
243243
},
244244
Fallback: func(_ context.Context) error {
245-
t.bot.Delete(queue)
245+
t.bot.Delete(request)
246246
t.bot.Reply(message, service.MsgWaybackTimeout)
247247
metrics.IncrementWayback(metrics.ServiceTelegram, metrics.StatusFailure)
248248
return nil
@@ -253,13 +253,13 @@ func (t *Telegram) process(message *telegram.Message) (err error) {
253253
return nil
254254
}
255255

256-
func (t *Telegram) wayback(ctx context.Context, onhold *telegram.Message, urls []*url.URL) error {
256+
func (t *Telegram) wayback(ctx context.Context, request *telegram.Message, urls []*url.URL) error {
257257
do := func(cols []wayback.Collect, rdx reduxer.Reduxer) error {
258258
opts := &telegram.SendOptions{DisableWebPagePreview: true}
259259
replyText := render.ForReply(&render.Telegram{Cols: cols, Data: rdx}).String()
260260
logger.Debug("reply text, %s", replyText)
261261

262-
if _, err := t.bot.Edit(onhold, replyText, opts); err != nil {
262+
if _, err := t.bot.Edit(request, replyText, opts); err != nil {
263263
return errors.Wrap(err, "telegram: update message failed")
264264
}
265265

@@ -281,8 +281,8 @@ func (t *Telegram) wayback(ctx context.Context, onhold *telegram.Message, urls [
281281
}
282282

283283
// Send album attach files, and reply to wayback result message
284-
opts = &telegram.SendOptions{ReplyTo: onhold, DisableNotification: true}
285-
if _, err := t.bot.SendAlbum(onhold.Chat, albums, opts); err != nil {
284+
opts = &telegram.SendOptions{ReplyTo: request, DisableNotification: true}
285+
if _, err := t.bot.SendAlbum(request.Chat, albums, opts); err != nil {
286286
logger.Error("reply failed: %v", err)
287287
}
288288
return nil

service/telegram/telegram_test.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ var (
123123

124124
func handle(mux *http.ServeMux, updatesJSON string) {
125125
var count int32
126+
var edit int32
126127
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
127128
w.Header().Set("Content-Type", "application/json")
128129

@@ -159,7 +160,12 @@ func handle(mux *http.ServeMux, updatesJSON string) {
159160
}
160161
case "editMessageText":
161162
if strings.Contains(text, config.SlotName("ia")) || strings.Contains(text, "Archiving...") {
162-
fmt.Fprintln(w, replyJSON)
163+
atomic.AddInt32(&edit, 1)
164+
if edit == 0 {
165+
fmt.Fprintln(w, replyJSON)
166+
} else {
167+
fmt.Fprintln(w, fmt.Sprintf(`{"ok":true, "result":{"message":"%s"}}`, telegram.ErrSameMessageContent))
168+
}
163169
} else {
164170
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
165171
}

service/utils.go

+13-2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"os"
1010
"path"
1111
"strings"
12+
"sync"
1213

1314
"github.com/dustin/go-humanize"
1415
"github.com/wabarc/helper"
@@ -29,10 +30,20 @@ func MatchURL(s string) (urls []*url.URL) {
2930
matches = helper.MatchURLFallback(s)
3031
}
3132

33+
wg := sync.WaitGroup{}
34+
urls = make([]*url.URL, len(matches))
3235
for i := range matches {
33-
u, _ := url.Parse(matches[i])
34-
urls = append(urls, helper.RealURI(u))
36+
wg.Add(1)
37+
go func(i int) {
38+
defer wg.Done()
39+
u, err := url.Parse(matches[i])
40+
if err != nil {
41+
return
42+
}
43+
urls[i] = helper.RealURI(u)
44+
}(i)
3545
}
46+
wg.Wait()
3647

3748
return removeDuplicates(urls)
3849
}

0 commit comments

Comments
 (0)