Skip to content
This repository was archived by the owner on Nov 18, 2025. It is now read-only.

Commit 944c06b

Browse files
fix: make async iteration work for gRPC-fallback; refactor the code (#765)
1 parent 72114db commit 944c06b

7 files changed

Lines changed: 53 additions & 90 deletions

File tree

samples/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"license": "Apache-2.0",
44
"author": "Google LLC",
55
"engines": {
6-
"node": ">=8"
6+
"node": ">=10"
77
},
88
"repository": "googleapis/gax-nodejs",
99
"private": true,

samples/pagination.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,16 @@ async function main() {
6767
// Call it!
6868
const [resources] = await wrappedFunction({request: 'empty'});
6969
console.log(resources);
70+
71+
// Alternatively, call it using async iterators!
72+
const iterable = pageDescriptor.asyncIterate(wrappedFunction, {
73+
request: 'empty',
74+
});
75+
const asyncResources = [];
76+
for await (const resource of iterable) {
77+
asyncResources.push(resource);
78+
}
79+
console.log(asyncResources);
7080
}
7181

7282
main().catch(console.error);

src/apitypes.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ export type NextPageRequestType = {
4242
} | null;
4343
export type RawResponseType = Operation | {} | null;
4444
export type ResultTuple = [
45-
ResponseType,
45+
ResponseType | [ResponseType],
4646
NextPageRequestType | undefined,
4747
RawResponseType | undefined
4848
];

src/paginationCalls/pageDescriptor.ts

Lines changed: 22 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,12 @@ import * as ended from 'is-stream-ended';
1818
import {PassThrough, Transform} from 'stream';
1919

2020
import {APICaller} from '../apiCaller';
21-
import {
22-
GaxCall,
23-
APICallback,
24-
SimpleCallbackFunction,
25-
RequestType,
26-
} from '../apitypes';
21+
import {GaxCall, APICallback, RequestType, ResultTuple} from '../apitypes';
2722
import {Descriptor} from '../descriptor';
2823
import {CallSettings} from '../gax';
2924
import {NormalApiCaller} from '../normalCalls/normalApiCaller';
3025

3126
import {PagedApiCaller} from './pagedApiCaller';
32-
import * as call from '../call';
3327

3428
export interface ResponseType {
3529
[index: string]: string;
@@ -38,12 +32,10 @@ export interface ResponseType {
3832
* A descriptor for methods that support pagination.
3933
*/
4034
export class PageDescriptor implements Descriptor {
41-
resolveParams: Function;
4235
requestPageTokenField: string;
4336
responsePageTokenField: string;
4437
requestPageSizeField?: string;
4538
resourceField: string;
46-
cache: Array<{}>;
4739

4840
constructor(
4941
requestPageTokenField: string,
@@ -53,8 +45,6 @@ export class PageDescriptor implements Descriptor {
5345
this.requestPageTokenField = requestPageTokenField;
5446
this.responsePageTokenField = responsePageTokenField;
5547
this.resourceField = resourceField;
56-
this.resolveParams = () => {};
57-
this.cache = [];
5848
}
5949

6050
/**
@@ -122,93 +112,49 @@ export class PageDescriptor implements Descriptor {
122112
asyncIterate(
123113
apiCall: GaxCall,
124114
request: RequestType,
125-
options: CallSettings
115+
options?: CallSettings
126116
): AsyncIterable<{} | undefined> {
127-
const iterable = this.createIterator();
128-
const funcPromise =
129-
typeof apiCall === 'function' ? Promise.resolve(apiCall) : apiCall;
130-
funcPromise
131-
.then((func: GaxCall) => {
132-
this.makeCall(request, func, options);
133-
})
134-
.catch(error => {
135-
throw new Error(error);
136-
});
117+
options = Object.assign({}, options, {autoPaginate: false});
118+
const iterable = this.createIterator(apiCall, request, options);
137119
return iterable;
138120
}
139121

140-
createIterator(): AsyncIterable<{} | undefined> {
141-
// eslint-disable-next-line @typescript-eslint/no-this-alias
142-
const self = this;
122+
createIterator(
123+
apiCall: GaxCall,
124+
request: RequestType,
125+
options: CallSettings
126+
): AsyncIterable<{} | undefined> {
143127
const asyncIterable = {
144128
[Symbol.asyncIterator]() {
145-
const paramPromise: Promise<[
146-
RequestType,
147-
SimpleCallbackFunction
148-
]> = new Promise(resolve => {
149-
self.resolveParams = resolve;
150-
});
151-
let nextPageRequest: RequestType | null = {};
152-
let firstCall = true;
129+
let nextPageRequest: RequestType | null | undefined = request;
130+
const cache: {}[] = [];
153131
return {
154132
async next() {
155-
const ongoingCall = new call.OngoingCallPromise();
156-
const [request, func] = await paramPromise;
157-
if (self.cache.length > 0) {
133+
if (cache.length > 0) {
158134
return Promise.resolve({
159135
done: false,
160-
value: self.cache.shift(),
136+
value: cache.shift(),
161137
});
162138
}
163-
if (!firstCall && !nextPageRequest) {
164-
return Promise.resolve({done: true, value: undefined});
139+
if (nextPageRequest) {
140+
let result: {} | [ResponseType] | null;
141+
[result, nextPageRequest] = (await apiCall(
142+
nextPageRequest!,
143+
options
144+
)) as ResultTuple;
145+
cache.push(...(result as ResponseType[]));
165146
}
166-
nextPageRequest = await self.getNextPageRequest(
167-
func,
168-
firstCall ? request : nextPageRequest!,
169-
ongoingCall
170-
);
171-
firstCall = false;
172-
if (self.cache.length === 0) {
173-
nextPageRequest = null;
147+
if (cache.length === 0) {
174148
return Promise.resolve({done: true, value: undefined});
175149
}
176-
return Promise.resolve({done: false, value: self.cache.shift()});
150+
return Promise.resolve({done: false, value: cache.shift()});
177151
},
178152
};
179153
},
180154
};
181155
return asyncIterable;
182156
}
183157

184-
async getNextPageRequest(
185-
func: SimpleCallbackFunction,
186-
request: RequestType,
187-
ongoingCall: call.OngoingCallPromise
188-
): Promise<RequestType | null> {
189-
ongoingCall.call(func, request);
190-
let nextPageRequest = null;
191-
const [response] = await ongoingCall.promise;
192-
const pageToken = (response as ResponseType)[this.responsePageTokenField];
193-
if (pageToken) {
194-
nextPageRequest = Object.assign({}, request);
195-
nextPageRequest[this.requestPageTokenField] = pageToken;
196-
}
197-
const responses = (response as ResponseType)[this.resourceField];
198-
this.cache.push(...responses);
199-
return nextPageRequest;
200-
}
201-
202-
makeCall(request: RequestType, func: GaxCall, settings: CallSettings) {
203-
if (settings.pageToken) {
204-
request[this.requestPageTokenField] = settings.pageToken;
205-
}
206-
if (settings.pageSize) {
207-
request[this.requestPageSizeField!] = settings.pageSize;
208-
}
209-
this.resolveParams([request, func]);
210-
}
211-
212158
getApiCaller(settings: CallSettings): APICaller {
213159
if (!settings.autoPaginate) {
214160
return new NormalApiCaller();

test/fixtures/google-gax-packaging-test-app/src/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ async function testShowcase() {
8282
await testEcho(fallbackClient);
8383
await testPagedExpand(fallbackClient);
8484
await testWait(fallbackClient);
85+
await testPagedExpandAsync(fallbackClient);
8586
}
8687

8788
async function testEcho(client) {

test/fixtures/google-gax-packaging-test-app/src/v1beta1/echo_client.js

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -196,23 +196,23 @@ class EchoClient {
196196
'wait',
197197
'pagedExpand',
198198
];
199-
this._innerCallPromises = {};
200199
for (const methodName of echoStubMethods) {
201-
this._innerCallPromises[methodName] = echoStub.then(
200+
const callPromise = echoStub.then(
202201
stub => (...args) => {
203202
return stub[methodName].apply(stub, args);
204203
},
205204
err => () => {
206205
throw err;
207206
}
208207
);
209-
this._innerApiCalls[methodName] = gaxModule.createApiCall(
210-
this._innerCallPromises[methodName],
208+
const apiCall = gaxModule.createApiCall(
209+
callPromise,
211210
defaults[methodName],
212211
this._descriptors.page[methodName] ||
213212
this._descriptors.stream[methodName] ||
214213
this._descriptors.longrunning[methodName]
215214
);
215+
this._innerApiCalls[methodName] = apiCall;
216216
}
217217
}
218218

@@ -590,7 +590,7 @@ class EchoClient {
590590
request = request || {};
591591
const callSettings = new gax.CallSettings(options);
592592
return this._descriptors.page.pagedExpand.asyncIterate(
593-
this._innerCallPromises['pagedExpand'],
593+
this._innerApiCalls.pagedExpand,
594594
request,
595595
callSettings
596596
);

test/unit/pagedIteration.ts

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import * as sinon from 'sinon';
2222
import * as streamEvents from 'stream-events';
2323
import * as through2 from 'through2';
2424
import {PageDescriptor} from '../../src/paginationCalls/pageDescriptor';
25-
import {APICallback, GaxCallPromise} from '../../src/apitypes';
25+
import {APICallback, GaxCall} from '../../src/apitypes';
2626
import {describe, it, beforeEach} from 'mocha';
2727

2828
import * as util from './utils';
@@ -199,7 +199,7 @@ describe('paged iteration', () => {
199199

200200
describe('use async iterator', () => {
201201
const spy = sinon.spy(func);
202-
let apiCall: GaxCallPromise;
202+
let apiCall: GaxCall;
203203
beforeEach(() => {
204204
apiCall = util.createApiCall(spy, createOptions);
205205
});
@@ -210,22 +210,28 @@ describe('paged iteration', () => {
210210
for await (const resource of iterable) {
211211
counter++;
212212
resources.push(resource);
213-
if (counter === 10) break;
213+
if (counter === 10) {
214+
break;
215+
}
214216
}
215-
expect(counter).to.equal(10);
217+
return resources;
216218
}
217-
it('returns an iterable, count to 10', () => {
219+
220+
it('returns an iterable, count to 10', async () => {
218221
const settings = new gax.CallSettings(
219222
(createOptions && createOptions.settings) || {}
220223
);
221-
iterableChecker(descriptor.asyncIterate(apiCall, {}, settings));
224+
const resources = await iterableChecker(
225+
descriptor.asyncIterate(apiCall, {}, settings)
226+
);
227+
expect(resources.length).to.equal(10);
222228
});
223229
});
224230

225231
describe('stream conversion', () => {
226232
// eslint-disable-next-line @typescript-eslint/no-explicit-any
227233
let spy: any;
228-
let apiCall: GaxCallPromise;
234+
let apiCall: GaxCall;
229235
beforeEach(() => {
230236
spy = sinon.spy(func);
231237
apiCall = util.createApiCall(spy, createOptions);

0 commit comments

Comments
 (0)