Skip to content

Commit d282ccd

Browse files
committed
Threads-RECV: Support tunnel for recv-srtp.
1 parent eb7ce7f commit d282ccd

11 files changed

+234
-10
lines changed

trunk/conf/full.conf

+4
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,10 @@ threads {
130130
# Whether enable the ASYNC SEND, send udp packets in dedicate threads.
131131
# Default: off
132132
async_send off;
133+
# Whether enable the tunnel, to consume packets between srtp/recv/send threads,
134+
# without proxy by hybrid(except the few head packets).
135+
# Default: off
136+
async_tunnel off;
133137
# CPU set for affinity, for example:
134138
# 0 means CPU0
135139
# 0-3 means CPU0, CPU1, CPU2

trunk/src/app/srs_app_config.cpp

+17
Original file line numberDiff line numberDiff line change
@@ -4178,6 +4178,23 @@ bool SrsConfig::get_threads_async_send()
41784178
return SRS_CONF_PERFER_FALSE(conf->arg0());
41794179
}
41804180

4181+
bool SrsConfig::get_threads_async_tunnel()
4182+
{
4183+
static bool DEFAULT = false;
4184+
4185+
SrsConfDirective* conf = root->get("threads");
4186+
if (!conf) {
4187+
return DEFAULT;
4188+
}
4189+
4190+
conf = conf->get("async_tunnel");
4191+
if (!conf) {
4192+
return DEFAULT;
4193+
}
4194+
4195+
return SRS_CONF_PERFER_FALSE(conf->arg0());
4196+
}
4197+
41814198
bool SrsConfig::get_threads_cpu_affinity(std::string label, int* start, int* end)
41824199
{
41834200
static int DEFAULT_START = 0;

trunk/src/app/srs_app_config.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,7 @@ class SrsConfig
481481
virtual bool get_threads_async_srtp();
482482
virtual bool get_threads_async_recv();
483483
virtual bool get_threads_async_send();
484+
virtual bool get_threads_async_tunnel();
484485
virtual bool get_threads_cpu_affinity(std::string label, int* start, int* end);
485486
virtual int get_threads_max_recv_queue();
486487
virtual int get_high_threshold();

trunk/src/app/srs_app_rtc_conn.cpp

+34
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ ISrsRtcTransport::~ISrsRtcTransport()
9090
{
9191
}
9292

93+
void ISrsRtcTransport::dig_tunnel(SrsUdpMuxSocket* skt)
94+
{
95+
}
96+
9397
SrsSecurityTransport::SrsSecurityTransport(SrsRtcConnection* s)
9498
{
9599
session_ = s;
@@ -201,21 +205,37 @@ srs_error_t SrsSecurityTransport::srtp_initialize()
201205

202206
srs_error_t SrsSecurityTransport::on_rtp_plaintext(char* plaintext, int size)
203207
{
208+
// We should keep alive here, because when tunnel is enabled, the connection die
209+
// for the SrsRtcServer::on_udp_packet might be skipped.
210+
session_->alive();
211+
204212
return session_->on_rtp_plaintext(plaintext, size);
205213
}
206214

207215
srs_error_t SrsSecurityTransport::on_rtcp_plaintext(char* plaintext, int size)
208216
{
217+
// We should keep alive here, because when tunnel is enabled, the connection die
218+
// for the SrsRtcServer::on_udp_packet might be skipped.
219+
session_->alive();
220+
209221
return session_->on_rtcp_plaintext(plaintext, size);
210222
}
211223

212224
srs_error_t SrsSecurityTransport::on_rtp_cipher(char* cipher, int size)
213225
{
226+
// We should keep alive here, because when tunnel is enabled, the connection die
227+
// for the SrsRtcServer::on_udp_packet might be skipped.
228+
session_->alive();
229+
214230
return session_->on_rtp_cipher(cipher, size);
215231
}
216232

217233
srs_error_t SrsSecurityTransport::on_rtcp_cipher(char* cipher, int size)
218234
{
235+
// We should keep alive here, because when tunnel is enabled, the connection die
236+
// for the SrsRtcServer::on_udp_packet might be skipped.
237+
session_->alive();
238+
219239
return session_->on_rtcp_cipher(cipher, size);
220240
}
221241

@@ -239,6 +259,13 @@ srs_error_t SrsSecurityTransport::unprotect_rtcp(void* packet, int* nb_plaintext
239259
return srtp_->unprotect_rtcp(packet, nb_plaintext);
240260
}
241261

262+
void SrsSecurityTransport::dig_tunnel(SrsUdpMuxSocket* skt)
263+
{
264+
if (srtp_) {
265+
srtp_->dig_tunnel(skt);
266+
}
267+
}
268+
242269
SrsSemiSecurityTransport::SrsSemiSecurityTransport(SrsRtcConnection* s) : SrsSecurityTransport(s)
243270
{
244271
}
@@ -1832,6 +1859,13 @@ vector<SrsUdpMuxSocket*> SrsRtcConnection::peer_addresses()
18321859
return addresses;
18331860
}
18341861

1862+
void SrsRtcConnection::dig_tunnel(SrsUdpMuxSocket* skt)
1863+
{
1864+
if (transport_) {
1865+
transport_->dig_tunnel(skt);
1866+
}
1867+
}
1868+
18351869
const SrsContextId& SrsRtcConnection::get_id()
18361870
{
18371871
return cid_;

trunk/src/app/srs_app_rtc_conn.hpp

+8
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ class ISrsRtcTransport : public ISrsDtlsCallback
107107
// The nb_plaintext should be initialized to the size of cipher.
108108
virtual srs_error_t unprotect_rtp(void* packet, int* nb_plaintext) = 0;
109109
virtual srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext) = 0;
110+
public:
111+
// Try to dig the recv tunnel, for srtp thread to consume packets from recv
112+
// threads directly without proxying by hybrid thread.
113+
virtual void dig_tunnel(SrsUdpMuxSocket* skt);
110114
};
111115

