@@ -347,11 +347,16 @@ IProcessor::Status StrictResizeProcessor::prepare(const PortNumbers & updated_in
347347
348348 auto & waiting_output = output_ports[input_with_data.waiting_output ];
349349
350- if (waiting_output.status != OutputStatus::NeedData )
351- throw Exception (" Invalid status for associated output." , ErrorCodes::LOGICAL_ERROR);
350+ if (waiting_output.status == OutputStatus::NotActive )
351+ throw Exception (" Invalid status NotActive for associated output." , ErrorCodes::LOGICAL_ERROR);
352352
353- waiting_output.port ->pushData (input_with_data.port ->pullData (/* set_not_needed = */ true ));
354- waiting_output.status = OutputStatus::NotActive;
353+ if (waiting_output.status != OutputStatus::Finished)
354+ {
355+ waiting_output.port ->pushData (input_with_data.port ->pullData (/* set_not_needed = */ true ));
356+ waiting_output.status = OutputStatus::NotActive;
357+ }
358+ else
359+ abandoned_chunks.emplace_back (input_with_data.port ->pullData (/* set_not_needed = */ true ));
355360
356361 if (input_with_data.port ->isFinished ())
357362 {
@@ -370,6 +375,18 @@ IProcessor::Status StrictResizeProcessor::prepare(const PortNumbers & updated_in
370375 return Status::Finished;
371376 }
372377
378+ // / Process abandoned chunks if any.
379+ while (!abandoned_chunks.empty () && !waiting_outputs.empty ())
380+ {
381+ auto & waiting_output = output_ports[waiting_outputs.front ()];
382+ waiting_outputs.pop ();
383+
384+ waiting_output.port ->pushData (std::move (abandoned_chunks.back ()));
385+ abandoned_chunks.pop_back ();
386+
387+ waiting_output.status = OutputStatus::NotActive;
388+ }
389+
373390 // / Enable more inputs if needed.
374391 while (!disabled_input_ports.empty () && !waiting_outputs.empty ())
375392 {
@@ -383,6 +400,7 @@ IProcessor::Status StrictResizeProcessor::prepare(const PortNumbers & updated_in
383400 waiting_outputs.pop ();
384401 }
385402
403+ // / Close all other waiting for data outputs (there is no corresponding input for them).
386404 while (!waiting_outputs.empty ())
387405 {
388406 auto & output = output_ports[waiting_outputs.front ()];
0 commit comments