Skip to content

Commit 79bfd46

Browse files
committed
fix node and dom builder streams
1 parent 2b7a63c commit 79bfd46

File tree

8 files changed

+148
-98
lines changed

8 files changed

+148
-98
lines changed

js/.vscode/launch.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@
3838
},
3939
"args": [
4040
// "-i",
41-
// "test/unit/",
42-
"test/unit/builders/",
41+
"test/unit/",
42+
// "test/unit/builders/",
4343

4444
// "test/unit/builders/date-tests.ts",
4545
// "test/unit/builders/primitive-tests.ts",

js/src/io/node/builder.ts

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,31 +37,34 @@ class BuilderDuplex<T extends DataType = any, TNull = any> extends Duplex {
3737
}
3838
_final(cb?: CB) {
3939
const builder = this._builder;
40-
if (builder) { builder.finish(); }
40+
if (builder) { flush(builder.finish(), this); }
4141
cb && cb();
4242
}
4343
_write(x: any, _: string, cb: CB) {
4444
const builder = this._builder;
45-
if (builder) { builder.write(x); }
45+
if (builder) { flush(builder.write(x), this); }
4646
cb && cb();
4747
return true;
4848
}
4949
_read(size: number) {
5050
const builder = this._builder;
51-
if (!builder) { return; }
52-
if (size === null || builder.length >= size) {
53-
this.push(builder.flush());
54-
}
55-
if (builder.finished) {
56-
if (builder.length > 0) {
57-
this.push(builder.flush());
58-
}
59-
this.push(null);
60-
}
51+
if (builder) { flush(builder, this, size); }
6152
}
6253
_destroy(_err: Error | null, cb: (error: Error | null) => void) {
6354
const builder = this._builder;
6455
if (builder) { builder.reset(); }
6556
cb(this._builder = null);
6657
}
6758
}
59+
60+
/**
 * Moves buffered builder output into the readable side of a BuilderDuplex.
 *
 * Called from `_write`, `_final` and `_read`, so data can be pushed as soon
 * as it becomes available rather than only when the consumer asks for it.
 *
 * @param builder The builder whose `length`, `finished` and `flush()` are consulted.
 * @param sink    The duplex stream that receives the flushed chunks via `push()`.
 * @param size    Threshold compared against `builder.length`; defaults to the
 *                sink's `readableHighWaterMark` when the caller does not pass one.
 */
function flush<T extends DataType = any, TNull = any>(builder: Builder<T, TNull>, sink: BuilderDuplex<T, TNull>, size = sink.readableHighWaterMark) {
61+
// Emit a chunk once the builder holds at least `size` (or when no size is given).
if (size === null || builder.length >= size) {
62+
sink.push(builder.flush());
63+
}
64+
if (builder.finished) {
65+
// Drain whatever is left over before ending the stream.
if (builder.length > 0) {
66+
sink.push(builder.flush());
67+
}
68+
// Pushing null is how a Node Readable is told that no more data will arrive.
sink.push(null);
69+
}
70+
}

js/src/io/whatwg/builder.ts

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,31 +26,40 @@ export function builderThroughDOMStream<T extends DataType = any, TNull = any>(
2626
) {
2727

2828
const builder = Builder.new<T, TNull>(writableStrategy);
29+
let controller_: ReadableStreamDefaultController<Data<T>> | null = null;
2930

30-
const readable = new ReadableStream<Data<T>>({
31-
cancel() { builder.reset(); },
32-
pull(controller) {
33-
const size = controller.desiredSize;
34-
if (size === null || builder.length >= size) {
35-
controller.enqueue(builder.flush());
36-
}
37-
if (builder.finished) {
38-
if (builder.length > 0) {
39-
controller.enqueue(builder.flush());
40-
}
41-
controller.close();
42-
}
43-
},
44-
}, readableStrategy);
31+
// Access these properties by string indexers to defeat closure compiler
32+
const { ['highWaterMark']: highWaterMark, ['size']: size } = writableStrategy;
4533

4634
return {
47-
readable,
35+
readable: new ReadableStream<Data<T>>({
36+
['cancel']() { builder.reset(); },
37+
['start'](c) { flush(builder, controller_ = c); },
38+
['pull'](c) { flush(builder, controller_ = c); },
39+
}, readableStrategy),
4840
writable: new WritableStream({
49-
abort() { builder.reset(); },
50-
close() { builder.finish(); },
51-
write(value: T['TValue'] | TNull) {
52-
builder.write(value);
41+
['abort']() { builder.reset(); },
42+
['close']() { flush(builder.finish(), controller_); },
43+
['write'](value: T['TValue'] | TNull) {
44+
flush(builder.write(value), controller_);
5345
},
54-
}, writableStrategy)
46+
}, { 'highWaterMark': highWaterMark, 'size': size })
5547
};
48+
49+
/**
 * Moves buffered builder output into the ReadableStream through its controller.
 *
 * Invoked by the readable side (`start`/`pull`, which pass and cache the
 * controller in the enclosing `controller_` variable) and by the writable
 * side (`write`/`close`, which pass whatever `controller_` currently holds).
 *
 * @param builder    The builder whose `length`, `finished` and `flush()` are consulted.
 * @param controller The readable controller to enqueue into, or null, in which
 *                   case the call is a no-op.
 */
