Skip to content

Commit 6e2de90

Browse files
committed
Threads-Hybrid: Support communicate between threads by chan and slot
1. Hybrid thread is responder, API thread is initiator. 2. Responder read message from initiator-slot, write message to responder-slot. 3. Initiator write message to initiator-slot, read message from responder-slot. 4. Responder start a coroutine to consume requests and response it.
1 parent 281350d commit 6e2de90

9 files changed

+356
-12
lines changed

trunk/src/app/srs_app_hybrid.cpp

+57
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include <srs_service_st.hpp>
3030
#include <srs_app_utility.hpp>
3131
#include <srs_app_threads.hpp>
32+
#include <srs_app_rtc_server.hpp>
3233

3334
using namespace std;
3435

@@ -192,6 +193,19 @@ srs_error_t SrsHybridServer::initialize()
192193
}
193194
}
194195

196+
// Create slots for other threads to communicate with us.
197+
SrsThreadEntry* self = _srs_thread_pool->self();
198+
199+
self->slot_ = new SrsThreadPipeSlot(1);
200+
201+
if ((err = self->slot_->initialize()) != srs_success) {
202+
return srs_error_wrap(err, "init slot");
203+
}
204+
205+
if ((err = self->slot_->open_responder(this)) != srs_success) {
206+
return srs_error_wrap(err, "init slot");
207+
}
208+
195209
return err;
196210
}
197211

@@ -387,5 +401,48 @@ srs_error_t SrsHybridServer::on_timer(srs_utime_t interval, srs_utime_t tick)
387401
return err;
388402
}
389403

404+
srs_error_t SrsHybridServer::on_thread_message(SrsThreadMessage* msg, SrsThreadPipeChannel* channel)
405+
{
406+
srs_error_t err = srs_success;
407+
408+
RtcServerAdapter* adapter = NULL;
409+
if (true) {
410+
vector<ISrsHybridServer*> servers = _srs_hybrid->servers;
411+
for (vector<ISrsHybridServer*>::iterator it = servers.begin(); it != servers.end(); ++it) {
412+
RtcServerAdapter* server = dynamic_cast<RtcServerAdapter*>(*it);
413+
if (server) {
414+
adapter = server;
415+
break;
416+
}
417+
}
418+
}
419+
420+
// TODO: FIXME: Response error?
421+
if (!adapter) {
422+
return err;
423+
}
424+
425+
if (msg->id == (uint64_t)SrsThreadMessageIDRtcCreateSession) {
426+
SrsThreadMessageRtcCreateSession* s = (SrsThreadMessageRtcCreateSession*)msg->ptr;
427+
err = adapter->rtc->create_session(s->req, s->remote_sdp, s->local_sdp, s->mock_eip,
428+
s->publish, s->dtls, s->srtp, &s->session);
429+
430+
// TODO: FIXME: Response error?
431+
if (err != srs_success) {
432+
return srs_error_wrap(err, "create session");
433+
}
434+
435+
// TODO: FIXME: Response timeout if error?
436+
// TODO: FIXME: Response a different message? With trace ID?
437+
// We're responder, write response to responder.
438+
srs_error_t r0 = channel->responder()->write(msg, sizeof(SrsThreadMessage), NULL);
439+
if (r0 != srs_success) {
440+
srs_freep(r0); // Ignore any error.
441+
}
442+
}
443+
444+
return err;
445+
}
446+
390447
SrsHybridServer* _srs_hybrid = new SrsHybridServer();
391448

trunk/src/app/srs_app_hybrid.hpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include <vector>
3030

3131
#include <srs_app_hourglass.hpp>
32+
#include <srs_app_threads.hpp>
3233

3334
class SrsServer;
3435
class SrsServerAdapter;
@@ -49,9 +50,8 @@ class ISrsHybridServer
4950
};
5051

