Skip to content

Commit d6a92cb

Browse files
committed
Threads-SEND: Support tunnel for srtp-send.
1 parent d282ccd commit d6a92cb

File tree

3 files changed

+77
-7
lines changed

3 files changed

+77
-7
lines changed

trunk/src/app/srs_app_rtc_server.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -721,7 +721,8 @@ srs_error_t SrsRtcServer::on_timer(srs_utime_t interval, srs_utime_t tick)
721721

722722
// TODO: FIXME: Should move to Hybrid server stat.
723723
string loss_desc;
724-
if (!snk_desc.empty() || _srs_pps_rloss->r1s() || _srs_pps_rloss->r10s() || _srs_pps_sloss->r10s() || _srs_pps_aloss->r10s()) {
724+
_srs_pps_aloss->update();
725+
if (_srs_pps_rloss->r1s() || _srs_pps_rloss->r10s() || _srs_pps_sloss->r10s() || _srs_pps_aloss->r10s()) {
725726
snprintf(buf, sizeof(buf), ", loss=(r:%d/%d,s:%d,a:%d)", _srs_pps_rloss->r1s(), _srs_pps_rloss->r10s(), _srs_pps_sloss->r10s(), _srs_pps_aloss->r10s());
726727
loss_desc = buf;
727728
}

trunk/src/app/srs_app_threads.cpp

+58-6
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ SrsPps* _srs_thread_sync_plus = new SrsPps();
6161

6262
SrsPps* _srs_tunnel_recv_raw = new SrsPps();
6363
SrsPps* _srs_tunnel_recv_hit = new SrsPps();
64+
SrsPps* _srs_tunnel_send_raw = new SrsPps();
65+
SrsPps* _srs_tunnel_send_hit = new SrsPps();
6466

6567
extern bool srs_is_rtp_or_rtcp(const uint8_t* data, size_t len);
6668
extern bool srs_is_rtcp(const uint8_t* data, size_t len);
@@ -251,12 +253,13 @@ srs_error_t SrsThreadPool::initialize()
251253

252254
bool async_tunnel = _srs_config->get_threads_async_tunnel();
253255
_srs_async_recv->set_tunnel_enabled(async_tunnel);
256+
_srs_async_srtp->set_tunnel_enabled(async_tunnel);
254257

255258
srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64 ", water_level=%dx%d,%dx%d, recvQ=%d, aSend=%d, tunnel=%d",
256259
entry->num, entry->label.c_str(), entry->name.c_str(), srsu2msi(interval_), async_srtp,
257260
entry->cpuset_ok, r0, srs_covert_cpuset(entry->cpuset), r1, srs_covert_cpuset(entry->cpuset2),
258-
high_pulse_, high_threshold_, critical_pulse_, critical_threshold_, recv_queue, async_send,
259-
async_tunnel);
261+
high_pulse_, high_threshold_, critical_pulse_, critical_threshold_,
262+
recv_queue, async_send, async_tunnel);
260263

261264
return err;
262265
}
@@ -339,7 +342,6 @@ srs_error_t SrsThreadPool::run()
339342

340343
// For Circuit-Breaker to update the SNMP, ASAP.
341344
srs_update_udp_snmp_statistic();
342-
_srs_pps_aloss->update();
343345

344346
// Update thread CPUs per 1s.
345347
for (int i = 0; i < (int)threads.size(); i++) {
@@ -377,7 +379,7 @@ srs_error_t SrsThreadPool::run()
377379

378380
string queue_desc;
379381
if (true) {
380-
snprintf(buf, sizeof(buf), ", queue=%d,%d,%d", _srs_async_recv->size(), _srs_async_srtp->size(), _srs_async_srtp->cooked_size());
382+
snprintf(buf, sizeof(buf), ", queue=%d,%d,%d,%d", _srs_async_recv->size(), _srs_async_srtp->size(), _srs_async_srtp->cooked_size(), _srs_async_send->size());
381383
queue_desc = buf;
382384
}
383385

@@ -391,8 +393,9 @@ srs_error_t SrsThreadPool::run()
391393

392394
string tunnel_desc;
393395
_srs_tunnel_recv_raw->update(); _srs_tunnel_recv_hit->update();
394-
if (_srs_tunnel_recv_raw->r10s() || _srs_tunnel_recv_hit->r10s()) {
395-
snprintf(buf, sizeof(buf), ", tunnel=%d,%d", _srs_tunnel_recv_raw->r10s(), _srs_tunnel_recv_hit->r10s());
396+
_srs_tunnel_send_raw->update(); _srs_tunnel_send_hit->update();
397+
if (_srs_tunnel_recv_raw->r10s() || _srs_tunnel_recv_hit->r10s() || _srs_tunnel_send_raw->r10s() || _srs_tunnel_send_hit->r10s()) {
398+
snprintf(buf, sizeof(buf), ", tunnel=(r:%d/%d,s:%d/%d)", _srs_tunnel_recv_raw->r10s(), _srs_tunnel_recv_hit->r10s(), _srs_tunnel_send_raw->r10s(), _srs_tunnel_send_hit->r10s());
396399
tunnel_desc = buf;
397400
}
398401

@@ -906,18 +909,23 @@ void SrsAsyncSRTP::dig_tunnel(SrsUdpMuxSocket* skt)
906909
}
907910

908911
_srs_async_recv->tunnels()->dig_tunnel(fast_id, task_);
912+
task_->dig_tunnel(skt);
909913
}
910914

911915
SrsAsyncSRTPTask::SrsAsyncSRTPTask(SrsAsyncSRTP* codec)
912916
{
913917
codec_ = codec;
914918
impl_ = new SrsSRTP();
915919
disposing_ = false;
920+
sendonly_skt_ = NULL;
921+
lock_ = new SrsThreadMutex();
916922
}
917923

