Skip to content

Commit e56df68

Browse files
committed
Collect query stats for activation poll query
1 parent 54b32ef commit e56df68

File tree

2 files changed

+77
-3
lines changed

2 files changed

+77
-3
lines changed

common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -363,14 +363,20 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
363363
.map(l => if (limit > 0) l.take(limit) else l)
364364

365365
val g = f.andThen {
366-
case Success(out) =>
366+
case Success(queryResult) =>
367367
if (queryMetrics.nonEmpty) {
368368
val combinedMetrics = QueryMetrics.ZERO.add(queryMetrics: _*)
369369
logging.debug(
370370
this,
371371
s"[QueryMetricsEnabled] Collection [$collName] - Query [${querySpec.getQueryText}].\nQueryMetrics\n[$combinedMetrics]")
372372
}
373-
transid.finished(this, start, s"[QUERY] '$collName' completed: matched ${out.size}", InfoLevel)
373+
val stats = viewMapper.recordQueryStats(ddoc, viewName, descending, querySpec.getParameters, queryResult)
374+
val statsToLog = stats.map(s => " " + s).getOrElse("")
375+
transid.finished(
376+
this,
377+
start,
378+
s"[QUERY] '$collName' completed: matched ${queryResult.size}$statsToLog",
379+
InfoLevel)
374380
}
375381
reportFailure(g, start, failure => s"[QUERY] '$collName' internal error, failure: '${failure.getMessage}'")
376382
}

common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import java.util.Collections
2222
import com.microsoft.azure.cosmosdb.DataType.{Number, String}
2323
import com.microsoft.azure.cosmosdb.IndexKind.Range
2424
import com.microsoft.azure.cosmosdb.{PartitionKeyDefinition, SqlParameter, SqlParameterCollection, SqlQuerySpec}
25+
import kamon.metric.MeasurementUnit
26+
import org.apache.openwhisk.common.{LogMarkerToken, TransactionId, WhiskInstants}
2527
import org.apache.openwhisk.core.database.ActivationHandler.NS_PATH
2628
import org.apache.openwhisk.core.database.WhisksHandler.ROOT_NS
2729
import org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants.{alias, computed, deleted}
@@ -34,6 +36,10 @@ import org.apache.openwhisk.core.database.{
3436
WhisksHandler
3537
}
3638
import org.apache.openwhisk.core.entity.WhiskQueries.TOP
39+
import org.apache.openwhisk.utils.JsHelpers
40+
import spray.json.{JsNumber, JsObject}
41+
42+
import scala.collection.JavaConverters._
3743

3844
private[cosmosdb] trait CosmosDBViewMapper {
3945
protected val NOTHING = ""
@@ -71,6 +77,17 @@ private[cosmosdb] trait CosmosDBViewMapper {
7177

7278
new SqlQuerySpec(query, paramColl)
7379
}
80+
81+
/**
82+
* Records query related stats based on result returned and arguments passed
83+
*
84+
* @return an optional string representation of stats for logging purpose
85+
*/
86+
def recordQueryStats(ddoc: String,
87+
viewName: String,
88+
descending: Boolean,
89+
queryParams: SqlParameterCollection,
90+
result: List[JsObject]): Option[String] = None
7491
}
7592

7693
private[cosmosdb] abstract class SimpleMapper extends CosmosDBViewMapper {
@@ -183,7 +200,8 @@ private[cosmosdb] object WhisksViewMapper extends SimpleMapper {
183200
}
184201

185202
}
186-
private[cosmosdb] object ActivationViewMapper extends SimpleMapper {
203+
private[cosmosdb] object ActivationViewMapper extends SimpleMapper with WhiskInstants {
204+
import CosmosDBViewMapper._
187205
private val NS = "namespace"
188206
private val NS_WITH_PATH = s"$computed.$NS_PATH"
189207
private val START = "start"
@@ -233,6 +251,41 @@ private[cosmosdb] object ActivationViewMapper extends SimpleMapper {
233251
case "activations" if ddoc.startsWith("whisks") => s"r.$START"
234252
case _ => throw UnsupportedView(s"$ddoc/$view")
235253
}
254+
255+
private val resultDeltaToken = createStatsToken("activations", "resultDelta", "activations")
256+
private val sinceDeltaToken = createStatsToken("activations", "sinceDelta", "activations")
257+
258+
override def recordQueryStats(ddoc: String,
259+
viewName: String,
260+
descending: Boolean,
261+
queryParams: SqlParameterCollection,
262+
result: List[JsObject]): Option[String] = {
263+
val stat = if (viewName == "activations" && descending) {
264+
// Collect stats for the delta between
265+
// 1. now and start time of last activation
266+
// 2. now and start time as specific in query for `since` parameter
267+
// These stats would help in determining how much old activations are being queried for list query (used in activation poll)
268+
val uptoOpt = paramValue(queryParams, "upto", classOf[Number])
269+
val startOpt = paramValue(queryParams, "start", classOf[Number])
270+
271+
// Result json has structure { id: "", "key": [], "value": {activation}}
272+
// So fetch value of start via `value.start` path
273+
val lastOpt = result.lastOption.flatMap(js => JsHelpers.getFieldPath(js, "value", "start"))
274+
275+
(uptoOpt, startOpt, lastOpt) match {
276+
//Go for case which does not specify upto as that would be the case with poll based query
277+
case (None, Some(startFromQuery), Some(JsNumber(start))) =>
278+
val now = nowInMillis().toEpochMilli
279+
val resultStartDelta = (now - start.longValue()).max(0)
280+
val queryStartDelta = (now - startFromQuery.longValue()).max(0)
281+
resultDeltaToken.histogram.record(resultStartDelta)
282+
sinceDeltaToken.histogram.record(queryStartDelta)
283+
Some(s"resultDelta=$resultStartDelta, sinceDelta=$queryStartDelta")
284+
case _ => None
285+
}
286+
} else None
287+
stat
288+
}
236289
}
237290
private[cosmosdb] object SubjectViewMapper extends CosmosDBViewMapper {
238291
private val UUID = "uuid"
@@ -318,3 +371,18 @@ private[cosmosdb] object SubjectViewMapper extends CosmosDBViewMapper {
318371

319372
private def selectClause(count: Boolean) = if (count) "TOP 1 VALUE COUNT(r)" else "r"
320373
}
374+
375+
object CosmosDBViewMapper {
376+
377+
def paramValue[T](params: SqlParameterCollection, key: String, clazz: Class[T]): Option[T] = {
378+
val name = "@" + key
379+
params.iterator().asScala.find(_.getName == name).map(_.getValue(clazz).asInstanceOf[T])
380+
}
381+
382+
def createStatsToken(viewName: String, statName: String, collName: String): LogMarkerToken = {
383+
val unit = MeasurementUnit.time.milliseconds
384+
val tags = Map("view" -> viewName, "collection" -> collName)
385+
if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", "query", statName, tags = tags)(unit)
386+
else LogMarkerToken("cosmosdb", "query", collName, Some(statName))(unit)
387+
}
388+
}

0 commit comments

Comments
 (0)