@@ -93,6 +93,120 @@ uint64_t srs_covert_cpuset(cpu_set_t v)
93
93
#endif
94
94
}
95
95
96
+ SrsCircuitBreaker::SrsCircuitBreaker ()
97
+ {
98
+ hybrid_high_water_level_ = 0 ;
99
+ hybrid_critical_water_level_ = 0 ;
100
+ hybrid_dying_water_level_ = 0 ;
101
+
102
+ high_threshold_ = 0 ;
103
+ high_pulse_ = 0 ;
104
+ critical_threshold_ = 0 ;
105
+ critical_pulse_ = 0 ;
106
+ dying_threshold_ = 0 ;
107
+ dying_pulse_ = 0 ;
108
+ }
109
+
110
+ SrsCircuitBreaker::~SrsCircuitBreaker ()
111
+ {
112
+ }
113
+
114
+ srs_error_t SrsCircuitBreaker::initialize ()
115
+ {
116
+ srs_error_t err = srs_success;
117
+
118
+ // Start a timer to stat the data for circuit breaker.
119
+ _srs_hybrid->timer ()->subscribe (1 * SRS_UTIME_SECONDS, this );
120
+
121
+ high_threshold_ = _srs_config->get_high_threshold ();
122
+ high_pulse_ = _srs_config->get_high_pulse ();
123
+ critical_threshold_ = _srs_config->get_critical_threshold ();
124
+ critical_pulse_ = _srs_config->get_critical_pulse ();
125
+ dying_threshold_ = _srs_config->get_dying_threshold ();
126
+ dying_pulse_ = _srs_config->get_dying_pulse ();
127
+
128
+ srs_trace (" CircuitBreaker: high=%dx%d, critical=%dx%d, dying=%dx%d" ,
129
+ high_pulse_, high_threshold_, critical_pulse_, critical_threshold_, dying_pulse_, dying_threshold_);
130
+
131
+ return err;
132
+ }
133
+
134
+ bool SrsCircuitBreaker::hybrid_high_water_level ()
135
+ {
136
+ return hybrid_critical_water_level () || hybrid_high_water_level_;
137
+ }
138
+
139
+ bool SrsCircuitBreaker::hybrid_critical_water_level ()
140
+ {
141
+ return hybrid_dying_water_level () || hybrid_critical_water_level_;
142
+ }
143
+
144
+ bool SrsCircuitBreaker::hybrid_dying_water_level ()
145
+ {
146
+ return dying_pulse_ && hybrid_dying_water_level_ >= dying_pulse_;
147
+ }
148
+
149
+ srs_error_t SrsCircuitBreaker::on_timer (srs_utime_t interval, srs_utime_t tick)
150
+ {
151
+ srs_error_t err = srs_success;
152
+
153
+ SrsThreadEntry* entry = _srs_thread_pool->self ();
154
+ if (!entry->stat ) {
155
+ return err;
156
+ }
157
+
158
+ // For Circuit-Breaker to update the SNMP, ASAP.
159
+ srs_update_udp_snmp_statistic ();
160
+
161
+ // Update thread CPUs per 1s.
162
+ srs_update_thread_proc_stat (entry->stat , entry->tid );
163
+
164
+ // Update the Circuit-Breaker by water-level.
165
+ // Reset the high water-level when CPU is low for N times.
166
+ if (entry->stat ->percent * 100 > high_threshold_) {
167
+ hybrid_high_water_level_ = high_pulse_;
168
+ } else if (hybrid_high_water_level_ > 0 ) {
169
+ hybrid_high_water_level_--;
170
+ }
171
+
172
+ // Reset the critical water-level when CPU is low for N times.
173
+ if (entry->stat ->percent * 100 > critical_threshold_) {
174
+ hybrid_critical_water_level_ = critical_pulse_;
175
+ } else if (hybrid_critical_water_level_ > 0 ) {
176
+ hybrid_critical_water_level_--;
177
+ }
178
+
179
+ // Reset the dying water-level when CPU is low for N times.
180
+ if (entry->stat ->percent * 100 > dying_threshold_) {
181
+ hybrid_dying_water_level_ = srs_min (dying_pulse_ + 1 , hybrid_dying_water_level_ + 1 );
182
+ } else if (hybrid_dying_water_level_ > 0 ) {
183
+ hybrid_dying_water_level_ = 0 ;
184
+ }
185
+
186
+ // Show statistics for RTC server.
187
+ SrsProcSelfStat* u = srs_get_self_proc_stat ();
188
+ // Resident Set Size: number of pages the process has in real memory.
189
+ int memory = (int )(u->rss * 4 / 1024 );
190
+
191
+ // The hybrid thread cpu and memory.
192
+ float thread_percent = entry->stat ->percent * 100 ;
193
+
194
+ string circuit_breaker;
195
+ if (true || hybrid_high_water_level () || hybrid_critical_water_level () || _srs_pps_aloss->r1s () || _srs_pps_rloss->r1s () || _srs_pps_snack2->r10s ()) {
196
+ srs_trace (" CircuitBreaker(%s): cpu=%.2f%%,%dMB, thread=%.2f%%, break=%d,%d,%d, cond=%d,%d,%.2f%%, snk=%d,%d,%d" ,
197
+ entry->label .c_str (), u->percent * 100 , memory, thread_percent,
198
+ hybrid_high_water_level (), hybrid_critical_water_level (), hybrid_dying_water_level (), // Whether Circuit-Break is enable.
199
+ _srs_pps_rloss->r1s (), _srs_pps_aloss->r1s (), thread_percent, // The conditions to enable Circuit-Breaker.
200
+ _srs_pps_snack2->r10s (), _srs_pps_snack3->r10s (), // NACK packet,seqs sent.
201
+ _srs_pps_snack4->r10s () // NACK drop by Circuit-Break.
202
+ );
203
+ }
204
+
205
+ return err;
206
+ }
207
+
208
+ __thread SrsCircuitBreaker* _srs_circuit_breaker = NULL ;
209
+
96
210
SrsPipe::SrsPipe ()
97
211
{
98
212
pipes_[0 ] = pipes_[1 ] = -1 ;
@@ -451,16 +565,6 @@ SrsThreadPool::SrsThreadPool()
451
565
entry_ = NULL ;
452
566
lock_ = new SrsThreadMutex ();
453
567
hybrid_ = NULL ;
454
- hybrid_high_water_level_ = 0 ;
455
- hybrid_critical_water_level_ = 0 ;
456
- hybrid_dying_water_level_ = 0 ;
457
-
458
- high_threshold_ = 0 ;
459
- high_pulse_ = 0 ;
460
- critical_threshold_ = 0 ;
461
- critical_pulse_ = 0 ;
462
- dying_threshold_ = 0 ;
463
- dying_pulse_ = 0 ;
464
568
465
569
// Add primordial thread, current thread itself.
466
570
SrsThreadEntry* entry = new SrsThreadEntry ();
@@ -493,21 +597,6 @@ SrsThreadPool::~SrsThreadPool()
493
597
}
494
598
}
495
599
496
- bool SrsThreadPool::hybrid_high_water_level ()
497
- {
498
- return hybrid_critical_water_level () || hybrid_high_water_level_;
499
- }
500
-
501
- bool SrsThreadPool::hybrid_critical_water_level ()
502
- {
503
- return hybrid_dying_water_level () || hybrid_critical_water_level_;
504
- }
505
-
506
- bool SrsThreadPool::hybrid_dying_water_level ()
507
- {
508
- return dying_pulse_ && hybrid_dying_water_level_ >= dying_pulse_;
509
- }
510
-
511
600
// Thread local objects.
512
601
extern const int LOG_MAX_SIZE;
513
602
extern __thread char * _srs_log_data;
@@ -618,6 +707,9 @@ srs_error_t SrsThreadPool::setup()
618
707
// Create the hybrid RTMP/HTTP/RTC server.
619
708
_srs_hybrid = new SrsHybridServer ();
620
709
710
+ // Create the circuit breaker for each thread.
711
+ _srs_circuit_breaker = new SrsCircuitBreaker ();
712
+
621
713
// Create the source manager for server.
622
714
_srs_sources = new SrsSourceManager ();
623
715
@@ -780,12 +872,6 @@ srs_error_t SrsThreadPool::initialize()
780
872
#endif
781
873
782
874
interval_ = _srs_config->get_threads_interval ();
783
- high_threshold_ = _srs_config->get_high_threshold ();
784
- high_pulse_ = _srs_config->get_high_pulse ();
785
- critical_threshold_ = _srs_config->get_critical_threshold ();
786
- critical_pulse_ = _srs_config->get_critical_pulse ();
787
- dying_threshold_ = _srs_config->get_dying_threshold ();
788
- dying_pulse_ = _srs_config->get_dying_pulse ();
789
875
int async_srtp = _srs_config->get_threads_async_srtp ();
790
876
791
877
int recv_queue = _srs_config->get_threads_max_recv_queue ();
@@ -798,10 +884,9 @@ srs_error_t SrsThreadPool::initialize()
798
884
_srs_async_recv->set_tunnel_enabled (async_tunnel);
799
885
_srs_async_srtp->set_tunnel_enabled (async_tunnel);
800
886
801
- srs_trace (" Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 " /%d-0x%" PRIx64 " , water_level=%dx%d,%dx%d,%dx%d recvQ=%d, aSend=%d, tunnel=%d" ,
887
+ srs_trace (" Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 " /%d-0x%" PRIx64 " , recvQ=%d, aSend=%d, tunnel=%d" ,
802
888
entry->num , entry->label .c_str (), entry->name .c_str (), srsu2msi (interval_), async_srtp,
803
889
entry->cpuset_ok , r0, srs_covert_cpuset (entry->cpuset ), r1, srs_covert_cpuset (entry->cpuset2 ),
804
- high_pulse_, high_threshold_, critical_pulse_, critical_threshold_, dying_pulse_, dying_threshold_,
805
890
recv_queue, async_send, async_tunnel);
806
891
807
892
return err;
@@ -943,44 +1028,6 @@ srs_error_t SrsThreadPool::run()
943
1028
}
944
1029
}
945
1030
946
- // For Circuit-Breaker to update the SNMP, ASAP.
947
- srs_update_udp_snmp_statistic ();
948
-
949
- // Update thread CPUs per 1s.
950
- for (int i = 0 ; i < (int )threads.size (); i++) {
951
- SrsThreadEntry* entry = threads.at (i);
952
- if (!entry->tid ) {
953
- continue ;
954
- }
955
-
956
- srs_update_thread_proc_stat (entry->stat , entry->tid );
957
- }
958
-
959
- // Update the Circuit-Breaker by water-level.
960
- // TODO: FIXME: Should stat all hybrid servers.
961
- if (hybrid_ && hybrid_->stat ) {
962
- // Reset the high water-level when CPU is low for N times.
963
- if (hybrid_->stat ->percent * 100 > high_threshold_) {
964
- hybrid_high_water_level_ = high_pulse_;
965
- } else if (hybrid_high_water_level_ > 0 ) {
966
- hybrid_high_water_level_--;
967
- }
968
-
969
- // Reset the critical water-level when CPU is low for N times.
970
- if (hybrid_->stat ->percent * 100 > critical_threshold_) {
971
- hybrid_critical_water_level_ = critical_pulse_;
972
- } else if (hybrid_critical_water_level_ > 0 ) {
973
- hybrid_critical_water_level_--;
974
- }
975
-
976
- // Reset the dying water-level when CPU is low for N times.
977
- if (hybrid_->stat ->percent * 100 > dying_threshold_) {
978
- hybrid_dying_water_level_ = srs_min (dying_pulse_ + 1 , hybrid_dying_water_level_ + 1 );
979
- } else if (hybrid_dying_water_level_ > 0 ) {
980
- hybrid_dying_water_level_ = 0 ;
981
- }
982
- }
983
-
984
1031
srs_usleep (1 * SRS_UTIME_SECONDS);
985
1032
}
986
1033
@@ -1011,16 +1058,8 @@ srs_error_t SrsThreadPool::run()
1011
1058
tunnel_desc = buf;
1012
1059
}
1013
1060
1014
- // Show statistics for RTC server.
1015
- SrsProcSelfStat* u = srs_get_self_proc_stat ();
1016
- // Resident Set Size: number of pages the process has in real memory.
1017
- int memory = (int )(u->rss * 4 / 1024 );
1018
-
1019
1061
// The hybrid thread cpu and memory.
1020
- float thread_percent = 0 .0f , top_percent = 0 .0f ;
1021
- if (hybrid_ && hybrid_->stat ) {
1022
- thread_percent = hybrid_->stat ->percent * 100 ;
1023
- }
1062
+ float top_percent = 0 .0f ;
1024
1063
for (int i = 0 ; i < (int )threads.size (); i++) {
1025
1064
SrsThreadEntry* entry = threads.at (i);
1026
1065
if (!entry->stat || entry->stat ->percent <= 0 ) {
@@ -1029,20 +1068,14 @@ srs_error_t SrsThreadPool::run()
1029
1068
top_percent = srs_max (top_percent, entry->stat ->percent * 100 );
1030
1069
}
1031
1070
1032
- string circuit_breaker;
1033
- if (hybrid_high_water_level () || hybrid_critical_water_level () || _srs_pps_aloss->r1s () || _srs_pps_rloss->r1s () || _srs_pps_snack2->r10s ()) {
1034
- snprintf (buf, sizeof (buf), " , break=%d,%d,%d, cond=%d,%d,%.2f%%, snk=%d,%d,%d" ,
1035
- hybrid_high_water_level (), hybrid_critical_water_level (), hybrid_dying_water_level (), // Whether Circuit-Break is enable.
1036
- _srs_pps_rloss->r1s (), _srs_pps_aloss->r1s (), thread_percent, // The conditions to enable Circuit-Breaker.
1037
- _srs_pps_snack2->r10s (), _srs_pps_snack3->r10s (), // NACK packet,seqs sent.
1038
- _srs_pps_snack4->r10s () // NACK drop by Circuit-Break.
1039
- );
1040
- circuit_breaker = buf;
1041
- }
1071
+ // Show statistics for RTC server.
1072
+ SrsProcSelfStat* u = srs_get_self_proc_stat ();
1073
+ // Resident Set Size: number of pages the process has in real memory.
1074
+ int memory = (int )(u->rss * 4 / 1024 );
1042
1075
1043
- srs_trace (" Process: cpu=%.2f%%,%dMB, threads=%d,%.2f%%,%.2f%%%s %s%s%s%s" ,
1044
- u->percent * 100 , memory, (int )threads_.size (), top_percent, thread_percent,
1045
- async_logs.c_str (), sync_desc.c_str (), queue_desc.c_str (), circuit_breaker. c_str (),
1076
+ srs_trace (" Process: cpu=%.2f%%,%dMB, threads=%d,%.2f%%% %s%s%s%s" ,
1077
+ u->percent * 100 , memory, (int )threads_.size (), top_percent,
1078
+ async_logs.c_str (), sync_desc.c_str (), queue_desc.c_str (),
1046
1079
tunnel_desc.c_str ());
1047
1080
}
1048
1081
0 commit comments