Skip to content

Commit 281350d

Browse files
committed
Threads-Hybrid: Extract pipe and pair to communicate between threads
1. Pipe can be open by any threads, because it's only os FDs. 2. If pipe is open read/write, it's associated by ST, so we MUST free it by the same thread. 3. If open pipe in one thread, it's ok to free it directly, without close pipe. 4. If open read in a thread, then open write in another thread, user MUST close it correctly.
1 parent 40e59ef commit 281350d

7 files changed

+252
-33
lines changed

trunk/src/app/srs_app_server.cpp

+36-30
Original file line numberDiff line numberDiff line change
@@ -404,39 +404,42 @@ SrsSignalManager* SrsSignalManager::instance = NULL;
404404
SrsSignalManager::SrsSignalManager(SrsServer* s)
405405
{
406406
SrsSignalManager::instance = this;
407-
407+
408+
pipe_ = new SrsThreadPipePair();
409+
408410
server = s;
409-
sig_pipe[0] = sig_pipe[1] = -1;
410411
trd = new SrsSTCoroutine("signal", this, _srs_context->get_id());
411-
signal_read_stfd = NULL;
412412
}
413413

414414
SrsSignalManager::~SrsSignalManager()
415415
{
416-
srs_close_stfd(signal_read_stfd);
417-
418-
if (sig_pipe[0] > 0) {
419-
::close(sig_pipe[0]);
420-
}
421-
if (sig_pipe[1] > 0) {
422-
::close(sig_pipe[1]);
423-
}
424-
425416
srs_freep(trd);
417+
418+
// Note that it's optional, because the read/write pair is in the same thread.
419+
pipe_->close_read();
420+
pipe_->close_write();
421+
422+
// If in the same thread, we could directly free the pipe, which will close all FDs.
423+
srs_freep(pipe_);
426424
}
427425

428426
srs_error_t SrsSignalManager::initialize()
429427
{
430-
/* Create signal pipe */
431-
if (pipe(sig_pipe) < 0) {
432-
return srs_error_new(ERROR_SYSTEM_CREATE_PIPE, "create pipe");
428+
srs_error_t err = srs_success;
429+
430+
if ((err = pipe_->initialize()) != srs_success) {
431+
return srs_error_wrap(err, "init pipe");
433432
}
434-
435-
if ((signal_read_stfd = srs_netfd_open(sig_pipe[0])) == NULL) {
436-
return srs_error_new(ERROR_SYSTEM_CREATE_PIPE, "open pipe");
433+
434+
if ((err = pipe_->open_read()) != srs_success) {
435+
return srs_error_wrap(err, "init read pipe");
437436
}
438-
439-
return srs_success;
437+
438+
if ((err = pipe_->open_write()) != srs_success) {
439+
return srs_error_wrap(err, "init write pipe");
440+
}
441+
442+
return err;
440443
}
441444

442445
srs_error_t SrsSignalManager::start()
@@ -496,11 +499,12 @@ srs_error_t SrsSignalManager::cycle()
496499
}
497500

498501
int signo;
502+
// Read the next signal from the pipe
503+
if ((err = pipe_->read(&signo, sizeof(int), NULL)) != srs_success) {
504+
srs_freep(err); // Ignore any error.
505+
}
499506

500-
/* Read the next signal from the pipe */
501-
srs_read(signal_read_stfd, &signo, sizeof(int), SRS_UTIME_NO_TIMEOUT);
502-
503-
/* Process signal synchronously */
507+
// Process signal synchronously
504508
server->on_signal(signo);
505509
}
506510

@@ -511,13 +515,15 @@ void SrsSignalManager::sig_catcher(int signo)
511515
{
512516
int err;
513517

514-
/* Save errno to restore it after the write() */
518+
// Save errno to restore it after the write()
515519
err = errno;
516-
517-
/* write() is reentrant/async-safe */
518-
int fd = SrsSignalManager::instance->sig_pipe[1];
519-
write(fd, &signo, sizeof(int));
520-
520+
521+
// write() is reentrant/async-safe
522+
srs_error_t r0 = SrsSignalManager::instance->pipe_->write(&signo, sizeof(int), NULL);
523+
if (r0 != srs_success) {
524+
srs_freep(r0); // Ignore any error.
525+
}
526+
521527
errno = err;
522528
}
523529

trunk/src/app/srs_app_server.hpp

+2-3
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class SrsAppCasterFlv;
5757
class SrsRtspCaster;
5858
class SrsResourceManager;
5959
class SrsGb28181Caster;
60-
60+
class SrsThreadPipePair;
6161

