Skip to content

Commit 83de2af

Browse files
lukasolsonlizozom
andauthored
[Search] Add cancel function to pollSearch (#85787) (#88414)
* Add cancel functionality to pollSearch Makes sure that DELETE requests are properly sent even if consumer unsubscribes. * Update poll_search.test.ts * cancel on abort * fix jest * ADded jest test * Cancel to be called once * Only cancel internally on abort * ts + addd defer * ts * make cancel code prettier * Cancel even after unsubscribe * Throw abort error if already aborted * Only delete once Co-authored-by: Lukas Olson <[email protected]> Co-authored-by: Liza Katz <[email protected]>
1 parent eba5539 commit 83de2af

File tree

6 files changed

+187
-32
lines changed

6 files changed

+187
-32
lines changed
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
import { pollSearch } from './poll_search';
8+
import { AbortError } from '../../../../../src/plugins/kibana_utils/common';
9+
10+
describe('pollSearch', () => {
11+
function getMockedSearch$(resolveOnI = 1, finishWithError = false) {
12+
let counter = 0;
13+
return jest.fn().mockImplementation(() => {
14+
counter++;
15+
const lastCall = counter === resolveOnI;
16+
return new Promise((resolve) => {
17+
if (lastCall) {
18+
resolve({
19+
isRunning: false,
20+
isPartial: finishWithError,
21+
});
22+
} else {
23+
resolve({
24+
isRunning: true,
25+
isPartial: true,
26+
});
27+
}
28+
});
29+
});
30+
}
31+
32+
test('Defers execution', async () => {
33+
const searchFn = getMockedSearch$(1);
34+
const cancelFn = jest.fn();
35+
pollSearch(searchFn, cancelFn);
36+
expect(searchFn).toBeCalledTimes(0);
37+
expect(cancelFn).toBeCalledTimes(0);
38+
});
39+
40+
test('Resolves immediatelly', async () => {
41+
const searchFn = getMockedSearch$(1);
42+
const cancelFn = jest.fn();
43+
await pollSearch(searchFn, cancelFn).toPromise();
44+
expect(searchFn).toBeCalledTimes(1);
45+
expect(cancelFn).toBeCalledTimes(0);
46+
});
47+
48+
test('Resolves when complete', async () => {
49+
const searchFn = getMockedSearch$(3);
50+
const cancelFn = jest.fn();
51+
await pollSearch(searchFn, cancelFn).toPromise();
52+
expect(searchFn).toBeCalledTimes(3);
53+
expect(cancelFn).toBeCalledTimes(0);
54+
});
55+
56+
test('Throws Error on ES error response', async () => {
57+
const searchFn = getMockedSearch$(2, true);
58+
const cancelFn = jest.fn();
59+
const poll = pollSearch(searchFn, cancelFn).toPromise();
60+
await expect(poll).rejects.toThrow(Error);
61+
expect(searchFn).toBeCalledTimes(2);
62+
expect(cancelFn).toBeCalledTimes(0);
63+
});
64+
65+
test('Throws AbortError on empty response', async () => {
66+
const searchFn = jest.fn().mockResolvedValue(undefined);
67+
const cancelFn = jest.fn();
68+
const poll = pollSearch(searchFn, cancelFn).toPromise();
69+
await expect(poll).rejects.toThrow(AbortError);
70+
expect(searchFn).toBeCalledTimes(1);
71+
expect(cancelFn).toBeCalledTimes(0);
72+
});
73+
74+
test('Throws AbortError and cancels on abort', async () => {
75+
const searchFn = getMockedSearch$(20);
76+
const cancelFn = jest.fn();
77+
const abortController = new AbortController();
78+
const poll = pollSearch(searchFn, cancelFn, {
79+
abortSignal: abortController.signal,
80+
}).toPromise();
81+
82+
await new Promise((resolve) => setTimeout(resolve, 500));
83+
abortController.abort();
84+
85+
await expect(poll).rejects.toThrow(AbortError);
86+
87+
await new Promise((resolve) => setTimeout(resolve, 1000));
88+
89+
expect(searchFn).toBeCalledTimes(1);
90+
expect(cancelFn).toBeCalledTimes(1);
91+
});
92+
93+
test("Stops, but doesn't cancel on unsubscribe", async () => {
94+
const searchFn = getMockedSearch$(20);
95+
const cancelFn = jest.fn();
96+
const subscription = pollSearch(searchFn, cancelFn).subscribe(() => {});
97+
98+
await new Promise((resolve) => setTimeout(resolve, 500));
99+
subscription.unsubscribe();
100+
await new Promise((resolve) => setTimeout(resolve, 1000));
101+
102+
expect(searchFn).toBeCalledTimes(1);
103+
expect(cancelFn).toBeCalledTimes(0);
104+
});
105+
106+
test('Calls cancel even when consumer unsubscribes', async () => {
107+
const searchFn = getMockedSearch$(20);
108+
const cancelFn = jest.fn();
109+
const abortController = new AbortController();
110+
const subscription = pollSearch(searchFn, cancelFn, {
111+
abortSignal: abortController.signal,
112+
}).subscribe(() => {});
113+
subscription.unsubscribe();
114+
abortController.abort();
115+
116+
expect(searchFn).toBeCalledTimes(1);
117+
expect(cancelFn).toBeCalledTimes(1);
118+
});
119+
});

x-pack/plugins/data_enhanced/common/search/poll_search.ts

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,28 +4,42 @@
44
* you may not use this file except in compliance with the Elastic License.
55
*/
66

7-
import { from, NEVER, Observable, timer } from 'rxjs';
8-
import { expand, finalize, switchMap, takeUntil, takeWhile, tap } from 'rxjs/operators';
7+
import { from, Observable, timer, defer, fromEvent, EMPTY } from 'rxjs';
8+
import { expand, map, switchMap, takeUntil, takeWhile, tap } from 'rxjs/operators';
99
import type { IKibanaSearchResponse } from '../../../../../src/plugins/data/common';
1010
import { isErrorResponse, isPartialResponse } from '../../../../../src/plugins/data/common';
11-
import { AbortError, abortSignalToPromise } from '../../../../../src/plugins/kibana_utils/common';
11+
import { AbortError } from '../../../../../src/plugins/kibana_utils/common';
1212
import type { IAsyncSearchOptions } from './types';
1313

1414
export const pollSearch = <Response extends IKibanaSearchResponse>(
1515
search: () => Promise<Response>,
16-
{ pollInterval = 1000, ...options }: IAsyncSearchOptions = {}
16+
cancel?: () => void,
17+
{ pollInterval = 1000, abortSignal }: IAsyncSearchOptions = {}
1718
): Observable<Response> => {
18-
const aborted = options?.abortSignal
19-
? abortSignalToPromise(options?.abortSignal)
20-
: { promise: NEVER, cleanup: () => {} };
19+
return defer(() => {
20+
if (abortSignal?.aborted) {
21+
throw new AbortError();
22+
}
2123

22-
return from(search()).pipe(
23-
expand(() => timer(pollInterval).pipe(switchMap(search))),
24-
tap((response) => {
25-
if (isErrorResponse(response)) throw new AbortError();
26-
}),
27-
takeWhile<Response>(isPartialResponse, true),
28-
takeUntil<Response>(from(aborted.promise)),
29-
finalize(aborted.cleanup)
30-
);
24+
if (cancel) {
25+
abortSignal?.addEventListener('abort', cancel, { once: true });
26+
}
27+
28+
const aborted$ = (abortSignal ? fromEvent(abortSignal, 'abort') : EMPTY).pipe(
29+
map(() => {
30+
throw new AbortError();
31+
})
32+
);
33+
34+
return from(search()).pipe(
35+
expand(() => timer(pollInterval).pipe(switchMap(search))),
36+
tap((response) => {
37+
if (isErrorResponse(response)) {
38+
throw response ? new Error('Received partial response') : new AbortError();
39+
}
40+
}),
41+
takeWhile<Response>(isPartialResponse, true),
42+
takeUntil<Response>(aborted$)
43+
);
44+
});
3145
};

x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ describe('EnhancedSearchInterceptor', () => {
192192
await timeTravel(10);
193193

194194
expect(error).toHaveBeenCalled();
195-
expect(error.mock.calls[0][0]).toBeInstanceOf(AbortError);
195+
expect(error.mock.calls[0][0]).toBeInstanceOf(Error);
196196
});
197197

198198
test('should abort on user abort', async () => {
@@ -262,7 +262,7 @@ describe('EnhancedSearchInterceptor', () => {
262262
expect(error.mock.calls[0][0]).toBeInstanceOf(AbortError);
263263

264264
expect(fetchMock).toHaveBeenCalledTimes(2);
265-
expect(mockCoreSetup.http.delete).toHaveBeenCalled();
265+
expect(mockCoreSetup.http.delete).toHaveBeenCalledTimes(1);
266266
});
267267

268268
test('should not DELETE a running async search on async timeout prior to first response', async () => {
@@ -326,7 +326,7 @@ describe('EnhancedSearchInterceptor', () => {
326326
expect(error).toHaveBeenCalled();
327327
expect(error.mock.calls[0][0]).toBeInstanceOf(SearchTimeoutError);
328328
expect(fetchMock).toHaveBeenCalledTimes(2);
329-
expect(mockCoreSetup.http.delete).toHaveBeenCalled();
329+
expect(mockCoreSetup.http.delete).toHaveBeenCalledTimes(1);
330330
});
331331

332332
test('should DELETE a running async search on async timeout on error from fetch', async () => {
@@ -343,8 +343,6 @@ describe('EnhancedSearchInterceptor', () => {
343343
time: 10,
344344
value: {
345345
error: 'oh no',
346-
isPartial: false,
347-
isRunning: false,
348346
id: 1,
349347
},
350348
isError: true,
@@ -368,7 +366,7 @@ describe('EnhancedSearchInterceptor', () => {
368366
expect(error).toHaveBeenCalled();
369367
expect(error.mock.calls[0][0]).toBe(responses[1].value);
370368
expect(fetchMock).toHaveBeenCalledTimes(2);
371-
expect(mockCoreSetup.http.delete).toHaveBeenCalled();
369+
expect(mockCoreSetup.http.delete).toHaveBeenCalledTimes(1);
372370
});
373371

374372
test('should NOT DELETE a running SAVED async search on abort', async () => {

x-pack/plugins/data_enhanced/public/search/search_interceptor.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
* you may not use this file except in compliance with the Elastic License.
55
*/
66

7+
import { once } from 'lodash';
78
import { throwError, Subscription } from 'rxjs';
89
import { tap, finalize, catchError, filter, take, skip } from 'rxjs/operators';
910
import {
@@ -14,7 +15,6 @@ import {
1415
IKibanaSearchRequest,
1516
SearchSessionState,
1617
} from '../../../../../src/plugins/data/public';
17-
import { AbortError } from '../../../../../src/plugins/kibana_utils/common';
1818
import { ENHANCED_ES_SEARCH_STRATEGY, IAsyncSearchOptions, pollSearch } from '../../common';
1919

2020
export class EnhancedSearchInterceptor extends SearchInterceptor {
@@ -88,10 +88,14 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
8888
isSavedToBackground = true;
8989
});
9090

91-
return pollSearch(search, { ...options, abortSignal: combinedSignal }).pipe(
91+
const cancel = once(() => {
92+
if (id && !isSavedToBackground) this.deps.http.delete(`/internal/search/${strategy}/${id}`);
93+
});
94+
95+
return pollSearch(search, cancel, { ...options, abortSignal: combinedSignal }).pipe(
9296
tap((response) => (id = response.id)),
93-
catchError((e: AbortError) => {
94-
if (id && !isSavedToBackground) this.deps.http.delete(`/internal/search/${strategy}/${id}`);
97+
catchError((e: Error) => {
98+
cancel();
9599
return throwError(this.handleSearchError(e, timeoutSignal, options));
96100
}),
97101
finalize(() => {

x-pack/plugins/data_enhanced/server/search/eql_search_strategy.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
*/
66

77
import { tap } from 'rxjs/operators';
8-
import type { Logger } from 'kibana/server';
8+
import type { IScopedClusterClient, Logger } from 'kibana/server';
99
import type { ISearchStrategy } from '../../../../../src/plugins/data/server';
1010
import type {
1111
EqlSearchStrategyRequest,
@@ -21,10 +21,14 @@ import { EqlSearchResponse } from './types';
2121
export const eqlSearchStrategyProvider = (
2222
logger: Logger
2323
): ISearchStrategy<EqlSearchStrategyRequest, EqlSearchStrategyResponse> => {
24+
async function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) {
25+
await esClient.asCurrentUser.asyncSearch.delete({ id });
26+
}
27+
2428
return {
2529
cancel: async (id, options, { esClient }) => {
2630
logger.debug(`_eql/delete ${id}`);
27-
await esClient.asCurrentUser.eql.delete({ id });
31+
await cancelAsyncSearch(id, esClient);
2832
},
2933

3034
search: ({ id, ...request }, options: IAsyncSearchOptions, { esClient, uiSettingsClient }) => {
@@ -54,7 +58,13 @@ export const eqlSearchStrategyProvider = (
5458
return toEqlKibanaSearchResponse(response);
5559
};
5660

57-
return pollSearch(search, options).pipe(tap((response) => (id = response.id)));
61+
const cancel = async () => {
62+
if (id) {
63+
await cancelAsyncSearch(id, esClient);
64+
}
65+
};
66+
67+
return pollSearch(search, cancel, options).pipe(tap((response) => (id = response.id)));
5868
},
5969

6070
extend: async (id, keepAlive, options, { esClient }) => {

x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
*/
66

77
import type { Observable } from 'rxjs';
8-
import type { Logger, SharedGlobalConfig } from 'kibana/server';
8+
import type { IScopedClusterClient, Logger, SharedGlobalConfig } from 'kibana/server';
99
import { first, tap } from 'rxjs/operators';
1010
import { SearchResponse } from 'elasticsearch';
1111
import { from } from 'rxjs';
@@ -40,6 +40,10 @@ export const enhancedEsSearchStrategyProvider = (
4040
logger: Logger,
4141
usage?: SearchUsage
4242
): ISearchStrategy<IEsSearchRequest> => {
43+
async function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) {
44+
await esClient.asCurrentUser.asyncSearch.delete({ id });
45+
}
46+
4347
function asyncSearch(
4448
{ id, ...request }: IEsSearchRequest,
4549
options: IAsyncSearchOptions,
@@ -58,7 +62,13 @@ export const enhancedEsSearchStrategyProvider = (
5862
return toAsyncKibanaSearchResponse(body);
5963
};
6064

61-
return pollSearch(search, options).pipe(
65+
const cancel = async () => {
66+
if (id) {
67+
await cancelAsyncSearch(id, esClient);
68+
}
69+
};
70+
71+
return pollSearch(search, cancel, options).pipe(
6272
tap((response) => (id = response.id)),
6373
tap(searchUsageObserver(logger, usage))
6474
);
@@ -109,7 +119,7 @@ export const enhancedEsSearchStrategyProvider = (
109119
},
110120
cancel: async (id, options, { esClient }) => {
111121
logger.debug(`cancel ${id}`);
112-
await esClient.asCurrentUser.asyncSearch.delete({ id });
122+
await cancelAsyncSearch(id, esClient);
113123
},
114124
extend: async (id, keepAlive, options, { esClient }) => {
115125
logger.debug(`extend ${id} by ${keepAlive}`);

0 commit comments

Comments
 (0)