@@ -130,7 +130,7 @@ bool CBLSWorker::GenerateContributions(int quorumThreshold, const BLSIdVector& i
130130// The input vector is not copied into the Aggregator but instead a vector of pointers to the original entries from the
131131// input vector is stored. This means that the input vector must stay alive for the whole lifetime of the Aggregator
132132template <typename T>
133- struct Aggregator {
133+ struct Aggregator : public std ::enable_shared_from_this<Aggregator<T>> {
134134 typedef T ElementType;
135135
136136 size_t batchSize{16 };
@@ -142,7 +142,7 @@ struct Aggregator {
142142 std::mutex m;
143143 // items in the queue are all intermediate aggregation results of finished batches.
144144 // The intermediate results must be deleted by us again (which we do in SyncAggregateAndPushAggQueue)
145- boost::lockfree::queue <T*> aggQueue;
145+ ctpl::detail::Queue <T*> aggQueue;
146146 std::atomic<size_t > aggQueueSize{0 };
147147
148148 typedef std::function<void (const T& agg)> DoneCallback;
@@ -160,7 +160,6 @@ struct Aggregator {
160160 DoneCallback _doneCallback) :
161161 workerPool (_workerPool),
162162 parallel (_parallel),
163- aggQueue (0 ),
164163 doneCallback (std::move(_doneCallback))
165164 {
166165 inputVec = std::make_shared<std::vector<const T*> >(count);
@@ -184,19 +183,19 @@ struct Aggregator {
184183 } else {
185184 doneCallback (SyncAggregate (*inputVec, 0 , inputVec->size ()));
186185 }
187- delete this ;
188186 return ;
189187 }
190188
191189 if (batchCount == 1 ) {
192190 // just a single batch of work, take a shortcut.
193- PushWork ([this ](int threadId) {
194- if (inputVec->size () == 1 ) {
195- doneCallback (*(*inputVec)[0 ]);
196- } else {
197- doneCallback (SyncAggregate (*inputVec, 0 , inputVec->size ()));
198- }
199- delete this ;
191+ auto self (this ->shared_from_this ());
192+ PushWork ([self](int threadId) {
193+ size_t vecSize = self->inputVec ->size ();
194+ if (vecSize == 1 ) {
195+ self->doneCallback (*(*self->inputVec )[0 ]);
196+ } else {
197+ self->doneCallback (self->SyncAggregate (*self->inputVec , 0 , vecSize));
198+ }
200199 });
201200 return ;
202201 }
@@ -252,17 +251,18 @@ struct Aggregator {
252251 delete rem[i];
253252 }
254253 doneCallback (r);
255-
256- delete this ;
257254 }
258255
259- void AsyncAggregateAndPushAggQueue (std::shared_ptr<std::vector<const T*> >& vec, size_t start, size_t count, bool del)
256+ void AsyncAggregateAndPushAggQueue (const std::shared_ptr<std::vector<const T*>>& vec, size_t start, size_t count, bool del)
260257 {
261258 IncWait ();
262- PushWork (std::bind (&Aggregator::SyncAggregateAndPushAggQueue, this , vec, start, count, del));
259+ auto self (this ->shared_from_this ());
260+ PushWork ([self, vec, start, count, del](int threadId){
261+ self->SyncAggregateAndPushAggQueue (vec, start, count, del);
262+ });
263263 }
264264
265- void SyncAggregateAndPushAggQueue (std::shared_ptr<std::vector<const T*> >& vec, size_t start, size_t count, bool del)
265+ void SyncAggregateAndPushAggQueue (const std::shared_ptr<std::vector<const T*> >& vec, size_t start, size_t count, bool del)
266266 {
267267 // aggregate vec and push the intermediate result onto the work queue
268268 PushAggQueue (SyncAggregate (*vec, start, count));
@@ -333,7 +333,7 @@ struct Aggregator {
333333// [ a1+a2+a3+a4, b1+b2+b3+b4, c1+c2+c3+c4, d1+d2+d3+d4]
334334// Same rules for the input vectors apply to the VectorAggregator as for the Aggregator (they must stay alive)
335335template <typename T>
336- struct VectorAggregator {
336+ struct VectorAggregator : public std ::enable_shared_from_this<VectorAggregator<T>> {
337337 typedef Aggregator<T> AggregatorType;
338338 typedef std::vector<T> VectorType;
339339 typedef std::shared_ptr<VectorType> VectorPtrType;
@@ -372,19 +372,15 @@ struct VectorAggregator {
372372
373373 void Start ()
374374 {
375- std::vector<AggregatorType*> aggregators;
376375 for (size_t i = 0 ; i < vecSize; i++) {
377376 std::vector<const T*> tmp (count);
378377 for (size_t j = 0 ; j < count; j++) {
379378 tmp[j] = &(*vecs[start + j])[i];
380379 }
381380
382- auto aggregator = new AggregatorType (std::move (tmp), 0 , count, parallel, workerPool, std::bind (&VectorAggregator::CheckDone, this , std::placeholders::_1, i));
383- // we can't directly start the aggregator here as it might be so fast that it deletes "this" while we are still in this loop
384- aggregators.emplace_back (aggregator);
385- }
386- for (auto agg : aggregators) {
387- agg->Start ();
381+ auto self (this ->shared_from_this ());
382+ auto aggregator = std::make_shared<AggregatorType>(std::move (tmp), 0 , count, parallel, workerPool, [self, i](const T& agg) {self->CheckDone (agg, i);});
383+ aggregator->Start ();
388384 }
389385 }
390386
@@ -393,14 +389,13 @@ struct VectorAggregator {
393389 (*result)[idx] = agg;
394390 if (++doneCount == vecSize) {
395391 doneCallback (result);
396- delete this ;
397392 }
398393 }
399394};
400395
401396// See comment of AsyncVerifyContributionShares for a description on what this does
402397// Same rules as in Aggregator apply for the inputs
403- struct ContributionVerifier {
398+ struct ContributionVerifier : public std ::enable_shared_from_this<ContributionVerifier> {
404399 struct BatchState {
405400 size_t start;
406401 size_t count;
@@ -493,16 +488,16 @@ struct ContributionVerifier {
493488 }
494489 }
495490 doneCallback (result);
496- delete this ;
497491 }
498492
499493 void AsyncAggregate (size_t batchIdx)
500494 {
501495 auto & batchState = batchStates[batchIdx];
502496
503497 // aggregate vvecs and skShares of batch in parallel
504- auto vvecAgg = new VectorAggregator<CBLSPublicKey>(vvecs, batchState.start , batchState.count , parallel, workerPool, std::bind (&ContributionVerifier::HandleAggVvecDone, this , batchIdx, std::placeholders::_1));
505- auto skShareAgg = new Aggregator<CBLSSecretKey>(skShares, batchState.start , batchState.count , parallel, workerPool, std::bind (&ContributionVerifier::HandleAggSkShareDone, this , batchIdx, std::placeholders::_1));
498+ auto self (this ->shared_from_this ());
499+ auto vvecAgg = std::make_shared<VectorAggregator<CBLSPublicKey>>(vvecs, batchState.start , batchState.count , parallel, workerPool, [self, batchIdx] (const BLSVerificationVectorPtr& vvec) {self->HandleAggVvecDone (batchIdx, vvec);});
500+ auto skShareAgg = std::make_shared<Aggregator<CBLSSecretKey>>(skShares, batchState.start , batchState.count , parallel, workerPool, [self, batchIdx] (const CBLSSecretKey& skShare) {self->HandleAggSkShareDone (batchIdx, skShare);});
506501
507502 vvecAgg->Start ();
508503 skShareAgg->Start ();
@@ -550,17 +545,18 @@ struct ContributionVerifier {
550545
551546 void AsyncAggregatedVerifyBatch (size_t batchIdx)
552547 {
553- auto f = [this , batchIdx](int threadId) {
554- auto & batchState = batchStates[batchIdx];
555- bool result = Verify (batchState.vvec , batchState.skShare );
556- if (result) {
557- // whole batch is valid
558- batchState.verifyResults .assign (batchState.count , 1 );
559- HandleVerifyDone (batchIdx, batchState.count );
560- } else {
561- // at least one entry in the batch is invalid, revert to per-contribution verification (but parallelized)
562- AsyncVerifyBatchOneByOne (batchIdx);
563- }
548+ auto self (shared_from_this ());
549+ auto f = [self, batchIdx](int threadId) {
550+ auto & batchState = self->batchStates [batchIdx];
551+ bool result = self->Verify (batchState.vvec , batchState.skShare );
552+ if (result) {
553+ // whole batch is valid
554+ batchState.verifyResults .assign (batchState.count , 1 );
555+ self->HandleVerifyDone (batchIdx, batchState.count );
556+ } else {
557+ // at least one entry in the batch is invalid, revert to per-contribution verification (but parallelized)
558+ self->AsyncVerifyBatchOneByOne (batchIdx);
559+ }
564560 };
565561 PushOrDoWork (std::move (f));
566562 }
@@ -570,12 +566,12 @@ struct ContributionVerifier {
570566 size_t count = batchStates[batchIdx].count ;
571567 batchStates[batchIdx].verifyResults .assign (count, 0 );
572568 for (size_t i = 0 ; i < count; i++) {
573- auto f = [ this , i, batchIdx]( int threadId) {
574- auto & batchState = batchStates[ batchIdx];
575- batchState. verifyResults [i] = Verify (vvecs[batchState. start + i], skShares[batchState. start + i]) ;
576- HandleVerifyDone (batchIdx, 1 );
577- } ;
578- PushOrDoWork ( std::move (f) );
569+ auto self ( this -> shared_from_this ());
570+ PushOrDoWork ([self, i, batchIdx]( int threadId) {
571+ auto & batchState = self-> batchStates [batchIdx] ;
572+ batchState. verifyResults [i] = self-> Verify (self-> vvecs [batchState. start + i], self-> skShares [batchState. start + i] );
573+ self-> HandleVerifyDone (batchIdx, 1 ) ;
574+ } );
579575 }
580576 }
581577
@@ -617,7 +613,7 @@ void CBLSWorker::AsyncBuildQuorumVerificationVector(const std::vector<BLSVerific
617613 return ;
618614 }
619615
620- auto agg = new VectorAggregator<CBLSPublicKey>(vvecs, start, count, parallel, workerPool, std::move (doneCallback));
616+ auto agg = std::make_shared< VectorAggregator<CBLSPublicKey> >(vvecs, start, count, parallel, workerPool, std::move (doneCallback));
621617 agg->Start ();
622618}
623619
@@ -652,7 +648,7 @@ void AsyncAggregateHelper(ctpl::thread_pool& workerPool,
652648 return ;
653649 }
654650
655- auto agg = new Aggregator<T>(vec, start, count, parallel, workerPool, std::move (doneCallback));
651+ auto agg = std::make_shared< Aggregator<T> >(vec, start, count, parallel, workerPool, std::move (doneCallback));
656652 agg->Start ();
657653}
658654
@@ -737,7 +733,7 @@ void CBLSWorker::AsyncVerifyContributionShares(const CBLSId& forId, const std::v
737733 return ;
738734 }
739735
740- auto verifier = new ContributionVerifier (forId, vvecs, skShares, 8 , parallel, aggregated, workerPool, std::move (doneCallback));
736+ auto verifier = std::make_shared< ContributionVerifier> (forId, vvecs, skShares, 8 , parallel, aggregated, workerPool, std::move (doneCallback));
741737 verifier->Start ();
742738}
743739
0 commit comments