runtime/globals/streams/readable/
reader.rs

1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 */
6
7use std::collections::VecDeque;
8
9use ion::class::Reflector;
10use ion::conversions::ToValue as _;
11use ion::function::Opt;
12use ion::typedarray::{ArrayBufferView, type_to_constructor, type_to_element_size};
13use ion::{
14	ClassDefinition as _, Context, Error, ErrorKind, FromValue, Local, Object, Promise, Result, ResultExc, Traceable,
15	Value, js_class,
16};
17use mozjs::conversions::ConversionBehavior;
18use mozjs::jsapi::{Heap, JSObject};
19
20use crate::globals::streams::readable::controller::{ControllerInternals as _, ControllerKind, PullIntoDescriptor};
21use crate::globals::streams::readable::{ReadableStream, State};
22
23pub type ChunkErrorClosure = dyn Fn(&Context, &Promise, &Value);
24pub type CloseClosure = dyn Fn(&Context, &Promise, Option<&Value>) -> ResultExc<()>;
25
26#[derive(Traceable)]
27pub struct Request {
28	pub(crate) promise: Box<Heap<*mut JSObject>>,
29	#[trace(no_trace)]
30	pub(crate) chunk: Box<ChunkErrorClosure>,
31	#[trace(no_trace)]
32	pub(crate) close: Box<CloseClosure>,
33	#[trace(no_trace)]
34	pub(crate) error: Box<ChunkErrorClosure>,
35}
36
37impl Request {
38	pub(crate) fn standard(promise: *mut JSObject) -> Request {
39		struct ReadResult<'cx> {
40			pub value: Option<Value<'cx>>,
41			pub done: bool,
42		}
43
44		fn into_value<'cx>(result: ReadResult, cx: &'cx Context) -> Value<'cx> {
45			let object = Object::new(cx);
46			object.set(cx, "value", &result.value.unwrap_or_else(Value::undefined_handle));
47			object.set_as(cx, "done", &result.done);
48			object.as_value(cx)
49		}
50
51		Request {
52			promise: Heap::boxed(promise),
53			chunk: Box::new(|cx, promise, chunk| {
54				let result = ReadResult {
55					value: Some(Value::from(Local::from_handle(chunk.handle()))),
56					done: false,
57				};
58				promise.resolve(cx, &into_value(result, cx));
59			}),
60			close: Box::new(|cx, promise, chunk| {
61				let result = ReadResult {
62					value: chunk.map(|v| Value::from(Local::from_handle(v.handle()))),
63					done: true,
64				};
65				promise.resolve(cx, &into_value(result, cx));
66				Ok(())
67			}),
68			error: Box::new(|cx, promise, error| {
69				promise.resolve(cx, error);
70			}),
71		}
72	}
73
74	pub(crate) fn promise(&self) -> Promise<'_> {
75		Promise::from(unsafe { Local::from_heap(&self.promise) }).unwrap()
76	}
77}
78
79#[derive(Copy, Clone, Debug, PartialEq, Eq, Traceable)]
80pub enum ReaderKind {
81	None,
82	Default,
83	Byob,
84}
85
86pub enum Reader<'r> {
87	Default(&'r mut DefaultReader),
88	Byob(&'r mut ByobReader),
89}
90
91impl<'r> Reader<'r> {
92	pub(crate) fn common(&self) -> &CommonReader {
93		match self {
94			Reader::Default(reader) => &reader.common,
95			Reader::Byob(reader) => &reader.common,
96		}
97	}
98
99	pub(crate) fn into_default(self) -> Option<&'r mut DefaultReader> {
100		match self {
101			Reader::Default(reader) => Some(reader),
102			Reader::Byob(_) => None,
103		}
104	}
105
106	pub(crate) fn into_byob(self) -> Option<&'r mut ByobReader> {
107		match self {
108			Reader::Byob(reader) => Some(reader),
109			Reader::Default(_) => None,
110		}
111	}
112
113	pub fn requests_closed(self) -> (&'r mut VecDeque<Request>, Promise<'r>) {
114		let common = match self {
115			Reader::Default(reader) => &mut reader.common,
116			Reader::Byob(reader) => &mut reader.common,
117		};
118		unsafe {
119			(
120				&mut common.requests,
121				Promise::from(Local::from_heap(&common.closed)).unwrap(),
122			)
123		}
124	}
125
126	pub fn is_empty(&self) -> bool {
127		self.common().requests.is_empty()
128	}
129}
130
131#[js_class]
132#[ion(name = "ReadableStreamCommonReader")]
133pub struct CommonReader {
134	reflector: Reflector,
135
136	stream: Option<Box<Heap<*mut JSObject>>>,
137	pub(crate) requests: VecDeque<Request>,
138	pub(crate) closed: Box<Heap<*mut JSObject>>,
139}
140
141#[js_class]
142impl CommonReader {
143	pub(crate) fn new(cx: &Context, stream: &ReadableStream, stream_object: &Object) -> CommonReader {
144		let closed = Promise::new(cx);
145		match stream.state {
146			State::Readable => {}
147			State::Closed => {
148				closed.resolve(cx, &Value::undefined_handle());
149			}
150			State::Errored => {
151				closed.reject(cx, &stream.stored_error());
152			}
153		}
154
155		CommonReader {
156			reflector: Reflector::default(),
157			stream: Some(Heap::boxed(stream_object.handle().get())),
158			requests: VecDeque::new(),
159			closed: Heap::boxed(closed.get()),
160		}
161	}
162
163	#[expect(clippy::mut_from_ref)]
164	pub(crate) fn stream(&self, cx: &Context) -> Result<Option<&mut ReadableStream>> {
165		self.stream
166			.as_ref()
167			.map::<Result<_>, _>(|stream| {
168				let stream = Object::from(unsafe { Local::from_heap(stream) });
169				ReadableStream::get_mut_private(cx, &stream)
170			})
171			.transpose()
172	}
173
174	pub(crate) fn closed(&self) -> Promise<'_> {
175		Promise::from(unsafe { Local::from_heap(&self.closed) }).unwrap()
176	}
177
178	pub(crate) fn cancel<'cx>(&self, cx: &'cx Context, reason: Opt<Value>) -> ResultExc<Promise<'cx>> {
179		if let Some(stream) = self.stream(cx)? {
180			stream.cancel(cx, reason)
181		} else {
182			let promise = Promise::new(cx);
183			promise.reject_with_error(cx, &Error::new("Reader has already been released.", ErrorKind::Type));
184			Ok(promise)
185		}
186	}
187
188	pub(crate) fn release_lock(&mut self, cx: &Context) -> Result<()> {
189		if let Some(stream) = self.stream(cx)? {
190			let mut closed = self.closed();
191			if matches!(stream.state, State::Closed | State::Errored) {
192				self.closed.set(Promise::new(cx).get());
193				closed = self.closed();
194			}
195			closed.reject_with_error(cx, &Error::new("Released Reader", ErrorKind::Type));
196
197			stream.reader_kind = ReaderKind::None;
198			stream.reader = None;
199
200			stream.native_controller(cx)?.release();
201
202			while let Some(request) = self.requests.pop_front() {
203				let promise = request.promise();
204				(request.error)(
205					cx,
206					&promise,
207					&Error::new("Reader has been released.", ErrorKind::Type).as_value(cx),
208				);
209			}
210		} else {
211			return Err(Error::new("Reader has already been released.", ErrorKind::Type));
212		}
213		self.stream = None;
214		Ok(())
215	}
216}
217
218#[js_class]
219#[ion(name = "ReadableStreamDefaultReader")]
220pub struct DefaultReader {
221	pub(crate) common: CommonReader,
222}
223
224#[js_class]
225impl DefaultReader {
226	#[ion(constructor)]
227	pub fn constructor(cx: &Context, #[ion(this)] this: &Object, stream_object: Object) -> Result<DefaultReader> {
228		let reader = DefaultReader::new(cx, &stream_object)?;
229		let stream = ReadableStream::get_mut_private(cx, &stream_object)?;
230		stream.reader_kind = ReaderKind::Default;
231		stream.reader = Some(Heap::boxed(this.handle().get()));
232
233		Ok(reader)
234	}
235
236	pub(crate) fn new(cx: &Context, stream_object: &Object) -> Result<DefaultReader> {
237		let stream = ReadableStream::get_private(cx, stream_object)?;
238		if stream.get_locked() {
239			return Err(Error::new(
240				"Cannot create DefaultReader from locked stream.",
241				ErrorKind::Type,
242			));
243		}
244
245		Ok(DefaultReader {
246			common: CommonReader::new(cx, stream, stream_object),
247		})
248	}
249
250	pub(crate) fn read_internal<'cx>(&mut self, cx: &'cx Context, request: Request) -> ResultExc<Promise<'cx>> {
251		let promise = Promise::from(cx.root(request.promise.get())).unwrap();
252		if let Some(stream) = self.common.stream(cx)? {
253			stream.disturbed = true;
254
255			match stream.state {
256				State::Readable => stream.native_controller(cx)?.pull(cx, &promise, request)?,
257				State::Closed => (request.close)(cx, &promise, None)?,
258				State::Errored => (request.error)(cx, &promise, &stream.stored_error()),
259			}
260		} else {
261			promise.reject_with_error(cx, &Error::new("Reader has already been released.", ErrorKind::Type));
262		}
263		Ok(promise)
264	}
265
266	pub fn cancel<'cx>(&self, cx: &'cx Context, reason: Opt<Value>) -> ResultExc<Promise<'cx>> {
267		self.common.cancel(cx, reason)
268	}
269
270	pub fn read<'cx>(&mut self, cx: &'cx Context) -> ResultExc<Promise<'cx>> {
271		let promise = Promise::new(cx);
272		self.read_internal(cx, Request::standard(promise.get()))
273	}
274
275	#[ion(name = "releaseLock")]
276	pub fn release_lock(&mut self, cx: &Context) -> Result<()> {
277		self.common.release_lock(cx)
278	}
279
280	#[ion(get)]
281	pub fn get_closed(&self) -> *mut JSObject {
282		self.common.closed.get()
283	}
284}
285
286#[derive(FromValue)]
287pub struct ByobReadOptions {
288	#[ion(convert = ConversionBehavior::EnforceRange, default = 1)]
289	min: u64,
290}
291
292impl Default for ByobReadOptions {
293	fn default() -> ByobReadOptions {
294		ByobReadOptions { min: 1 }
295	}
296}
297
298#[js_class]
299#[ion(name = "ReadableStreamBYOBReader")]
300pub struct ByobReader {
301	pub(crate) common: CommonReader,
302}
303
304#[js_class]
305impl ByobReader {
306	#[ion(constructor)]
307	pub fn constructor(cx: &Context, #[ion(this)] this: &Object, stream_object: Object) -> Result<ByobReader> {
308		let reader = ByobReader::new(cx, &stream_object)?;
309		let stream = ReadableStream::get_mut_private(cx, &stream_object)?;
310		stream.reader_kind = ReaderKind::Byob;
311		stream.reader = Some(Heap::boxed(this.handle().get()));
312
313		Ok(reader)
314	}
315
316	pub(crate) fn new(cx: &Context, stream_object: &Object) -> Result<ByobReader> {
317		let stream = ReadableStream::get_private(cx, stream_object)?;
318		if stream.get_locked() {
319			return Err(Error::new(
320				"Cannot create BYOBReader from locked stream.",
321				ErrorKind::Type,
322			));
323		}
324
325		if stream.controller_kind == ControllerKind::Default {
326			return Err(Error::new(
327				"Cannot create BYOBReader from DefaultController",
328				ErrorKind::Type,
329			));
330		}
331
332		Ok(ByobReader {
333			common: CommonReader::new(cx, stream, stream_object),
334		})
335	}
336
337	pub(crate) fn read_internal<'cx>(
338		&mut self, cx: &'cx Context, view: ArrayBufferView, min: usize, request: Request,
339	) -> ResultExc<Promise<'cx>> {
340		let stream = self.common.stream(cx)?.unwrap();
341		let promise = Promise::from(cx.root(request.promise.get())).unwrap();
342
343		stream.disturbed = true;
344		if stream.state == State::Errored {
345			(request.error)(cx, &promise, &stream.stored_error());
346			return Ok(promise);
347		}
348
349		let (constructor, element_size) = {
350			let ty = view.view_type();
351			(type_to_constructor(ty), type_to_element_size(ty))
352		};
353
354		let offset = view.offset();
355		let length = view.len();
356		let buffer = view.buffer(cx);
357		match buffer.transfer(cx) {
358			Ok(buffer) => {
359				let mut descriptor = PullIntoDescriptor {
360					buffer: Heap::boxed(buffer.get()),
361					offset,
362					length: length * element_size,
363					filled: 0,
364					min: min * element_size,
365					element: element_size,
366					constructor,
367					kind: ReaderKind::Byob,
368				};
369
370				let controller = stream.native_controller(cx)?.into_byte_stream().unwrap();
371				if !controller.pending_descriptors.is_empty() {
372					controller.pending_descriptors.push_back(descriptor);
373
374					if stream.state == State::Readable {
375						self.common.requests.push_back(request);
376					}
377					return Ok(promise);
378				} else if stream.state == State::Closed {
379					let empty = descriptor.construct(cx)?.as_value(cx);
380					(request.close)(cx, &promise, Some(&empty))?;
381					return Ok(promise);
382				} else if controller.common.queue_size > 0 {
383					if controller.fill_pull_into_descriptor(cx, &mut descriptor)? {
384						let buffer = buffer.transfer(cx)?;
385						descriptor.buffer.set(buffer.get());
386						let view = descriptor.construct(cx)?.as_value(cx);
387
388						if controller.common.queue_size == 0 && controller.common.close_requested {
389							controller.close(cx)?;
390						} else {
391							controller.pull_if_needed(cx)?;
392						}
393
394						(request.chunk)(cx, &promise, &view);
395						return Ok(promise);
396					} else if controller.common.close_requested {
397						let error = Error::new("Stream closed by request.", ErrorKind::Type).as_value(cx);
398						let _ = controller.error_internal(cx, &error);
399						(request.error)(cx, &promise, &error);
400						return Ok(promise);
401					}
402				}
403
404				controller.pending_descriptors.push_back(descriptor);
405				if stream.state == State::Readable {
406					self.common.requests.push_back(request);
407				}
408
409				let stream = self.common.stream(cx)?.unwrap();
410				let controller = stream.native_controller(cx)?.into_byte_stream().unwrap();
411				controller.pull_if_needed(cx)?;
412			}
413			Err(error) => {
414				(request.error)(cx, &promise, &error.as_value(cx));
415			}
416		}
417		Ok(promise)
418	}
419
420	pub fn cancel<'cx>(&self, cx: &'cx Context, reason: Opt<Value>) -> ResultExc<Promise<'cx>> {
421		self.common.cancel(cx, reason)
422	}
423
424	pub fn read<'cx>(
425		&mut self, cx: &'cx Context, view: ArrayBufferView, Opt(options): Opt<ByobReadOptions>,
426	) -> ResultExc<Promise<'cx>> {
427		let promise = Promise::new(cx);
428		let request = Request::standard(promise.get());
429		if self.common.stream.is_some() {
430			if view.is_empty() {
431				promise.reject(cx, &Error::new("View must not be empty.", ErrorKind::Type).as_value(cx));
432				return Ok(promise);
433			}
434
435			let buffer = view.buffer(cx);
436
437			if buffer.is_empty() {
438				promise.reject(
439					cx,
440					&Error::new("Buffer must contain bytes.", ErrorKind::Type).as_value(cx),
441				);
442				return Ok(promise);
443			}
444
445			if buffer.is_detached() {
446				promise.reject(
447					cx,
448					&Error::new("ArrayBuffer must not be detached.", ErrorKind::Type).as_value(cx),
449				);
450				return Ok(promise);
451			}
452
453			let options = options.unwrap_or_default();
454			if options.min == 0 {
455				promise.reject(
456					cx,
457					&Error::new("min must be greater than 0.", ErrorKind::Type).as_value(cx),
458				);
459				return Ok(promise);
460			}
461
462			if options.min > view.len() as u64 {
463				promise.reject(
464					cx,
465					&Error::new("min is greater than View Length", ErrorKind::Range).as_value(cx),
466				);
467				return Ok(promise);
468			}
469
470			self.read_internal(cx, view, options.min as usize, request)
471		} else {
472			(request.error)(
473				cx,
474				&promise,
475				&Error::new("Reader has already been released.", ErrorKind::Type).as_value(cx),
476			);
477			Ok(Promise::new(cx))
478		}
479	}
480
481	#[ion(name = "releaseLock")]
482	pub fn release_lock(&mut self, cx: &Context) -> Result<()> {
483		self.common.release_lock(cx)
484	}
485
486	#[ion(get)]
487	pub fn get_closed(&self) -> *mut JSObject {
488		self.common.closed.get()
489	}
490}