Skip to content

Commit 88530f4

Browse files
authored
Merge branch 'release-7.1' into zhewu/fix-batch-peek-pop-race
2 parents 2f25e2e + 1304602 commit 88530f4

File tree

1 file changed

+20
-18
lines changed

1 file changed

+20
-18
lines changed

fdbserver/DiskQueue.actor.cpp

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -55,27 +55,27 @@ struct StringBuffer {
5555
StringBuffer(UID fromFileID) : reserved(0), id(fromFileID) {}
5656

5757
int size() const { return str.size(); }
58-
StringRef& ref() { return str; }
58+
Standalone<StringRef> get() { return str; }
5959
void clear() {
6060
str = Standalone<StringRef>();
6161
reserved = 0;
6262
}
6363
void clearReserve(int size) {
6464
str = Standalone<StringRef>();
6565
reserved = size;
66-
ref() = StringRef(new (str.arena()) uint8_t[size], 0);
66+
str.contents() = StringRef(new (str.arena()) uint8_t[size], 0);
6767
}
6868
void append(StringRef x) { memcpy(append(x.size()), x.begin(), x.size()); }
6969
void* append(int bytes) {
7070
ASSERT(str.size() + bytes <= reserved);
7171
void* p = const_cast<uint8_t*>(str.end());
72-
ref() = StringRef(str.begin(), str.size() + bytes);
72+
str.contents() = StringRef(str.begin(), str.size() + bytes);
7373
return p;
7474
}
7575
StringRef pop_front(int bytes) {
7676
ASSERT(bytes <= str.size());
7777
StringRef result = str.substr(0, bytes);
78-
ref() = str.substr(bytes);
78+
str.contents() = str.substr(bytes);
7979
return result;
8080
}
8181
void alignReserve(int alignment, int size) {
@@ -101,7 +101,7 @@ struct StringBuffer {
101101
if (str.size() > 0) {
102102
memcpy(p, str.begin(), str.size());
103103
}
104-
ref() = StringRef(p, str.size());
104+
str.contents() = StringRef(p, str.size());
105105
}
106106
}
107107
};
@@ -196,7 +196,7 @@ class RawDiskQueue_TwoFiles : public Tracked<RawDiskQueue_TwoFiles> {
196196
stallCount.init(LiteralStringRef("RawDiskQueue.StallCount"));
197197
}
198198

