Skip to content

Commit 85ea39d

Browse files
committed
Threads-RECV: Support dedicate thread to recv UDP packets.
1. Use SrsUdpMuxSocket::raw_recvfrom to read, without ST. 2. Start a UDP recv thread, to recv packets. 3. Consume UDP packets in RTC server timer.
1 parent e3686a4 commit 85ea39d

9 files changed

+244
-1
lines changed

trunk/conf/full.conf

+3
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,9 @@ threads {
124124
# Whether enable the ASYNC SRTP, codec in dedicate threads.
125125
# Default: off
126126
async_srtp off;
127+
# Whether enable the ASYNC RECV, recv udp packets in dedicate threads.
128+
# Default: off
129+
async_recv off;
127130
}
128131

129132
#############################################################################################

trunk/src/app/srs_app_config.cpp

+17
Original file line numberDiff line numberDiff line change
@@ -4144,6 +4144,23 @@ bool SrsConfig::get_threads_async_srtp()
41444144
return SRS_CONF_PERFER_FALSE(conf->arg0());
41454145
}
41464146

4147+
bool SrsConfig::get_threads_async_recv()
4148+
{
4149+
static bool DEFAULT = false;
4150+
4151+
SrsConfDirective* conf = root->get("threads");
4152+
if (!conf) {
4153+
return DEFAULT;
4154+
}
4155+
4156+
conf = conf->get("async_recv");
4157+
if (!conf) {
4158+
return DEFAULT;
4159+
}
4160+
4161+
return SRS_CONF_PERFER_FALSE(conf->arg0());
4162+
}
4163+
41474164
vector<SrsConfDirective*> SrsConfig::get_stream_casters()
41484165
{
41494166
srs_assert(root);

trunk/src/app/srs_app_config.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,7 @@ class SrsConfig
479479
public:
480480
virtual srs_utime_t get_threads_interval();
481481
virtual bool get_threads_async_srtp();
482+
virtual bool get_threads_async_recv();
482483
// stream_caster section
483484
public:
484485
// Get all stream_caster in config file.

trunk/src/app/srs_app_hourglass.cpp

+6-1
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,12 @@ srs_error_t SrsClockWallMonitor::on_timer(srs_utime_t interval, srs_utime_t tick
261261
++_srs_pps_timer_s->sugar;
262262
}
263263

264-
// Consume the cooked async SRTP packets.
264+
// Consume the async received UDP packets.
265+
if ((err = _srs_async_recv->consume()) != srs_success) {
266+
srs_error_reset(err); // Ignore any error.
267+
}
268+
269+
// Consume the async cooked SRTP packets.
265270
if ((err = _srs_async_srtp->consume()) != srs_success) {
266271
srs_error_reset(err); // Ignore any error.
267272
}

trunk/src/app/srs_app_listener.cpp

+55
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ using namespace std;
4242
#include <srs_app_utility.hpp>
4343
#include <srs_kernel_utility.hpp>
4444
#include <srs_kernel_buffer.hpp>
45+
#include <srs_app_config.hpp>
46+
#include <srs_app_threads.hpp>
4547

4648
#include <srs_protocol_kbps.hpp>
4749

@@ -323,6 +325,24 @@ int SrsUdpMuxSocket::recvfrom(srs_utime_t timeout)
323325
return nread;
324326
}
325327

328+
return on_recvfrom();
329+
}
330+
331+
int SrsUdpMuxSocket::raw_recvfrom()
332+
{
333+
int osfd = srs_netfd_fileno(lfd);
334+
335+
fromlen = sizeof(from);
336+
nread = ::recvfrom(osfd, buf, nb_buf, 0, (sockaddr*)&from, (socklen_t*)&fromlen);
337+
if (nread <= 0) {
338+
return nread;
339+
}
340+
341+
return on_recvfrom();
342+
}
343+
344+
int SrsUdpMuxSocket::on_recvfrom()
345+
{
326346
// Reset the fast cache buffer size.
327347
cache_buffer_->set_size(nread);
328348
cache_buffer_->skip(-1 * cache_buffer_->pos());
@@ -494,6 +514,29 @@ SrsUdpMuxSocket* SrsUdpMuxSocket::copy_sendonly()
494514
return sendonly;
495515
}
496516

