Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions docs/api/Dispatcher.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ Returns: `void`
* **body** `string | Buffer | Uint8Array | stream.Readable | Iterable | AsyncIterable | null` (optional) - Default: `null`
* **headers** `UndiciHeaders` (optional) - Default: `null`
* **idempotent** `boolean` (optional) - Default: `true` if `method` is `'HEAD'` or `'GET'` - Whether the requests can be safely retried or not. If `false` the request won't be sent until all preceding requests in the pipeline has completed.
* **blocking** `boolean` (optional) - Default: `false` - Whether the response is expected to take a long time and would end up blocking the pipeline. When this is set to `true` further pipelining will be avoided on the same connection until headers have been received.
* **upgrade** `string | null` (optional) - Default: `null` - Upgrade the request. Should be used to specify the kind of upgrade i.e. `'Websocket'`.
* **bodyTimeout** `number | null` (optional) - The timeout after which a request will time out, in milliseconds. Monitors time between receiving body data. Use `0` to disable it entirely. Defaults to 30 seconds.
* **headersTimeout** `number | null` (optional) - The amount of time the parser will wait to receive the complete HTTP headers. Defaults to 30 seconds.
Expand Down Expand Up @@ -673,7 +674,7 @@ try {
path: '/',
method: 'GET'
})

console.log(`response received ${statusCode}`)
body.setEncoding('utf8')
body.on('data', console.log)
Expand All @@ -692,7 +693,7 @@ Upgrade to a different protocol. Visit [MDN - HTTP - Protocol upgrade mechanism]
Arguments:

* **options** `UpgradeOptions`

* **callback** `(error: Error | null, data: UpgradeData) => void` (optional)

Returns: `void | Promise<UpgradeData>` - Only returns a `Promise` if no `callback` argument was passed
Expand Down Expand Up @@ -788,7 +789,7 @@ Emitted when dispatcher is no longer busy.

Header arguments such as `options.headers` in [`Client.dispatch`](./Client.md#client-dispatchoptions-handlers) can be specified in two forms; either as an object specified by the `http.IncomingHttpHeaders` type, or an array of strings. An array representation of a header list must have an even length or an `InvalidArgumentError` will be thrown.

Keys are lowercase and values are not modified.
Keys are lowercase and values are not modified.

Response headers will derive a `host` from the `url` of the [Client](#class-client) instance if no `host` header was previously specified.

Expand Down
17 changes: 14 additions & 3 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const {
kBusy,
kParser,
kConnect,
kBlocking,
kResuming,
kRunning,
kPending,
Expand Down Expand Up @@ -231,7 +232,7 @@ class Client extends Dispatcher {
get [kBusy] () {
const socket = this[kSocket]
return (
(socket && (socket[kReset] || socket[kWriting])) ||
(socket && (socket[kReset] || socket[kWriting] || socket[kBlocking])) ||
(this[kSize] >= (this[kPipelining] || 1)) ||
this[kPending] > 0
)
Expand Down Expand Up @@ -876,6 +877,11 @@ class Parser {
if (statusCode < 200) {
return 1
}

if (socket[kBlocking]) {
socket[kBlocking] = false
resume(client)
}
}

onBody (buf) {
Expand Down Expand Up @@ -1150,6 +1156,7 @@ function connect (client) {
socket[kNoRef] = false
socket[kWriting] = false
socket[kReset] = false
socket[kBlocking] = false
socket[kError] = null
socket[kParser] = new Parser(client, socket)
socket[kClient] = client
Expand Down Expand Up @@ -1271,7 +1278,7 @@ function _resume (client, sync) {
continue
}

if (socket.destroyed || socket[kWriting] || socket[kReset]) {
if (socket.destroyed || socket[kWriting] || socket[kReset] || socket[kBlocking]) {
return
}

Expand Down Expand Up @@ -1327,7 +1334,7 @@ function _resume (client, sync) {
}

function write (client, request) {
const { body, method, path, host, upgrade, headers } = request
const { body, method, path, host, upgrade, headers, blocking } = request

// https://tools.ietf.org/html/rfc7231#section-4.3.1
// https://tools.ietf.org/html/rfc7231#section-4.3.2
Expand Down Expand Up @@ -1409,6 +1416,10 @@ function write (client, request) {
socket[kReset] = true
}

if (blocking) {
socket[kBlocking] = true
}

// TODO: Expect: 100-continue

let header
Expand Down
3 changes: 3 additions & 0 deletions lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class Request {
body,
headers,
idempotent,
blocking,
upgrade,
headersTimeout,
bodyTimeout
Expand Down Expand Up @@ -79,6 +80,8 @@ class Request {
? method === 'HEAD' || method === 'GET'
: idempotent

this.blocking = blocking == null ? false : blocking

this.host = null

this.contentLength = null
Expand Down
1 change: 1 addition & 0 deletions lib/core/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ module.exports = {
kNoRef: Symbol('no ref'),
kBodyUsed: Symbol('used'),
kRunning: Symbol('running'),
kBlocking: Symbol('blocking'),
kPending: Symbol('pending'),
kSize: Symbol('size'),
kBusy: Symbol('busy'),
Expand Down
49 changes: 49 additions & 0 deletions test/client-pipelining.js
Original file line number Diff line number Diff line change
Expand Up @@ -701,3 +701,52 @@ function pipeliningIdempotentBusy (bodyType) {

pipeliningIdempotentBusy(consts.STREAM)
pipeliningIdempotentBusy(consts.ASYNC_ITERATOR)

test('pipelining blocked', (t) => {
t.plan(6)

const server = createServer()

let blocking = true
let count = 0

server.on('request', (req, res) => {
t.ok(!count || !blocking)
count++
setImmediate(() => {
res.end('asd')
})
})
t.teardown(server.close.bind(server))

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
pipelining: 10
})
t.teardown(client.close.bind(client))
client.request({
path: '/',
method: 'GET',
blocking: true
}, (err, data) => {
t.error(err)
blocking = false
data.body
.resume()
.on('end', () => {
t.pass()
})
})
client.request({
path: '/',
method: 'GET'
}, (err, data) => {
t.error(err)
data.body
.resume()
.on('end', () => {
t.pass()
})
})
})
})