This repository was archived by the owner on Jan 5, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 293
Expand file tree
/
Copy pathcloudAdapter.ts
More file actions
429 lines (369 loc) · 15.7 KB
/
cloudAdapter.ts
File metadata and controls
429 lines (369 loc) · 15.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import * as z from 'zod';
import type { BotFrameworkHttpAdapter } from './botFrameworkHttpAdapter';
import { Activity, CloudAdapterBase, InvokeResponse, StatusCodes, TurnContext } from 'botbuilder-core';
import { GET, POST, VERSION_PATH } from './streaming';
import {
HttpClient,
HttpHeaders,
HttpOperationResponse,
WebResourceLike as WebResource,
} from 'botbuilder-stdlib/lib/azureCoreHttpCompat';
import { INodeBufferT, INodeSocketT, LogicT } from './zod';
import { Request, Response, ResponseT } from './interfaces';
import { USER_AGENT } from './botFrameworkAdapter';
import { retry } from 'botbuilder-stdlib';
import { validateAndFixActivity } from './activityValidator';
import {
AuthenticateRequestResult,
AuthenticationError,
BotFrameworkAuthentication,
BotFrameworkAuthenticationFactory,
ClaimsIdentity,
ConnectorClient,
ConnectorFactory,
MicrosoftAppCredentials,
} from 'botframework-connector';
import {
INodeBuffer,
INodeSocket,
INodeDuplex,
IReceiveRequest,
IReceiveResponse,
IStreamingTransportServer,
NamedPipeServer,
NodeWebSocketFactory,
RequestHandler,
StreamingRequest,
StreamingResponse,
WebSocketServer,
} from 'botframework-streaming';
// Deliberately loose schema: any plain record is accepted and typed as an `Activity`. This is only acceptable
// because each parsed value is then handed to `validateAndFixActivity`; do not use this schema on its own.
const ActivityT = z.custom<Activity>(
    (candidate) => {
        const { success } = z.record(z.unknown()).safeParse(candidate);
        return success;
    },
    { message: 'Activity' },
);
/**
 * An adapter that implements the Bot Framework Protocol and can be hosted in different cloud environments,
 * both public and private.
 */
export class CloudAdapter extends CloudAdapterBase implements BotFrameworkHttpAdapter {
    /**
     * Initializes a new instance of the [CloudAdapter](xref:botbuilder:CloudAdapter) class.
     *
     * @param botFrameworkAuthentication Optional [BotFrameworkAuthentication](xref:botframework-connector.BotFrameworkAuthentication) instance
     */
    constructor(botFrameworkAuthentication: BotFrameworkAuthentication = BotFrameworkAuthenticationFactory.create()) {
        super(botFrameworkAuthentication);
    }

    /**
     * Process a web request by applying a logic function.
     *
     * @param req An incoming HTTP [Request](xref:botbuilder.Request)
     * @param res The corresponding HTTP [Response](xref:botbuilder.Response)
     * @param logic The logic function to apply
     * @returns a promise representing the asynchronous operation.
     */
    async process(req: Request, res: Response, logic: (context: TurnContext) => Promise<void>): Promise<void>;

    /**
     * Handle a web socket connection by applying a logic function to
     * each streaming request.
     *
     * @param req An incoming HTTP [Request](xref:botbuilder.Request)
     * @param socket The corresponding [INodeSocket](xref:botframework-streaming.INodeSocket)
     * @param head The corresponding [INodeBuffer](xref:botframework-streaming.INodeBuffer)
     * @param logic The logic function to apply
     * @returns a promise representing the asynchronous operation.
     */
    async process(
        req: Request,
        socket: INodeSocket,
        head: INodeBuffer,
        logic: (context: TurnContext) => Promise<void>,
    ): Promise<void>;

