Skip to content

Commit e15737f

Browse files
committed
Threads: Support circuit-breaker dying threshold
1 parent d6a92cb commit e15737f

9 files changed

+96
-10
lines changed

trunk/conf/full.conf

+7
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,13 @@ circuit_breaker {
175175
# @remark 0 to disable the critical water-level.
176176
# Default: 1
177177
critical_pulse 1;
178+
# The CPU usage percent threshold for the dying water-level. If dying, also drop packets for players.
179+
# Default: 99
180+
dying_threshold 99;
181+
# If CPU usage exceeds dying_threshold for dying_pulse consecutive checks, enter dying.
182+
# @remark 0 to disable the dying water-level.
183+
# Default: 5
184+
dying_pulse 5;
178185
}
179186

180187
#############################################################################################

trunk/src/app/srs_app_config.cpp

+34
Original file line numberDiff line numberDiff line change
@@ -4321,6 +4321,40 @@ int SrsConfig::get_critical_pulse()
43214321
return ::atoi(conf->arg0().c_str());
43224322
}
43234323

4324+
int SrsConfig::get_dying_threshold()
4325+
{
4326+
static int DEFAULT = 99;
4327+
4328+
SrsConfDirective* conf = root->get("circuit_breaker");
4329+
if (!conf) {
4330+
return DEFAULT;
4331+
}
4332+
4333+
conf = conf->get("dying_threshold");
4334+
if (!conf) {
4335+
return DEFAULT;
4336+
}
4337+
4338+
return ::atoi(conf->arg0().c_str());
4339+
}
4340+
4341+
int SrsConfig::get_dying_pulse()
4342+
{
4343+
static int DEFAULT = 5;
4344+
4345+
SrsConfDirective* conf = root->get("circuit_breaker");
4346+
if (!conf) {
4347+
return DEFAULT;
4348+
}
4349+
4350+
conf = conf->get("dying_pulse");
4351+
if (!conf) {
4352+
return DEFAULT;
4353+
}
4354+
4355+
return ::atoi(conf->arg0().c_str());
4356+
}
4357+
43244358
vector<SrsConfDirective*> SrsConfig::get_stream_casters()
43254359
{
43264360
srs_assert(root);

trunk/src/app/srs_app_config.hpp

+2
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,8 @@ class SrsConfig
488488
virtual int get_high_pulse();
489489
virtual int get_critical_threshold();
490490
virtual int get_critical_pulse();
491+
virtual int get_dying_threshold();
492+
virtual int get_dying_pulse();
491493
// stream_caster section
492494
public:
493495
// Get all stream_caster in config file.

trunk/src/app/srs_app_rtc_conn.cpp

+11-1
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ srs_error_t SrsPlaintextTransport::on_dtls_alert(std::string type, std::string d
315315

316316
srs_error_t SrsPlaintextTransport::on_dtls_handshake_done()
317317
{
318-
srs_trace("RTC: DTLS handshake done.");
318+
srs_trace("RTC: DTLS plaintext handshake done.");
319319
return session_->on_connection_established();
320320
}
321321

@@ -650,6 +650,9 @@ srs_error_t SrsRtcPlayStream::cycle()
650650
}
651651
}
652652

653+
// Number of messages handled since the last yield.
654+
uint32_t nn_msgs_for_yield = 0;
655+
653656
while (true) {
654657
if ((err = trd_->pull()) != srs_success) {
655658
return srs_error_wrap(err, "rtc sender thread");
@@ -677,6 +680,13 @@ srs_error_t SrsRtcPlayStream::cycle()
677680
// Release the packet to cache.
678681
// @remark Note that the pkt might be set to NULL.
679682
_srs_rtp_cache->recycle(pkt);
683+
684+
// Yield to other coroutines.
685+
// @see https://github.com/ossrs/srs/issues/2194#issuecomment-777485531
686+
if (++nn_msgs_for_yield > 10) {
687+
nn_msgs_for_yield = 0;
688+
srs_thread_yield();
689+
}
680690
}
681691
}
682692

trunk/src/app/srs_app_rtc_server.cpp

+4-3
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ extern SrsPps* _srs_pps_rmnack;
8080
extern SrsPps* _srs_pps_rloss;
8181
extern SrsPps* _srs_pps_sloss;
8282
extern SrsPps* _srs_pps_aloss;
83+
extern SrsPps* _srs_pps_aloss2;
8384

