Skip to content

Commit a505b6b

Browse files
committed
Threads: Directly use hybrid thread to consume messages.
1 parent 14bc7bc commit a505b6b

File tree

3 files changed

+21
-59
lines changed

3 files changed

+21
-59
lines changed

trunk/src/app/srs_app_hybrid.cpp

+20-3
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ srs_error_t SrsHybridServer::run()
205205
{
206206
srs_error_t err = srs_success;
207207

208+
// Run all servers, which should never block.
208209
vector<ISrsHybridServer*>::iterator it;
209210
for (it = servers.begin(); it != servers.end(); ++it) {
210211
ISrsHybridServer* server = *it;
@@ -214,9 +215,25 @@ srs_error_t SrsHybridServer::run()
214215
}
215216
}
216217

217-
// TODO: FIXME: Should run the signal manager and directly quit.
218-
// Wait for all server to quit.
219-
srs_usleep(SRS_UTIME_NO_TIMEOUT);
218+
// Consume the async UDP/SRTP packets.
219+
while (true) {
220+
int consumed = 0;
221+
222+
// Consume the received UDP packets.
223+
if ((err = _srs_async_recv->consume(&consumed)) != srs_success) {
224+
srs_error_reset(err); // Ignore any error.
225+
}
226+
227+
// Consume the cooked SRTP packets.
228+
if ((err = _srs_async_srtp->consume(&consumed)) != srs_success) {
229+
srs_error_reset(err); // Ignore any error.
230+
}
231+
232+
// Wait for a while if no packets.
233+
if (!consumed) {
234+
srs_usleep(20 * SRS_UTIME_MILLISECONDS);
235+
}
236+
}
220237

221238
return err;
222239
}

trunk/src/app/srs_app_threads.cpp

-47
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,6 @@ SrsThreadPool::SrsThreadPool()
158158
hybrid_critical_water_level_ = 0;
159159
hybrid_dying_water_level_ = 0;
160160

161-
trd_ = new SrsFastCoroutine("pool", this);
162-
163161
high_threshold_ = 0;
164162
high_pulse_ = 0;
165163
critical_threshold_ = 0;
@@ -188,8 +186,6 @@ SrsThreadPool::SrsThreadPool()
188186
// TODO: FIMXE: If free the pool, we should stop all threads.
189187
SrsThreadPool::~SrsThreadPool()
190188
{
191-
srs_freep(trd_);
192-
193189
srs_freep(lock_);
194190
}
195191

@@ -496,49 +492,6 @@ void* SrsThreadPool::start(void* arg)
496492
return NULL;
497493
}
498494

499-
srs_error_t SrsThreadPool::consume()
500-
{
501-
srs_error_t err = srs_success;
502-
503-
if ((err = trd_->start()) != srs_success) {
504-
return srs_error_wrap(err, "start");
505-
}
506-
507-
return err;
508-
}
509-
510-
srs_error_t SrsThreadPool::cycle()
511-
{
512-
srs_error_t err = srs_success;
513-
514-
while (true) {
515-
int consumed = 0;
516-
517-
// Check error before consume packets.
518-
if ((err = trd_->pull()) != srs_success) {
519-
return srs_error_wrap(err, "pull");
520-
}
521-
if ((err = _srs_async_recv->consume(&consumed)) != srs_success) {
522-
srs_error_reset(err); // Ignore any error.
523-
}
524-
525-
// Check error before consume packets.
526-
if ((err = trd_->pull()) != srs_success) {
527-
return srs_error_wrap(err, "pull");
528-
}
529-
if ((err = _srs_async_srtp->consume(&consumed)) != srs_success) {
530-
srs_error_reset(err); // Ignore any error.
531-
}
532-
533-
if (!consumed) {
534-
srs_usleep(20 * SRS_UTIME_MILLISECONDS);
535-
continue;
536-
}
537-
}
538-
539-
return err;
540-
}
541-
542495
// TODO: FIXME: It should be thread-local or thread-safe.
543496
SrsThreadPool* _srs_thread_pool = new SrsThreadPool();
544497

trunk/src/app/srs_app_threads.hpp

+1-9
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ class SrsThreadEntry
112112

113113
// Allocate a(or almost) fixed thread poll to execute tasks,
114114
// so that we can take the advantage of multiple CPUs.
115-
class SrsThreadPool : public ISrsCoroutineHandler
115+
class SrsThreadPool
116116
{
117117
private:
118118
SrsThreadEntry* entry_;
@@ -136,9 +136,6 @@ class SrsThreadPool : public ISrsCoroutineHandler
136136
int critical_pulse_;
137137
int dying_threshold_;
138138
int dying_pulse_;
139-
private:
140-
// A coroutine to consume cooked packets.
141-
SrsFastCoroutine* trd_;
142139
public:
143140
SrsThreadPool();
144141
virtual ~SrsThreadPool();
@@ -159,11 +156,6 @@ class SrsThreadPool : public ISrsCoroutineHandler
159156
void stop();
160157
private:
161158
static void* start(void* arg);
162-
public:
163-
// Consume packets. Must call in worker/service thread.
164-
virtual srs_error_t consume();
165-
private:
166-
srs_error_t cycle();
167159
};
168160

169161
// The global thread pool.

0 commit comments

Comments
 (0)