Skip to content
This repository was archived by the owner on Feb 7, 2026. It is now read-only.

Commit 0ffe544

Browse files
authored
feat: add table.createInsertStream for native streaming inserts (#997)
* feat: add createInsertStream draft and tests * refactor to remove InsertQueue * restore default maxDelayMillis value * restore default maxDelayMillis value * moved row encoding to add, updated headers * header * add tests * adding tests * refactor stream and add tests * update header dates * add system tests * remove comment * update pending type * add getOptionsDefaults to rowQueue
1 parent ba86889 commit 0ffe544

9 files changed

Lines changed: 971 additions & 0 deletions

File tree

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
"extend": "^3.0.2",
5858
"is": "^3.3.0",
5959
"p-event": "^4.1.0",
60+
"readable-stream": "^3.6.0",
6061
"stream-events": "^1.0.5",
6162
"uuid": "^8.0.0"
6263
},

src/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ export {
100100

101101
export {Routine} from './routine';
102102

103+
export {RowBatch} from './rowBatch';
104+
105+
export {InsertRowsStreamResponse, RowQueue} from './rowQueue';
106+
103107
export {
104108
CopyTableMetadata,
105109
CreateCopyJobMetadata,
@@ -110,6 +114,7 @@ export {
110114
InsertRowsCallback,
111115
InsertRowsOptions,
112116
InsertRowsResponse,
117+
InsertStreamOptions,
113118
JobLoadMetadata,
114119
PartialInsertFailure,
115120
PermissionsCallback,

src/rowBatch.ts

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*!
2+
* Copyright 2022 Google LLC. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import {InsertRowsCallback} from './rowQueue';
18+
import {RowBatchOptions, RowMetadata} from './table';
19+
export interface BatchLimits {
20+
maxBytes: number;
21+
maxRows: number;
22+
}
23+
24+
export const BATCH_LIMITS: BatchLimits = {
25+
maxBytes: 9 * 1024 * 1024,
26+
maxRows: 50000,
27+
};
28+
export interface InsertOptions {
29+
maxBytes?: number;
30+
maxRows?: number;
31+
maxMilliseconds?: number;
32+
}
33+
34+
/**
35+
* Call used to help batch rows.
36+
*
37+
* @private
38+
*
39+
* @param {BatchInsertOptions} options The batching options.
40+
*/
41+
export class RowBatch {
42+
batchOptions: RowBatchOptions;
43+
rows: RowMetadata[];
44+
callbacks: InsertRowsCallback[];
45+
created: number;
46+
bytes: number;
47+
constructor(options: RowBatchOptions) {
48+
this.batchOptions = options;
49+
this.rows = [];
50+
this.callbacks = [];
51+
this.created = Date.now();
52+
this.bytes = 0;
53+
}
54+
/**
55+
* Adds a row to the current batch.
56+
*
57+
* @param {object} row The row to insert.
58+
* @param {InsertRowsCallback} callback The callback function.
59+
*/
60+
add(row: RowMetadata, callback?: InsertRowsCallback): void {
61+
this.rows.push(row);
62+
this.callbacks.push(callback!);
63+
this.bytes += Buffer.byteLength(JSON.stringify(row));
64+
}
65+
/**
66+
* Indicates if a given row can fit in the batch.
67+
*
68+
* @param {object} row The row in question.
69+
* @returns {boolean}
70+
*/
71+
canFit(row: RowMetadata): boolean {
72+
const {maxRows, maxBytes} = this.batchOptions;
73+
74+
return (
75+
this.rows.length < maxRows! &&
76+
this.bytes + Buffer.byteLength(JSON.stringify(row)) <= maxBytes
77+
);
78+
}
79+
/**
80+
* Checks to see if this batch is at the maximum allowed payload size.
81+
*
82+
* @returns {boolean}
83+
*/
84+
isAtMax(): boolean {
85+
const {maxRows, maxBytes} = BATCH_LIMITS;
86+
return this.rows.length >= maxRows! || this.bytes >= maxBytes;
87+
}
88+
/**
89+
* Indicates if the batch is at capacity.
90+
*
91+
* @returns {boolean}
92+
*/
93+
isFull(): boolean {
94+
const {maxRows, maxBytes} = this.batchOptions;
95+
return this.rows.length >= maxRows! || this.bytes >= maxBytes;
96+
}
97+
}

src/rowQueue.ts

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
/*!
2+
* Copyright 2022 Google LLC. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import * as common from '@google-cloud/common';
18+
import * as extend from 'extend';
19+
import * as uuid from 'uuid';
20+
import {RequestCallback, Table, InsertStreamOptions} from '.';
21+
import {GoogleErrorBody} from '@google-cloud/common/build/src/util';
22+
import bigquery from './types';
23+
import {BATCH_LIMITS, RowBatch} from './rowBatch';
24+
import {Stream} from 'stream';
25+
import {RowBatchOptions, InsertRowsOptions, RowMetadata} from './table';
26+
27+
export interface MaxInsertOptions {
28+
maxOutstandingRows: number;
29+
maxOutstandingBytes: number;
30+
maxDelayMillis: number;
31+
}
32+
33+
export const defaultOptions: MaxInsertOptions = {
34+
// The maximum number of rows we'll batch up for insert().
35+
maxOutstandingRows: 300,
36+
37+
// The maximum size of the total batched up rows for insert().
38+
maxOutstandingBytes: 9 * 1024 * 1024,
39+
40+
// The maximum time we'll wait to send batched rows, in milliseconds.
41+
maxDelayMillis: 10000,
42+
};
43+
44+
export type InsertRowsStreamResponse = bigquery.ITableDataInsertAllResponse;
45+
46+
export type InsertRowsCallback = RequestCallback<
47+
bigquery.ITableDataInsertAllResponse | bigquery.ITable
48+
>;
49+
export interface InsertRow {
50+
insertId?: string;
51+
json?: bigquery.IJsonObject;
52+
}
53+
54+
export type TableRow = bigquery.ITableRow;
55+
export interface PartialInsertFailure {
56+
message: string;
57+
reason: string;
58+
row: RowMetadata;
59+
}
60+
61+
/**
62+
* Standard row queue used for inserting rows.
63+
*
64+
*
65+
* @param {Table} table The table.
66+
* @param {Duplex} dup Row stream.
67+
* @param {InsertStreamOptions} options Insert and batch options.
68+
*/
69+
export class RowQueue {
70+
table: Table;
71+
stream: Stream;
72+
insertRowsOptions: InsertRowsOptions = {};
73+
batch: RowBatch;
74+
batchOptions?: RowBatchOptions;
75+
inFlight: boolean;
76+
pending?: ReturnType<typeof setTimeout>;
77+
constructor(table: Table, dup: Stream, options?: InsertStreamOptions) {
78+
this.table = table;
79+
this.stream = dup;
80+
this.inFlight = false;
81+
82+
const opts = typeof options === 'object' ? options : {};
83+
84+
if (opts.insertRowsOptions) {
85+
this.insertRowsOptions = opts.insertRowsOptions;
86+
} else {
87+
this.insertRowsOptions = {};
88+
}
89+
if (opts.batchOptions) {
90+
this.setOptions(opts.batchOptions);
91+
} else {
92+
this.setOptions();
93+
}
94+
95+
this.batch = new RowBatch(this.batchOptions!);
96+
}
97+
98+
/**
99+
* Adds a row to the queue.
100+
*
101+
* @param {RowMetadata} row The row to insert.
102+
* @param {InsertRowsCallback} callback The insert callback.
103+
*/
104+
add(row: RowMetadata, callback: InsertRowsCallback): void {
105+
if (!this.insertRowsOptions.raw) {
106+
row = {
107+
json: Table.encodeValue_(row)!,
108+
};
109+
110+
if (this.insertRowsOptions.createInsertId !== false) {
111+
row.insertId = uuid.v4();
112+
}
113+
}
114+
115+
if (!this.batch.canFit(row)) {
116+
this.insert();
117+
}
118+
this.batch.add(row, callback);
119+
120+
if (this.batch.isFull()) {
121+
this.insert();
122+
} else if (!this.pending) {
123+
const {maxMilliseconds} = this.batchOptions!;
124+
this.pending = setTimeout(() => {
125+
this.insert();
126+
}, maxMilliseconds);
127+
}
128+
}
129+
/**
130+
* Cancels any pending inserts and calls _insert immediately.
131+
*/
132+
insert(callback?: InsertRowsCallback): void {
133+
const {rows, callbacks} = this.batch;
134+
135+
this.batch = new RowBatch(this.batchOptions!);
136+
137+
if (this.pending) {
138+
clearTimeout(this.pending);
139+
delete this.pending;
140+
}
141+
if (rows.length > 0) {
142+
this._insert(rows, callbacks, callback);
143+
}
144+
}
145+
146+
/**
147+
* Accepts a batch of rows and inserts them into table.
148+
*
149+
* @param {object[]} rows The rows to insert.
150+
* @param {InsertCallback[]} callbacks The corresponding callback functions.
151+
* @param {function} [callback] Callback to be fired when insert is done.
152+
*/
153+
_insert(
154+
rows: RowMetadata | RowMetadata[],
155+
callbacks: InsertRowsCallback[],
156+
cb?: InsertRowsCallback
157+
): void {
158+
const json = extend(true, {}, this.insertRowsOptions, {rows});
159+
160+
delete json.createInsertId;
161+
delete json.partialRetries;
162+
delete json.raw;
163+
164+
this.table.request(
165+
{
166+
method: 'POST',
167+
uri: '/insertAll',
168+
json,
169+
},
170+
(err, resp) => {
171+
const partialFailures = (resp.insertErrors || []).map(
172+
(insertError: GoogleErrorBody) => {
173+
return {
174+
errors: insertError.errors!.map(error => {
175+
return {
176+
message: error.message,
177+
reason: error.reason,
178+
};
179+
}),
180+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
181+
row: rows[(insertError as any).index],
182+
};
183+
}
184+
);
185+
186+
if (partialFailures.length > 0) {
187+
err = new common.util.PartialFailureError({
188+
errors: partialFailures,
189+
response: resp,
190+
} as GoogleErrorBody);
191+
192+
callbacks.forEach(callback => callback!(err, resp));
193+
this.stream.emit('error', err);
194+
} else {
195+
callbacks.forEach(callback => callback!(err, resp));
196+
this.stream.emit('response', resp);
197+
cb!(err, resp);
198+
}
199+
cb!(err, resp);
200+
}
201+
);
202+
}
203+
204+
/**
205+
* Sets the batching options.
206+
*
207+
*
208+
* @param {RowBatchOptions} [options] The batching options.
209+
*/
210+
setOptions(options = {} as RowBatchOptions): void {
211+
const defaults = this.getOptionDefaults();
212+
213+
const {maxBytes, maxRows, maxMilliseconds} = extend(
214+
true,
215+
defaults,
216+
options
217+
);
218+
219+
this.batchOptions = {
220+
maxBytes: Math.min(maxBytes, BATCH_LIMITS.maxBytes),
221+
maxRows: Math.min(maxRows!, BATCH_LIMITS.maxRows),
222+
maxMilliseconds: maxMilliseconds,
223+
};
224+
}
225+
226+
getOptionDefaults(): RowBatchOptions {
227+
// Return a unique copy to avoid shenanigans.
228+
const defaults: RowBatchOptions = {
229+
maxBytes: defaultOptions.maxOutstandingBytes,
230+
maxRows: defaultOptions.maxOutstandingRows,
231+
maxMilliseconds: defaultOptions.maxDelayMillis,
232+
};
233+
return defaults;
234+
}
235+
}

0 commit comments

Comments
 (0)