function flush(builder: Builder<T, TNull>, controller: ReadableStreamDefaultController<Data<T>> | null) {
50+
// No controller cached: nothing to enqueue into.
if (controller === null) { return; }
51+
const size = controller.desiredSize;
52+
if (size === null || builder.length >= size) {
53+
// Clear the cached controller before enqueueing.
// NOTE(review): presumably so later writes do not enqueue again until the
// next `pull` re-caches the controller; confirm.
controller_ = null;
54+
controller.enqueue(builder.flush());
55+
}
56+
if (builder.finished) {
57+
controller_ = null;
58+
if (builder.length > 0) {
59+
// Remaining data is enqueued, but the stream is NOT closed in this call.
// NOTE(review): presumably relies on a later `pull` re-invoking flush with
// an empty builder to reach the close() branch; confirm.
controller.enqueue(builder.flush());
60+
} else {
61+
controller.close();
62+
}
63+
}
64+
}
5665
}

js/test/unit/builders/date-tests.ts

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,35 @@ import { Vector, DateDay, DateMillisecond } from '../../Arrow';
2020
import {
2121
encodeAll,
2222
encodeEach,
23+
encodeEachDOM,
24+
encodeEachNode,
2325
date32sNoNulls,
2426
date64sNoNulls,
2527
date32sWithNulls,
2628
date64sWithNulls
2729
} from './utils';
2830

