Skip to content

Commit 37aee44

Browse files
committed
Threads-Log: Support dual queue cache for async logs.
1. Create dual queue, the coroutine queue and thread queue. 2. The coroutine queue cache logs does not require lock. 3. When need to flush, flush the logs from coroutine-queue to thread-queue. 4. Finally, flush thread-queue to disk.
1 parent 4918160 commit 37aee44

6 files changed

+115
-15
lines changed

trunk/conf/full.conf

+4-2
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,11 @@ srs_log_level trace;
4141
# when srs_log_tank is file, specifies the log file.
4242
# default: ./objs/srs.log
4343
srs_log_file ./objs/srs.log;
44-
# The interval in ms, to flush async log.
44+
# The interval in ms, to flush async log. Generally, we flush from
45+
# coroutine-queue to thread-queue, then from thread-queue to disk.
46+
# So the delay of logs might be 2*srs_log_flush_interval.
4547
# Default: 1300
46-
srs_log_async_interval 1300;
48+
srs_log_flush_interval 1300;
4749
# the max connections.
4850
# if exceed the max connections, server will drop the new connection.
4951
# default: 1000

trunk/src/app/srs_app_config.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -892,7 +892,7 @@ class SrsConfig
892892
virtual std::string get_log_level();
893893
// Get the log file path.
894894
virtual std::string get_log_file();
895-
// Get the interval in ms to flush asyn log.
895+
// Get the interval in ms to flush async log.
896896
virtual srs_utime_t srs_log_flush_interval();
897897
// Whether ffmpeg log enabled
898898
virtual bool get_ff_log_enabled();

trunk/src/app/srs_app_log.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ SrsFileLog::~SrsFileLog()
6161
srs_freepa(log_data);
6262
}
6363

64+
// @remark Note that we should never write logs, because log is not ready not.
6465
srs_error_t SrsFileLog::initialize()
6566
{
6667
srs_error_t err = srs_success;

trunk/src/app/srs_app_threads.cpp

+47-11
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ SrsThreadPool::~SrsThreadPool()
110110
srs_freep(lock_);
111111
}
112112

113+
// @remark Note that we should never write logs, because log is not ready not.
113114
srs_error_t SrsThreadPool::initialize()
114115
{
115116
srs_error_t err = srs_success;
@@ -119,12 +120,7 @@ srs_error_t SrsThreadPool::initialize()
119120
return srs_error_wrap(err, "initialize st failed");
120121
}
121122

122-
if ((err = _srs_async_log->initialize()) != srs_success) {
123-
return srs_error_wrap(err, "init async log");
124-
}
125-
126123
interval_ = _srs_config->get_threads_interval();
127-
srs_trace("Thread #%d(%s): init interval=%dms", entry_->num, entry_->label.c_str(), srsu2msi(interval_));
128124

129125
return err;
130126
}
@@ -169,12 +165,15 @@ srs_error_t SrsThreadPool::run()
169165
{
170166
srs_error_t err = srs_success;
171167

168+
// Write the init log here.
169+
srs_trace("Thread #%d(%s): init interval=%dms", entry_->num, entry_->label.c_str(), srsu2msi(interval_));
170+
172171
while (true) {
172+
sleep(interval_ / SRS_UTIME_SECONDS);
173+
173174
string async_logs = _srs_async_log->description();
174175
srs_trace("Thread #%d(%s): cycle threads=%d%s", entry_->num, entry_->label.c_str(), (int)threads_.size(),
175176
async_logs.c_str());
176-
177-
sleep(interval_ / SRS_UTIME_SECONDS);
178177
}
179178

180179
return err;
@@ -202,11 +201,14 @@ void* SrsThreadPool::start(void* arg)
202201

203202
SrsThreadPool* _srs_thread_pool = new SrsThreadPool();
204203

205-
SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p)
204+
SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p, srs_utime_t interval)
206205
{
207206
filename_ = p;
208207
writer_ = new SrsFileWriter();
209208
queue_ = new SrsThreadQueue<SrsSharedPtrMessage>();
209+
co_queue_ = new SrsCoroutineQueue<SrsSharedPtrMessage>();
210+
interval_ = interval;
211+
last_flush_time_ = srs_get_system_time();
210212
}
211213

