@@ -50,6 +50,7 @@ using namespace std;
5050#include < srs_app_utility.hpp>
5151#include < srs_protocol_utility.hpp>
5252#include < srs_kernel_utility.hpp>
53+ #include < srs_protocol_msg_array.hpp>
5354
5455// when stream is busy, for example, streaming is already
5556// publishing, when a new client to request to publish,
@@ -382,7 +383,7 @@ int SrsRtmpConn::stream_service_cycle()
382383 }
383384
384385 srs_info (" start to publish stream %s success" , req->stream .c_str ());
385- ret = fmle_publish (source);
386+ ret = fmle_publishing (source);
386387
387388 // when edge, notice edge to change state.
388389 // when origin, notice all service to unpublish.
@@ -416,7 +417,7 @@ int SrsRtmpConn::stream_service_cycle()
416417 }
417418
418419 srs_info (" flash start to publish stream %s success" , req->stream .c_str ());
419- ret = flash_publish (source);
420+ ret = flash_publishing (source);
420421
421422 // when edge, notice edge to change state.
422423 // when origin, notice all service to unpublish.
@@ -476,6 +477,8 @@ int SrsRtmpConn::check_vhost()
476477 return ret;
477478}
478479
480+ #define SYS_MAX_PLAY_SEND_MSGS 128
481+
479482int SrsRtmpConn::playing (SrsSource* source)
480483{
481484 int ret = ERROR_SUCCESS;
@@ -499,38 +502,43 @@ int SrsRtmpConn::playing(SrsSource* source)
499502 rtmp->set_recv_timeout (SRS_PULSE_TIMEOUT_US);
500503
501504 SrsPithyPrint pithy_print (SRS_STAGE_PLAY_USER);
505+
506+ SrsSharedPtrMessageArray msgs (SYS_MAX_PLAY_SEND_MSGS);
502507
508+ bool user_specified_duration_to_stop = (req->duration > 0 );
503509 int64_t starttime = -1 ;
510+
504511 while (true ) {
505- // switch to other st-threads.
506- st_usleep (0 );
507-
512+ // collect elapse for pithy print.
508513 pithy_print.elapse ();
509514
510515 // read from client.
511516 if (true ) {
512517 SrsMessage* msg = NULL ;
513518 ret = rtmp->recv_message (&msg);
514-
515519 srs_verbose (" play loop recv message. ret=%d" , ret);
516- if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
517- if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close (ret)) {
520+
521+ if (ret == ERROR_SOCKET_TIMEOUT) {
522+ // it's ok, do nothing.
523+ ret = ERROR_SUCCESS;
524+ } else if (ret != ERROR_SUCCESS) {
525+ if (!srs_is_client_gracefully_close (ret)) {
518526 srs_error (" recv client control message failed. ret=%d" , ret);
519527 }
520528 return ret;
521- }
522- if ((ret = process_play_control_msg (consumer, msg)) != ERROR_SUCCESS) {
523- if (!srs_is_system_control_error (ret)) {
524- srs_error (" process play control message failed. ret=%d" , ret);
529+ } else {
530+ if ((ret = process_play_control_msg (consumer, msg)) != ERROR_SUCCESS) {
531+ if (!srs_is_system_control_error (ret)) {
532+ srs_error (" process play control message failed. ret=%d" , ret);
533+ }
534+ return ret;
525535 }
526- return ret;
527536 }
528537 }
529538
530539 // get messages from consumer.
531- SrsSharedPtrMessage** msgs = NULL ;
532540 int count = 0 ;
533- if ((ret = consumer->get_packets ( 0 , msgs, count)) != ERROR_SUCCESS) {
541+ if ((ret = consumer->dump_packets (msgs. size , msgs. msgs , count)) != ERROR_SUCCESS) {
534542 srs_error (" get messages from consumer failed. ret=%d" , ret);
535543 return ret;
536544 }
@@ -545,32 +553,29 @@ int SrsRtmpConn::playing(SrsSource* source)
545553 kbps->get_recv_kbps (), kbps->get_recv_kbps_30s (), kbps->get_recv_kbps_5m ());
546554 }
547555
548- if (count <= 0 ) {
549- srs_verbose (" no packets in queue." );
550- continue ;
551- }
552- SrsAutoFreeArray (SrsSharedPtrMessage, msgs, count);
553-
554556 // sendout messages
555557 // @remark, becareful, all msgs must be free explicitly,
556558 // free by send_and_free_message or srs_freep.
557559 for (int i = 0 ; i < count; i++) {
558- SrsSharedPtrMessage* msg = msgs[i];
560+ SrsSharedPtrMessage* msg = msgs. msgs [i];
559561
560562 // the send_message will free the msg,
561563 // so set the msgs[i] to NULL.
562- msgs[i] = NULL ;
563-
564- srs_assert (msg);
564+ msgs.msgs [i] = NULL ;
565565
566- // foreach msg, collect the duration.
567- // @remark: never use msg when sent it, for the protocol sdk will free it.
568- if (starttime < 0 || starttime > msg->header .timestamp ) {
566+ // only when user specifies the duration,
567+ // we start to collect the durations for each message.
568+ if (user_specified_duration_to_stop) {
569+ // foreach msg, collect the duration.
570+ // @remark: never use msg when sent it, for the protocol sdk will free it.
571+ if (starttime < 0 || starttime > msg->header .timestamp ) {
572+ starttime = msg->header .timestamp ;
573+ }
574+ duration += msg->header .timestamp - starttime;
569575 starttime = msg->header .timestamp ;
570576 }
571- duration += msg->header .timestamp - starttime;
572- starttime = msg->header .timestamp ;
573577
578+ // no need to assert msg, for the rtmp will assert it.
574579 if ((ret = rtmp->send_and_free_message (msg, res->stream_id )) != ERROR_SUCCESS) {
575580 srs_error (" send message to client failed. ret=%d" , ret);
576581 return ret;
@@ -579,17 +584,22 @@ int SrsRtmpConn::playing(SrsSource* source)
579584
580585 // if duration specified, and exceed it, stop play live.
581586 // @see: https://github.com/winlinvip/simple-rtmp-server/issues/45
582- if (req->duration > 0 && duration >= (int64_t )req->duration ) {
583- ret = ERROR_RTMP_DURATION_EXCEED;
584- srs_trace (" stop live for duration exceed. ret=%d" , ret);
585- return ret;
587+ if (user_specified_duration_to_stop) {
588+ if (duration >= (int64_t )req->duration ) {
589+ ret = ERROR_RTMP_DURATION_EXCEED;
590+ srs_trace (" stop live for duration exceed. ret=%d" , ret);
591+ return ret;
592+ }
586593 }
594+
595+ // switch to other threads, to anti dead loop.
596+ st_usleep (0 );
587597 }
588598
589599 return ret;
590600}
591601
592- int SrsRtmpConn::fmle_publish (SrsSource* source)
602+ int SrsRtmpConn::fmle_publishing (SrsSource* source)
593603{
594604 int ret = ERROR_SUCCESS;
595605
@@ -668,7 +678,7 @@ int SrsRtmpConn::fmle_publish(SrsSource* source)
668678 return ret;
669679}
670680
671- int SrsRtmpConn::flash_publish (SrsSource* source)
681+ int SrsRtmpConn::flash_publishing (SrsSource* source)
672682{
673683 int ret = ERROR_SUCCESS;
674684
0 commit comments