    /**
     * Handle a web socket connection by applying a logic function to
     * each streaming request.
     *
     * @param req An incoming HTTP [Request](xref:botbuilder.Request)
     * @param socket The corresponding [INodeDuplex](xref:botframework-streaming.INodeDuplex)
     * @param head The corresponding [INodeBuffer](xref:botframework-streaming.INodeBuffer)
     * @param logic The logic function to apply
     * @returns a promise representing the asynchronous operation.
     */
    async process(
        req: Request,
        socket: INodeDuplex,
        head: INodeBuffer,
        logic: (context: TurnContext) => Promise<void>,
    ): Promise<void>;

    /**
     * Shared implementation behind the public `process` overloads. The presence of a fourth argument selects
     * the web socket path; otherwise the call is handled as a plain HTTP request/response exchange.
     *
     * @internal
     */
    async process(
        req: Request,
        resOrSocket: Response | INodeSocket | INodeDuplex,
        logicOrHead: ((context: TurnContext) => Promise<void>) | INodeBuffer,
        maybeLogic?: (context: TurnContext) => Promise<void>,
    ): Promise<void> {
        // Early return with web socket handler if function invocation matches that signature
        if (maybeLogic) {
            const socket = INodeSocketT.parse(resOrSocket);
            const head = INodeBufferT.parse(logicOrHead);
            const logic = LogicT.parse(maybeLogic);

            return this.connect(req, socket, head, logic);
        }

        const res = ResponseT.parse(resOrSocket);
        const logic = LogicT.parse(logicOrHead);

        // Writes the status, sends the body only when it is truthy, then ends the response.
        const end = (status: StatusCodes, body?: unknown) => {
            res.status(status);
            if (body) {
                res.send(body);
            }
            res.end();
        };

        // Only POST requests from here on out
        if (req.method !== 'POST') {
            return end(StatusCodes.METHOD_NOT_ALLOWED);
        }

        // Ensure we have a parsed request body already. We rely on express/restify middleware to parse
        // request body and azure functions, which does it for us before invoking our code. Warn the user
        // to update their code and return an error.
        if (!z.record(z.unknown()).safeParse(req.body).success) {
            return end(
                StatusCodes.BAD_REQUEST,
                '`req.body` not an object, make sure you are using middleware to parse incoming requests.',
            );
        }

        const activity = validateAndFixActivity(ActivityT.parse(req.body));

        if (!activity.type) {
            console.warn('BadRequest: Missing activity or activity type.');
            return end(StatusCodes.BAD_REQUEST);
        }

        // Header lookup accepts both casings; a missing header is treated as an empty string.
        const authHeader = z.string().parse(req.headers.Authorization ?? req.headers.authorization ?? '');

        try {
            const invokeResponse = await this.processActivity(authHeader, activity, logic);
            return end(invokeResponse?.status ?? StatusCodes.OK, invokeResponse?.body);
        } catch (err) {
            console.error(err);

            // Anything can be thrown (including null/undefined), so read `message` defensively instead of
            // dereferencing the caught value directly.
            const message = (err as { message?: unknown } | null | undefined)?.message;

            return end(
                err instanceof AuthenticationError ? StatusCodes.UNAUTHORIZED : StatusCodes.INTERNAL_SERVER_ERROR,
                message ?? err,
            );
        }
    }

    /**
     * Asynchronously process an activity running the provided logic function.
     *
     * @param authorization The authorization header in the format: "Bearer [longString]" or the AuthenticateRequestResult for this turn.
     * @param activity The activity to process.
     * @param logic The logic function to apply.
     * @returns a promise representing the asynchronous operation.
     * @throws {Error} wrapping the stack (or, failing that, the thrown value itself) of any failure raised while processing.
     */
    async processActivityDirect(
        authorization: string | AuthenticateRequestResult,
        activity: Activity,
        logic: (context: TurnContext) => Promise<void>,
    ): Promise<void> {
        try {
            // Narrow the union so that each call matches one `processActivity` call form without an `any` cast.
            if (typeof authorization === 'string') {
                await this.processActivity(authorization, activity, logic);
            } else {
                await this.processActivity(authorization, activity, logic);
            }
        } catch (err) {
            // Prefer the stack when the thrown value carries one; otherwise report the value itself
            // rather than the literal text "undefined".
            const detail = (err as { stack?: unknown } | null | undefined)?.stack ?? err;
            throw new Error(`CloudAdapter.processActivityDirect(): ERROR\n ${detail}`);
        }
    }

