runtime/globals/streams/readable/
controller.rs

1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 */
6
7use std::collections::VecDeque;
8use std::ptr;
9
10use ion::class::{NativeObject as _, Reflector};
11use ion::conversions::{FromValue as _, ToValue as _};
12use ion::typedarray::{ArrayBuffer, ArrayBufferView, Uint8Array, type_to_constructor};
13use ion::{
14	ClassDefinition, Context, Error, ErrorKind, Exception, Function, Local, Object, Promise, Result, ResultExc,
15	Traceable, TracedHeap, Value, js_class,
16};
17use mozjs::conversions::ConversionBehavior;
18use mozjs::jsapi::{Handle, Heap, JSContext, JSObject, Type};
19use mozjs::jsval::{DoubleValue, Int32Value, JSVal, NullValue, UndefinedValue};
20
21use crate::globals::streams::readable::reader::{Reader, ReaderKind, Request};
22use crate::globals::streams::readable::{ByobReader, ReadableStream, State, StreamSource, UnderlyingSource};
23use crate::globals::streams::strategy::{QueuingStrategy, SizeCallback};
24
/// Bookkeeping for one pending "pull-into" request of a byte stream controller.
#[derive(Traceable)]
pub(crate) struct PullIntoDescriptor {
	/// Heap cell holding the JS object that `buffer()` wraps as an `ArrayBuffer`.
	pub(crate) buffer: Box<Heap<*mut JSObject>>,
	/// Offset handed to `constructor` when the result view is built.
	pub(crate) offset: usize,
	/// Total number of bytes this descriptor can take (`length - filled` is the space left).
	pub(crate) length: usize,
	/// Number of bytes written into the buffer so far.
	pub(crate) filled: usize,
	/// Fill threshold that the controller compares `filled` against before committing.
	pub(crate) min: usize,
	/// Element size used as the divisor when converting `filled` into an element count.
	pub(crate) element: usize,
	/// Raw constructor invoked by `construct()` with (context, buffer handle, offset, element count).
	pub(crate) constructor: unsafe extern "C" fn(*mut JSContext, Handle<*mut JSObject>, usize, i64) -> *mut JSObject,
	/// Kind of reader the descriptor belongs to; `Controller::release` resets it to `ReaderKind::None`.
	pub(crate) kind: ReaderKind,
}
36
37impl PullIntoDescriptor {
38	pub(crate) fn buffer(&self) -> ArrayBuffer<'_> {
39		ArrayBuffer::from(unsafe { Local::from_heap(&self.buffer) }).unwrap()
40	}
41
42	pub(crate) fn construct<'cx>(&self, cx: &'cx Context) -> Result<ArrayBufferView<'cx>> {
43		let view = unsafe {
44			(self.constructor)(
45				cx.as_ptr(),
46				self.buffer.handle(),
47				self.offset,
48				(self.filled / self.element) as i64,
49			)
50		};
51
52		if !view.is_null() {
53			Ok(ArrayBufferView::from(cx.root(view)).unwrap())
54		} else if let Some(Exception::Error(exception)) = Exception::new(cx)? {
55			Err(exception)
56		} else {
57			Err(Error::new("Failed to Initialise Array Buffer", None))
58		}
59	}
60
61	pub(crate) fn commit(&mut self, cx: &Context, reader: &mut ByobReader, state: State) -> ResultExc<()> {
62		let mut done = false;
63
64		let buffer = self.buffer();
65		if state == State::Closed {
66			done = true;
67		}
68		let buffer = buffer.transfer(cx)?;
69
70		self.buffer.set(buffer.get());
71		let view = self.construct(cx)?.as_value(cx);
72
73		let request = reader.common.requests.pop_front().unwrap();
74		let promise = request.promise();
75
76		if !done {
77			(request.chunk)(cx, &promise, &view);
78		} else {
79			(request.close)(cx, &promise, Some(&view))?;
80		}
81		Ok(())
82	}
83}
84
/// Tag naming the two controller flavours defined in this module.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Traceable)]
pub enum ControllerKind {
	/// Corresponds to [`DefaultController`] (presumably — the mapping is by name only here).
	Default,
	/// Corresponds to [`ByteStreamController`] (presumably — the mapping is by name only here).
	ByteStream,
}
90
/// Mutable borrow of either concrete controller, letting callers drive both kinds
/// through one set of methods (`cancel`, `pull`, `release`).
pub enum Controller<'c> {
	/// A borrowed [`DefaultController`].
	Default(&'c mut DefaultController),
	/// A borrowed [`ByteStreamController`].
	ByteStream(&'c mut ByteStreamController),
}
95
96impl<'c> Controller<'c> {
97	pub(crate) fn common_mut(&mut self) -> &mut CommonController {
98		match self {
99			Controller::Default(controller) => &mut controller.common,
100			Controller::ByteStream(controller) => &mut controller.common,
101		}
102	}
103
104	pub(crate) fn into_default(self) -> Option<&'c mut DefaultController> {
105		match self {
106			Controller::Default(controller) => Some(controller),
107			Controller::ByteStream(_) => None,
108		}
109	}
110
111	pub(crate) fn into_byte_stream(self) -> Option<&'c mut ByteStreamController> {
112		match self {
113			Controller::ByteStream(controller) => Some(controller),
114			Controller::Default(_) => None,
115		}
116	}
117
118	pub fn cancel<'cx: 'v, 'v>(&mut self, cx: &'cx Context, reason: Option<Value<'v>>) -> ResultExc<Promise<'cx>> {
119		match self {
120			Controller::Default(controller) => controller.reset_queue(cx),
121			Controller::ByteStream(controller) => controller.reset_queue(cx),
122		}
123		let common = self.common_mut();
124
125		let mut promise = Promise::new(cx);
126		common.source.cancel(cx, &mut promise, reason)?;
127		common.source.clear_algorithms();
128		Ok(promise)
129	}
130
131	pub fn pull(&mut self, cx: &Context, promise: &Promise, request: Request) -> ResultExc<()> {
132		match self {
133			Controller::Default(controller) => {
134				if let Some((chunk, size)) = controller.queue.pop_front() {
135					controller.common.queue_size -= size as usize;
136
137					if controller.common.close_requested && controller.queue.is_empty() {
138						controller.common.source.clear_algorithms();
139						controller.size = SizeCallback::None;
140
141						let stream = controller.common.stream(cx)?;
142						stream.close(cx)?;
143					} else {
144						controller.pull_if_needed(cx)?;
145					}
146
147					let chunk = Value::from(unsafe { Local::from_heap(&chunk) });
148					(request.chunk)(cx, promise, &chunk);
149				} else {
150					let stream = controller.common.stream(cx)?;
151					match stream.native_reader(cx)? {
152						Some(Reader::Default(reader)) => {
153							if stream.state != State::Readable {
154								return Err(Error::new("Cannot Add Read Request to Read Queue", None).into());
155							}
156							reader.common.requests.push_back(request);
157						}
158						_ => return Ok(()),
159					}
160					controller.pull_if_needed(cx)?;
161				}
162			}
163			Controller::ByteStream(controller) => {
164				{
165					let stream = controller.common.stream(cx)?;
166					if stream.reader_kind != ReaderKind::Default {
167						return Err(Error::new("Reader should have default reader.", ErrorKind::Type).into());
168					}
169				}
170
171				if controller.common.queue_size > 0 {
172					let (buffer, offset, length) = controller.queue.pop_front().unwrap();
173					controller.common.queue_size -= length;
174
175					if controller.common.queue_size == 0 && controller.common.close_requested {
176						controller.close(cx)?;
177					} else {
178						controller.pull_if_needed(cx)?;
179					}
180
181					let buffer = ArrayBuffer::from(unsafe { Local::from_heap(&buffer) }).unwrap();
182					let array = Uint8Array::with_array_buffer(cx, &buffer, offset, length).unwrap().as_value(cx);
183
184					(request.chunk)(cx, promise, &array);
185				} else {
186					if controller.auto_allocate_chunk_size != 0 {
187						let Some(buffer) = ArrayBuffer::new(cx, controller.auto_allocate_chunk_size) else {
188							controller.error_internal(cx, &Exception::new(cx).unwrap().as_value(cx))?;
189							return Ok(());
190						};
191
192						controller.pending_descriptors.push_back(PullIntoDescriptor {
193							buffer: Heap::boxed(buffer.get()),
194							offset: 0,
195							length: controller.auto_allocate_chunk_size,
196							filled: 0,
197							min: 1,
198							element: 1,
199							constructor: type_to_constructor(Type::Uint8),
200							kind: ReaderKind::Default,
201						});
202					}
203
204					let stream = controller.common.stream(cx)?;
205					if let Some(Reader::Default(reader)) = stream.native_reader(cx)? {
206						reader.common.requests.push_back(request);
207					}
208					controller.pull_if_needed(cx)?;
209				}
210			}
211		}
212		Ok(())
213	}
214
215	pub fn release(&mut self) {
216		match self {
217			Controller::Default(_) => {}
218			Controller::ByteStream(controller) => {
219				if let Some(mut descriptor) = controller.pending_descriptors.pop_front() {
220					controller.pending_descriptors.clear();
221					descriptor.kind = ReaderKind::None;
222					controller.pending_descriptors.push_back(descriptor);
223				}
224			}
225		}
226	}
227}
228
/// State shared by [`DefaultController`] and [`ByteStreamController`].
#[js_class]
#[ion(name = "ReadableStreamCommonController")]
pub struct CommonController {
	reflector: Reflector,