5152
// The hybrid server manager.
52-
class SrsHybridServer : public ISrsFastTimer
53+
class SrsHybridServer : public ISrsFastTimer, public ISrsThreadResponder
5354
{
54-
friend class SrsApiServer;
5555
private:
5656
std::vector<ISrsHybridServer*> servers;
5757
SrsFastTimer* timer_;
@@ -70,6 +70,8 @@ class SrsHybridServer : public ISrsFastTimer
7070
// interface ISrsFastTimer
7171
private:
7272
srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick);
73+
private:
74+
srs_error_t on_thread_message(SrsThreadMessage* msg, SrsThreadPipeChannel* channel);
7375
};
7476

7577
extern SrsHybridServer* _srs_hybrid;

trunk/src/app/srs_app_rtc_conn.hpp

-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
#include <srs_service_st.hpp>
3030
#include <srs_kernel_utility.hpp>
3131
#include <srs_rtmp_stack.hpp>
32-
#include <srs_app_hybrid.hpp>
3332
#include <srs_app_hourglass.hpp>
3433
#include <srs_app_rtc_sdp.hpp>
3534
#include <srs_app_reload.hpp>

trunk/src/app/srs_app_rtc_server.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ class SrsRtcServer : public ISrsUdpMuxHandler, public ISrsFastTimer, public ISrs
132132
// The RTC server adapter.
133133
class RtcServerAdapter : public ISrsHybridServer
134134
{
135-
friend class SrsApiServer;
135+
friend class SrsHybridServer;
136136
private:
137137
SrsRtcServer* rtc;
138138
public:

trunk/src/app/srs_app_server.cpp

+50-8
Original file line numberDiff line numberDiff line change
@@ -1993,18 +1993,60 @@ srs_error_t SrsApiServer::create_session(
19931993
) {
19941994
srs_error_t err = srs_success;
19951995

1996-
vector<ISrsHybridServer*> servers = _srs_hybrid->servers;
1997-
for (vector<ISrsHybridServer*>::iterator it = servers.begin(); it != servers.end(); ++it) {
1998-
RtcServerAdapter* adapter = dynamic_cast<RtcServerAdapter*>(*it);
1999-
if (!adapter) {
2000-
continue;
1996+
// Allocate slot to communicate with hybrid thread.
1997+
SrsThreadEntry* self = _srs_thread_pool->self();
1998+
SrsThreadEntry* hybrid = _srs_thread_pool->hybrid();
1999+
srs_assert(self && hybrid);
2000+
2001+
SrsThreadPipeChannel* channel = NULL;
2002+
if (true) {
2003+
map<pthread_t, SrsThreadPipeChannel*>::iterator it = self->channels_.find(hybrid->trd);
2004+
if (it == self->channels_.end()) {
2005+
self->channels_[hybrid->trd] = channel = hybrid->slot_->allocate();
2006+
} else {
2007+
channel = it->second;
20012008
}
2009+
}
2010+
srs_assert(channel);
20022011

2003-
// TODO: FIXME: Should notify thread by thread-slot.
2004-
return adapter->rtc->create_session(req, remote_sdp, local_sdp, mock_eip,
2005-
publish, dtls, srtp, psession);
2012+
// We're initiator, write to initiator, read from responder.
2013+
if ((err = channel->initiator()->open_write()) != srs_success) {
2014+
return srs_error_wrap(err, "open write");
2015+
}
2016+
if ((err = channel->responder()->open_read()) != srs_success) {
2017+
return srs_error_wrap(err, "open read");
20062018
}
20072019

2020+
SrsThreadMessageRtcCreateSession s;
2021+
s.req = req;
2022+
s.remote_sdp = remote_sdp;
2023+
s.local_sdp = local_sdp;
2024+
s.mock_eip = mock_eip;
2025+
s.publish = publish;
2026+
s.dtls = dtls;
2027+
s.srtp = srtp;
2028+
s.session = NULL;
2029+
2030+
SrsThreadMessage m;
2031+
m.id = (uint64_t)SrsThreadMessageIDRtcCreateSession;
2032+
m.ptr = (uint64_t)&s;
2033+
2034+
// We're initiator, write to initiator, read from responder.
2035+
// TODO: FIXME: Write important logs, and error response, and timeout?
2036+
if ((err = channel->initiator()->write(&m, sizeof(m), NULL)) != srs_success) {
2037+
return srs_error_wrap(err, "write");
2038+
}
2039+
2040+
// TODO: FIXME: Write important logs, and error response, and timeout?
2041+
if ((err = channel->responder()->read(&m, sizeof(m), NULL)) != srs_success) {
2042+
return srs_error_wrap(err, "read");
2043+
}
2044+
2045+
// Covert to output params.
2046+
local_sdp = s.local_sdp;
2047+
// TODO: FIMXE: Should never return it, for it's not thread-safe.
2048+
*psession = s.session;
2049+
20082050
return err;
20092051
}
20102052

trunk/src/app/srs_app_server.hpp

+18
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include <srs_app_hourglass.hpp>
4242
#include <srs_app_hybrid.hpp>
4343
#include <srs_app_rtc_api.hpp>
44+
#include <srs_app_rtc_sdp.hpp>
4445

4546
class SrsServer;
4647
class SrsHttpServeMux;
@@ -449,5 +450,22 @@ class SrsApiServer : public ISrsTcpMuxHandler, public ISrsResourceManager, publi
449450
srs_error_t do_start();
450451
};
451452

453+
// The RTC create session information.
454+
struct SrsThreadMessageRtcCreateSession
455+
{
456+
// Input.
457+
SrsRequest* req;
458+
SrsSdp remote_sdp;
459+
std::string mock_eip;
460+
bool publish;
461+
bool dtls;
462+
bool srtp;
463+
464+
// Output.
465+
SrsSdp local_sdp;
466+
// TODO: FIXME: It's not thread-safe.
467+
SrsRtcConnection* session;
468+
};
469+
452470
#endif
453471

trunk/src/app/srs_app_threads.cpp

+140
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ srs_error_t SrsPipe::initialize()
103103
{
104104
srs_error_t err = srs_success;
105105

106+
if (pipes_[0] > 0) {
107+
return err;
108+
}
109+
106110
if (pipe(pipes_) < 0) {
107111
return srs_error_new(ERROR_SYSTEM_CREATE_PIPE, "create pipe");
108112
}
@@ -134,6 +138,10 @@ srs_error_t SrsThreadPipe::initialize(int fd)
134138
{
135139
srs_error_t err = srs_success;
136140

141+
if (stfd_) {
142+
return err;
143+
}
144+
137145
if ((stfd_ = srs_netfd_open(fd)) == NULL) {
138146
return srs_error_new(ERROR_PIPE_OPEN, "open pipe");
139147
}
@@ -220,6 +228,134 @@ srs_error_t SrsThreadPipePair::write(void* buf, size_t size, ssize_t* nwrite)
220228
return wpipe_->write(buf, size, nwrite);
221229
}
222230

231+
SrsThreadPipeChannel::SrsThreadPipeChannel()
232+
{
233+
initiator_ = new SrsThreadPipePair();
234+
responder_ = new SrsThreadPipePair();
235+
236+
trd_ = new SrsFastCoroutine("chan", this);
237+
handler_ = NULL;
238+
}
239+
240+
SrsThreadPipeChannel::~SrsThreadPipeChannel()
241+
{
242+
srs_freep(trd_);
243+
srs_freep(initiator_);
244+
srs_freep(responder_);
245+
}
246+
247+
SrsThreadPipePair* SrsThreadPipeChannel::initiator()
248+
{
249+
return initiator_;
250+
}
251+
252+
SrsThreadPipePair* SrsThreadPipeChannel::responder()
253+
{
254+
return responder_;
255+
}
256+
257+
srs_error_t SrsThreadPipeChannel::start(ISrsThreadResponder* h)
258+
{
259+
handler_ = h;
260+
return trd_->start();
261+
}
262+
263+
srs_error_t SrsThreadPipeChannel::cycle()
264+
{
265+
srs_error_t err = srs_success;
266+
267+
while (true) {
268+
if ((err = trd_->pull()) != srs_success) {
269+
return srs_error_wrap(err, "pull");
270+
}
271+
272+
// Here we're responder, read from initiator.
273+
SrsThreadMessage m;
274+
if ((err = initiator_->read(&m, sizeof(m), NULL)) != srs_success) {
275+
return srs_error_wrap(err, "read");
276+
}
277+
278+
// Consume the message, the responder can write response to responder.
279+
if (handler_ && (err = handler_->on_thread_message(&m, this)) != srs_success) {
280+
return srs_error_wrap(err, "consume");
281+
}
282+
}
283+
284+
return err;
285+
}
286+
287+
SrsThreadPipeSlot::SrsThreadPipeSlot(int slots)
288+
{
289+
nn_channels_ = slots;
290+
channels_ = new SrsThreadPipeChannel[slots];
291+
292+
index_ = 0;
293+
lock_ = new SrsThreadMutex();
294+
}
295+
296+
SrsThreadPipeSlot::~SrsThreadPipeSlot()
297+
{
298+
srs_freepa(channels_);
299+
srs_freep(lock_);
300+
}
301+
302+
srs_error_t SrsThreadPipeSlot::initialize()
303+
{
304+
srs_error_t err = srs_success;
305+
306+
for (int i = 0; i < nn_channels_; i++) {
307+
SrsThreadPipeChannel* channel = &channels_[i];
308+
309+
// Here we're responder, but it's ok to initialize the initiator.
310+
if ((err = channel->initiator()->initialize()) != srs_success) {
311+
return srs_error_wrap(err, "init %d initiator", i);
312+
}
313+
if ((err = channel->responder()->initialize()) != srs_success) {
314+
return srs_error_wrap(err, "init %d responder", i);
315+
}
316+
}
317+
318+
return err;
319+
}
320+
321+
srs_error_t SrsThreadPipeSlot::open_responder(ISrsThreadResponder* h)
322+
{
323+
srs_error_t err = srs_success;
324+
325+
for (int i = 0; i < nn_channels_; i++) {
326+
SrsThreadPipeChannel* channel = &channels_[i];
327+
328+
// We're responder, read from initiator, write to responder.
329+
if ((err = channel->initiator()->open_read()) != srs_success) {
330+
return srs_error_wrap(err, "open read");
331+
}
332+
if ((err = channel->responder()->open_write()) != srs_success) {
333+
return srs_error_wrap(err, "open write");
334+
}
335+
336+
// OK, we start the cycle coroutine for responder.
337+
if ((err = channel->start(h)) != srs_success) {
338+
return srs_error_wrap(err, "start %d consume coroutine", i);
339+
}
340+
}
341+
342+
return err;
343+
}
344+
345+
SrsThreadPipeChannel* SrsThreadPipeSlot::allocate()
346+
{
347+
SrsThreadLocker(lock_);
348+
return index_ < nn_channels_? &channels_[index_++] : NULL;
349+
}
350+
351+
ISrsThreadResponder::ISrsThreadResponder()
352+
{
353+
}
354+
355+
ISrsThreadResponder::~ISrsThreadResponder()
356+
{
357+
}
358+
223359
SrsThreadMutex::SrsThreadMutex()
224360
{
225361
// https://man7.org/linux/man-pages/man3/pthread_mutexattr_init.3.html
@@ -276,6 +412,7 @@ SrsThreadEntry::SrsThreadEntry()
276412
cpuset_ok = false;
277413

278414
stat = new SrsProcSelfStat();
415+
slot_ = NULL;
279416

280417
received_packets_ = new SrsThreadQueue<SrsUdpMuxSocket>();
281418
cooked_packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
@@ -286,6 +423,9 @@ SrsThreadEntry::~SrsThreadEntry()
286423
srs_freep(stat);
287424
srs_freep(err);
288425

426+
// TODO: FIXME: Before free slot, we MUST close pipes in threads that open them.
427+
srs_freep(slot_);
428+
289429
srs_freep(received_packets_);
290430
srs_freep(cooked_packets_);
291431

0 commit comments

Comments
 (0)