    /**
     * Used to connect the adapter to a named pipe.
     *
     * @param pipeName Pipe name to connect to (note: yields two named pipe servers by appending ".incoming" and ".outgoing" to this name)
     * @param logic The logic function to call for resulting bot turns.
     * @param appId The Bot application ID
     * @param audience The audience to use for outbound communication. This will vary by cloud environment.
     * @param callerId Optional, the caller ID
     * @param retryCount Optional, the number of times to retry a failed connection (defaults to 7)
     * @returns a promise representing the asynchronous operation.
     */
    async connectNamedPipe(
        pipeName: string,
        logic: (context: TurnContext) => Promise<void>,
        appId: string,
        audience: string,
        callerId?: string,
        retryCount = 7,
    ): Promise<void> {
        // Runtime validation of the arguments; `parse` throws when any of them has the wrong shape.
        z.object({
            pipeName: z.string(),
            logic: LogicT,
            appId: z.string(),
            audience: z.string(),
            callerId: z.string().optional(),
        }).parse({ pipeName, logic, appId, audience, callerId });

        // The named pipe is local and so there is no network authentication to perform: so we can create the result here.
        const authenticateRequestResult: AuthenticateRequestResult = {
            audience,
            callerId,
            // An empty app id falls back to a claims identity without any claims.
            claimsIdentity: appId ? this.createClaimsIdentity(appId) : new ClaimsIdentity([]),
        };

        // Create request handler
        const requestHandler = new StreamingRequestHandler(
            authenticateRequestResult,
            (authenticateRequestResult, activity) => this.processActivity(authenticateRequestResult, activity, logic),
        );

        // Create server
        const server = new NamedPipeServer(pipeName, requestHandler);

        // Attach server to request handler for outbound requests
        requestHandler.server = server;

        // Spin it up
        await retry(() => server.start(), retryCount);
    }

    /**
     * Authenticates an inbound streaming (web socket) request and starts a web socket server whose requests
     * are dispatched to `logic`.
     *
     * @param req The inbound HTTP request being upgraded.
     * @param socket The socket associated with the request.
     * @param head The buffer associated with the request.
     * @param logic The logic function to call for resulting bot turns.
     * @returns a promise representing the asynchronous operation.
     */
    private async connect(
        req: Request,
        socket: INodeSocket,
        head: INodeBuffer,
        logic: (context: TurnContext) => Promise<void>,
    ): Promise<void> {
        // Grab the auth header from the inbound http request
        const authHeader = z.string().parse(req.headers.Authorization ?? req.headers.authorization ?? '');

        // Grab the channelId which should be in the http headers
        const channelIdHeader = z.string().optional().parse(req.headers.channelid);

        // Authenticate inbound request
        const authenticateRequestResult = await this.botFrameworkAuthentication.authenticateStreamingRequest(
            authHeader,
            channelIdHeader,
        );

        // Create request handler
        const requestHandler = new StreamingRequestHandler(
            authenticateRequestResult,
            (authenticateRequestResult, activity) => this.processActivity(authenticateRequestResult, activity, logic),
        );

        // Create server
        const server = new WebSocketServer(
            await new NodeWebSocketFactory().createWebSocket(req, socket, head),
            requestHandler,
        );

        // Attach server to request handler
        requestHandler.server = server;

        // Spin it up
        await server.start();
    }
}
/**
 * Request handler for streaming transports: converts each received streaming request into an activity and
 * runs it through the `processActivity` callback supplied at construction.
 *
 * @internal
 */
