Skip to content

Commit e052b58

Browse files
committed
[ntuple] RPageSinkBuf: Always seal before CommitCluster
Without a task scheduler, seal in CommitPage. This avoids a page allocation and, in most cases, a copy of the uncompressed buffer (unless the element type is mappable and compression is disabled, in which case the sealed page would otherwise alias the page buffer). It also ensures that CommitCluster can vector-commit all sealed pages.
1 parent a100334 commit e052b58

File tree

5 files changed

+39
-35
lines changed

5 files changed

+39
-35
lines changed

tree/ntuple/v7/inc/ROOT/RPageSinkBuf.hxx

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,11 @@ private:
5151
// Compression scratch buffer for fSealedPage.
5252
std::unique_ptr<unsigned char[]> fBuf;
5353
RPageStorage::RSealedPage *fSealedPage = nullptr;
54-
explicit RPageZipItem(RPage page)
55-
: fPage(page), fBuf(nullptr) {}
5654
bool IsSealed() const { return fSealedPage != nullptr; }
57-
void AllocateSealedPageBuf() { fBuf = std::unique_ptr<unsigned char[]>(new unsigned char[fPage.GetNBytes()]); }
55+
void AllocateSealedPageBuf(std::size_t nBytes)
56+
{
57+
fBuf = std::unique_ptr<unsigned char[]>(new unsigned char[nBytes]);
58+
}
5859
};
5960
public:
6061
RColumnBuf() = default;
@@ -66,17 +67,14 @@ private:
6667

6768
/// Returns a reference to the newly buffered page. The reference remains
6869
/// valid until the return value of DrainBufferedPages() is destroyed.
69-
/// Note that `BufferPage()` yields the ownership of `page` to RColumnBuf.
70-
RPageZipItem &BufferPage(
71-
RPageStorage::ColumnHandle_t columnHandle, const RPage &page)
70+
RPageZipItem &BufferPage(RPageStorage::ColumnHandle_t columnHandle)
7271
{
7372
if (!fCol) {
7473
fCol = columnHandle;
7574
}
7675
// Safety: Insertion at the end of a deque never invalidates references
7776
// to existing elements.
78-
fBufferedPages.push_back(RPageZipItem(page));
79-
return fBufferedPages.back();
77+
return fBufferedPages.emplace_back();
8078
}
8179
const RPageStorage::ColumnHandle_t &GetHandle() const { return fCol; }
8280
bool IsEmpty() const { return fBufferedPages.empty(); }

tree/ntuple/v7/inc/ROOT/RPageStorage.hxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,8 @@ protected:
197197
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element, int compressionSetting);
198198

199199
/// Seal a page using the provided buffer.
200-
static RSealedPage SealPage(const RPage &page, const RColumnElementBase &element,
201-
int compressionSetting, void *buf);
200+
static RSealedPage SealPage(const RPage &page, const RColumnElementBase &element, int compressionSetting, void *buf,
201+
bool allowAlias = true);
202202

203203
public:
204204
RPageSink(std::string_view ntupleName, const RNTupleWriteOptions &options);

