Describe the bug
When using stream-transform for processing large datasets and the parallel option is a value greater than 1, we're seeing high memory usage.
To Reproduce
// Reproduction script: stream ~10 million tiny objects through
// stream-transform and record the peak V8 heap usage reported by
// node-memwatch, to demonstrate the memory growth when parallel > 1.
const fs = require('fs')
const memwatch = require('@airbnb/node-memwatch')
const { pipeline } = require('stream/promises')
const { transform } = require('stream-transform')
// High-water mark of the V8 used heap size, updated on each GC stats event.
let maxUsedHeap = 0
async function main() {
// memwatch emits 'stats' after each GC cycle; keep the maximum seen so far.
memwatch.on('stats', (stats) => {
maxUsedHeap = Math.max(maxUsedHeap, stats.used_heap_size)
})
await pipeline(
// Source: a generator yielding ~10 million small objects { i }.
function* () {
let i = -1
const n = 9999999
while (++i < n) {
yield { i }
}
},
// Transform under test: a pass-through mapping { i } -> i.
// The parallel option comes from the PARALLEL env var; values > 1
// are where the excessive memory usage shows up.
transform({ parallel: +process.env.PARALLEL }, (chunk, next) =>
next(null, chunk.i)
),
// Sink: a plain file write stream (its write() return value is how
// backpressure is signalled back up the pipeline — see note below).
fs.createWriteStream('/tmp/output')
)
// Report the peak heap usage in megabytes (1 MB = 1,000,000 bytes here).
console.log(`${maxUsedHeap / (1000 * 1000)}mb`)
}
main()
// Observed results for this exact script:
// $ PARALLEL=1 node example.js
// 6.009856mb
// $ PARALLEL=2 node example.js
// 320.684144mb
Additional context
Describe the bug
When using stream-transform for processing large datasets and the parallel option is a value greater than 1, we're seeing high memory usage.
To Reproduce
Additional context
this.push was returning false to indicate that the stream should pause reading, yet stream-transform asks for more input over here regardless.