@@ -170,6 +170,10 @@ srs_error_t SrsThreadPool::initialize()
170
170
r1 = pthread_getaffinity_np (pthread_self (), sizeof (entry->cpuset2 ), &entry->cpuset2 );
171
171
#endif
172
172
173
+ if ((err = _srs_async_recv->initialize ()) != srs_success) {
174
+ return srs_error_wrap (err, " init async recv" );
175
+ }
176
+
173
177
interval_ = _srs_config->get_threads_interval ();
174
178
bool async_srtp = _srs_config->get_threads_async_srtp ();
175
179
srs_trace (" Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 " /%d-0x%" PRIx64,
@@ -254,6 +258,12 @@ srs_error_t SrsThreadPool::run()
254
258
static char buf[128 ];
255
259
string async_logs = _srs_async_log->description ();
256
260
261
+ string queue_desc;
262
+ if (true ) {
263
+ snprintf (buf, sizeof (buf), " , queue=%d,%d,%d" , _srs_async_recv->size (), _srs_async_srtp->size (), _srs_async_srtp->cooked_size ());
264
+ queue_desc = buf;
265
+ }
266
+
257
267
string sync_desc;
258
268
_srs_thread_sync_10us->update (); _srs_thread_sync_100us->update ();
259
269
_srs_thread_sync_1000us->update (); _srs_thread_sync_plus->update ();
@@ -262,8 +272,8 @@ srs_error_t SrsThreadPool::run()
262
272
sync_desc = buf;
263
273
}
264
274
265
- srs_trace (" Thread: %s cycle threads=%d%s%s" , entry_->name .c_str (), (int )threads_.size (),
266
- async_logs.c_str (), sync_desc.c_str ());
275
+ srs_trace (" Thread: %s cycle threads=%d%s%s%s " , entry_->name .c_str (), (int )threads_.size (),
276
+ async_logs.c_str (), sync_desc.c_str (), queue_desc. c_str () );
267
277
}
268
278
269
279
return err;
@@ -722,6 +732,15 @@ void SrsAsyncSRTPManager::add_packet(SrsAsyncSRTPPacket* pkt)
722
732
packets_->push_back (pkt);
723
733
}
724
734
735
+ int SrsAsyncSRTPManager::size ()
736
+ {
737
+ return packets_->size ();
738
+ }
739
+ int SrsAsyncSRTPManager::cooked_size ()
740
+ {
741
+ return cooked_packets_->size ();
742
+ }
743
+
725
744
srs_error_t SrsAsyncSRTPManager::start (void * arg)
726
745
{
727
746
SrsAsyncSRTPManager* srtp = (SrsAsyncSRTPManager*)arg;
@@ -807,6 +826,7 @@ SrsAsyncRecvManager::SrsAsyncRecvManager()
807
826
lock_ = new SrsThreadMutex ();
808
827
packets_ = new SrsThreadQueue<SrsUdpMuxSocket>();
809
828
handler_ = NULL ;
829
+ max_recv_queue_ = 0 ;
810
830
}
811
831
812
832
// TODO: FIXME: We should stop the thread first, then free the manager.
@@ -822,6 +842,16 @@ SrsAsyncRecvManager::~SrsAsyncRecvManager()
822
842
}
823
843
}
824
844
845
+ srs_error_t SrsAsyncRecvManager::initialize ()
846
+ {
847
+ srs_error_t err = srs_success;
848
+
849
+ max_recv_queue_ = _srs_config->get_threads_max_recv_queue ();
850
+ srs_trace (" AsyncRecv: Set max_queue=%d" , max_recv_queue_);
851
+
852
+ return err;
853
+ }
854
+
825
855
void SrsAsyncRecvManager::set_handler (ISrsUdpMuxHandler* v)
826
856
{
827
857
handler_ = v;
@@ -833,6 +863,11 @@ void SrsAsyncRecvManager::add_listener(SrsThreadUdpListener* listener)
833
863
listeners_.push_back (listener);
834
864
}
835
865
866
+ int SrsAsyncRecvManager::size ()
867
+ {
868
+ return packets_->size ();
869
+ }
870
+
836
871
srs_error_t SrsAsyncRecvManager::start (void * arg)
837
872
{
838
873
SrsAsyncRecvManager* recv = (SrsAsyncRecvManager*)arg;
@@ -859,6 +894,13 @@ srs_error_t SrsAsyncRecvManager::do_start()
859
894
860
895
// TODO: FIXME: Use st_recvfrom to recv if thread-safe ST is ok.
861
896
int nread = listener->skt_ ->raw_recvfrom ();
897
+
898
+ // Drop packet if exceed max recv queue size.
899
+ if ((int )packets_->size () >= max_recv_queue_) {
900
+ continue ;
901
+ }
902
+
903
+ // If got packet, copy to the queue.
862
904
if (nread > 0 ) {
863
905
got_packet = true ;
864
906
packets_->push_back (listener->skt_ ->copy ());
0 commit comments