31+
const testDOMStreams = process.env.TEST_DOM_STREAMS === 'true';
32+
const testNodeStreams = process.env.TEST_NODE_STREAMS === 'true';
33+
2934
describe('DateDayBuilder', () => {
3035

3136
runTestsWithEncoder('encodeAll', encodeAll(() => new DateDay()));
32-
runTestsWithEncoder('encodeEach chunkLength: 5', encodeEach(() => new DateDay(), 5));
33-
runTestsWithEncoder('encodeEach chunkLength: 25', encodeEach(() => new DateDay(), 25));
34-
runTestsWithEncoder('encodeEach chunkLength: undefined', encodeEach(() => new DateDay()));
35-
36-
function runTestsWithEncoder(name: string, encode: (vals: (Date | null)[], nullVals?: any[]) => Vector<DateDay>) {
37+
runTestsWithEncoder('encodeEach: 5', encodeEach(() => new DateDay(), 5));
38+
runTestsWithEncoder('encodeEach: 25', encodeEach(() => new DateDay(), 25));
39+
runTestsWithEncoder('encodeEach: undefined', encodeEach(() => new DateDay()));
40+
testDOMStreams && runTestsWithEncoder('encodeEachDOM: 25', encodeEachDOM(() => new DateDay(), 25));
41+
testNodeStreams && runTestsWithEncoder('encodeEachNode: 25', encodeEachNode(() => new DateDay(), 25));
42+
43+
function runTestsWithEncoder(name: string, encode: (vals: (Date | null)[], nullVals?: any[]) => Promise<Vector<DateDay>>) {
3744
describe(`${encode.name} ${name}`, () => {
38-
it(`encodes dates no nulls`, () => {
45+
it(`encodes dates no nulls`, async () => {
3946
const vals = date32sNoNulls(20);
40-
validateVector(vals, encode(vals, []), []);
47+
validateVector(vals, await encode(vals, []), []);
4148
});
42-
it(`encodes dates with nulls`, () => {
49+
it(`encodes dates with nulls`, async () => {
4350
const vals = date32sWithNulls(20);
44-
validateVector(vals, encode(vals, [null]), [null]);
51+
validateVector(vals, await encode(vals, [null]), [null]);
4552
});
4653
});
4754
}
@@ -53,16 +60,18 @@ describe('DateMillisecondBuilder', () => {
5360
runTestsWithEncoder('encodeEach: 5', encodeEach(() => new DateMillisecond(), 5));
5461
runTestsWithEncoder('encodeEach: 25', encodeEach(() => new DateMillisecond(), 25));
5562
runTestsWithEncoder('encodeEach: undefined', encodeEach(() => new DateMillisecond()));
63+
testDOMStreams && runTestsWithEncoder('encodeEachDOM: 25', encodeEachDOM(() => new DateMillisecond(), 25));
64+
testNodeStreams && runTestsWithEncoder('encodeEachNode: 25', encodeEachNode(() => new DateMillisecond(), 25));
5665

57-
function runTestsWithEncoder(name: string, encode: (vals: (Date | null)[], nullVals?: any[]) => Vector<DateMillisecond>) {
66+
function runTestsWithEncoder(name: string, encode: (vals: (Date | null)[], nullVals?: any[]) => Promise<Vector<DateMillisecond>>) {
5867
describe(`${encode.name} ${name}`, () => {
59-
it(`encodes dates no nulls`, () => {
68+
it(`encodes dates no nulls`, async () => {
6069
const vals = date64sNoNulls(20);
61-
validateVector(vals, encode(vals, []), []);
70+
validateVector(vals, await encode(vals, []), []);
6271
});
63-
it(`encodes dates with nulls`, () => {
72+
it(`encodes dates with nulls`, async () => {
6473
const vals = date64sWithNulls(20);
65-
validateVector(vals, encode(vals, [null]), [null]);
74+
validateVector(vals, await encode(vals, [null]), [null]);
6675
});
6776
});
6877
}
@@ -92,8 +101,8 @@ describe('DateMillisecondBuilder', () => {
92101
"2019-03-21T07:25:34.864Z",
93102
null
94103
].map((x) => x === null ? x : new Date(x));
95-
it(`encodes dates with nulls`, () => {
104+
it(`encodes dates with nulls`, async () => {
96105
const vals = dates.slice();
97-
validateVector(vals, encode(vals, [null]), [null]);
106+
validateVector(vals, await encode(vals, [null]), [null]);
98107
});
99108
});

js/test/unit/builders/dictionary-tests.ts

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,39 +20,46 @@ import { Dictionary, Utf8, Int32, Vector } from '../../Arrow';
2020
import {
2121
encodeAll,
2222
encodeEach,
23+
encodeEachDOM,
24+
encodeEachNode,
2325
duplicateItems,
2426
stringsNoNulls,
2527
stringsWithNAs,
2628
stringsWithNulls,
2729
stringsWithEmpties
2830
} from './utils';
2931

32+
const testDOMStreams = process.env.TEST_DOM_STREAMS === 'true';
33+
const testNodeStreams = process.env.TEST_NODE_STREAMS === 'true';
34+
3035
describe('DictionaryBuilder', () => {
3136
describe('<Utf8, Int32>', () => {
3237
runTestsWithEncoder('encodeAll', encodeAll(() => new Dictionary(new Utf8(), new Int32())));
3338
runTestsWithEncoder('encodeEach: 5', encodeEach(() => new Dictionary(new Utf8(), new Int32()), 5));
3439
runTestsWithEncoder('encodeEach: 25', encodeEach(() => new Dictionary(new Utf8(), new Int32()), 25));
3540
runTestsWithEncoder('encodeEach: undefined', encodeEach(() => new Dictionary(new Utf8(), new Int32()), void 0));
41+
testDOMStreams && runTestsWithEncoder('encodeEachDOM: 25', encodeEachDOM(() => new Dictionary(new Utf8(), new Int32()), 25));
42+
testNodeStreams && runTestsWithEncoder('encodeEachNode: 25', encodeEachNode(() => new Dictionary(new Utf8(), new Int32()), 25));
3643
});
3744
});
3845

39-
function runTestsWithEncoder(name: string, encode: (vals: (string | null)[], nullVals?: any[]) => Vector<Dictionary<Utf8, Int32>>) {
46+
function runTestsWithEncoder(name: string, encode: (vals: (string | null)[], nullVals?: any[]) => Promise<Vector<Dictionary<Utf8, Int32>>>) {
4047
describe(`${encode.name} ${name}`, () => {
41-
it(`dictionary-encodes strings no nulls`, () => {
48+
it(`dictionary-encodes strings no nulls`, async () => {
4249
const vals = duplicateItems(20, stringsNoNulls(10));
43-
validateVector(vals, encode(vals, []), []);
50+
validateVector(vals, await encode(vals, []), []);
4451
});
45-
it(`dictionary-encodes strings with nulls`, () => {
52+
it(`dictionary-encodes strings with nulls`, async () => {
4653
const vals = duplicateItems(20, stringsWithNulls(10));
47-
validateVector(vals, encode(vals, [null]), [null]);
54+
validateVector(vals, await encode(vals, [null]), [null]);
4855
});
49-
it(`dictionary-encodes strings using n/a as the null value rep`, () => {
56+
it(`dictionary-encodes strings using n/a as the null value rep`, async () => {
5057
const vals = duplicateItems(20, stringsWithNAs(10));
51-
validateVector(vals, encode(vals, ['n/a']), ['n/a']);
58+
validateVector(vals, await encode(vals, ['n/a']), ['n/a']);
5259
});
53-
it(`dictionary-encodes strings using \\0 as the null value rep`, () => {
60+
it(`dictionary-encodes strings using \\0 as the null value rep`, async () => {
5461
const vals = duplicateItems(20, stringsWithEmpties(10));
55-
validateVector(vals, encode(vals, ['\0']), ['\0']);
62+
validateVector(vals, await encode(vals, ['\0']), ['\0']);
5663
});
5764
});
5865
}

js/test/unit/builders/primitive-tests.ts

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import {
2222
import { util } from '../../Arrow';
2323

2424
import {
25-
validateVector, encodeAll, encodeEach,
25+
validateVector,
26+
encodeAll, encodeEach, encodeEachDOM, encodeEachNode,
2627
boolsNoNulls, boolsWithNulls,
2728
int8sNoNulls, int8sWithNulls, int8sWithMaxInts,
2829
int16sNoNulls, int16sWithNulls, int16sWithMaxInts,
@@ -37,22 +38,27 @@ import {
3738
float64sNoNulls, float64sWithNulls, float32sWithNaNs,
3839
} from './utils';
3940

41+
const testDOMStreams = process.env.TEST_DOM_STREAMS === 'true';
42+
const testNodeStreams = process.env.TEST_NODE_STREAMS === 'true';
43+
4044
describe('BoolBuilder', () => {
4145

4246
runTestsWithEncoder('encodeAll: 5', encodeAll(() => new Bool()));
4347
runTestsWithEncoder('encodeEach: 5', encodeEach(() => new Bool(), 5));
4448
runTestsWithEncoder('encodeEach: 25', encodeEach(() => new Bool(), 25));
4549
runTestsWithEncoder('encodeEach: undefined', encodeEach(() => new Bool()));
50+
testDOMStreams && runTestsWithEncoder('encodeEachDOM: 25', encodeEachDOM(() => new Bool(), 25));
51+
testNodeStreams && runTestsWithEncoder('encodeEachNode: 25', encodeEachNode(() => new Bool(), 25));
4652

47-
function runTestsWithEncoder<T extends DataType>(name: string, encode: (vals: (T['TValue'] | null)[], nullVals?: any[]) => Vector<T>) {
53+
function runTestsWithEncoder<T extends DataType>(name: string, encode: (vals: (T['TValue'] | null)[], nullVals?: any[]) => Promise<Vector<T>>) {
4854
describe(`${encode.name} ${name}`, () => {
49-
it(`encodes bools no nulls`, () => {
55+
it(`encodes bools no nulls`, async () => {
5056
const vals = boolsNoNulls(20);
51-
validateVector(vals, encode(vals, []), []);
57+
validateVector(vals, await encode(vals, []), []);
5258
});
53-
it(`encodes bools with nulls`, () => {
59+
it(`encodes bools with nulls`, async () => {
5460
const vals = boolsWithNulls(20);
55-
validateVector(vals, encode(vals, [null]), [null]);
61+
validateVector(vals, await encode(vals, [null]), [null]);
5662
});
5763
});
5864
}
@@ -85,18 +91,20 @@ type PrimitiveTypeOpts<T extends DataType> = [
8591
runTestsWithEncoder('encodeEach: 5', encodeEach(typeFactory, 5));
8692
runTestsWithEncoder('encodeEach: 25', encodeEach(typeFactory, 25));
8793
runTestsWithEncoder('encodeEach: undefined', encodeEach(typeFactory));
88-
89-
function runTestsWithEncoder<T extends DataType>(name: string, encode: (vals: (T['TValue'] | null)[], nullVals?: any[]) => Vector<T>) {
94+
testDOMStreams && runTestsWithEncoder('encodeEachDOM: 25', encodeEachDOM(typeFactory, 25));
95+
testNodeStreams && runTestsWithEncoder('encodeEachNode: 25', encodeEachNode(typeFactory, 25));
96+
97+
function runTestsWithEncoder<T extends DataType>(name: string, encode: (vals: (T['TValue'] | null)[], nullVals?: any[]) => Promise<Vector<T>>) {
9098
describe(`${encode.name} ${name}`, () => {
91-
it(`encodes ${valueName} no nulls`, () => {
99+
it(`encodes ${valueName} no nulls`, async () => {
92100
const vals = noNulls(20);
93-
validateVector(vals, encode(vals, []), []);
101+
validateVector(vals, await encode(vals, []), []);
94102
});
95-
it(`encodes ${valueName} with nulls`, () => {
103+
it(`encodes ${valueName} with nulls`, async () => {
96104
const vals = withNulls(20);
97-
validateVector(vals, encode(vals, [null]), [null]);
105+
validateVector(vals, await encode(vals, [null]), [null]);
98106
});
99-
it(`encodes ${valueName} with MAX_INT`, () => {
107+
it(`encodes ${valueName} with MAX_INT`, async () => {
100108
const vals = withNaNs(20);
101109
const nullVals0: any[] = [0x7fffffff];
102110
const nullVals1: any[] = [0x7fffffff];
@@ -106,7 +114,7 @@ type PrimitiveTypeOpts<T extends DataType> = [
106114
nullVals0[0] = new Uint32Array([0x7fffffff, 0x7fffffff]);
107115
nullVals1[0] = (util.BN.new(nullVals0[0]) as any)[Symbol.toPrimitive]('default');
108116
}
109-
validateVector(vals, encode(vals, nullVals0), nullVals1);
117+
validateVector(vals, await encode(vals, nullVals0), nullVals1);
110118
});
111119
});
112120
}
@@ -128,20 +136,22 @@ type PrimitiveTypeOpts<T extends DataType> = [
128136
runTestsWithEncoder('encodeEach: 5', encodeEach(typeFactory, 5));
129137
runTestsWithEncoder('encodeEach: 25', encodeEach(typeFactory, 25));
130138
runTestsWithEncoder('encodeEach: undefined', encodeEach(typeFactory));
139+
testDOMStreams && runTestsWithEncoder('encodeEachDOM: 25', encodeEachDOM(typeFactory, 25));
140+
testNodeStreams && runTestsWithEncoder('encodeEachNode: 25', encodeEachNode(typeFactory, 25));
131141

132-
function runTestsWithEncoder<T extends DataType>(name: string, encode: (vals: (T['TValue'] | null)[], nullVals?: any[]) => Vector<T>) {
142+
function runTestsWithEncoder<T extends DataType>(name: string, encode: (vals: (T['TValue'] | null)[], nullVals?: any[]) => Promise<Vector<T>>) {
133143
describe(`${encode.name} ${name}`, () => {
134-
it(`encodes ${valueName} no nulls`, () => {
144+
it(`encodes ${valueName} no nulls`, async () => {
135145
const vals = noNulls(20);
136-
validateVector(vals, encode(vals, []), []);
146+
validateVector(vals, await encode(vals, []), []);
137147
});
138-
it(`encodes ${valueName} with nulls`, () => {
148+
it(`encodes ${valueName} with nulls`, async () => {
139149
const vals = withNulls(20);
140-
validateVector(vals, encode(vals, [null]), [null]);
150+
validateVector(vals, await encode(vals, [null]), [null]);
141151
});
142-
it(`encodes ${valueName} with NaNs`, () => {
152+
it(`encodes ${valueName} with NaNs`, async () => {
143153
const vals = withNaNs(20);
144-
validateVector(vals, encode(vals, [NaN]), [NaN]);
154+
validateVector(vals, await encode(vals, [NaN]), [NaN]);
145155
});
146156
});
147157
}

0 commit comments

Comments
 (0)