1use std::collections::VecDeque;
8
9use ion::class::Reflector;
10use ion::conversions::ToValue as _;
11use ion::function::Opt;
12use ion::typedarray::{ArrayBufferView, type_to_constructor, type_to_element_size};
13use ion::{
14 ClassDefinition as _, Context, Error, ErrorKind, FromValue, Local, Object, Promise, Result, ResultExc, Traceable,
15 Value, js_class,
16};
17use mozjs::conversions::ConversionBehavior;
18use mozjs::jsapi::{Heap, JSObject};
19
20use crate::globals::streams::readable::controller::{ControllerInternals as _, ControllerKind, PullIntoDescriptor};
21use crate::globals::streams::readable::{ReadableStream, State};
22
23pub type ChunkErrorClosure = dyn Fn(&Context, &Promise, &Value);
24pub type CloseClosure = dyn Fn(&Context, &Promise, Option<&Value>) -> ResultExc<()>;
25
26#[derive(Traceable)]
27pub struct Request {
28 pub(crate) promise: Box<Heap<*mut JSObject>>,
29 #[trace(no_trace)]
30 pub(crate) chunk: Box<ChunkErrorClosure>,
31 #[trace(no_trace)]
32 pub(crate) close: Box<CloseClosure>,
33 #[trace(no_trace)]
34 pub(crate) error: Box<ChunkErrorClosure>,
35}
36
37impl Request {
38 pub(crate) fn standard(promise: *mut JSObject) -> Request {
39 struct ReadResult<'cx> {
40 pub value: Option<Value<'cx>>,
41 pub done: bool,
42 }
43
44 fn into_value<'cx>(result: ReadResult, cx: &'cx Context) -> Value<'cx> {
45 let object = Object::new(cx);
46 object.set(cx, "value", &result.value.unwrap_or_else(Value::undefined_handle));
47 object.set_as(cx, "done", &result.done);
48 object.as_value(cx)
49 }
50
51 Request {
52 promise: Heap::boxed(promise),
53 chunk: Box::new(|cx, promise, chunk| {
54 let result = ReadResult {
55 value: Some(Value::from(Local::from_handle(chunk.handle()))),
56 done: false,
57 };
58 promise.resolve(cx, &into_value(result, cx));
59 }),
60 close: Box::new(|cx, promise, chunk| {
61 let result = ReadResult {
62 value: chunk.map(|v| Value::from(Local::from_handle(v.handle()))),
63 done: true,
64 };
65 promise.resolve(cx, &into_value(result, cx));
66 Ok(())
67 }),
68 error: Box::new(|cx, promise, error| {
69 promise.resolve(cx, error);
70 }),
71 }
72 }
73
74 pub(crate) fn promise(&self) -> Promise<'_> {
75 Promise::from(unsafe { Local::from_heap(&self.promise) }).unwrap()
76 }
77}
78
79#[derive(Copy, Clone, Debug, PartialEq, Eq, Traceable)]
80pub enum ReaderKind {
81 None,
82 Default,
83 Byob,
84}
85
86pub enum Reader<'r> {
87 Default(&'r mut DefaultReader),
88 Byob(&'r mut ByobReader),
89}
90
91impl<'r> Reader<'r> {
92 pub(crate) fn common(&self) -> &CommonReader {
93 match self {
94 Reader::Default(reader) => &reader.common,
95 Reader::Byob(reader) => &reader.common,
96 }
97 }
98
99 pub(crate) fn into_default(self) -> Option<&'r mut DefaultReader> {
100 match self {
101 Reader::Default(reader) => Some(reader),
102 Reader::Byob(_) => None,
103 }
104 }
105
106 pub(crate) fn into_byob(self) -> Option<&'r mut ByobReader> {
107 match self {
108 Reader::Byob(reader) => Some(reader),
109 Reader::Default(_) => None,
110 }
111 }
112
113 pub fn requests_closed(self) -> (&'r mut VecDeque<Request>, Promise<'r>) {
114 let common = match self {
115 Reader::Default(reader) => &mut reader.common,
116 Reader::Byob(reader) => &mut reader.common,
117 };
118 unsafe {
119 (
120 &mut common.requests,
121 Promise::from(Local::from_heap(&common.closed)).unwrap(),
122 )
123 }
124 }
125
126 pub fn is_empty(&self) -> bool {
127 self.common().requests.is_empty()
128 }
129}
130
131#[js_class]
132#[ion(name = "ReadableStreamCommonReader")]
133pub struct CommonReader {
134 reflector: Reflector,
135
136 stream: Option<Box<Heap<*mut JSObject>>>,
137 pub(crate) requests: VecDeque<Request>,
138 pub(crate) closed: Box<Heap<*mut JSObject>>,
139}
140
141#[js_class]
142impl CommonReader {
143 pub(crate) fn new(cx: &Context, stream: &ReadableStream, stream_object: &Object) -> CommonReader {
144 let closed = Promise::new(cx);
145 match stream.state {
146 State::Readable => {}
147 State::Closed => {
148 closed.resolve(cx, &Value::undefined_handle());
149 }
150 State::Errored => {
151 closed.reject(cx, &stream.stored_error());
152 }
153 }
154
155 CommonReader {
156 reflector: Reflector::default(),
157 stream: Some(Heap::boxed(stream_object.handle().get())),
158 requests: VecDeque::new(),
159 closed: Heap::boxed(closed.get()),
160 }
161 }
162
163 #[expect(clippy::mut_from_ref)]
164 pub(crate) fn stream(&self, cx: &Context) -> Result<Option<&mut ReadableStream>> {
165 self.stream
166 .as_ref()
167 .map::<Result<_>, _>(|stream| {
168 let stream = Object::from(unsafe { Local::from_heap(stream) });
169 ReadableStream::get_mut_private(cx, &stream)
170 })
171 .transpose()
172 }
173
174 pub(crate) fn closed(&self) -> Promise<'_> {
175 Promise::from(unsafe { Local::from_heap(&self.closed) }).unwrap()
176 }
177
178 pub(crate) fn cancel<'cx>(&self, cx: &'cx Context, reason: Opt<Value>) -> ResultExc<Promise<'cx>> {
179 if let Some(stream) = self.stream(cx)? {
180 stream.cancel(cx, reason)
181 } else {
182 let promise = Promise::new(cx);
183 promise.reject_with_error(cx, &Error::new("Reader has already been released.", ErrorKind::Type));
184 Ok(promise)
185 }
186 }
187
188 pub(crate) fn release_lock(&mut self, cx: &Context) -> Result<()> {
189 if let Some(stream) = self.stream(cx)? {
190 let mut closed = self.closed();
191 if matches!(stream.state, State::Closed | State::Errored) {
192 self.closed.set(Promise::new(cx).get());
193 closed = self.closed();
194 }
195 closed.reject_with_error(cx, &Error::new("Released Reader", ErrorKind::Type));
196
197 stream.reader_kind = ReaderKind::None;
198 stream.reader = None;
199
200 stream.native_controller(cx)?.release();
201
202 while let Some(request) = self.requests.pop_front() {
203 let promise = request.promise();
204 (request.error)(
205 cx,
206 &promise,
207 &Error::new("Reader has been released.", ErrorKind::Type).as_value(cx),
208 );
209 }
210 } else {
211 return Err(Error::new("Reader has already been released.", ErrorKind::Type));
212 }
213 self.stream = None;
214 Ok(())
215 }
216}
217
218#[js_class]
219#[ion(name = "ReadableStreamDefaultReader")]
220pub struct DefaultReader {
221 pub(crate) common: CommonReader,
222}
223
224#[js_class]
225impl DefaultReader {
226 #[ion(constructor)]
227 pub fn constructor(cx: &Context, #[ion(this)] this: &Object, stream_object: Object) -> Result<DefaultReader> {
228 let reader = DefaultReader::new(cx, &stream_object)?;
229 let stream = ReadableStream::get_mut_private(cx, &stream_object)?;
230 stream.reader_kind = ReaderKind::Default;
231 stream.reader = Some(Heap::boxed(this.handle().get()));
232
233 Ok(reader)
234 }
235
236 pub(crate) fn new(cx: &Context, stream_object: &Object) -> Result<DefaultReader> {
237 let stream = ReadableStream::get_private(cx, stream_object)?;
238 if stream.get_locked() {
239 return Err(Error::new(
240 "Cannot create DefaultReader from locked stream.",
241 ErrorKind::Type,
242 ));
243 }
244
245 Ok(DefaultReader {
246 common: CommonReader::new(cx, stream, stream_object),
247 })
248 }
249
250 pub(crate) fn read_internal<'cx>(&mut self, cx: &'cx Context, request: Request) -> ResultExc<Promise<'cx>> {
251 let promise = Promise::from(cx.root(request.promise.get())).unwrap();
252 if let Some(stream) = self.common.stream(cx)? {
253 stream.disturbed = true;
254
255 match stream.state {
256 State::Readable => stream.native_controller(cx)?.pull(cx, &promise, request)?,
257 State::Closed => (request.close)(cx, &promise, None)?,
258 State::Errored => (request.error)(cx, &promise, &stream.stored_error()),
259 }
260 } else {
261 promise.reject_with_error(cx, &Error::new("Reader has already been released.", ErrorKind::Type));
262 }
263 Ok(promise)
264 }
265
266 pub fn cancel<'cx>(&self, cx: &'cx Context, reason: Opt<Value>) -> ResultExc<Promise<'cx>> {
267 self.common.cancel(cx, reason)
268 }
269
270 pub fn read<'cx>(&mut self, cx: &'cx Context) -> ResultExc<Promise<'cx>> {
271 let promise = Promise::new(cx);
272 self.read_internal(cx, Request::standard(promise.get()))
273 }
274
275 #[ion(name = "releaseLock")]
276 pub fn release_lock(&mut self, cx: &Context) -> Result<()> {
277 self.common.release_lock(cx)
278 }
279
280 #[ion(get)]
281 pub fn get_closed(&self) -> *mut JSObject {
282 self.common.closed.get()
283 }
284}
285
286#[derive(FromValue)]
287pub struct ByobReadOptions {
288 #[ion(convert = ConversionBehavior::EnforceRange, default = 1)]
289 min: u64,
290}
291
292impl Default for ByobReadOptions {
293 fn default() -> ByobReadOptions {
294 ByobReadOptions { min: 1 }
295 }
296}
297
298#[js_class]
299#[ion(name = "ReadableStreamBYOBReader")]
300pub struct ByobReader {
301 pub(crate) common: CommonReader,
302}
303
304#[js_class]
305impl ByobReader {
306 #[ion(constructor)]
307 pub fn constructor(cx: &Context, #[ion(this)] this: &Object, stream_object: Object) -> Result<ByobReader> {
308 let reader = ByobReader::new(cx, &stream_object)?;
309 let stream = ReadableStream::get_mut_private(cx, &stream_object)?;
310 stream.reader_kind = ReaderKind::Byob;
311 stream.reader = Some(Heap::boxed(this.handle().get()));
312
313 Ok(reader)
314 }
315
316 pub(crate) fn new(cx: &Context, stream_object: &Object) -> Result<ByobReader> {
317 let stream = ReadableStream::get_private(cx, stream_object)?;
318 if stream.get_locked() {
319 return Err(Error::new(
320 "Cannot create BYOBReader from locked stream.",
321 ErrorKind::Type,
322 ));
323 }
324
325 if stream.controller_kind == ControllerKind::Default {
326 return Err(Error::new(
327 "Cannot create BYOBReader from DefaultController",
328 ErrorKind::Type,
329 ));
330 }
331
332 Ok(ByobReader {
333 common: CommonReader::new(cx, stream, stream_object),
334 })
335 }
336
337 pub(crate) fn read_internal<'cx>(
338 &mut self, cx: &'cx Context, view: ArrayBufferView, min: usize, request: Request,
339 ) -> ResultExc<Promise<'cx>> {
340 let stream = self.common.stream(cx)?.unwrap();
341 let promise = Promise::from(cx.root(request.promise.get())).unwrap();
342
343 stream.disturbed = true;
344 if stream.state == State::Errored {
345 (request.error)(cx, &promise, &stream.stored_error());
346 return Ok(promise);
347 }
348
349 let (constructor, element_size) = {
350 let ty = view.view_type();
351 (type_to_constructor(ty), type_to_element_size(ty))
352 };
353
354 let offset = view.offset();
355 let length = view.len();
356 let buffer = view.buffer(cx);
357 match buffer.transfer(cx) {
358 Ok(buffer) => {
359 let mut descriptor = PullIntoDescriptor {
360 buffer: Heap::boxed(buffer.get()),
361 offset,
362 length: length * element_size,
363 filled: 0,
364 min: min * element_size,
365 element: element_size,
366 constructor,
367 kind: ReaderKind::Byob,
368 };
369
370 let controller = stream.native_controller(cx)?.into_byte_stream().unwrap();
371 if !controller.pending_descriptors.is_empty() {
372 controller.pending_descriptors.push_back(descriptor);
373
374 if stream.state == State::Readable {
375 self.common.requests.push_back(request);
376 }
377 return Ok(promise);
378 } else if stream.state == State::Closed {
379 let empty = descriptor.construct(cx)?.as_value(cx);
380 (request.close)(cx, &promise, Some(&empty))?;
381 return Ok(promise);
382 } else if controller.common.queue_size > 0 {
383 if controller.fill_pull_into_descriptor(cx, &mut descriptor)? {
384 let buffer = buffer.transfer(cx)?;
385 descriptor.buffer.set(buffer.get());
386 let view = descriptor.construct(cx)?.as_value(cx);
387
388 if controller.common.queue_size == 0 && controller.common.close_requested {
389 controller.close(cx)?;
390 } else {
391 controller.pull_if_needed(cx)?;
392 }
393
394 (request.chunk)(cx, &promise, &view);
395 return Ok(promise);
396 } else if controller.common.close_requested {
397 let error = Error::new("Stream closed by request.", ErrorKind::Type).as_value(cx);
398 let _ = controller.error_internal(cx, &error);
399 (request.error)(cx, &promise, &error);
400 return Ok(promise);
401 }
402 }
403
404 controller.pending_descriptors.push_back(descriptor);
405 if stream.state == State::Readable {
406 self.common.requests.push_back(request);
407 }
408
409 let stream = self.common.stream(cx)?.unwrap();
410 let controller = stream.native_controller(cx)?.into_byte_stream().unwrap();
411 controller.pull_if_needed(cx)?;
412 }
413 Err(error) => {
414 (request.error)(cx, &promise, &error.as_value(cx));
415 }
416 }
417 Ok(promise)
418 }
419
420 pub fn cancel<'cx>(&self, cx: &'cx Context, reason: Opt<Value>) -> ResultExc<Promise<'cx>> {
421 self.common.cancel(cx, reason)
422 }
423
424 pub fn read<'cx>(
425 &mut self, cx: &'cx Context, view: ArrayBufferView, Opt(options): Opt<ByobReadOptions>,
426 ) -> ResultExc<Promise<'cx>> {
427 let promise = Promise::new(cx);
428 let request = Request::standard(promise.get());
429 if self.common.stream.is_some() {
430 if view.is_empty() {
431 promise.reject(cx, &Error::new("View must not be empty.", ErrorKind::Type).as_value(cx));
432 return Ok(promise);
433 }
434
435 let buffer = view.buffer(cx);
436
437 if buffer.is_empty() {
438 promise.reject(
439 cx,
440 &Error::new("Buffer must contain bytes.", ErrorKind::Type).as_value(cx),
441 );
442 return Ok(promise);
443 }
444
445 if buffer.is_detached() {
446 promise.reject(
447 cx,
448 &Error::new("ArrayBuffer must not be detached.", ErrorKind::Type).as_value(cx),
449 );
450 return Ok(promise);
451 }
452
453 let options = options.unwrap_or_default();
454 if options.min == 0 {
455 promise.reject(
456 cx,
457 &Error::new("min must be greater than 0.", ErrorKind::Type).as_value(cx),
458 );
459 return Ok(promise);
460 }
461
462 if options.min > view.len() as u64 {
463 promise.reject(
464 cx,
465 &Error::new("min is greater than View Length", ErrorKind::Range).as_value(cx),
466 );
467 return Ok(promise);
468 }
469
470 self.read_internal(cx, view, options.min as usize, request)
471 } else {
472 (request.error)(
473 cx,
474 &promise,
475 &Error::new("Reader has already been released.", ErrorKind::Type).as_value(cx),
476 );
477 Ok(Promise::new(cx))
478 }
479 }
480
481 #[ion(name = "releaseLock")]
482 pub fn release_lock(&mut self, cx: &Context) -> Result<()> {
483 self.common.release_lock(cx)
484 }
485
486 #[ion(get)]
487 pub fn get_closed(&self) -> *mut JSObject {
488 self.common.closed.get()
489 }
490}