Skip to content

Commit 1c90497

Browse files
committed
Threads: Use coroutine to consume recv/srtp packets.
1 parent 957034e commit 1c90497

File tree

4 files changed

+116
-48
lines changed

4 files changed

+116
-48
lines changed

trunk/src/app/srs_app_hourglass.cpp

-11
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ using namespace std;
2828
#include <srs_kernel_error.hpp>
2929
#include <srs_kernel_log.hpp>
3030
#include <srs_kernel_utility.hpp>
31-
#include <srs_app_threads.hpp>
3231

3332
#include <srs_protocol_kbps.hpp>
3433

@@ -261,16 +260,6 @@ srs_error_t SrsClockWallMonitor::on_timer(srs_utime_t interval, srs_utime_t tick
261260
++_srs_pps_timer_s->sugar;
262261
}
263262

264-
// Consume the async received UDP packets.
265-
if ((err = _srs_async_recv->consume()) != srs_success) {
266-
srs_error_reset(err); // Ignore any error.
267-
}
268-
269-
// Consume the async cooked SRTP packets.
270-
if ((err = _srs_async_srtp->consume()) != srs_success) {
271-
srs_error_reset(err); // Ignore any error.
272-
}
273-
274263
return err;
275264
}
276265

trunk/src/app/srs_app_hybrid.cpp

+10
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,16 @@ srs_error_t SrsHybridServer::initialize()
183183
// A monitor to check the clock wall deviation, per clock tick.
184184
timer_->subscribe(20 * SRS_UTIME_MILLISECONDS, clock_monitor_);
185185

186+
// Consume the async cooked SRTP packets.
187+
if ((err = _srs_async_srtp->consume()) != srs_success) {
188+
return srs_error_wrap(err, "srtp");
189+
}
190+
191+
// Consume the async received UDP packets.
192+
if ((err = _srs_async_recv->consume()) != srs_success) {
193+
return srs_error_wrap(err, "recv");
194+
}
195+
186196
vector<ISrsHybridServer*>::iterator it;
187197
for (it = servers.begin(); it != servers.end(); ++it) {
188198
ISrsHybridServer* server = *it;

trunk/src/app/srs_app_threads.cpp

+93-34
Original file line numberDiff line numberDiff line change
@@ -184,10 +184,6 @@ srs_error_t SrsThreadPool::initialize()
184184
r1 = pthread_getaffinity_np(pthread_self(), sizeof(entry->cpuset2), &entry->cpuset2);
185185
#endif
186186

187-
if ((err = _srs_async_recv->initialize()) != srs_success) {
188-
return srs_error_wrap(err, "init async recv");
189-
}
190-
191187
interval_ = _srs_config->get_threads_interval();
192188
bool async_srtp = _srs_config->get_threads_async_srtp();
193189
srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64,
@@ -705,11 +701,14 @@ SrsAsyncSRTPManager::SrsAsyncSRTPManager()
705701
lock_ = new SrsThreadMutex();
706702
packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
707703
cooked_packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
704+
trd_ = new SrsFastCoroutine("srtp", this);
708705
}
709706

