Skip to content

Commit 9a05d24

Browse files
committed
Threads: Support Circuit-Breaker to work in storms.
1. Configure the recv queue, and drop packets if it is exceeded. 2. Configure the high and critical thresholds and pulses of the water level. 3. At critical water level, disable NACK and TWCC. 4. At high water level, skip NACK insert and send. 5. Support reading the CPU usage of a thread. 6. Refine SrsPps to support the r1s sample.
1 parent 1c90497 commit 9a05d24

14 files changed

+419
-87
lines changed

trunk/conf/full.conf

+24-3
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,6 @@ threads {
127127
# Whether enable the ASYNC RECV, recv udp packets in dedicate threads.
128128
# Default: off
129129
async_recv off;
130-
# If exceed the max size of recv queue, drop the received packet.
131-
# Default: 5000
132-
max_recv_queue 5000;
133130
# CPU set for affinity, for example:
134131
# 0 means CPU0
135132
# 0-3 means CPU0, CPU1, CPU2
@@ -149,6 +146,30 @@ threads {
149146
}
150147
}
151148

149+
# For system circuit breaker.
150+
circuit_breaker {
151+
# If exceed the max size of recv queue, slow the recv to decrease packets.
152+
# Default: 5000
153+
max_recv_queue 5000;
154+
# The CPU percent(0, 100) every 1s, as system high water-level, which enables the circuit-break
155+
# mechanism, for example, NACK will be disabled if high water-level.
156+
# Default: 90
157+
high_threshold 90;
158+
# Reset the high water-level, if number of pulse under high_threshold.
159+
# @remark 0 to disable the high water-level.
160+
# Default: 2
161+
high_pulse 2;
162+
# The CPU percent(0, 100) every 1s, as system critical water-level, which enables the circuit-break
163+
# mechanism, for example, TWCC will be disabled if critical water-level.
164+
# @note All circuit-break mechanism of high-water-level scope are enabled in critical.
165+
# Default: 95
166+
critical_threshold 95;
167+
# Reset the critical water-level, if number of pulse under critical_threshold.
168+
# @remark 0 to disable the critical water-level.
169+
# Default: 1
170+
critical_pulse 1;
171+
}
172+
152173
#############################################################################################
153174
# heartbeat/stats sections
154175
#############################################################################################

trunk/src/app/srs_app_config.cpp

+70-2
Original file line numberDiff line numberDiff line change
@@ -3393,7 +3393,7 @@ srs_error_t SrsConfig::check_normal_config()
33933393
&& n != "ff_log_level" && n != "grace_final_wait" && n != "force_grace_quit"
33943394
&& n != "grace_start_wait" && n != "empty_ip_ok" && n != "disable_daemon_for_docker"
33953395
&& n != "inotify_auto_reload" && n != "auto_reload_for_docker" && n != "tcmalloc_release_rate"
3396-
&& n != "srs_log_flush_interval" && n != "threads") {
3396+
&& n != "srs_log_flush_interval" && n != "threads" && n != "circuit_breaker") {
33973397
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal directive %s", n.c_str());
33983398
}
33993399
}
@@ -4206,7 +4206,7 @@ int SrsConfig::get_threads_max_recv_queue()
42064206
{
42074207
static int DEFAULT = 5000;
42084208

4209-
SrsConfDirective* conf = root->get("threads");
4209+
SrsConfDirective* conf = root->get("circuit_breaker");
42104210
if (!conf) {
42114211
return DEFAULT;
42124212
}
@@ -4219,6 +4219,74 @@ int SrsConfig::get_threads_max_recv_queue()
42194219
return ::atoi(conf->arg0().c_str());
42204220
}
42214221

