Skip to content

Commit ecbd12d

Browse files
committed
SCBC-490: Operational SDK prevent connection to Analytics 2.0 Cluster
Change-Id: I481a89a016cce792adc2ac323d43dbe74a509daa Reviewed-on: https://review.couchbase.org/c/couchbase-jvm-clients/+/231783 Tested-by: Build Bot <[email protected]> Reviewed-by: David Nault <[email protected]>
1 parent 4536fff commit ecbd12d

1 file changed

Lines changed: 70 additions & 63 deletions

File tree

scala-client/src/main/scala-2/com/couchbase/client/scala/handlers/AnalyticsHandler.scala

Lines changed: 70 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package com.couchbase.client.scala.handlers
1818

1919
import com.couchbase.client.core.Core
2020
import com.couchbase.client.core.cnc.TracingIdentifiers
21+
import com.couchbase.client.core.classic.analytics.AnalyticsHelper
2122
import com.couchbase.client.core.deps.io.netty.util.CharsetUtil
2223
import com.couchbase.client.core.error.ErrorCodeAndMessage
2324
import com.couchbase.client.core.msg.analytics.AnalyticsRequest
@@ -114,41 +115,44 @@ private[scala] class AnalyticsHandler(hp: HandlerBasicParams) {
114115
def queryAsync(
115116
request: AnalyticsRequest
116117
)(implicit ec: ExecutionContext): Future[AnalyticsResult] = {
117-
hp.core.send(request)
118-
119118
val ret: Future[AnalyticsResult] = FutureConversions
120-
.javaCFToScalaMono(request, request.response(), propagateCancellation = true)
121-
.flatMap(response =>
119+
.javaMonoToScalaMono(AnalyticsHelper.requireCouchbaseServer(hp.core, request.timeout()))
120+
.`then`({
121+
hp.core.send(request)
122122
FutureConversions
123-
.javaFluxToScalaFlux(response.rows())
124-
.collectSeq()
125-
.flatMap(rows =>
123+
.javaCFToScalaMono(request, request.response(), propagateCancellation = true)
124+
.flatMap(response =>
126125
FutureConversions
127-
.javaMonoToScalaMono(response.trailer())
128-
.map(trailer => {
129-
val warnings: collection.Seq[AnalyticsWarning] = trailer.warnings.asScala
130-
.map(warnings =>
131-
ErrorCodeAndMessage
132-
.fromJsonArray(warnings)
133-
.asScala
134-
.map(codeAndMessage => AnalyticsWarning(codeAndMessage))
135-
)
136-
.getOrElse(Seq.empty)
137-
138-
AnalyticsResult(
139-
rows,
140-
AnalyticsMetaData(
141-
response.header().requestId(),
142-
response.header().clientContextId().orElse(""),
143-
response.header().signature.asScala,
144-
AnalyticsMetrics.fromBytes(trailer.metrics),
145-
warnings,
146-
AnalyticsStatus.from(trailer.status)
147-
)
148-
)
149-
})
126+
.javaFluxToScalaFlux(response.rows())
127+
.collectSeq()
128+
.flatMap(rows =>
129+
FutureConversions
130+
.javaMonoToScalaMono(response.trailer())
131+
.map(trailer => {
132+
val warnings: collection.Seq[AnalyticsWarning] = trailer.warnings.asScala
133+
.map(warnings =>
134+
ErrorCodeAndMessage
135+
.fromJsonArray(warnings)
136+
.asScala
137+
.map(codeAndMessage => AnalyticsWarning(codeAndMessage))
138+
)
139+
.getOrElse(Seq.empty)
140+
141+
AnalyticsResult(
142+
rows,
143+
AnalyticsMetaData(
144+
response.header().requestId(),
145+
response.header().clientContextId().orElse(""),
146+
response.header().signature.asScala,
147+
AnalyticsMetrics.fromBytes(trailer.metrics),
148+
warnings,
149+
AnalyticsStatus.from(trailer.status)
150+
)
151+
)
152+
})
153+
)
150154
)
151-
)
155+
})
152156
.toFuture
153157

154158
ret onComplete {
@@ -160,41 +164,44 @@ private[scala] class AnalyticsHandler(hp: HandlerBasicParams) {
160164

161165
def queryReactive(request: AnalyticsRequest): SMono[ReactiveAnalyticsResult] = {
162166
SMono.defer(() => {
163-
hp.core.send(request)
164-
165167
FutureConversions
166-
.javaCFToScalaMono(request, request.response(), false)
167-
.map(response => {
168-
val meta: SMono[AnalyticsMetaData] = FutureConversions
169-
.javaMonoToScalaMono(response.trailer())
170-
.map(trailer => {
171-
val warnings: collection.Seq[AnalyticsWarning] = trailer.warnings.asScala
172-
.map(warnings =>
173-
ErrorCodeAndMessage
174-
.fromJsonArray(warnings)
175-
.asScala
176-
.map(codeAndMessage => AnalyticsWarning(codeAndMessage))
177-
)
178-
.getOrElse(Seq.empty)
179-
180-
AnalyticsMetaData(
181-
response.header().requestId(),
182-
response.header().clientContextId().orElse(""),
183-
response.header().signature.asScala,
184-
AnalyticsMetrics.fromBytes(trailer.metrics()),
185-
warnings,
186-
AnalyticsStatus.from(trailer.status)
187-
)
188-
})
189-
.doOnNext(_ => request.context.logicallyComplete)
190-
.doOnError(err => request.context().logicallyComplete(err))
168+
.javaMonoToScalaMono(AnalyticsHelper.requireCouchbaseServer(hp.core, request.timeout()))
169+
.`then`({
170+
hp.core.send(request)
171+
FutureConversions
172+
.javaCFToScalaMono(request, request.response(), false)
173+
.map(response => {
174+
val meta: SMono[AnalyticsMetaData] = FutureConversions
175+
.javaMonoToScalaMono(response.trailer())
176+
.map(trailer => {
177+
val warnings: collection.Seq[AnalyticsWarning] = trailer.warnings.asScala
178+
.map(warnings =>
179+
ErrorCodeAndMessage
180+
.fromJsonArray(warnings)
181+
.asScala
182+
.map(codeAndMessage => AnalyticsWarning(codeAndMessage))
183+
)
184+
.getOrElse(Seq.empty)
191185

192-
val rows = FutureConversions.javaFluxToScalaFlux(response.rows())
186+
AnalyticsMetaData(
187+
response.header().requestId(),
188+
response.header().clientContextId().orElse(""),
189+
response.header().signature.asScala,
190+
AnalyticsMetrics.fromBytes(trailer.metrics()),
191+
warnings,
192+
AnalyticsStatus.from(trailer.status)
193+
)
194+
})
195+
.doOnNext(_ => request.context.logicallyComplete)
196+
.doOnError(err => request.context().logicallyComplete(err))
193197

194-
ReactiveAnalyticsResult(
195-
rows,
196-
meta
197-
)
198+
val rows = FutureConversions.javaFluxToScalaFlux(response.rows())
199+
200+
ReactiveAnalyticsResult(
201+
rows,
202+
meta
203+
)
204+
})
198205
})
199206
})
200207
}

0 commit comments

Comments
 (0)