Skip to content

Commit 6ddbc5f

Browse files
committed
Threads-Log: Refine comments for global variable.
1. Dual queue for async logs, exists risk. 2. Flush the logs is too slow, because it depends on logs and interval.
1 parent 6718c38 commit 6ddbc5f

10 files changed

+70
-30
lines changed

trunk/src/app/srs_app_hybrid.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ srs_error_t SrsHybridServer::run()
207207
}
208208
}
209209

210+
// TODO: FIXME: Should run the signal manager and directly quit.
210211
// Wait for all server to quit.
211212
srs_usleep(SRS_UTIME_NO_TIMEOUT);
212213

trunk/src/app/srs_app_log.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ void SrsFileLog::write_log(char *str_log, int size, int level)
213213
str_log[size] = 0;
214214

215215
// if not to file, to console and return.
216+
// @remark Its value changes, because there is some log before config loaded.
216217
if (!log_to_file_tank) {
217218
// if is error msg, then print color msg.
218219
// \033[31m : red text code in shell

trunk/src/app/srs_app_rtc_server.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -766,5 +766,6 @@ void RtcServerAdapter::stop()
766766
{
767767
}
768768

769+
// TODO: FIXME: It should be thread-local or thread-safe.
769770
SrsResourceManager* _srs_rtc_manager = new SrsResourceManager("RTC", true);
770771

trunk/src/app/srs_app_rtc_source.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,7 @@ SrsRtcStream* SrsRtcStreamManager::fetch(SrsRequest* r)
315315
return source;
316316
}
317317

318+
// TODO: FIXME: It should be thread-local or thread-safe.
318319
SrsRtcStreamManager* _srs_rtc_sources = new SrsRtcStreamManager();
319320

320321
ISrsRtcPublishStream::ISrsRtcPublishStream()

trunk/src/app/srs_app_server.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -1113,11 +1113,12 @@ srs_error_t SrsServer::cycle()
11131113
}
11141114

11151115
srs_trace("srs terminated");
1116-
1116+
11171117
// for valgrind to detect.
11181118
srs_freep(_srs_config);
11191119
srs_freep(_srs_log);
11201120

1121+
// TODO: FIXME: Should return to exit the thread, and quit by thread pool manager.
11211122
exit(0);
11221123

11231124
return err;

trunk/src/app/srs_app_threads.cpp

+37-22
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ SrsThreadEntry::SrsThreadEntry()
9292

9393
SrsThreadEntry::~SrsThreadEntry()
9494
{
95+
// TODO: FIXME: Should dispose err and trd.
9596
}
9697

9798
SrsThreadPool::SrsThreadPool()
@@ -117,7 +118,6 @@ SrsThreadPool::~SrsThreadPool()
117118
srs_freep(lock_);
118119
}
119120

