Skip to content

Commit 668c9c1

Browse files
committed
Threads-Log: Run hybrid server in thread.
1. Create thread when execute by thread pool. 2. The primordial thread check all threads status. 3. Have not complete the cleanup and stop.
1 parent 831b77b commit 668c9c1

File tree

5 files changed

+183
-10
lines changed

5 files changed

+183
-10
lines changed

trunk/src/app/srs_app_hybrid.cpp

-5
Original file line numberDiff line numberDiff line change
@@ -168,11 +168,6 @@ srs_error_t SrsHybridServer::initialize()
168168
{
169169
srs_error_t err = srs_success;
170170

171-
// init st
172-
if ((err = srs_st_init()) != srs_success) {
173-
return srs_error_wrap(err, "initialize st failed");
174-
}
175-
176171
// Create global shared timer.
177172
timer_ = new SrsFastTimer("hybrid", 20 * SRS_UTIME_MILLISECONDS);
178173

trunk/src/app/srs_app_threads.cpp

+116-2
Original file line numberDiff line numberDiff line change
@@ -24,35 +24,149 @@
2424
#include <srs_app_threads.hpp>
2525

2626
#include <srs_kernel_error.hpp>
27+
#include <srs_app_config.hpp>
28+
#include <srs_app_log.hpp>
29+
#include <srs_core_autofree.hpp>
30+
31+
#include <unistd.h>
32+
33+
using namespace std;
34+
35+
SrsThreadMutex::SrsThreadMutex()
36+
{
37+
// https://michaelkerrisk.com/linux/man-pages/man3/pthread_mutex_init.3p.html
38+
int r0 = pthread_mutex_init(&lock_, NULL);
39+
srs_assert(!r0);
40+
}
41+
42+
SrsThreadMutex::~SrsThreadMutex()
43+
{
44+
int r0 = pthread_mutex_destroy(&lock_);
45+
srs_assert(!r0);
46+
}
47+
48+
void SrsThreadMutex::lock()
49+
{
50+
// https://man7.org/linux/man-pages/man3/pthread_mutex_lock.3p.html
51+
int r0 = pthread_mutex_lock(&lock_);
52+
srs_assert(!r0);
53+
}
54+
55+
void SrsThreadMutex::unlock()
56+
{
57+
int r0 = pthread_mutex_unlock(&lock_);
58+
srs_assert(!r0);
59+
}
60+
61+
SrsThreadEntry::SrsThreadEntry()
62+
{
63+
pool = NULL;
64+
start = NULL;
65+
arg = NULL;
66+
num = 0;
67+
68+
err = srs_success;
69+
}
2770

2871
SrsThreadPool::SrsThreadPool()
2972
{
73+
entry_ = NULL;
74+
lock_ = new SrsThreadMutex();
3075
}
3176

3277
SrsThreadPool::~SrsThreadPool()
3378
{
79+
srs_freep(lock_);
3480
}
3581

3682
srs_error_t SrsThreadPool::initialize()
3783
{
3884
srs_error_t err = srs_success;
85+
86+
// TODO: FIXME: Should init ST for each thread.
87+
if ((err = srs_st_init()) != srs_success) {
88+
return srs_error_wrap(err, "initialize st failed");
89+
}
90+
91+
// Add primordial thread, current thread itself.
92+
SrsThreadEntry* entry = new SrsThreadEntry();
93+
threads_.push_back(entry);
94+
entry_ = entry;
95+
96+
entry->pool = this;
97+
entry->label = "primordial";
98+
entry->start = NULL;
99+
entry->arg = NULL;
100+
entry->num = 1;
101+
102+
srs_trace("Thread #%d: %s init", entry_->num, entry_->label.c_str());
103+
39104
return err;
40105
}
41106

42-
srs_error_t SrsThreadPool::execute(srs_error_t (*start)(void* arg), void* arg)
107+
srs_error_t SrsThreadPool::execute(string label, srs_error_t (*start)(void* arg), void* arg)
43108
{
44-
srs_error_t err = start(arg);
109+
srs_error_t err = srs_success;
110+
111+
static int num = entry_->num + 1;
112+
113+
SrsThreadEntry* entry = new SrsThreadEntry();
114+
115+
if (true) {
116+
SrsThreadLocker(lock_);
117+
threads_.push_back(entry);
118+
}
119+
120+
entry->pool = this;
121+
entry->label = label;
122+
entry->start = start;
123+
entry->arg = arg;
124+
entry->num = num++;
125+
126+
// https://man7.org/linux/man-pages/man3/pthread_create.3.html
127+
pthread_t trd;
128+
int r0 = pthread_create(&trd, NULL, SrsThreadPool::start, entry);
129+
if (r0 != 0) {
130+
entry->err = srs_error_new(ERROR_THREAD_CREATE, "create thread %s", label.c_str());
131+
return srs_error_copy(entry->err);
132+
}
133+
134+
entry->trd = trd;
135+
45136
return err;
46137
}
47138

48139
srs_error_t SrsThreadPool::run()
49140
{
50141
srs_error_t err = srs_success;
142+
143+
while (true) {
144+
srs_trace("Thread #%d: %s run, threads=%d", entry_->num, entry_->label.c_str(),
145+
(int)threads_.size());
146+
sleep(60);
147+
}
148+
51149
return err;
52150
}
53151

54152
void SrsThreadPool::stop()
55153
{
154+
// TODO: FIXME: Implements it.
155+
}
156+
157+
void* SrsThreadPool::start(void* arg)
158+
{
159+
srs_error_t err = srs_success;
160+
161+
SrsThreadEntry* entry = (SrsThreadEntry*)arg;
162+
srs_trace("Thread #%d: %s run", entry->num, entry->label.c_str());
163+
164+
if ((err = entry->start(entry->arg)) != srs_success) {
165+
entry->err = err;
166+
}
167+
168+
// We do not use the return value, the err has been set to entry->err.
169+
return NULL;
56170
}
57171

58172
SrsThreadPool* _srs_thread_pool = new SrsThreadPool();

trunk/src/app/srs_app_threads.hpp

+65-2
Original file line numberDiff line numberDiff line change
@@ -26,22 +26,85 @@
2626

2727
#include <srs_core.hpp>
2828

29+
#include <pthread.h>
30+
31+
#include <vector>
32+
#include <string>
33+
34+
class SrsThreadPool;
35+
36+
// The thread mutex wrapper, without error.
37+
class SrsThreadMutex
38+
{
39+
private:
40+
pthread_mutex_t lock_;
41+
public:
42+
SrsThreadMutex();
43+
virtual ~SrsThreadMutex();
44+
public:
45+
void lock();
46+
void unlock();
47+
};
48+
49+
// The thread mutex locker.
50+
#define SrsThreadLocker(instance) \
51+
impl__SrsThreadLocker _SRS_free_##instance(instance)
52+
53+
class impl__SrsThreadLocker
54+
{
55+
private:
56+
SrsThreadMutex* lock;
57+
public:
58+
impl__SrsThreadLocker(SrsThreadMutex* l) {
59+
lock = l;
60+
lock->lock();
61+
}
62+
virtual ~impl__SrsThreadLocker() {
63+
lock->unlock();
64+
}
65+
};
66+
67+
// The information for a thread.
68+
class SrsThreadEntry
69+
{
70+
public:
71+
SrsThreadPool* pool;
72+
std::string label;
73+
srs_error_t (*start)(void* arg);
74+
void* arg;
75+
int num;
76+
public:
77+
// The thread object.
78+
pthread_t trd;
79+
// The exit error of thread.
80+
srs_error_t err;
81+
82+
SrsThreadEntry();
83+
};
84+
2985
// Allocate a(or almost) fixed thread poll to execute tasks,
3086
// so that we can take the advantage of multiple CPUs.
3187
class SrsThreadPool
3288
{
89+
private:
90+
SrsThreadEntry* entry_;
91+
private:
92+
SrsThreadMutex* lock_;
93+
std::vector<SrsThreadEntry*> threads_;
3394
public:
3495
SrsThreadPool();
3596
virtual ~SrsThreadPool();
3697
public:
3798
// Initialize the thread pool.
3899
srs_error_t initialize();
39-
// Execute start function in thread.
40-
srs_error_t execute(srs_error_t (*start)(void* arg), void* arg);
100+
// Execute start function with label in thread.
101+
srs_error_t execute(std::string label, srs_error_t (*start)(void* arg), void* arg);
41102
// Run in the primordial thread, util stop or quit.
42103
srs_error_t run();
43104
// Stop the thread pool and quit the primordial thread.
44105
void stop();
106+
private:
107+
static void* start(void* arg);
45108
};
46109

47110
// The global thread pool.

trunk/src/kernel/srs_kernel_error.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@
118118
#define ERROR_SOCKET_SETREUSEADDR 1079
119119
#define ERROR_SOCKET_SETCLOSEEXEC 1080
120120
#define ERROR_SOCKET_ACCEPT 1081
121+
#define ERROR_THREAD_CREATE 1082
121122

122123
///////////////////////////////////////////////////////
123124
// RTMP protocol error.

trunk/src/main/srs_main_server.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ srs_error_t run_in_thread_pool()
469469
return srs_error_wrap(err, "init thread pool");
470470
}
471471

472-
if ((err = _srs_thread_pool->execute(run_hybrid_server, NULL)) != srs_success) {
472+
if ((err = _srs_thread_pool->execute("hybrid", run_hybrid_server, NULL)) != srs_success) {
473473
return srs_error_wrap(err, "run hybrid server");
474474
}
475475

0 commit comments

Comments
 (0)