Skip to content

Commit 88bd1f0

Browse files
mxxojblomer
authored and committed
[ntuple] Start compressing pages in CommitPage
The previous parallel zip implementation only started compressing pages when CommitCluster was called. By starting compression as soon as a page is buffered, we can further improve the overall write performance. The new implementation uses a linked list (std::list) instead of a vector to hold buffered pages. When we buffer a page, we pass an element iterator to the task scheduler to be compressed right away. The linked list guarantees that these element iterators remain valid when pages are buffered afterwards (unlike a vector, which might need to reallocate). One potential issue is that when CommitCluster is called, we have to traverse the linked list, which is much more expensive than traversing a vector. Our hope is that the performance gains from amortizing compression greatly outweigh the increased traversal cost, especially since there are not expected to be millions of pages per column in a single cluster. Another problem is that the single-threaded case pays this increased cost without any gains. In all measurements, however, compression time remains the bottleneck.
1 parent 175cb85 commit 88bd1f0

File tree

2 files changed

+65
-72
lines changed

2 files changed

+65
-72
lines changed

tree/ntuple/v7/inc/ROOT/RPageSinkBuf.hxx

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
#include <ROOT/RPageStorage.hxx>
2121

22+
#include <iterator>
23+
#include <list>
2224
#include <memory>
2325

