Skip to content

Commit 6718c38

Browse files
committed
Threads-Log: Refine dual queue for log thread.
1. App/User controls the interval to flush coroutine-queue. 2. Use srs_update_system_time to get time for log. 3. Stat the thread sync in us, in SrsThreadPool. 4. Change default interval for thread to 5s.
1 parent 37aee44 commit 6718c38

7 files changed

+73
-32
lines changed

trunk/conf/full.conf

+2-2
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,8 @@ tcmalloc_release_rate 0.8;
119119
# For thread pool.
120120
threads {
121121
# The thread pool manager cycle interval, in seconds.
122-
# Default: 60
123-
interval 60;
122+
# Default: 5
123+
interval 5;
124124
}
125125

126126
#############################################################################################

trunk/src/app/srs_app_config.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -4107,7 +4107,7 @@ double SrsConfig::tcmalloc_release_rate()
41074107

41084108
srs_utime_t SrsConfig::get_threads_interval()
41094109
{
4110-
static srs_utime_t DEFAULT = 60 * SRS_UTIME_SECONDS;
4110+
static srs_utime_t DEFAULT = 5 * SRS_UTIME_SECONDS;
41114111

41124112
SrsConfDirective* conf = root->get("threads");
41134113
if (!conf) {

trunk/src/app/srs_app_log.cpp

+14-2
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ SrsFileLog::SrsFileLog()
5454

5555
log_data = new char[LOG_MAX_SIZE];
5656
writer_ = NULL;
57+
58+
last_flush_time_ = srs_get_system_time();
59+
interval_ = 0;
5760
}
5861

5962
SrsFileLog::~SrsFileLog()
@@ -71,6 +74,7 @@ srs_error_t SrsFileLog::initialize()
7174
filename_ = _srs_config->get_log_file();
7275
level = srs_get_log_level(_srs_config->get_log_level());
7376
utc = _srs_config->get_utc_time();
77+
interval_ = _srs_config->srs_log_flush_interval();
7478
}
7579