class StreamingRequestHandler extends RequestHandler {
    // Transport server used for outbound requests; assigned by the owner after construction.
    server?: IStreamingTransportServer;

    // Note: `processActivity` lambda is to work around the fact that CloudAdapterBase#processActivity
    // is protected, and we can't get around that by defining classes inside of other classes
    constructor(
        private readonly authenticateRequestResult: AuthenticateRequestResult,
        private readonly processActivity: (
            authenticateRequestResult: AuthenticateRequestResult,
            activity: Activity,
        ) => Promise<InvokeResponse | undefined>,
    ) {
        super();

        // Attach streaming connector factory to authenticateRequestResult so it's used for outbound calls
        this.authenticateRequestResult.connectorFactory = new StreamingConnectorFactory(this);
    }

    /**
     * Validates a received streaming request, reads the activity (and any attachment streams) from it and
     * processes the activity.
     *
     * @param request The streaming request that was received.
     * @returns a streaming response carrying the resulting status code and, when truthy, a body.
     */
    async processRequest(request: IReceiveRequest): Promise<StreamingResponse> {
        const response = new StreamingResponse();

        // Fills in the shared response object; the body is only set when it is truthy.
        const end = (statusCode: StatusCodes, body?: unknown): StreamingResponse => {
            response.statusCode = statusCode;
            if (body) {
                response.setBody(body);
            }
            return response;
        };

        if (!request) {
            return end(StatusCodes.BAD_REQUEST, 'No request provided.');
        }

        if (!request.verb || !request.path) {
            return end(
                StatusCodes.BAD_REQUEST,
                `Request missing verb and/or path. Verb: ${request.verb}, Path: ${request.path}`,
            );
        }

        // Normalize the verb once instead of upper-casing it for every comparison.
        const verb = request.verb.toUpperCase();

        if (verb !== POST && verb !== GET) {
            return end(
                StatusCodes.METHOD_NOT_ALLOWED,
                `Invalid verb received. Only GET and POST are accepted. Verb: ${request.verb}`,
            );
        }

        if (request.path.toLowerCase() === VERSION_PATH) {
            if (verb === GET) {
                return end(StatusCodes.OK, { UserAgent: USER_AGENT });
            } else {
                return end(
                    StatusCodes.METHOD_NOT_ALLOWED,
                    `Invalid verb received for path: ${request.path}. Only GET is accepted. Verb: ${request.verb}`,
                );
            }
        }

        let activity: Activity;

        try {
            // Destructure inside the `try` so that a request without a usable `streams` array is answered
            // with BAD_REQUEST instead of escaping this method as a TypeError.
            const [activityStream, ...attachmentStreams] = request.streams;

            activity = validateAndFixActivity(ActivityT.parse(await activityStream.readAsJson()));

            // Every stream after the first is read as an attachment. Note that this assignment replaces
            // whatever `attachments` value the parsed activity carried.
            activity.attachments = await Promise.all(
                attachmentStreams.map(async (attachmentStream) => {
                    const contentType = attachmentStream.contentType;

                    const content =
                        contentType === 'application/json'
                            ? await attachmentStream.readAsJson()
                            : await attachmentStream.readAsString();

                    return { contentType, content };
                }),
            );
        } catch (err) {
            return end(StatusCodes.BAD_REQUEST, `Request body missing or malformed: ${err}`);
        }

        try {
            const invokeResponse = await this.processActivity(this.authenticateRequestResult, activity);
            return end(invokeResponse?.status ?? StatusCodes.OK, invokeResponse?.body);
        } catch (err) {
            // Anything can be thrown (including null/undefined), so read `message` defensively.
            const message = (err as { message?: unknown } | null | undefined)?.message;
            return end(StatusCodes.INTERNAL_SERVER_ERROR, message ?? err);
        }
    }
}
/**
 * Connector factory for streaming connections. Every client it creates sends its requests through the
 * owning streaming request handler, and all clients must be created for one and the same service url.
 *
 * @internal
 */
