Skip to content

Commit 6fca411

Browse files
committed
Threads-RECV: Drop received packet if exceed max queue size.
1. Print the number of recv/srtp queue packets. 2. Drop packet if exceed max recv queue size.
1 parent 9e554ca commit 6fca411

File tree

5 files changed

+70
-2
lines changed

5 files changed

+70
-2
lines changed

trunk/conf/full.conf

+3
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,9 @@ threads {
127127
# Whether enable the ASYNC RECV, recv udp packets in dedicate threads.
128128
# Default: off
129129
async_recv off;
130+
# If exceed the max size of recv queue, drop the received packet.
131+
# Default: 5000
132+
max_recv_queue 5000;
130133
# CPU set for affinity, for example:
131134
# 0 means CPU0
132135
# 0-3 means CPU0, CPU1, CPU2

trunk/src/app/srs_app_config.cpp

+17
Original file line numberDiff line numberDiff line change
@@ -4202,6 +4202,23 @@ bool SrsConfig::get_threads_cpu_affinity(std::string label, int* start, int* end
42024202
return true;
42034203
}
42044204

4205+
int SrsConfig::get_threads_max_recv_queue()
4206+
{
4207+
static int DEFAULT = 5000;
4208+
4209+
SrsConfDirective* conf = root->get("threads");
4210+
if (!conf) {
4211+
return DEFAULT;
4212+
}
4213+
4214+
conf = conf->get("max_recv_queue");
4215+
if (!conf) {
4216+
return DEFAULT;
4217+
}
4218+
4219+
return ::atoi(conf->arg0().c_str());
4220+
}
4221+
42054222
vector<SrsConfDirective*> SrsConfig::get_stream_casters()
42064223
{
42074224
srs_assert(root);

trunk/src/app/srs_app_config.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,7 @@ class SrsConfig
481481
virtual bool get_threads_async_srtp();
482482
virtual bool get_threads_async_recv();
483483
virtual bool get_threads_cpu_affinity(std::string label, int* start, int* end);
484+
virtual int get_threads_max_recv_queue();
484485
// stream_caster section
485486
public:
486487
// Get all stream_caster in config file.

trunk/src/app/srs_app_threads.cpp

+44-2
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,10 @@ srs_error_t SrsThreadPool::initialize()
170170
r1 = pthread_getaffinity_np(pthread_self(), sizeof(entry->cpuset2), &entry->cpuset2);
171171
#endif
172172

173+
if ((err = _srs_async_recv->initialize()) != srs_success) {
174+
return srs_error_wrap(err, "init async recv");
175+
}
176+
173177
interval_ = _srs_config->get_threads_interval();
174178
bool async_srtp = _srs_config->get_threads_async_srtp();
175179
srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64,
@@ -254,6 +258,12 @@ srs_error_t SrsThreadPool::run()
254258
static char buf[128];
255259
string async_logs = _srs_async_log->description();
256260

261+
string queue_desc;
262+
if (true) {
263+
snprintf(buf, sizeof(buf), ", queue=%d,%d,%d", _srs_async_recv->size(), _srs_async_srtp->size(), _srs_async_srtp->cooked_size());
264+
queue_desc = buf;
265+
}
266+
257267
string sync_desc;
258268
_srs_thread_sync_10us->update(); _srs_thread_sync_100us->update();
259269
_srs_thread_sync_1000us->update(); _srs_thread_sync_plus->update();
@@ -262,8 +272,8 @@ srs_error_t SrsThreadPool::run()
262272
sync_desc = buf;
263273
}
264274

265-
srs_trace("Thread: %s cycle threads=%d%s%s", entry_->name.c_str(), (int)threads_.size(),
266-
async_logs.c_str(), sync_desc.c_str());
275+
srs_trace("Thread: %s cycle threads=%d%s%s%s", entry_->name.c_str(), (int)threads_.size(),
276+
async_logs.c_str(), sync_desc.c_str(), queue_desc.c_str());
267277
}
268278

