@@ -184,10 +184,6 @@ srs_error_t SrsThreadPool::initialize()
184
184
r1 = pthread_getaffinity_np (pthread_self (), sizeof (entry->cpuset2 ), &entry->cpuset2 );
185
185
#endif
186
186
187
- if ((err = _srs_async_recv->initialize ()) != srs_success) {
188
- return srs_error_wrap (err, " init async recv" );
189
- }
190
-
191
187
interval_ = _srs_config->get_threads_interval ();
192
188
bool async_srtp = _srs_config->get_threads_async_srtp ();
193
189
srs_trace (" Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 " /%d-0x%" PRIx64,
@@ -705,11 +701,14 @@ SrsAsyncSRTPManager::SrsAsyncSRTPManager()
705
701
lock_ = new SrsThreadMutex ();
706
702
packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
707
703
cooked_packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
704
+ trd_ = new SrsFastCoroutine (" srtp" , this );
708
705
}
709
706
710
707
// TODO: FIXME: We should stop the thread first, then free the manager.
711
708
SrsAsyncSRTPManager::~SrsAsyncSRTPManager ()
712
709
{
710
+ srs_freep (trd_);
711
+
713
712
srs_freep (lock_);
714
713
srs_freep (packets_);
715
714
srs_freep (cooked_packets_);
@@ -806,24 +805,56 @@ srs_error_t SrsAsyncSRTPManager::consume()
806
805
{
807
806
srs_error_t err = srs_success;
808
807
809
- vector<SrsAsyncSRTPPacket*> flying;
810
- cooked_packets_->swap (flying);
808
+ if ((err = trd_->start ()) != srs_success) {
809
+ return srs_error_wrap (err, " start" );
810
+ }
811
811
812
- for (int i = 0 ; i < (int )flying.size (); i++) {
813
- SrsAsyncSRTPPacket* pkt = flying.at (i);
814
- SrsSecurityTransport* transport = pkt->task_ ->codec_ ->transport_ ;
815
- char * payload = pkt->msg_ ->payload ;
812
+ return err;
813
+ }
816
814
817
- if (pkt->do_decrypt_ ) {
818
- if (pkt->is_rtp_ ) {
819
- err = transport->on_rtp_plaintext (payload, pkt->nb_consumed_ );
820
- }
815
+ srs_error_t SrsAsyncSRTPManager::cycle ()
816
+ {
817
+ srs_error_t err = srs_success;
818
+
819
+ // How many messages to run a yield.
820
+ uint32_t nn_msgs_for_yield = 0 ;
821
+
822
+ while (true ) {
823
+ if ((err = trd_->pull ()) != srs_success) {
824
+ return srs_error_wrap (err, " pull" );
821
825
}
822
- if (err != srs_success) {
823
- srs_error_reset (err); // Ignore any error.
826
+
827
+ vector<SrsAsyncSRTPPacket*> flying;
828
+ cooked_packets_->swap (flying);
829
+
830
+ if (flying.empty ()) {
831
+ srs_usleep (20 * SRS_UTIME_MILLISECONDS);
832
+ continue ;
824
833
}
825
834
826
- srs_freep (pkt);
835
+ for (int i = 0 ; i < (int )flying.size (); i++) {
836
+ SrsAsyncSRTPPacket* pkt = flying.at (i);
837
+ SrsSecurityTransport* transport = pkt->task_ ->codec_ ->transport_ ;
838
+ char * payload = pkt->msg_ ->payload ;
839
+
840
+ if (pkt->do_decrypt_ ) {
841
+ if (pkt->is_rtp_ ) {
842
+ err = transport->on_rtp_plaintext (payload, pkt->nb_consumed_ );
843
+ }
844
+ }
845
+ if (err != srs_success) {
846
+ srs_error_reset (err); // Ignore any error.
847
+ }
848
+
849
+ srs_freep (pkt);
850
+
851
+ // Yield to another coroutines.
852
+ // @see https://github.com/ossrs/srs/issues/2194#issuecomment-777485531
853
+ if (++nn_msgs_for_yield > 10 ) {
854
+ nn_msgs_for_yield = 0 ;
855
+ srs_thread_yield ();
856
+ }
857
+ }
827
858
}
828
859
829
860
return err;
@@ -846,11 +877,14 @@ SrsAsyncRecvManager::SrsAsyncRecvManager()
846
877
packets_ = new SrsThreadQueue<SrsUdpMuxSocket>();
847
878
handler_ = NULL ;
848
879
max_recv_queue_ = 0 ;
880
+ trd_ = new SrsFastCoroutine (" recv" , this );
849
881
}
850
882
851
883
// TODO: FIXME: We should stop the thread first, then free the manager.
852
884
SrsAsyncRecvManager::~SrsAsyncRecvManager ()
853
885
{
886
+ srs_freep (trd_);
887
+
854
888
srs_freep (lock_);
855
889
srs_freep (packets_);
856
890
@@ -861,16 +895,6 @@ SrsAsyncRecvManager::~SrsAsyncRecvManager()
861
895
}
862
896
}
863
897
864
- srs_error_t SrsAsyncRecvManager::initialize ()
865
- {
866
- srs_error_t err = srs_success;
867
-
868
- max_recv_queue_ = _srs_config->get_threads_max_recv_queue ();
869
- srs_trace (" AsyncRecv: Set max_queue=%d" , max_recv_queue_);
870
-
871
- return err;
872
- }
873
-
874
898
void SrsAsyncRecvManager::set_handler (ISrsUdpMuxHandler* v)
875
899
{
876
900
handler_ = v;
@@ -946,17 +970,52 @@ srs_error_t SrsAsyncRecvManager::consume()
946
970
{
947
971
srs_error_t err = srs_success;
948
972
949
- vector<SrsUdpMuxSocket*> flying;
950
- packets_->swap (flying);
973
+ max_recv_queue_ = _srs_config->get_threads_max_recv_queue ();
974
+ srs_trace (" AsyncRecv: Set max_queue=%d" , max_recv_queue_);
975
+
976
+ if ((err = trd_->start ()) != srs_success) {
977
+ return srs_error_wrap (err, " start" );
978
+ }
951
979
952
- for (int i = 0 ; i < (int )flying.size (); i++) {
953
- SrsUdpMuxSocket* pkt = flying.at (i);
980
+ return err;
981
+ }
982
+
983
+ srs_error_t SrsAsyncRecvManager::cycle ()
984
+ {
985
+ srs_error_t err = srs_success;
954
986
955
- if (handler_ && (err = handler_->on_udp_packet (pkt)) != srs_success) {
956
- srs_error_reset (err); // Ignore any error.
987
+ // How many messages to run a yield.
988
+ uint32_t nn_msgs_for_yield = 0 ;
989
+
990
+ while (true ) {
991
+ if ((err = trd_->pull ()) != srs_success) {
992
+ return srs_error_wrap (err, " pull" );
993
+ }
994
+
995
+ vector<SrsUdpMuxSocket*> flying;
996
+ packets_->swap (flying);
997
+
998
+ if (flying.empty ()) {
999
+ srs_usleep (20 * SRS_UTIME_MILLISECONDS);
1000
+ continue ;
957
1001
}
958
1002
959
- srs_freep (pkt);
1003
+ for (int i = 0 ; i < (int )flying.size (); i++) {
1004
+ SrsUdpMuxSocket* pkt = flying.at (i);
1005
+
1006
+ if (handler_ && (err = handler_->on_udp_packet (pkt)) != srs_success) {
1007
+ srs_error_reset (err); // Ignore any error.
1008
+ }
1009
+
1010
+ srs_freep (pkt);
1011
+
1012
+ // Yield to another coroutines.
1013
+ // @see https://github.com/ossrs/srs/issues/2194#issuecomment-777485531
1014
+ if (++nn_msgs_for_yield > 10 ) {
1015
+ nn_msgs_for_yield = 0 ;
1016
+ srs_thread_yield ();
1017
+ }
1018
+ }
960
1019
}
961
1020
962
1021
return err;
0 commit comments