7680
if (!log_to_file_tank) {
@@ -202,11 +206,12 @@ void SrsFileLog::write_log(char *str_log, int size, int level)
202206
// ensure the tail and EOF of string
203207
// LOG_TAIL_SIZE for the TAIL char.
204208
// 1 for the last char(0).
205-
size = srs_min(LOG_MAX_SIZE - 1 - LOG_TAIL_SIZE, size);
209+
size = srs_min(LOG_MAX_SIZE - 2 - LOG_TAIL_SIZE, size);
206210

207211
// add some to the end of char.
208212
str_log[size++] = LOG_TAIL;
209-
213+
str_log[size] = 0;
214+
210215
// if not to file, to console and return.
211216
if (!log_to_file_tank) {
212217
// if is error msg, then print color msg.
@@ -230,5 +235,12 @@ void SrsFileLog::write_log(char *str_log, int size, int level)
230235
if ((err = writer_->write(str_log, size, NULL)) != srs_success) {
231236
srs_error_reset(err); // Ignore any error for log writing.
232237
}
238+
239+
// Whether flush to thread-queue.
240+
srs_utime_t diff = srs_get_system_time() - last_flush_time_;
241+
if (diff >= interval_) {
242+
last_flush_time_ = srs_get_system_time();
243+
writer_->flush_co_queue();
244+
}
233245
}
234246

trunk/src/app/srs_app_log.hpp

+5
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ class SrsFileLog : public ISrsLog, public ISrsReloadHandler
5252
char* log_data;
5353
// Async file writer.
5454
SrsAsyncFileWriter* writer_;
55+
private:
56+
// The interval to flush from coroutine-queue to thread-queue.
57+
srs_utime_t interval_;
58+
// Last flush coroutine-queue time, to calculate the timeout.
59+
srs_utime_t last_flush_time_;
5560
private:
5661
// Defined in SrsLogLevel.
5762
SrsLogLevel level;

trunk/src/app/srs_app_threads.cpp

+44-16
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,13 @@
3333

3434
using namespace std;
3535

36+
#include <srs_protocol_kbps.hpp>
37+
38+
SrsPps* _srs_thread_sync_10us = new SrsPps();
39+
SrsPps* _srs_thread_sync_100us = new SrsPps();
40+
SrsPps* _srs_thread_sync_1000us = new SrsPps();
41+
SrsPps* _srs_thread_sync_plus = new SrsPps();
42+
3643
SrsThreadMutex::SrsThreadMutex()
3744
{
3845
// https://man7.org/linux/man-pages/man3/pthread_mutexattr_init.3.html
@@ -171,9 +178,19 @@ srs_error_t SrsThreadPool::run()
171178
while (true) {
172179
sleep(interval_ / SRS_UTIME_SECONDS);
173180

181+
static char buf[128];
174182
string async_logs = _srs_async_log->description();
175-
srs_trace("Thread #%d(%s): cycle threads=%d%s", entry_->num, entry_->label.c_str(), (int)threads_.size(),
176-
async_logs.c_str());
183+
184+
string sync_desc;
185+
_srs_thread_sync_10us->update(); _srs_thread_sync_100us->update();
186+
_srs_thread_sync_1000us->update(); _srs_thread_sync_plus->update();
187+
if (_srs_thread_sync_10us->r10s() || _srs_thread_sync_100us->r10s() || _srs_thread_sync_1000us->r10s() || _srs_thread_sync_plus->r10s()) {
188+
snprintf(buf, sizeof(buf), ", sync=%d,%d,%d,%d", _srs_thread_sync_10us->r10s(), _srs_thread_sync_100us->r10s(), _srs_thread_sync_1000us->r10s(), _srs_thread_sync_plus->r10s());
189+
sync_desc = buf;
190+
}
191+
192+
srs_trace("Thread: cycle threads=%d%s%s", (int)threads_.size(),
193+
async_logs.c_str(), sync_desc.c_str());
177194
}
178195

179196
return err;
@@ -201,14 +218,12 @@ void* SrsThreadPool::start(void* arg)
201218

202219
SrsThreadPool* _srs_thread_pool = new SrsThreadPool();
203220

204-
SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p, srs_utime_t interval)
221+
SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p)
205222
{
206223
filename_ = p;
207224
writer_ = new SrsFileWriter();
208225
queue_ = new SrsThreadQueue<SrsSharedPtrMessage>();
209226
co_queue_ = new SrsCoroutineQueue<SrsSharedPtrMessage>();
210-
interval_ = interval;
211-
last_flush_time_ = srs_get_system_time();
212227
}
213228

214229
// TODO: FIXME: Before free the writer, we must remove it from the manager.
@@ -251,15 +266,6 @@ srs_error_t SrsAsyncFileWriter::write(void* buf, size_t count, ssize_t* pnwrite)
251266

252267
co_queue_->push_back(msg);
253268

254-
// Whether flush to thread-queue.
255-
if (srs_get_system_time() - last_flush_time_ >= interval_) {
256-
last_flush_time_ = srs_get_system_time();
257-
258-
vector<SrsSharedPtrMessage*> flying;
259-
co_queue_->swap(flying);
260-
queue_->push_back(flying);
261-
}
262-
263269
if (pnwrite) {
264270
*pnwrite = count;
265271
}
@@ -287,6 +293,28 @@ srs_error_t SrsAsyncFileWriter::writev(const iovec* iov, int iovcnt, ssize_t* pn
287293
return err;
288294
}
289295