4222+
int SrsConfig::get_high_threshold()
4223+
{
4224+
static int DEFAULT = 90;
4225+
4226+
SrsConfDirective* conf = root->get("circuit_breaker");
4227+
if (!conf) {
4228+
return DEFAULT;
4229+
}
4230+
4231+
conf = conf->get("high_threshold");
4232+
if (!conf) {
4233+
return DEFAULT;
4234+
}
4235+
4236+
return ::atoi(conf->arg0().c_str());
4237+
}
4238+
4239+
int SrsConfig::get_high_pulse()
4240+
{
4241+
static int DEFAULT = 2;
4242+
4243+
SrsConfDirective* conf = root->get("circuit_breaker");
4244+
if (!conf) {
4245+
return DEFAULT;
4246+
}
4247+
4248+
conf = conf->get("high_pulse");
4249+
if (!conf) {
4250+
return DEFAULT;
4251+
}
4252+
4253+
return ::atoi(conf->arg0().c_str());
4254+
}
4255+
4256+
int SrsConfig::get_critical_threshold()
4257+
{
4258+
static int DEFAULT = 95;
4259+
4260+
SrsConfDirective* conf = root->get("circuit_breaker");
4261+
if (!conf) {
4262+
return DEFAULT;
4263+
}
4264+
4265+
conf = conf->get("critical_threshold");
4266+
if (!conf) {
4267+
return DEFAULT;
4268+
}
4269+
4270+
return ::atoi(conf->arg0().c_str());
4271+
}
4272+
4273+
int SrsConfig::get_critical_pulse()
4274+
{
4275+
static int DEFAULT = 1;
4276+
4277+
SrsConfDirective* conf = root->get("circuit_breaker");
4278+
if (!conf) {
4279+
return DEFAULT;
4280+
}
4281+
4282+
conf = conf->get("critical_pulse");
4283+
if (!conf) {
4284+
return DEFAULT;
4285+
}
4286+
4287+
return ::atoi(conf->arg0().c_str());
4288+
}
4289+
42224290
vector<SrsConfDirective*> SrsConfig::get_stream_casters()
42234291
{
42244292
srs_assert(root);

trunk/src/app/srs_app_config.hpp

+4
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,10 @@ class SrsConfig
482482
virtual bool get_threads_async_recv();
483483
virtual bool get_threads_cpu_affinity(std::string label, int* start, int* end);
484484
virtual int get_threads_max_recv_queue();
485+
virtual int get_high_threshold();
486+
virtual int get_high_pulse();
487+
virtual int get_critical_threshold();
488+
virtual int get_critical_pulse();
485489
// stream_caster section
486490
public:
487491
// Get all stream_caster in config file.

trunk/src/app/srs_app_rtc_conn.cpp

+19
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ extern SrsPps* _srs_pps_snack2;
7676

7777
extern SrsPps* _srs_pps_rnack;
7878
extern SrsPps* _srs_pps_rnack2;
79+
extern SrsPps* _srs_pps_snack4;
7980

8081
#define SRS_TICKID_RTCP 0
8182
#define SRS_TICKID_TWCC 1
@@ -1283,6 +1284,12 @@ srs_error_t SrsRtcPublishStream::do_on_rtp_plaintext(SrsRtpPacket2*& pkt, SrsBuf
12831284
}
12841285
}
12851286

