Skip to content

Commit aa13a91

Browse files
liuneng1994 authored and kyligence-git committed
insert range selective for shuffle write
1 parent 60016a3 commit aa13a91

21 files changed

+198
-0
lines changed

src/Columns/ColumnAggregateFunction.cpp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,35 @@ void ColumnAggregateFunction::insertDefault()
524524
pushBackAndCreateState(data, arena, func.get());
525525
}
526526

527+
void ColumnAggregateFunction::insertRangeSelective(
528+
const IColumn & from, const IColumn::Selector & selector, size_t selector_start, size_t length)
529+
{
530+
const ColumnAggregateFunction & from_concrete = static_cast<const ColumnAggregateFunction &>(from);
531+
const auto & from_data = from_concrete.data;
532+
if (!empty() && src.get() != &from_concrete)
533+
{
534+
ensureOwnership();
535+
Arena & arena = createOrGetArena();
536+
Arena * arena_ptr = &arena;
537+
data.reserve(size() + length);
538+
for (size_t i = 0; i < length; ++i)
539+
{
540+
pushBackAndCreateState(data, arena, func.get());
541+
func->merge(data.back(), from_data[selector[selector_start + i]], arena_ptr);
542+
}
543+
return;
544+
}
545+
/// Keep shared ownership of aggregation states.
546+
src = from_concrete.getPtr();
547+
548+
size_t old_size = data.size();
549+
data.resize(old_size + length);
550+
auto * data_start = data.data();
551+
size_t element_size = sizeof(data[0]);
552+
for (size_t i = 0; i < length; ++i)
553+
memcpy(data_start + old_size + i, &from_concrete.data[selector[selector_start + i]], element_size);
554+
}
555+
527556
StringRef ColumnAggregateFunction::serializeValueIntoArena(size_t n, Arena & arena, const char *& begin, const UInt8 *) const
528557
{
529558
WriteBufferFromArena out(arena, begin);

src/Columns/ColumnAggregateFunction.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,8 @@ class ColumnAggregateFunction final : public COWHelper<IColumn, ColumnAggregateF
184184

185185
void insertRangeFrom(const IColumn & from, size_t start, size_t length) override;
186186

187+
void insertRangeSelective(const IColumn & src, const IColumn::Selector & selector, size_t selector_start, size_t length) override;
188+
187189
void popBack(size_t n) override;
188190

189191
ColumnPtr filter(const Filter & filter, ssize_t result_size_hint) const override;

src/Columns/ColumnArray.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,29 @@ void ColumnArray::insertRangeFrom(const IColumn & src, size_t start, size_t leng
547547
}
548548
}
549549

550+
void ColumnArray::insertRangeSelective(const IColumn & src, const Selector & selector, size_t selector_start, size_t length){
551+
552+
const ColumnArray & src_concrete = static_cast<const ColumnArray &>(src);
553+
const Offsets & src_offsets = src_concrete.getOffsets();
554+
const IColumn & src_data = src_concrete.getData();
555+
IColumn & cur_data = getData();
556+
Offsets & cur_offsets = getOffsets();
557+
558+
size_t old_size = cur_offsets.size();
559+
size_t cur_size = old_size + length;
560+
cur_data.reserve(cur_size);
561+
cur_offsets.resize(cur_size);
562+
563+
for (size_t i = 0; i < length; ++i)
564+
{
565+
size_t src_pos = selector[selector_start + i];
566+
size_t offset = src_offsets[src_pos - 1];
567+
size_t size = src_offsets[src_pos] - offset;
568+
cur_data.insertRangeFrom(src_data, offset, size);
569+
cur_offsets[old_size + i] = cur_offsets[old_size + i - 1] + size; // PaddedPODArray allows to use -1th element that will have value 0
570+
}
571+
}
572+
550573

551574
ColumnPtr ColumnArray::filter(const Filter & filt, ssize_t result_size_hint) const
552575
{

src/Columns/ColumnArray.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ class ColumnArray final : public COWHelper<IColumn, ColumnArray>
8484
void updateWeakHash32(WeakHash32 & hash) const override;
8585
void updateHashFast(SipHash & hash) const override;
8686
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
87+
void insertRangeSelective(const IColumn & src, const Selector & selector, size_t selector_start, size_t length) override;
8788
void insert(const Field & x) override;
8889
void insertFrom(const IColumn & src_, size_t n) override;
8990
void insertDefault() override;
@@ -149,6 +150,8 @@ class ColumnArray final : public COWHelper<IColumn, ColumnArray>
149150

150151
void gather(ColumnGathererStream & gatherer_stream) override;
151152

153+
/// Always answers true to the canBeInsideNullable() query for this column type.
bool canBeInsideNullable() const override { return true; }
154+
152155
ColumnPtr compress() const override;
153156

154157
void forEachSubcolumn(MutableColumnCallback callback) override

src/Columns/ColumnCompressed.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ class ColumnCompressed : public COWHelper<IColumn, ColumnCompressed>
8585
/// Not implemented for this column: only calls throwMustBeDecompressed() (presumably throws — confirm).
bool isDefaultAt(size_t) const override { throwMustBeDecompressed(); }
8686
/// Not implemented for this column: the argument is ignored and throwMustBeDecompressed() is called.
void insert(const Field &) override { throwMustBeDecompressed(); }
8787
/// Not implemented for this column: all arguments are ignored and throwMustBeDecompressed() is called.
void insertRangeFrom(const IColumn &, size_t, size_t) override { throwMustBeDecompressed(); }
88+
/// Not implemented for this column: all arguments are ignored and throwMustBeDecompressed() is called,
/// the same way as the other insert* overrides of this class.
void insertRangeSelective(const IColumn &, const Selector &, size_t, size_t) override { throwMustBeDecompressed(); }
8889
/// Not implemented for this column: both arguments are ignored and throwMustBeDecompressed() is called.
void insertData(const char *, size_t) override { throwMustBeDecompressed(); }
8990
/// Not implemented for this column: only calls throwMustBeDecompressed().
void insertDefault() override { throwMustBeDecompressed(); }
9091
/// Not implemented for this column: the argument is ignored and throwMustBeDecompressed() is called.
void popBack(size_t) override { throwMustBeDecompressed(); }

src/Columns/ColumnConst.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,11 @@ class ColumnConst final : public COWHelper<IColumn, ColumnConst>
126126
s += length;
127127
}
128128