269279
return err;
@@ -722,6 +732,15 @@ void SrsAsyncSRTPManager::add_packet(SrsAsyncSRTPPacket* pkt)
722732
packets_->push_back(pkt);
723733
}
724734

735+
int SrsAsyncSRTPManager::size()
736+
{
737+
return packets_->size();
738+
}
739+
int SrsAsyncSRTPManager::cooked_size()
740+
{
741+
return cooked_packets_->size();
742+
}
743+
725744
srs_error_t SrsAsyncSRTPManager::start(void* arg)
726745
{
727746
SrsAsyncSRTPManager* srtp = (SrsAsyncSRTPManager*)arg;
@@ -807,6 +826,7 @@ SrsAsyncRecvManager::SrsAsyncRecvManager()
807826
lock_ = new SrsThreadMutex();
808827
packets_ = new SrsThreadQueue<SrsUdpMuxSocket>();
809828
handler_ = NULL;
829+
max_recv_queue_ = 0;
810830
}
811831

812832
// TODO: FIXME: We should stop the thread first, then free the manager.
@@ -822,6 +842,16 @@ SrsAsyncRecvManager::~SrsAsyncRecvManager()
822842
}
823843
}
824844

845+
srs_error_t SrsAsyncRecvManager::initialize()
846+
{
847+
srs_error_t err = srs_success;
848+
849+
max_recv_queue_ = _srs_config->get_threads_max_recv_queue();
850+
srs_trace("AsyncRecv: Set max_queue=%d", max_recv_queue_);
851+
852+
return err;
853+
}
854+
825855
void SrsAsyncRecvManager::set_handler(ISrsUdpMuxHandler* v)
826856
{
827857
handler_ = v;
@@ -833,6 +863,11 @@ void SrsAsyncRecvManager::add_listener(SrsThreadUdpListener* listener)
833863
listeners_.push_back(listener);
834864
}
835865

866+
int SrsAsyncRecvManager::size()
867+
{
868+
return packets_->size();
869+
}
870+
836871
srs_error_t SrsAsyncRecvManager::start(void* arg)
837872
{
838873
SrsAsyncRecvManager* recv = (SrsAsyncRecvManager*)arg;
@@ -859,6 +894,13 @@ srs_error_t SrsAsyncRecvManager::do_start()
859894

860895
// TODO: FIXME: Use st_recvfrom to recv if thread-safe ST is ok.
861896
int nread = listener->skt_->raw_recvfrom();
897+
898+
// Drop packet if exceed max recv queue size.
899+
if ((int)packets_->size() >= max_recv_queue_) {
900+
continue;
901+
}
902+
903+
// If got packet, copy to the queue.
862904
if (nread > 0) {
863905
got_packet = true;
864906
packets_->push_back(listener->skt_->copy());

trunk/src/app/srs_app_threads.hpp

+5
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,8 @@ class SrsAsyncSRTPManager
336336
void register_task(SrsAsyncSRTPTask* task);
337337
void remove_task(SrsAsyncSRTPTask* task);
338338
void add_packet(SrsAsyncSRTPPacket* pkt);
339+
int size();
340+
int cooked_size();
339341
static srs_error_t start(void* arg);
340342
private:
341343
srs_error_t do_start();
@@ -364,6 +366,7 @@ class SrsAsyncRecvManager
364366
private:
365367
ISrsUdpMuxHandler* handler_;
366368
private:
369+
int max_recv_queue_;
367370
SrsThreadQueue<SrsUdpMuxSocket>* packets_;
368371
private:
369372
std::vector<SrsThreadUdpListener*> listeners_;
@@ -372,8 +375,10 @@ class SrsAsyncRecvManager
372375
SrsAsyncRecvManager();
373376
virtual ~SrsAsyncRecvManager();
374377
public:
378+
srs_error_t initialize();
375379
void set_handler(ISrsUdpMuxHandler* v);
376380
void add_listener(SrsThreadUdpListener* listener);
381+
int size();
377382
static srs_error_t start(void* arg);
378383
private:
379384
srs_error_t do_start();

0 commit comments

Comments
 (0)