120-
// @remark Note that we should never write logs, because log is not ready not.
121121
srs_error_t SrsThreadPool::initialize()
122122
{
123123
srs_error_t err = srs_success;
@@ -172,12 +172,33 @@ srs_error_t SrsThreadPool::run()
172172
{
173173
srs_error_t err = srs_success;
174174

175-
// Write the init log here.
176-
srs_trace("Thread #%d(%s): init interval=%dms", entry_->num, entry_->label.c_str(), srsu2msi(interval_));
175+
bool print_init_log = true;
177176

178177
while (true) {
179-
sleep(interval_ / SRS_UTIME_SECONDS);
178+
// Check the threads status fastly.
179+
int loops = (int)(interval_ / SRS_UTIME_SECONDS);
180+
for (int i = 0; i < loops; i++) {
181+
if (true) {
182+
SrsThreadLocker(lock_);
183+
for (int i = 0; i < (int)threads_.size(); i++) {
184+
SrsThreadEntry* entry = threads_.at(i);
185+
if (entry->err != srs_success) {
186+
err = srs_error_wrap(entry->err, "thread #%d(%s)", entry->num, entry->label.c_str());
187+
return srs_error_copy(err);
188+
}
189+
}
190+
}
191+
192+
sleep(1);
193+
194+
// Flush the system previous logs by this log.
195+
if (i > 0 && print_init_log) {
196+
print_init_log = false;
197+
srs_trace("Thread #%d(%s): init interval=%dms", entry_->num, entry_->label.c_str(), srsu2msi(interval_));
198+
}
199+
}
180200

201+
// In normal state, gather status and log it.
181202
static char buf[128];
182203
string async_logs = _srs_async_log->description();
183204

@@ -198,7 +219,7 @@ srs_error_t SrsThreadPool::run()
198219

199220
void SrsThreadPool::stop()
200221
{
201-
// TODO: FIXME: Implements it.
222+
// TODO: FIXME: Should notify other threads to do cleanup and quit.
202223
}
203224

204225
void* SrsThreadPool::start(void* arg)
@@ -216,6 +237,7 @@ void* SrsThreadPool::start(void* arg)
216237
return NULL;
217238
}
218239

240+
// TODO: FIXME: It should be thread-local or thread-safe.
219241
SrsThreadPool* _srs_thread_pool = new SrsThreadPool();
220242

221243
SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p)
@@ -297,6 +319,7 @@ void SrsAsyncFileWriter::flush_co_queue()
297319
{
298320
srs_utime_t now = srs_update_system_time();
299321

322+
// The thread queue is thread-safe, so we do not need a lock.
300323
if (true) {
301324
vector<SrsSharedPtrMessage*> flying;
302325
co_queue_->swap(flying);
@@ -373,25 +396,22 @@ srs_error_t SrsAsyncLogManager::initialize()
373396
}
374397

375398
// @remark Now, log is ready, and we can print logs.
376-
srs_error_t SrsAsyncLogManager::run()
399+
srs_error_t SrsAsyncLogManager::start(void* arg)
377400
{
378-
srs_error_t err = srs_success;
379-
380-
if ((err = _srs_thread_pool->execute("log", SrsAsyncLogManager::start, this)) != srs_success) {
381-
return srs_error_wrap(err, "run async log");
382-
}
383-
384-
srs_trace("AsyncLogs: Init flush_interval=%dms", srsu2msi(interval_));
385-
386-
return err;
401+
SrsAsyncLogManager* log = (SrsAsyncLogManager*)arg;
402+
return log->do_start();
387403
}
388404

389405
srs_error_t SrsAsyncLogManager::create_writer(std::string filename, SrsAsyncFileWriter** ppwriter)
390406
{
391407
srs_error_t err = srs_success;
392408

393409
SrsAsyncFileWriter* writer = new SrsAsyncFileWriter(filename);
394-
writers_.push_back(writer);
410+
411+
if (true) {
412+
SrsThreadLocker(lock_);
413+
writers_.push_back(writer);
414+
}
395415

396416
if ((err = writer->open()) != srs_success) {
397417
return srs_error_wrap(err, "open file %s fail", filename.c_str());
@@ -438,12 +458,6 @@ std::string SrsAsyncLogManager::description()
438458
return buf;
439459
}
440460

441-
srs_error_t SrsAsyncLogManager::start(void* arg)
442-
{
443-
SrsAsyncLogManager* log = (SrsAsyncLogManager*)arg;
444-
return log->do_start();
445-
}
446-
447461
srs_error_t SrsAsyncLogManager::do_start()
448462
{
449463
srs_error_t err = srs_success;
@@ -489,4 +503,5 @@ srs_error_t SrsAsyncLogManager::do_start()
489503
return err;
490504
}
491505

506+
// TODO: FIXME: It should be thread-local or thread-safe.
492507
SrsAsyncLogManager* _srs_async_log = new SrsAsyncLogManager();

trunk/src/app/srs_app_threads.hpp

+11-3
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,8 @@ class SrsThreadQueue
190190
}
191191
};
192192

193-
// Async file writer.
193+
// Async file writer, it's not thread safe. It assume that each thread has its dedicate object,
194+
// for example, the log object should be thread_local.
194195
class SrsAsyncFileWriter : public ISrsWriter
195196
{
196197
friend class SrsAsyncLogManager;
@@ -202,6 +203,14 @@ class SrsAsyncFileWriter : public ISrsWriter
202203
SrsThreadQueue<SrsSharedPtrMessage>* queue_;
203204
private:
204205
// The coroutine-queue, to avoid requires lock for each log.
206+
// @remark Note that if multiple thread write to the same log file, the log is nor ordered
207+
// by time, because each thread has this coroutine-queue and flush as a batch of logs to
208+
// thread-queue:
209+
// thread #1, flush 10 logs to thread-queue, [10:10:00 ~ 10:11:00]
210+
// thread #2, flush 100 logs to thread-queue, [10:09:00 ~ 10:12:00]
211+
// Finally, the logs flush to disk as:
212+
// [10:10:00 ~ 10:11:00], 10 logs.
213+
// [10:09:00 ~ 10:12:00], 100 logs.
205214
SrsCoroutineQueue<SrsSharedPtrMessage>* co_queue_;
206215
private:
207216
SrsAsyncFileWriter(std::string p);
@@ -244,7 +253,7 @@ class SrsAsyncLogManager
244253
// Initialize the async log manager.
245254
srs_error_t initialize();
246255
// Run the async log manager thread.
247-
srs_error_t run();
256+
static srs_error_t start(void* arg);
248257
// Create a managed writer, user should never free it.
249258
srs_error_t create_writer(std::string filename, SrsAsyncFileWriter** ppwriter);
250259
// Reopen all log files, asynchronously.
@@ -253,7 +262,6 @@ class SrsAsyncLogManager
253262
// Get the summary of this manager.
254263
std::string description();
255264
private:
256-
static srs_error_t start(void* arg);
257265
srs_error_t do_start();
258266
};
259267

trunk/src/kernel/srs_kernel_kbps.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -129,5 +129,6 @@ srs_utime_t SrsWallClock::now()
129129
return srs_get_system_time();
130130
}
131131