129+
void insertRangeSelective(const IColumn & /*src*/, const Selector & /*selector*/, size_t /*selector_start*/, size_t length) override
130+
{
131+
s += length;
132+
}
133+
129134
void insert(const Field &) override
130135
{
131136
++s;

src/Columns/ColumnDecimal.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,18 @@ void ColumnDecimal<T>::insertRangeFrom(const IColumn & src, size_t start, size_t
359359
memcpy(data.data() + old_size, &src_vec.data[start], length * sizeof(data[0]));
360360
}
361361

362+
template <is_decimal T>
363+
void ColumnDecimal<T>::insertRangeSelective(const IColumn & src, const IColumn::Selector & selector, size_t selector_start, size_t length)
364+
{
365+
size_t old_size = data.size();
366+
data.resize(old_size + length);
367+
const auto & src_data = (static_cast<const Self &>(src)).getData();
368+
for (size_t i = 0; i < length; ++i)
369+
{
370+
data[old_size + i] = src_data[selector[selector_start + i]];
371+
}
372+
}
373+
362374
template <is_decimal T>
363375
ColumnPtr ColumnDecimal<T>::filter(const IColumn::Filter & filt, ssize_t result_size_hint) const
364376
{

src/Columns/ColumnDecimal.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class ColumnDecimal final : public COWHelper<ColumnVectorHelper, ColumnDecimal<T
6262
/// Grows `data` by `length` elements via resize_fill (no explicit fill value is passed).
void insertManyDefaults(size_t length) override { data.resize_fill(data.size() + length); }
6363
/// Appends one value: extracts a T from the Field `x` with get<T>() and pushes it onto `data`.
void insert(const Field & x) override { data.push_back(x.get<T>()); }
6464
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
65+
void insertRangeSelective(const IColumn & src, const IColumn::Selector & selector, size_t selector_start, size_t length) override;
6566

6667
void popBack(size_t n) override
6768
{

src/Columns/ColumnFixedString.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,22 @@ void ColumnFixedString::insertRangeFrom(const IColumn & src, size_t start, size_
214214
memcpy(chars.data() + old_size, &src_concrete.chars[start * n], length * n);
215215
}
216216

217+
/// Appends `length` fixed-size strings taken from `src`, in the order given by
/// selector[selector_start] ... selector[selector_start + length - 1].
/// `src` is downcast to ColumnFixedString with static_cast (no runtime type check).
/// This column's own `n` is used as the string width for both source and destination.
void ColumnFixedString::insertRangeSelective(const IColumn & src, const IColumn::Selector & selector, size_t selector_start, size_t length)
{
    const auto & source = static_cast<const ColumnFixedString &>(src);

    /// Grow the byte buffer first, then take raw pointers into both buffers.
    const size_t prev_bytes = chars.size();
    chars.resize(prev_bytes + length * n);

    const auto * src_bytes = source.chars.data();
    auto * dst = chars.data() + prev_bytes;

    /// One n-byte copy per selected row; the destination advances by n after each copy.
    for (size_t row = 0; row != length; ++row, dst += n)
        memcpySmallAllowReadWriteOverflow15(dst, src_bytes + n * selector[selector_start + row], n);
}
232+
217233
ColumnPtr ColumnFixedString::filter(const IColumn::Filter & filt, ssize_t result_size_hint) const
218234
{
219235
size_t col_size = size();

src/Columns/ColumnFixedString.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ class ColumnFixedString final : public COWHelper<ColumnVectorHelper, ColumnFixed
154154

155155
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
156156

157+
void insertRangeSelective(const IColumn & src, const Selector & selector, size_t selector_start, size_t length) override;
158+
157159
ColumnPtr filter(const IColumn::Filter & filt, ssize_t result_size_hint) const override;
158160

159161
void expand(const IColumn::Filter & mask, bool inverted) override;

0 commit comments

Comments
 (0)