3838#include " consumer.hh"
3939#include " readers/generating_v2.hh"
4040#include " service/topology_guard.hh"
41+ #include " utils/error_injection.hh"
4142
4243namespace streaming {
4344
@@ -127,18 +128,27 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
127128 return make_exception_future<rpc::sink<int >>(std::runtime_error (format (" Node {} is not fully initialized for streaming, try again later" ,
128129 utils::fb_utilities::get_broadcast_address ())));
129130 }
130- return _mm.local ().get_schema_for_write (schema_id, from, _ms.local (), as).then ([this , from, estimated_partitions, plan_id, cf_id, source, reason, topo_guard] (schema_ptr s) mutable {
131- return _db.local ().obtain_reader_permit (s, " stream-session" , db::no_timeout, {}).then ([this , from, estimated_partitions, plan_id, cf_id, source, reason, topo_guard, s] (reader_permit permit) mutable {
131+ return _mm.local ().get_schema_for_write (schema_id, from, _ms.local (), as).then ([this , from, estimated_partitions, plan_id, cf_id, source, reason, topo_guard, &as ] (schema_ptr s) mutable {
132+ return _db.local ().obtain_reader_permit (s, " stream-session" , db::no_timeout, {}).then ([this , from, estimated_partitions, plan_id, cf_id, source, reason, topo_guard, s, &as ] (reader_permit permit) mutable {
132133 struct stream_mutation_fragments_cmd_status {
133134 bool got_cmd = false ;
134135 bool got_end_of_stream = false ;
135136 };
136137 auto cmd_status = make_lw_shared<stream_mutation_fragments_cmd_status>();
137138 auto offstrategy_update = make_lw_shared<offstrategy_trigger>(_db, cf_id, plan_id);
138139 auto guard = service::topology_guard (s->table (), topo_guard);
139- auto get_next_mutation_fragment = [guard = std::move (guard), &sm = container (), source, plan_id, from, s, cmd_status, offstrategy_update, permit] () mutable {
140+
141+ // Will log a message when streaming is done. Used to synchronize tests.
142+ lw_shared_ptr<std::any> log_done;
143+ if (utils::get_local_injector ().is_enabled (" stream_mutation_fragments" )) {
144+ log_done = make_lw_shared<std::any>(seastar::make_shared (seastar::defer ([] {
145+ sslog.info (" stream_mutation_fragments: done" );
146+ })));
147+ }
148+
149+ auto get_next_mutation_fragment = [guard = std::move (guard), &as, &sm = container (), source, plan_id, from, s, cmd_status, offstrategy_update, permit] () mutable {
140150 guard.check ();
141- return source ().then ([&sm, plan_id, from, s, cmd_status, offstrategy_update, permit] (std::optional<std::tuple<frozen_mutation_fragment, rpc::optional<stream_mutation_fragments_cmd>>> opt) mutable {
151+ return source ().then ([&sm, &guard, &as, plan_id, from, s, cmd_status, offstrategy_update, permit] (std::optional<std::tuple<frozen_mutation_fragment, rpc::optional<stream_mutation_fragments_cmd>>> opt) mutable {
142152 if (opt) {
143153 auto cmd = std::get<1 >(*opt);
144154 if (cmd) {
@@ -160,7 +170,19 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
160170 auto mf = fmf.unfreeze (*s, permit);
161171 sm.local ().update_progress (plan_id, from.addr , progress_info::direction::IN, sz);
162172 offstrategy_update->update ();
163- return make_ready_future<mutation_fragment_opt>(std::move (mf));
173+
174+ return utils::get_local_injector ().inject_with_handler (" stream_mutation_fragments" , [&guard, &as] (auto & handler) -> future<> {
175+ auto & guard_ = guard;
176+ auto & as_ = as;
177+ sslog.info (" stream_mutation_fragments: waiting" );
178+ while (!handler.poll_for_message ()) {
179+ guard_.check ();
180+ co_await sleep_abortable (std::chrono::milliseconds (5 ), as_);
181+ }
182+ sslog.info (" stream_mutation_fragments: released" );
183+ }).then ([mf = std::move (mf)] () mutable {
184+ return mutation_fragment_opt (std::move (mf));
185+ });
164186 } else {
165187 // If the sender has sent stream_mutation_fragments_cmd it means it is
166188 // a node that understands the new protocol. It must send end_of_stream
@@ -185,7 +207,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
185207 make_generating_reader_v1 (s, permit, std::move (get_next_mutation_fragment)),
186208 make_streaming_consumer (" streaming" , _db, _sys_dist_ks, _view_update_generator, estimated_partitions, reason, is_offstrategy_supported (reason), topo_guard),
187209 std::move (op)
188- ).then_wrapped ([s, plan_id, from, sink, estimated_partitions, sh_ptr = std::move (sharder_ptr)] (future<uint64_t > f) mutable {
210+ ).then_wrapped ([s, plan_id, from, sink, estimated_partitions, log_done, sh_ptr = std::move (sharder_ptr)] (future<uint64_t > f) mutable {
189211 int32_t status = 0 ;
190212 uint64_t received_partitions = 0 ;
191213 if (f.failed ()) {
0 commit comments