99
1010#include " common/buffer/buffer_impl.h"
1111#include " common/common/logger.h"
12+ #include " common/config/well_known_names.h"
1213#include " common/http/headers.h"
1314
1415#include " absl/strings/str_cat.h"
16+ #include " absl/strings/str_split.h"
17+ #include " fmt/printf.h"
1518
1619namespace Envoy {
1720namespace Extensions {
1821namespace StatSinks {
1922namespace Hystrix {
2023
// Namespace-scope definition of the static data member HystrixSink::DEFAULT_NUM_BUCKETS.
// No initializer is given here, so the value presumably comes from the in-class declaration
// (not visible in this part of the file) -- TODO confirm.
2124const uint64_t HystrixSink::DEFAULT_NUM_BUCKETS;
22-
2325ClusterStatsCache::ClusterStatsCache (const std::string& cluster_name)
2426 : cluster_name_(cluster_name) {}
2527
@@ -43,6 +45,19 @@ void ClusterStatsCache::printRollingWindow(absl::string_view name, RollingWindow
4345 out_str << std::endl;
4446}
4547
48+ void HystrixSink::addHistogramToStream (const QuantileLatencyMap& latency_map, absl::string_view key,
49+ std::stringstream& ss) {
50+ // TODO: Consider if we better use join here
51+ ss << " , \" " << key << " \" : {" ;
52+ bool is_first = true ;
53+ for (const std::pair<double , double >& element : latency_map) {
54+ const std::string quantile = fmt::sprintf (" %g" , element.first * 100 );
55+ HystrixSink::addDoubleToStream (quantile, element.second , ss, is_first);
56+ is_first = false ;
57+ }
58+ ss << " }" ;
59+ }
60+
4661// Add new value to rolling window, in place of oldest one.
4762void HystrixSink::pushNewValue (RollingWindow& rolling_window, uint64_t value) {
4863 if (rolling_window.empty ()) {
@@ -118,6 +133,11 @@ void HystrixSink::addIntToStream(absl::string_view key, uint64_t value, std::str
118133 addInfoToStream (key, std::to_string (value), info, is_first);
119134}
120135
136+ void HystrixSink::addDoubleToStream (absl::string_view key, double value, std::stringstream& info,
137+ bool is_first) {
138+ addInfoToStream (key, std::to_string (value), info, is_first);
139+ }
140+
121141void HystrixSink::addInfoToStream (absl::string_view key, absl::string_view value,
122142 std::stringstream& info, bool is_first) {
123143 if (!is_first) {
@@ -131,7 +151,7 @@ void HystrixSink::addHystrixCommand(ClusterStatsCache& cluster_stats_cache,
131151 absl::string_view cluster_name,
132152 uint64_t max_concurrent_requests, uint64_t reporting_hosts,
133153 std::chrono::milliseconds rolling_window_ms,
134- std::stringstream& ss) {
154+ const QuantileLatencyMap& histogram, std::stringstream& ss) {
135155
136156 std::time_t currentTime = std::chrono::system_clock::to_time_t (std::chrono::system_clock::now ());
137157
@@ -161,7 +181,7 @@ void HystrixSink::addHystrixCommand(ClusterStatsCache& cluster_stats_cache,
161181 addIntToStream (" rollingCountResponsesFromCache" , 0 , ss);
162182
163183 // Envoy's "circuit breaker" has similar meaning to hystrix's isolation
164- // so we count upstream_rq_pending_overflow and present it as ss
184+ // so we count upstream_rq_pending_overflow and present it as rollingCountSemaphoreRejected
165185 addIntToStream (" rollingCountSemaphoreRejected" , rejected, ss);
166186
167187 // Hystrix's short circuit is not similar to Envoy's since it is triggered by 503 responses
@@ -173,12 +193,8 @@ void HystrixSink::addHystrixCommand(ClusterStatsCache& cluster_stats_cache,
173193 addIntToStream (" rollingCountTimeout" , timeouts, ss);
174194 addIntToStream (" rollingCountBadRequests" , 0 , ss);
175195 addIntToStream (" currentConcurrentExecutionCount" , 0 , ss);
176- addIntToStream (" latencyExecute_mean" , 0 , ss);
177-
178- // TODO trabetti : add histogram information once available by PR #2932
179- addInfoToStream (
180- " latencyExecute" ,
181- " {\" 0\" :0,\" 25\" :0,\" 50\" :0,\" 75\" :0,\" 90\" :0,\" 95\" :0,\" 99\" :0,\" 99.5\" :0,\" 100\" :0}" , ss);
196+ addStringToStream (" latencyExecute_mean" , " null" , ss);
197+ addHistogramToStream (histogram, " latencyExecute" , ss);
182198 addIntToStream (" propertyValue_circuitBreakerRequestVolumeThreshold" , 0 , ss);
183199 addIntToStream (" propertyValue_circuitBreakerSleepWindowInMilliseconds" , 0 , ss);
184200 addIntToStream (" propertyValue_circuitBreakerErrorThresholdPercentage" , 0 , ss);
@@ -230,10 +246,11 @@ void HystrixSink::addClusterStatsToStream(ClusterStatsCache& cluster_stats_cache
230246 uint64_t max_concurrent_requests,
231247 uint64_t reporting_hosts,
232248 std::chrono::milliseconds rolling_window_ms,
249+ const QuantileLatencyMap& histogram,
233250 std::stringstream& ss) {
234251
235252 addHystrixCommand (cluster_stats_cache, cluster_name, max_concurrent_requests, reporting_hosts,
236- rolling_window_ms, ss);
253+ rolling_window_ms, histogram, ss);
237254 addHystrixThreadPool (cluster_name, max_concurrent_requests, reporting_hosts, rolling_window_ms,
238255 ss);
239256}
@@ -299,13 +316,46 @@ Http::Code HystrixSink::handlerHystrixEventStream(absl::string_view,
299316 return Http::Code::OK;
300317}
301318
302- void HystrixSink::flush (Stats::Source&) {
319+ void HystrixSink::flush (Stats::Source& source ) {
303320 if (callbacks_list_.empty ()) {
304321 return ;
305322 }
306323 incCounter ();
307324 std::stringstream ss;
308325 Upstream::ClusterManager::ClusterInfoMap clusters = server_.clusterManager ().clusters ();
326+
327+ // Save a map of the relevant histograms per cluster in a convenient format.
328+ std::unordered_map<std::string, QuantileLatencyMap> time_histograms;
329+ for (const Stats::ParentHistogramSharedPtr& histogram : source.cachedHistograms ()) {
330+ if (histogram->tagExtractedName () == " cluster.upstream_rq_time" ) {
331+ // TODO(mrice32): add an Envoy utility function to look up and return a tag for a metric.
332+ auto it = std::find_if (histogram->tags ().begin (), histogram->tags ().end (),
333+ [](const Stats::Tag& tag) {
334+ return (tag.name_ == Config::TagNames::get ().CLUSTER_NAME );
335+ });
336+
337+ // Make sure we found the cluster name tag
338+ ASSERT (it != histogram->tags ().end ());
339+ auto it_bool_pair = time_histograms.emplace (std::make_pair (it->value_ , QuantileLatencyMap ()));
340+ // Make sure histogram with this name was not already added
341+ ASSERT (it_bool_pair.second );
342+ QuantileLatencyMap& hist_map = it_bool_pair.first ->second ;
343+
344+ const std::vector<double >& supported_quantiles =
345+ histogram->intervalStatistics ().supportedQuantiles ();
346+ for (size_t i = 0 ; i < supported_quantiles.size (); ++i) {
347+ // binary-search here is likely not worth it, as hystrix_quantiles has <10 elements.
348+ if (std::find (hystrix_quantiles.begin (), hystrix_quantiles.end (), supported_quantiles[i]) !=
349+ hystrix_quantiles.end ()) {
350+ const double value = histogram->intervalStatistics ().computedQuantiles ()[i];
351+ if (!std::isnan (value)) {
352+ hist_map[supported_quantiles[i]] = value;
353+ }
354+ }
355+ }
356+ }
357+ }
358+
309359 for (auto & cluster : clusters) {
310360 Upstream::ClusterInfoConstSharedPtr cluster_info = cluster.second .get ().info ();
311361
@@ -323,7 +373,7 @@ void HystrixSink::flush(Stats::Source&) {
323373 *cluster_stats_cache_ptr, cluster_info->name (),
324374 cluster_info->resourceManager (Upstream::ResourcePriority::Default).pendingRequests ().max (),
325375 cluster_info->statsScope ().gauge (" membership_total" ).value (), server_.statsFlushInterval (),
326- ss);
376+ time_histograms[cluster_info-> name ()], ss);
327377 }
328378
329379 Buffer::OwnedImpl data;
0 commit comments