199-
Future<Void> pushAndCommit(StringRef pageData, StringBuffer* pageMem, uint64_t poppedPages) {
199+
Future<Void> pushAndCommit(Standalone<StringRef> pageData, StringBuffer* pageMem, uint64_t poppedPages) {
200200
return pushAndCommit(this, pageData, pageMem, poppedPages);
201201
}
202202

@@ -332,13 +332,13 @@ class RawDiskQueue_TwoFiles : public Tracked<RawDiskQueue_TwoFiles> {
332332
}
333333
#endif
334334

335-
Future<Future<Void>> push(StringRef pageData, std::vector<Reference<SyncQueue>>* toSync) {
335+
Future<Future<Void>> push(Standalone<StringRef> pageData, std::vector<Reference<SyncQueue>>* toSync) {
336336
return push(this, pageData, toSync);
337337
}
338338

339-
ACTOR static Future<Future<Void>> push(RawDiskQueue_TwoFiles* self,
340-
StringRef pageData,
341-
std::vector<Reference<SyncQueue>>* toSync) {
339+
ACTOR static UNCANCELLABLE Future<Future<Void>> push(RawDiskQueue_TwoFiles* self,
340+
Standalone<StringRef> pageData,
341+
std::vector<Reference<SyncQueue>>* toSync) {
342342
// Write the given data (pageData) to the queue files, swapping or extending them if necessary.
343343
// Don't do any syncs, but push the modified file(s) onto toSync.
344344
ASSERT(self->readingFile == 2);
@@ -357,8 +357,9 @@ class RawDiskQueue_TwoFiles : public Tracked<RawDiskQueue_TwoFiles> {
357357
toSync->push_back(self->files[1].syncQueue);
358358
/*TraceEvent("RDQWriteAndSwap", this->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
359359
.detail("WritingPos", self->writingPos).detail("WritingBytes", p);*/
360-
waitfor.push_back(self->files[1].f->write(pageData.begin(), p, self->writingPos));
361-
pageData = pageData.substr(p);
360+
waitfor.push_back(uncancellable(
361+
holdWhile(pageData, self->files[1].f->write(pageData.begin(), p, self->writingPos))));
362+
pageData.contents() = pageData.substr(p);
362363
}
363364

364365
self->dbg_file0BeginSeq += self->files[0].size;
@@ -426,7 +427,8 @@ class RawDiskQueue_TwoFiles : public Tracked<RawDiskQueue_TwoFiles> {
426427
.detail("WritingPos", self->writingPos).detail("WritingBytes", pageData.size());*/
427428
self->files[1].size = std::max(self->files[1].size, self->writingPos + pageData.size());
428429
toSync->push_back(self->files[1].syncQueue);
429-
waitfor.push_back(self->files[1].f->write(pageData.begin(), pageData.size(), self->writingPos));
430+
waitfor.push_back(uncancellable(
431+
holdWhile(pageData, self->files[1].f->write(pageData.begin(), pageData.size(), self->writingPos))));
430432
self->writingPos += pageData.size();
431433

432434
return waitForAllReadyThenThrow(waitfor);
@@ -435,7 +437,7 @@ class RawDiskQueue_TwoFiles : public Tracked<RawDiskQueue_TwoFiles> {
435437
// Write the given data (pageData) to the queue files of self, sync data to disk, and delete the memory (pageMem)
436438
// that hold the pageData
437439
ACTOR static UNCANCELLABLE Future<Void> pushAndCommit(RawDiskQueue_TwoFiles* self,
438-
StringRef pageData,
440+
Standalone<StringRef> pageData,
439441
StringBuffer* pageMem,
440442
uint64_t poppedPages) {
441443
state Promise<Void> pushing, committed;
@@ -983,7 +985,7 @@ class DiskQueue final : public IDiskQueue, public Tracked<DiskQueue> {
983985

984986
lastCommittedSeq = backPage().endSeq();
985987
auto f = rawQueue->pushAndCommit(
986-
pushed_page_buffer->ref(), pushed_page_buffer, poppedSeq / sizeof(Page) - lastPoppedSeq / sizeof(Page));
988+
pushed_page_buffer->get(), pushed_page_buffer, poppedSeq / sizeof(Page) - lastPoppedSeq / sizeof(Page));
987989
lastPoppedSeq = poppedSeq;
988990
pushed_page_buffer = 0;
989991
return f;
@@ -1179,7 +1181,7 @@ class DiskQueue final : public IDiskQueue, public Tracked<DiskQueue> {
11791181
Standalone<StringRef> pagedData = wait(readPages(self, start, end));
11801182
const int startOffset = start % _PAGE_SIZE;
11811183
const int dataLen = end - start;
1182-
ASSERT(pagedData.substr(startOffset, dataLen).compare(buffer->ref().substr(0, dataLen)) == 0);
1184+
ASSERT(pagedData.substr(startOffset, dataLen).compare(buffer->get().substr(0, dataLen)) == 0);
11831185
} catch (Error& e) {
11841186
if (e.code() != error_code_io_error) {
11851187
delete buffer;
@@ -1546,9 +1548,9 @@ class DiskQueue final : public IDiskQueue, public Tracked<DiskQueue> {
15461548
StringBuffer* pushed_page_buffer;
15471549
Page& backPage() {
15481550
ASSERT(pushedPageCount());
1549-
return ((Page*)pushed_page_buffer->ref().end())[-1];
1551+
return ((Page*)pushed_page_buffer->get().end())[-1];
15501552
}
1551-
Page const& backPage() const { return ((Page*)pushed_page_buffer->ref().end())[-1]; }
1553+
Page const& backPage() const { return ((Page*)pushed_page_buffer->get().end())[-1]; }
15521554
int pushedPageCount() const { return pushed_page_buffer ? pushed_page_buffer->size() / sizeof(Page) : 0; }
15531555

15541556
// Recovery state

0 commit comments

Comments
 (0)