@@ -29,6 +29,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
2929#include < srs_protocol_buffer.hpp>
3030#include < srs_kernel_utility.hpp>
3131#include < srs_core_performance.hpp>
32+ #include < srs_app_config.hpp>
33+
34+ using namespace std ;
3235
3336ISrsMessageHandler::ISrsMessageHandler ()
3437{
@@ -221,11 +224,13 @@ void SrsQueueRecvThread::on_thread_stop()
221224}
222225
223226SrsPublishRecvThread::SrsPublishRecvThread (
224- SrsRtmpServer* rtmp_sdk, int fd, int timeout_ms,
227+ SrsRtmpServer* rtmp_sdk,
228+ SrsRequest* _req, int mr_sock_fd, int timeout_ms,
225229 SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge
226230): trd(this , rtmp_sdk, timeout_ms)
227231{
228232 rtmp = rtmp_sdk;
233+
229234 _conn = conn;
230235 _source = source;
231236 _is_fmle = is_fmle;
@@ -234,12 +239,22 @@ SrsPublishRecvThread::SrsPublishRecvThread(
234239 recv_error_code = ERROR_SUCCESS;
235240 _nb_msgs = 0 ;
236241 error = st_cond_new ();
242+
243+ req = _req;
244+ mr_fd = mr_sock_fd;
237245
238- mr_fd = fd;
246+ // the mr settings,
247+ // @see https://github.com/winlinvip/simple-rtmp-server/issues/241
248+ mr = _srs_config->get_mr_enabled (req->vhost );
249+ mr_sleep = _srs_config->get_mr_sleep_ms (req->vhost );
250+
251+ _srs_config->subscribe (this );
239252}
240253
241254SrsPublishRecvThread::~SrsPublishRecvThread ()
242255{
256+ _srs_config->unsubscribe (this );
257+
243258 trd.stop ();
244259 st_cond_destroy (error);
245260}
@@ -282,20 +297,8 @@ void SrsPublishRecvThread::on_thread_start()
282297 // for the main thread never send message.
283298
284299#ifdef SRS_PERF_MERGED_READ
285- // socket recv buffer, system will double it.
286- int nb_rbuf = SRS_MR_SOCKET_BUFFER / 2 ;
287- socklen_t sock_buf_size = sizeof (int );
288- if (setsockopt (mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, sock_buf_size) < 0 ) {
289- srs_warn (" set sock SO_RCVBUF=%d failed." , nb_rbuf);
290- }
291- getsockopt (mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, &sock_buf_size);
292-
293- srs_trace (" merged read sockbuf=%d, actual=%d, sleep %d when nread<=%d" ,
294- SRS_MR_SOCKET_BUFFER, nb_rbuf, SRS_MR_MAX_SLEEP_MS, SRS_MR_SMALL_BYTES);
295-
296- // enable the merge read
297- // @see https://github.com/winlinvip/simple-rtmp-server/issues/241
298- rtmp->set_merge_read (true , this );
300+ // for mr.
301+ update_buffer (mr, mr_sleep);
299302#endif
300303}
301304
@@ -349,7 +352,11 @@ void SrsPublishRecvThread::on_recv_error(int ret)
349352#ifdef SRS_PERF_MERGED_READ
350353void SrsPublishRecvThread::on_read (ssize_t nread)
351354{
352- if (nread < 0 || SRS_MR_MAX_SLEEP_MS <= 0 ) {
355+ if (!mr) {
356+ return ;
357+ }
358+
359+ if (nread < 0 || mr_sleep <= 0 ) {
353360 return ;
354361 }
355362
@@ -360,7 +367,72 @@ void SrsPublishRecvThread::on_read(ssize_t nread)
360367 * @see https://github.com/winlinvip/simple-rtmp-server/issues/241
361368 */
362369 if (nread < SRS_MR_SMALL_BYTES) {
363- st_usleep (SRS_MR_MAX_SLEEP_MS * 1000 );
370+ st_usleep (mr_sleep * 1000 );
364371 }
365372}
366373#endif
374+
375+ int SrsPublishRecvThread::on_reload_vhost_mr (string vhost)
376+ {
377+ int ret = ERROR_SUCCESS;
378+
379+ // the mr settings,
380+ // @see https://github.com/winlinvip/simple-rtmp-server/issues/241
381+ bool mr_enabled = _srs_config->get_mr_enabled (req->vhost );
382+ int sleep_ms = _srs_config->get_mr_sleep_ms (req->vhost );
383+ update_buffer (mr_enabled, sleep_ms);
384+
385+ return ret;
386+ }
387+
388+ void SrsPublishRecvThread::update_buffer (bool mr_enabled, int sleep_ms)
389+ {
390+ // TODO: FIXME: refine it.
391+
392+ #ifdef SRS_PERF_MERGED_READ
393+ // previous enabled mr, update the buffer.
394+ if (mr && mr_sleep != sleep_ms) {
395+ // the underlayer api will set to SRS_MR_SOCKET_BUFFER bytes.
396+ // 4KB=4096, 8KB=8192, 16KB=16384, 32KB=32768, 64KB=65536,
397+ // 128KB=131072, 256KB=262144, 512KB=524288
398+ // the buffer should set to SRS_MR_MAX_SLEEP_MS*kbps/8,
399+ // for example, your system delivery stream in 1000kbps,
400+ // sleep 800ms for small bytes, the buffer should set to:
401+ // 800*1000/8=100000B(about 128KB).
402+ // 2000*3000/8=750000B(about 732KB).
403+ int kbps = 3000 ;
404+ int socket_buffer_size = mr_sleep * kbps / 8 ;
405+
406+ // socket recv buffer, system will double it.
407+ int nb_rbuf = socket_buffer_size / 2 ;
408+ socklen_t sock_buf_size = sizeof (int );
409+ if (setsockopt (mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, sock_buf_size) < 0 ) {
410+ srs_warn (" set sock SO_RCVBUF=%d failed." , nb_rbuf);
411+ }
412+ getsockopt (mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, &sock_buf_size);
413+
414+ srs_trace (" merged read sockbuf=%d, actual=%d, sleep %d when nread<=%d" ,
415+ socket_buffer_size, nb_rbuf, mr_sleep, SRS_MR_SMALL_BYTES);
416+
417+ rtmp->set_recv_buffer (nb_rbuf);
418+ }
419+ #endif
420+
421+ // update to new state
422+ mr = mr_enabled;
423+ mr_sleep = sleep_ms;
424+
425+ #ifdef SRS_PERF_MERGED_READ
426+ // apply new state.
427+ if (mr) {
428+ // enable the merge read
429+ // @see https://github.com/winlinvip/simple-rtmp-server/issues/241
430+ rtmp->set_merge_read (true , this );
431+ } else {
432+ // disable the merge read
433+ // @see https://github.com/winlinvip/simple-rtmp-server/issues/241
434+ rtmp->set_merge_read (false , NULL );
435+ }
436+ #endif
437+ }
438+
0 commit comments