112116
// The security transport, use DTLS/SRTP to protect the data.
@@ -147,6 +151,8 @@ class SrsSecurityTransport : public ISrsRtcTransport
147151
srs_error_t on_rtcp_plaintext(char* plaintext, int size);
148152
srs_error_t on_rtp_cipher(char* cipher, int size);
149153
srs_error_t on_rtcp_cipher(char* cipher, int size);
154+
public:
155+
void dig_tunnel(SrsUdpMuxSocket* skt);
150156
};
151157

152158
// Semi security transport, setup DTLS and SRTP, with SRTP decrypt, without SRTP encrypt.
@@ -473,6 +479,8 @@ class SrsRtcConnection : public ISrsResource
473479
std::string username();
474480
// Get all addresses client used.
475481
std::vector<SrsUdpMuxSocket*> peer_addresses();
482+
public:
483+
void dig_tunnel(SrsUdpMuxSocket* skt);
476484
// Interface ISrsResource.
477485
public:
478486
virtual const SrsContextId& get_id();

trunk/src/app/srs_app_rtc_dtls.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -1181,3 +1181,7 @@ srs_error_t SrsSRTP::unprotect_rtcp(void* packet, int* nb_plaintext)
11811181
return err;
11821182
}
11831183

1184+
void SrsSRTP::dig_tunnel(SrsUdpMuxSocket* skt)
1185+
{
1186+
}
1187+

trunk/src/app/srs_app_rtc_dtls.hpp

+3
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include <srs_app_st.hpp>
3636

3737
class SrsRequest;
38+
class SrsUdpMuxSocket;
3839

