Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 95 additions & 39 deletions components/script/dom/readablestreamdefaultcontroller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ use crate::dom::bindings::callback::ExceptionHandling;
use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultControllerMethods;
use crate::dom::bindings::import::module::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
use crate::dom::bindings::import::module::{Error, Fallible};
use crate::dom::bindings::refcounted::Trusted;
use crate::dom::bindings::reflector::{reflect_dom_object, DomObject, Reflector};
use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
use crate::dom::bindings::trace::RootedTraceableBox;
use crate::dom::globalscope::GlobalScope;
use crate::dom::promise::Promise;
use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
use crate::dom::readablestream::{ReadableStream, ReadableStreamState};
use crate::dom::readablestreamdefaultreader::ReadRequest;
Expand All @@ -35,14 +37,27 @@ use crate::script_runtime::{JSContext, JSContext as SafeJSContext};
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[allow(crown::unrooted_must_root)]
struct PullAlgorithmFulfillmentHandler {
// TODO: check the validity of using Dom here.
controller: Dom<ReadableStreamDefaultController>,
#[ignore_malloc_size_of = "Trusted are hard"]
controller: Trusted<ReadableStreamDefaultController>,
}

impl Callback for PullAlgorithmFulfillmentHandler {
/// Handle fufillment of pull algo promise.
/// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
/// Upon fulfillment of pullPromise
fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm) {
todo!();
let controller = self.controller.root();

// Set controller.[[pulling]] to false.
controller.pulling.set(false);

// If controller.[[pullAgain]] is true,
if controller.pull_again.get() {
// Set controller.[[pullAgain]] to false.
controller.pull_again.set(false);

// Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
controller.call_pull_if_needed();
}
}
}

