runtime/globals/streams/readable/
mod.rs

1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 */
6
7use std::collections::VecDeque;
8use std::rc::Rc;
9
10use ion::class::{NativeObject as _, Reflector};
11use ion::conversions::{ConversionBehavior, FromValue as _, ToValue as _};
12use ion::function::Opt;
13use ion::{
14	ClassDefinition as _, Context, Error, ErrorKind, FromValue, Function, Local, Object, Promise, Result, ResultExc,
15	Traceable, Value, js_class,
16};
17use mozjs::jsapi::{Heap, JSObject};
18use mozjs::jsval::JSVal;
19
20pub use crate::globals::streams::readable::controller::{
21	ByobRequest, ByteStreamController, CommonController, DefaultController,
22};
23use crate::globals::streams::readable::controller::{Controller, ControllerInternals as _, ControllerKind};
24pub use crate::globals::streams::readable::reader::{ByobReader, CommonReader, DefaultReader};
25use crate::globals::streams::readable::reader::{Reader, ReaderKind};
26pub use crate::globals::streams::readable::source::StreamSource;
27use crate::globals::streams::readable::tee::{TeeBytesState, TeeDefaultState, forward_reader_error};
28use crate::globals::streams::strategy::{QueuingStrategy, SizeCallback};
29
30mod controller;
31mod reader;
32mod source;
33mod tee;
34
/// Members read from the `underlyingSource` object passed to the `ReadableStream` constructor.
///
/// Every member is optional. Values of this type are produced by the derived `FromValue`
/// conversion, which the constructor invokes on the source object.
#[derive(Default, FromValue)]
pub struct UnderlyingSource<'cx> {
	/// Callback handed to the controller's `start` by the constructor.
	start: Option<Function<'cx>>,
	/// Callback stored in `StreamSource::Script` by `to_native`.
	pull: Option<Function<'cx>>,
	/// Callback stored in `StreamSource::Script` by `to_native`.
	cancel: Option<Function<'cx>>,
	/// Read from the `type` property. The constructor only accepts `"bytes"` or an absent value.
	#[ion(name = "type")]
	ty: Option<String>,
	/// NOTE(review): not read in this file; presumably consumed by
	/// `ByteStreamController::initialise` — confirm.
	#[ion(convert = ConversionBehavior::EnforceRange)]
	auto_allocate_chunk_size: Option<u64>,
}
45
46impl UnderlyingSource<'_> {
47	pub(crate) fn to_native(&self, object: Option<&Object>) -> StreamSource {
48		match object {
49			Some(object) => StreamSource::Script {
50				object: Heap::boxed(object.handle().get()),
51				pull: self.pull.as_ref().map(|pull| Heap::boxed(pull.get())),
52				cancel: self.cancel.as_ref().map(|cancel| Heap::boxed(cancel.get())),
53			},
54			None => StreamSource::None,
55		}
56	}
57}
58
/// Options accepted by `ReadableStream::get_reader` (exposed to scripts as `getReader`).
#[derive(Default, FromValue)]
pub struct ReaderOptions {
	/// Requested reader mode. `get_reader` accepts `"byob"` or an absent value and rejects any
	/// other string with a `TypeError`.
	mode: Option<String>,
}
63
/// Lifecycle state of a [`ReadableStream`].
#[derive(Copy, Clone, Debug, PartialEq, Eq, Traceable)]
pub enum State {
	/// Initial state assigned by `ReadableStream::new`; the only state from which `close` and
	/// `error` succeed.
	Readable,
	/// Assigned by `ReadableStream::close`.
	Closed,
	/// Assigned by `ReadableStream::error`, which also records the error value.
	Errored,
}
70
/// Native state backing a script-visible `ReadableStream` object.
#[js_class]
pub struct ReadableStream {
	reflector: Reflector,

	/// Selects which native controller type `controller` is read as in `native_controller`.
	pub(crate) controller_kind: ControllerKind,
	/// Heap reference to the controller object.
	pub(crate) controller: Box<Heap<*mut JSObject>>,

	/// Kind of the attached reader; `ReaderKind::None` means the stream is not locked.
	pub(crate) reader_kind: ReaderKind,
	/// Heap reference to the attached reader object. `native_reader` unwraps this whenever
	/// `reader_kind` is not `ReaderKind::None`, so the two fields must be set together.
	pub(crate) reader: Option<Box<Heap<*mut JSObject>>>,

