@@ -42,50 +42,13 @@ void ZstdDeflatingAppendableWriteBuffer::nextImpl()
4242 if (!offset ())
4343 return ;
4444
45- input.src = reinterpret_cast <unsigned char *>(working_buffer.begin ());
46- input.size = offset ();
47- input.pos = 0 ;
48-
4945 if (first_write && append_to_existing_file && isNeedToAddEmptyBlock ())
5046 {
5147 addEmptyBlock ();
5248 first_write = false ;
5349 }
5450
55- try
56- {
57- bool ended = false ;
58- do
59- {
60- out->nextIfAtEnd ();
61-
62- output.dst = reinterpret_cast <unsigned char *>(out->buffer ().begin ());
63- output.size = out->buffer ().size ();
64- output.pos = out->offset ();
65-
66- size_t compression_result = ZSTD_compressStream2 (cctx, &output, &input, ZSTD_e_flush);
67- if (ZSTD_isError (compression_result))
68- throw Exception (
69- ErrorCodes::ZSTD_ENCODER_FAILED,
70- " ZSTD stream decoding failed: error code: {}; ZSTD version: {}" ,
71- ZSTD_getErrorName (compression_result), ZSTD_VERSION_STRING);
72-
73- first_write = false ;
74- out->position () = out->buffer ().begin () + output.pos ;
75-
76- bool everything_was_compressed = (input.pos == input.size );
77- bool everything_was_flushed = compression_result == 0 ;
78-
79- ended = everything_was_compressed && everything_was_flushed;
80- } while (!ended);
81- }
82- catch (...)
83- {
84- // / Do not try to write next time after exception.
85- out->position () = out->buffer ().begin ();
86- throw ;
87- }
88-
51+ flush (ZSTD_e_flush);
8952}
9053
9154ZstdDeflatingAppendableWriteBuffer::~ZstdDeflatingAppendableWriteBuffer ()
@@ -103,58 +66,58 @@ void ZstdDeflatingAppendableWriteBuffer::finalizeImpl()
10366 }
10467 else
10568 {
106- try
107- {
108- finalizeBefore ();
109- out->finalize ();
110- finalizeAfter ();
111- }
112- catch (...)
113- {
114- // / Do not try to flush next time after exception.
115- out->position () = out->buffer ().begin ();
116- throw ;
117- }
69+ finalizeBefore ();
70+ out->finalize ();
71+ finalizeAfter ();
11872 }
11973}
12074
12175void ZstdDeflatingAppendableWriteBuffer::finalizeBefore ()
12276{
123- next ();
124-
125- out->nextIfAtEnd ();
126-
127- input.src = reinterpret_cast <unsigned char *>(working_buffer.begin ());
128- input.size = offset ();
129- input.pos = 0 ;
130-
131- output.dst = reinterpret_cast <unsigned char *>(out->buffer ().begin ());
132- output.size = out->buffer ().size ();
133- output.pos = out->offset ();
134-
13577 // / Actually we can use ZSTD_e_flush here and add empty termination
13678 // / block on each new buffer creation for non-empty file unconditionally (without isNeedToAddEmptyBlock).
13779 // / However ZSTD_decompressStream is able to read non-terminated frame (we use it in reader buffer),
13880 // / but console zstd utility cannot.
139- size_t remaining = ZSTD_compressStream2 (cctx, &output, &input, ZSTD_e_end);
140- while (remaining != 0 )
141- {
142- if (ZSTD_isError (remaining))
143- throw Exception (ErrorCodes::ZSTD_ENCODER_FAILED,
144- " ZSTD stream encoder end failed: error: '{}' ZSTD version: {}" ,
145- ZSTD_getErrorName (remaining), ZSTD_VERSION_STRING);
146-
147- remaining = ZSTD_compressStream2 (cctx, &output, &input, ZSTD_e_end);
81+ flush (ZSTD_e_end);
82+ }
14883
149- out->position () = out->buffer ().begin () + output.pos ;
84+ void ZstdDeflatingAppendableWriteBuffer::flush (ZSTD_EndDirective mode)
85+ {
86+ input.src = reinterpret_cast <unsigned char *>(working_buffer.begin ());
87+ input.size = offset ();
88+ input.pos = 0 ;
15089
151- if (!out->hasPendingData ())
90+ try
91+ {
92+ bool ended = false ;
93+ do
15294 {
153- out->next ();
95+ out->nextIfAtEnd ();
96+
15497 output.dst = reinterpret_cast <unsigned char *>(out->buffer ().begin ());
15598 output.size = out->buffer ().size ();
15699 output.pos = out->offset ();
157- }
100+
101+ size_t compression_result = ZSTD_compressStream2 (cctx, &output, &input, mode);
102+ if (ZSTD_isError (compression_result))
103+ throw Exception (
104+ ErrorCodes::ZSTD_ENCODER_FAILED,
105+ " ZSTD stream decoding failed: error code: {}; ZSTD version: {}" ,
106+ ZSTD_getErrorName (compression_result), ZSTD_VERSION_STRING);
107+
108+ out->position () = out->buffer ().begin () + output.pos ;
109+
110+ bool everything_was_compressed = (input.pos == input.size );
111+ bool everything_was_flushed = compression_result == 0 ;
112+
113+ ended = everything_was_compressed && everything_was_flushed;
114+ } while (!ended);
115+ }
116+ catch (...)
117+ {
118+ // / Do not try to write next time after exception.
119+ out->position () = out->buffer ().begin ();
120+ throw ;
158121 }
159122}
160123
0 commit comments