Skip to content

Commit 15673a4

Browse files
committed
tq/transfer_queue: add Notify() and callback function
1 parent 7af9281 commit 15673a4

File tree

1 file changed

+26
-3
lines changed

1 file changed

+26
-3
lines changed

tq/transfer_queue.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ func (b batch) Len() int { return len(b) }
8989
func (b batch) Less(i, j int) bool { return b[i].Size < b[j].Size }
9090
func (b batch) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
9191

92+
type NotifyFn func(oid string, ok bool)
93+
9294
// TransferQueue organises the wider process of uploading and downloading,
9395
// including calling the API, passing the actual transfer request to transfer
9496
// adapters, and dealing with progress, errors and retries.
@@ -107,6 +109,7 @@ type TransferQueue struct {
107109
incoming chan *objectTuple
108110
errorc chan error // Channel for processing errors
109111
watchers []chan string
112+
notify []NotifyFn
110113
trMutex *sync.Mutex
111114
startProgress sync.Once
112115
collectorWait sync.WaitGroup
@@ -302,6 +305,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
302305

303306
next = append(next, t)
304307
} else {
308+
q.sendNotifications(t.Oid, false)
305309
q.wait.Done()
306310
}
307311
}
@@ -318,6 +322,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
318322
if o.Error != nil {
319323
q.errorc <- errors.Wrapf(o.Error, "[%v] %v", o.Oid, o.Error.Message)
320324
q.Skip(o.Size)
325+
q.sendNotifications(o.Oid, false)
321326
q.wait.Done()
322327

323328
continue
@@ -334,6 +339,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
334339
q.errorc <- errors.Errorf("[%v] The server returned an unknown OID.", o.Oid)
335340

336341
q.Skip(o.Size)
342+
q.sendNotifications(o.Oid, false)
337343
q.wait.Done()
338344
} else {
339345
tr := newTransfer(t.Name, o, t.Path)
@@ -352,6 +358,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
352358
}
353359

354360
q.Skip(o.Size)
361+
q.sendNotifications(tr.Oid, false)
355362
q.wait.Done()
356363
}
357364

@@ -394,6 +401,7 @@ func (q *TransferQueue) addToAdapter(pending []*Transfer) <-chan *objectTuple {
394401
q.errorc <- err
395402
for _, t := range pending {
396403
q.Skip(t.Size)
404+
q.sendNotifications(t.Oid, false)
397405
q.wait.Done()
398406
}
399407

@@ -463,14 +471,13 @@ func (q *TransferQueue) handleTransferResult(
463471
// the retry channel, and the error will be reported
464472
// immediately.
465473
q.errorc <- res.Error
474+
q.sendNotifications(oid, false)
466475
q.wait.Done()
467476
}
468477
} else {
469478
// Otherwise, if the transfer was successful, notify all of the
470479
// watchers, and mark it as finished.
471-
for _, c := range q.watchers {
472-
c <- oid
473-
}
480+
q.sendNotifications(oid, true)
474481

475482
q.meter.FinishTransfer(res.Transfer.Name)
476483
q.wait.Done()
@@ -567,6 +574,22 @@ func (q *TransferQueue) Watch() chan string {
567574
return c
568575
}
569576

577+
func (q *TransferQueue) Notify(cb NotifyFn) {
578+
q.notify = append(q.notify, cb)
579+
}
580+
581+
func (q *TransferQueue) sendNotifications(oid string, ok bool) {
582+
if ok {
583+
for _, c := range q.watchers {
584+
c <- oid
585+
}
586+
}
587+
588+
for _, cb := range q.notify {
589+
cb(oid, ok)
590+
}
591+
}
592+
570593
// This goroutine collects errors returned from transfers
571594
func (q *TransferQueue) errorCollector() {
572595
for err := range q.errorc {

0 commit comments

Comments
 (0)