710707
// TODO: FIXME: We should stop the thread first, then free the manager.
711708
SrsAsyncSRTPManager::~SrsAsyncSRTPManager()
712709
{
710+
srs_freep(trd_);
711+
713712
srs_freep(lock_);
714713
srs_freep(packets_);
715714
srs_freep(cooked_packets_);
@@ -806,24 +805,56 @@ srs_error_t SrsAsyncSRTPManager::consume()
806805
{
807806
srs_error_t err = srs_success;
808807

809-
vector<SrsAsyncSRTPPacket*> flying;
810-
cooked_packets_->swap(flying);
808+
if ((err = trd_->start()) != srs_success) {
809+
return srs_error_wrap(err, "start");
810+
}
811811

812-
for (int i = 0; i < (int)flying.size(); i++) {
813-
SrsAsyncSRTPPacket* pkt = flying.at(i);
814-
SrsSecurityTransport* transport = pkt->task_->codec_->transport_;
815-
char* payload = pkt->msg_->payload;
812+
return err;
813+
}
816814

817-
if (pkt->do_decrypt_) {
818-
if (pkt->is_rtp_) {
819-
err = transport->on_rtp_plaintext(payload, pkt->nb_consumed_);
820-
}
815+
srs_error_t SrsAsyncSRTPManager::cycle()
816+
{
817+
srs_error_t err = srs_success;
818+
819+
// How many messages to run a yield.
820+
uint32_t nn_msgs_for_yield = 0;
821+
822+
while (true) {
823+
if ((err = trd_->pull()) != srs_success) {
824+
return srs_error_wrap(err, "pull");
821825
}
822-
if (err != srs_success) {
823-
srs_error_reset(err); // Ignore any error.
826+
827+
vector<SrsAsyncSRTPPacket*> flying;
828+
cooked_packets_->swap(flying);
829+
830+
if (flying.empty()) {
831+
srs_usleep(20 * SRS_UTIME_MILLISECONDS);
832+
continue;
824833
}
825834

826-
srs_freep(pkt);
835+
for (int i = 0; i < (int)flying.size(); i++) {
836+
SrsAsyncSRTPPacket* pkt = flying.at(i);
837+
SrsSecurityTransport* transport = pkt->task_->codec_->transport_;
838+
char* payload = pkt->msg_->payload;
839+
840+
if (pkt->do_decrypt_) {
841+
if (pkt->is_rtp_) {
842+
err = transport->on_rtp_plaintext(payload, pkt->nb_consumed_);
843+
}
844+
}
845+
if (err != srs_success) {
846+
srs_error_reset(err); // Ignore any error.
847+
}
848+
849+
srs_freep(pkt);
850+
851+
// Yield to another coroutines.
852+
// @see https://github.com/ossrs/srs/issues/2194#issuecomment-777485531
853+
if (++nn_msgs_for_yield > 10) {
854+
nn_msgs_for_yield = 0;
855+
srs_thread_yield();
856+
}
857+
}
827858
}
828859

829860
return err;
@@ -846,11 +877,14 @@ SrsAsyncRecvManager::SrsAsyncRecvManager()
846877
packets_ = new SrsThreadQueue<SrsUdpMuxSocket>();
847878
handler_ = NULL;
848879
max_recv_queue_ = 0;
880+
trd_ = new SrsFastCoroutine("recv", this);
849881
}
850882

851883
// TODO: FIXME: We should stop the thread first, then free the manager.
852884
SrsAsyncRecvManager::~SrsAsyncRecvManager()
853885
{
886+
srs_freep(trd_);
887+
854888
srs_freep(lock_);
855889
srs_freep(packets_);
856890

@@ -861,16 +895,6 @@ SrsAsyncRecvManager::~SrsAsyncRecvManager()
861895
}
862896
}
863897

864-
srs_error_t SrsAsyncRecvManager::initialize()
865-
{
866-
srs_error_t err = srs_success;
867-
868-
max_recv_queue_ = _srs_config->get_threads_max_recv_queue();
869-
srs_trace("AsyncRecv: Set max_queue=%d", max_recv_queue_);
870-
871-
return err;
872-
}
873-
874898
void SrsAsyncRecvManager::set_handler(ISrsUdpMuxHandler* v)
875899
{
876900
handler_ = v;
@@ -946,17 +970,52 @@ srs_error_t SrsAsyncRecvManager::consume()
946970
{
947971
srs_error_t err = srs_success;
948972

949-
vector<SrsUdpMuxSocket*> flying;
950-
packets_->swap(flying);
973+
max_recv_queue_ = _srs_config->get_threads_max_recv_queue();
974+
srs_trace("AsyncRecv: Set max_queue=%d", max_recv_queue_);
975+
976+
if ((err = trd_->start()) != srs_success) {
977+
return srs_error_wrap(err, "start");
978+
}
951979

952-
for (int i = 0; i < (int)flying.size(); i++) {
953-
SrsUdpMuxSocket* pkt = flying.at(i);
980+
return err;
981+
}
982+
983+
srs_error_t SrsAsyncRecvManager::cycle()
984+
{
985+
srs_error_t err = srs_success;
954986

955-
if (handler_ && (err = handler_->on_udp_packet(pkt)) != srs_success) {
956-
srs_error_reset(err); // Ignore any error.
987+
// How many messages to run a yield.
988+
uint32_t nn_msgs_for_yield = 0;
989+
990+
while (true) {
991+
if ((err = trd_->pull()) != srs_success) {
992+
return srs_error_wrap(err, "pull");
993+
}
994+
995+
vector<SrsUdpMuxSocket*> flying;
996+
packets_->swap(flying);
997+
998+
if (flying.empty()) {
999+
srs_usleep(20 * SRS_UTIME_MILLISECONDS);
1000+
continue;
9571001
}
9581002

959-
srs_freep(pkt);
1003+
for (int i = 0; i < (int)flying.size(); i++) {
1004+
SrsUdpMuxSocket* pkt = flying.at(i);
1005+
1006+
if (handler_ && (err = handler_->on_udp_packet(pkt)) != srs_success) {
1007+
srs_error_reset(err); // Ignore any error.
1008+
}
1009+
1010+
srs_freep(pkt);
1011+
1012+
// Yield to another coroutines.
1013+
// @see https://github.com/ossrs/srs/issues/2194#issuecomment-777485531
1014+
if (++nn_msgs_for_yield > 10) {
1015+
nn_msgs_for_yield = 0;
1016+
srs_thread_yield();
1017+
}
1018+
}
9601019
}
9611020

9621021
return err;

trunk/src/app/srs_app_threads.hpp

+13-3
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include <srs_app_rtc_dtls.hpp>
3232
#include <srs_app_rtc_conn.hpp>
3333
#include <srs_app_listener.hpp>
34+
#include <srs_app_st.hpp>
3435

3536
#include <pthread.h>
3637

@@ -321,14 +322,16 @@ class SrsAsyncSRTPPacket
321322
};
322323

323324
// The async SRTP manager, to start a thread to consume packets.
324-
class SrsAsyncSRTPManager
325+
class SrsAsyncSRTPManager : public ISrsCoroutineHandler
325326
{
326327
private:
327328
std::vector<SrsAsyncSRTPTask*> tasks_;
328329
SrsThreadMutex* lock_;
329330
private:
330331
SrsThreadQueue<SrsAsyncSRTPPacket>* packets_;
331332
private:
333+
// A coroutine to consume cooked packets.
334+
SrsFastCoroutine* trd_;
332335
// The packets cooked by async SRTP manager.
333336
SrsThreadQueue<SrsAsyncSRTPPacket>* cooked_packets_;
334337
public:
@@ -346,6 +349,8 @@ class SrsAsyncSRTPManager
346349
public:
347350
// Consume cooked SRTP packets. Must call in worker/service thread.
348351
virtual srs_error_t consume();
352+
private:
353+
srs_error_t cycle();
349354
};
350355

351356
// The global async SRTP manager.
@@ -363,12 +368,16 @@ class SrsThreadUdpListener
363368
};
364369

365370
// The async RECV manager, to recv UDP packets.
366-
class SrsAsyncRecvManager
371+
class SrsAsyncRecvManager : public ISrsCoroutineHandler
367372
{
368373
private:
369374
ISrsUdpMuxHandler* handler_;
370375
private:
376+
// A coroutine to consume received packets.
377+
SrsFastCoroutine* trd_;
378+
// If exceed max queue, drop packet.
371379
int max_recv_queue_;
380+
// The received UDP packets.
372381
SrsThreadQueue<SrsUdpMuxSocket>* packets_;
373382
private:
374383
std::vector<SrsThreadUdpListener*> listeners_;
@@ -377,7 +386,6 @@ class SrsAsyncRecvManager
377386
SrsAsyncRecvManager();
378387
virtual ~SrsAsyncRecvManager();
379388
public:
380-
srs_error_t initialize();
381389
void set_handler(ISrsUdpMuxHandler* v);
382390
void add_listener(SrsThreadUdpListener* listener);
383391
int size();
@@ -387,6 +395,8 @@ class SrsAsyncRecvManager
387395
public:
388396
// Consume received UDP packets. Must call in worker/service thread.
389397
virtual srs_error_t consume();
398+
private:
399+
srs_error_t cycle();
390400
};
391401

392402
// The global async RECV manager.

0 commit comments

Comments
 (0)