517+
SrsUdpMuxSocket* SrsUdpMuxSocket::copy()
518+
{
519+
SrsUdpMuxSocket* cp = new SrsUdpMuxSocket(lfd);
520+
521+
cp->nb_buf = nb_buf;
522+
if (nread) {
523+
memcpy(cp->buf, buf, nread);
524+
}
525+
cp->nread = nread;
526+
cp->lfd = lfd;
527+
cp->from = from;
528+
cp->fromlen = fromlen;
529+
cp->peer_ip = peer_ip;
530+
cp->peer_port = peer_port;
531+
532+
// Copy the fast id.
533+
cp->peer_id_ = peer_id_;
534+
cp->fast_id_ = fast_id_;
535+
cp->address_changed_ = address_changed_;
536+
537+
return cp;
538+
}
539+
497540
SrsUdpMuxListener::SrsUdpMuxListener(ISrsUdpMuxHandler* h, std::string i, int p)
498541
{
499542
handler = h;
@@ -601,6 +644,18 @@ srs_error_t SrsUdpMuxListener::cycle()
601644

602645
set_socket_buffer();
603646

647+
// Sleep infinite if use async_recv.
648+
if (_srs_config->get_threads_async_recv()) {
649+
SrsThreadUdpListener* listener = new SrsThreadUdpListener(lfd);
650+
651+
_srs_async_recv->add_listener(listener);
652+
_srs_async_recv->set_handler(handler);
653+
654+
srs_usleep(SRS_UTIME_NO_TIMEOUT);
655+
656+
return trd->pull();
657+
}
658+
604659
// Because we have to decrypt the cipher of received packet payload,
605660
// and the size is not determined, so we think there is at least one copy,
606661
// and we can reuse the plaintext h264/opus with players when got plaintext.

trunk/src/app/srs_app_listener.hpp

+6
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,10 @@ class SrsUdpMuxSocket
164164
virtual ~SrsUdpMuxSocket();
165165
public:
166166
int recvfrom(srs_utime_t timeout);
167+
int raw_recvfrom();
168+
private:
169+
int on_recvfrom();
170+
public:
167171
srs_error_t sendto(void* data, int size, srs_utime_t timeout);
168172
srs_netfd_t stfd();
169173
sockaddr_in* peer_addr();
@@ -176,6 +180,8 @@ class SrsUdpMuxSocket
176180
uint64_t fast_id();
177181
SrsBuffer* buffer();
178182
SrsUdpMuxSocket* copy_sendonly();
183+
public:
184+
SrsUdpMuxSocket* copy();
179185
};
180186

181187
class SrsUdpMuxListener : public ISrsCoroutineHandler

trunk/src/app/srs_app_threads.cpp

+112
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,7 @@ void SrsAsyncSRTPManager::remove_task(SrsAsyncSRTPTask* task)
646646
}
647647
}
648648

