@@ -89,6 +89,8 @@ func (b batch) Len() int { return len(b) }
8989func (b batch ) Less (i , j int ) bool { return b [i ].Size < b [j ].Size }
9090func (b batch ) Swap (i , j int ) { b [i ], b [j ] = b [j ], b [i ] }
9191
92+ type NotifyFn func (oid string , ok bool )
93+
9294// TransferQueue organises the wider process of uploading and downloading,
9395// including calling the API, passing the actual transfer request to transfer
9496// adapters, and dealing with progress, errors and retries.
@@ -107,6 +109,7 @@ type TransferQueue struct {
107109 incoming chan * objectTuple
108110 errorc chan error // Channel for processing errors
109111 watchers []chan string
112+ notify []NotifyFn
110113 trMutex * sync.Mutex
111114 startProgress sync.Once
112115 collectorWait sync.WaitGroup
@@ -302,6 +305,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
302305
303306 next = append (next , t )
304307 } else {
308+ q .sendNotifications (t .Oid , false )
305309 q .wait .Done ()
306310 }
307311 }
@@ -318,6 +322,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
318322 if o .Error != nil {
319323 q .errorc <- errors .Wrapf (o .Error , "[%v] %v" , o .Oid , o .Error .Message )
320324 q .Skip (o .Size )
325+ q .sendNotifications (o .Oid , false )
321326 q .wait .Done ()
322327
323328 continue
@@ -334,6 +339,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
334339 q .errorc <- errors .Errorf ("[%v] The server returned an unknown OID." , o .Oid )
335340
336341 q .Skip (o .Size )
342+ q .sendNotifications (o .Oid , false )
337343 q .wait .Done ()
338344 } else {
339345 tr := newTransfer (t .Name , o , t .Path )
@@ -352,6 +358,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
352358 }
353359
354360 q .Skip (o .Size )
361+ q .sendNotifications (tr .Oid , false )
355362 q .wait .Done ()
356363 }
357364
@@ -394,6 +401,7 @@ func (q *TransferQueue) addToAdapter(pending []*Transfer) <-chan *objectTuple {
394401 q .errorc <- err
395402 for _ , t := range pending {
396403 q .Skip (t .Size )
404+ q .sendNotifications (t .Oid , false )
397405 q .wait .Done ()
398406 }
399407
@@ -463,14 +471,13 @@ func (q *TransferQueue) handleTransferResult(
463471 // the retry channel, and the error will be reported
464472 // immediately.
465473 q .errorc <- res .Error
474+ q .sendNotifications (oid , false )
466475 q .wait .Done ()
467476 }
468477 } else {
469478 // Otherwise, if the transfer was successful, notify all of the
470479 // watchers, and mark it as finished.
471- for _ , c := range q .watchers {
472- c <- oid
473- }
480+ q .sendNotifications (oid , true )
474481
475482 q .meter .FinishTransfer (res .Transfer .Name )
476483 q .wait .Done ()
@@ -567,6 +574,22 @@ func (q *TransferQueue) Watch() chan string {
567574 return c
568575}
569576
577+ func (q * TransferQueue ) Notify (cb NotifyFn ) {
578+ q .notify = append (q .notify , cb )
579+ }
580+
581+ func (q * TransferQueue ) sendNotifications (oid string , ok bool ) {
582+ if ok {
583+ for _ , c := range q .watchers {
584+ c <- oid
585+ }
586+ }
587+
588+ for _ , cb := range q .notify {
589+ cb (oid , ok )
590+ }
591+ }
592+
570593// This goroutine collects errors returned from transfers
571594func (q * TransferQueue ) errorCollector () {
572595 for err := range q .errorc {
0 commit comments