8485
SrsRtcBlackhole::SrsRtcBlackhole()
8586
{
@@ -721,9 +722,9 @@ srs_error_t SrsRtcServer::on_timer(srs_utime_t interval, srs_utime_t tick)
721722

722723
// TODO: FIXME: Should move to Hybrid server stat.
723724
string loss_desc;
724-
_srs_pps_aloss->update();
725-
if (_srs_pps_rloss->r1s() || _srs_pps_rloss->r10s() || _srs_pps_sloss->r10s() || _srs_pps_aloss->r10s()) {
726-
snprintf(buf, sizeof(buf), ", loss=(r:%d/%d,s:%d,a:%d)", _srs_pps_rloss->r1s(), _srs_pps_rloss->r10s(), _srs_pps_sloss->r10s(), _srs_pps_aloss->r10s());
725+
_srs_pps_aloss->update(); _srs_pps_aloss2->update();
726+
if (_srs_pps_rloss->r1s() || _srs_pps_rloss->r10s() || _srs_pps_sloss->r10s() || _srs_pps_aloss->r10s() || _srs_pps_aloss2->r10s()) {
727+
snprintf(buf, sizeof(buf), ", loss=(r:%d/%d,s:%d,a:%d/%d)", _srs_pps_rloss->r1s(), _srs_pps_rloss->r10s(), _srs_pps_sloss->r10s(), _srs_pps_aloss->r10s(), _srs_pps_aloss2->r10s());
727728
loss_desc = buf;
728729
}
729730

trunk/src/app/srs_app_rtc_source.cpp

+9
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
#include <srs_protocol_json.hpp>
4444
#include <srs_app_pithy_print.hpp>
4545
#include <srs_app_log.hpp>
46+
#include <srs_app_threads.hpp>
4647

4748
#ifdef SRS_FFMPEG_FIT
4849
#include <srs_app_rtc_codec.hpp>
@@ -63,6 +64,8 @@ SrsPps* _srs_pps_rnack2 = new SrsPps();
6364
SrsPps* _srs_pps_rhnack = new SrsPps();
6465
SrsPps* _srs_pps_rmnack = new SrsPps();
6566

67+
extern SrsPps* _srs_pps_aloss2;
68+
6669
// Firefox defaults as 109, Chrome is 111.
6770
const int kAudioPayloadType = 111;
6871
const int kAudioChannel = 2;
@@ -568,6 +571,12 @@ srs_error_t SrsRtcStream::on_rtp(SrsRtpPacket2* pkt)
568571
{
569572
srs_error_t err = srs_success;
570573

574+
// If the circuit-breaker is dying, drop the packet and count one loss per consumer.
575+
if (_srs_thread_pool->hybrid_dying_water_level()) {
576+
_srs_pps_aloss2->sugar += (int64_t)consumers.size();
577+
return err;
578+
}
579+
571580
for (int i = 0; i < (int)consumers.size(); i++) {
572581
SrsRtcConsumer* consumer = consumers.at(i);
573582
if ((err = consumer->enqueue(pkt->copy())) != srs_success) {

trunk/src/app/srs_app_threads.cpp

+24-6
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ using namespace std;
4949

5050
extern SrsPps* _srs_pps_rloss;
5151
extern SrsPps* _srs_pps_aloss;
52+
extern SrsPps* _srs_pps_aloss2;
5253

5354
extern SrsPps* _srs_pps_snack2;
5455
extern SrsPps* _srs_pps_snack3;
@@ -155,13 +156,16 @@ SrsThreadPool::SrsThreadPool()
155156
hybrid_ = NULL;
156157
hybrid_high_water_level_ = 0;
157158
hybrid_critical_water_level_ = 0;
159+
hybrid_dying_water_level_ = 0;
158160

159161
trd_ = new SrsFastCoroutine("pool", this);
160162

161163
high_threshold_ = 0;
162164
high_pulse_ = 0;
163165
critical_threshold_ = 0;
164166
critical_pulse_ = 0;
167+
dying_threshold_ = 0;
168+
dying_pulse_ = 0;
165169

166170
// Add primordial thread, current thread itself.
167171
SrsThreadEntry* entry = new SrsThreadEntry();
@@ -191,12 +195,17 @@ SrsThreadPool::~SrsThreadPool()
191195

192196
bool SrsThreadPool::hybrid_high_water_level()
193197
{
194-
return hybrid_critical_water_level_ || hybrid_high_water_level_;
198+
return hybrid_critical_water_level() || hybrid_high_water_level_;
195199
}
196200

197201
bool SrsThreadPool::hybrid_critical_water_level()
198202
{
199-
return hybrid_critical_water_level_;
203+
return hybrid_dying_water_level() || hybrid_critical_water_level_;
204+
}
205+
206+
bool SrsThreadPool::hybrid_dying_water_level()
207+
{
208+
return dying_pulse_ && hybrid_dying_water_level_ >= dying_pulse_;
200209
}
201210

202211
// Thread local objects.
@@ -243,6 +252,8 @@ srs_error_t SrsThreadPool::initialize()
243252
high_pulse_ = _srs_config->get_high_pulse();
244253
critical_threshold_ = _srs_config->get_critical_threshold();
245254
critical_pulse_ = _srs_config->get_critical_pulse();
255+
dying_threshold_ = _srs_config->get_dying_threshold();
256+
dying_pulse_ = _srs_config->get_dying_pulse();
246257
bool async_srtp = _srs_config->get_threads_async_srtp();
247258

248259
int recv_queue = _srs_config->get_threads_max_recv_queue();
@@ -255,10 +266,10 @@ srs_error_t SrsThreadPool::initialize()
255266
_srs_async_recv->set_tunnel_enabled(async_tunnel);
256267
_srs_async_srtp->set_tunnel_enabled(async_tunnel);
257268

258-
srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64 ", water_level=%dx%d,%dx%d, recvQ=%d, aSend=%d, tunnel=%d",
269+
srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64 ", water_level=%dx%d,%dx%d,%dx%d recvQ=%d, aSend=%d, tunnel=%d",
259270
entry->num, entry->label.c_str(), entry->name.c_str(), srsu2msi(interval_), async_srtp,
260271
entry->cpuset_ok, r0, srs_covert_cpuset(entry->cpuset), r1, srs_covert_cpuset(entry->cpuset2),
261-
high_pulse_, high_threshold_, critical_pulse_, critical_threshold_,
272+
high_pulse_, high_threshold_, critical_pulse_, critical_threshold_, dying_pulse_, dying_threshold_,
262273
recv_queue, async_send, async_tunnel);
263274

264275
return err;
@@ -368,6 +379,13 @@ srs_error_t SrsThreadPool::run()
368379
} else if (hybrid_critical_water_level_ > 0) {
369380
hybrid_critical_water_level_--;
370381
}
382+
383+
// Raise the dying water-level while CPU is above the dying threshold; reset it to zero as soon as CPU is no longer above it.
384+
if (hybrid_->stat->percent * 100 > dying_threshold_) {
385+
hybrid_dying_water_level_ = srs_min(dying_pulse_ + 1, hybrid_dying_water_level_ + 1);
386+
} else if (hybrid_dying_water_level_ > 0) {
387+
hybrid_dying_water_level_ = 0;
388+
}
371389
}
372390

373391
sleep(1);
@@ -419,8 +437,8 @@ srs_error_t SrsThreadPool::run()
419437

420438
string circuit_breaker;
421439
if (hybrid_high_water_level() || hybrid_critical_water_level() || _srs_pps_aloss->r1s() || _srs_pps_rloss->r1s() || _srs_pps_snack2->r10s()) {
422-
snprintf(buf, sizeof(buf), ", break=%d,%d, cond=%d,%d,%.2f%%, snk=%d,%d,%d",
423-
hybrid_high_water_level(), hybrid_critical_water_level(), // Whether Circuit-Break is enable.
440+
snprintf(buf, sizeof(buf), ", break=%d,%d,%d, cond=%d,%d,%.2f%%, snk=%d,%d,%d",
441+
hybrid_high_water_level(), hybrid_critical_water_level(), hybrid_dying_water_level(), // Whether Circuit-Break is enable.
424442
_srs_pps_rloss->r1s(), _srs_pps_aloss->r1s(), thread_percent, // The conditions to enable Circuit-Breaker.
425443
_srs_pps_snack2->r10s(), _srs_pps_snack3->r10s(), // NACK packet,seqs sent.
426444
_srs_pps_snack4->r10s() // NACK drop by Circuit-Break.

trunk/src/app/srs_app_threads.hpp

+4
Original file line numberDiff line numberDiff line change
@@ -127,12 +127,15 @@ class SrsThreadPool : public ISrsCoroutineHandler
127127
// @note To avoid the CPU change rapidly.
128128
int hybrid_high_water_level_;
129129
int hybrid_critical_water_level_;
130+
int hybrid_dying_water_level_;
130131
private:
131132
// The config for high/critical water level.
132133
int high_threshold_;
133134
int high_pulse_;
134135
int critical_threshold_;
135136
int critical_pulse_;
137+
int dying_threshold_;
138+
int dying_pulse_;
136139
private:
137140
// A coroutine to consume cooked packets.
138141
SrsFastCoroutine* trd_;
@@ -143,6 +146,7 @@ class SrsThreadPool : public ISrsCoroutineHandler
143146
// Whether hybrid server water-level is high.
144147
bool hybrid_high_water_level();
145148
bool hybrid_critical_water_level();
149+
bool hybrid_dying_water_level();
146150
// Setup the thread-local variables.
147151
static void setup();
148152
// Initialize the thread pool.

trunk/src/app/srs_app_utility.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ using namespace std;
5555
SrsPps* _srs_pps_rloss = new SrsPps();
5656
SrsPps* _srs_pps_sloss = new SrsPps();
5757
SrsPps* _srs_pps_aloss = new SrsPps();
58+
SrsPps* _srs_pps_aloss2 = new SrsPps();
5859

5960
// the longest time to wait for a process to quit.
6061
#define SRS_PROCESS_QUIT_TIMEOUT_MS 1000

0 commit comments

Comments
 (0)