@@ -27,7 +27,6 @@ import {ClientReadableStream, ClientDuplexStream} from '@grpc/grpc-js';
2727// eslint-disable-next-line @typescript-eslint/no-var-requires
2828const pumpify = require ( 'pumpify' ) ;
2929import * as streamEvents from 'stream-events' ;
30- import * as through from 'through2' ;
3130import * as middleware from './middleware' ;
3231import { detectServiceContext } from './metadata' ;
3332import { StackdriverHttpRequest as HttpRequest } from './http-request' ;
@@ -51,7 +50,7 @@ import {
5150 SeverityNames ,
5251} from './log' ;
5352import { Sink } from './sink' ;
54- import { Duplex } from 'stream' ;
53+ import { Duplex , PassThrough , Transform } from 'stream' ;
5554import { google } from '../protos/protos' ;
5655
5756import { Bucket } from '@google-cloud/storage' ; // types only
@@ -635,8 +634,11 @@ class Logging {
635634 ( requestStream as AbortableDuplex ) . abort ( ) ;
636635 }
637636 } ;
638- const toEntryStream = through . obj ( ( entry , _ , next ) => {
639- next ( null , Entry . fromApiResponse_ ( entry ) ) ;
637+ const toEntryStream = new Transform ( {
638+ objectMode : true ,
639+ transform : ( chunk , encoding , callback ) => {
640+ callback ( null , Entry . fromApiResponse_ ( chunk ) ) ;
641+ } ,
640642 } ) ;
641643 userStream . once ( 'reading' , ( ) => {
642644 this . auth . getProjectId ( ) . then ( projectId => {
@@ -672,7 +674,9 @@ class Logging {
672674 ) ;
673675
674676 let gaxStream : ClientReadableStream < LogEntry > ;
675- requestStream = streamEvents < Duplex > ( through . obj ( ) ) ;
677+ requestStream = streamEvents < Duplex > (
678+ new PassThrough ( { objectMode : true } )
679+ ) ;
676680 ( requestStream as AbortableDuplex ) . abort = ( ) => {
677681 if ( gaxStream && gaxStream . cancel ) {
678682 gaxStream . cancel ( ) ;
@@ -767,21 +771,24 @@ class Logging {
767771 }
768772 } ;
769773
770- const transformStream = through . obj ( ( data , _ , next ) => {
771- next (
772- null ,
773- ( ( ) => {
774- const formattedEntries : Entry [ ] = [ ] ;
775- data . entries . forEach ( ( entry : google . logging . v2 . LogEntry ) => {
776- formattedEntries . push ( Entry . fromApiResponse_ ( entry ) ) ;
777- } ) ;
778- const resp : TailEntriesResponse = {
779- entries : formattedEntries ,
780- suppressionInfo : data . suppressionInfo ,
781- } ;
782- return resp ;
783- } ) ( )
784- ) ;
774+ const transformStream = new Transform ( {
775+ objectMode : true ,
776+ transform : ( chunk , encoding , callback ) => {
777+ callback (
778+ null ,
779+ ( ( ) => {
780+ const formattedEntries : Entry [ ] = [ ] ;
781+ chunk . entries . forEach ( ( entry : google . logging . v2 . LogEntry ) => {
782+ formattedEntries . push ( Entry . fromApiResponse_ ( entry ) ) ;
783+ } ) ;
784+ const resp : TailEntriesResponse = {
785+ entries : formattedEntries ,
786+ suppressionInfo : chunk . suppressionInfo ,
787+ } ;
788+ return resp ;
789+ } ) ( )
790+ ) ;
791+ } ,
785792 } ) ;
786793
787794 this . auth . getProjectId ( ) . then ( projectId => {
@@ -957,8 +964,11 @@ class Logging {
957964 ( requestStream as AbortableDuplex ) . abort ( ) ;
958965 }
959966 } ;
960- const toLogStream = through . obj ( ( logName , _ , next ) => {
961- next ( null , this . log ( logName ) ) ;
967+ const toLogStream = new Transform ( {
968+ objectMode : true ,
969+ transform : ( chunk , encoding , callback ) => {
970+ callback ( null , this . log ( chunk ) ) ;
971+ } ,
962972 } ) ;
963973 userStream . once ( 'reading' , ( ) => {
964974 this . auth . getProjectId ( ) . then ( projectId => {
@@ -975,7 +985,9 @@ class Logging {
975985 ) ;
976986
977987 let gaxStream : ClientReadableStream < Log > ;
978- requestStream = streamEvents < Duplex > ( through . obj ( ) ) ;
988+ requestStream = streamEvents < Duplex > (
989+ new PassThrough ( { objectMode : true } )
990+ ) ;
979991 ( requestStream as AbortableDuplex ) . abort = ( ) => {
980992 if ( gaxStream && gaxStream . cancel ) {
981993 gaxStream . cancel ( ) ;
@@ -1129,10 +1141,13 @@ class Logging {
11291141 ( requestStream as AbortableDuplex ) . abort ( ) ;
11301142 }
11311143 } ;
1132- const toSinkStream = through . obj ( ( sink , _ , next ) => {
1133- const sinkInstance = self . sink ( sink . name ) ;
1134- sinkInstance . metadata = sink ;
1135- next ( null , sinkInstance ) ;
1144+ const toSinkStream = new Transform ( {
1145+ objectMode : true ,
1146+ transform : ( chunk , encoding , callback ) => {
1147+ const sinkInstance = self . sink ( chunk . name ) ;
1148+ sinkInstance . metadata = chunk ;
1149+ callback ( null , sinkInstance ) ;
1150+ } ,
11361151 } ) ;
11371152 userStream . once ( 'reading' , ( ) => {
11381153 this . auth . getProjectId ( ) . then ( projectId => {
@@ -1149,7 +1164,9 @@ class Logging {
11491164 ) ;
11501165
11511166 let gaxStream : ClientReadableStream < LogSink > ;
1152- requestStream = streamEvents < Duplex > ( through . obj ( ) ) ;
1167+ requestStream = streamEvents < Duplex > (
1168+ new PassThrough ( { objectMode : true } )
1169+ ) ;
11531170 ( requestStream as AbortableDuplex ) . abort = ( ) => {
11541171 if ( gaxStream && gaxStream . cancel ) {
11551172 gaxStream . cancel ( ) ;
@@ -1240,7 +1257,7 @@ class Logging {
12401257 let gaxStream : ClientReadableStream < LogSink | LogEntry > ;
12411258 let stream : Duplex ;
12421259 if ( isStreamMode ) {
1243- stream = streamEvents < Duplex > ( through . obj ( ) ) ;
1260+ stream = streamEvents < Duplex > ( new PassThrough ( { objectMode : true } ) ) ;
12441261 ( stream as AbortableDuplex ) . abort = ( ) => {
12451262 if ( gaxStream && gaxStream . cancel ) {
12461263 gaxStream . cancel ( ) ;
@@ -1289,7 +1306,7 @@ class Logging {
12891306 function makeRequestStream ( ) {
12901307 // eslint-disable-next-line @typescript-eslint/no-explicit-any
12911308 if ( ( global as any ) . GCLOUD_SANDBOX_ENV ) {
1292- return through . obj ( ) ;
1309+ return new PassThrough ( { objectMode : true } ) ;
12931310 }
12941311 prepareGaxRequest ( ( err , requestFn ) => {
12951312 if ( err ) {
0 commit comments