Skip to content
This repository was archived by the owner on Mar 11, 2026. It is now read-only.

Commit 485347f

Browse files
fix(deps): remove dependency on through2 (#1023)
* fix(deps): remove dependency on through2 * nothru! Co-authored-by: Nicole Zhu <[email protected]>
1 parent 3808656 commit 485347f

3 files changed

Lines changed: 56 additions & 39 deletions

File tree

package.json

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,7 @@
6161
"on-finished": "^2.3.0",
6262
"pumpify": "^2.0.1",
6363
"snakecase-keys": "^3.1.2",
64-
"stream-events": "^1.0.5",
65-
"through2": "^4.0.0"
64+
"stream-events": "^1.0.5"
6665
},
6766
"devDependencies": {
6867
"@google-cloud/bigquery": "^5.0.0",
@@ -77,7 +76,6 @@
7776
"@types/proxyquire": "^1.3.28",
7877
"@types/pumpify": "^1.4.1",
7978
"@types/sinon": "^9.0.0",
80-
"@types/through2": "^2.0.34",
8179
"@types/tmp": "^0.2.0",
8280
"@types/uuid": "^8.0.0",
8381
"bignumber.js": "^9.0.0",

src/index.ts

Lines changed: 47 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import {ClientReadableStream, ClientDuplexStream} from '@grpc/grpc-js';
2727
// eslint-disable-next-line @typescript-eslint/no-var-requires
2828
const pumpify = require('pumpify');
2929
import * as streamEvents from 'stream-events';
30-
import * as through from 'through2';
3130
import * as middleware from './middleware';
3231
import {detectServiceContext} from './metadata';
3332
import {StackdriverHttpRequest as HttpRequest} from './http-request';
@@ -51,7 +50,7 @@ import {
5150
SeverityNames,
5251
} from './log';
5352
import {Sink} from './sink';
54-
import {Duplex} from 'stream';
53+
import {Duplex, PassThrough, Transform} from 'stream';
5554
import {google} from '../protos/protos';
5655

5756
import {Bucket} from '@google-cloud/storage'; // types only
@@ -635,8 +634,11 @@ class Logging {
635634
(requestStream as AbortableDuplex).abort();
636635
}
637636
};
638-
const toEntryStream = through.obj((entry, _, next) => {
639-
next(null, Entry.fromApiResponse_(entry));
637+
const toEntryStream = new Transform({
638+
objectMode: true,
639+
transform: (chunk, encoding, callback) => {
640+
callback(null, Entry.fromApiResponse_(chunk));
641+
},
640642
});
641643
userStream.once('reading', () => {
642644
this.auth.getProjectId().then(projectId => {
@@ -672,7 +674,9 @@ class Logging {
672674
);
673675

674676
let gaxStream: ClientReadableStream<LogEntry>;
675-
requestStream = streamEvents<Duplex>(through.obj());
677+
requestStream = streamEvents<Duplex>(
678+
new PassThrough({objectMode: true})
679+
);
676680
(requestStream as AbortableDuplex).abort = () => {
677681
if (gaxStream && gaxStream.cancel) {
678682
gaxStream.cancel();
@@ -767,21 +771,24 @@ class Logging {
767771
}
768772
};
769773

770-
const transformStream = through.obj((data, _, next) => {
771-
next(
772-
null,
773-
(() => {
774-
const formattedEntries: Entry[] = [];
775-
data.entries.forEach((entry: google.logging.v2.LogEntry) => {
776-
formattedEntries.push(Entry.fromApiResponse_(entry));
777-
});
778-
const resp: TailEntriesResponse = {
779-
entries: formattedEntries,
780-
suppressionInfo: data.suppressionInfo,
781-
};
782-
return resp;
783-
})()
784-
);
774+
const transformStream = new Transform({
775+
objectMode: true,
776+
transform: (chunk, encoding, callback) => {
777+
callback(
778+
null,
779+
(() => {
780+
const formattedEntries: Entry[] = [];
781+
chunk.entries.forEach((entry: google.logging.v2.LogEntry) => {
782+
formattedEntries.push(Entry.fromApiResponse_(entry));
783+
});
784+
const resp: TailEntriesResponse = {
785+
entries: formattedEntries,
786+
suppressionInfo: chunk.suppressionInfo,
787+
};
788+
return resp;
789+
})()
790+
);
791+
},
785792
});
786793

787794
this.auth.getProjectId().then(projectId => {
@@ -957,8 +964,11 @@ class Logging {
957964
(requestStream as AbortableDuplex).abort();
958965
}
959966
};
960-
const toLogStream = through.obj((logName, _, next) => {
961-
next(null, this.log(logName));
967+
const toLogStream = new Transform({
968+
objectMode: true,
969+
transform: (chunk, encoding, callback) => {
970+
callback(null, this.log(chunk));
971+
},
962972
});
963973
userStream.once('reading', () => {
964974
this.auth.getProjectId().then(projectId => {
@@ -975,7 +985,9 @@ class Logging {
975985
);
976986

977987
let gaxStream: ClientReadableStream<Log>;
978-
requestStream = streamEvents<Duplex>(through.obj());
988+
requestStream = streamEvents<Duplex>(
989+
new PassThrough({objectMode: true})
990+
);
979991
(requestStream as AbortableDuplex).abort = () => {
980992
if (gaxStream && gaxStream.cancel) {
981993
gaxStream.cancel();
@@ -1129,10 +1141,13 @@ class Logging {
11291141
(requestStream as AbortableDuplex).abort();
11301142
}
11311143
};
1132-
const toSinkStream = through.obj((sink, _, next) => {
1133-
const sinkInstance = self.sink(sink.name);
1134-
sinkInstance.metadata = sink;
1135-
next(null, sinkInstance);
1144+
const toSinkStream = new Transform({
1145+
objectMode: true,
1146+
transform: (chunk, encoding, callback) => {
1147+
const sinkInstance = self.sink(chunk.name);
1148+
sinkInstance.metadata = chunk;
1149+
callback(null, sinkInstance);
1150+
},
11361151
});
11371152
userStream.once('reading', () => {
11381153
this.auth.getProjectId().then(projectId => {
@@ -1149,7 +1164,9 @@ class Logging {
11491164
);
11501165

11511166
let gaxStream: ClientReadableStream<LogSink>;
1152-
requestStream = streamEvents<Duplex>(through.obj());
1167+
requestStream = streamEvents<Duplex>(
1168+
new PassThrough({objectMode: true})
1169+
);
11531170
(requestStream as AbortableDuplex).abort = () => {
11541171
if (gaxStream && gaxStream.cancel) {
11551172
gaxStream.cancel();
@@ -1240,7 +1257,7 @@ class Logging {
12401257
let gaxStream: ClientReadableStream<LogSink | LogEntry>;
12411258
let stream: Duplex;
12421259
if (isStreamMode) {
1243-
stream = streamEvents<Duplex>(through.obj());
1260+
stream = streamEvents<Duplex>(new PassThrough({objectMode: true}));
12441261
(stream as AbortableDuplex).abort = () => {
12451262
if (gaxStream && gaxStream.cancel) {
12461263
gaxStream.cancel();
@@ -1289,7 +1306,7 @@ class Logging {
12891306
function makeRequestStream() {
12901307
// eslint-disable-next-line @typescript-eslint/no-explicit-any
12911308
if ((global as any).GCLOUD_SANDBOX_ENV) {
1292-
return through.obj();
1309+
return new PassThrough({objectMode: true});
12931310
}
12941311
prepareGaxRequest((err, requestFn) => {
12951312
if (err) {

test/index.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import * as assert from 'assert';
1919
import {describe, it, beforeEach, before} from 'mocha';
2020
import * as extend from 'extend';
2121
import * as proxyquire from 'proxyquire';
22-
import * as through from 'through2';
2322
import {
2423
Logging as LOGGING,
2524
LoggingOptions,
@@ -29,7 +28,7 @@ import {
2928
GetSinksRequest,
3029
Sink,
3130
} from '../src/index';
32-
import {Duplex} from 'stream';
31+
import {Duplex, PassThrough} from 'stream';
3332
import {Policy} from '@google-cloud/pubsub';
3433
import {GetEntriesRequest} from '../src/log';
3534
import {Dataset} from '@google-cloud/bigquery';
@@ -44,6 +43,9 @@ interface AbortableDuplex extends Duplex {
4443
abort: Function;
4544
}
4645

46+
const through = () =>
47+
(new PassThrough({objectMode: true}) as {}) as AbortableDuplex;
48+
4749
const noop = () => {};
4850
let extended = false;
4951
const fakePaginator = {
@@ -660,7 +662,7 @@ describe('Logging', () => {
660662
const RESULT = {};
661663

662664
beforeEach(() => {
663-
GAX_STREAM = (through.obj() as {}) as AbortableDuplex;
665+
GAX_STREAM = through();
664666
GAX_STREAM.push(RESULT);
665667
logging.loggingService.listLogEntriesStream = () => GAX_STREAM;
666668
logging.auth.getProjectId = async () => PROJECT_ID;
@@ -926,7 +928,7 @@ describe('Logging', () => {
926928
const RESPONSE = ['log1'];
927929

928930
beforeEach(() => {
929-
GAX_STREAM = (through.obj() as {}) as AbortableDuplex;
931+
GAX_STREAM = through();
930932
GAX_STREAM.push(RESPONSE[0]);
931933
logging.loggingService.listLogsStream = () => GAX_STREAM;
932934
(logging.auth.getProjectId as Function) = async () => {};
@@ -1108,7 +1110,7 @@ describe('Logging', () => {
11081110
};
11091111

11101112
beforeEach(() => {
1111-
GAX_STREAM = (through.obj() as {}) as AbortableDuplex;
1113+
GAX_STREAM = through();
11121114
GAX_STREAM.push(RESULT);
11131115
logging.configService.listSinksStream = () => GAX_STREAM;
11141116
(logging.auth.getProjectId as Function) = async () => {};
@@ -1381,7 +1383,7 @@ describe('Logging', () => {
13811383
let GAX_STREAM: AbortableDuplex;
13821384

13831385
beforeEach(() => {
1384-
GAX_STREAM = (through() as {}) as AbortableDuplex;
1386+
GAX_STREAM = through();
13851387
// eslint-disable-next-line @typescript-eslint/no-explicit-any
13861388
(logging.api as any)[CONFIG.client][CONFIG.method] = {
13871389
bind() {

0 commit comments

Comments
 (0)