|
33 | 33 |
|
34 | 34 | using namespace std;
|
35 | 35 |
|
| 36 | +#include <srs_protocol_kbps.hpp> |
| 37 | + |
| 38 | +SrsPps* _srs_thread_sync_10us = new SrsPps(); |
| 39 | +SrsPps* _srs_thread_sync_100us = new SrsPps(); |
| 40 | +SrsPps* _srs_thread_sync_1000us = new SrsPps(); |
| 41 | +SrsPps* _srs_thread_sync_plus = new SrsPps(); |
| 42 | + |
36 | 43 | SrsThreadMutex::SrsThreadMutex()
|
37 | 44 | {
|
38 | 45 | // https://man7.org/linux/man-pages/man3/pthread_mutexattr_init.3.html
|
@@ -171,9 +178,19 @@ srs_error_t SrsThreadPool::run()
|
171 | 178 | while (true) {
|
172 | 179 | sleep(interval_ / SRS_UTIME_SECONDS);
|
173 | 180 |
|
| 181 | + static char buf[128]; |
174 | 182 | string async_logs = _srs_async_log->description();
|
175 |
| - srs_trace("Thread #%d(%s): cycle threads=%d%s", entry_->num, entry_->label.c_str(), (int)threads_.size(), |
176 |
| - async_logs.c_str()); |
| 183 | + |
| 184 | + string sync_desc; |
| 185 | + _srs_thread_sync_10us->update(); _srs_thread_sync_100us->update(); |
| 186 | + _srs_thread_sync_1000us->update(); _srs_thread_sync_plus->update(); |
| 187 | + if (_srs_thread_sync_10us->r10s() || _srs_thread_sync_100us->r10s() || _srs_thread_sync_1000us->r10s() || _srs_thread_sync_plus->r10s()) { |
| 188 | + snprintf(buf, sizeof(buf), ", sync=%d,%d,%d,%d", _srs_thread_sync_10us->r10s(), _srs_thread_sync_100us->r10s(), _srs_thread_sync_1000us->r10s(), _srs_thread_sync_plus->r10s()); |
| 189 | + sync_desc = buf; |
| 190 | + } |
| 191 | + |
| 192 | + srs_trace("Thread: cycle threads=%d%s%s", (int)threads_.size(), |
| 193 | + async_logs.c_str(), sync_desc.c_str()); |
177 | 194 | }
|
178 | 195 |
|
179 | 196 | return err;
|
@@ -201,14 +218,12 @@ void* SrsThreadPool::start(void* arg)
|
201 | 218 |
|
202 | 219 | SrsThreadPool* _srs_thread_pool = new SrsThreadPool();
|
203 | 220 |
|
204 |
| -SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p, srs_utime_t interval) |
| 221 | +SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p) |
205 | 222 | {
|
206 | 223 | filename_ = p;
|
207 | 224 | writer_ = new SrsFileWriter();
|
208 | 225 | queue_ = new SrsThreadQueue<SrsSharedPtrMessage>();
|
209 | 226 | co_queue_ = new SrsCoroutineQueue<SrsSharedPtrMessage>();
|
210 |
| - interval_ = interval; |
211 |
| - last_flush_time_ = srs_get_system_time(); |
212 | 227 | }
|
213 | 228 |
|
214 | 229 | // TODO: FIXME: Before free the writer, we must remove it from the manager.
|
@@ -251,15 +266,6 @@ srs_error_t SrsAsyncFileWriter::write(void* buf, size_t count, ssize_t* pnwrite)
|
251 | 266 |
|
252 | 267 | co_queue_->push_back(msg);
|
253 | 268 |
|
254 |
| - // Whether flush to thread-queue. |
255 |
| - if (srs_get_system_time() - last_flush_time_ >= interval_) { |
256 |
| - last_flush_time_ = srs_get_system_time(); |
257 |
| - |
258 |
| - vector<SrsSharedPtrMessage*> flying; |
259 |
| - co_queue_->swap(flying); |
260 |
| - queue_->push_back(flying); |
261 |
| - } |
262 |
| - |
263 | 269 | if (pnwrite) {
|
264 | 270 | *pnwrite = count;
|
265 | 271 | }
|
@@ -287,6 +293,28 @@ srs_error_t SrsAsyncFileWriter::writev(const iovec* iov, int iovcnt, ssize_t* pn
|
287 | 293 | return err;
|
288 | 294 | }
|
289 | 295 |
|
| 296 | +void SrsAsyncFileWriter::flush_co_queue() |
| 297 | +{ |
| 298 | + srs_utime_t now = srs_update_system_time(); |
| 299 | + |
| 300 | + if (true) { |
| 301 | + vector<SrsSharedPtrMessage*> flying; |
| 302 | + co_queue_->swap(flying); |
| 303 | + queue_->push_back(flying); |
| 304 | + } |
| 305 | + |
| 306 | + srs_utime_t elapsed = srs_update_system_time() - now; |
| 307 | + if (elapsed <= 10) { |
| 308 | + ++_srs_thread_sync_10us->sugar; |
| 309 | + } else if (elapsed <= 100) { |
| 310 | + ++_srs_thread_sync_100us->sugar; |
| 311 | + } else if (elapsed <= 1000) { |
| 312 | + ++_srs_thread_sync_1000us->sugar; |
| 313 | + } else { |
| 314 | + ++_srs_thread_sync_plus->sugar; |
| 315 | + } |
| 316 | +} |
| 317 | + |
290 | 318 | srs_error_t SrsAsyncFileWriter::flush()
|
291 | 319 | {
|
292 | 320 | srs_error_t err = srs_success;
|
@@ -362,7 +390,7 @@ srs_error_t SrsAsyncLogManager::create_writer(std::string filename, SrsAsyncFile
|
362 | 390 | {
|
363 | 391 | srs_error_t err = srs_success;
|
364 | 392 |
|
365 |
| - SrsAsyncFileWriter* writer = new SrsAsyncFileWriter(filename, interval_); |
| 393 | + SrsAsyncFileWriter* writer = new SrsAsyncFileWriter(filename); |
366 | 394 | writers_.push_back(writer);
|
367 | 395 |
|
368 | 396 | if ((err = writer->open()) != srs_success) {
|
@@ -404,7 +432,7 @@ std::string SrsAsyncLogManager::description()
|
404 | 432 | }
|
405 | 433 |
|
406 | 434 | static char buf[128];
|
407 |
| - snprintf(buf, sizeof(buf), ", files=%d, queue=%d/%d, coq=%d/%d", |
| 435 | + snprintf(buf, sizeof(buf), ", logs=%d/%d/%d/%d/%d", |
408 | 436 | (int)writers_.size(), nn_logs, max_logs, nn_co_logs, max_co_logs);
|
409 | 437 |
|
410 | 438 | return buf;
|
|
0 commit comments