6262
// The listener type for server to identify the connection,
6363
// that is, use different type to process the connection.
@@ -215,8 +215,7 @@ class SrsSignalManager : public ISrsCoroutineHandler
215215
private:
216216
// Per-process pipe which is used as a signal queue.
217217
// Up to PIPE_BUF/sizeof(int) signals can be queued up.
218-
int sig_pipe[2];
219-
srs_netfd_t signal_read_stfd;
218+
SrsThreadPipePair* pipe_;
220219
private:
221220
SrsServer* server;
222221
SrsCoroutine* trd;

trunk/src/app/srs_app_threads.cpp

+137
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,143 @@ uint64_t srs_covert_cpuset(cpu_set_t v)
8383
#endif
8484
}
8585

86+
SrsPipe::SrsPipe()
87+
{
88+
pipes_[0] = pipes_[1] = -1;
89+
}
90+
91+
SrsPipe::~SrsPipe()
92+
{
93+
// Close the FDs because we might not open it as stfd.
94+
if (pipes_[0] > 0) {
95+
::close(pipes_[0]);
96+
}
97+
if (pipes_[1] > 0) {
98+
::close(pipes_[1]);
99+
}
100+
}
101+
102+
srs_error_t SrsPipe::initialize()
103+
{
104+
srs_error_t err = srs_success;
105+
106+
if (pipe(pipes_) < 0) {
107+
return srs_error_new(ERROR_SYSTEM_CREATE_PIPE, "create pipe");
108+
}
109+
110+
return err;
111+
}
112+
113+
int SrsPipe::read_fd()
114+
{
115+
return pipes_[0];
116+
}
117+
118+
int SrsPipe::write_fd()
119+
{
120+
return pipes_[1];
121+
}
122+
123+
SrsThreadPipe::SrsThreadPipe()
124+
{
125+
stfd_ = NULL;
126+
}
127+
128+
SrsThreadPipe::~SrsThreadPipe()
129+
{
130+
srs_close_stfd(stfd_);
131+
}
132+
133+
srs_error_t SrsThreadPipe::initialize(int fd)
134+
{
135+
srs_error_t err = srs_success;
136+
137+
if ((stfd_ = srs_netfd_open(fd)) == NULL) {
138+
return srs_error_new(ERROR_PIPE_OPEN, "open pipe");
139+
}
140+
141+
return err;
142+
}
143+
144+
srs_error_t SrsThreadPipe::read(void* buf, size_t size, ssize_t* nread)
145+
{
146+
ssize_t nn = srs_read(stfd_, buf, size, SRS_UTIME_NO_TIMEOUT);
147+
148+
if (nread) {
149+
*nread = nn;
150+
}
151+
152+
if (nn < 0) {
153+
return srs_error_new(ERROR_PIPE_READ, "read");
154+
}
155+
156+
return srs_success;
157+
}
158+
159+
srs_error_t SrsThreadPipe::write(void* buf, size_t size, ssize_t* nwrite)
160+
{
161+
ssize_t nn = srs_write(stfd_, buf, size, SRS_UTIME_NO_TIMEOUT);
162+
163+
if (nwrite) {
164+
*nwrite = nn;
165+
}
166+
167+
if (nn < 0) {
168+
return srs_error_new(ERROR_PIPE_WRITE, "write");
169+
}
170+
171+
return srs_success;
172+
}
173+
174+
SrsThreadPipePair::SrsThreadPipePair()
175+
{
176+
pipe_ = new SrsPipe();
177+
rpipe_ = new SrsThreadPipe();
178+
wpipe_ = new SrsThreadPipe();
179+
}
180+
181+
SrsThreadPipePair::~SrsThreadPipePair()
182+
{
183+
close_read();
184+
close_write();
185+
srs_freep(pipe_);
186+
}
187+
188+
srs_error_t SrsThreadPipePair::initialize()
189+
{
190+
return pipe_->initialize();
191+
}
192+
193+
srs_error_t SrsThreadPipePair::open_read()
194+
{
195+
return rpipe_->initialize(pipe_->read_fd());
196+
}
197+
198+
srs_error_t SrsThreadPipePair::open_write()
199+
{
200+
return wpipe_->initialize(pipe_->write_fd());
201+
}
202+
203+
void SrsThreadPipePair::close_read()
204+
{
205+
srs_freep(rpipe_);
206+
}
207+
208+
void SrsThreadPipePair::close_write()
209+
{
210+
srs_freep(wpipe_);
211+
}
212+
213+
srs_error_t SrsThreadPipePair::read(void* buf, size_t size, ssize_t* nread)
214+
{
215+
return rpipe_->read(buf, size, nread);
216+
}
217+
218+
srs_error_t SrsThreadPipePair::write(void* buf, size_t size, ssize_t* nwrite)
219+
{
220+
return wpipe_->write(buf, size, nwrite);
221+
}
222+
86223
SrsThreadMutex::SrsThreadMutex()
87224
{
88225
// https://man7.org/linux/man-pages/man3/pthread_mutexattr_init.3.html

trunk/src/app/srs_app_threads.hpp

+68
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,74 @@ class SrsAsyncSRTPPacket;
4444
class SrsSecurityTransport;
4545
class SrsProcSelfStat;
4646

47+
// The pipe wraps the os pipes(fds).
48+
class SrsPipe
49+
{
50+
private:
51+
// The max buffer size of pipe is PIPE_BUF, so if we used to transmit signals(int),
52+
// up to PIPE_BUF/sizeof(int) signals can be queued up.
53+
// @see https://man7.org/linux/man-pages/man2/pipe.2.html
54+
int pipes_[2];
55+
public:
56+
SrsPipe();
57+
virtual ~SrsPipe();
58+
public:
59+
srs_error_t initialize();
60+
public:
61+
int read_fd();
62+
int write_fd();
63+
};
64+
65+
// The pipe to communicate between thread-local ST of threads.
66+
class SrsThreadPipe
67+
{
68+
private:
69+
srs_netfd_t stfd_;
70+
public:
71+
SrsThreadPipe();
72+
virtual ~SrsThreadPipe();
73+
public:
74+
// Open fd by ST, should be free by the same thread.
75+
srs_error_t initialize(int fd);
76+
public:
77+
// Note that the pipe is unidirectional data channel, so only one of
78+
// read/write is available.
79+
srs_error_t read(void* buf, size_t size, ssize_t* nread);
80+
srs_error_t write(void* buf, size_t size, ssize_t* nwrite);
81+
};
82+
83+
// A thread pipe pair, to communicate between threads.
84+
// @remark If thread A open read, then it MUST close the read.
85+
class SrsThreadPipePair
86+
{
87+
private:
88+
// Per-process pipe which is used as a signal queue.
89+
// Up to PIPE_BUF/sizeof(int) signals can be queued up.
90+
SrsPipe* pipe_;
91+
SrsThreadPipe* rpipe_;
92+
SrsThreadPipe* wpipe_;
93+
public:
94+
SrsThreadPipePair();
95+
virtual ~SrsThreadPipePair();
96+
public:
97+
// It's ok to initialize pipe in another threads.
98+
srs_error_t initialize();
99+
public:
100+
// It's ok to open read/write in one or two threads.
101+
srs_error_t open_read();
102+
srs_error_t open_write();
103+
public:
104+
// For multiple-threading, if a thread open the pipe, it MUST close it, never close it by
105+
// another thread which has not open it.
106+
// If pair(read/write) alive in one thread, user can directly free the pair, without closing
107+
// the read/write, because it's in the same thread.
108+
void close_read();
109+
void close_write();
110+
public:
111+
srs_error_t read(void* buf, size_t size, ssize_t* nread);
112+
srs_error_t write(void* buf, size_t size, ssize_t* nwrite);
113+
};
114+
47115
// The thread mutex wrapper, without error.
48116
class SrsThreadMutex
49117
{

trunk/src/kernel/srs_kernel_error.hpp

+3
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,9 @@
120120
#define ERROR_SOCKET_ACCEPT 1081
121121
#define ERROR_THREAD_CREATE 1082
122122
#define ERROR_SYSTEM_LOGFILE 1083
123+
#define ERROR_PIPE_OPEN 1084
124+
#define ERROR_PIPE_READ 1085
125+
#define ERROR_PIPE_WRITE 1086
123126

124127
///////////////////////////////////////////////////////
125128
// RTMP protocol error.

trunk/src/protocol/srs_service_st.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,11 @@ ssize_t srs_read(srs_netfd_t stfd, void *buf, size_t nbyte, srs_utime_t timeout)
457457
return st_read((st_netfd_t)stfd, buf, nbyte, (st_utime_t)timeout);
458458
}
459459

460+
ssize_t srs_write(srs_netfd_t stfd, const void *buf, size_t nbyte, srs_utime_t timeout)
461+
{
462+
return st_write((st_netfd_t)stfd, buf, nbyte, (st_utime_t)timeout);
463+
}
464+
460465
bool srs_is_never_timeout(srs_utime_t tm)
461466
{
462467
return tm == SRS_UTIME_NO_TIMEOUT;

trunk/src/protocol/srs_service_st.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ extern int srs_sendmsg(srs_netfd_t stfd, const struct msghdr *msg, int flags, sr
102102
extern srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout);
103103

104104
extern ssize_t srs_read(srs_netfd_t stfd, void *buf, size_t nbyte, srs_utime_t timeout);
105+
extern ssize_t srs_write(srs_netfd_t stfd, const void *buf, size_t nbyte, srs_utime_t timeout);
105106

106107
extern bool srs_is_never_timeout(srs_utime_t tm);
107108

0 commit comments

Comments
 (0)