Skip to content

Commit b14b8de

Browse files
Backport #53064 to 23.7: Fix ZstdDeflatingWriteBuffer truncating the output sometimes
1 parent 7321fb8 commit b14b8de

File tree

6 files changed

+53
-98
lines changed

6 files changed

+53
-98
lines changed

src/IO/ZstdDeflatingAppendableWriteBuffer.cpp

Lines changed: 38 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -42,50 +42,13 @@ void ZstdDeflatingAppendableWriteBuffer::nextImpl()
4242
if (!offset())
4343
return;
4444

45-
input.src = reinterpret_cast<unsigned char *>(working_buffer.begin());
46-
input.size = offset();
47-
input.pos = 0;
48-
4945
if (first_write && append_to_existing_file && isNeedToAddEmptyBlock())
5046
{
5147
addEmptyBlock();
5248
first_write = false;
5349
}
5450

55-
try
56-
{
57-
bool ended = false;
58-
do
59-
{
60-
out->nextIfAtEnd();
61-
62-
output.dst = reinterpret_cast<unsigned char *>(out->buffer().begin());
63-
output.size = out->buffer().size();
64-
output.pos = out->offset();
65-
66-
size_t compression_result = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_flush);
67-
if (ZSTD_isError(compression_result))
68-
throw Exception(
69-
ErrorCodes::ZSTD_ENCODER_FAILED,
70-
"ZSTD stream decoding failed: error code: {}; ZSTD version: {}",
71-
ZSTD_getErrorName(compression_result), ZSTD_VERSION_STRING);
72-
73-
first_write = false;
74-
out->position() = out->buffer().begin() + output.pos;
75-
76-
bool everything_was_compressed = (input.pos == input.size);
77-
bool everything_was_flushed = compression_result == 0;
78-
79-
ended = everything_was_compressed && everything_was_flushed;
80-
} while (!ended);
81-
}
82-
catch (...)
83-
{
84-
/// Do not try to write next time after exception.
85-
out->position() = out->buffer().begin();
86-
throw;
87-
}
88-
51+
flush(ZSTD_e_flush);
8952
}
9053

9154
ZstdDeflatingAppendableWriteBuffer::~ZstdDeflatingAppendableWriteBuffer()
@@ -103,58 +66,58 @@ void ZstdDeflatingAppendableWriteBuffer::finalizeImpl()
10366
}
10467
else
10568
{
106-
try
107-
{
108-
finalizeBefore();
109-
out->finalize();
110-
finalizeAfter();
111-
}
112-
catch (...)
113-
{
114-
/// Do not try to flush next time after exception.
115-
out->position() = out->buffer().begin();
116-
throw;
117-
}
69+
finalizeBefore();
70+
out->finalize();
71+
finalizeAfter();
11872
}
11973
}
12074

12175
void ZstdDeflatingAppendableWriteBuffer::finalizeBefore()
12276
{
123-
next();
124-
125-
out->nextIfAtEnd();
126-
127-
input.src = reinterpret_cast<unsigned char *>(working_buffer.begin());
128-
input.size = offset();
129-
input.pos = 0;
130-
131-
output.dst = reinterpret_cast<unsigned char *>(out->buffer().begin());
132-
output.size = out->buffer().size();
133-
output.pos = out->offset();
134-
13577
/// In principle we could use ZSTD_e_flush here and unconditionally add an empty termination
13678
/// block whenever a new buffer is created for a non-empty file (without isNeedToAddEmptyBlock).
13779
/// However, while ZSTD_decompressStream can read a non-terminated frame (we use it in the reader buffer),
13880
/// the console zstd utility cannot, so the frame is terminated with ZSTD_e_end instead.
139-
size_t remaining = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_end);
140-
while (remaining != 0)
141-
{
142-
if (ZSTD_isError(remaining))
143-
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED,
144-
"ZSTD stream encoder end failed: error: '{}' ZSTD version: {}",
145-
ZSTD_getErrorName(remaining), ZSTD_VERSION_STRING);
146-
147-
remaining = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_end);
81+
flush(ZSTD_e_end);
82+
}
14883

149-
out->position() = out->buffer().begin() + output.pos;
84+
void ZstdDeflatingAppendableWriteBuffer::flush(ZSTD_EndDirective mode)
85+
{
86+
input.src = reinterpret_cast<unsigned char *>(working_buffer.begin());
87+
input.size = offset();
88+
input.pos = 0;
15089

151-
if (!out->hasPendingData())
90+
try
91+
{
92+
bool ended = false;
93+
do
15294
{
153-
out->next();
95+
out->nextIfAtEnd();
96+
15497
output.dst = reinterpret_cast<unsigned char *>(out->buffer().begin());
15598
output.size = out->buffer().size();
15699
output.pos = out->offset();
157-
}
100+
101+
size_t compression_result = ZSTD_compressStream2(cctx, &output, &input, mode);
102+
if (ZSTD_isError(compression_result))
103+
throw Exception(
104+
ErrorCodes::ZSTD_ENCODER_FAILED,
105+
"ZSTD stream decoding failed: error code: {}; ZSTD version: {}",
106+
ZSTD_getErrorName(compression_result), ZSTD_VERSION_STRING);
107+
108+
out->position() = out->buffer().begin() + output.pos;
109+
110+
bool everything_was_compressed = (input.pos == input.size);
111+
bool everything_was_flushed = compression_result == 0;
112+
113+
ended = everything_was_compressed && everything_was_flushed;
114+
} while (!ended);
115+
}
116+
catch (...)
117+
{
118+
/// Do not try to write next time after exception.
119+
out->position() = out->buffer().begin();
120+
throw;
158121
}
159122
}
160123