Expand All @@ -51,14 +66,18 @@ impl Callback for PullAlgorithmFulfillmentHandler {
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[allow(crown::unrooted_must_root)]
struct PullAlgorithmRejectionHandler {
// TODO: check the validity of using Dom here.
controller: Dom<ReadableStreamDefaultController>,
#[ignore_malloc_size_of = "Trusted are hard"]
controller: Trusted<ReadableStreamDefaultController>,
}

impl Callback for PullAlgorithmRejectionHandler {
/// Handle rejection of pull algo promise.
fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm) {
todo!();
/// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
/// Upon rejection of pullPromise with reason e.
fn callback(&self, _cx: JSContext, v: HandleValue, _realm: InRealm) {
let controller = self.controller.root();

// Perform ! ReadableStreamDefaultControllerError(controller, e).
controller.error(v);
}
}

Expand All @@ -67,14 +86,21 @@ impl Callback for PullAlgorithmRejectionHandler {
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[allow(crown::unrooted_must_root)]
struct StartAlgorithmFulfillmentHandler {
// TODO: check the validity of using Dom here.
controller: Dom<ReadableStreamDefaultController>,
#[ignore_malloc_size_of = "Trusted are hard"]
controller: Trusted<ReadableStreamDefaultController>,
}

impl Callback for StartAlgorithmFulfillmentHandler {
/// Handle fufillment of pull algo promise.
/// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
/// Upon fulfillment of startPromise,
fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm) {
todo!();
let controller = self.controller.root();

// Set controller.[[started]] to true.
controller.started.set(true);

// Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
controller.call_pull_if_needed();
}
}

Expand All @@ -83,14 +109,18 @@ impl Callback for StartAlgorithmFulfillmentHandler {
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[allow(crown::unrooted_must_root)]
struct StartAlgorithmRejectionHandler {
// TODO: check the validity of using Dom here.
controller: Dom<ReadableStreamDefaultController>,
#[ignore_malloc_size_of = "Trusted are hard"]
controller: Trusted<ReadableStreamDefaultController>,
}

impl Callback for StartAlgorithmRejectionHandler {
/// Handle rejection of pull algo promise.
fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm) {
todo!();
/// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
/// Upon rejection of startPromise with reason r,
fn callback(&self, _cx: JSContext, v: HandleValue, _realm: InRealm) {
let controller = self.controller.root();

// Perform ! ReadableStreamDefaultControllerError(controller, r).
controller.error(v);
}
}

Expand Down Expand Up @@ -207,6 +237,12 @@ pub struct ReadableStreamDefaultController {

/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-started>
started: Cell<bool>,

/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pulling>
pulling: Cell<bool>,

/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullagain>
pull_again: Cell<bool>,
}

impl ReadableStreamDefaultController {
Expand All @@ -228,8 +264,9 @@ impl ReadableStreamDefaultController {
strategy_hwm,
strategy_size: RefCell::new(Some(strategy_size)),
close_requested: Default::default(),
// TODO: set to true when start algo promise resolves.
started: Default::default(),
pulling: Default::default(),
pull_again: Default::default(),
}
}
pub fn new(
Expand All @@ -249,14 +286,11 @@ impl ReadableStreamDefaultController {
);

if let Some(underlying_source) = rooted_default_controller.underlying_source.get() {
let promise = underlying_source.call_start_algorithm(
Controller::ReadableStreamDefaultController(rooted_default_controller.clone()),
);
let fulfillment_handler = Box::new(StartAlgorithmFulfillmentHandler {
controller: Dom::from_ref(&*rooted_default_controller),
controller: Trusted::new(&*rooted_default_controller),
});
let rejection_handler = Box::new(StartAlgorithmRejectionHandler {
controller: Dom::from_ref(&*rooted_default_controller),
controller: Trusted::new(&*rooted_default_controller),
});
let handler = PromiseNativeHandler::new(
&global,
Expand All @@ -266,7 +300,15 @@ impl ReadableStreamDefaultController {

let realm = enter_realm(&*global);
let comp = InRealm::Entered(&realm);
promise.append_native_handler(&handler, comp);
if let Some(promise) = underlying_source.call_start_algorithm(
Controller::ReadableStreamDefaultController(rooted_default_controller.clone()),
) {
promise.append_native_handler(&handler, comp);
} else {
let promise = Promise::new(&*global);
promise.append_native_handler(&handler, comp);
promise.resolve_native(&());
}
};

rooted_default_controller
Expand Down Expand Up @@ -322,6 +364,19 @@ impl ReadableStreamDefaultController {
return;
}

// If controller.[[pulling]] is true,
if self.pulling.get() {
// Set controller.[[pullAgain]] to true.
self.pull_again.set(true);

return;
}

// Set controller.[[pulling]] to true.
self.pulling.set(true);

// Let pullPromise be the result of performing controller.[[pullAlgorithm]].
// Continues into the resolve and reject handling of the native handler.
let global = self.global();
let rooted_default_controller = DomRoot::from_ref(self);
let controller =
Expand All @@ -331,22 +386,23 @@ impl ReadableStreamDefaultController {
return;
};

let fulfillment_handler = Box::new(PullAlgorithmFulfillmentHandler {
controller: Trusted::new(&*rooted_default_controller),
});
let rejection_handler = Box::new(PullAlgorithmRejectionHandler {
controller: Trusted::new(&*rooted_default_controller),
});
let handler =
PromiseNativeHandler::new(&global, Some(fulfillment_handler), Some(rejection_handler));

let realm = enter_realm(&*global);
let comp = InRealm::Entered(&realm);
if let Some(promise) = underlying_source.call_pull_algorithm(controller) {
let fulfillment_handler = Box::new(PullAlgorithmFulfillmentHandler {
controller: Dom::from_ref(&*rooted_default_controller),
});
let rejection_handler = Box::new(PullAlgorithmRejectionHandler {
controller: Dom::from_ref(&*rooted_default_controller),
});
let handler = PromiseNativeHandler::new(
&global,
Some(fulfillment_handler),
Some(rejection_handler),
);

let realm = enter_realm(&*global);
let comp = InRealm::Entered(&realm);
promise.append_native_handler(&handler, comp);
} else {
let promise = Promise::new(&*global);
promise.append_native_handler(&handler, comp);
promise.resolve_native(&());
}
}

Expand Down
42 changes: 15 additions & 27 deletions components/script/dom/underlyingsourcecontainer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,22 +91,18 @@ impl UnderlyingSourceContainer {
#[allow(unsafe_code)]
pub fn call_pull_algorithm(&self, controller: Controller) -> Option<Rc<Promise>> {
if let UnderlyingSourceType::Js(source) = &self.underlying_source_type {
let global = self.global();
let promise = if let Some(pull) = &source.pull {
if let Some(pull) = &source.pull {
let cx = GlobalScope::get_cx();
rooted!(in(*cx) let mut this_object = ptr::null_mut::<JSObject>());
unsafe {
source.to_jsobject(*cx, this_object.handle_mut());
}
let this_handle = this_object.handle();
pull.Call_(&this_handle, controller, ExceptionHandling::Report)
.expect("Pull algorithm call failed")
} else {
let promise = Promise::new(&*global);
promise.resolve_native(&());
promise
};
return Some(promise);
let promise = pull
.Call_(&this_handle, controller, ExceptionHandling::Report)
.expect("Pull algorithm call failed");
return Some(promise);
}
}
// Note: other source type have no pull steps for now.
None
Expand All @@ -120,10 +116,9 @@ impl UnderlyingSourceContainer {
/// see "Let startPromise be a promise resolved with startResult."
/// at <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
#[allow(unsafe_code)]
pub fn call_start_algorithm(&self, controller: Controller) -> Rc<Promise> {
pub fn call_start_algorithm(&self, controller: Controller) -> Option<Rc<Promise>> {
if let UnderlyingSourceType::Js(source) = &self.underlying_source_type {
let global = self.global();
let promise = if let Some(start) = &source.start {
if let Some(start) = &source.start {
let cx = GlobalScope::get_cx();
rooted!(in(*cx) let mut this_object = ptr::null_mut::<JSObject>());
unsafe {
Expand All @@ -139,27 +134,20 @@ impl UnderlyingSourceContainer {
IsPromiseObject(result_object.handle().into_handle())
};
// Let startPromise be a promise resolved with startResult.
if is_promise {
// from #set-up-readable-stream-default-controller.
let promise = if is_promise {
let promise = Promise::new_with_js_promise(result_object.handle(), cx);
promise
} else {
let global = self.global();
let promise = Promise::new(&*global);
promise.resolve_native(&result);
promise
}
} else {
let promise = Promise::new(&*global);
promise.resolve_native(&());
promise
};
promise
} else {
// Native sources start immediately.
let global = self.global();
let promise = Promise::new(&*global);
promise.resolve_native(&());
promise
};
return Some(promise);
}
}
None
}

/// Does the source have all data in memory?
Expand Down