132+
// TODO: FIXME: It should be thread-local or thread-safe.
132133
SrsWallClock* _srs_clock = new SrsWallClock();
133134

trunk/src/kernel/srs_kernel_rtc_rtp.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -1054,10 +1054,12 @@ srs_error_t SrsRtpPacket2::decode(SrsBuffer* buf)
10541054
return err;
10551055
}
10561056

1057+
// TODO: FIXME: It should be thread-local or thread-safe.
10571058
SrsRtpObjectCacheManager<SrsRtpPacket2>* _srs_rtp_cache = new SrsRtpObjectCacheManager<SrsRtpPacket2>(sizeof(SrsRtpPacket2));
10581059
SrsRtpObjectCacheManager<SrsRtpRawPayload>* _srs_rtp_raw_cache = new SrsRtpObjectCacheManager<SrsRtpRawPayload>(sizeof(SrsRtpRawPayload));
10591060
SrsRtpObjectCacheManager<SrsRtpFUAPayload2>* _srs_rtp_fua_cache = new SrsRtpObjectCacheManager<SrsRtpFUAPayload2>(sizeof(SrsRtpFUAPayload2));
10601061

1062+
// TODO: FIXME: It should be thread-local or thread-safe.
10611063
SrsRtpObjectCacheManager<SrsSharedPtrMessage>* _srs_rtp_msg_cache_buffers = new SrsRtpObjectCacheManager<SrsSharedPtrMessage>(sizeof(SrsSharedPtrMessage) + kRtpPacketSize);
10621064
SrsRtpObjectCacheManager<SrsSharedPtrMessage>* _srs_rtp_msg_cache_objs = new SrsRtpObjectCacheManager<SrsSharedPtrMessage>(sizeof(SrsSharedPtrMessage));
10631065

trunk/src/main/srs_main_server.cpp

+13-4
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,12 @@ srs_error_t run_in_thread_pool();
7070
void show_macro_features();
7171

7272
// @global log and context.
73+
// TODO: FIXME: It should be thread-local or thread-safe.
7374
ISrsLog* _srs_log = new SrsFileLog();
75+
// TODO: FIXME: It should be thread-local or thread-safe.
7476
ISrsContext* _srs_context = new SrsThreadContext();
7577
// @global config object for app module.
78+
// TODO: FIXME: It should be thread-local or thread-safe.
7679
SrsConfig* _srs_config = new SrsConfig();
7780

7881
// @global version of srs, which can grep keyword "XCORE"
@@ -226,8 +229,12 @@ int main(int argc, char** argv) {
226229

227230
srs_error_t err = do_main(argc, argv);
228231

232+
// Because we are exiting, and it's impossible to notify the async log thread
233+
// to write the error log, so we print to stderr instead.
234+
// TODO: FIXME: Should we flush the async log cache?
229235
if (err != srs_success) {
230-
srs_error("Failed, %s", srs_error_desc(err).c_str());
236+
fprintf(stderr, "Failed, ts=%" PRId64 ", err is %s\n", srs_update_system_time(),
237+
srs_error_desc(err).c_str());
231238
}
232239

233240
int ret = srs_error_code(err);
@@ -476,18 +483,19 @@ srs_error_t run_in_thread_pool()
476483

477484
// After all init(log, async log manager, thread pool), now we can start to
478485
// run the log manager thread.
479-
if ((err = _srs_async_log->run()) != srs_success) {
480-
return srs_error_wrap(err, "run async log");
486+
if ((err = _srs_thread_pool->execute("log", SrsAsyncLogManager::start, _srs_async_log)) != srs_success) {
487+
return srs_error_wrap(err, "start async log thread");
481488
}
482489

483490
// Start the service worker thread, for RTMP and RTC server, etc.
484491
if ((err = _srs_thread_pool->execute("hybrid", run_hybrid_server, NULL)) != srs_success) {
485-
return srs_error_wrap(err, "run hybrid server");
492+
return srs_error_wrap(err, "start hybrid server thread");
486493
}
487494

488495
return _srs_thread_pool->run();
489496
}
490497

498+
// TODO: FIXME: Extract to hybrid server.
491499
srs_error_t run_hybrid_server(void* arg)
492500
{
493501
srs_error_t err = srs_success;
@@ -504,6 +512,7 @@ srs_error_t run_hybrid_server(void* arg)
504512
#endif
505513

506514
// Do some system initialize.
515+
// TODO: FIXME: If fail, for example, acquire pid fail, should exit.
507516
if ((err = _srs_hybrid->initialize()) != srs_success) {
508517
return srs_error_wrap(err, "hybrid initialize");
509518
}

0 commit comments

Comments
 (0)