class StreamingConnectorFactory implements ConnectorFactory {
    // Service url remembered from the first `create` call.
    private serviceUrl?: string;

    constructor(private readonly requestHandler: StreamingRequestHandler) {}

    /**
     * Creates a connector client whose HTTP traffic is carried by the streaming request handler.
     *
     * @param serviceUrl The service url; must equal the url given on the first call.
     * @param _audience Unused.
     * @returns a promise resolving to the connector client.
     * @throws {Error} when `serviceUrl` differs from the url remembered by this factory.
     */
    async create(serviceUrl: string, _audience: string): Promise<ConnectorClient> {
        // Keep the url already remembered, or remember this one if none has been recorded yet.
        const pinnedUrl = this.serviceUrl ?? serviceUrl;
        this.serviceUrl = pinnedUrl;

        if (pinnedUrl !== serviceUrl) {
            throw new Error(
                'This is a streaming scenario, all connectors from this factory must all be for the same url.',
            );
        }

        const transport = new StreamingHttpClient(this.requestHandler);

        return new ConnectorClient(MicrosoftAppCredentials.Empty, { httpClient: transport });
    }
}
/**
 * HTTP client that, instead of opening a network connection, forwards each request through the streaming
 * server attached to the given request handler.
 *
 * @internal
 */
class StreamingHttpClient implements HttpClient {
    constructor(private readonly requestHandler: StreamingRequestHandler) {}

    /**
     * Sends an HTTP request over the streaming transport and converts the reply to an HTTP response.
     *
     * @param httpRequest The request to send.
     * @returns a promise resolving to the HTTP response built from the streaming reply.
     * @throws {Error} when the request handler has no server attached.
     */
    async sendRequest(httpRequest: WebResource): Promise<HttpOperationResponse> {
        // Fail fast with a descriptive error; without this guard a missing server only surfaced later as
        // a TypeError while reading the (undefined) response.
        const server = this.requestHandler.server;
        if (!server) {
            throw new Error(
                'StreamingHttpClient.sendRequest(): no streaming server is attached to the request handler.',
            );
        }

        const streamingRequest = this.createStreamingRequest(httpRequest);
        const receiveResponse = await server.send(streamingRequest);

        return this.createHttpResponse(receiveResponse, httpRequest);
    }

    /**
     * Builds the streaming request equivalent of an HTTP request.
     *
     * @param httpRequest The HTTP request to convert.
     * @returns the streaming request carrying the same verb, path and body.
     */
    private createStreamingRequest(httpRequest: WebResource): StreamingRequest {
        const verb = httpRequest.method.toString();

        // Keep only the portion of the url that starts at '/v3'.
        // NOTE(review): if '/v3' is absent, `indexOf` yields -1 and `slice(-1)` keeps just the last
        // character of the url - presumably this never happens for connector urls; confirm.
        const path = httpRequest.url.slice(httpRequest.url.indexOf('/v3'));

        const request = StreamingRequest.create(verb, path);
        request.setBody(httpRequest.body);

        return request;
    }

    /**
     * Builds an HTTP response from a streaming reply.
     *
     * @param receiveResponse The reply received over the streaming transport.
     * @param httpRequest The request the reply belongs to.
     * @returns a promise resolving to the HTTP response; `bodyAsText` is the text of the first stream, if any.
     */
    private async createHttpResponse(
        receiveResponse: IReceiveResponse,
        httpRequest: WebResource,
    ): Promise<HttpOperationResponse> {
        // All streams are read, but only the first one is used as the response body.
        const [bodyAsText] = await Promise.all(receiveResponse.streams?.map((stream) => stream.readAsString()) ?? []);

        return {
            bodyAsText,
            headers: new HttpHeaders(),
            request: httpRequest,
            status: receiveResponse.statusCode,
        };
    }
}