212214
// TODO: FIXME: Before free the writer, we must remove it from the manager.
@@ -215,6 +217,7 @@ SrsAsyncFileWriter::~SrsAsyncFileWriter()
215217
// TODO: FIXME: Should we flush dirty logs?
216218
srs_freep(writer_);
217219
srs_freep(queue_);
220+
srs_freep(co_queue_);
218221
}
219222

220223
srs_error_t SrsAsyncFileWriter::open()
@@ -246,7 +249,16 @@ srs_error_t SrsAsyncFileWriter::write(void* buf, size_t count, ssize_t* pnwrite)
246249
SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
247250
msg->wrap(cp, count);
248251

249-
queue_->push_back(msg);
252+
co_queue_->push_back(msg);
253+
254+
// Whether flush to thread-queue.
255+
if (srs_get_system_time() - last_flush_time_ >= interval_) {
256+
last_flush_time_ = srs_get_system_time();
257+
258+
vector<SrsSharedPtrMessage*> flying;
259+
co_queue_->swap(flying);
260+
queue_->push_back(flying);
261+
}
250262

251263
if (pnwrite) {
252264
*pnwrite = count;
@@ -303,6 +315,7 @@ srs_error_t SrsAsyncFileWriter::flush()
303315
SrsAsyncLogManager::SrsAsyncLogManager()
304316
{
305317
interval_ = 0;
318+
306319
reopen_ = false;
307320
lock_ = new SrsThreadMutex();
308321
}
@@ -318,6 +331,7 @@ SrsAsyncLogManager::~SrsAsyncLogManager()
318331
}
319332
}
320333

334+
// @remark Note that we should never write logs, because log is not ready not.
321335
srs_error_t SrsAsyncLogManager::initialize()
322336
{
323337
srs_error_t err = srs_success;
@@ -327,18 +341,28 @@ srs_error_t SrsAsyncLogManager::initialize()
327341
return srs_error_new(ERROR_SYSTEM_LOGFILE, "invalid interval=%dms", srsu2msi(interval_));
328342
}
329343

344+
return err;
345+
}
346+
347+
// @remark Now, log is ready, and we can print logs.
348+
srs_error_t SrsAsyncLogManager::run()
349+
{
350+
srs_error_t err = srs_success;
351+
330352
if ((err = _srs_thread_pool->execute("log", SrsAsyncLogManager::start, this)) != srs_success) {
331353
return srs_error_wrap(err, "run async log");
332354
}
333355

356+
srs_trace("AsyncLogs: Init flush_interval=%dms", srsu2msi(interval_));
357+
334358
return err;
335359
}
336360

337361
srs_error_t SrsAsyncLogManager::create_writer(std::string filename, SrsAsyncFileWriter** ppwriter)
338362
{
339363
srs_error_t err = srs_success;
340364

341-
SrsAsyncFileWriter* writer = new SrsAsyncFileWriter(filename);
365+
SrsAsyncFileWriter* writer = new SrsAsyncFileWriter(filename, interval_);
342366
writers_.push_back(writer);
343367

344368
if ((err = writer->open()) != srs_success) {
@@ -369,8 +393,20 @@ std::string SrsAsyncLogManager::description()
369393
max_logs = srs_max(max_logs, nn);
370394
}
371395

396+
int nn_co_logs = 0;
397+
int max_co_logs = 0;
398+
for (int i = 0; i < (int)writers_.size(); i++) {
399+
SrsAsyncFileWriter* writer = writers_.at(i);
400+
401+
int nn = (int)writer->co_queue_->size();
402+
nn_co_logs += nn;
403+
max_co_logs = srs_max(max_co_logs, nn);
404+
}
405+
372406
static char buf[128];
373-
snprintf(buf, sizeof(buf), ", files=%d, queue=%d/%d", (int)writers_.size(), nn_logs, max_logs);
407+
snprintf(buf, sizeof(buf), ", files=%d, queue=%d/%d, coq=%d/%d",
408+
(int)writers_.size(), nn_logs, max_logs, nn_co_logs, max_co_logs);
409+
374410
return buf;
375411
}
376412

trunk/src/app/srs_app_threads.hpp

+50-1
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,37 @@ class SrsThreadPool
116116
// The global thread pool.
117117
extern SrsThreadPool* _srs_thread_pool;
118118