2426
namespace ROOT {
@@ -37,21 +39,45 @@ private:
3739
/// A buffered column. The column is not responsible for RPage memory management (i.e.
3840
/// ReservePage/ReleasePage), which is handled by the enclosing RPageSinkBuf.
3941
class RColumnBuf {
40-
private:
41-
std::pair<RPageStorage::ColumnHandle_t, std::vector<RPage>> fBuf;
4242
public:
43-
void BufferPage(RPageStorage::ColumnHandle_t columnHandle, const RPage &page) {
44-
if (!fBuf.first) {
45-
fBuf.first = columnHandle;
43+
struct RPageZipItem {
44+
RPage fPage;
45+
// Compression scratch buffer for fSealedPage.
46+
std::unique_ptr<unsigned char[]> fBuf;
47+
RPageStorage::RSealedPage fSealedPage;
48+
explicit RPageZipItem(RPage page)
49+
: fPage(page), fBuf(nullptr) {}
50+
bool IsSealed() {
51+
return fSealedPage.fBuffer != nullptr;
52+
}
53+
void AllocateSealedPageBuf() {
54+
fBuf = std::make_unique<unsigned char[]>(fPage.GetSize());
55+
}
56+
};
57+
/// Returns an iterator to the newly buffered page. The iterator remains
58+
/// valid until the return value of DrainBufferedPages() is destroyed.
59+
std::list<RPageZipItem>::iterator BufferPage(
60+
RPageStorage::ColumnHandle_t columnHandle, const RPage &page)
61+
{
62+
if (!fCol) {
63+
fCol = columnHandle;
4664
}
47-
fBuf.second.push_back(page);
65+
fBufferedPages.push_back(RPageZipItem(page));
66+
return std::prev(fBufferedPages.end());
4867
}
49-
const RPageStorage::ColumnHandle_t &GetHandle() const { return fBuf.first; }
50-
std::vector<RPage> DrainBufferedPages() {
51-
std::vector<RPage> drained;
52-
std::swap(fBuf.second, drained);
68+
const RPageStorage::ColumnHandle_t &GetHandle() const { return fCol; }
69+
// When the return value of DrainBufferedPages() is destroyed, all iterators
70+
// returned by GetBuffer are invalidated.
71+
std::list<RPageZipItem> DrainBufferedPages() {
72+
std::list<RPageZipItem> drained;
73+
std::swap(fBufferedPages, drained);
5374
return drained;
5475
}
76+
private:
77+
RPageStorage::ColumnHandle_t fCol;
78+
// Using a linked list guarantees that references to list elements are
79+
// never invalidated by appends in BufferPage.
80+
std::list<RPageZipItem> fBufferedPages;
5581
};
5682

5783
private:
@@ -63,9 +89,6 @@ private:
6389
/// Vector of buffered column pages. Indexed by column id.
6490
std::vector<RColumnBuf> fBufferedColumns;
6591

66-
/// Compress and commit buffered cluster pages in parallel.
67-
void ParallelClusterZip(NTupleSize_t nEntries);
68-
6992
protected:
7093
void CreateImpl(const RNTupleModel &model) final;
7194
RClusterDescriptor::RLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final;

tree/ntuple/v7/src/RPageSinkBuf.cxx

Lines changed: 29 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,27 @@ ROOT::Experimental::Detail::RPageSinkBuf::CommitPageImpl(ColumnHandle_t columnHa
3737
// make sure the page is aware of how many elements it will have
3838
R__ASSERT(bufPage.TryGrow(page.GetNElements()));
3939
memcpy(bufPage.GetBuffer(), page.GetBuffer(), page.GetSize());
40-
fBufferedColumns.at(columnHandle.fId).BufferPage(columnHandle, bufPage);
40+
std::list<RColumnBuf::RPageZipItem>::iterator zipItem =
41+
fBufferedColumns.at(columnHandle.fId).BufferPage(columnHandle, bufPage);
42+
if (!fTaskScheduler) {
43+
return RClusterDescriptor::RLocator{};
44+
}
45+
// Safety: std::list<T>::iterators are guaranteed to be valid until the
46+
// element is destroyed. In other words, all buffered page iterators are
47+
// valid until the return value of DrainBufferedPages() goes out of scope in
48+
// CommitCluster().
49+
//
50+
// Thread safety: Each thread works on a distinct zipItem which owns its
51+
// compression buffer.
52+
zipItem->AllocateSealedPageBuf();
53+
R__ASSERT(zipItem->fBuf);
54+
fTaskScheduler->AddTask([this, zipItem, colId = columnHandle.fId] {
55+
zipItem->fSealedPage = SealPage(zipItem->fPage,
56+
*fBufferedColumns.at(colId).GetHandle().fColumn->GetElement(),
57+
fOptions.GetCompression(), zipItem->fBuf.get()
58+
);
59+
});
60+
4161
// we're feeding bad locators to fOpenPageRanges but it should not matter
4262
// because they never get written out
4363
return RClusterDescriptor::RLocator{};
@@ -57,14 +77,18 @@ ROOT::Experimental::RClusterDescriptor::RLocator
5777
ROOT::Experimental::Detail::RPageSinkBuf::CommitClusterImpl(ROOT::Experimental::NTupleSize_t nEntries)
5878
{
5979
if (fTaskScheduler) {
60-
ParallelClusterZip(nEntries);
61-
return RClusterDescriptor::RLocator{};
80+
fTaskScheduler->Wait();
81+
fTaskScheduler->Reset();
6282
}
6383

6484
for (auto &bufColumn : fBufferedColumns) {
6585
for (auto &bufPage : bufColumn.DrainBufferedPages()) {
66-
fInnerSink->CommitPage(bufColumn.GetHandle(), bufPage);
67-
ReleasePage(bufPage);
86+
if (bufPage.IsSealed()) {
87+
fInnerSink->CommitSealedPage(bufColumn.GetHandle().fId, bufPage.fSealedPage);
88+
} else {
89+
fInnerSink->CommitPage(bufColumn.GetHandle(), bufPage.fPage);
90+
}
91+
ReleasePage(bufPage.fPage);
6892
}
6993
}
7094
fInnerSink->CommitCluster(nEntries);
@@ -88,57 +112,3 @@ void ROOT::Experimental::Detail::RPageSinkBuf::ReleasePage(RPage &page)
88112
{
89113
fInnerSink->ReleasePage(page);
90114
}
91-
92-
namespace {
93-
using namespace ROOT::Experimental::Detail;
94-
struct RPageZipItem {
95-
RPage fPage;
96-
// Compression scratch buffer for fSealedPage.
97-
std::unique_ptr<unsigned char[]> fBuf;
98-
RPageStorage::RSealedPage fSealedPage;
99-
explicit RPageZipItem(RPage page)
100-
: fPage(page), fBuf(std::make_unique<unsigned char[]>(fPage.GetSize())) {}
101-
};
102-
} // anonymous namespace
103-
104-
void ROOT::Experimental::Detail::RPageSinkBuf::ParallelClusterZip(
105-
ROOT::Experimental::NTupleSize_t nEntries)
106-
{
107-
// TODO(max) add timers like in RPageSourceFile::UnzipClusterImpl
108-
R__ASSERT(fTaskScheduler);
109-
fTaskScheduler->Reset();
110-
// zipItems[nColumns][nColumnPages]
111-
std::vector<std::vector<RPageZipItem>> zipItems;
112-
for (auto &col : fBufferedColumns) {
113-
std::vector<RPageZipItem> zipCol;
114-
for (const auto &page : col.DrainBufferedPages()) {
115-
zipCol.push_back(RPageZipItem(page));
116-
}
117-
zipItems.push_back(std::move(zipCol));
118-
}
119-
// Thread safety: Each task works on a distinct RPageZipItem `zi`.
120-
// Task (i,j) seals RPage (i,j) -- zi.fPage -- using the scratch buffer
121-
// zi.fBuf.
122-
for (std::size_t i = 0; i < fBufferedColumns.size(); i++) {
123-
for (std::size_t j = 0; j < zipItems.at(i).size(); j++) {
124-
fTaskScheduler->AddTask([this, &zipItems, i, j] {
125-
auto &zi = zipItems.at(i).at(j);
126-
zi.fSealedPage = SealPage(zi.fPage,
127-
*fBufferedColumns.at(i).GetHandle().fColumn->GetElement(),
128-
fOptions.GetCompression(), zi.fBuf.get()
129-
);
130-
});
131-
}
132-
}
133-
fTaskScheduler->Wait();
134-
135-
for (std::size_t i = 0; i < fBufferedColumns.size(); i++) {
136-
for (auto &zi : zipItems.at(i)) {
137-
fInnerSink->CommitSealedPage(
138-
fBufferedColumns.at(i).GetHandle().fId, zi.fSealedPage
139-
);
140-
ReleasePage(zi.fPage);
141-
}
142-
}
143-
fInnerSink->CommitCluster(nEntries);
144-
}

0 commit comments

Comments
 (0)