@@ -61,6 +61,8 @@ SrsPps* _srs_thread_sync_plus = new SrsPps();
61
61
62
62
SrsPps* _srs_tunnel_recv_raw = new SrsPps();
63
63
SrsPps* _srs_tunnel_recv_hit = new SrsPps();
64
+ SrsPps* _srs_tunnel_send_raw = new SrsPps();
65
+ SrsPps* _srs_tunnel_send_hit = new SrsPps();
64
66
65
67
extern bool srs_is_rtp_or_rtcp (const uint8_t * data, size_t len);
66
68
extern bool srs_is_rtcp (const uint8_t * data, size_t len);
@@ -251,12 +253,13 @@ srs_error_t SrsThreadPool::initialize()
251
253
252
254
bool async_tunnel = _srs_config->get_threads_async_tunnel ();
253
255
_srs_async_recv->set_tunnel_enabled (async_tunnel);
256
+ _srs_async_srtp->set_tunnel_enabled (async_tunnel);
254
257
255
258
srs_trace (" Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 " /%d-0x%" PRIx64 " , water_level=%dx%d,%dx%d, recvQ=%d, aSend=%d, tunnel=%d" ,
256
259
entry->num , entry->label .c_str (), entry->name .c_str (), srsu2msi (interval_), async_srtp,
257
260
entry->cpuset_ok , r0, srs_covert_cpuset (entry->cpuset ), r1, srs_covert_cpuset (entry->cpuset2 ),
258
- high_pulse_, high_threshold_, critical_pulse_, critical_threshold_, recv_queue, async_send,
259
- async_tunnel);
261
+ high_pulse_, high_threshold_, critical_pulse_, critical_threshold_,
262
+ recv_queue, async_send, async_tunnel);
260
263
261
264
return err;
262
265
}
@@ -339,7 +342,6 @@ srs_error_t SrsThreadPool::run()
339
342
340
343
// For Circuit-Breaker to update the SNMP, ASAP.
341
344
srs_update_udp_snmp_statistic ();
342
- _srs_pps_aloss->update ();
343
345
344
346
// Update thread CPUs per 1s.
345
347
for (int i = 0 ; i < (int )threads.size (); i++) {
@@ -377,7 +379,7 @@ srs_error_t SrsThreadPool::run()
377
379
378
380
string queue_desc;
379
381
if (true ) {
380
- snprintf (buf, sizeof (buf), " , queue=%d,%d,%d" , _srs_async_recv->size (), _srs_async_srtp->size (), _srs_async_srtp->cooked_size ());
382
+ snprintf (buf, sizeof (buf), " , queue=%d,%d,%d,%d " , _srs_async_recv->size (), _srs_async_srtp->size (), _srs_async_srtp->cooked_size (), _srs_async_send-> size ());
381
383
queue_desc = buf;
382
384
}
383
385
@@ -391,8 +393,9 @@ srs_error_t SrsThreadPool::run()
391
393
392
394
string tunnel_desc;
393
395
_srs_tunnel_recv_raw->update (); _srs_tunnel_recv_hit->update ();
394
- if (_srs_tunnel_recv_raw->r10s () || _srs_tunnel_recv_hit->r10s ()) {
395
- snprintf (buf, sizeof (buf), " , tunnel=%d,%d" , _srs_tunnel_recv_raw->r10s (), _srs_tunnel_recv_hit->r10s ());
396
+ _srs_tunnel_send_raw->update (); _srs_tunnel_send_hit->update ();
397
+ if (_srs_tunnel_recv_raw->r10s () || _srs_tunnel_recv_hit->r10s () || _srs_tunnel_send_raw->r10s () || _srs_tunnel_send_hit->r10s ()) {
398
+ snprintf (buf, sizeof (buf), " , tunnel=(r:%d/%d,s:%d/%d)" , _srs_tunnel_recv_raw->r10s (), _srs_tunnel_recv_hit->r10s (), _srs_tunnel_send_raw->r10s (), _srs_tunnel_send_hit->r10s ());
396
399
tunnel_desc = buf;
397
400
}
398
401
@@ -906,18 +909,23 @@ void SrsAsyncSRTP::dig_tunnel(SrsUdpMuxSocket* skt)
906
909
}
907
910
908
911
_srs_async_recv->tunnels ()->dig_tunnel (fast_id, task_);
912
+ task_->dig_tunnel (skt);
909
913
}
910
914
911
915
SrsAsyncSRTPTask::SrsAsyncSRTPTask (SrsAsyncSRTP* codec)
912
916
{
913
917
codec_ = codec;
914
918
impl_ = new SrsSRTP ();
915
919
disposing_ = false ;
920
+ sendonly_skt_ = NULL ;
921
+ lock_ = new SrsThreadMutex ();
916
922
}
917
923
918
924
SrsAsyncSRTPTask::~SrsAsyncSRTPTask ()
919
925
{
920
926
srs_freep (impl_);
927
+ srs_freep (sendonly_skt_);
928
+ srs_freep (lock_);
921
929
}
922
930
923
931
srs_error_t SrsAsyncSRTPTask::initialize (std::string recv_key, std::string send_key)
@@ -999,6 +1007,42 @@ srs_error_t SrsAsyncSRTPTask::consume(SrsAsyncSRTPPacket* pkt)
999
1007
return err;
1000
1008
}
1001
1009
1010
+ void SrsAsyncSRTPTask::dig_tunnel (SrsUdpMuxSocket* skt)
1011
+ {
1012
+ if (!skt) {
1013
+ return ;
1014
+ }
1015
+
1016
+ SrsThreadLocker (lock_);
1017
+ srs_freep (sendonly_skt_);
1018
+ sendonly_skt_ = skt->copy_sendonly ();
1019
+ }
1020
+
1021
+ bool SrsAsyncSRTPTask::consume_by_tunnel (SrsAsyncSRTPPacket* src)
1022
+ {
1023
+ // If decrypt, we should consume by hybrid.
1024
+ if (src->do_decrypt_ ) {
1025
+ return false ;
1026
+ }
1027
+
1028
+ // No tunnel established, ignore.
1029
+ if (!sendonly_skt_) {
1030
+ return false ;
1031
+ }
1032
+
1033
+ SrsAsyncUdpPacket* pkt = new SrsAsyncUdpPacket ();
1034
+ if (true ) {
1035
+ SrsThreadLocker (lock_);
1036
+ pkt->from (sendonly_skt_, (char *)src->msg_ ->payload , src->nb_consumed_ );
1037
+ }
1038
+
1039
+ _srs_async_send->add_packet (pkt);
1040
+ srs_freep (src);
1041
+
1042
+ ++_srs_tunnel_send_hit->sugar ;
1043
+ return true ;
1044
+ }
1045
+
1002
1046
SrsAsyncSRTPPacket::SrsAsyncSRTPPacket (SrsAsyncSRTPTask* task)
1003
1047
{
1004
1048
srs_assert (task);
@@ -1020,6 +1064,7 @@ SrsAsyncSRTPManager::SrsAsyncSRTPManager()
1020
1064
lock_ = new SrsThreadMutex ();
1021
1065
srtp_packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
1022
1066
cooked_packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
1067
+ tunnel_enabled_ = false ;
1023
1068
}
1024
1069
1025
1070
// TODO: FIXME: We should stop the thread first, then free the manager.
@@ -1101,7 +1146,14 @@ srs_error_t SrsAsyncSRTPManager::do_start()
1101
1146
srs_error_reset (err); // Ignore any error.
1102
1147
}
1103
1148
1149
+ // Try to consume the packet by tunnel.
1150
+ if (tunnel_enabled_ && pkt->task_ ->consume_by_tunnel (pkt)) {
1151
+ continue ;
1152
+ }
1153
+
1104
1154
cooked_packets_->push_back (pkt);
1155
+
1156
+ ++_srs_tunnel_send_raw->sugar ;
1105
1157
}
1106
1158
1107
1159
// If got packets, maybe more packets in queue.
0 commit comments