	/// JS object of the stream this controller belongs to; resolved by `stream()`.
	pub(crate) stream: Box<Heap<*mut JSObject>>,
	/// Underlying source providing the start / pull / cancel algorithms.
	pub(crate) source: StreamSource,

	/// Set once the reaction to the start result has run successfully.
	pub(crate) started: bool,
	/// `true` from the moment the source's pull is invoked until its promise fulfils.
	pub(crate) pulling: bool,
	/// Records that another pull was wanted while one was still in flight.
	pub(crate) pull_again: bool,
	/// Set when a close has been requested; blocks further close/enqueue calls.
	pub(crate) close_requested: bool,

	/// Threshold compared against `queue_size` to decide whether to pull.
	high_water_mark: f64,
	/// Running total of the sizes of everything currently queued.
	pub(crate) queue_size: usize,
}
245
impl CommonController {
	/// Creates the shared controller state for `stream`, backed by `source`.
	///
	/// All flags start out `false` and the queue size starts at zero.
	pub fn new(stream: &Object, source: StreamSource, high_water_mark: f64) -> CommonController {
		CommonController {
			reflector: Reflector::default(),

			stream: Heap::boxed(stream.handle().get()),
			source,

			started: false,
			pulling: false,
			pull_again: false,
			close_requested: false,

			high_water_mark,
			queue_size: 0,
		}
	}
}
264
265#[js_class]
266impl CommonController {
267	pub(crate) fn new_from_script(
268		stream: &Object, source_object: Option<&Object>, source: &UnderlyingSource, high_water_mark: f64,
269	) -> CommonController {
270		CommonController::new(stream, source.to_native(source_object), high_water_mark)
271	}
272
273	#[expect(clippy::mut_from_ref)]
274	pub(crate) fn stream<'cx>(&self, cx: &'cx Context) -> Result<&'cx mut ReadableStream> {
275		let stream = Object::from(unsafe { Local::from_heap(&*ptr::from_ref(&self.stream)) });
276		ReadableStream::get_mut_private(cx, &stream)
277	}
278
279	pub(crate) fn start<C: ControllerInternals>(&mut self, cx: &Context, start: Option<&Function>) -> ResultExc<()> {
280		let controller = self.reflector().get();
281
282		let underlying_source = self.source.source_object();
283		let value = controller.as_value(cx);
284		let result = start
285			.map_or_else(
286				|| Ok(UndefinedValue()),
287				|start| start.call(cx, &underlying_source, &[value]).map(|v| v.get()),
288			)
289			.map_err(|report| report.unwrap().exception)?;
290
291		let promise = Promise::resolved(cx, &Value::from(cx.root(result)));
292
293		let controller1 = TracedHeap::new(controller);
294		let controller2 = TracedHeap::new(controller);
295		promise.add_reactions(
296			cx,
297			move |cx, _| {
298				let controller = C::from_traced_heap(cx, &controller1)?;
299				controller.common().started = true;
300				controller.pull_if_needed(cx)?;
301				Ok(Value::undefined_handle())
302			},
303			move |cx, error| {
304				let controller = C::from_traced_heap(cx, &controller2)?;
305				controller.error_internal(cx, error)?;
306				Ok(Value::undefined_handle())
307			},
308		);
309
310		Ok(())
311	}
312
313	pub(crate) fn can_close_or_enqueue(&self, stream: &ReadableStream) -> bool {
314		stream.state == State::Readable && !self.close_requested
315	}
316
317	pub(crate) fn should_call_pull(&self, cx: &Context, stream: &ReadableStream) -> Result<bool> {
318		if !self.can_close_or_enqueue(stream) || !self.started {
319			return Ok(false);
320		}
321
322		if let Some(reader) = stream.native_reader(cx)?
323			&& !reader.is_empty()
324		{
325			return Ok(true);
326		}
327		Ok(stream.state == State::Readable && self.high_water_mark > self.queue_size as f64)
328	}
329
330	pub(crate) fn pull_if_needed<C: ControllerInternals>(&mut self, cx: &Context) -> ResultExc<()> {
331		let stream = self.stream(cx)?;
332		if !self.should_call_pull(cx, stream)? {
333			return Ok(());
334		}
335
336		if self.pulling {
337			self.pull_again = true;
338			return Ok(());
339		}
340
341		self.pulling = true;
342
343		let promise = self.source.pull(cx, self.reflector.get())?;
344		if let Some(promise) = promise {
345			let controller1 = TracedHeap::new(stream.controller.get());
346			let controller2 = TracedHeap::new(stream.controller.get());
347
348			promise.add_reactions(
349				cx,
350				move |cx, _| {
351					let controller = C::from_traced_heap(cx, &controller1)?;
352					controller.common().pulling = false;
353					if controller.common().pull_again {
354						controller.common().pull_again = false;
355						controller.pull_if_needed(cx)?;
356					}
357					Ok(Value::undefined_handle())
358				},
359				move |cx, error| {
360					let controller = C::from_traced_heap(cx, &controller2)?;
361					controller.error_internal(cx, error)?;
362					Ok(Value::undefined_handle())
363				},
364			);
365		}
366		Ok(())
367	}
368
369	pub(crate) fn desired_size(&self, cx: &Context) -> Result<JSVal> {
370		let size = match self.stream(cx)?.state {
371			State::Readable => DoubleValue(self.high_water_mark - self.queue_size as f64),
372			State::Closed => Int32Value(0),
373			State::Errored => NullValue(),
374		};
375		Ok(size)
376	}
377}
378
379pub(crate) trait ControllerInternals: ClassDefinition {
380	fn from_traced_heap<'h>(cx: &Context, heap: &'h TracedHeap<*mut JSObject>) -> Result<&'h mut Self> {
381		let controller = Object::from(heap.to_local());
382		Self::get_mut_private(cx, &controller)
383	}
384
385	fn common(&mut self) -> &mut CommonController;
386
387	fn start(&mut self, cx: &Context, start: Option<&Function>) -> ResultExc<()> {
388		self.common().start::<Self>(cx, start)
389	}
390
391	fn pull_if_needed(&mut self, cx: &Context) -> ResultExc<()> {
392		self.common().pull_if_needed::<Self>(cx)
393	}
394
395	fn reset_queue(&mut self, cx: &Context);
396
397	fn clear_algorithms(&mut self) {
398		self.common().source.clear_algorithms();
399	}
400
401	fn error_internal(&mut self, cx: &Context, error: &Value) -> Result<()> {
402		if self.common().stream(cx)?.state == State::Readable {
403			self.reset_queue(cx);
404			self.clear_algorithms();
405			self.common().stream(cx)?.error(cx, error)
406		} else {
407			Ok(())
408		}
409	}
410}
411
/// Controller of a stream whose chunks are arbitrary JS values.
#[js_class]
#[ion(name = "ReadableStreamDefaultController")]
pub struct DefaultController {
	/// State shared with the byte stream controller.
	pub(crate) common: CommonController,
	/// Strategy callback used to compute the size of each enqueued chunk.
	pub(crate) size: SizeCallback,
	/// Queued chunks, each paired with the size computed when it was enqueued.
	pub(crate) queue: VecDeque<(Box<Heap<JSVal>>, u64)>,
}
419
420#[js_class]
421impl DefaultController {
422	pub(crate) fn initialise(
423		stream: &Object, source_object: Option<&Object>, source: &UnderlyingSource, strategy: &QueuingStrategy,
424		high_water_mark: f64,
425	) -> DefaultController {
426		DefaultController {
427			common: CommonController::new_from_script(stream, source_object, source, high_water_mark),
428			size: strategy.size(),
429			queue: VecDeque::new(),
430		}
431	}
432
433	#[ion(get)]
434	pub fn get_desired_size(&self, cx: &Context) -> Result<JSVal> {
435		self.common.desired_size(cx)
436	}
437
438	pub fn close(&mut self, cx: &Context) -> ResultExc<()> {
439		let stream = self.common.stream(cx)?;
440		if self.common.can_close_or_enqueue(stream) {
441			if self.queue.is_empty() {
442				self.common.close_requested = true;
443			}
444			self.common.source.clear_algorithms();
445			self.size = SizeCallback::None;
446
447			stream.close(cx)
448		} else {
449			Err(Error::new("Cannot Close Stream", ErrorKind::Type).into())
450		}
451	}
452
453	pub fn enqueue(&mut self, cx: &Context, chunk: Value) -> ResultExc<()> {
454		self.enqueue_internal(cx, &chunk)
455	}
456
457	pub(crate) fn enqueue_internal(&mut self, cx: &Context, chunk: &Value) -> ResultExc<()> {
458		let stream = self.common.stream(cx)?;
459		if self.common.can_close_or_enqueue(stream) {
460			if let Some(Reader::Default(reader)) = stream.native_reader(cx)?
461				&& let Some(request) = reader.common.requests.pop_front()
462			{
463				let promise = request.promise();
464				(request.chunk)(cx, &promise, chunk);
465				return Ok(());
466			}
467
468			match self.size.compute(cx, chunk) {
469				Ok(size) => {
470					let size = u64::from_value(cx, &size, false, ConversionBehavior::EnforceRange);
471					match size {
472						Ok(size) => {
473							self.queue.push_back((Heap::boxed(chunk.get()), size));
474							self.common.queue_size += size as usize;
475							self.pull_if_needed(cx)?;
476						}
477						Err(error) => {
478							self.error_internal(cx, &error.as_value(cx))?;
479						}
480					}
481				}
482				Err(exception) => {
483					self.error_internal(cx, &exception.as_value(cx))?;
484				}
485			}
486			Ok(())
487		} else {
488			Err(Error::new("Cannot Enqueue to Stream", ErrorKind::Type).into())
489		}
490	}
491
492	pub fn error(&mut self, cx: &Context, error: Option<Value>) -> Result<()> {
493		self.error_internal(cx, &error.unwrap_or_else(Value::undefined_handle))
494	}
495}
496
497impl ControllerInternals for DefaultController {
498	fn common(&mut self) -> &mut CommonController {
499		&mut self.common
500	}
501
502	fn reset_queue(&mut self, _: &Context) {
503		self.queue.clear();
504		self.common.queue_size = 0;
505	}
506
507	fn clear_algorithms(&mut self) {
508		self.common().source.clear_algorithms();
509		self.size = SizeCallback::None;
510	}
511}
512
/// Controller of a byte stream, whose chunks are regions of array buffers.
#[js_class]
#[ion(name = "ReadableByteStreamController")]
pub struct ByteStreamController {
	/// State shared with the default controller.
	pub(crate) common: CommonController,
	/// Size of the buffer allocated for default-reader reads; `0` disables auto-allocation.
	pub(crate) auto_allocate_chunk_size: usize,
	/// Lazily created BYOB request object, if one is currently handed out.
	pub(crate) byob_request: Option<Box<Heap<*mut JSObject>>>,
	/// Pull-into requests that still wait to be filled, oldest first.
	pub(crate) pending_descriptors: VecDeque<PullIntoDescriptor>,
	/// Queued chunks as (buffer object, offset, length) triples.
	pub(crate) queue: VecDeque<(Box<Heap<*mut JSObject>>, usize, usize)>,
}
522
523#[js_class]
524impl ByteStreamController {
525	pub(crate) fn initialise(
526		stream: &Object, source_object: &Object, source: &UnderlyingSource, high_water_mark: f64,
527	) -> Result<ByteStreamController> {
528		if let Some(auto_allocate_chunk_size) = source.auto_allocate_chunk_size
529			&& auto_allocate_chunk_size == 0
530		{
531			return Err(Error::new("autoAllocateChunkSize can not be zero.", ErrorKind::Type));
532		}
533
534		Ok(ByteStreamController {
535			common: CommonController::new_from_script(stream, Some(source_object), source, high_water_mark),
536			auto_allocate_chunk_size: source.auto_allocate_chunk_size.unwrap_or(0) as usize,
537			byob_request: None,
538			pending_descriptors: VecDeque::new(),
539			queue: VecDeque::new(),
540		})
541	}
542
543	pub(crate) fn fill_pull_into_descriptor(
544		&mut self, cx: &Context, descriptor: &mut PullIntoDescriptor,
545	) -> Result<bool> {
546		let max_copy = self.common.queue_size.min(descriptor.length - descriptor.filled);
547		let max_aligned = descriptor.filled + max_copy - (descriptor.filled + max_copy) % descriptor.element;
548
549		let ready = max_aligned > descriptor.min;
550
551		let mut remaining = if ready {
552			max_aligned - descriptor.filled
553		} else {
554			max_copy
555		};
556
557		while remaining > 0 {
558			let mut copy = remaining;
559			let mut len = 0;
560
561			if let Some((chunk, offset, length)) = self.queue.front() {
562				copy = copy.min(*length);
563				len = *length;
564				let chunk = ArrayBuffer::from(unsafe { Local::from_heap(chunk) }).unwrap();
565				let buffer = descriptor.buffer();
566				if !chunk.copy_data_to(cx, &buffer, *offset, descriptor.offset + descriptor.filled, copy) {
567					let error = if let Some(Exception::Error(error)) = Exception::new(cx)? {
568						error
569					} else {
570						Error::new("Failed to copy data to descriptor buffer.", None)
571					};
572					return Err(error);
573				}
574			}
575
576			if copy == len {
577				self.queue.pop_front();
578			} else if let Some((_, offset, length)) = self.queue.get_mut(0) {
579				*offset += copy;
580				*length -= copy;
581			}
582			self.common.queue_size -= copy;
583			descriptor.filled += copy;
584			remaining -= copy;
585		}
586
587		if !ready {
588			// TODO: Assert Queue Size 0, Assert Filled > 0, Assert Filled < Element Size
589		}
590
591		Ok(ready)
592	}
593
594	pub(crate) fn invalidate_byob_request(&mut self, cx: &Context) -> Result<()> {
595		if let Some(request) = self.byob_request.take() {
596			let request = Object::from(unsafe { Local::from_heap(&request) });
597			let request = ByobRequest::get_mut_private(cx, &request)?;
598			request.controller = None;
599			request.view = None;
600		}
601		Ok(())
602	}
603
604	pub(crate) fn enqueue_cloned_chunk(
605		&mut self, cx: &Context, buffer: &ArrayBuffer, offset: usize, length: usize,
606	) -> Result<()> {
607		let Some(buffer) = buffer.clone(cx, offset, length) else {
608			let error = if let Some(Exception::Error(error)) = Exception::new(cx)? {
609				error
610			} else {
611				Error::new("Failed to clone ArrayBuffer", None)
612			};
613			self.error_internal(cx, &error.as_value(cx))?;
614			return Err(error);
615		};
616
617		self.queue.push_back((Heap::boxed(buffer.get()), 0, length));
618		self.common.queue_size += length;
619		Ok(())
620	}
621
622	pub(crate) fn process_descriptors(&mut self, cx: &Context, reader: &mut ByobReader, state: State) -> ResultExc<()> {
623		while !self.pending_descriptors.is_empty() {
624			if self.common.queue_size == 0 {
625				break;
626			}
627
628			let mut shift = false;
629
630			let descriptor = ptr::from_mut(self.pending_descriptors.get_mut(0).unwrap());
631			if self.fill_pull_into_descriptor(cx, unsafe { &mut *descriptor })? {
632				shift = true;
633			}
634
635			if shift {
636				let mut descriptor = self.pending_descriptors.pop_front().unwrap();
637				descriptor.commit(cx, reader, state)?;
638			}
639		}
640		Ok(())
641	}
642
643	pub(crate) fn respond(&mut self, cx: &Context, written: usize) -> ResultExc<()> {
644		let descriptor = self.pending_descriptors.front().unwrap();
645		let stream = self.common.stream(cx)?;
646		match stream.state {
647			State::Readable => {
648				if written == 0 {
649					return Err(Error::new("Readable Stream must be written to.", ErrorKind::Type).into());
650				}
651				if descriptor.filled + written > descriptor.length {
652					return Err(
653						Error::new("Buffer of BYOB Request View has been overwritten.", ErrorKind::Range).into(),
654					);
655				}
656			}
657			State::Closed => {
658				if written != 0 {
659					return Err(Error::new("Closed Stream must not be written to.", ErrorKind::Type).into());
660				}
661			}
662			State::Errored => return Err(Error::new("Errored Stream cannot have BYOB Request", ErrorKind::Type).into()),
663		}
664
665		let (buffer, kind) = {
666			let descriptor = self.pending_descriptors.get_mut(0).unwrap();
667			let buffer = descriptor.buffer().transfer(cx)?;
668			descriptor.buffer.set(buffer.get());
669			(buffer, descriptor.kind)
670		};
671
672		self.invalidate_byob_request(cx)?;
673
674		match stream.state {
675			State::Readable => {
676				let descriptor = self.pending_descriptors.front_mut().unwrap();
677				descriptor.filled += written;
678
679				let PullIntoDescriptor { filled, offset, length, min, .. } = *descriptor;
680
681				if let ReaderKind::None = kind {
682					if filled > 0 {
683						self.enqueue_cloned_chunk(cx, &buffer, offset, length)?;
684					}
685
686					if let Some(Reader::Byob(reader)) = stream.native_reader(cx)? {
687						self.process_descriptors(cx, reader, stream.state)?;
688					}
689				} else {
690					if filled < min {
691						return Ok(());
692					}
693
694					let mut descriptor = self.pending_descriptors.pop_front().unwrap();
695					let remainder = descriptor.filled % descriptor.element;
696
697					if remainder > 0 {
698						self.enqueue_cloned_chunk(
699							cx,
700							&buffer,
701							descriptor.offset + descriptor.filled - remainder,
702							remainder,
703						)?;
704
705						descriptor.filled -= remainder;
706					}
707
708					if let Some(Reader::Byob(reader)) = stream.native_reader(cx)? {
709						descriptor.commit(cx, reader, stream.state)?;
710						self.process_descriptors(cx, reader, stream.state)?;
711					}
712				}
713			}
714			State::Closed => match stream.native_reader(cx)? {
715				Some(Reader::Byob(reader)) => {
716					while !reader.common.requests.is_empty() {
717						let mut descriptor = self.pending_descriptors.pop_front().unwrap();
718						descriptor.commit(cx, reader, State::Closed)?;
719					}
720				}
721				_ => {
722					self.pending_descriptors.pop_front();
723				}
724			},
725			State::Errored => unreachable!(),
726		}
727
728		self.pull_if_needed(cx)
729	}
730
731	pub(crate) fn respond_with_new_view(&mut self, cx: &Context, view: ArrayBufferView) -> ResultExc<()> {
732		let buffer = view.buffer(cx);
733
734		if buffer.is_detached() {
735			return Err(Error::new("View Buffer cannot be detached.", ErrorKind::Type).into());
736		}
737
738		let stream = self.common.stream(cx)?;
739		match stream.state {
740			State::Readable => {
741				if view.is_empty() {
742					return Err(Error::new("View must have a non-zero length", ErrorKind::Type).into());
743				}
744			}
745			State::Closed => {
746				if !view.is_empty() {
747					return Err(Error::new(
748						"View for a Closed Readable Stream must have a zero length",
749						ErrorKind::Type,
750					)
751					.into());
752				}
753			}
754			State::Errored => unreachable!(),
755		}
756
757		let offset = view.offset();
758		let descriptor = self.pending_descriptors.get_mut(0).unwrap();
759
760		if descriptor.offset + descriptor.filled != offset {
761			return Err(Error::new("View Offset must be the same as descriptor.", ErrorKind::Range).into());
762		}
763		if descriptor.length != view.len() {
764			return Err(Error::new("View Length must be the same as descriptor.", ErrorKind::Range).into());
765		}
766		if descriptor.filled + view.len() > descriptor.length {
767			return Err(Error::new("View cannot overfill descriptor", ErrorKind::Range).into());
768		}
769
770		let len = view.len();
771		let buffer = buffer.transfer(cx)?;
772		descriptor.buffer.set(buffer.get());
773		self.respond(cx, len)
774	}
775
776	#[ion(get)]
777	pub fn get_desired_size(&self, cx: &Context) -> Result<JSVal> {
778		self.common.desired_size(cx)
779	}
780
781	#[ion(get)]
782	pub fn get_byob_request(&mut self, cx: &Context) -> *mut JSObject {
783		if self.byob_request.is_none() && !self.pending_descriptors.is_empty() {
784			let descriptor = self.pending_descriptors.front().unwrap();
785			let view = Uint8Array::with_array_buffer(
786				cx,
787				&descriptor.buffer(),
788				descriptor.offset + descriptor.filled,
789				descriptor.length - descriptor.filled,
790			)
791			.unwrap();
792
793			let request = ByobRequest {
794				reflector: Reflector::default(),
795				controller: Some(Heap::boxed(self.reflector().get())),
796				view: Some(Heap::boxed(view.get())),
797			};
798			self.byob_request = Some(Heap::boxed(ByobRequest::new_object(cx, Box::new(request))));
799		}
800
801		if let Some(request) = &self.byob_request {
802			request.get()
803		} else {
804			ptr::null_mut()
805		}
806	}
807
808	pub fn close(&mut self, cx: &Context) -> ResultExc<()> {
809		let stream = self.common.stream(cx)?;
810		if self.common.can_close_or_enqueue(stream) {
811			if self.common.queue_size > 0 {
812				self.common.close_requested = true;
813			}
814
815			if let Some(descriptor) = self.pending_descriptors.front()
816				&& descriptor.filled % descriptor.element > 0
817			{
818				let error = Error::new("Pending Pull-Into Not Empty", ErrorKind::Type);
819				self.error_internal(cx, &error.as_value(cx))?;
820				return Err(error.into());
821			}
822
823			self.common.source.clear_algorithms();
824			stream.close(cx)
825		} else {
826			Err(Error::new("Cannot Close Byte Stream Controller", ErrorKind::Type).into())
827		}
828	}
829
830	pub fn enqueue(&mut self, cx: &Context, chunk: ArrayBufferView) -> ResultExc<()> {
831		if chunk.is_empty() {
832			return Err(Error::new("Chunk must contain bytes.", ErrorKind::Type).into());
833		}
834
835		let buffer = chunk.buffer(cx);
836		if buffer.is_empty() {
837			return Err(Error::new("Chunk must contain bytes.", ErrorKind::Type).into());
838		}
839
840		let stream = self.common.stream(cx)?;
841		if self.common.can_close_or_enqueue(stream) {
842			let offset = chunk.offset();
843			let length = chunk.len();
844			let buffer = buffer.transfer(cx)?;
845
846			let mut shift = false;
847			if !self.pending_descriptors.is_empty() {
848				self.invalidate_byob_request(cx)?;
849				let descriptor = self.pending_descriptors.front().unwrap();
850				let buffer = descriptor.buffer().transfer(cx)?;
851				descriptor.buffer.set(buffer.get());
852				if descriptor.kind == ReaderKind::None {
853					if descriptor.filled > 0 {
854						self.enqueue_cloned_chunk(cx, &buffer, descriptor.offset, descriptor.length)?;
855					}
856					shift = true;
857				}
858			}
859
860			if shift {
861				self.pending_descriptors.pop_front();
862			}
863
864			match stream.native_reader(cx)? {
865				Some(Reader::Default(reader)) => {
866					let mut complete = false;
867					while let Some(request) = reader.common.requests.pop_front() {
868						let promise = request.promise();
869
870						if self.common.queue_size == 0 {
871							self.pending_descriptors.pop_front();
872
873							let array =
874								Uint8Array::with_array_buffer(cx, &buffer, offset, length).unwrap().as_value(cx);
875							(request.chunk)(cx, &promise, &array);
876
877							complete = true;
878							break;
879						}
880
881						let (buffer, offset, length) = self.queue.pop_front().unwrap();
882						self.common.queue_size -= length;
883
884						if self.common.queue_size == 0 && self.common.close_requested {
885							self.close(cx)?;
886						} else {
887							self.pull_if_needed(cx)?;
888						}
889
890						let buffer = ArrayBuffer::from(unsafe { Local::from_heap(&buffer) }).unwrap();
891						let array = Uint8Array::with_array_buffer(cx, &buffer, offset, length).unwrap().as_value(cx);
892
893						(request.chunk)(cx, &promise, &array);
894					}
895
896					if !complete {
897						self.queue.push_back((Heap::boxed(buffer.get()), offset, length));
898						self.common.queue_size += length;
899					}
900				}
901				Some(Reader::Byob(reader)) => {
902					self.queue.push_back((Heap::boxed(buffer.get()), offset, length));
903					self.common.queue_size += length;
904
905					self.process_descriptors(cx, reader, stream.state)?;
906				}
907				None => {
908					self.queue.push_back((Heap::boxed(buffer.get()), offset, length));
909					self.common.queue_size += length;
910				}
911			}
912			self.pull_if_needed(cx)
913		} else {
914			Err(Error::new("Cannot Enqueue to Stream", ErrorKind::Type).into())
915		}
916	}
917
918	pub fn error(&mut self, cx: &Context, error: Option<Value>) -> Result<()> {
919		self.error_internal(cx, &error.unwrap_or_else(Value::undefined_handle))
920	}
921}
922
923impl ControllerInternals for ByteStreamController {
924	fn common(&mut self) -> &mut CommonController {
925		&mut self.common
926	}
927
928	fn reset_queue(&mut self, cx: &Context) {
929		self.invalidate_byob_request(cx).unwrap();
930		self.pending_descriptors.clear();
931		self.queue.clear();
932		self.common.queue_size = 0;
933	}
934}
935
/// Script-visible request through which a byte source fills a reader-supplied buffer.
#[js_class]
#[ion(name = "ReadableStreamBYOBRequest")]
pub struct ByobRequest {
	reflector: Reflector,
	/// Owning controller object; `None` once the request has been used or invalidated.
	pub(crate) controller: Option<Box<Heap<*mut JSObject>>>,
	/// View the source should write into; cleared when the request is invalidated.
	pub(crate) view: Option<Box<Heap<*mut JSObject>>>,
}
943
944#[js_class]
945impl ByobRequest {
946	#[ion(get)]
947	pub fn get_view(&self) -> *mut JSObject {
948		self.view.as_ref().map_or_else(ptr::null_mut, |view| view.get())
949	}
950
951	pub fn respond(
952		&mut self, cx: &Context, #[ion(convert = ConversionBehavior::Clamp
953	)]
954		written: u64,
955	) -> ResultExc<()> {
956		if let Some(controller) = self.controller.take() {
957			let view = unsafe { Local::from_heap(self.view.as_ref().unwrap()) };
958			let view = ArrayBufferView::from(view).unwrap();
959			let buffer = view.buffer(cx);
960
961			if view.is_empty() || buffer.is_empty() {
962				return Err(Error::new("View and Buffer must have a non-zero length.", ErrorKind::Type).into());
963			}
964
965			if buffer.is_detached() {
966				return Err(Error::new("View Buffer cannot be detached.", ErrorKind::Type).into());
967			}
968
969			let controller = Object::from(unsafe { Local::from_heap(&controller) });
970			let controller = ByteStreamController::get_mut_private(cx, &controller)?;
971			controller.respond(cx, written as usize)
972		} else {
973			Err(Error::new("BYOB Request has already been invalidated.", ErrorKind::Type).into())
974		}
975	}
976
977	#[ion(name = "respondWithNewView")]
978	pub fn respond_with_new_view(&mut self, cx: &Context, view: ArrayBufferView) -> ResultExc<()> {
979		if let Some(controller) = self.controller.take() {
980			let controller = Object::from(unsafe { Local::from_heap(&controller) });
981			let controller = ByteStreamController::get_mut_private(cx, &controller)?;
982			controller.respond_with_new_view(cx, view)
983		} else {
984			Err(Error::new("BYOB Request has already been invalidated.", ErrorKind::Type).into())
985		}
986	}
987}