119+
// We use coroutine queue to collect messages from different coroutines,
120+
// then swap to the SrsThreadQueue and process by another thread.
121+
template<typename T>
122+
class SrsCoroutineQueue
123+
{
124+
private:
125+
std::vector<T*> dirty_;
126+
public:
127+
SrsCoroutineQueue() {
128+
}
129+
virtual ~SrsCoroutineQueue() {
130+
for (int i = 0; i < (int)dirty_.size(); i++) {
131+
T* msg = dirty_.at(i);
132+
srs_freep(msg);
133+
}
134+
}
135+
public:
136+
// SrsCoroutineQueue::push_back
137+
void push_back(T* msg) {
138+
dirty_.push_back(msg);
139+
}
140+
// SrsCoroutineQueue::swap
141+
void swap(std::vector<T*>& flying) {
142+
dirty_.swap(flying);
143+
}
144+
// SrsCoroutineQueue::size
145+
size_t size() {
146+
return dirty_.size();
147+
}
148+
};
149+
119150
// Thread-safe queue.
120151
template<typename T>
121152
class SrsThreadQueue
@@ -142,6 +173,11 @@ class SrsThreadQueue
142173
SrsThreadLocker(lock_);
143174
dirty_.push_back(msg);
144175
}
176+
// SrsThreadQueue::push_back
177+
void push_back(std::vector<T*>& flying) {
178+
SrsThreadLocker(lock_);
179+
dirty_.insert(dirty_.end(), flying.begin(), flying.end());
180+
}
145181
// SrsThreadQueue::swap
146182
void swap(std::vector<T*>& flying) {
147183
SrsThreadLocker(lock_);
@@ -161,9 +197,18 @@ class SrsAsyncFileWriter : public ISrsWriter
161197
private:
162198
std::string filename_;
163199
SrsFileWriter* writer_;
200+
private:
201+
// The thread-queue, to flush to disk by dedicated thread.
164202
SrsThreadQueue<SrsSharedPtrMessage>* queue_;
165203
private:
166-
SrsAsyncFileWriter(std::string p);
204+
// The interval to flush from coroutine-queue to thread-queue.
205+
srs_utime_t interval_;
206+
// Last flush coroutine-queue time, to calculate the timeout.
207+
srs_utime_t last_flush_time_;
208+
// The coroutine-queue, to avoid requires lock for each log.
209+
SrsCoroutineQueue<SrsSharedPtrMessage>* co_queue_;
210+
private:
211+
SrsAsyncFileWriter(std::string p, srs_utime_t interval);
167212
virtual ~SrsAsyncFileWriter();
168213
public:
169214
// Open file writer, in truncate mode.
@@ -188,6 +233,8 @@ class SrsAsyncLogManager
188233
private:
189234
// The async flush interval.
190235
srs_utime_t interval_;
236+
// The number of logs to flush from coroutine-queue to thread-queue.
237+
int flush_co_queue_;
191238
private:
192239
// The async reopen event.
193240
bool reopen_;
@@ -200,6 +247,8 @@ class SrsAsyncLogManager
200247
public:
201248
// Initialize the async log manager.
202249
srs_error_t initialize();
250+
// Run the async log manager thread.
251+
srs_error_t run();
203252
// Create a managed writer, user should never free it.
204253
srs_error_t create_writer(std::string filename, SrsAsyncFileWriter** ppwriter);
205254
// Reopen all log files, asynchronously.

trunk/src/main/srs_main_server.cpp

+12
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,11 @@ srs_error_t do_main(int argc, char** argv)
131131
if ((err = _srs_config->initialize_cwd()) != srs_success) {
132132
return srs_error_wrap(err, "config cwd");
133133
}
134+
135+
// We must initialize the async log manager before log init.
136+
if ((err = _srs_async_log->initialize()) != srs_success) {
137+
return srs_error_wrap(err, "init async log");
138+
}
134139

135140
// config parsed, initialize log.
136141
if ((err = _srs_log->initialize()) != srs_success) {
@@ -469,6 +474,13 @@ srs_error_t run_in_thread_pool()
469474
return srs_error_wrap(err, "init thread pool");
470475
}
471476

477+
// After all init(log, async log manager, thread pool), now we can start to
478+
// run the log manager thread.
479+
if ((err = _srs_async_log->run()) != srs_success) {
480+
return srs_error_wrap(err, "run async log");
481+
}
482+
483+
// Start the service worker thread, for RTMP and RTC server, etc.
472484
if ((err = _srs_thread_pool->execute("hybrid", run_hybrid_server, NULL)) != srs_success) {
473485
return srs_error_wrap(err, "run hybrid server");
474486
}

0 commit comments

Comments
 (0)