Skip to content

Commit 6c83e89

Browse files
committed
Threads: Refine circuit breaker
1 parent 614a781 commit 6c83e89

6 files changed

+173
-116
lines changed

trunk/src/app/srs_app_rtc_conn.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -1321,7 +1321,7 @@ srs_error_t SrsRtcPublishStream::do_on_rtp_plaintext(SrsRtpPacket2*& pkt, SrsBuf
13211321
}
13221322

13231323
// If circuit-breaker is enabled, disable nack.
1324-
if (_srs_thread_pool->hybrid_critical_water_level()) {
1324+
if (_srs_circuit_breaker->hybrid_critical_water_level()) {
13251325
++_srs_pps_snack4->sugar;
13261326
return err;
13271327
}
@@ -1573,7 +1573,7 @@ srs_error_t SrsRtcPublishStream::notify(int type, srs_utime_t interval, srs_utim
15731573
++_srs_pps_twcc->sugar;
15741574

15751575
// If circuit-breaker is dropping packet, disable TWCC.
1576-
if (_srs_thread_pool->hybrid_critical_water_level()) {
1576+
if (_srs_circuit_breaker->hybrid_critical_water_level()) {
15771577
++_srs_pps_snack4->sugar;
15781578
return err;
15791579
}
@@ -2432,7 +2432,7 @@ srs_error_t SrsRtcConnection::notify(int type, srs_utime_t interval, srs_utime_t
24322432
// For publisher to send NACK.
24332433
if (type == SRS_TICKID_SEND_NACKS) {
24342434
// If circuit-breaker is enabled, disable nack.
2435-
if (_srs_thread_pool->hybrid_critical_water_level()) {
2435+
if (_srs_circuit_breaker->hybrid_critical_water_level()) {
24362436
++_srs_pps_snack4->sugar;
24372437
return err;
24382438
}

trunk/src/app/srs_app_rtc_queue.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ SrsRtpNackForReceiver::~SrsRtpNackForReceiver()
235235
void SrsRtpNackForReceiver::insert(uint16_t first, uint16_t last)
236236
{
237237
// If circuit-breaker is enabled, disable nack.
238-
if (_srs_thread_pool->hybrid_high_water_level()) {
238+
if (_srs_circuit_breaker->hybrid_high_water_level()) {
239239
++_srs_pps_snack4->sugar;
240240
return;
241241
}
@@ -272,7 +272,7 @@ void SrsRtpNackForReceiver::check_queue_size()
272272
void SrsRtpNackForReceiver::get_nack_seqs(SrsRtcpNack& seqs, uint32_t& timeout_nacks)
273273
{
274274
// If circuit-breaker is enabled, disable nack.
275-
if (_srs_thread_pool->hybrid_high_water_level()) {
275+
if (_srs_circuit_breaker->hybrid_high_water_level()) {
276276
queue_.clear();
277277
++_srs_pps_snack4->sugar;
278278
return;

trunk/src/app/srs_app_rtc_source.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,7 @@ srs_error_t SrsRtcStream::on_rtp(SrsRtpPacket2* pkt)
571571
srs_error_t err = srs_success;
572572

573573
// If circuit-breaker is dying, drop packet.
574-
if (_srs_thread_pool->hybrid_dying_water_level()) {
574+
if (_srs_circuit_breaker->hybrid_dying_water_level()) {
575575
_srs_pps_aloss2->sugar += (int64_t)consumers.size();
576576
return err;
577577
}

trunk/src/app/srs_app_threads.cpp

+126-93
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,120 @@ uint64_t srs_covert_cpuset(cpu_set_t v)
9393
#endif
9494
}
9595

96+
SrsCircuitBreaker::SrsCircuitBreaker()
97+
{
98+
hybrid_high_water_level_ = 0;
99+
hybrid_critical_water_level_ = 0;
100+
hybrid_dying_water_level_ = 0;
101+
102+
high_threshold_ = 0;
103+
high_pulse_ = 0;
104+
critical_threshold_ = 0;
105+
critical_pulse_ = 0;
106+
dying_threshold_ = 0;
107+
dying_pulse_ = 0;
108+
}
109+
110+
SrsCircuitBreaker::~SrsCircuitBreaker()
111+
{
112+
}
113+
114+
srs_error_t SrsCircuitBreaker::initialize()
115+
{
116+
srs_error_t err = srs_success;
117+
118+
// Start a timer to stat the data for circuit breaker.
119+
_srs_hybrid->timer()->subscribe(1 * SRS_UTIME_SECONDS, this);
120+
121+
high_threshold_ = _srs_config->get_high_threshold();
122+
high_pulse_ = _srs_config->get_high_pulse();
123+
critical_threshold_ = _srs_config->get_critical_threshold();
124+
critical_pulse_ = _srs_config->get_critical_pulse();
125+
dying_threshold_ = _srs_config->get_dying_threshold();
126+
dying_pulse_ = _srs_config->get_dying_pulse();
127+
128+
srs_trace("CircuitBreaker: high=%dx%d, critical=%dx%d, dying=%dx%d",
129+
high_pulse_, high_threshold_, critical_pulse_, critical_threshold_, dying_pulse_, dying_threshold_);
130+
131+
return err;
132+
}
133+
134+
bool SrsCircuitBreaker::hybrid_high_water_level()
135+
{
136+
return hybrid_critical_water_level() || hybrid_high_water_level_;
137+
}
138+
139+
bool SrsCircuitBreaker::hybrid_critical_water_level()
140+
{
141+
return hybrid_dying_water_level() || hybrid_critical_water_level_;
142+
}
143+
144+
bool SrsCircuitBreaker::hybrid_dying_water_level()
145+
{
146+
return dying_pulse_ && hybrid_dying_water_level_ >= dying_pulse_;
147+
}
148+
149+
srs_error_t SrsCircuitBreaker::on_timer(srs_utime_t interval, srs_utime_t tick)
150+
{
151+
srs_error_t err = srs_success;
152+
153+
SrsThreadEntry* entry = _srs_thread_pool->self();
154+
if (!entry->stat) {
155+
return err;
156+
}
157+
158+
// For Circuit-Breaker to update the SNMP, ASAP.
159+
srs_update_udp_snmp_statistic();
160+
161+
// Update thread CPUs per 1s.
162+
srs_update_thread_proc_stat(entry->stat, entry->tid);
163+
164+
// Update the Circuit-Breaker by water-level.
165+
// Reset the high water-level when CPU is low for N times.
166+
if (entry->stat->percent * 100 > high_threshold_) {
167+
hybrid_high_water_level_ = high_pulse_;
168+
} else if (hybrid_high_water_level_ > 0) {
169+
hybrid_high_water_level_--;
170+
}
171+
172+
// Reset the critical water-level when CPU is low for N times.
173+
if (entry->stat->percent * 100 > critical_threshold_) {
174+
hybrid_critical_water_level_ = critical_pulse_;
175+
} else if (hybrid_critical_water_level_ > 0) {
176+
hybrid_critical_water_level_--;
177+
}
178+
179+
// Reset the dying water-level when CPU is low for N times.
180+
if (entry->stat->percent * 100 > dying_threshold_) {
181+
hybrid_dying_water_level_ = srs_min(dying_pulse_ + 1, hybrid_dying_water_level_ + 1);
182+
} else if (hybrid_dying_water_level_ > 0) {
183+
hybrid_dying_water_level_ = 0;
184+
}
185+
186+
// Show statistics for RTC server.
187+
SrsProcSelfStat* u = srs_get_self_proc_stat();
188+
// Resident Set Size: number of pages the process has in real memory.
189+
int memory = (int)(u->rss * 4 / 1024);
190+
191+
// The hybrid thread cpu and memory.
192+
float thread_percent = entry->stat->percent * 100;
193+
194+
string circuit_breaker;
195+
if (true || hybrid_high_water_level() || hybrid_critical_water_level() || _srs_pps_aloss->r1s() || _srs_pps_rloss->r1s() || _srs_pps_snack2->r10s()) {
196+
srs_trace("CircuitBreaker(%s): cpu=%.2f%%,%dMB, thread=%.2f%%, break=%d,%d,%d, cond=%d,%d,%.2f%%, snk=%d,%d,%d",
197+
entry->label.c_str(), u->percent * 100, memory, thread_percent,
198+
hybrid_high_water_level(), hybrid_critical_water_level(), hybrid_dying_water_level(), // Whether Circuit-Break is enable.
199+
_srs_pps_rloss->r1s(), _srs_pps_aloss->r1s(), thread_percent, // The conditions to enable Circuit-Breaker.
200+
_srs_pps_snack2->r10s(), _srs_pps_snack3->r10s(), // NACK packet,seqs sent.
201+
_srs_pps_snack4->r10s() // NACK drop by Circuit-Break.
202+
);
203+
}
204+
205+
return err;
206+
}
207+
208+
__thread SrsCircuitBreaker* _srs_circuit_breaker = NULL;
209+
96210
SrsPipe::SrsPipe()
97211
{
98212
pipes_[0] = pipes_[1] = -1;
@@ -451,16 +565,6 @@ SrsThreadPool::SrsThreadPool()
451565
entry_ = NULL;
452566
lock_ = new SrsThreadMutex();
453567
hybrid_ = NULL;
454-
hybrid_high_water_level_ = 0;
455-
hybrid_critical_water_level_ = 0;
456-
hybrid_dying_water_level_ = 0;
457-
458-
high_threshold_ = 0;
459-
high_pulse_ = 0;
460-
critical_threshold_ = 0;
461-
critical_pulse_ = 0;
462-
dying_threshold_ = 0;
463-
dying_pulse_ = 0;
464568

465569
// Add primordial thread, current thread itself.
466570
SrsThreadEntry* entry = new SrsThreadEntry();
@@ -493,21 +597,6 @@ SrsThreadPool::~SrsThreadPool()
493597
}
494598
}
495599

496-
bool SrsThreadPool::hybrid_high_water_level()
497-
{
498-
return hybrid_critical_water_level() || hybrid_high_water_level_;
499-
}
500-
501-
bool SrsThreadPool::hybrid_critical_water_level()
502-
{
503-
return hybrid_dying_water_level() || hybrid_critical_water_level_;
504-
}
505-
506-
bool SrsThreadPool::hybrid_dying_water_level()
507-
{
508-
return dying_pulse_ && hybrid_dying_water_level_ >= dying_pulse_;
509-
}
510-
511600
// Thread local objects.
512601
extern const int LOG_MAX_SIZE;
513602
extern __thread char* _srs_log_data;
@@ -618,6 +707,9 @@ srs_error_t SrsThreadPool::setup()
618707
// Create the hybrid RTMP/HTTP/RTC server.
619708
_srs_hybrid = new SrsHybridServer();
620709

710+
// Create the circuit breaker for each thread.
711+
_srs_circuit_breaker = new SrsCircuitBreaker();
712+
621713
// Create the source manager for server.
622714
_srs_sources = new SrsSourceManager();
623715

@@ -780,12 +872,6 @@ srs_error_t SrsThreadPool::initialize()
780872
#endif
781873

782874
interval_ = _srs_config->get_threads_interval();
783-
high_threshold_ = _srs_config->get_high_threshold();
784-
high_pulse_ = _srs_config->get_high_pulse();
785-
critical_threshold_ = _srs_config->get_critical_threshold();
786-
critical_pulse_ = _srs_config->get_critical_pulse();
787-
dying_threshold_ = _srs_config->get_dying_threshold();
788-
dying_pulse_ = _srs_config->get_dying_pulse();
789875
int async_srtp = _srs_config->get_threads_async_srtp();
790876

791877
int recv_queue = _srs_config->get_threads_max_recv_queue();
@@ -798,10 +884,9 @@ srs_error_t SrsThreadPool::initialize()
798884
_srs_async_recv->set_tunnel_enabled(async_tunnel);
799885
_srs_async_srtp->set_tunnel_enabled(async_tunnel);
800886

801-
srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64 ", water_level=%dx%d,%dx%d,%dx%d recvQ=%d, aSend=%d, tunnel=%d",
887+
srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64 ", recvQ=%d, aSend=%d, tunnel=%d",
802888
entry->num, entry->label.c_str(), entry->name.c_str(), srsu2msi(interval_), async_srtp,
803889
entry->cpuset_ok, r0, srs_covert_cpuset(entry->cpuset), r1, srs_covert_cpuset(entry->cpuset2),
804-
high_pulse_, high_threshold_, critical_pulse_, critical_threshold_, dying_pulse_, dying_threshold_,
805890
recv_queue, async_send, async_tunnel);
806891

807892
return err;
@@ -943,44 +1028,6 @@ srs_error_t SrsThreadPool::run()
9431028
}
9441029
}
9451030

946-
// For Circuit-Breaker to update the SNMP, ASAP.
947-
srs_update_udp_snmp_statistic();
948-
949-
// Update thread CPUs per 1s.
950-
for (int i = 0; i < (int)threads.size(); i++) {
951-
SrsThreadEntry* entry = threads.at(i);
952-
if (!entry->tid) {
953-
continue;
954-
}
955-
956-
srs_update_thread_proc_stat(entry->stat, entry->tid);
957-
}
958-
959-
// Update the Circuit-Breaker by water-level.
960-
// TODO: FIXME: Should stat all hybrid servers.
961-
if (hybrid_ && hybrid_->stat) {
962-
// Reset the high water-level when CPU is low for N times.
963-
if (hybrid_->stat->percent * 100 > high_threshold_) {
964-
hybrid_high_water_level_ = high_pulse_;
965-
} else if (hybrid_high_water_level_ > 0) {
966-
hybrid_high_water_level_--;
967-
}
968-
969-
// Reset the critical water-level when CPU is low for N times.
970-
if (hybrid_->stat->percent * 100 > critical_threshold_) {
971-
hybrid_critical_water_level_ = critical_pulse_;
972-
} else if (hybrid_critical_water_level_ > 0) {
973-
hybrid_critical_water_level_--;
974-
}
975-
976-
// Reset the dying water-level when CPU is low for N times.
977-
if (hybrid_->stat->percent * 100 > dying_threshold_) {
978-
hybrid_dying_water_level_ = srs_min(dying_pulse_ + 1, hybrid_dying_water_level_ + 1);
979-
} else if (hybrid_dying_water_level_ > 0) {
980-
hybrid_dying_water_level_ = 0;
981-
}
982-
}
983-
9841031
srs_usleep(1 * SRS_UTIME_SECONDS);
9851032
}
9861033

@@ -1011,16 +1058,8 @@ srs_error_t SrsThreadPool::run()
10111058
tunnel_desc = buf;
10121059
}
10131060

1014-
// Show statistics for RTC server.
1015-
SrsProcSelfStat* u = srs_get_self_proc_stat();
1016-
// Resident Set Size: number of pages the process has in real memory.
1017-
int memory = (int)(u->rss * 4 / 1024);
1018-
10191061
// The hybrid thread cpu and memory.
1020-
float thread_percent = 0.0f, top_percent = 0.0f;
1021-
if (hybrid_ && hybrid_->stat) {
1022-
thread_percent = hybrid_->stat->percent * 100;
1023-
}
1062+
float top_percent = 0.0f;
10241063
for (int i = 0; i < (int)threads.size(); i++) {
10251064
SrsThreadEntry* entry = threads.at(i);
10261065
if (!entry->stat || entry->stat->percent <= 0) {
@@ -1029,20 +1068,14 @@ srs_error_t SrsThreadPool::run()
10291068
top_percent = srs_max(top_percent, entry->stat->percent * 100);
10301069
}
10311070

1032-
string circuit_breaker;
1033-
if (hybrid_high_water_level() || hybrid_critical_water_level() || _srs_pps_aloss->r1s() || _srs_pps_rloss->r1s() || _srs_pps_snack2->r10s()) {
1034-
snprintf(buf, sizeof(buf), ", break=%d,%d,%d, cond=%d,%d,%.2f%%, snk=%d,%d,%d",
1035-
hybrid_high_water_level(), hybrid_critical_water_level(), hybrid_dying_water_level(), // Whether Circuit-Break is enable.
1036-
_srs_pps_rloss->r1s(), _srs_pps_aloss->r1s(), thread_percent, // The conditions to enable Circuit-Breaker.
1037-
_srs_pps_snack2->r10s(), _srs_pps_snack3->r10s(), // NACK packet,seqs sent.
1038-
_srs_pps_snack4->r10s() // NACK drop by Circuit-Break.
1039-
);
1040-
circuit_breaker = buf;
1041-
}
1071+
// Show statistics for RTC server.
1072+
SrsProcSelfStat* u = srs_get_self_proc_stat();
1073+
// Resident Set Size: number of pages the process has in real memory.
1074+
int memory = (int)(u->rss * 4 / 1024);
10421075

1043-
srs_trace("Process: cpu=%.2f%%,%dMB, threads=%d,%.2f%%,%.2f%%%s%s%s%s%s",
1044-
u->percent * 100, memory, (int)threads_.size(), top_percent, thread_percent,
1045-
async_logs.c_str(), sync_desc.c_str(), queue_desc.c_str(), circuit_breaker.c_str(),
1076+
srs_trace("Process: cpu=%.2f%%,%dMB, threads=%d,%.2f%%%%s%s%s%s",
1077+
u->percent * 100, memory, (int)threads_.size(), top_percent,
1078+
async_logs.c_str(), sync_desc.c_str(), queue_desc.c_str(),
10461079
tunnel_desc.c_str());
10471080
}
10481081

0 commit comments

Comments
 (0)