@@ -18,6 +18,7 @@ package com.couchbase.client.scala.handlers
1818
1919import com .couchbase .client .core .Core
2020import com .couchbase .client .core .cnc .TracingIdentifiers
21+ import com .couchbase .client .core .classic .analytics .AnalyticsHelper
2122import com .couchbase .client .core .deps .io .netty .util .CharsetUtil
2223import com .couchbase .client .core .error .ErrorCodeAndMessage
2324import com .couchbase .client .core .msg .analytics .AnalyticsRequest
@@ -114,41 +115,44 @@ private[scala] class AnalyticsHandler(hp: HandlerBasicParams) {
114115 def queryAsync (
115116 request : AnalyticsRequest
116117 )(implicit ec : ExecutionContext ): Future [AnalyticsResult ] = {
117- hp.core.send(request)
118-
119118 val ret : Future [AnalyticsResult ] = FutureConversions
120- .javaCFToScalaMono(request, request.response(), propagateCancellation = true )
121- .flatMap(response =>
119+ .javaMonoToScalaMono(AnalyticsHelper .requireCouchbaseServer(hp.core, request.timeout()))
120+ .`then`({
121+ hp.core.send(request)
122122 FutureConversions
123- .javaFluxToScalaFlux(response.rows())
124- .collectSeq()
125- .flatMap(rows =>
123+ .javaCFToScalaMono(request, request.response(), propagateCancellation = true )
124+ .flatMap(response =>
126125 FutureConversions
127- .javaMonoToScalaMono(response.trailer())
128- .map(trailer => {
129- val warnings : collection.Seq [AnalyticsWarning ] = trailer.warnings.asScala
130- .map(warnings =>
131- ErrorCodeAndMessage
132- .fromJsonArray(warnings)
133- .asScala
134- .map(codeAndMessage => AnalyticsWarning (codeAndMessage))
135- )
136- .getOrElse(Seq .empty)
137-
138- AnalyticsResult (
139- rows,
140- AnalyticsMetaData (
141- response.header().requestId(),
142- response.header().clientContextId().orElse(" " ),
143- response.header().signature.asScala,
144- AnalyticsMetrics .fromBytes(trailer.metrics),
145- warnings,
146- AnalyticsStatus .from(trailer.status)
147- )
148- )
149- })
126+ .javaFluxToScalaFlux(response.rows())
127+ .collectSeq()
128+ .flatMap(rows =>
129+ FutureConversions
130+ .javaMonoToScalaMono(response.trailer())
131+ .map(trailer => {
132+ val warnings : collection.Seq [AnalyticsWarning ] = trailer.warnings.asScala
133+ .map(warnings =>
134+ ErrorCodeAndMessage
135+ .fromJsonArray(warnings)
136+ .asScala
137+ .map(codeAndMessage => AnalyticsWarning (codeAndMessage))
138+ )
139+ .getOrElse(Seq .empty)
140+
141+ AnalyticsResult (
142+ rows,
143+ AnalyticsMetaData (
144+ response.header().requestId(),
145+ response.header().clientContextId().orElse(" " ),
146+ response.header().signature.asScala,
147+ AnalyticsMetrics .fromBytes(trailer.metrics),
148+ warnings,
149+ AnalyticsStatus .from(trailer.status)
150+ )
151+ )
152+ })
153+ )
150154 )
151- )
155+ } )
152156 .toFuture
153157
154158 ret onComplete {
@@ -160,41 +164,44 @@ private[scala] class AnalyticsHandler(hp: HandlerBasicParams) {
160164
161165 def queryReactive (request : AnalyticsRequest ): SMono [ReactiveAnalyticsResult ] = {
162166 SMono .defer(() => {
163- hp.core.send(request)
164-
165167 FutureConversions
166- .javaCFToScalaMono(request, request.response(), false )
167- .map(response => {
168- val meta : SMono [AnalyticsMetaData ] = FutureConversions
169- .javaMonoToScalaMono(response.trailer())
170- .map(trailer => {
171- val warnings : collection.Seq [AnalyticsWarning ] = trailer.warnings.asScala
172- .map(warnings =>
173- ErrorCodeAndMessage
174- .fromJsonArray(warnings)
175- .asScala
176- .map(codeAndMessage => AnalyticsWarning (codeAndMessage))
177- )
178- .getOrElse(Seq .empty)
179-
180- AnalyticsMetaData (
181- response.header().requestId(),
182- response.header().clientContextId().orElse(" " ),
183- response.header().signature.asScala,
184- AnalyticsMetrics .fromBytes(trailer.metrics()),
185- warnings,
186- AnalyticsStatus .from(trailer.status)
187- )
188- })
189- .doOnNext(_ => request.context.logicallyComplete)
190- .doOnError(err => request.context().logicallyComplete(err))
168+ .javaMonoToScalaMono(AnalyticsHelper .requireCouchbaseServer(hp.core, request.timeout()))
169+ .`then`({
170+ hp.core.send(request)
171+ FutureConversions
172+ .javaCFToScalaMono(request, request.response(), false )
173+ .map(response => {
174+ val meta : SMono [AnalyticsMetaData ] = FutureConversions
175+ .javaMonoToScalaMono(response.trailer())
176+ .map(trailer => {
177+ val warnings : collection.Seq [AnalyticsWarning ] = trailer.warnings.asScala
178+ .map(warnings =>
179+ ErrorCodeAndMessage
180+ .fromJsonArray(warnings)
181+ .asScala
182+ .map(codeAndMessage => AnalyticsWarning (codeAndMessage))
183+ )
184+ .getOrElse(Seq .empty)
191185
192- val rows = FutureConversions .javaFluxToScalaFlux(response.rows())
186+ AnalyticsMetaData (
187+ response.header().requestId(),
188+ response.header().clientContextId().orElse(" " ),
189+ response.header().signature.asScala,
190+ AnalyticsMetrics .fromBytes(trailer.metrics()),
191+ warnings,
192+ AnalyticsStatus .from(trailer.status)
193+ )
194+ })
195+ .doOnNext(_ => request.context.logicallyComplete)
196+ .doOnError(err => request.context().logicallyComplete(err))
193197
194- ReactiveAnalyticsResult (
195- rows,
196- meta
197- )
198+ val rows = FutureConversions .javaFluxToScalaFlux(response.rows())
199+
200+ ReactiveAnalyticsResult (
201+ rows,
202+ meta
203+ )
204+ })
198205 })
199206 })
200207 }
0 commit comments