Commit c76e7b6

[Core] Fix data races in bls_worker and use ctpl_stl queue
Change the ctpl implementation to use an STL queue and mutex. Use the ctpl synchronized queue instead of a boost lockfree queue in the BLS worker aggregator. Use smart pointers for the memory management of Aggregator and VectorAggregator: with 'delete this;' the objects are prone to a data race on the delete operator. Likewise use smart pointers for the memory management of ContributionVerifier. Pass shared_ptr by value to other threads via the worker pool.

Parent: 55babf4

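The core of the fix is the ownership pattern visible throughout the diff below: instead of 'delete this;', each struct derives from std::enable_shared_from_this, and every task handed to the worker pool captures a shared_ptr copy of its owner, so destruction happens exactly once, on whichever thread drops the last reference. A minimal sketch of the pattern (illustrative only, not the commit's code; Worker, DoWork and pendingTask are hypothetical names):

#include <functional>
#include <memory>

struct Worker : public std::enable_shared_from_this<Worker> {
    std::function<void(int)> pendingTask;

    void DoWork() {}

    void Start()
    {
        // Take a shared_ptr to ourselves and capture it by value: the
        // task now co-owns the Worker, so no thread can race on deletion.
        auto self(this->shared_from_this());
        pendingTask = [self](int threadId) {
            self->DoWork(); // 'self' keeps the object alive until here
        }; // when the last shared_ptr copy goes away, ~Worker() runs once
    }
};

// Usage: the object must already be owned by a shared_ptr before
// shared_from_this() is called, hence make_shared instead of plain new:
//   auto w = std::make_shared<Worker>();
//   w->Start();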

5 files changed: +109 −103 lines changed

src/Makefile.am

Lines changed: 1 addition & 1 deletion

@@ -180,7 +180,7 @@ BITCOIN_CORE_H = \
   core_io.h \
   cuckoocache.h \
   crypter.h \
-  ctpl.h \
+  ctpl_stl.h \
   cyclingvector.h \
   evo/deterministicmns.h \
   evo/evodb.h \

src/bls/bls_worker.cpp

Lines changed: 45 additions & 49 deletions

@@ -130,7 +130,7 @@ bool CBLSWorker::GenerateContributions(int quorumThreshold, const BLSIdVector& i
 // The input vector is not copied into the Aggregator but instead a vector of pointers to the original entries from the
 // input vector is stored. This means that the input vector must stay alive for the whole lifetime of the Aggregator
 template <typename T>
-struct Aggregator {
+struct Aggregator : public std::enable_shared_from_this<Aggregator<T>> {
     typedef T ElementType;
 
     size_t batchSize{16};
@@ -142,7 +142,7 @@ struct Aggregator {
     std::mutex m;
     // items in the queue are all intermediate aggregation results of finished batches.
     // The intermediate results must be deleted by us again (which we do in SyncAggregateAndPushAggQueue)
-    boost::lockfree::queue<T*> aggQueue;
+    ctpl::detail::Queue<T*> aggQueue;
     std::atomic<size_t> aggQueueSize{0};
 
     typedef std::function<void(const T& agg)> DoneCallback;
@@ -160,7 +160,6 @@ struct Aggregator {
                DoneCallback _doneCallback) :
         workerPool(_workerPool),
         parallel(_parallel),
-        aggQueue(0),
         doneCallback(std::move(_doneCallback))
     {
         inputVec = std::make_shared<std::vector<const T*> >(count);
@@ -184,19 +183,19 @@ struct Aggregator {
             } else {
                 doneCallback(SyncAggregate(*inputVec, 0, inputVec->size()));
             }
-            delete this;
             return;
         }
 
         if (batchCount == 1) {
             // just a single batch of work, take a shortcut.
-            PushWork([this](int threadId) {
-                if (inputVec->size() == 1) {
-                    doneCallback(*(*inputVec)[0]);
-                } else {
-                    doneCallback(SyncAggregate(*inputVec, 0, inputVec->size()));
-                }
-                delete this;
+            auto self(this->shared_from_this());
+            PushWork([self](int threadId) {
+                size_t vecSize = self->inputVec->size();
+                if (vecSize == 1) {
+                    self->doneCallback(*(*self->inputVec)[0]);
+                } else {
+                    self->doneCallback(self->SyncAggregate(*self->inputVec, 0, vecSize));
+                }
             });
             return;
         }
@@ -252,17 +251,18 @@ struct Aggregator {
             delete rem[i];
         }
         doneCallback(r);
-
-        delete this;
     }
 
-    void AsyncAggregateAndPushAggQueue(std::shared_ptr<std::vector<const T*> >& vec, size_t start, size_t count, bool del)
+    void AsyncAggregateAndPushAggQueue(const std::shared_ptr<std::vector<const T*>>& vec, size_t start, size_t count, bool del)
     {
         IncWait();
-        PushWork(std::bind(&Aggregator::SyncAggregateAndPushAggQueue, this, vec, start, count, del));
+        auto self(this->shared_from_this());
+        PushWork([self, vec, start, count, del](int threadId){
+            self->SyncAggregateAndPushAggQueue(vec, start, count, del);
+        });
     }
 
-    void SyncAggregateAndPushAggQueue(std::shared_ptr<std::vector<const T*> >& vec, size_t start, size_t count, bool del)
+    void SyncAggregateAndPushAggQueue(const std::shared_ptr<std::vector<const T*> >& vec, size_t start, size_t count, bool del)
     {
         // aggregate vec and push the intermediate result onto the work queue
         PushAggQueue(SyncAggregate(*vec, start, count));
@@ -333,7 +333,7 @@ struct Aggregator {
 // [ a1+a2+a3+a4, b1+b2+b3+b4, c1+c2+c3+c4, d1+d2+d3+d4]
 // Same rules for the input vectors apply to the VectorAggregator as for the Aggregator (they must stay alive)
 template <typename T>
-struct VectorAggregator {
+struct VectorAggregator : public std::enable_shared_from_this<VectorAggregator<T>> {
     typedef Aggregator<T> AggregatorType;
     typedef std::vector<T> VectorType;
     typedef std::shared_ptr<VectorType> VectorPtrType;
@@ -372,19 +372,15 @@ struct VectorAggregator {
 
     void Start()
     {
-        std::vector<AggregatorType*> aggregators;
         for (size_t i = 0; i < vecSize; i++) {
             std::vector<const T*> tmp(count);
             for (size_t j = 0; j < count; j++) {
                 tmp[j] = &(*vecs[start + j])[i];
             }
 
-            auto aggregator = new AggregatorType(std::move(tmp), 0, count, parallel, workerPool, std::bind(&VectorAggregator::CheckDone, this, std::placeholders::_1, i));
-            // we can't directly start the aggregator here as it might be so fast that it deletes "this" while we are still in this loop
-            aggregators.emplace_back(aggregator);
-        }
-        for (auto agg : aggregators) {
-            agg->Start();
+            auto self(this->shared_from_this());
+            auto aggregator = std::make_shared<AggregatorType>(std::move(tmp), 0, count, parallel, workerPool, [self, i](const T& agg) {self->CheckDone(agg, i);});
+            aggregator->Start();
         }
     }
@@ -393,14 +389,13 @@ struct VectorAggregator {
         (*result)[idx] = agg;
         if (++doneCount == vecSize) {
             doneCallback(result);
-            delete this;
         }
     }
 };
 
 // See comment of AsyncVerifyContributionShares for a description on what this does
 // Same rules as in Aggregator apply for the inputs
-struct ContributionVerifier {
+struct ContributionVerifier : public std::enable_shared_from_this<ContributionVerifier> {
     struct BatchState {
         size_t start;
         size_t count;
@@ -493,16 +488,16 @@ struct ContributionVerifier {
             }
         }
         doneCallback(result);
-        delete this;
     }
 
     void AsyncAggregate(size_t batchIdx)
     {
         auto& batchState = batchStates[batchIdx];
 
         // aggregate vvecs and skShares of batch in parallel
-        auto vvecAgg = new VectorAggregator<CBLSPublicKey>(vvecs, batchState.start, batchState.count, parallel, workerPool, std::bind(&ContributionVerifier::HandleAggVvecDone, this, batchIdx, std::placeholders::_1));
-        auto skShareAgg = new Aggregator<CBLSSecretKey>(skShares, batchState.start, batchState.count, parallel, workerPool, std::bind(&ContributionVerifier::HandleAggSkShareDone, this, batchIdx, std::placeholders::_1));
+        auto self(this->shared_from_this());
+        auto vvecAgg = std::make_shared<VectorAggregator<CBLSPublicKey>>(vvecs, batchState.start, batchState.count, parallel, workerPool, [self, batchIdx] (const BLSVerificationVectorPtr& vvec) {self->HandleAggVvecDone(batchIdx, vvec);});
+        auto skShareAgg = std::make_shared<Aggregator<CBLSSecretKey>>(skShares, batchState.start, batchState.count, parallel, workerPool, [self, batchIdx] (const CBLSSecretKey& skShare) {self->HandleAggSkShareDone(batchIdx, skShare);});
 
         vvecAgg->Start();
         skShareAgg->Start();
@@ -550,17 +545,18 @@ struct ContributionVerifier {
 
     void AsyncAggregatedVerifyBatch(size_t batchIdx)
    {
-        auto f = [this, batchIdx](int threadId) {
-            auto& batchState = batchStates[batchIdx];
-            bool result = Verify(batchState.vvec, batchState.skShare);
-            if (result) {
-                // whole batch is valid
-                batchState.verifyResults.assign(batchState.count, 1);
-                HandleVerifyDone(batchIdx, batchState.count);
-            } else {
-                // at least one entry in the batch is invalid, revert to per-contribution verification (but parallelized)
-                AsyncVerifyBatchOneByOne(batchIdx);
-            }
+        auto self(shared_from_this());
+        auto f = [self, batchIdx](int threadId) {
+            auto& batchState = self->batchStates[batchIdx];
+            bool result = self->Verify(batchState.vvec, batchState.skShare);
+            if (result) {
+                // whole batch is valid
+                batchState.verifyResults.assign(batchState.count, 1);
+                self->HandleVerifyDone(batchIdx, batchState.count);
+            } else {
+                // at least one entry in the batch is invalid, revert to per-contribution verification (but parallelized)
+                self->AsyncVerifyBatchOneByOne(batchIdx);
+            }
         };
         PushOrDoWork(std::move(f));
     }
@@ -570,12 +566,12 @@ struct ContributionVerifier {
         size_t count = batchStates[batchIdx].count;
         batchStates[batchIdx].verifyResults.assign(count, 0);
         for (size_t i = 0; i < count; i++) {
-            auto f = [this, i, batchIdx](int threadId) {
-                auto& batchState = batchStates[batchIdx];
-                batchState.verifyResults[i] = Verify(vvecs[batchState.start + i], skShares[batchState.start + i]);
-                HandleVerifyDone(batchIdx, 1);
-            };
-            PushOrDoWork(std::move(f));
+            auto self(this->shared_from_this());
+            PushOrDoWork([self, i, batchIdx](int threadId) {
+                auto& batchState = self->batchStates[batchIdx];
+                batchState.verifyResults[i] = self->Verify(self->vvecs[batchState.start + i], self->skShares[batchState.start + i]);
+                self->HandleVerifyDone(batchIdx, 1);
+            });
         }
     }
 
@@ -617,7 +613,7 @@ void CBLSWorker::AsyncBuildQuorumVerificationVector(const std::vector<BLSVerific
         return;
     }
 
-    auto agg = new VectorAggregator<CBLSPublicKey>(vvecs, start, count, parallel, workerPool, std::move(doneCallback));
+    auto agg = std::make_shared<VectorAggregator<CBLSPublicKey>>(vvecs, start, count, parallel, workerPool, std::move(doneCallback));
     agg->Start();
 }
 
@@ -652,7 +648,7 @@ void AsyncAggregateHelper(ctpl::thread_pool& workerPool,
         return;
     }
 
-    auto agg = new Aggregator<T>(vec, start, count, parallel, workerPool, std::move(doneCallback));
+    auto agg = std::make_shared<Aggregator<T>>(vec, start, count, parallel, workerPool, std::move(doneCallback));
     agg->Start();
 }
 
@@ -737,7 +733,7 @@ void CBLSWorker::AsyncVerifyContributionShares(const CBLSId& forId, const std::v
         return;
     }
 
-    auto verifier = new ContributionVerifier(forId, vvecs, skShares, 8, parallel, aggregated, workerPool, std::move(doneCallback));
+    auto verifier = std::make_shared<ContributionVerifier>(forId, vvecs, skShares, 8, parallel, aggregated, workerPool, std::move(doneCallback));
     verifier->Start();
 }

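What ctpl::detail::Queue provides in place of boost::lockfree::queue is a plain std::queue guarded by a mutex. A minimal sketch of that kind of synchronized queue follows (the actual ctpl_stl.h implementation may differ in detail; SynchronizedQueue is a name chosen for the example):

#include <mutex>
#include <queue>

template <typename T>
class SynchronizedQueue {
public:
    bool push(T const& value)
    {
        std::unique_lock<std::mutex> lock(m);
        q.push(value);
        return true;
    }

    // Returns false instead of blocking when the queue is empty,
    // mirroring the non-blocking pop of the lockfree queue it replaces.
    bool pop(T& v)
    {
        std::unique_lock<std::mutex> lock(m);
        if (q.empty())
            return false;
        v = q.front();
        q.pop();
        return true;
    }

    bool empty()
    {
        std::unique_lock<std::mutex> lock(m);
        return q.empty();
    }

private:
    std::queue<T> q;
    std::mutex m;
};

The mutex version trades the lockfree queue's raw throughput for simplicity and well-defined semantics; since aggQueue only carries intermediate batch results (per the comment in the diff above), contention on it should be low.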

src/bls/bls_worker.h

Lines changed: 1 addition & 3 deletions

@@ -7,13 +7,11 @@
 #define PIVX_CRYPTO_BLS_WORKER_H
 
 #include "bls/bls_wrapper.h"
-#include "ctpl.h"
+#include "ctpl_stl.h"
 
 #include <future>
 #include <mutex>
 
-#include <boost/lockfree/queue.hpp>
-
 // Low level BLS/DKG stuff. All very compute intensive and optimized for parallelization
 // The worker tries to parallelize as much as possible and utilizes a few properties of BLS aggregation to speed up things
 // For example, public key vectors can be aggregated in parallel if they are split into batches and the batched aggregations are

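Finally, the commit message's rule of passing shared_ptr by value to other threads is the reason the lambdas in the diffs above capture self and vec by value. A small hypothetical usage of ctpl::thread_pool showing the same idea (everything except the ctpl API itself is invented for the example):

#include "ctpl_stl.h"

#include <memory>
#include <vector>

int main()
{
    ctpl::thread_pool pool(4);

    // Tasks capture the shared_ptr by value, so each task holds its own
    // reference and the vector outlives every worker that uses it.
    auto data = std::make_shared<std::vector<int>>(1000, 1);
    for (int i = 0; i < 4; i++) {
        pool.push([data](int threadId) {
            long sum = 0;
            for (int v : *data) sum += v; // safe: 'data' cannot be freed here
            (void)sum;
        });
    }
    return 0; // the pool's destructor joins the workers; the last reference frees 'data'
}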
