@@ -110,6 +110,7 @@ SrsThreadPool::~SrsThreadPool()
110
110
srs_freep (lock_);
111
111
}
112
112
113
+ // @remark Note that we should never write logs, because log is not ready not.
113
114
srs_error_t SrsThreadPool::initialize ()
114
115
{
115
116
srs_error_t err = srs_success;
@@ -119,12 +120,7 @@ srs_error_t SrsThreadPool::initialize()
119
120
return srs_error_wrap (err, " initialize st failed" );
120
121
}
121
122
122
- if ((err = _srs_async_log->initialize ()) != srs_success) {
123
- return srs_error_wrap (err, " init async log" );
124
- }
125
-
126
123
interval_ = _srs_config->get_threads_interval ();
127
- srs_trace (" Thread #%d(%s): init interval=%dms" , entry_->num , entry_->label .c_str (), srsu2msi (interval_));
128
124
129
125
return err;
130
126
}
@@ -169,12 +165,15 @@ srs_error_t SrsThreadPool::run()
169
165
{
170
166
srs_error_t err = srs_success;
171
167
168
+ // Write the init log here.
169
+ srs_trace (" Thread #%d(%s): init interval=%dms" , entry_->num , entry_->label .c_str (), srsu2msi (interval_));
170
+
172
171
while (true ) {
172
+ sleep (interval_ / SRS_UTIME_SECONDS);
173
+
173
174
string async_logs = _srs_async_log->description ();
174
175
srs_trace (" Thread #%d(%s): cycle threads=%d%s" , entry_->num , entry_->label .c_str (), (int )threads_.size (),
175
176
async_logs.c_str ());
176
-
177
- sleep (interval_ / SRS_UTIME_SECONDS);
178
177
}
179
178
180
179
return err;
@@ -202,11 +201,14 @@ void* SrsThreadPool::start(void* arg)
202
201
203
202
SrsThreadPool* _srs_thread_pool = new SrsThreadPool();
204
203
205
- SrsAsyncFileWriter::SrsAsyncFileWriter (std::string p)
204
+ SrsAsyncFileWriter::SrsAsyncFileWriter (std::string p, srs_utime_t interval )
206
205
{
207
206
filename_ = p;
208
207
writer_ = new SrsFileWriter ();
209
208
queue_ = new SrsThreadQueue<SrsSharedPtrMessage>();
209
+ co_queue_ = new SrsCoroutineQueue<SrsSharedPtrMessage>();
210
+ interval_ = interval;
211
+ last_flush_time_ = srs_get_system_time ();
210
212
}
211
213
212
214
// TODO: FIXME: Before free the writer, we must remove it from the manager.
@@ -215,6 +217,7 @@ SrsAsyncFileWriter::~SrsAsyncFileWriter()
215
217
// TODO: FIXME: Should we flush dirty logs?
216
218
srs_freep (writer_);
217
219
srs_freep (queue_);
220
+ srs_freep (co_queue_);
218
221
}
219
222
220
223
srs_error_t SrsAsyncFileWriter::open ()
@@ -246,7 +249,16 @@ srs_error_t SrsAsyncFileWriter::write(void* buf, size_t count, ssize_t* pnwrite)
246
249
SrsSharedPtrMessage* msg = new SrsSharedPtrMessage ();
247
250
msg->wrap (cp, count);
248
251
249
- queue_->push_back (msg);
252
+ co_queue_->push_back (msg);
253
+
254
+ // Whether flush to thread-queue.
255
+ if (srs_get_system_time () - last_flush_time_ >= interval_) {
256
+ last_flush_time_ = srs_get_system_time ();
257
+
258
+ vector<SrsSharedPtrMessage*> flying;
259
+ co_queue_->swap (flying);
260
+ queue_->push_back (flying);
261
+ }
250
262
251
263
if (pnwrite) {
252
264
*pnwrite = count;
@@ -303,6 +315,7 @@ srs_error_t SrsAsyncFileWriter::flush()
303
315
SrsAsyncLogManager::SrsAsyncLogManager ()
304
316
{
305
317
interval_ = 0 ;
318
+
306
319
reopen_ = false ;
307
320
lock_ = new SrsThreadMutex ();
308
321
}
@@ -318,6 +331,7 @@ SrsAsyncLogManager::~SrsAsyncLogManager()
318
331
}
319
332
}
320
333
334
+ // @remark Note that we should never write logs, because log is not ready not.
321
335
srs_error_t SrsAsyncLogManager::initialize ()
322
336
{
323
337
srs_error_t err = srs_success;
@@ -327,18 +341,28 @@ srs_error_t SrsAsyncLogManager::initialize()
327
341
return srs_error_new (ERROR_SYSTEM_LOGFILE, " invalid interval=%dms" , srsu2msi (interval_));
328
342
}
329
343
344
+ return err;
345
+ }
346
+
347
+ // @remark Now, log is ready, and we can print logs.
348
+ srs_error_t SrsAsyncLogManager::run ()
349
+ {
350
+ srs_error_t err = srs_success;
351
+
330
352
if ((err = _srs_thread_pool->execute (" log" , SrsAsyncLogManager::start, this )) != srs_success) {
331
353
return srs_error_wrap (err, " run async log" );
332
354
}
333
355
356
+ srs_trace (" AsyncLogs: Init flush_interval=%dms" , srsu2msi (interval_));
357
+
334
358
return err;
335
359
}
336
360
337
361
srs_error_t SrsAsyncLogManager::create_writer (std::string filename, SrsAsyncFileWriter** ppwriter)
338
362
{
339
363
srs_error_t err = srs_success;
340
364
341
- SrsAsyncFileWriter* writer = new SrsAsyncFileWriter (filename);
365
+ SrsAsyncFileWriter* writer = new SrsAsyncFileWriter (filename, interval_ );
342
366
writers_.push_back (writer);
343
367
344
368
if ((err = writer->open ()) != srs_success) {
@@ -369,8 +393,20 @@ std::string SrsAsyncLogManager::description()
369
393
max_logs = srs_max (max_logs, nn);
370
394
}
371
395
396
+ int nn_co_logs = 0 ;
397
+ int max_co_logs = 0 ;
398
+ for (int i = 0 ; i < (int )writers_.size (); i++) {
399
+ SrsAsyncFileWriter* writer = writers_.at (i);
400
+
401
+ int nn = (int )writer->co_queue_ ->size ();
402
+ nn_co_logs += nn;
403
+ max_co_logs = srs_max (max_co_logs, nn);
404
+ }
405
+
372
406
static char buf[128 ];
373
- snprintf (buf, sizeof (buf), " , files=%d, queue=%d/%d" , (int )writers_.size (), nn_logs, max_logs);
407
+ snprintf (buf, sizeof (buf), " , files=%d, queue=%d/%d, coq=%d/%d" ,
408
+ (int )writers_.size (), nn_logs, max_logs, nn_co_logs, max_co_logs);
409
+
374
410
return buf;
375
411
}
376
412
0 commit comments