649+
// TODO: FIXME: We could use a coroutine queue, then cook all packet in RTC server timer.
649650
void SrsAsyncSRTPManager::add_packet(SrsAsyncSRTPPacket* pkt)
650651
{
651652
packets_->push_back(pkt);
@@ -661,7 +662,9 @@ srs_error_t SrsAsyncSRTPManager::do_start()
661662
{
662663
srs_error_t err = srs_success;
663664

665+
// TODO: FIXME: Config it?
664666
srs_utime_t interval = 10 * SRS_UTIME_MILLISECONDS;
667+
665668
while (true) {
666669
vector<SrsAsyncSRTPPacket*> flying;
667670
packets_->swap(flying);
@@ -719,3 +722,112 @@ srs_error_t SrsAsyncSRTPManager::consume()
719722
}
720723

721724
SrsAsyncSRTPManager* _srs_async_srtp = new SrsAsyncSRTPManager();
725+
726+
SrsThreadUdpListener::SrsThreadUdpListener(srs_netfd_t fd)
727+
{
728+
skt_ = new SrsUdpMuxSocket(fd);
729+
}
730+
731+
SrsThreadUdpListener::~SrsThreadUdpListener()
732+
{
733+
}
734+
735+
SrsAsyncRecvManager::SrsAsyncRecvManager()
736+
{
737+
lock_ = new SrsThreadMutex();
738+
packets_ = new SrsThreadQueue<SrsUdpMuxSocket>();
739+
handler_ = NULL;
740+
}
741+
742+
// TODO: FIXME: We should stop the thread first, then free the manager.
743+
SrsAsyncRecvManager::~SrsAsyncRecvManager()
744+
{
745+
srs_freep(lock_);
746+
srs_freep(packets_);
747+
748+
vector<SrsThreadUdpListener*>::iterator it;
749+
for (it = listeners_.begin(); it != listeners_.end(); ++it) {
750+
SrsThreadUdpListener* listener = *it;
751+
srs_freep(listener);
752+
}
753+
}
754+
755+
void SrsAsyncRecvManager::set_handler(ISrsUdpMuxHandler* v)
756+
{
757+
handler_ = v;
758+
}
759+
760+
void SrsAsyncRecvManager::add_listener(SrsThreadUdpListener* listener)
761+
{
762+
SrsThreadLocker(lock_);
763+
listeners_.push_back(listener);
764+
}
765+
766+
srs_error_t SrsAsyncRecvManager::start(void* arg)
767+
{
768+
SrsAsyncRecvManager* recv = (SrsAsyncRecvManager*)arg;
769+
return recv->do_start();
770+
}
771+
772+
srs_error_t SrsAsyncRecvManager::do_start()
773+
{
774+
srs_error_t err = srs_success;
775+
776+
// TODO: FIXME: Config it?
777+
srs_utime_t interval = 10 * SRS_UTIME_MILLISECONDS;
778+
779+
while (true) {
780+
vector<SrsThreadUdpListener*> listeners;
781+
if (true) {
782+
SrsThreadLocker(lock_);
783+
listeners = listeners_;
784+
}
785+
786+
bool got_packet = false;
787+
for (int i = 0; i < (int)listeners.size(); i++) {
788+
SrsThreadUdpListener* listener = listeners.at(i);
789+
790+
// TODO: FIXME: Use st_recvfrom to recv if thread-safe ST is ok.
791+
int nread = listener->skt_->raw_recvfrom();
792+
if (nread > 0) {
793+
got_packet = true;
794+
packets_->push_back(listener->skt_->copy());
795+
}
796+
}
797+
798+
// If got packets, maybe more packets in queue.
799+
if (got_packet) {
800+
continue;
801+
}
802+
803+
// TODO: FIXME: Maybe we should use cond wait?
804+
timespec tv = {0};
805+
tv.tv_sec = interval / SRS_UTIME_SECONDS;
806+
tv.tv_nsec = (interval % SRS_UTIME_SECONDS) * 1000;
807+
nanosleep(&tv, NULL);
808+
}
809+
810+
return err;
811+
}
812+
813+
srs_error_t SrsAsyncRecvManager::consume()
814+
{
815+
srs_error_t err = srs_success;
816+
817+
vector<SrsUdpMuxSocket*> flying;
818+
packets_->swap(flying);
819+
820+
for (int i = 0; i < (int)flying.size(); i++) {
821+
SrsUdpMuxSocket* pkt = flying.at(i);
822+
823+
if (handler_ && (err = handler_->on_udp_packet(pkt)) != srs_success) {
824+
srs_error_reset(err); // Ignore any error.
825+
}
826+
827+
srs_freep(pkt);
828+
}
829+
830+
return err;
831+
}
832+
833+
SrsAsyncRecvManager* _srs_async_recv = new SrsAsyncRecvManager();

trunk/src/app/srs_app_threads.hpp

+39
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include <srs_kernel_flv.hpp>
3131
#include <srs_app_rtc_dtls.hpp>
3232
#include <srs_app_rtc_conn.hpp>
33+
#include <srs_app_listener.hpp>
3334

3435
#include <pthread.h>
3536

@@ -336,4 +337,42 @@ class SrsAsyncSRTPManager
336337
// The global async SRTP manager.
337338
extern SrsAsyncSRTPManager* _srs_async_srtp;
338339

340+
// A thread-safe UDP listener.
341+
// TODO: FIXME: Use st_recvfrom to recv if thread-safe ST is ok.
342+
class SrsThreadUdpListener
343+
{
344+
public:
345+
SrsUdpMuxSocket* skt_;
346+
public:
347+
SrsThreadUdpListener(srs_netfd_t fd);
348+
virtual ~SrsThreadUdpListener();
349+
};
350+
351+
// The async RECV manager, to recv UDP packets.
352+
class SrsAsyncRecvManager
353+
{
354+
private:
355+
ISrsUdpMuxHandler* handler_;
356+
private:
357+
SrsThreadQueue<SrsUdpMuxSocket>* packets_;
358+
private:
359+
std::vector<SrsThreadUdpListener*> listeners_;
360+
SrsThreadMutex* lock_;
361+
public:
362+
SrsAsyncRecvManager();
363+
virtual ~SrsAsyncRecvManager();
364+
public:
365+
void set_handler(ISrsUdpMuxHandler* v);
366+
void add_listener(SrsThreadUdpListener* listener);
367+
static srs_error_t start(void* arg);
368+
private:
369+
srs_error_t do_start();
370+
public:
371+
// Consume received UDP packets. Must call in worker/service thread.
372+
virtual srs_error_t consume();
373+
};
374+
375+
// The global async RECV manager.
376+
extern SrsAsyncRecvManager* _srs_async_recv;
377+
339378
#endif

trunk/src/main/srs_main_server.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,11 @@ srs_error_t run_in_thread_pool()
492492
return srs_error_wrap(err, "start async srtp thread");
493493
}
494494

495+
// Start the async RECV worker thread, to recv UDP packets.
496+
if ((err = _srs_thread_pool->execute("recv", SrsAsyncRecvManager::start, _srs_async_recv)) != srs_success) {
497+
return srs_error_wrap(err, "start async recv thread");
498+
}
499+
495500
// Start the service worker thread, for RTMP and RTC server, etc.
496501
if ((err = _srs_thread_pool->execute("hybrid", run_hybrid_server, NULL)) != srs_success) {
497502
return srs_error_wrap(err, "start hybrid server thread");

0 commit comments

Comments
 (0)