	/// Current lifecycle state.
	pub(crate) state: State,
	/// Set to `true` by `cancel`. NOTE(review): presumably also set by readers elsewhere — confirm.
	pub(crate) disturbed: bool,
	/// Error value recorded by `error`; `None` until the stream has been errored.
	pub(crate) error: Option<Box<Heap<JSVal>>>,
}
85
86#[js_class]
87impl ReadableStream {
88	#[ion(constructor)]
89	pub fn constructor<'cx: 'o, 'o>(
90		cx: &'cx Context, #[ion(this)] this: &Object, Opt(underlying_source): Opt<Object<'o>>,
91		Opt(strategy): Opt<QueuingStrategy>,
92	) -> ResultExc<ReadableStream> {
93		let strategy = strategy.unwrap_or_default();
94		let mut source = None;
95
96		let controller = underlying_source
97			.as_ref()
98			.map::<ResultExc<_>, _>(|underlying_source| {
99				let source_value = underlying_source.as_value(cx);
100				source = Some(UnderlyingSource::from_value(cx, &source_value, false, ())?);
101
102				let source = source.as_ref().unwrap();
103				if source.ty.as_deref() == Some("bytes") {
104					if strategy.has_size() {
105						return Err(Error::new("Implementation preserved member 'size'", ErrorKind::Range).into());
106					}
107
108					if let Some(high_water_mark) = strategy.high_water_mark() {
109						if high_water_mark.is_nan() {
110							return Err(Error::new("highWaterMark cannot be NaN", ErrorKind::Range).into());
111						} else if high_water_mark < 0.0 {
112							return Err(Error::new("highWaterMark must be non-negative", ErrorKind::Range).into());
113						}
114					}
115					let high_water_mark = strategy.high_water_mark().unwrap_or(0.0);
116
117					let controller =
118						ByteStreamController::initialise(this, underlying_source, source, high_water_mark)?;
119					let controller = Heap::boxed(ByteStreamController::new_object(cx, Box::new(controller)));
120					unsafe {
121						let controller = Object::from(Local::from_heap(&controller));
122						ByteStreamController::get_mut_private_unchecked(&controller)
123							.start(cx, source.start.as_ref())?;
124					}
125
126					Ok(Some((ControllerKind::ByteStream, controller)))
127				} else if source.ty.is_some() {
128					Err(Error::new(
129						"Type of Underlying Source must be 'bytes' or not exist.",
130						ErrorKind::Type,
131					)
132					.into())
133				} else {
134					Ok(None)
135				}
136			})
137			.transpose()?
138			.flatten();
139
140		let (controller_kind, controller) = if let Some(controller) = controller {
141			controller
142		} else {
143			let source = source.unwrap_or_default();
144			let high_water_mark = strategy.high_water_mark().unwrap_or(1.0);
145			let controller =
146				DefaultController::initialise(this, underlying_source.as_ref(), &source, &strategy, high_water_mark);
147			let controller = Heap::boxed(DefaultController::new_object(cx, Box::new(controller)));
148			unsafe {
149				let controller = Object::from(Local::from_heap(&controller));
150				DefaultController::get_mut_private_unchecked(&controller).start(cx, source.start.as_ref())?;
151			}
152
153			(ControllerKind::Default, controller)
154		};
155
156		Ok(ReadableStream::new(controller_kind, controller))
157	}
158
159	#[ion(get)]
160	pub fn get_locked(&self) -> bool {
161		self.reader_kind != ReaderKind::None
162	}
163
164	pub fn cancel<'cx>(&mut self, cx: &'cx Context, Opt(reason): Opt<Value>) -> ResultExc<Promise<'cx>> {
165		if self.get_locked() {
166			Err(Error::new("ReadableStream is locked.", ErrorKind::Type).into())
167		} else {
168			self.disturbed = true;
169			match self.state {
170				State::Readable => {
171					self.close(cx)?;
172					self.native_controller(cx)?.cancel(cx, reason)
173				}
174				State::Closed => Ok(Promise::resolved(cx, &Value::undefined_handle())),
175				State::Errored => {
176					let mut value = Value::null(cx);
177					if let Some(error) = &self.error {
178						value.handle_mut().set(error.get());
179					}
180					let promise = Promise::new(cx);
181					promise.reject(cx, &value);
182					Ok(promise)
183				}
184			}
185		}
186	}
187
188	#[ion(name = "getReader")]
189	pub fn get_reader<'cx>(&mut self, cx: &'cx Context, Opt(options): Opt<ReaderOptions>) -> Result<Object<'cx>> {
190		if self.get_locked() {
191			return Err(Error::new(
192				"New readers cannot be initialised for locked streams.",
193				ErrorKind::Type,
194			));
195		}
196
197		let options = options.unwrap_or_default();
198		if let Some(mode) = &options.mode {
199			if mode == "byob" {
200				let reader = ByobReader::new(cx, &Object::from(cx.root(self.reflector().get())))?;
201				let object = Object::from(cx.root(ByobReader::new_object(cx, Box::new(reader))));
202
203				self.reader_kind = ReaderKind::Byob;
204				self.reader = Some(Heap::boxed(object.handle().get()));
205
206				Ok(object)
207			} else {
208				Err(Error::new("Mode must be 'byob' or must not exist.", ErrorKind::Type))
209			}
210		} else {
211			let reader = DefaultReader::new(cx, &Object::from(Local::from_handle(self.reflector().handle())))?;
212			let object = Object::from(cx.root(DefaultReader::new_object(cx, Box::new(reader))));
213
214			self.reader_kind = ReaderKind::Default;
215			self.reader = Some(Heap::boxed(object.handle().get()));
216
217			Ok(object)
218		}
219	}
220
221	pub fn tee<'cx>(&mut self, cx: &'cx Context) -> Result<[Object<'cx>; 2]> {
222		self.get_reader(cx, Opt(None))?;
223		Ok(self.tee_internal(cx, false))
224	}
225}
226
227impl ReadableStream {
228	pub(crate) fn new(controller_kind: ControllerKind, controller: Box<Heap<*mut JSObject>>) -> ReadableStream {
229		ReadableStream {
230			reflector: Reflector::default(),
231
232			controller_kind,
233			controller,
234
235			reader_kind: ReaderKind::None,
236			reader: None,
237
238			state: State::Readable,
239			disturbed: false,
240			error: None,
241		}
242	}
243
	/// Creates the two branch streams of a tee and returns them as `[branch1, branch2]`.
	///
	/// The branch type follows `controller_kind`: default streams get branches backed by a
	/// shared `TeeDefaultState`, byte streams get branches backed by a shared `TeeBytesState`.
	/// `clone_branch_2` is only forwarded to `TeeDefaultState::new`; the byte-stream path does
	/// not use it.
	///
	/// # Panics
	///
	/// Panics if starting a branch controller fails. On the byte-stream path it also panics if
	/// the stream has no attached default reader or if `forward_reader_error` fails, so callers
	/// are expected to lock the stream with a default reader first (as `tee` does).
	pub(crate) fn tee_internal<'cx>(&mut self, cx: &'cx Context, clone_branch_2: bool) -> [Object<'cx>; 2] {
		match self.controller_kind {
			ControllerKind::Default => {
				// Builds one branch stream; `second` is stored in the branch's
				// `StreamSource::TeeDefault` alongside the shared state.
				fn create_branch(cx: &Context, state: Rc<TeeDefaultState>, second: bool) -> Object<'_> {
					let branch = Object::from(cx.root(ReadableStream::new_raw_object(cx)));
					let source = StreamSource::TeeDefault(state, second);
					let controller = DefaultController {
						common: CommonController::new(&branch, source, 1.0),
						size: SizeCallback::None,
						queue: VecDeque::default(),
					};
					let controller = Heap::boxed(DefaultController::new_object(cx, Box::new(controller)));

					// SAFETY (assumed): `controller` was just created by
					// `DefaultController::new_object`, so its private data should be a
					// `DefaultController` — confirm.
					unsafe {
						let controller = Object::from(Local::from_heap(&controller));
						DefaultController::get_mut_private_unchecked(&controller).start(cx, None).unwrap();
					}

					// The raw branch object only receives its native stream once the controller
					// exists, because the stream owns the controller reference.
					let stream = ReadableStream::new(ControllerKind::Default, controller);
					unsafe {
						ReadableStream::set_private(branch.handle().get(), Box::new(stream));
					}
					branch
				}

				let state = Rc::new(TeeDefaultState::new(cx, self, clone_branch_2));
				let branch1 = create_branch(cx, Rc::clone(&state), false);
				let branch2 = create_branch(cx, Rc::clone(&state), true);

				// Record both branch objects in the shared state after they have been created.
				state.common.branch[0].set(branch1.handle().get());
				state.common.branch[1].set(branch2.handle().get());

				[branch1, branch2]
			}
			ControllerKind::ByteStream => {
				// Builds one branch stream; `second` is stored in the branch's
				// `StreamSource::TeeBytes` alongside the shared state.
				fn create_branch(cx: &Context, state: Rc<TeeBytesState>, second: bool) -> Object<'_> {
					let branch = Object::from(cx.root(ReadableStream::new_raw_object(cx)));
					let source = StreamSource::TeeBytes(state, second);
					let controller = ByteStreamController {
						common: CommonController::new(&branch, source, 1.0),
						auto_allocate_chunk_size: 0,
						byob_request: None,
						pending_descriptors: VecDeque::default(),
						queue: VecDeque::default(),
					};
					let controller = Heap::boxed(ByteStreamController::new_object(cx, Box::new(controller)));

					// SAFETY (assumed): `controller` was just created by
					// `ByteStreamController::new_object`, so its private data should be a
					// `ByteStreamController` — confirm.
					unsafe {
						let controller = Object::from(Local::from_heap(&controller));
						ByteStreamController::get_mut_private_unchecked(&controller).start(cx, None).unwrap();
					}

					let stream = ReadableStream::new(ControllerKind::ByteStream, controller);
					unsafe {
						ReadableStream::set_private(branch.handle().get(), Box::new(stream));
					}
					branch
				}

				let state = Rc::new(TeeBytesState::new(cx, self));
				let branch1 = create_branch(cx, Rc::clone(&state), false);
				let branch2 = create_branch(cx, Rc::clone(&state), true);

				// Record both branch objects in the shared state after they have been created.
				state.common.branch[0].set(branch1.handle().get());
				state.common.branch[1].set(branch2.handle().get());

				// Hook the attached default reader's `closed` promise up to the shared state via
				// `forward_reader_error`. Each `unwrap` here panics if no default reader is attached.
				let reader = self.native_reader(cx).unwrap().unwrap().into_default().unwrap();
				forward_reader_error(cx, &reader.common.closed(), Rc::clone(&state)).unwrap();

				[branch1, branch2]
			}
		}
	}
