Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions lib/web/fetch/body.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,25 @@ const { kState } = require('./symbols')
const { webidl } = require('./webidl')
const { Blob } = require('node:buffer')
const assert = require('node:assert')
const { isErrored } = require('node:stream')
const { isErrored, isDisturbed } = require('node:stream')
const { isArrayBuffer } = require('node:util/types')
const { serializeAMimeType } = require('./data-url')
const { multipartFormDataParser } = require('./formdata-parser')

const textEncoder = new TextEncoder()
function noop () {}

const hasFinalizationRegistry = globalThis.FinalizationRegistry && process.version.indexOf('v18') !== 0
let streamRegistry

if (hasFinalizationRegistry) {
streamRegistry = new FinalizationRegistry((weakRef) => {
const stream = weakRef.deref()
if (stream && !stream.locked && !isDisturbed(stream) && !isErrored(stream)) {
stream.cancel('Response object has been garbage collected').catch(noop)
}
})
}

// https://fetch.spec.whatwg.org/#concept-bodyinit-extract
function extractBody (object, keepalive = false) {
Expand Down Expand Up @@ -264,14 +277,18 @@ function safelyExtractBody (object, keepalive = false) {
return extractBody(object, keepalive)
}

function cloneBody (body) {
function cloneBody (instance, body) {
// To clone a body body, run these steps:

// https://fetch.spec.whatwg.org/#concept-body-clone

// 1. Let « out1, out2 » be the result of teeing body’s stream.
const [out1, out2] = body.stream.tee()

if (hasFinalizationRegistry) {
streamRegistry.register(instance, new WeakRef(out1))
}

// 2. Set body’s stream to out1.
body.stream = out1

Expand Down Expand Up @@ -499,5 +516,7 @@ module.exports = {
safelyExtractBody,
cloneBody,
mixinBody,
streamRegistry,
hasFinalizationRegistry,
bodyUnusable
}
2 changes: 1 addition & 1 deletion lib/web/fetch/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ function cloneRequest (request) {
// 2. If request’s body is non-null, set newRequest’s body to the
// result of cloning request’s body.
if (request.body != null) {
newRequest.body = cloneBody(request.body)
newRequest.body = cloneBody(newRequest, request.body)
}

// 3. Return newRequest.
Expand Down
21 changes: 3 additions & 18 deletions lib/web/fetch/response.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const { Headers, HeadersList, fill, getHeadersGuard, setHeadersGuard, setHeadersList } = require('./headers')
const { extractBody, cloneBody, mixinBody, bodyUnusable } = require('./body')
const { extractBody, cloneBody, mixinBody, hasFinalizationRegistry, streamRegistry, bodyUnusable } = require('./body')
const util = require('../../core/util')
const nodeUtil = require('node:util')
const { kEnumerableProperty } = util
Expand All @@ -26,24 +26,9 @@ const { URLSerializer } = require('./data-url')
const { kConstruct } = require('../../core/symbols')
const assert = require('node:assert')
const { types } = require('node:util')
const { isDisturbed, isErrored } = require('node:stream')

const textEncoder = new TextEncoder('utf-8')

const hasFinalizationRegistry = globalThis.FinalizationRegistry && process.version.indexOf('v18') !== 0
let registry

if (hasFinalizationRegistry) {
registry = new FinalizationRegistry((weakRef) => {
const stream = weakRef.deref()
if (stream && !stream.locked && !isDisturbed(stream) && !isErrored(stream)) {
stream.cancel('Response object has been garbage collected').catch(noop)
}
})
}

function noop () {}

// https://fetch.spec.whatwg.org/#response-class
class Response {
// Creates network error Response.
Expand Down Expand Up @@ -327,7 +312,7 @@ function cloneResponse (response) {
// 3. If response’s body is non-null, then set newResponse’s body to the
// result of cloning response’s body.
if (response.body != null) {
newResponse.body = cloneBody(response.body)
newResponse.body = cloneBody(newResponse, response.body)
}

// 4. Return newResponse.
Expand Down Expand Up @@ -532,7 +517,7 @@ function fromInnerResponse (innerResponse, guard) {
// a primitive or an object, even undefined. If the held value is an object, the registry keeps
// a strong reference to it (so it can pass it to the cleanup callback later). Reworded from
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/FinalizationRegistry
registry.register(response, new WeakRef(innerResponse.body.stream))
streamRegistry.register(response, new WeakRef(innerResponse.body.stream))
}

return response
Expand Down
3 changes: 2 additions & 1 deletion test/fetch/fire-and-forget.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ test('does not need the body to be consumed to continue', { timeout: 180_000, sk
// eslint-disable-next-line no-undef
gc(true)
const array = new Array(batch)
for (let i = 0; i < batch; i++) {
for (let i = 0; i < batch; i += 2) {
array[i] = fetch(url).catch(() => {})
array[i + 1] = fetch(url).then(r => r.clone()).catch(() => {})
}
await Promise.all(array)
await sleep(delay)
Expand Down
Loading