3940
class SrsDtlsCertificate
4041
{
@@ -245,6 +246,8 @@ class SrsSRTP
245246
virtual srs_error_t protect_rtcp(void* packet, int* nb_cipher);
246247
virtual srs_error_t unprotect_rtp(void* packet, int* nb_plaintext);
247248
virtual srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext);
249+
public:
250+
virtual void dig_tunnel(SrsUdpMuxSocket* skt);
248251
};
249252

250253
#endif

trunk/src/app/srs_app_rtc_server.cpp

+20-3
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,8 @@ SrsRtcServer::SrsRtcServer()
254254
handler = NULL;
255255
hijacker = NULL;
256256

257+
async_tunnel_ = false;
258+
257259
_srs_config->subscribe(this);
258260
}
259261

@@ -295,11 +297,14 @@ srs_error_t SrsRtcServer::initialize()
295297
_srs_rtp_msg_cache_buffers->setup(rtp_msg_cache_enabled, rtp_msg_cache_buffer_size);
296298
_srs_rtp_msg_cache_objs->setup(rtp_msg_cache_enabled, rtp_msg_cache_msg_size);
297299

298-
srs_trace("RTC: Object cache init, rtp-cache=(enabled:%d,pkt:%dm-%dw,payload:%dm-%dw-%dw), msg-cache=(enabled:%d,obj:%dm-%dw,buf:%dm-%dw)",
300+
async_tunnel_ = _srs_config->get_threads_async_tunnel();
301+
302+
srs_trace("RTC: Object cache init, rtp-cache=(enabled:%d,pkt:%dm-%dw,payload:%dm-%dw-%dw), msg-cache=(enabled:%d,obj:%dm-%dw,buf:%dm-%dw), tunnel=%d",
299303
rtp_cache_enabled, (int)(rtp_cache_pkt_size/1024/1024), _srs_rtp_cache->capacity()/10000,
300304
(int)(rtp_cache_payload_size/1024/1024), _srs_rtp_raw_cache->capacity()/10000, _srs_rtp_fua_cache->capacity()/10000,
301305
rtp_msg_cache_enabled, (int)(rtp_msg_cache_msg_size/1024/1024), _srs_rtp_msg_cache_objs->capacity()/10000,
302-
(int)(rtp_msg_cache_buffer_size/1024/1024), _srs_rtp_msg_cache_buffers->capacity()/10000);
306+
(int)(rtp_msg_cache_buffer_size/1024/1024), _srs_rtp_msg_cache_buffers->capacity()/10000,
307+
async_tunnel_);
303308

304309
return err;
305310
}
@@ -446,6 +451,11 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt)
446451
ping.get_username().c_str(), peer_id.c_str(), fast_id);
447452
}
448453

454+
// Try to dig tunnel for ping-pong(the address might change).
455+
if (async_tunnel_) {
456+
session->dig_tunnel(skt);
457+
}
458+
449459
return session->on_stun(skt, &ping);
450460
}
451461

@@ -475,7 +485,14 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt)
475485
if (srs_is_dtls((uint8_t*)data, size)) {
476486
++_srs_pps_rstuns->sugar;
477487

478-
return session->on_dtls(data, size);
488+
err = session->on_dtls(data, size);
489+
490+
// Try to dig tunnel for DTLS packets(when DTLS done).
491+
if (async_tunnel_) {
492+
session->dig_tunnel(skt);
493+
}
494+
495+
return err;
479496
}
480497
return srs_error_new(ERROR_RTC_UDP, "unknown packet");
481498
}

trunk/src/app/srs_app_rtc_server.hpp

+2
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ class SrsRtcServer : public ISrsUdpMuxHandler, public ISrsFastTimer, public ISrs
9191
std::vector<SrsUdpMuxListener*> listeners;
9292
ISrsRtcServerHandler* handler;
9393
ISrsRtcServerHijacker* hijacker;
94+
private:
95+
bool async_tunnel_;
9496
public:
9597
SrsRtcServer();
9698
virtual ~SrsRtcServer();

0 commit comments

Comments
 (0)