296+
void SrsAsyncFileWriter::flush_co_queue()
297+
{
298+
srs_utime_t now = srs_update_system_time();
299+
300+
if (true) {
301+
vector<SrsSharedPtrMessage*> flying;
302+
co_queue_->swap(flying);
303+
queue_->push_back(flying);
304+
}
305+
306+
srs_utime_t elapsed = srs_update_system_time() - now;
307+
if (elapsed <= 10) {
308+
++_srs_thread_sync_10us->sugar;
309+
} else if (elapsed <= 100) {
310+
++_srs_thread_sync_100us->sugar;
311+
} else if (elapsed <= 1000) {
312+
++_srs_thread_sync_1000us->sugar;
313+
} else {
314+
++_srs_thread_sync_plus->sugar;
315+
}
316+
}
317+
290318
srs_error_t SrsAsyncFileWriter::flush()
291319
{
292320
srs_error_t err = srs_success;
@@ -362,7 +390,7 @@ srs_error_t SrsAsyncLogManager::create_writer(std::string filename, SrsAsyncFile
362390
{
363391
srs_error_t err = srs_success;
364392

365-
SrsAsyncFileWriter* writer = new SrsAsyncFileWriter(filename, interval_);
393+
SrsAsyncFileWriter* writer = new SrsAsyncFileWriter(filename);
366394
writers_.push_back(writer);
367395

368396
if ((err = writer->open()) != srs_success) {
@@ -404,7 +432,7 @@ std::string SrsAsyncLogManager::description()
404432
}
405433

406434
static char buf[128];
407-
snprintf(buf, sizeof(buf), ", files=%d, queue=%d/%d, coq=%d/%d",
435+
snprintf(buf, sizeof(buf), ", logs=%d/%d/%d/%d/%d",
408436
(int)writers_.size(), nn_logs, max_logs, nn_co_logs, max_co_logs);
409437

410438
return buf;

trunk/src/app/srs_app_threads.hpp

+4-8
Original file line numberDiff line numberDiff line change
@@ -201,14 +201,10 @@ class SrsAsyncFileWriter : public ISrsWriter
201201
// The thread-queue, to flush to disk by dedicated thread.
202202
SrsThreadQueue<SrsSharedPtrMessage>* queue_;
203203
private:
204-
// The interval to flush from coroutine-queue to thread-queue.
205-
srs_utime_t interval_;
206-
// Last flush coroutine-queue time, to calculate the timeout.
207-
srs_utime_t last_flush_time_;
208204
// The coroutine-queue, to avoid requires lock for each log.
209205
SrsCoroutineQueue<SrsSharedPtrMessage>* co_queue_;
210206
private:
211-
SrsAsyncFileWriter(std::string p, srs_utime_t interval);
207+
SrsAsyncFileWriter(std::string p);
212208
virtual ~SrsAsyncFileWriter();
213209
public:
214210
// Open file writer, in truncate mode.
@@ -222,7 +218,9 @@ class SrsAsyncFileWriter : public ISrsWriter
222218
virtual srs_error_t write(void* buf, size_t count, ssize_t* pnwrite);
223219
virtual srs_error_t writev(const iovec* iov, int iovcnt, ssize_t* pnwrite);
224220
public:
225-
// Flush by other thread.
221+
// Flush coroutine-queue to thread-queue, avoid requiring lock for each message.
222+
void flush_co_queue();
223+
// Flush thread-queue to disk, generally by dedicated thread.
226224
srs_error_t flush();
227225
};
228226

@@ -233,8 +231,6 @@ class SrsAsyncLogManager
233231
private:
234232
// The async flush interval.
235233
srs_utime_t interval_;
236-
// The number of logs to flush from coroutine-queue to thread-queue.
237-
int flush_co_queue_;
238234
private:
239235
// The async reopen event.
240236
bool reopen_;

trunk/src/protocol/srs_service_log.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -244,9 +244,9 @@ bool srs_log_header(char* buffer, int size, bool utc, bool dangerous, const char
244244
{
245245
// clock time
246246
timeval tv;
247-
if (gettimeofday(&tv, NULL) == -1) {
248-
return false;
249-
}
247+
srs_utime_t now = srs_update_system_time();
248+
tv.tv_sec = now / SRS_UTIME_SECONDS;
249+
tv.tv_usec = now % SRS_UTIME_SECONDS;
250250

251251
// to calendar time
252252
struct tm* tm;

0 commit comments

Comments
 (0)