src/IO/ZstdDeflatingAppendableWriteBuffer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ class ZstdDeflatingAppendableWriteBuffer : public BufferWithOwnMemory<WriteBuffe
5252
/// NOTE: writes compressed data into out.working_buffer, but does not call out.next() until that buffer is full
5353
void nextImpl() override;
5454

55+
void flush(ZSTD_EndDirective mode);
56+
5557
/// Writes the terminating ZSTD_e_end: an empty block plus the frame epilogue. This
5658
/// should be almost a no-op, because the frame epilogue contains only checksums,
5759
/// and they are disabled for this buffer.

src/IO/ZstdDeflatingWriteBuffer.cpp

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,8 @@ ZstdDeflatingWriteBuffer::ZstdDeflatingWriteBuffer(
3232

3333
ZstdDeflatingWriteBuffer::~ZstdDeflatingWriteBuffer() = default;
3434

35-
void ZstdDeflatingWriteBuffer::nextImpl()
35+
void ZstdDeflatingWriteBuffer::flush(ZSTD_EndDirective mode)
3636
{
37-
if (!offset())
38-
return;
39-
40-
ZSTD_EndDirective mode = ZSTD_e_flush;
41-
4237
input.src = reinterpret_cast<unsigned char *>(working_buffer.begin());
4338
input.size = offset();
4439
input.pos = 0;
@@ -54,7 +49,6 @@ void ZstdDeflatingWriteBuffer::nextImpl()
5449
output.size = out->buffer().size();
5550
output.pos = out->offset();
5651

57-
5852
size_t compression_result = ZSTD_compressStream2(cctx, &output, &input, mode);
5953
if (ZSTD_isError(compression_result))
6054
throw Exception(
@@ -78,24 +72,15 @@ void ZstdDeflatingWriteBuffer::nextImpl()
7872
}
7973
}
8074

81-
void ZstdDeflatingWriteBuffer::finalizeBefore()
75+
void ZstdDeflatingWriteBuffer::nextImpl()
8276
{
83-
next();
84-
85-
out->nextIfAtEnd();
86-
87-
input.src = reinterpret_cast<unsigned char *>(working_buffer.begin());
88-
input.size = offset();
89-
input.pos = 0;
90-
91-
output.dst = reinterpret_cast<unsigned char *>(out->buffer().begin());
92-
output.size = out->buffer().size();
93-
output.pos = out->offset();
77+
if (offset())
78+
flush(ZSTD_e_flush);
79+
}
9480

95-
size_t remaining = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_end);
96-
if (ZSTD_isError(remaining))
97-
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "zstd stream encoder end failed: zstd version: {}", ZSTD_VERSION_STRING);
98-
out->position() = out->buffer().begin() + output.pos;
81+
void ZstdDeflatingWriteBuffer::finalizeBefore()
82+
{
83+
flush(ZSTD_e_end);
9984
}
10085

10186
void ZstdDeflatingWriteBuffer::finalizeAfter()

src/IO/ZstdDeflatingWriteBuffer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ class ZstdDeflatingWriteBuffer : public WriteBufferWithOwnMemoryDecorator
3737
void finalizeBefore() override;
3838
void finalizeAfter() override;
3939

40+
void flush(ZSTD_EndDirective mode);
41+
4042
ZSTD_CCtx * cctx;
4143
ZSTD_inBuffer input;
4244
ZSTD_outBuffer output;

tests/queries/0_stateless/02735_parquet_encoder.reference

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ ipv6 Nullable(FixedString(16))
4343
[(2,0,NULL,'','[]')]
4444
1 1
4545
0 1
46+
5090915589685802007
4647
16159458007063698496
4748
16159458007063698496
4849
BYTE_ARRAY String

tests/queries/0_stateless/02735_parquet_encoder.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,8 @@ insert into function file(compressed_02735.parquet) select concat('aaaaaaaaaaaaa
147147
select total_compressed_size < 10000, total_uncompressed_size > 15000 from file(compressed_02735.parquet, ParquetMetadata);
148148
insert into function file(compressed_02735.parquet) select concat('aaaaaaaaaaaaaaaa', toString(number)) as s from numbers(1000) settings output_format_parquet_row_group_size = 10000, output_format_parquet_compression_method='none';
149149
select total_compressed_size < 10000, total_uncompressed_size > 15000 from file(compressed_02735.parquet, ParquetMetadata);
150+
insert into function file(compressed_02735.parquet) select if(number%3==1, NULL, 42) as x from numbers(70) settings output_format_parquet_compression_method='zstd';
151+
select sum(cityHash64(*)) from file(compressed_02735.parquet);
150152

151153
-- Single-threaded encoding and Arrow encoder.
152154
drop table if exists other_encoders_02735;

0 commit comments

Comments
 (0)