918924
SrsAsyncSRTPTask::~SrsAsyncSRTPTask()
919925
{
920926
srs_freep(impl_);
927+
srs_freep(sendonly_skt_);
928+
srs_freep(lock_);
921929
}
922930

923931
srs_error_t SrsAsyncSRTPTask::initialize(std::string recv_key, std::string send_key)
@@ -999,6 +1007,42 @@ srs_error_t SrsAsyncSRTPTask::consume(SrsAsyncSRTPPacket* pkt)
9991007
return err;
10001008
}
10011009

1010+
void SrsAsyncSRTPTask::dig_tunnel(SrsUdpMuxSocket* skt)
1011+
{
1012+
if (!skt) {
1013+
return;
1014+
}
1015+
1016+
SrsThreadLocker(lock_);
1017+
srs_freep(sendonly_skt_);
1018+
sendonly_skt_ = skt->copy_sendonly();
1019+
}
1020+
1021+
bool SrsAsyncSRTPTask::consume_by_tunnel(SrsAsyncSRTPPacket* src)
1022+
{
1023+
// If decrypt, we should consume by hybrid.
1024+
if (src->do_decrypt_) {
1025+
return false;
1026+
}
1027+
1028+
// No tunnel established, ignore.
1029+
if (!sendonly_skt_) {
1030+
return false;
1031+
}
1032+
1033+
SrsAsyncUdpPacket* pkt = new SrsAsyncUdpPacket();
1034+
if (true) {
1035+
SrsThreadLocker(lock_);
1036+
pkt->from(sendonly_skt_, (char*)src->msg_->payload, src->nb_consumed_);
1037+
}
1038+
1039+
_srs_async_send->add_packet(pkt);
1040+
srs_freep(src);
1041+
1042+
++_srs_tunnel_send_hit->sugar;
1043+
return true;
1044+
}
1045+
10021046
SrsAsyncSRTPPacket::SrsAsyncSRTPPacket(SrsAsyncSRTPTask* task)
10031047
{
10041048
srs_assert(task);
@@ -1020,6 +1064,7 @@ SrsAsyncSRTPManager::SrsAsyncSRTPManager()
10201064
lock_ = new SrsThreadMutex();
10211065
srtp_packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
10221066
cooked_packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
1067+
tunnel_enabled_ = false;
10231068
}
10241069

10251070
// TODO: FIXME: We should stop the thread first, then free the manager.
@@ -1101,7 +1146,14 @@ srs_error_t SrsAsyncSRTPManager::do_start()
11011146
srs_error_reset(err); // Ignore any error.
11021147
}
11031148

1149+
// Try to consume the packet by tunnel.
1150+
if (tunnel_enabled_ && pkt->task_->consume_by_tunnel(pkt)) {
1151+
continue;
1152+
}
1153+
11041154
cooked_packets_->push_back(pkt);
1155+
1156+
++_srs_tunnel_send_raw->sugar;
11051157
}
11061158

11071159
// If got packets, maybe more packets in queue.

trunk/src/app/srs_app_threads.hpp

+17
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,10 @@ class SrsAsyncSRTPTask
332332
SrsSRTP* impl_;
333333
// For disposing, only set a flag, free it in future.
334334
int disposing_;
335+
private:
336+
// For tunnel, srtp-send.
337+
SrsUdpMuxSocket* sendonly_skt_;
338+
SrsThreadMutex* lock_;
335339
public:
336340
SrsAsyncSRTPTask(SrsAsyncSRTP* codec);
337341
virtual ~SrsAsyncSRTPTask();
@@ -341,6 +345,10 @@ class SrsAsyncSRTPTask
341345
public:
342346
srs_error_t cook(SrsAsyncSRTPPacket* pkt);
343347
srs_error_t consume(SrsAsyncSRTPPacket* pkt);
348+
public:
349+
void dig_tunnel(SrsUdpMuxSocket* skt);
350+
// Try to consume by tunnel.
351+
bool consume_by_tunnel(SrsAsyncSRTPPacket* pkt);
344352
};
345353

346354
// The async SRTP packet, handle by task.
@@ -369,10 +377,16 @@ class SrsAsyncSRTPManager
369377
private:
370378
// The packets cooked by async SRTP manager.
371379
SrsThreadQueue<SrsAsyncSRTPPacket>* cooked_packets_;
380+
private:
381+
// Whether enabled tunnel.
382+
bool tunnel_enabled_;
372383
public:
373384
SrsAsyncSRTPManager();
374385
virtual ~SrsAsyncSRTPManager();
375386
public:
387+
// Enable or disable the tunnel.
388+
// SrsAsyncSRTPManager::set_tunnel_enabled()
389+
void set_tunnel_enabled(bool v) { tunnel_enabled_ = v; }
376390
void register_task(SrsAsyncSRTPTask* task);
377391
void on_srtp_codec_destroy(SrsAsyncSRTPTask* task);
378392
void add_packet(SrsAsyncSRTPPacket* pkt);
@@ -498,6 +512,9 @@ class SrsAsyncSendManager
498512
void set_enabled(bool v) { enabled_ = v; }
499513
// Send the packet.
500514
void add_packet(SrsAsyncUdpPacket* pkt);
515+
// Get the size of packets queue.
516+
// SrsAsyncSendManager::size()
517+
int size() { return sending_packets_->size(); }
501518
// Start the thread.
502519
static srs_error_t start(void* arg);
503520
private:

0 commit comments

Comments
 (0)