317
318	pub(crate) fn close(&mut self, cx: &Context) -> ResultExc<()> {
319		if self.state != State::Readable {
320			return Err(Error::new("Cannot Close Stream", None).into());
321		}
322
323		self.state = State::Closed;
324		let (requests, closed) = match self.native_reader(cx)? {
325			Some(reader) => reader.requests_closed(),
326			None => return Ok(()),
327		};
328
329		closed.resolve(cx, &Value::undefined_handle());
330		if self.reader_kind == ReaderKind::Default {
331			for request in &*requests {
332				let promise = request.promise();
333				(request.close)(cx, &promise, None)?;
334			}
335			requests.clear();
336		}
337
338		Ok(())
339	}
340
341	pub(crate) fn error(&mut self, cx: &Context, error: &Value) -> Result<()> {
342		if self.state != State::Readable {
343			return Err(Error::new("Cannot Error Stream", None));
344		}
345		self.state = State::Errored;
346		self.error = Some(Heap::boxed(error.get()));
347
348		let (requests, closed) = match self.native_reader(cx)? {
349			Some(reader) => reader.requests_closed(),
350			None => return Ok(()),
351		};
352
353		closed.reject(cx, error);
354		for request in &*requests {
355			let promise = request.promise();
356			(request.error)(cx, &promise, error);
357		}
358		requests.clear();
359
360		Ok(())
361	}
362
363	pub(crate) fn stored_error(&self) -> Value<'_> {
364		self.error.as_ref().map_or_else(Value::undefined_handle, |error| {
365			Value::from(unsafe { Local::from_heap(error) })
366		})
367	}
368
369	pub(crate) fn native_controller(&self, cx: &Context) -> Result<Controller<'_>> {
370		match self.controller_kind {
371			ControllerKind::Default => {
372				let controller = Object::from(unsafe { Local::from_heap(&self.controller) });
373				let controller = DefaultController::get_mut_private(cx, &controller)?;
374				Ok(Controller::Default(controller))
375			}
376			ControllerKind::ByteStream => {
377				let controller = Object::from(unsafe { Local::from_heap(&self.controller) });
378				let controller = ByteStreamController::get_mut_private(cx, &controller)?;
379				Ok(Controller::ByteStream(controller))
380			}
381		}
382	}
383
384	pub(crate) fn native_reader(&self, cx: &Context) -> Result<Option<Reader<'_>>> {
385		match self.reader_kind {
386			ReaderKind::None => Ok(None),
387			ReaderKind::Default => {
388				let reader = Object::from(unsafe { Local::from_heap(self.reader.as_ref().unwrap()) });
389				let reader = DefaultReader::get_mut_private(cx, &reader)?;
390				Ok(Some(Reader::Default(reader)))
391			}
392			ReaderKind::Byob => {
393				let reader = Object::from(unsafe { Local::from_heap(self.reader.as_ref().unwrap()) });
394				let reader = ByobReader::get_mut_private(cx, &reader)?;
395				Ok(Some(Reader::Byob(reader)))
396			}
397		}
398	}
399}