tree/ntuple/v7/src/RPageSinkBuf.cxx

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
void ROOT::Experimental::Detail::RPageSinkBuf::RColumnBuf::DropBufferedPages()
2727
{
2828
for (auto &bufPage : fBufferedPages) {
29-
fCol.fColumn->GetPageSink()->ReleasePage(bufPage.fPage);
29+
if (!bufPage.fPage.IsNull()) {
30+
fCol.fColumn->GetPageSink()->ReleasePage(bufPage.fPage);
31+
}
3032
}
3133
fBufferedPages.clear();
3234
// Each RSealedPage points to the same region as `fBuf` for some element in `fBufferedPages`; thus, no further
@@ -123,28 +125,37 @@ void ROOT::Experimental::Detail::RPageSinkBuf::UpdateSchema(const RNTupleModelCh
123125

124126
void ROOT::Experimental::Detail::RPageSinkBuf::CommitPage(ColumnHandle_t columnHandle, const RPage &page)
125127
{
126-
// TODO avoid frequent (de)allocations by holding on to allocated buffers in RColumnBuf
127-
RPage bufPage = ReservePage(columnHandle, page.GetNElements());
128-
// make sure the page is aware of how many elements it will have
129-
bufPage.GrowUnchecked(page.GetNElements());
130-
memcpy(bufPage.GetBuffer(), page.GetBuffer(), page.GetNBytes());
128+
auto colId = columnHandle.fPhysicalId;
129+
const auto &element = *columnHandle.fColumn->GetElement();
130+
131131
// Safety: References are guaranteed to be valid until the
132132
// element is destroyed. In other words, all buffered page elements are
133133
// valid until the return value of DrainBufferedPages() goes out of scope in
134134
// CommitCluster().
135-
auto &zipItem = fBufferedColumns.at(columnHandle.fPhysicalId).BufferPage(columnHandle, bufPage);
135+
auto &zipItem = fBufferedColumns.at(colId).BufferPage(columnHandle);
136+
zipItem.AllocateSealedPageBuf(page.GetNBytes());
137+
R__ASSERT(zipItem.fBuf);
138+
auto &sealedPage = fBufferedColumns.at(colId).RegisterSealedPage();
139+
136140
if (!fTaskScheduler) {
141+
// Seal the page right now, avoiding the allocation and copy, but making sure that the page buffer is not aliased.
142+
sealedPage =
143+
SealPage(page, element, GetWriteOptions().GetCompression(), zipItem.fBuf.get(), /*allowAlias=*/false);
144+
zipItem.fSealedPage = &sealedPage;
137145
return;
138146
}
147+
148+
// TODO avoid frequent (de)allocations by holding on to allocated buffers in RColumnBuf
149+
zipItem.fPage = ReservePage(columnHandle, page.GetNElements());
150+
// make sure the page is aware of how many elements it will have
151+
zipItem.fPage.GrowUnchecked(page.GetNElements());
152+
memcpy(zipItem.fPage.GetBuffer(), page.GetBuffer(), page.GetNBytes());
153+
139154
fCounters->fParallelZip.SetValue(1);
140155
// Thread safety: Each thread works on a distinct zipItem which owns its
141156
// compression buffer.
142-
zipItem.AllocateSealedPageBuf();
143-
R__ASSERT(zipItem.fBuf);
144-
auto &sealedPage = fBufferedColumns.at(columnHandle.fPhysicalId).RegisterSealedPage();
145-
fTaskScheduler->AddTask([this, &zipItem, &sealedPage, colId = columnHandle.fPhysicalId] {
146-
sealedPage = SealPage(zipItem.fPage, *fBufferedColumns.at(colId).GetHandle().fColumn->GetElement(),
147-
GetWriteOptions().GetCompression(), zipItem.fBuf.get());
157+
fTaskScheduler->AddTask([this, &zipItem, &sealedPage, &element] {
158+
sealedPage = SealPage(zipItem.fPage, element, GetWriteOptions().GetCompression(), zipItem.fBuf.get());
148159
zipItem.fSealedPage = &sealedPage;
149160
});
150161
}

tree/ntuple/v7/src/RPageStorage.cxx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -350,8 +350,8 @@ std::unique_ptr<ROOT::Experimental::Detail::RPageSink> ROOT::Experimental::Detai
350350
}
351351

352352
ROOT::Experimental::Detail::RPageStorage::RSealedPage
353-
ROOT::Experimental::Detail::RPageSink::SealPage(const RPage &page,
354-
const RColumnElementBase &element, int compressionSetting, void *buf)
353+
ROOT::Experimental::Detail::RPageSink::SealPage(const RPage &page, const RColumnElementBase &element,
354+
int compressionSetting, void *buf, bool allowAlias)
355355
{
356356
unsigned char *pageBuf = reinterpret_cast<unsigned char *>(page.GetBuffer());
357357
bool isAdoptedBuffer = true;
@@ -365,7 +365,7 @@ ROOT::Experimental::Detail::RPageSink::SealPage(const RPage &page,
365365
}
366366
auto zippedBytes = packedBytes;
367367

368-
if ((compressionSetting != 0) || !element.IsMappable()) {
368+
if ((compressionSetting != 0) || !element.IsMappable() || !allowAlias) {
369369
zippedBytes = RNTupleCompressor::Zip(pageBuf, packedBytes, compressionSetting, buf);
370370
if (!isAdoptedBuffer)
371371
delete[] pageBuf;

tree/ntuple/v7/test/ntuple_storage.cxx

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -451,10 +451,10 @@ TEST(RPageSinkBuf, CommitSealedPageV)
451451
ntuple->Fill();
452452
ntuple->Fill();
453453
ntuple->CommitCluster();
454-
// Parallel zip not available; all pages committed separately
455-
EXPECT_EQ(5, counters.fNCommitPage);
454+
// Parallel zip not available, but pages are still vector-committed
455+
EXPECT_EQ(0, counters.fNCommitPage);
456456
EXPECT_EQ(0, counters.fNCommitSealedPage);
457-
EXPECT_EQ(0, counters.fNCommitSealedPageV);
457+
EXPECT_EQ(1, counters.fNCommitSealedPageV);
458458
}
459459
#ifdef R__USE_IMT
460460
ROOT::EnableImplicitMT();
@@ -471,15 +471,10 @@ TEST(RPageSinkBuf, CommitSealedPageV)
471471
ntuple->Fill();
472472
ntuple->Fill();
473473
ntuple->CommitCluster();
474-
#ifdef R__USE_IMT
475474
// All pages in all columns committed via a single call to `CommitSealedPageV()`
476475
EXPECT_EQ(0, counters.fNCommitPage);
477-
EXPECT_EQ(1, counters.fNCommitSealedPageV);
478-
#else
479-
EXPECT_EQ(3, counters.fNCommitPage);
480-
EXPECT_EQ(0, counters.fNCommitSealedPageV);
481-
#endif
482476
EXPECT_EQ(0, counters.fNCommitSealedPage);
477+
EXPECT_EQ(1, counters.fNCommitSealedPageV);
483478
}
484479
}
485480

0 commit comments

Comments
 (0)