@@ -22,6 +22,8 @@ import java.util.Collections
2222import com .microsoft .azure .cosmosdb .DataType .{Number , String }
2323import com .microsoft .azure .cosmosdb .IndexKind .Range
2424import com .microsoft .azure .cosmosdb .{PartitionKeyDefinition , SqlParameter , SqlParameterCollection , SqlQuerySpec }
25+ import kamon .metric .MeasurementUnit
26+ import org .apache .openwhisk .common .{LogMarkerToken , TransactionId , WhiskInstants }
2527import org .apache .openwhisk .core .database .ActivationHandler .NS_PATH
2628import org .apache .openwhisk .core .database .WhisksHandler .ROOT_NS
2729import org .apache .openwhisk .core .database .cosmosdb .CosmosDBConstants .{alias , computed , deleted }
@@ -34,6 +36,10 @@ import org.apache.openwhisk.core.database.{
3436 WhisksHandler
3537}
3638import org .apache .openwhisk .core .entity .WhiskQueries .TOP
39+ import org .apache .openwhisk .utils .JsHelpers
40+ import spray .json .{JsNumber , JsObject }
41+
42+ import scala .collection .JavaConverters ._
3743
3844private [cosmosdb] trait CosmosDBViewMapper {
3945 protected val NOTHING = " "
@@ -71,6 +77,17 @@ private[cosmosdb] trait CosmosDBViewMapper {
7177
7278 new SqlQuerySpec (query, paramColl)
7379 }
80+
81+ /**
82+ * Records query related stats based on result returned and arguments passed
83+ *
84+ * @return an optional string representation of stats for logging purpose
85+ */
86+ def recordQueryStats (ddoc : String ,
87+ viewName : String ,
88+ descending : Boolean ,
89+ queryParams : SqlParameterCollection ,
90+ result : List [JsObject ]): Option [String ] = None
7491}
7592
7693private [cosmosdb] abstract class SimpleMapper extends CosmosDBViewMapper {
@@ -183,7 +200,8 @@ private[cosmosdb] object WhisksViewMapper extends SimpleMapper {
183200 }
184201
185202}
186- private [cosmosdb] object ActivationViewMapper extends SimpleMapper {
203+ private [cosmosdb] object ActivationViewMapper extends SimpleMapper with WhiskInstants {
204+ import CosmosDBViewMapper ._
187205 private val NS = " namespace"
188206 private val NS_WITH_PATH = s " $computed. $NS_PATH"
189207 private val START = " start"
@@ -233,6 +251,41 @@ private[cosmosdb] object ActivationViewMapper extends SimpleMapper {
233251 case " activations" if ddoc.startsWith(" whisks" ) => s " r. $START"
234252 case _ => throw UnsupportedView (s " $ddoc/ $view" )
235253 }
254+
255+ private val resultDeltaToken = createStatsToken(" activations" , " resultDelta" , " activations" )
256+ private val sinceDeltaToken = createStatsToken(" activations" , " sinceDelta" , " activations" )
257+
258+ override def recordQueryStats (ddoc : String ,
259+ viewName : String ,
260+ descending : Boolean ,
261+ queryParams : SqlParameterCollection ,
262+ result : List [JsObject ]): Option [String ] = {
263+ val stat = if (viewName == " activations" && descending) {
264+ // Collect stats for the delta between
265+ // 1. now and start time of last activation
266+ // 2. now and start time as specific in query for `since` parameter
267+ // These stats would help in determining how much old activations are being queried for list query (used in activation poll)
268+ val uptoOpt = paramValue(queryParams, " upto" , classOf [Number ])
269+ val startOpt = paramValue(queryParams, " start" , classOf [Number ])
270+
271+ // Result json has structure { id: "", "key": [], "value": {activation}}
272+ // So fetch value of start via `value.start` path
273+ val lastOpt = result.lastOption.flatMap(js => JsHelpers .getFieldPath(js, " value" , " start" ))
274+
275+ (uptoOpt, startOpt, lastOpt) match {
276+ // Go for case which does not specify upto as that would be the case with poll based query
277+ case (None , Some (startFromQuery), Some (JsNumber (start))) =>
278+ val now = nowInMillis().toEpochMilli
279+ val resultStartDelta = (now - start.longValue()).max(0 )
280+ val queryStartDelta = (now - startFromQuery.longValue()).max(0 )
281+ resultDeltaToken.histogram.record(resultStartDelta)
282+ sinceDeltaToken.histogram.record(queryStartDelta)
283+ Some (s " resultDelta= $resultStartDelta, sinceDelta= $queryStartDelta" )
284+ case _ => None
285+ }
286+ } else None
287+ stat
288+ }
236289}
237290private [cosmosdb] object SubjectViewMapper extends CosmosDBViewMapper {
238291 private val UUID = " uuid"
@@ -318,3 +371,18 @@ private[cosmosdb] object SubjectViewMapper extends CosmosDBViewMapper {
318371
319372 private def selectClause (count : Boolean ) = if (count) " TOP 1 VALUE COUNT(r)" else " r"
320373}
374+
375+ object CosmosDBViewMapper {
376+
377+ def paramValue [T ](params : SqlParameterCollection , key : String , clazz : Class [T ]): Option [T ] = {
378+ val name = " @" + key
379+ params.iterator().asScala.find(_.getName == name).map(_.getValue(clazz).asInstanceOf [T ])
380+ }
381+
382+ def createStatsToken (viewName : String , statName : String , collName : String ): LogMarkerToken = {
383+ val unit = MeasurementUnit .time.milliseconds
384+ val tags = Map (" view" -> viewName, " collection" -> collName)
385+ if (TransactionId .metricsKamonTags) LogMarkerToken (" cosmosdb" , " query" , statName, tags = tags)(unit)
386+ else LogMarkerToken (" cosmosdb" , " query" , collName, Some (statName))(unit)
387+ }
388+ }
0 commit comments