Skip to content

Commit 1ea8447

Browse files
mxxojblomer
authored andcommitted
[ntuple] Add parallel cluster page compression
Implement basic fork-join cluster page compression that is used if the page sink has a task scheduler. This is the write-time analogue to parallel page decompression implemented in #6106. The current implementation only starts compression when CommitCluster is called, but this leaves potential performance improvements on the table because per-page compression could begin as soon as CommitPage is called. There are also memory-fragmentation concerns because we're allocating scratch space for every page. Instead, we could allocate a large buffer up front and divvy it up between the different pages.
1 parent 0a9b8e4 commit 1ea8447

File tree

5 files changed

+114
-0
lines changed

5 files changed

+114
-0
lines changed

tree/ntuple/v7/inc/ROOT/RNTuple.hxx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,9 @@ triggered by Flush() or by destructing the ntuple. On I/O errors, an exception
247247
// clang-format on
248248
class RNTupleWriter {
249249
private:
250+
/// The page sink's parallel page compression scheduler if IMT is on.
251+
/// Needs to be destructed after the page sink is destructed and so declared before.
252+
std::unique_ptr<Detail::RPageStorage::RTaskScheduler> fZipTasks;
250253
std::unique_ptr<Detail::RPageSink> fSink;
251254
/// Needs to be destructed before fSink
252255
std::unique_ptr<RNTupleModel> fModel;

tree/ntuple/v7/inc/ROOT/RPageSinkBuf.hxx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ private:
6363
/// Vector of buffered column pages. Indexed by column id.
6464
std::vector<RColumnBuf> fBufferedColumns;
6565

66+
/// Compress and commit buffered cluster pages in parallel.
67+
void ParallelClusterZip(NTupleSize_t nEntries);
68+
6669
protected:
6770
void CreateImpl(const RNTupleModel &model) final;
6871
RClusterDescriptor::RLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final;

tree/ntuple/v7/src/RNTuple.cxx

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,12 @@ ROOT::Experimental::RNTupleWriter::RNTupleWriter(
281281
if (!fSink) {
282282
throw RException(R__FAIL("null sink"));
283283
}
284+
#ifdef R__USE_IMT
285+
if (IsImplicitMTEnabled()) {
286+
fZipTasks = std::make_unique<RNTupleImtTaskScheduler>();
287+
fSink->SetTaskScheduler(fZipTasks.get());
288+
}
289+
#endif
284290
fSink->Create(*fModel.get());
285291
fMetrics.ObserveMetrics(fSink->GetMetrics());
286292
}

tree/ntuple/v7/src/RPageSinkBuf.cxx

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include <ROOT/RNTupleOptions.hxx>
1818
#include <ROOT/RNTupleModel.hxx>
19+
#include <ROOT/RNTupleZip.hxx>
1920
#include <ROOT/RPageSinkBuf.hxx>
2021

2122
ROOT::Experimental::Detail::RPageSinkBuf::RPageSinkBuf(std::unique_ptr<RPageSink> inner)
@@ -55,6 +56,11 @@ ROOT::Experimental::Detail::RPageSinkBuf::CommitSealedPageImpl(
5556
ROOT::Experimental::RClusterDescriptor::RLocator
5657
ROOT::Experimental::Detail::RPageSinkBuf::CommitClusterImpl(ROOT::Experimental::NTupleSize_t nEntries)
5758
{
59+
if (fTaskScheduler) {
60+
ParallelClusterZip(nEntries);
61+
return RClusterDescriptor::RLocator{};
62+
}
63+
5864
for (auto &bufColumn : fBufferedColumns) {
5965
for (auto &bufPage : bufColumn.DrainBufferedPages()) {
6066
fInnerSink->CommitPage(bufColumn.GetHandle(), bufPage);
@@ -82,3 +88,57 @@ void ROOT::Experimental::Detail::RPageSinkBuf::ReleasePage(RPage &page)
8288
{
8389
fInnerSink->ReleasePage(page);
8490
}
91+
92+
namespace {
93+
using namespace ROOT::Experimental::Detail;
94+
struct RPageZipItem {
95+
RPage fPage;
96+
// Compression scratch buffer for fSealedPage.
97+
std::unique_ptr<unsigned char[]> fBuf;
98+
RPageStorage::RSealedPage fSealedPage;
99+
explicit RPageZipItem(RPage page)
100+
: fPage(page), fBuf(std::make_unique<unsigned char[]>(fPage.GetSize())) {}
101+
};
102+
} // anonymous namespace
103+
104+
void ROOT::Experimental::Detail::RPageSinkBuf::ParallelClusterZip(
105+
ROOT::Experimental::NTupleSize_t nEntries)
106+
{
107+
// TODO(max) add timers like in RPageSourceFile::UnzipClusterImpl
108+
R__ASSERT(fTaskScheduler);
109+
fTaskScheduler->Reset();
110+
// zipItems[nColumns][nColumnPages]
111+
std::vector<std::vector<RPageZipItem>> zipItems;
112+
for (auto &col : fBufferedColumns) {
113+
std::vector<RPageZipItem> zipCol;
114+
for (const auto &page : col.DrainBufferedPages()) {
115+
zipCol.push_back(RPageZipItem(page));
116+
}
117+
zipItems.push_back(std::move(zipCol));
118+
}
119+
// Thread safety: Each task works on a distinct RPageZipItem `zi`.
120+
// Task (i,j) seals RPage (i,j) -- zi.fPage -- using the scratch buffer
121+
// zi.fBuf.
122+
for (std::size_t i = 0; i < fBufferedColumns.size(); i++) {
123+
for (std::size_t j = 0; j < zipItems.at(i).size(); j++) {
124+
fTaskScheduler->AddTask([this, &zipItems, i, j] {
125+
auto &zi = zipItems.at(i).at(j);
126+
zi.fSealedPage = SealPage(zi.fPage,
127+
*fBufferedColumns.at(i).GetHandle().fColumn->GetElement(),
128+
fOptions.GetCompression(), zi.fBuf.get()
129+
);
130+
});
131+
}
132+
}
133+
fTaskScheduler->Wait();
134+
135+
for (std::size_t i = 0; i < fBufferedColumns.size(); i++) {
136+
for (auto &zi : zipItems.at(i)) {
137+
fInnerSink->CommitSealedPage(
138+
fBufferedColumns.at(i).GetHandle().fId, zi.fSealedPage
139+
);
140+
ReleasePage(zi.fPage);
141+
}
142+
}
143+
fInnerSink->CommitCluster(nEntries);
144+
}

tree/ntuple/v7/test/ntuple_storage.cxx

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,3 +165,45 @@ TEST(RPageSinkBuf, Basics)
165165
ASSERT_EQ(column, next_page->first);
166166
}
167167
}
168+
169+
TEST(RPageSinkBuf, ParallelZip) {
170+
ROOT::EnableImplicitMT();
171+
172+
FileRaii fileGuard("test_ntuple_sinkbuf_pzip.root");
173+
{
174+
auto model = RNTupleModel::Create();
175+
auto floatField = model->MakeField<float>("pt");
176+
auto fieldKlassVec = model->MakeField<std::vector<CustomStruct>>("klassVec");
177+
auto ntuple = std::make_unique<RNTupleWriter>(std::move(model),
178+
std::make_unique<RPageSinkBuf>(std::make_unique<RPageSinkFile>(
179+
"buf_pzip", fileGuard.GetPath(), RNTupleWriteOptions()
180+
)));
181+
182+
for (int i = 0; i < 20000; i++) {
183+
*floatField = static_cast<float>(i);
184+
CustomStruct klass;
185+
klass.a = 42.0;
186+
klass.v1.emplace_back(static_cast<float>(i));
187+
klass.v2.emplace_back(std::vector<float>(3, static_cast<float>(i)));
188+
klass.s = "hi" + std::to_string(i);
189+
*fieldKlassVec = std::vector<CustomStruct>{klass};
190+
ntuple->Fill();
191+
if (i && i % 15000 == 0) {
192+
ntuple->CommitCluster();
193+
}
194+
}
195+
}
196+
197+
auto ntuple = RNTupleReader::Open("buf_pzip", fileGuard.GetPath());
198+
EXPECT_EQ(20000, ntuple->GetNEntries());
199+
200+
auto viewPt = ntuple->GetView<float>("pt");
201+
auto viewKlassVec = ntuple->GetView<std::vector<CustomStruct>>("klassVec");
202+
for (auto i : ntuple->GetEntryRange()) {
203+
float fi = static_cast<float>(i);
204+
EXPECT_EQ(fi, viewPt(i));
205+
EXPECT_EQ(std::vector<float>{fi}, viewKlassVec(i).at(0).v1);
206+
EXPECT_EQ((std::vector<float>(3, fi)), viewKlassVec(i).at(0).v2.at(0));
207+
EXPECT_EQ("hi" + std::to_string(i), viewKlassVec(i).at(0).s);
208+
}
209+
}

0 commit comments

Comments
 (0)