Skip to content
This repository was archived by the owner on Nov 18, 2025. It is now read-only.

Commit 3ac5afb

Browse files
feat: support async iterator for paging method (#708)
* async iteratoe logic & unit test * run all unit tests * add end-to-end test for async iterator * add end-to-end test for page stream * fix lint * system-test * clean up * system-test * fix * fix test * fix unit tests * fix end-to-end test * feedback * clean * change name to asyncIterate * lint * system-test * proper type for iterable * clean up * feedback * test * put common code in a function * lint * clean up * debugging * make it work * feedback * remove extra params * fix * make it work first * resolve request & func * clean up * polish test * clean * add timeout for end-to-end test * expand timeout for testExpand * make test work * expand timeout * fix * test * test will work * feedback * lint * fix: no third party promises anymore Co-authored-by: Alexander Fenster <[email protected]>
1 parent 511fc23 commit 3ac5afb

5 files changed

Lines changed: 216 additions & 12 deletions

File tree

src/paginationCalls/pageDescriptor.ts

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,32 @@ import * as ended from 'is-stream-ended';
1818
import {PassThrough, Transform} from 'stream';
1919

2020
import {APICaller} from '../apiCaller';
21-
import {GaxCall, APICallback} from '../apitypes';
21+
import {
22+
GaxCall,
23+
APICallback,
24+
SimpleCallbackFunction,
25+
RequestType,
26+
} from '../apitypes';
2227
import {Descriptor} from '../descriptor';
2328
import {CallSettings} from '../gax';
2429
import {NormalApiCaller} from '../normalCalls/normalApiCaller';
2530

2631
import {PagedApiCaller} from './pagedApiCaller';
32+
import * as call from '../call';
2733

34+
export interface ResponseType {
35+
[index: string]: string;
36+
}
2837
/**
2938
* A descriptor for methods that support pagination.
3039
*/
3140
export class PageDescriptor implements Descriptor {
41+
resolveParams: Function;
3242
requestPageTokenField: string;
3343
responsePageTokenField: string;
3444
requestPageSizeField?: string;
3545
resourceField: string;
46+
cache: Array<{}>;
3647

3748
constructor(
3849
requestPageTokenField: string,
@@ -42,6 +53,8 @@ export class PageDescriptor implements Descriptor {
4253
this.requestPageTokenField = requestPageTokenField;
4354
this.responsePageTokenField = responsePageTokenField;
4455
this.resourceField = resourceField;
56+
this.resolveParams = () => {};
57+
this.cache = [];
4558
}
4659

4760
/**
@@ -103,6 +116,95 @@ export class PageDescriptor implements Descriptor {
103116
return stream;
104117
}
105118

119+
/**
120+
* Create an async iterable which can be recursively called for data on-demand.
121+
*/
122+
asyncIterate(
123+
apiCall: GaxCall,
124+
request: RequestType,
125+
options: CallSettings
126+
): AsyncIterable<{} | undefined> {
127+
const iterable = this.createIterator(options);
128+
const funcPromise =
129+
typeof apiCall === 'function' ? Promise.resolve(apiCall) : apiCall;
130+
funcPromise
131+
.then((func: GaxCall) => {
132+
this.makeCall(request, func, options);
133+
})
134+
.catch(error => {
135+
throw new Error(error);
136+
});
137+
return iterable;
138+
}
139+
140+
createIterator(options: CallSettings): AsyncIterable<{} | undefined> {
141+
const self = this;
142+
const asyncIterable = {
143+
[Symbol.asyncIterator]() {
144+
const paramPromise: Promise<[
145+
RequestType,
146+
SimpleCallbackFunction
147+
]> = new Promise(resolve => {
148+
self.resolveParams = resolve;
149+
});
150+
let nextPageRequest: RequestType | null = {};
151+
let firstCall = true;
152+
return {
153+
async next() {
154+
const ongoingCall = new call.OngoingCallPromise();
155+
const [request, func] = await paramPromise;
156+
if (self.cache.length > 0) {
157+
return Promise.resolve({done: false, value: self.cache.shift()});
158+
}
159+
if (!firstCall && !nextPageRequest) {
160+
return Promise.resolve({done: true, value: undefined});
161+
}
162+
nextPageRequest = await self.getNextPageRequest(
163+
func,
164+
firstCall ? request : nextPageRequest!,
165+
ongoingCall
166+
);
167+
firstCall = false;
168+
if (self.cache.length === 0) {
169+
nextPageRequest = null;
170+
return Promise.resolve({done: true, value: undefined});
171+
}
172+
return Promise.resolve({done: false, value: self.cache.shift()});
173+
},
174+
};
175+
},
176+
};
177+
return asyncIterable;
178+
}
179+
180+
async getNextPageRequest(
181+
func: SimpleCallbackFunction,
182+
request: RequestType,
183+
ongoingCall: call.OngoingCallPromise
184+
): Promise<RequestType | null> {
185+
ongoingCall.call(func, request);
186+
let nextPageRequest = null;
187+
const [response, nextRequest, rawResponse] = await ongoingCall.promise;
188+
const pageToken = (response as ResponseType)[this.responsePageTokenField];
189+
if (pageToken) {
190+
nextPageRequest = Object.assign({}, request);
191+
nextPageRequest[this.requestPageTokenField] = pageToken;
192+
}
193+
const responses = (response as ResponseType)[this.resourceField];
194+
this.cache.push(...responses);
195+
return nextPageRequest;
196+
}
197+
198+
makeCall(request: RequestType, func: GaxCall, settings: CallSettings) {
199+
if (settings.pageToken) {
200+
request[this.requestPageTokenField] = settings.pageToken;
201+
}
202+
if (settings.pageSize) {
203+
request[this.requestPageSizeField!] = settings.pageSize;
204+
}
205+
this.resolveParams([request, func]);
206+
}
207+
106208
getApiCaller(settings: CallSettings): APICaller {
107209
if (!settings.autoPaginate) {
108210
return new NormalApiCaller();

test/fixtures/google-gax-packaging-test-app/src/index.js

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,11 @@ async function testShowcase() {
5353
return {
5454
getRequestHeaders: () => {
5555
return {
56-
'Authorization': 'Bearer zzzz'
56+
Authorization: 'Bearer zzzz',
5757
};
58-
}
58+
},
5959
};
60-
}
60+
},
6161
};
6262

6363
const fallbackClientOpts = {
@@ -75,6 +75,7 @@ async function testShowcase() {
7575
await testEcho(grpcClient);
7676
await testExpand(grpcClient);
7777
await testPagedExpand(grpcClient);
78+
await testPagedExpandAsync(grpcClient);
7879
await testCollect(grpcClient);
7980
await testChat(grpcClient);
8081
await testWait(grpcClient);
@@ -85,24 +86,34 @@ async function testShowcase() {
8586

8687
// Fallback clients do not currently support streaming
8788
try {
88-
await testExpand(fallbackClient)
89-
throw new Error("Expand did not throw an error: Streaming calls should fail with fallback clients")
89+
await testExpand(fallbackClient);
90+
throw new Error(
91+
'Expand did not throw an error: Streaming calls should fail with fallback clients'
92+
);
9093
} catch (err) {}
9194
try {
92-
await testCollect(fallbackClient)
93-
throw new Error("Collect did not throw an error: Streaming calls should fail with fallback clients")
95+
await testCollect(fallbackClient);
96+
throw new Error(
97+
'Collect did not throw an error: Streaming calls should fail with fallback clients'
98+
);
9499
} catch (err) {}
95100
try {
96-
await testChat(fallbackClient)
97-
throw new Error("Chat did not throw an error: Streaming calls should fail with fallback clients")
101+
await testChat(fallbackClient);
102+
throw new Error(
103+
'Chat did not throw an error: Streaming calls should fail with fallback clients'
104+
);
98105
} catch (err) {}
99106
}
100107

101108
async function testEcho(client) {
102109
const request = {
103110
content: 'test',
104111
};
112+
const timer = setTimeout(() => {
113+
throw new Error('End-to-end testEcho method fails with timeout');
114+
}, 12000);
105115
const [response] = await client.echo(request);
116+
clearTimeout(timer);
106117
assert.deepStrictEqual(request.content, response.content);
107118
}
108119

@@ -131,11 +142,35 @@ async function testPagedExpand(client) {
131142
content: words.join(' '),
132143
pageSize: 2,
133144
};
145+
const timer = setTimeout(() => {
146+
throw new Error('End-to-end testPagedExpand method fails with timeout');
147+
}, 12000);
134148
const [response] = await client.pagedExpand(request);
149+
clearTimeout(timer);
135150
const result = response.map(r => r.content);
136151
assert.deepStrictEqual(words, result);
137152
}
138153

154+
async function testPagedExpandAsync(client) {
155+
const words = ['nobody', 'ever', 'reads', 'test', 'input'];
156+
const request = {
157+
content: words.join(' '),
158+
pageSize: 2,
159+
};
160+
const response = [];
161+
const iterable = client.pagedExpandAsync(request);
162+
const timer = setTimeout(() => {
163+
throw new Error(
164+
'End-to-end testPagedExpandAsync method fails with timeout'
165+
);
166+
}, 12000);
167+
for await (const resource of iterable) {
168+
response.push(resource.content);
169+
}
170+
clearTimeout(timer);
171+
assert.deepStrictEqual(words, response);
172+
}
173+
139174
async function testCollect(client) {
140175
const words = ['nobody', 'ever', 'reads', 'test', 'input'];
141176
const result = await new Promise((resolve, reject) => {

test/fixtures/google-gax-packaging-test-app/src/v1beta1/echo_client.js

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,8 +195,9 @@ class EchoClient {
195195
'wait',
196196
'pagedExpand',
197197
];
198+
this._innerCallPromises = {};
198199
for (const methodName of echoStubMethods) {
199-
const innerCallPromise = echoStub.then(
200+
this._innerCallPromises[methodName] = echoStub.then(
200201
stub => (...args) => {
201202
return stub[methodName].apply(stub, args);
202203
},
@@ -205,7 +206,7 @@ class EchoClient {
205206
}
206207
);
207208
this._innerApiCalls[methodName] = gaxModule.createApiCall(
208-
innerCallPromise,
209+
this._innerCallPromises[methodName],
209210
defaults[methodName],
210211
this._descriptors.page[methodName] ||
211212
this._descriptors.stream[methodName] ||
@@ -582,6 +583,13 @@ class EchoClient {
582583

583584
return this._innerApiCalls.pagedExpand(request, options, callback);
584585
}
586+
587+
pagedExpandAsync(request, options) {
588+
options = options || {};
589+
request = request || {};
590+
const callSettings = new gax.CallSettings(options);
591+
return this._descriptors.page.pagedExpand.asyncIterate(this._innerCallPromises['pagedExpand'], request, callSettings);
592+
}
585593
}
586594

587595
module.exports = EchoClient;

test/fixtures/google-gax-packaging-test-app/test/gapic-v1beta1.js

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,41 @@ describe('EchoClient', () => {
177177
});
178178
});
179179

180+
it('invokes pagedExpand using async iterator', async () => {
181+
const client = new showcaseModule.v1beta1.EchoClient({
182+
credentials: {client_email: 'bogus', private_key: 'bogus'},
183+
projectId: 'bogus',
184+
});
185+
186+
// Mock request
187+
const request = {};
188+
const expectedResponse = [1, 2, 3, 4, 5, 6, 7, 8, 9];
189+
190+
client._descriptors.page.pagedExpand.asyncIterate = (apiCall, request, options) => {
191+
let count = 0;
192+
const asyncIterable = {
193+
[Symbol.asyncIterator]() {
194+
return {
195+
async next(){
196+
count = count + 1;
197+
if(count === 10) return Promise.resolve({done: true, value: undefined});
198+
return Promise.resolve({done: false, value: count});
199+
}
200+
}
201+
}
202+
}
203+
return asyncIterable;
204+
}
205+
206+
// test paging method by async iterator
207+
const response = [];
208+
const iterable = client.pagedExpandAsync(request);
209+
for await (const resource of iterable){
210+
response.push(resource);
211+
}
212+
assert.deepStrictEqual(response, expectedResponse);
213+
});
214+
180215
it('invokes pagedExpand with error', done => {
181216
const client = new showcaseModule.v1beta1.EchoClient({
182217
credentials: {client_email: 'bogus', private_key: 'bogus'},

test/unit/pagedIteration.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import {APICallback, GaxCallPromise} from '../../src/apitypes';
2424

2525
import * as util from './utils';
2626
import {Stream} from 'stream';
27+
import * as gax from '../../src/gax';
2728

2829
describe('paged iteration', () => {
2930
const pageSize = 3;
@@ -194,6 +195,29 @@ describe('paged iteration', () => {
194195
});
195196
});
196197

198+
describe('use async iterator', () => {
199+
const spy = sinon.spy(func);
200+
let apiCall: GaxCallPromise;
201+
beforeEach(() => {
202+
apiCall = util.createApiCall(spy, createOptions);
203+
});
204+
205+
async function iterableChecker(iterable: AsyncIterable<{} | undefined>) {
206+
let counter = 0;
207+
for await (const resource of iterable) {
208+
counter++;
209+
if (counter === 10) break;
210+
}
211+
expect(counter).to.equal(10);
212+
}
213+
it('returns an iterable, count to 10', () => {
214+
const settings = new gax.CallSettings(
215+
(createOptions && createOptions.settings) || {}
216+
);
217+
iterableChecker(descriptor.asyncIterate(apiCall, {}, settings));
218+
});
219+
});
220+
197221
describe('stream conversion', () => {
198222
// tslint:disable-next-line no-any
199223
let spy: any;

0 commit comments

Comments
 (0)