Skip to content

Commit 6a1d6a0

Browse files
committed
Threads-Log: Remove dual queue for sys logs.
1. It exists delay for multiple threads. 2. There is overlay for cache of coroutine queues. 3. Risk when other threads write logs.
1 parent 6ddbc5f commit 6a1d6a0

File tree

4 files changed

+29
-80
lines changed

4 files changed

+29
-80
lines changed

trunk/src/app/srs_app_log.cpp

+21-10
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@
3838
#include <srs_kernel_utility.hpp>
3939
#include <srs_app_threads.hpp>
4040

41+
#include <srs_protocol_kbps.hpp>
42+
43+
SrsPps* _srs_thread_sync_10us = new SrsPps();
44+
SrsPps* _srs_thread_sync_100us = new SrsPps();
45+
SrsPps* _srs_thread_sync_1000us = new SrsPps();
46+
SrsPps* _srs_thread_sync_plus = new SrsPps();
47+
4148
// the max size of a line of log.
4249
#define LOG_MAX_SIZE 8192
4350

@@ -54,9 +61,6 @@ SrsFileLog::SrsFileLog()
5461

5562
log_data = new char[LOG_MAX_SIZE];
5663
writer_ = NULL;
57-
58-
last_flush_time_ = srs_get_system_time();
59-
interval_ = 0;
6064
}
6165

6266
SrsFileLog::~SrsFileLog()
@@ -74,7 +78,6 @@ srs_error_t SrsFileLog::initialize()
7478
filename_ = _srs_config->get_log_file();
7579
level = srs_get_log_level(_srs_config->get_log_level());
7680
utc = _srs_config->get_utc_time();
77-
interval_ = _srs_config->srs_log_flush_interval();
7881
}
7982

8083
if (!log_to_file_tank) {
@@ -231,17 +234,25 @@ void SrsFileLog::write_log(char *str_log, int size, int level)
231234

232235
return;
233236
}
234-
237+
238+
// It's ok to use cache, because it has been updated in generating log header.
239+
srs_utime_t now = srs_get_system_time();
240+
235241
// write log to file.
236242
if ((err = writer_->write(str_log, size, NULL)) != srs_success) {
237243
srs_error_reset(err); // Ignore any error for log writing.
238244
}
239245

240-
// Whether flush to thread-queue.
241-
srs_utime_t diff = srs_get_system_time() - last_flush_time_;
242-
if (diff >= interval_) {
243-
last_flush_time_ = srs_get_system_time();
244-
writer_->flush_co_queue();
246+
// Stat the sync wait of locks.
247+
srs_utime_t elapsed = srs_update_system_time() - now;
248+
if (elapsed <= 10) {
249+
++_srs_thread_sync_10us->sugar;
250+
} else if (elapsed <= 100) {
251+
++_srs_thread_sync_100us->sugar;
252+
} else if (elapsed <= 1000) {
253+
++_srs_thread_sync_1000us->sugar;
254+
} else {
255+
++_srs_thread_sync_plus->sugar;
245256
}
246257
}
247258

trunk/src/app/srs_app_log.hpp

-5
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,6 @@ class SrsFileLog : public ISrsLog, public ISrsReloadHandler
5252
char* log_data;
5353
// Async file writer.
5454
SrsAsyncFileWriter* writer_;
55-
private:
56-
// The interval to flush from coroutine-queue to thread-queue.
57-
srs_utime_t interval_;
58-
// Last flush coroutine-queue time, to calculate the timeout.
59-
srs_utime_t last_flush_time_;
6055
private:
6156
// Defined in SrsLogLevel.
6257
SrsLogLevel level;

trunk/src/app/srs_app_threads.cpp

+7-50
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@ using namespace std;
3535

3636
#include <srs_protocol_kbps.hpp>
3737

38-
SrsPps* _srs_thread_sync_10us = new SrsPps();
39-
SrsPps* _srs_thread_sync_100us = new SrsPps();
40-
SrsPps* _srs_thread_sync_1000us = new SrsPps();
41-
SrsPps* _srs_thread_sync_plus = new SrsPps();
38+
extern SrsPps* _srs_thread_sync_10us;
39+
extern SrsPps* _srs_thread_sync_100us;
40+
extern SrsPps* _srs_thread_sync_1000us;
41+
extern SrsPps* _srs_thread_sync_plus;
4242

4343
SrsThreadMutex::SrsThreadMutex()
4444
{
@@ -128,6 +128,7 @@ srs_error_t SrsThreadPool::initialize()
128128
}
129129

130130
interval_ = _srs_config->get_threads_interval();
131+
srs_trace("Thread #%d(%s): init interval=%dms", entry_->num, entry_->label.c_str(), srsu2msi(interval_));
131132

132133
return err;
133134
}
@@ -172,8 +173,6 @@ srs_error_t SrsThreadPool::run()
172173
{
173174
srs_error_t err = srs_success;
174175

175-
bool print_init_log = true;
176-
177176
while (true) {
178177
// Check the threads status fastly.
179178
int loops = (int)(interval_ / SRS_UTIME_SECONDS);
@@ -190,12 +189,6 @@ srs_error_t SrsThreadPool::run()
190189
}
191190

192191
sleep(1);
193-
194-
// Flush the system previous logs by this log.
195-
if (i > 0 && print_init_log) {
196-
print_init_log = false;
197-
srs_trace("Thread #%d(%s): init interval=%dms", entry_->num, entry_->label.c_str(), srsu2msi(interval_));
198-
}
199192
}
200193

