Skip to content

Commit 28d8f1a

Browse files
committed
Threads: Support multiple SRTP/SEND/RECV threads.
1. Move received and cooked packets queue to thread entry, that is, each thread has its own queue. 2. In RECV thread, push received packet to queue of source thread, in thread listener. 3. In SRTP thread, push cooked packet to queue of source thread, in asyn SRTP task. 4. In hybrid thread, directly and only consume the packets of self thread. 5. Sync between SRTP task by lock.
1 parent a505b6b commit 28d8f1a

9 files changed

+219
-142
lines changed

trunk/conf/full.conf

+12-9
Original file line numberDiff line numberDiff line change
@@ -121,15 +121,18 @@ threads {
121121
# The thread pool manager cycle interval, in seconds.
122122
# Default: 5
123123
interval 5;
124-
# Whether enable the ASYNC SRTP, codec in dedicate threads.
125-
# Default: off
126-
async_srtp off;
127-
# Whether enable the ASYNC RECV, recv udp packets in dedicate threads.
128-
# Default: off
129-
async_recv off;
130-
# Whether enable the ASYNC SEND, send udp packets in dedicate threads.
131-
# Default: off
132-
async_send off;
124+
# The number of threads for SRTP, codec in dedicate threads.
125+
# Note that 0 to disable it. Max to 64 threads.
126+
# Default: 1
127+
async_srtp 1;
128+
# The number of threads for RECV, recv udp packets in dedicate threads.
129+
# Note that 0 to disable it. Max to 64 threads.
130+
# Default: 1
131+
async_recv 1;
132+
# The number of threads for SEND, send udp packets in dedicate threads.
133+
# Note that 0 to disable it. Max to 64 threads.
134+
# Default: 1
135+
async_send 1;
133136
# Whether enable the tunnel, to consume packets between srtp/recv/send threads,
134137
# without proxy by hybrid(except the few head packets).
135138
# Default: off

trunk/src/app/srs_app_config.cpp

+21-9
Original file line numberDiff line numberDiff line change
@@ -4127,9 +4127,9 @@ srs_utime_t SrsConfig::get_threads_interval()
41274127
return v * SRS_UTIME_SECONDS;
41284128
}
41294129

4130-
bool SrsConfig::get_threads_async_srtp()
4130+
int SrsConfig::get_threads_async_srtp()
41314131
{
4132-
static bool DEFAULT = false;
4132+
static int DEFAULT = 1;
41334133

41344134
SrsConfDirective* conf = root->get("threads");
41354135
if (!conf) {
@@ -4141,12 +4141,16 @@ bool SrsConfig::get_threads_async_srtp()
41414141
return DEFAULT;
41424142
}
41434143

4144-
return SRS_CONF_PERFER_FALSE(conf->arg0());
4144+
int v = ::atoi(conf->arg0().c_str());
4145+
if (v < 0 || v > 64) {
4146+
return DEFAULT;
4147+
}
4148+
return v;
41454149
}
41464150

4147-
bool SrsConfig::get_threads_async_recv()
4151+
int SrsConfig::get_threads_async_recv()
41484152
{
4149-
static bool DEFAULT = false;
4153+
static int DEFAULT = 1;
41504154

41514155
SrsConfDirective* conf = root->get("threads");
41524156
if (!conf) {
@@ -4158,12 +4162,16 @@ bool SrsConfig::get_threads_async_recv()
41584162
return DEFAULT;
41594163
}
41604164

4161-
return SRS_CONF_PERFER_FALSE(conf->arg0());
4165+
int v = ::atoi(conf->arg0().c_str());
4166+
if (v < 0 || v > 64) {
4167+
return DEFAULT;
4168+
}
4169+
return v;
41624170
}
41634171

4164-
bool SrsConfig::get_threads_async_send()
4172+
int SrsConfig::get_threads_async_send()
41654173
{
4166-
static bool DEFAULT = false;
4174+
static int DEFAULT = 1;
41674175

41684176
SrsConfDirective* conf = root->get("threads");
41694177
if (!conf) {
@@ -4175,7 +4183,11 @@ bool SrsConfig::get_threads_async_send()
41754183
return DEFAULT;
41764184
}
41774185

4178-
return SRS_CONF_PERFER_FALSE(conf->arg0());
4186+
int v = ::atoi(conf->arg0().c_str());
4187+
if (v < 0 || v > 64) {
4188+
return DEFAULT;
4189+
}
4190+
return v;
41794191
}
41804192

41814193
bool SrsConfig::get_threads_async_tunnel()

trunk/src/app/srs_app_config.hpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -478,9 +478,9 @@ class SrsConfig
478478
// Thread pool section.
479479
public:
480480
virtual srs_utime_t get_threads_interval();
481-
virtual bool get_threads_async_srtp();
482-
virtual bool get_threads_async_recv();
483-
virtual bool get_threads_async_send();
481+
virtual int get_threads_async_srtp();
482+
virtual int get_threads_async_recv();
483+
virtual int get_threads_async_send();
484484
virtual bool get_threads_async_tunnel();
485485
virtual bool get_threads_cpu_affinity(std::string label, int* start, int* end);
486486
virtual int get_threads_max_recv_queue();

trunk/src/app/srs_app_hybrid.cpp

+7-2
Original file line numberDiff line numberDiff line change
@@ -215,17 +215,22 @@ srs_error_t SrsHybridServer::run()
215215
}
216216
}
217217

218+
// Get the entry of self thread.
219+
SrsThreadEntry* entry = _srs_thread_pool->self();
220+
218221
// Consume the async UDP/SRTP packets.
219222
while (true) {
220223
int consumed = 0;
221224

222225
// Consume the received UDP packets.
223-
if ((err = _srs_async_recv->consume(&consumed)) != srs_success) {
226+
// Note that this might run in multiple threads, but it's ok.
227+
if ((err = _srs_async_recv->consume(entry, &consumed)) != srs_success) {
224228
srs_error_reset(err); // Ignore any error.
225229
}
226230

227231
// Consume the cooked SRTP packets.
228-
if ((err = _srs_async_srtp->consume(&consumed)) != srs_success) {
232+
// Note that this might run in multiple threads, but it's ok.
233+
if ((err = _srs_async_srtp->consume(entry, &consumed)) != srs_success) {
229234
srs_error_reset(err); // Ignore any error.
230235
}
231236

trunk/src/app/srs_app_listener.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -669,7 +669,7 @@ srs_error_t SrsUdpMuxListener::cycle()
669669
set_socket_buffer();
670670

671671
// Sleep infinite if use async_recv.
672-
if (_srs_config->get_threads_async_recv()) {
672+
if (_srs_config->get_threads_async_recv() > 0) {
673673
SrsThreadUdpListener* listener = new SrsThreadUdpListener(lfd, handler);
674674
_srs_async_recv->add_listener(listener);
675675

trunk/src/app/srs_app_rtc_conn.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,8 @@ SrsSecurityTransport::SrsSecurityTransport(SrsRtcConnection* s)
100100

101101
dtls_ = new SrsDtls((ISrsDtlsCallback*)this);
102102

103-
bool async_srtp = _srs_config->get_threads_async_srtp();
104-
if (!async_srtp) {
103+
int async_srtp = _srs_config->get_threads_async_srtp();
104+
if (async_srtp <= 0) {
105105
srtp_ = new SrsSRTP();
106106
} else {
107107
srtp_ = new SrsAsyncSRTP(this);

0 commit comments

Comments
 (0)