1287+
// If circuit-breaker is enabled, disable nack.
1288+
if (_srs_thread_pool->hybrid_critical_water_level()) {
1289+
++_srs_pps_snack4->sugar;
1290+
return err;
1291+
}
1292+
12861293
// For NACK to handle packet.
12871294
// @remark Note that the pkt might be set to NULL.
12881295
if (nack_enabled_) {
@@ -1529,6 +1536,12 @@ srs_error_t SrsRtcPublishStream::notify(int type, srs_utime_t interval, srs_utim
15291536
if (twcc_enabled_ && type == SRS_TICKID_TWCC) {
15301537
++_srs_pps_twcc->sugar;
15311538

1539+
// If circuit-breaker is dropping packet, disable TWCC.
1540+
if (_srs_thread_pool->hybrid_critical_water_level()) {
1541+
++_srs_pps_snack4->sugar;
1542+
return err;
1543+
}
1544+
15321545
// We should not depends on the received packet,
15331546
// instead we should send feedback every Nms.
15341547
if ((err = send_periodic_twcc()) != srs_success) {
@@ -2333,6 +2346,12 @@ srs_error_t SrsRtcConnection::notify(int type, srs_utime_t interval, srs_utime_t
23332346

23342347
// For publisher to send NACK.
23352348
if (type == SRS_TICKID_SEND_NACKS) {
2349+
// If circuit-breaker is enabled, disable nack.
2350+
if (_srs_thread_pool->hybrid_critical_water_level()) {
2351+
++_srs_pps_snack4->sugar;
2352+
return err;
2353+
}
2354+
23362355
// TODO: FIXME: Merge with hybrid system clock.
23372356
srs_update_system_time();
23382357

trunk/src/app/srs_app_rtc_queue.cpp

+21
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@ using namespace std;
3333
#include <srs_kernel_rtc_rtp.hpp>
3434
#include <srs_kernel_utility.hpp>
3535
#include <srs_app_utility.hpp>
36+
#include <srs_app_threads.hpp>
37+
38+
#include <srs_protocol_kbps.hpp>
39+
40+
extern SrsPps* _srs_pps_snack3;
41+
extern SrsPps* _srs_pps_snack4;
3642

3743
SrsRtpRingBuffer::SrsRtpRingBuffer(int capacity)
3844
{
@@ -228,6 +234,12 @@ SrsRtpNackForReceiver::~SrsRtpNackForReceiver()
228234

229235
void SrsRtpNackForReceiver::insert(uint16_t first, uint16_t last)
230236
{
237+
// If circuit-breaker is enabled, disable nack.
238+
if (_srs_thread_pool->hybrid_high_water_level()) {
239+
++_srs_pps_snack4->sugar;
240+
return;
241+
}
242+
231243
for (uint16_t s = first; s != last; ++s) {
232244
queue_[s] = SrsRtpNackInfo();
233245
}
@@ -259,6 +271,13 @@ void SrsRtpNackForReceiver::check_queue_size()
259271

260272
void SrsRtpNackForReceiver::get_nack_seqs(SrsRtcpNack& seqs, uint32_t& timeout_nacks)
261273
{
274+
// If circuit-breaker is enabled, disable nack.
275+
if (_srs_thread_pool->hybrid_high_water_level()) {
276+
queue_.clear();
277+
++_srs_pps_snack4->sugar;
278+
return;
279+
}
280+
262281
srs_utime_t now = srs_get_system_time();
263282

264283
srs_utime_t interval = now - pre_check_time_;
@@ -294,6 +313,8 @@ void SrsRtpNackForReceiver::get_nack_seqs(SrsRtcpNack& seqs, uint32_t& timeout_n
294313
++nack_info.req_nack_count_;
295314
nack_info.pre_req_nack_time_ = now;
296315
seqs.add_lost_sn(seq);
316+
317+
++_srs_pps_snack3->sugar;
297318
}
298319

299320
++iter;

trunk/src/app/srs_app_rtc_server.cpp

+7-6
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ extern SrsPps* _srs_pps_rr;
6767

6868
extern SrsPps* _srs_pps_snack;
6969
extern SrsPps* _srs_pps_snack2;
70+
extern SrsPps* _srs_pps_snack3;
71+
extern SrsPps* _srs_pps_snack4;
7072
extern SrsPps* _srs_pps_sanack;
7173
extern SrsPps* _srs_pps_svnack;
7274

@@ -687,9 +689,9 @@ srs_error_t SrsRtcServer::on_timer(srs_utime_t interval, srs_utime_t tick)
687689
}
688690

689691
string snk_desc;
690-
_srs_pps_snack->update(); _srs_pps_snack2->update(); _srs_pps_sanack->update(); _srs_pps_svnack->update();
691-
if (_srs_pps_snack->r10s() || _srs_pps_sanack->r10s() || _srs_pps_svnack->r10s() || _srs_pps_snack2->r10s()) {
692-
snprintf(buf, sizeof(buf), ", snk=(%d,a:%d,v:%d,h:%d)", _srs_pps_snack->r10s(), _srs_pps_sanack->r10s(), _srs_pps_svnack->r10s(), _srs_pps_snack2->r10s());
692+
_srs_pps_snack->update(); _srs_pps_snack2->update(); _srs_pps_snack3->update(); _srs_pps_snack4->update(); _srs_pps_sanack->update(); _srs_pps_svnack->update();
693+
if (_srs_pps_snack->r10s() || _srs_pps_sanack->r10s() || _srs_pps_svnack->r10s() || _srs_pps_snack2->r10s() || _srs_pps_snack3->r10s() || _srs_pps_snack4->r10s()) {
694+
snprintf(buf, sizeof(buf), ", snk=(%d,a:%d,v:%d,h:%d,h3:%d,h4:%d)", _srs_pps_snack->r10s(), _srs_pps_sanack->r10s(), _srs_pps_svnack->r10s(), _srs_pps_snack2->r10s(), _srs_pps_snack3->r10s(), _srs_pps_snack4->r10s());
693695
snk_desc = buf;
694696
}
695697

@@ -702,9 +704,8 @@ srs_error_t SrsRtcServer::on_timer(srs_utime_t interval, srs_utime_t tick)
702704

703705
// TODO: FIXME: Should move to Hybrid server stat.
704706
string loss_desc;
705-
_srs_pps_aloss->update();
706-
if (_srs_pps_rloss->r10s() || _srs_pps_sloss->r10s() || _srs_pps_aloss->r10s()) {
707-
snprintf(buf, sizeof(buf), ", loss=(r:%d,s:%d,a:%d)", _srs_pps_rloss->r10s(), _srs_pps_sloss->r10s(), _srs_pps_aloss->r10s());
707+
if (!snk_desc.empty() || _srs_pps_rloss->r1s() || _srs_pps_rloss->r10s() || _srs_pps_sloss->r10s() || _srs_pps_aloss->r10s()) {
708+
snprintf(buf, sizeof(buf), ", loss=(r:%d/%d,s:%d,a:%d)", _srs_pps_rloss->r1s(), _srs_pps_rloss->r10s(), _srs_pps_sloss->r10s(), _srs_pps_aloss->r10s());
708709
loss_desc = buf;
709710
}
710711

trunk/src/app/srs_app_rtc_source.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@
5353
// The NACK sent by us(SFU).
5454
SrsPps* _srs_pps_snack = new SrsPps();
5555
SrsPps* _srs_pps_snack2 = new SrsPps();
56+
SrsPps* _srs_pps_snack3 = new SrsPps();
57+
SrsPps* _srs_pps_snack4 = new SrsPps();
5658
SrsPps* _srs_pps_sanack = new SrsPps();
5759
SrsPps* _srs_pps_svnack = new SrsPps();
5860

trunk/src/app/srs_app_server.cpp

+4-6
Original file line numberDiff line numberDiff line change
@@ -1282,10 +1282,6 @@ srs_error_t SrsServer::setup_ticks()
12821282
if ((err = timer_->tick(8, 3 * SRS_UTIME_SECONDS)) != srs_success) {
12831283
return srs_error_wrap(err, "tick");
12841284
}
1285-
1286-
if ((err = timer_->tick(10, 5 * SRS_UTIME_SECONDS)) != srs_success) {
1287-
return srs_error_wrap(err, "tick");
1288-
}
12891285
}
12901286

12911287
if (_srs_config->get_heartbeat_enabled()) {
@@ -1307,14 +1303,16 @@ srs_error_t SrsServer::notify(int event, srs_utime_t interval, srs_utime_t tick)
13071303

13081304
switch (event) {
13091305
case 2: srs_update_system_rusage(); break;
1310-
case 3: srs_update_proc_stat(); break;
1306+
case 3:
1307+
srs_update_system_proc_stat();
1308+
srs_update_self_proc_stat();
1309+
break;
13111310
case 4: srs_update_disk_stat(); break;
13121311
case 5: srs_update_meminfo(); break;
13131312
case 6: srs_update_platform_info(); break;
13141313
case 7: srs_update_network_devices(); break;
13151314
case 8: resample_kbps(); break;
13161315
case 9: http_heartbeat->heartbeat(); break;
1317-
case 10: srs_update_udp_snmp_statistic(); break;
13181316
}
13191317

13201318
return err;

0 commit comments

Comments
 (0)