1use std::collections::VecDeque;
8use std::ptr;
9
10use ion::class::{NativeObject as _, Reflector};
11use ion::conversions::{FromValue as _, ToValue as _};
12use ion::typedarray::{ArrayBuffer, ArrayBufferView, Uint8Array, type_to_constructor};
13use ion::{
14 ClassDefinition, Context, Error, ErrorKind, Exception, Function, Local, Object, Promise, Result, ResultExc,
15 Traceable, TracedHeap, Value, js_class,
16};
17use mozjs::conversions::ConversionBehavior;
18use mozjs::jsapi::{Handle, Heap, JSContext, JSObject, Type};
19use mozjs::jsval::{DoubleValue, Int32Value, JSVal, NullValue, UndefinedValue};
20
21use crate::globals::streams::readable::reader::{Reader, ReaderKind, Request};
22use crate::globals::streams::readable::{ByobReader, ReadableStream, State, StreamSource, UnderlyingSource};
23use crate::globals::streams::strategy::{QueuingStrategy, SizeCallback};
24
25#[derive(Traceable)]
26pub(crate) struct PullIntoDescriptor {
27 pub(crate) buffer: Box<Heap<*mut JSObject>>,
28 pub(crate) offset: usize,
29 pub(crate) length: usize,
30 pub(crate) filled: usize,
31 pub(crate) min: usize,
32 pub(crate) element: usize,
33 pub(crate) constructor: unsafe extern "C" fn(*mut JSContext, Handle<*mut JSObject>, usize, i64) -> *mut JSObject,
34 pub(crate) kind: ReaderKind,
35}
36
37impl PullIntoDescriptor {
38 pub(crate) fn buffer(&self) -> ArrayBuffer<'_> {
39 ArrayBuffer::from(unsafe { Local::from_heap(&self.buffer) }).unwrap()
40 }
41
42 pub(crate) fn construct<'cx>(&self, cx: &'cx Context) -> Result<ArrayBufferView<'cx>> {
43 let view = unsafe {
44 (self.constructor)(
45 cx.as_ptr(),
46 self.buffer.handle(),
47 self.offset,
48 (self.filled / self.element) as i64,
49 )
50 };
51
52 if !view.is_null() {
53 Ok(ArrayBufferView::from(cx.root(view)).unwrap())
54 } else if let Some(Exception::Error(exception)) = Exception::new(cx)? {
55 Err(exception)
56 } else {
57 Err(Error::new("Failed to Initialise Array Buffer", None))
58 }
59 }
60
61 pub(crate) fn commit(&mut self, cx: &Context, reader: &mut ByobReader, state: State) -> ResultExc<()> {
62 let mut done = false;
63
64 let buffer = self.buffer();
65 if state == State::Closed {
66 done = true;
67 }
68 let buffer = buffer.transfer(cx)?;
69
70 self.buffer.set(buffer.get());
71 let view = self.construct(cx)?.as_value(cx);
72
73 let request = reader.common.requests.pop_front().unwrap();
74 let promise = request.promise();
75
76 if !done {
77 (request.chunk)(cx, &promise, &view);
78 } else {
79 (request.close)(cx, &promise, Some(&view))?;
80 }
81 Ok(())
82 }
83}
84
85#[derive(Copy, Clone, Debug, PartialEq, Eq, Traceable)]
86pub enum ControllerKind {
87 Default,
88 ByteStream,
89}
90
91pub enum Controller<'c> {
92 Default(&'c mut DefaultController),
93 ByteStream(&'c mut ByteStreamController),
94}
95
96impl<'c> Controller<'c> {
97 pub(crate) fn common_mut(&mut self) -> &mut CommonController {
98 match self {
99 Controller::Default(controller) => &mut controller.common,
100 Controller::ByteStream(controller) => &mut controller.common,
101 }
102 }
103
104 pub(crate) fn into_default(self) -> Option<&'c mut DefaultController> {
105 match self {
106 Controller::Default(controller) => Some(controller),
107 Controller::ByteStream(_) => None,
108 }
109 }
110
111 pub(crate) fn into_byte_stream(self) -> Option<&'c mut ByteStreamController> {
112 match self {
113 Controller::ByteStream(controller) => Some(controller),
114 Controller::Default(_) => None,
115 }
116 }
117
118 pub fn cancel<'cx: 'v, 'v>(&mut self, cx: &'cx Context, reason: Option<Value<'v>>) -> ResultExc<Promise<'cx>> {
119 match self {
120 Controller::Default(controller) => controller.reset_queue(cx),
121 Controller::ByteStream(controller) => controller.reset_queue(cx),
122 }
123 let common = self.common_mut();
124
125 let mut promise = Promise::new(cx);
126 common.source.cancel(cx, &mut promise, reason)?;
127 common.source.clear_algorithms();
128 Ok(promise)
129 }
130
131 pub fn pull(&mut self, cx: &Context, promise: &Promise, request: Request) -> ResultExc<()> {
132 match self {
133 Controller::Default(controller) => {
134 if let Some((chunk, size)) = controller.queue.pop_front() {
135 controller.common.queue_size -= size as usize;
136
137 if controller.common.close_requested && controller.queue.is_empty() {
138 controller.common.source.clear_algorithms();
139 controller.size = SizeCallback::None;
140
141 let stream = controller.common.stream(cx)?;
142 stream.close(cx)?;
143 } else {
144 controller.pull_if_needed(cx)?;
145 }
146
147 let chunk = Value::from(unsafe { Local::from_heap(&chunk) });
148 (request.chunk)(cx, promise, &chunk);
149 } else {
150 let stream = controller.common.stream(cx)?;
151 match stream.native_reader(cx)? {
152 Some(Reader::Default(reader)) => {
153 if stream.state != State::Readable {
154 return Err(Error::new("Cannot Add Read Request to Read Queue", None).into());
155 }
156 reader.common.requests.push_back(request);
157 }
158 _ => return Ok(()),
159 }
160 controller.pull_if_needed(cx)?;
161 }
162 }
163 Controller::ByteStream(controller) => {
164 {
165 let stream = controller.common.stream(cx)?;
166 if stream.reader_kind != ReaderKind::Default {
167 return Err(Error::new("Reader should have default reader.", ErrorKind::Type).into());
168 }
169 }
170
171 if controller.common.queue_size > 0 {
172 let (buffer, offset, length) = controller.queue.pop_front().unwrap();
173 controller.common.queue_size -= length;
174
175 if controller.common.queue_size == 0 && controller.common.close_requested {
176 controller.close(cx)?;
177 } else {
178 controller.pull_if_needed(cx)?;
179 }
180
181 let buffer = ArrayBuffer::from(unsafe { Local::from_heap(&buffer) }).unwrap();
182 let array = Uint8Array::with_array_buffer(cx, &buffer, offset, length).unwrap().as_value(cx);
183
184 (request.chunk)(cx, promise, &array);
185 } else {
186 if controller.auto_allocate_chunk_size != 0 {
187 let Some(buffer) = ArrayBuffer::new(cx, controller.auto_allocate_chunk_size) else {
188 controller.error_internal(cx, &Exception::new(cx).unwrap().as_value(cx))?;
189 return Ok(());
190 };
191
192 controller.pending_descriptors.push_back(PullIntoDescriptor {
193 buffer: Heap::boxed(buffer.get()),
194 offset: 0,
195 length: controller.auto_allocate_chunk_size,
196 filled: 0,
197 min: 1,
198 element: 1,
199 constructor: type_to_constructor(Type::Uint8),
200 kind: ReaderKind::Default,
201 });
202 }
203
204 let stream = controller.common.stream(cx)?;
205 if let Some(Reader::Default(reader)) = stream.native_reader(cx)? {
206 reader.common.requests.push_back(request);
207 }
208 controller.pull_if_needed(cx)?;
209 }
210 }
211 }
212 Ok(())
213 }
214
215 pub fn release(&mut self) {
216 match self {
217 Controller::Default(_) => {}
218 Controller::ByteStream(controller) => {
219 if let Some(mut descriptor) = controller.pending_descriptors.pop_front() {
220 controller.pending_descriptors.clear();
221 descriptor.kind = ReaderKind::None;
222 controller.pending_descriptors.push_back(descriptor);
223 }
224 }
225 }
226 }
227}
228
229#[js_class]
230#[ion(name = "ReadableStreamCommonController")]
231pub struct CommonController {
232 reflector: Reflector,
233
234 pub(crate) stream: Box<Heap<*mut JSObject>>,
235 pub(crate) source: StreamSource,
236
237 pub(crate) started: bool,
238 pub(crate) pulling: bool,
239 pub(crate) pull_again: bool,
240 pub(crate) close_requested: bool,
241
242 high_water_mark: f64,
243 pub(crate) queue_size: usize,
244}
245
246impl CommonController {
247 pub fn new(stream: &Object, source: StreamSource, high_water_mark: f64) -> CommonController {
248 CommonController {
249 reflector: Reflector::default(),
250
251 stream: Heap::boxed(stream.handle().get()),
252 source,
253
254 started: false,
255 pulling: false,
256 pull_again: false,
257 close_requested: false,
258
259 high_water_mark,
260 queue_size: 0,
261 }
262 }
263}
264
265#[js_class]
266impl CommonController {
267 pub(crate) fn new_from_script(
268 stream: &Object, source_object: Option<&Object>, source: &UnderlyingSource, high_water_mark: f64,
269 ) -> CommonController {
270 CommonController::new(stream, source.to_native(source_object), high_water_mark)
271 }
272
273 #[expect(clippy::mut_from_ref)]
274 pub(crate) fn stream<'cx>(&self, cx: &'cx Context) -> Result<&'cx mut ReadableStream> {
275 let stream = Object::from(unsafe { Local::from_heap(&*ptr::from_ref(&self.stream)) });
276 ReadableStream::get_mut_private(cx, &stream)
277 }
278
279 pub(crate) fn start<C: ControllerInternals>(&mut self, cx: &Context, start: Option<&Function>) -> ResultExc<()> {
280 let controller = self.reflector().get();
281
282 let underlying_source = self.source.source_object();
283 let value = controller.as_value(cx);
284 let result = start
285 .map_or_else(
286 || Ok(UndefinedValue()),
287 |start| start.call(cx, &underlying_source, &[value]).map(|v| v.get()),
288 )
289 .map_err(|report| report.unwrap().exception)?;
290
291 let promise = Promise::resolved(cx, &Value::from(cx.root(result)));
292
293 let controller1 = TracedHeap::new(controller);
294 let controller2 = TracedHeap::new(controller);
295 promise.add_reactions(
296 cx,
297 move |cx, _| {
298 let controller = C::from_traced_heap(cx, &controller1)?;
299 controller.common().started = true;
300 controller.pull_if_needed(cx)?;
301 Ok(Value::undefined_handle())
302 },
303 move |cx, error| {
304 let controller = C::from_traced_heap(cx, &controller2)?;
305 controller.error_internal(cx, error)?;
306 Ok(Value::undefined_handle())
307 },
308 );
309
310 Ok(())
311 }
312
313 pub(crate) fn can_close_or_enqueue(&self, stream: &ReadableStream) -> bool {
314 stream.state == State::Readable && !self.close_requested
315 }
316
317 pub(crate) fn should_call_pull(&self, cx: &Context, stream: &ReadableStream) -> Result<bool> {
318 if !self.can_close_or_enqueue(stream) || !self.started {
319 return Ok(false);
320 }
321
322 if let Some(reader) = stream.native_reader(cx)?
323 && !reader.is_empty()
324 {
325 return Ok(true);
326 }
327 Ok(stream.state == State::Readable && self.high_water_mark > self.queue_size as f64)
328 }
329
330 pub(crate) fn pull_if_needed<C: ControllerInternals>(&mut self, cx: &Context) -> ResultExc<()> {
331 let stream = self.stream(cx)?;
332 if !self.should_call_pull(cx, stream)? {
333 return Ok(());
334 }
335
336 if self.pulling {
337 self.pull_again = true;
338 return Ok(());
339 }
340
341 self.pulling = true;
342
343 let promise = self.source.pull(cx, self.reflector.get())?;
344 if let Some(promise) = promise {
345 let controller1 = TracedHeap::new(stream.controller.get());
346 let controller2 = TracedHeap::new(stream.controller.get());
347
348 promise.add_reactions(
349 cx,
350 move |cx, _| {
351 let controller = C::from_traced_heap(cx, &controller1)?;
352 controller.common().pulling = false;
353 if controller.common().pull_again {
354 controller.common().pull_again = false;
355 controller.pull_if_needed(cx)?;
356 }
357 Ok(Value::undefined_handle())
358 },
359 move |cx, error| {
360 let controller = C::from_traced_heap(cx, &controller2)?;
361 controller.error_internal(cx, error)?;
362 Ok(Value::undefined_handle())
363 },
364 );
365 }
366 Ok(())
367 }
368
369 pub(crate) fn desired_size(&self, cx: &Context) -> Result<JSVal> {
370 let size = match self.stream(cx)?.state {
371 State::Readable => DoubleValue(self.high_water_mark - self.queue_size as f64),
372 State::Closed => Int32Value(0),
373 State::Errored => NullValue(),
374 };
375 Ok(size)
376 }
377}
378
379pub(crate) trait ControllerInternals: ClassDefinition {
380 fn from_traced_heap<'h>(cx: &Context, heap: &'h TracedHeap<*mut JSObject>) -> Result<&'h mut Self> {
381 let controller = Object::from(heap.to_local());
382 Self::get_mut_private(cx, &controller)
383 }
384
385 fn common(&mut self) -> &mut CommonController;
386
387 fn start(&mut self, cx: &Context, start: Option<&Function>) -> ResultExc<()> {
388 self.common().start::<Self>(cx, start)
389 }
390
391 fn pull_if_needed(&mut self, cx: &Context) -> ResultExc<()> {
392 self.common().pull_if_needed::<Self>(cx)
393 }
394
395 fn reset_queue(&mut self, cx: &Context);
396
397 fn clear_algorithms(&mut self) {
398 self.common().source.clear_algorithms();
399 }
400
401 fn error_internal(&mut self, cx: &Context, error: &Value) -> Result<()> {
402 if self.common().stream(cx)?.state == State::Readable {
403 self.reset_queue(cx);
404 self.clear_algorithms();
405 self.common().stream(cx)?.error(cx, error)
406 } else {
407 Ok(())
408 }
409 }
410}
411
412#[js_class]
413#[ion(name = "ReadableStreamDefaultController")]
414pub struct DefaultController {
415 pub(crate) common: CommonController,
416 pub(crate) size: SizeCallback,
417 pub(crate) queue: VecDeque<(Box<Heap<JSVal>>, u64)>,
418}
419
420#[js_class]
421impl DefaultController {
422 pub(crate) fn initialise(
423 stream: &Object, source_object: Option<&Object>, source: &UnderlyingSource, strategy: &QueuingStrategy,
424 high_water_mark: f64,
425 ) -> DefaultController {
426 DefaultController {
427 common: CommonController::new_from_script(stream, source_object, source, high_water_mark),
428 size: strategy.size(),
429 queue: VecDeque::new(),
430 }
431 }
432
433 #[ion(get)]
434 pub fn get_desired_size(&self, cx: &Context) -> Result<JSVal> {
435 self.common.desired_size(cx)
436 }
437
438 pub fn close(&mut self, cx: &Context) -> ResultExc<()> {
439 let stream = self.common.stream(cx)?;
440 if self.common.can_close_or_enqueue(stream) {
441 if self.queue.is_empty() {
442 self.common.close_requested = true;
443 }
444 self.common.source.clear_algorithms();
445 self.size = SizeCallback::None;
446
447 stream.close(cx)
448 } else {
449 Err(Error::new("Cannot Close Stream", ErrorKind::Type).into())
450 }
451 }
452
453 pub fn enqueue(&mut self, cx: &Context, chunk: Value) -> ResultExc<()> {
454 self.enqueue_internal(cx, &chunk)
455 }
456
457 pub(crate) fn enqueue_internal(&mut self, cx: &Context, chunk: &Value) -> ResultExc<()> {
458 let stream = self.common.stream(cx)?;
459 if self.common.can_close_or_enqueue(stream) {
460 if let Some(Reader::Default(reader)) = stream.native_reader(cx)?
461 && let Some(request) = reader.common.requests.pop_front()
462 {
463 let promise = request.promise();
464 (request.chunk)(cx, &promise, chunk);
465 return Ok(());
466 }
467
468 match self.size.compute(cx, chunk) {
469 Ok(size) => {
470 let size = u64::from_value(cx, &size, false, ConversionBehavior::EnforceRange);
471 match size {
472 Ok(size) => {
473 self.queue.push_back((Heap::boxed(chunk.get()), size));
474 self.common.queue_size += size as usize;
475 self.pull_if_needed(cx)?;
476 }
477 Err(error) => {
478 self.error_internal(cx, &error.as_value(cx))?;
479 }
480 }
481 }
482 Err(exception) => {
483 self.error_internal(cx, &exception.as_value(cx))?;
484 }
485 }
486 Ok(())
487 } else {
488 Err(Error::new("Cannot Enqueue to Stream", ErrorKind::Type).into())
489 }
490 }
491
492 pub fn error(&mut self, cx: &Context, error: Option<Value>) -> Result<()> {
493 self.error_internal(cx, &error.unwrap_or_else(Value::undefined_handle))
494 }
495}
496
497impl ControllerInternals for DefaultController {
498 fn common(&mut self) -> &mut CommonController {
499 &mut self.common
500 }
501
502 fn reset_queue(&mut self, _: &Context) {
503 self.queue.clear();
504 self.common.queue_size = 0;
505 }
506
507 fn clear_algorithms(&mut self) {
508 self.common().source.clear_algorithms();
509 self.size = SizeCallback::None;
510 }
511}
512
513#[js_class]
514#[ion(name = "ReadableByteStreamController")]
515pub struct ByteStreamController {
516 pub(crate) common: CommonController,
517 pub(crate) auto_allocate_chunk_size: usize,
518 pub(crate) byob_request: Option<Box<Heap<*mut JSObject>>>,
519 pub(crate) pending_descriptors: VecDeque<PullIntoDescriptor>,
520 pub(crate) queue: VecDeque<(Box<Heap<*mut JSObject>>, usize, usize)>,
521}
522
523#[js_class]
524impl ByteStreamController {
525 pub(crate) fn initialise(
526 stream: &Object, source_object: &Object, source: &UnderlyingSource, high_water_mark: f64,
527 ) -> Result<ByteStreamController> {
528 if let Some(auto_allocate_chunk_size) = source.auto_allocate_chunk_size
529 && auto_allocate_chunk_size == 0
530 {
531 return Err(Error::new("autoAllocateChunkSize can not be zero.", ErrorKind::Type));
532 }
533
534 Ok(ByteStreamController {
535 common: CommonController::new_from_script(stream, Some(source_object), source, high_water_mark),
536 auto_allocate_chunk_size: source.auto_allocate_chunk_size.unwrap_or(0) as usize,
537 byob_request: None,
538 pending_descriptors: VecDeque::new(),
539 queue: VecDeque::new(),
540 })
541 }
542
543 pub(crate) fn fill_pull_into_descriptor(
544 &mut self, cx: &Context, descriptor: &mut PullIntoDescriptor,
545 ) -> Result<bool> {
546 let max_copy = self.common.queue_size.min(descriptor.length - descriptor.filled);
547 let max_aligned = descriptor.filled + max_copy - (descriptor.filled + max_copy) % descriptor.element;
548
549 let ready = max_aligned > descriptor.min;
550
551 let mut remaining = if ready {
552 max_aligned - descriptor.filled
553 } else {
554 max_copy
555 };
556
557 while remaining > 0 {
558 let mut copy = remaining;
559 let mut len = 0;
560
561 if let Some((chunk, offset, length)) = self.queue.front() {
562 copy = copy.min(*length);
563 len = *length;
564 let chunk = ArrayBuffer::from(unsafe { Local::from_heap(chunk) }).unwrap();
565 let buffer = descriptor.buffer();
566 if !chunk.copy_data_to(cx, &buffer, *offset, descriptor.offset + descriptor.filled, copy) {
567 let error = if let Some(Exception::Error(error)) = Exception::new(cx)? {
568 error
569 } else {
570 Error::new("Failed to copy data to descriptor buffer.", None)
571 };
572 return Err(error);
573 }
574 }
575
576 if copy == len {
577 self.queue.pop_front();
578 } else if let Some((_, offset, length)) = self.queue.get_mut(0) {
579 *offset += copy;
580 *length -= copy;
581 }
582 self.common.queue_size -= copy;
583 descriptor.filled += copy;
584 remaining -= copy;
585 }
586
587 if !ready {
588 }
590
591 Ok(ready)
592 }
593
594 pub(crate) fn invalidate_byob_request(&mut self, cx: &Context) -> Result<()> {
595 if let Some(request) = self.byob_request.take() {
596 let request = Object::from(unsafe { Local::from_heap(&request) });
597 let request = ByobRequest::get_mut_private(cx, &request)?;
598 request.controller = None;
599 request.view = None;
600 }
601 Ok(())
602 }
603
604 pub(crate) fn enqueue_cloned_chunk(
605 &mut self, cx: &Context, buffer: &ArrayBuffer, offset: usize, length: usize,
606 ) -> Result<()> {
607 let Some(buffer) = buffer.clone(cx, offset, length) else {
608 let error = if let Some(Exception::Error(error)) = Exception::new(cx)? {
609 error
610 } else {
611 Error::new("Failed to clone ArrayBuffer", None)
612 };
613 self.error_internal(cx, &error.as_value(cx))?;
614 return Err(error);
615 };
616
617 self.queue.push_back((Heap::boxed(buffer.get()), 0, length));
618 self.common.queue_size += length;
619 Ok(())
620 }
621
622 pub(crate) fn process_descriptors(&mut self, cx: &Context, reader: &mut ByobReader, state: State) -> ResultExc<()> {
623 while !self.pending_descriptors.is_empty() {
624 if self.common.queue_size == 0 {
625 break;
626 }
627
628 let mut shift = false;
629
630 let descriptor = ptr::from_mut(self.pending_descriptors.get_mut(0).unwrap());
631 if self.fill_pull_into_descriptor(cx, unsafe { &mut *descriptor })? {
632 shift = true;
633 }
634
635 if shift {
636 let mut descriptor = self.pending_descriptors.pop_front().unwrap();
637 descriptor.commit(cx, reader, state)?;
638 }
639 }
640 Ok(())
641 }
642
643 pub(crate) fn respond(&mut self, cx: &Context, written: usize) -> ResultExc<()> {
644 let descriptor = self.pending_descriptors.front().unwrap();
645 let stream = self.common.stream(cx)?;
646 match stream.state {
647 State::Readable => {
648 if written == 0 {
649 return Err(Error::new("Readable Stream must be written to.", ErrorKind::Type).into());
650 }
651 if descriptor.filled + written > descriptor.length {
652 return Err(
653 Error::new("Buffer of BYOB Request View has been overwritten.", ErrorKind::Range).into(),
654 );
655 }
656 }
657 State::Closed => {
658 if written != 0 {
659 return Err(Error::new("Closed Stream must not be written to.", ErrorKind::Type).into());
660 }
661 }
662 State::Errored => return Err(Error::new("Errored Stream cannot have BYOB Request", ErrorKind::Type).into()),
663 }
664
665 let (buffer, kind) = {
666 let descriptor = self.pending_descriptors.get_mut(0).unwrap();
667 let buffer = descriptor.buffer().transfer(cx)?;
668 descriptor.buffer.set(buffer.get());
669 (buffer, descriptor.kind)
670 };
671
672 self.invalidate_byob_request(cx)?;
673
674 match stream.state {
675 State::Readable => {
676 let descriptor = self.pending_descriptors.front_mut().unwrap();
677 descriptor.filled += written;
678
679 let PullIntoDescriptor { filled, offset, length, min, .. } = *descriptor;
680
681 if let ReaderKind::None = kind {
682 if filled > 0 {
683 self.enqueue_cloned_chunk(cx, &buffer, offset, length)?;
684 }
685
686 if let Some(Reader::Byob(reader)) = stream.native_reader(cx)? {
687 self.process_descriptors(cx, reader, stream.state)?;
688 }
689 } else {
690 if filled < min {
691 return Ok(());
692 }
693
694 let mut descriptor = self.pending_descriptors.pop_front().unwrap();
695 let remainder = descriptor.filled % descriptor.element;
696
697 if remainder > 0 {
698 self.enqueue_cloned_chunk(
699 cx,
700 &buffer,
701 descriptor.offset + descriptor.filled - remainder,
702 remainder,
703 )?;
704
705 descriptor.filled -= remainder;
706 }
707
708 if let Some(Reader::Byob(reader)) = stream.native_reader(cx)? {
709 descriptor.commit(cx, reader, stream.state)?;
710 self.process_descriptors(cx, reader, stream.state)?;
711 }
712 }
713 }
714 State::Closed => match stream.native_reader(cx)? {
715 Some(Reader::Byob(reader)) => {
716 while !reader.common.requests.is_empty() {
717 let mut descriptor = self.pending_descriptors.pop_front().unwrap();
718 descriptor.commit(cx, reader, State::Closed)?;
719 }
720 }
721 _ => {
722 self.pending_descriptors.pop_front();
723 }
724 },
725 State::Errored => unreachable!(),
726 }
727
728 self.pull_if_needed(cx)
729 }
730
731 pub(crate) fn respond_with_new_view(&mut self, cx: &Context, view: ArrayBufferView) -> ResultExc<()> {
732 let buffer = view.buffer(cx);
733
734 if buffer.is_detached() {
735 return Err(Error::new("View Buffer cannot be detached.", ErrorKind::Type).into());
736 }
737
738 let stream = self.common.stream(cx)?;
739 match stream.state {
740 State::Readable => {
741 if view.is_empty() {
742 return Err(Error::new("View must have a non-zero length", ErrorKind::Type).into());
743 }
744 }
745 State::Closed => {
746 if !view.is_empty() {
747 return Err(Error::new(
748 "View for a Closed Readable Stream must have a zero length",
749 ErrorKind::Type,
750 )
751 .into());
752 }
753 }
754 State::Errored => unreachable!(),
755 }
756
757 let offset = view.offset();
758 let descriptor = self.pending_descriptors.get_mut(0).unwrap();
759
760 if descriptor.offset + descriptor.filled != offset {
761 return Err(Error::new("View Offset must be the same as descriptor.", ErrorKind::Range).into());
762 }
763 if descriptor.length != view.len() {
764 return Err(Error::new("View Length must be the same as descriptor.", ErrorKind::Range).into());
765 }
766 if descriptor.filled + view.len() > descriptor.length {
767 return Err(Error::new("View cannot overfill descriptor", ErrorKind::Range).into());
768 }
769
770 let len = view.len();
771 let buffer = buffer.transfer(cx)?;
772 descriptor.buffer.set(buffer.get());
773 self.respond(cx, len)
774 }
775
776 #[ion(get)]
777 pub fn get_desired_size(&self, cx: &Context) -> Result<JSVal> {
778 self.common.desired_size(cx)
779 }
780
781 #[ion(get)]
782 pub fn get_byob_request(&mut self, cx: &Context) -> *mut JSObject {
783 if self.byob_request.is_none() && !self.pending_descriptors.is_empty() {
784 let descriptor = self.pending_descriptors.front().unwrap();
785 let view = Uint8Array::with_array_buffer(
786 cx,
787 &descriptor.buffer(),
788 descriptor.offset + descriptor.filled,
789 descriptor.length - descriptor.filled,
790 )
791 .unwrap();
792
793 let request = ByobRequest {
794 reflector: Reflector::default(),
795 controller: Some(Heap::boxed(self.reflector().get())),
796 view: Some(Heap::boxed(view.get())),
797 };
798 self.byob_request = Some(Heap::boxed(ByobRequest::new_object(cx, Box::new(request))));
799 }
800
801 if let Some(request) = &self.byob_request {
802 request.get()
803 } else {
804 ptr::null_mut()
805 }
806 }
807
808 pub fn close(&mut self, cx: &Context) -> ResultExc<()> {
809 let stream = self.common.stream(cx)?;
810 if self.common.can_close_or_enqueue(stream) {
811 if self.common.queue_size > 0 {
812 self.common.close_requested = true;
813 }
814
815 if let Some(descriptor) = self.pending_descriptors.front()
816 && descriptor.filled % descriptor.element > 0
817 {
818 let error = Error::new("Pending Pull-Into Not Empty", ErrorKind::Type);
819 self.error_internal(cx, &error.as_value(cx))?;
820 return Err(error.into());
821 }
822
823 self.common.source.clear_algorithms();
824 stream.close(cx)
825 } else {
826 Err(Error::new("Cannot Close Byte Stream Controller", ErrorKind::Type).into())
827 }
828 }
829
830 pub fn enqueue(&mut self, cx: &Context, chunk: ArrayBufferView) -> ResultExc<()> {
831 if chunk.is_empty() {
832 return Err(Error::new("Chunk must contain bytes.", ErrorKind::Type).into());
833 }
834
835 let buffer = chunk.buffer(cx);
836 if buffer.is_empty() {
837 return Err(Error::new("Chunk must contain bytes.", ErrorKind::Type).into());
838 }
839
840 let stream = self.common.stream(cx)?;
841 if self.common.can_close_or_enqueue(stream) {
842 let offset = chunk.offset();
843 let length = chunk.len();
844 let buffer = buffer.transfer(cx)?;
845
846 let mut shift = false;
847 if !self.pending_descriptors.is_empty() {
848 self.invalidate_byob_request(cx)?;
849 let descriptor = self.pending_descriptors.front().unwrap();
850 let buffer = descriptor.buffer().transfer(cx)?;
851 descriptor.buffer.set(buffer.get());
852 if descriptor.kind == ReaderKind::None {
853 if descriptor.filled > 0 {
854 self.enqueue_cloned_chunk(cx, &buffer, descriptor.offset, descriptor.length)?;
855 }
856 shift = true;
857 }
858 }
859
860 if shift {
861 self.pending_descriptors.pop_front();
862 }
863
864 match stream.native_reader(cx)? {
865 Some(Reader::Default(reader)) => {
866 let mut complete = false;
867 while let Some(request) = reader.common.requests.pop_front() {
868 let promise = request.promise();
869
870 if self.common.queue_size == 0 {
871 self.pending_descriptors.pop_front();
872
873 let array =
874 Uint8Array::with_array_buffer(cx, &buffer, offset, length).unwrap().as_value(cx);
875 (request.chunk)(cx, &promise, &array);
876
877 complete = true;
878 break;
879 }
880
881 let (buffer, offset, length) = self.queue.pop_front().unwrap();
882 self.common.queue_size -= length;
883
884 if self.common.queue_size == 0 && self.common.close_requested {
885 self.close(cx)?;
886 } else {
887 self.pull_if_needed(cx)?;
888 }
889
890 let buffer = ArrayBuffer::from(unsafe { Local::from_heap(&buffer) }).unwrap();
891 let array = Uint8Array::with_array_buffer(cx, &buffer, offset, length).unwrap().as_value(cx);
892
893 (request.chunk)(cx, &promise, &array);
894 }
895
896 if !complete {
897 self.queue.push_back((Heap::boxed(buffer.get()), offset, length));
898 self.common.queue_size += length;
899 }
900 }
901 Some(Reader::Byob(reader)) => {
902 self.queue.push_back((Heap::boxed(buffer.get()), offset, length));
903 self.common.queue_size += length;
904
905 self.process_descriptors(cx, reader, stream.state)?;
906 }
907 None => {
908 self.queue.push_back((Heap::boxed(buffer.get()), offset, length));
909 self.common.queue_size += length;
910 }
911 }
912 self.pull_if_needed(cx)
913 } else {
914 Err(Error::new("Cannot Enqueue to Stream", ErrorKind::Type).into())
915 }
916 }
917
918 pub fn error(&mut self, cx: &Context, error: Option<Value>) -> Result<()> {
919 self.error_internal(cx, &error.unwrap_or_else(Value::undefined_handle))
920 }
921}
922
923impl ControllerInternals for ByteStreamController {
924 fn common(&mut self) -> &mut CommonController {
925 &mut self.common
926 }
927
928 fn reset_queue(&mut self, cx: &Context) {
929 self.invalidate_byob_request(cx).unwrap();
930 self.pending_descriptors.clear();
931 self.queue.clear();
932 self.common.queue_size = 0;
933 }
934}
935
936#[js_class]
937#[ion(name = "ReadableStreamBYOBRequest")]
938pub struct ByobRequest {
939 reflector: Reflector,
940 pub(crate) controller: Option<Box<Heap<*mut JSObject>>>,
941 pub(crate) view: Option<Box<Heap<*mut JSObject>>>,
942}
943
944#[js_class]
945impl ByobRequest {
946 #[ion(get)]
947 pub fn get_view(&self) -> *mut JSObject {
948 self.view.as_ref().map_or_else(ptr::null_mut, |view| view.get())
949 }
950
951 pub fn respond(
952 &mut self, cx: &Context, #[ion(convert = ConversionBehavior::Clamp
953 )]
954 written: u64,
955 ) -> ResultExc<()> {
956 if let Some(controller) = self.controller.take() {
957 let view = unsafe { Local::from_heap(self.view.as_ref().unwrap()) };
958 let view = ArrayBufferView::from(view).unwrap();
959 let buffer = view.buffer(cx);
960
961 if view.is_empty() || buffer.is_empty() {
962 return Err(Error::new("View and Buffer must have a non-zero length.", ErrorKind::Type).into());
963 }
964
965 if buffer.is_detached() {
966 return Err(Error::new("View Buffer cannot be detached.", ErrorKind::Type).into());
967 }
968
969 let controller = Object::from(unsafe { Local::from_heap(&controller) });
970 let controller = ByteStreamController::get_mut_private(cx, &controller)?;
971 controller.respond(cx, written as usize)
972 } else {
973 Err(Error::new("BYOB Request has already been invalidated.", ErrorKind::Type).into())
974 }
975 }
976
977 #[ion(name = "respondWithNewView")]
978 pub fn respond_with_new_view(&mut self, cx: &Context, view: ArrayBufferView) -> ResultExc<()> {
979 if let Some(controller) = self.controller.take() {
980 let controller = Object::from(unsafe { Local::from_heap(&controller) });
981 let controller = ByteStreamController::get_mut_private(cx, &controller)?;
982 controller.respond_with_new_view(cx, view)
983 } else {
984 Err(Error::new("BYOB Request has already been invalidated.", ErrorKind::Type).into())
985 }
986 }
987}