201194
// In normal state, gather status and log it.
@@ -245,7 +238,6 @@ SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p)
245238
filename_ = p;
246239
writer_ = new SrsFileWriter();
247240
queue_ = new SrsThreadQueue<SrsSharedPtrMessage>();
248-
co_queue_ = new SrsCoroutineQueue<SrsSharedPtrMessage>();
249241
}
250242

251243
// TODO: FIXME: Before free the writer, we must remove it from the manager.
@@ -254,7 +246,6 @@ SrsAsyncFileWriter::~SrsAsyncFileWriter()
254246
// TODO: FIXME: Should we flush dirty logs?
255247
srs_freep(writer_);
256248
srs_freep(queue_);
257-
srs_freep(co_queue_);
258249
}
259250

260251
srs_error_t SrsAsyncFileWriter::open()
@@ -286,7 +277,7 @@ srs_error_t SrsAsyncFileWriter::write(void* buf, size_t count, ssize_t* pnwrite)
286277
SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
287278
msg->wrap(cp, count);
288279

289-
co_queue_->push_back(msg);
280+
queue_->push_back(msg);
290281

291282
if (pnwrite) {
292283
*pnwrite = count;
@@ -315,29 +306,6 @@ srs_error_t SrsAsyncFileWriter::writev(const iovec* iov, int iovcnt, ssize_t* pn
315306
return err;
316307
}
317308

318-
void SrsAsyncFileWriter::flush_co_queue()
319-
{
320-
srs_utime_t now = srs_update_system_time();
321-
322-
// The thread queue is thread-safe, so we do not need a lock.
323-
if (true) {
324-
vector<SrsSharedPtrMessage*> flying;
325-
co_queue_->swap(flying);
326-
queue_->push_back(flying);
327-
}
328-
329-
srs_utime_t elapsed = srs_update_system_time() - now;
330-
if (elapsed <= 10) {
331-
++_srs_thread_sync_10us->sugar;
332-
} else if (elapsed <= 100) {
333-
++_srs_thread_sync_100us->sugar;
334-
} else if (elapsed <= 1000) {
335-
++_srs_thread_sync_1000us->sugar;
336-
} else {
337-
++_srs_thread_sync_plus->sugar;
338-
}
339-
}
340-
341309
srs_error_t SrsAsyncFileWriter::flush()
342310
{
343311
srs_error_t err = srs_success;
@@ -441,19 +409,8 @@ std::string SrsAsyncLogManager::description()
441409
max_logs = srs_max(max_logs, nn);
442410
}
443411

444-
int nn_co_logs = 0;
445-
int max_co_logs = 0;
446-
for (int i = 0; i < (int)writers_.size(); i++) {
447-
SrsAsyncFileWriter* writer = writers_.at(i);
448-
449-
int nn = (int)writer->co_queue_->size();
450-
nn_co_logs += nn;
451-
max_co_logs = srs_max(max_co_logs, nn);
452-
}
453-
454412
static char buf[128];
455-
snprintf(buf, sizeof(buf), ", logs=%d/%d/%d/%d/%d",
456-
(int)writers_.size(), nn_logs, max_logs, nn_co_logs, max_co_logs);
413+
snprintf(buf, sizeof(buf), ", logs=%d/%d/%d", (int)writers_.size(), nn_logs, max_logs);
457414

458415
return buf;
459416
}

trunk/src/app/srs_app_threads.hpp

+1-15
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,7 @@ class SrsThreadQueue
190190
}
191191
};
192192

193-
// Async file writer, it's not thread safe. It assume that each thread has its dedicate object,
194-
// for example, the log object should be thread_local.
193+
// Async file writer, it's thread safe.
195194
class SrsAsyncFileWriter : public ISrsWriter
196195
{
197196
friend class SrsAsyncLogManager;
@@ -201,17 +200,6 @@ class SrsAsyncFileWriter : public ISrsWriter
201200
private:
202201
// The thread-queue, to flush to disk by dedicated thread.
203202
SrsThreadQueue<SrsSharedPtrMessage>* queue_;
204-
private:
205-
// The coroutine-queue, to avoid requires lock for each log.
206-
// @remark Note that if multiple thread write to the same log file, the log is nor ordered
207-
// by time, because each thread has this coroutine-queue and flush as a batch of logs to
208-
// thread-queue:
209-
// thread #1, flush 10 logs to thread-queue, [10:10:00 ~ 10:11:00]
210-
// thread #2, flush 100 logs to thread-queue, [10:09:00 ~ 10:12:00]
211-
// Finally, the logs flush to disk as:
212-
// [10:10:00 ~ 10:11:00], 10 logs.
213-
// [10:09:00 ~ 10:12:00], 100 logs.
214-
SrsCoroutineQueue<SrsSharedPtrMessage>* co_queue_;
215203
private:
216204
SrsAsyncFileWriter(std::string p);
217205
virtual ~SrsAsyncFileWriter();
@@ -227,8 +215,6 @@ class SrsAsyncFileWriter : public ISrsWriter
227215
virtual srs_error_t write(void* buf, size_t count, ssize_t* pnwrite);
228216
virtual srs_error_t writev(const iovec* iov, int iovcnt, ssize_t* pnwrite);
229217
public:
230-
// Flush coroutine-queue to thread-queue, avoid requiring lock for each message.
231-
void flush_co_queue();
232218
// Flush thread-queue to disk, generally by dedicated thread.
233219
srs_error_t flush();
234220
};

0 commit comments

Comments
 (0)