Skip to content

Commit 4918160

Browse files
committed
Threads-Log: Support thread-safe queue SrsThreadQueue.
1. Wrap std::vector to thread-safe queue. 2. Keep API compatible with std::vector. 3. SrsAsyncFileWriter use thread-safe queue instead.
1 parent 3810f5f commit 4918160

File tree

2 files changed

+51
-23
lines changed

2 files changed

+51
-23
lines changed

trunk/src/app/srs_app_threads.cpp

+10-20
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ srs_error_t SrsThreadPool::run()
170170
srs_error_t err = srs_success;
171171

172172
while (true) {
173-
string async_logs = _srs_async_log->desc();
173+
string async_logs = _srs_async_log->description();
174174
srs_trace("Thread #%d(%s): cycle threads=%d%s", entry_->num, entry_->label.c_str(), (int)threads_.size(),
175175
async_logs.c_str());
176176

@@ -205,21 +205,16 @@ SrsThreadPool* _srs_thread_pool = new SrsThreadPool();
205205
SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p)
206206
{
207207
filename_ = p;
208-
lock_ = new SrsThreadMutex();
209208
writer_ = new SrsFileWriter();
209+
queue_ = new SrsThreadQueue<SrsSharedPtrMessage>();
210210
}
211211

212212
// TODO: FIXME: Before free the writer, we must remove it from the manager.
213213
SrsAsyncFileWriter::~SrsAsyncFileWriter()
214214
{
215+
// TODO: FIXME: Should we flush dirty logs?
215216
srs_freep(writer_);
216-
srs_freep(lock_);
217-
218-
// TODO: FIXME: Should we flush these logs?
219-
for (int i = 0; i < (int)dirty_.size(); i++) {
220-
SrsSharedPtrMessage* msg = dirty_.at(i);
221-
srs_freep(msg);
222-
}
217+
srs_freep(queue_);
223218
}
224219

225220
srs_error_t SrsAsyncFileWriter::open()
@@ -251,10 +246,7 @@ srs_error_t SrsAsyncFileWriter::write(void* buf, size_t count, ssize_t* pnwrite)
251246
SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
252247
msg->wrap(cp, count);
253248

254-
if (true) {
255-
SrsThreadLocker(lock_);
256-
dirty_.push_back(msg);
257-
}
249+
queue_->push_back(msg);
258250

259251
if (pnwrite) {
260252
*pnwrite = count;
@@ -288,10 +280,7 @@ srs_error_t SrsAsyncFileWriter::flush()
288280
srs_error_t err = srs_success;
289281

290282
vector<SrsSharedPtrMessage*> flying;
291-
if (true) {
292-
SrsThreadLocker(lock_);
293-
dirty_.swap(flying);
294-
}
283+
queue_->swap(flying);
295284

296285
for (int i = 0; i < (int)flying.size(); i++) {
297286
SrsSharedPtrMessage* msg = flying.at(i);
@@ -366,7 +355,7 @@ void SrsAsyncLogManager::reopen()
366355
reopen_ = true;
367356
}
368357

369-
std::string SrsAsyncLogManager::desc()
358+
std::string SrsAsyncLogManager::description()
370359
{
371360
SrsThreadLocker(lock_);
372361

@@ -375,8 +364,9 @@ std::string SrsAsyncLogManager::desc()
375364
for (int i = 0; i < (int)writers_.size(); i++) {
376365
SrsAsyncFileWriter* writer = writers_.at(i);
377366

378-
nn_logs += (int)writer->dirty_.size();
379-
max_logs = srs_max(max_logs, (int)writer->dirty_.size());
367+
int nn = (int)writer->queue_->size();
368+
nn_logs += nn;
369+
max_logs = srs_max(max_logs, nn);
380370
}
381371

382372
static char buf[128];

trunk/src/app/srs_app_threads.hpp

+41-3
Original file line numberDiff line numberDiff line change
@@ -116,15 +116,52 @@ class SrsThreadPool
116116
// The global thread pool.
117117
extern SrsThreadPool* _srs_thread_pool;
118118

119+
// Thread-safe queue.
120+
template<typename T>
121+
class SrsThreadQueue
122+
{
123+
private:
124+
std::vector<T*> dirty_;
125+
SrsThreadMutex* lock_;
126+
public:
127+
// SrsThreadQueue::SrsThreadQueue
128+
SrsThreadQueue() {
129+
lock_ = new SrsThreadMutex();
130+
}
131+
// SrsThreadQueue::~SrsThreadQueue
132+
virtual ~SrsThreadQueue() {
133+
srs_freep(lock_);
134+
for (int i = 0; i < (int)dirty_.size(); i++) {
135+
T* msg = dirty_.at(i);
136+
srs_freep(msg);
137+
}
138+
}
139+
public:
140+
// SrsThreadQueue::push_back
141+
void push_back(T* msg) {
142+
SrsThreadLocker(lock_);
143+
dirty_.push_back(msg);
144+
}
145+
// SrsThreadQueue::swap
146+
void swap(std::vector<T*>& flying) {
147+
SrsThreadLocker(lock_);
148+
dirty_.swap(flying);
149+
}
150+
// SrsThreadQueue::size
151+
size_t size() {
152+
SrsThreadLocker(lock_);
153+
return dirty_.size();
154+
}
155+
};
156+
119157
// Async file writer.
120158
class SrsAsyncFileWriter : public ISrsWriter
121159
{
122160
friend class SrsAsyncLogManager;
123161
private:
124162
std::string filename_;
125163
SrsFileWriter* writer_;
126-
std::vector<SrsSharedPtrMessage*> dirty_;
127-
SrsThreadMutex* lock_;
164+
SrsThreadQueue<SrsSharedPtrMessage>* queue_;
128165
private:
129166
SrsAsyncFileWriter(std::string p);
130167
virtual ~SrsAsyncFileWriter();
@@ -168,7 +205,8 @@ class SrsAsyncLogManager
168205
// Reopen all log files, asynchronously.
169206
virtual void reopen();
170207
public:
171-
std::string desc();
208+
// Get the summary of this manager.
209+
std::string description();
172210
private:
173211
static srs_error_t start(void* arg);
174212
srs_error_t do_start();

0 commit comments

Comments
 (0)