@@ -103,6 +103,10 @@ srs_error_t SrsPipe::initialize()
103
103
{
104
104
srs_error_t err = srs_success;
105
105
106
+ if (pipes_[0 ] > 0 ) {
107
+ return err;
108
+ }
109
+
106
110
if (pipe (pipes_) < 0 ) {
107
111
return srs_error_new (ERROR_SYSTEM_CREATE_PIPE, " create pipe" );
108
112
}
@@ -134,6 +138,10 @@ srs_error_t SrsThreadPipe::initialize(int fd)
134
138
{
135
139
srs_error_t err = srs_success;
136
140
141
+ if (stfd_) {
142
+ return err;
143
+ }
144
+
137
145
if ((stfd_ = srs_netfd_open (fd)) == NULL ) {
138
146
return srs_error_new (ERROR_PIPE_OPEN, " open pipe" );
139
147
}
@@ -220,6 +228,134 @@ srs_error_t SrsThreadPipePair::write(void* buf, size_t size, ssize_t* nwrite)
220
228
return wpipe_->write (buf, size, nwrite);
221
229
}
222
230
231
+ SrsThreadPipeChannel::SrsThreadPipeChannel ()
232
+ {
233
+ initiator_ = new SrsThreadPipePair ();
234
+ responder_ = new SrsThreadPipePair ();
235
+
236
+ trd_ = new SrsFastCoroutine (" chan" , this );
237
+ handler_ = NULL ;
238
+ }
239
+
240
+ SrsThreadPipeChannel::~SrsThreadPipeChannel ()
241
+ {
242
+ srs_freep (trd_);
243
+ srs_freep (initiator_);
244
+ srs_freep (responder_);
245
+ }
246
+
247
+ SrsThreadPipePair* SrsThreadPipeChannel::initiator ()
248
+ {
249
+ return initiator_;
250
+ }
251
+
252
+ SrsThreadPipePair* SrsThreadPipeChannel::responder ()
253
+ {
254
+ return responder_;
255
+ }
256
+
257
+ srs_error_t SrsThreadPipeChannel::start (ISrsThreadResponder* h)
258
+ {
259
+ handler_ = h;
260
+ return trd_->start ();
261
+ }
262
+
263
+ srs_error_t SrsThreadPipeChannel::cycle ()
264
+ {
265
+ srs_error_t err = srs_success;
266
+
267
+ while (true ) {
268
+ if ((err = trd_->pull ()) != srs_success) {
269
+ return srs_error_wrap (err, " pull" );
270
+ }
271
+
272
+ // Here we're responder, read from initiator.
273
+ SrsThreadMessage m;
274
+ if ((err = initiator_->read (&m, sizeof (m), NULL )) != srs_success) {
275
+ return srs_error_wrap (err, " read" );
276
+ }
277
+
278
+ // Consume the message, the responder can write response to responder.
279
+ if (handler_ && (err = handler_->on_thread_message (&m, this )) != srs_success) {
280
+ return srs_error_wrap (err, " consume" );
281
+ }
282
+ }
283
+
284
+ return err;
285
+ }
286
+
287
+ SrsThreadPipeSlot::SrsThreadPipeSlot (int slots)
288
+ {
289
+ nn_channels_ = slots;
290
+ channels_ = new SrsThreadPipeChannel[slots];
291
+
292
+ index_ = 0 ;
293
+ lock_ = new SrsThreadMutex ();
294
+ }
295
+
296
+ SrsThreadPipeSlot::~SrsThreadPipeSlot ()
297
+ {
298
+ srs_freepa (channels_);
299
+ srs_freep (lock_);
300
+ }
301
+
302
+ srs_error_t SrsThreadPipeSlot::initialize ()
303
+ {
304
+ srs_error_t err = srs_success;
305
+
306
+ for (int i = 0 ; i < nn_channels_; i++) {
307
+ SrsThreadPipeChannel* channel = &channels_[i];
308
+
309
+ // Here we're responder, but it's ok to initialize the initiator.
310
+ if ((err = channel->initiator ()->initialize ()) != srs_success) {
311
+ return srs_error_wrap (err, " init %d initiator" , i);
312
+ }
313
+ if ((err = channel->responder ()->initialize ()) != srs_success) {
314
+ return srs_error_wrap (err, " init %d responder" , i);
315
+ }
316
+ }
317
+
318
+ return err;
319
+ }
320
+
321
+ srs_error_t SrsThreadPipeSlot::open_responder (ISrsThreadResponder* h)
322
+ {
323
+ srs_error_t err = srs_success;
324
+
325
+ for (int i = 0 ; i < nn_channels_; i++) {
326
+ SrsThreadPipeChannel* channel = &channels_[i];
327
+
328
+ // We're responder, read from initiator, write to responder.
329
+ if ((err = channel->initiator ()->open_read ()) != srs_success) {
330
+ return srs_error_wrap (err, " open read" );
331
+ }
332
+ if ((err = channel->responder ()->open_write ()) != srs_success) {
333
+ return srs_error_wrap (err, " open write" );
334
+ }
335
+
336
+ // OK, we start the cycle coroutine for responder.
337
+ if ((err = channel->start (h)) != srs_success) {
338
+ return srs_error_wrap (err, " start %d consume coroutine" , i);
339
+ }
340
+ }
341
+
342
+ return err;
343
+ }
344
+
345
+ SrsThreadPipeChannel* SrsThreadPipeSlot::allocate ()
346
+ {
347
+ SrsThreadLocker (lock_);
348
+ return index_ < nn_channels_? &channels_[index_++] : NULL ;
349
+ }
350
+
351
+ ISrsThreadResponder::ISrsThreadResponder ()
352
+ {
353
+ }
354
+
355
+ ISrsThreadResponder::~ISrsThreadResponder ()
356
+ {
357
+ }
358
+
223
359
SrsThreadMutex::SrsThreadMutex ()
224
360
{
225
361
// https://man7.org/linux/man-pages/man3/pthread_mutexattr_init.3.html
@@ -276,6 +412,7 @@ SrsThreadEntry::SrsThreadEntry()
276
412
cpuset_ok = false ;
277
413
278
414
stat = new SrsProcSelfStat ();
415
+ slot_ = NULL ;
279
416
280
417
received_packets_ = new SrsThreadQueue<SrsUdpMuxSocket>();
281
418
cooked_packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
@@ -286,6 +423,9 @@ SrsThreadEntry::~SrsThreadEntry()
286
423
srs_freep (stat);
287
424
srs_freep (err);
288
425
426
+ // TODO: FIXME: Before free slot, we MUST close pipes in threads that open them.
427
+ srs_freep (slot_);
428
+
289
429
srs_freep (received_packets_);
290
430
srs_freep (cooked_packets_);
291
431
0 commit comments