1use std::collections::VecDeque;
8use std::rc::Rc;
9
10use ion::class::{NativeObject as _, Reflector};
11use ion::conversions::{ConversionBehavior, FromValue as _, ToValue as _};
12use ion::function::Opt;
13use ion::{
14 ClassDefinition as _, Context, Error, ErrorKind, FromValue, Function, Local, Object, Promise, Result, ResultExc,
15 Traceable, Value, js_class,
16};
17use mozjs::jsapi::{Heap, JSObject};
18use mozjs::jsval::JSVal;
19
20pub use crate::globals::streams::readable::controller::{
21 ByobRequest, ByteStreamController, CommonController, DefaultController,
22};
23use crate::globals::streams::readable::controller::{Controller, ControllerInternals as _, ControllerKind};
24pub use crate::globals::streams::readable::reader::{ByobReader, CommonReader, DefaultReader};
25use crate::globals::streams::readable::reader::{Reader, ReaderKind};
26pub use crate::globals::streams::readable::source::StreamSource;
27use crate::globals::streams::readable::tee::{TeeBytesState, TeeDefaultState, forward_reader_error};
28use crate::globals::streams::strategy::{QueuingStrategy, SizeCallback};
29
30mod controller;
31mod reader;
32mod source;
33mod tee;
34
35#[derive(Default, FromValue)]
36pub struct UnderlyingSource<'cx> {
37 start: Option<Function<'cx>>,
38 pull: Option<Function<'cx>>,
39 cancel: Option<Function<'cx>>,
40 #[ion(name = "type")]
41 ty: Option<String>,
42 #[ion(convert = ConversionBehavior::EnforceRange)]
43 auto_allocate_chunk_size: Option<u64>,
44}
45
46impl UnderlyingSource<'_> {
47 pub(crate) fn to_native(&self, object: Option<&Object>) -> StreamSource {
48 match object {
49 Some(object) => StreamSource::Script {
50 object: Heap::boxed(object.handle().get()),
51 pull: self.pull.as_ref().map(|pull| Heap::boxed(pull.get())),
52 cancel: self.cancel.as_ref().map(|cancel| Heap::boxed(cancel.get())),
53 },
54 None => StreamSource::None,
55 }
56 }
57}
58
59#[derive(Default, FromValue)]
60pub struct ReaderOptions {
61 mode: Option<String>,
62}
63
64#[derive(Copy, Clone, Debug, PartialEq, Eq, Traceable)]
65pub enum State {
66 Readable,
67 Closed,
68 Errored,
69}
70
71#[js_class]
72pub struct ReadableStream {
73 reflector: Reflector,
74
75 pub(crate) controller_kind: ControllerKind,
76 pub(crate) controller: Box<Heap<*mut JSObject>>,
77
78 pub(crate) reader_kind: ReaderKind,
79 pub(crate) reader: Option<Box<Heap<*mut JSObject>>>,
80
81 pub(crate) state: State,
82 pub(crate) disturbed: bool,
83 pub(crate) error: Option<Box<Heap<JSVal>>>,
84}
85
86#[js_class]
87impl ReadableStream {
88 #[ion(constructor)]
89 pub fn constructor<'cx: 'o, 'o>(
90 cx: &'cx Context, #[ion(this)] this: &Object, Opt(underlying_source): Opt<Object<'o>>,
91 Opt(strategy): Opt<QueuingStrategy>,
92 ) -> ResultExc<ReadableStream> {
93 let strategy = strategy.unwrap_or_default();
94 let mut source = None;
95
96 let controller = underlying_source
97 .as_ref()
98 .map::<ResultExc<_>, _>(|underlying_source| {
99 let source_value = underlying_source.as_value(cx);
100 source = Some(UnderlyingSource::from_value(cx, &source_value, false, ())?);
101
102 let source = source.as_ref().unwrap();
103 if source.ty.as_deref() == Some("bytes") {
104 if strategy.has_size() {
105 return Err(Error::new("Implementation preserved member 'size'", ErrorKind::Range).into());
106 }
107
108 if let Some(high_water_mark) = strategy.high_water_mark() {
109 if high_water_mark.is_nan() {
110 return Err(Error::new("highWaterMark cannot be NaN", ErrorKind::Range).into());
111 } else if high_water_mark < 0.0 {
112 return Err(Error::new("highWaterMark must be non-negative", ErrorKind::Range).into());
113 }
114 }
115 let high_water_mark = strategy.high_water_mark().unwrap_or(0.0);
116
117 let controller =
118 ByteStreamController::initialise(this, underlying_source, source, high_water_mark)?;
119 let controller = Heap::boxed(ByteStreamController::new_object(cx, Box::new(controller)));
120 unsafe {
121 let controller = Object::from(Local::from_heap(&controller));
122 ByteStreamController::get_mut_private_unchecked(&controller)
123 .start(cx, source.start.as_ref())?;
124 }
125
126 Ok(Some((ControllerKind::ByteStream, controller)))
127 } else if source.ty.is_some() {
128 Err(Error::new(
129 "Type of Underlying Source must be 'bytes' or not exist.",
130 ErrorKind::Type,
131 )
132 .into())
133 } else {
134 Ok(None)
135 }
136 })
137 .transpose()?
138 .flatten();
139
140 let (controller_kind, controller) = if let Some(controller) = controller {
141 controller
142 } else {
143 let source = source.unwrap_or_default();
144 let high_water_mark = strategy.high_water_mark().unwrap_or(1.0);
145 let controller =
146 DefaultController::initialise(this, underlying_source.as_ref(), &source, &strategy, high_water_mark);
147 let controller = Heap::boxed(DefaultController::new_object(cx, Box::new(controller)));
148 unsafe {
149 let controller = Object::from(Local::from_heap(&controller));
150 DefaultController::get_mut_private_unchecked(&controller).start(cx, source.start.as_ref())?;
151 }
152
153 (ControllerKind::Default, controller)
154 };
155
156 Ok(ReadableStream::new(controller_kind, controller))
157 }
158
159 #[ion(get)]
160 pub fn get_locked(&self) -> bool {
161 self.reader_kind != ReaderKind::None
162 }
163
164 pub fn cancel<'cx>(&mut self, cx: &'cx Context, Opt(reason): Opt<Value>) -> ResultExc<Promise<'cx>> {
165 if self.get_locked() {
166 Err(Error::new("ReadableStream is locked.", ErrorKind::Type).into())
167 } else {
168 self.disturbed = true;
169 match self.state {
170 State::Readable => {
171 self.close(cx)?;
172 self.native_controller(cx)?.cancel(cx, reason)
173 }
174 State::Closed => Ok(Promise::resolved(cx, &Value::undefined_handle())),
175 State::Errored => {
176 let mut value = Value::null(cx);
177 if let Some(error) = &self.error {
178 value.handle_mut().set(error.get());
179 }
180 let promise = Promise::new(cx);
181 promise.reject(cx, &value);
182 Ok(promise)
183 }
184 }
185 }
186 }
187
188 #[ion(name = "getReader")]
189 pub fn get_reader<'cx>(&mut self, cx: &'cx Context, Opt(options): Opt<ReaderOptions>) -> Result<Object<'cx>> {
190 if self.get_locked() {
191 return Err(Error::new(
192 "New readers cannot be initialised for locked streams.",
193 ErrorKind::Type,
194 ));
195 }
196
197 let options = options.unwrap_or_default();
198 if let Some(mode) = &options.mode {
199 if mode == "byob" {
200 let reader = ByobReader::new(cx, &Object::from(cx.root(self.reflector().get())))?;
201 let object = Object::from(cx.root(ByobReader::new_object(cx, Box::new(reader))));
202
203 self.reader_kind = ReaderKind::Byob;
204 self.reader = Some(Heap::boxed(object.handle().get()));
205
206 Ok(object)
207 } else {
208 Err(Error::new("Mode must be 'byob' or must not exist.", ErrorKind::Type))
209 }
210 } else {
211 let reader = DefaultReader::new(cx, &Object::from(Local::from_handle(self.reflector().handle())))?;
212 let object = Object::from(cx.root(DefaultReader::new_object(cx, Box::new(reader))));
213
214 self.reader_kind = ReaderKind::Default;
215 self.reader = Some(Heap::boxed(object.handle().get()));
216
217 Ok(object)
218 }
219 }
220
221 pub fn tee<'cx>(&mut self, cx: &'cx Context) -> Result<[Object<'cx>; 2]> {
222 self.get_reader(cx, Opt(None))?;
223 Ok(self.tee_internal(cx, false))
224 }
225}
226
227impl ReadableStream {
228 pub(crate) fn new(controller_kind: ControllerKind, controller: Box<Heap<*mut JSObject>>) -> ReadableStream {
229 ReadableStream {
230 reflector: Reflector::default(),
231
232 controller_kind,
233 controller,
234
235 reader_kind: ReaderKind::None,
236 reader: None,
237
238 state: State::Readable,
239 disturbed: false,
240 error: None,
241 }
242 }
243
244 pub(crate) fn tee_internal<'cx>(&mut self, cx: &'cx Context, clone_branch_2: bool) -> [Object<'cx>; 2] {
245 match self.controller_kind {
246 ControllerKind::Default => {
247 fn create_branch(cx: &Context, state: Rc<TeeDefaultState>, second: bool) -> Object<'_> {
248 let branch = Object::from(cx.root(ReadableStream::new_raw_object(cx)));
249 let source = StreamSource::TeeDefault(state, second);
250 let controller = DefaultController {
251 common: CommonController::new(&branch, source, 1.0),
252 size: SizeCallback::None,
253 queue: VecDeque::default(),
254 };
255 let controller = Heap::boxed(DefaultController::new_object(cx, Box::new(controller)));
256
257 unsafe {
258 let controller = Object::from(Local::from_heap(&controller));
259 DefaultController::get_mut_private_unchecked(&controller).start(cx, None).unwrap();
260 }
261
262 let stream = ReadableStream::new(ControllerKind::Default, controller);
263 unsafe {
264 ReadableStream::set_private(branch.handle().get(), Box::new(stream));
265 }
266 branch
267 }
268
269 let state = Rc::new(TeeDefaultState::new(cx, self, clone_branch_2));
270 let branch1 = create_branch(cx, Rc::clone(&state), false);
271 let branch2 = create_branch(cx, Rc::clone(&state), true);
272
273 state.common.branch[0].set(branch1.handle().get());
274 state.common.branch[1].set(branch2.handle().get());
275
276 [branch1, branch2]
277 }
278 ControllerKind::ByteStream => {
279 fn create_branch(cx: &Context, state: Rc<TeeBytesState>, second: bool) -> Object<'_> {
280 let branch = Object::from(cx.root(ReadableStream::new_raw_object(cx)));
281 let source = StreamSource::TeeBytes(state, second);
282 let controller = ByteStreamController {
283 common: CommonController::new(&branch, source, 1.0),
284 auto_allocate_chunk_size: 0,
285 byob_request: None,
286 pending_descriptors: VecDeque::default(),
287 queue: VecDeque::default(),
288 };
289 let controller = Heap::boxed(ByteStreamController::new_object(cx, Box::new(controller)));
290
291 unsafe {
292 let controller = Object::from(Local::from_heap(&controller));
293 ByteStreamController::get_mut_private_unchecked(&controller).start(cx, None).unwrap();
294 }
295
296 let stream = ReadableStream::new(ControllerKind::ByteStream, controller);
297 unsafe {
298 ReadableStream::set_private(branch.handle().get(), Box::new(stream));
299 }
300 branch
301 }
302
303 let state = Rc::new(TeeBytesState::new(cx, self));
304 let branch1 = create_branch(cx, Rc::clone(&state), false);
305 let branch2 = create_branch(cx, Rc::clone(&state), true);
306
307 state.common.branch[0].set(branch1.handle().get());
308 state.common.branch[1].set(branch2.handle().get());
309
310 let reader = self.native_reader(cx).unwrap().unwrap().into_default().unwrap();
311 forward_reader_error(cx, &reader.common.closed(), Rc::clone(&state)).unwrap();
312
313 [branch1, branch2]
314 }
315 }
316 }
317
318 pub(crate) fn close(&mut self, cx: &Context) -> ResultExc<()> {
319 if self.state != State::Readable {
320 return Err(Error::new("Cannot Close Stream", None).into());
321 }
322
323 self.state = State::Closed;
324 let (requests, closed) = match self.native_reader(cx)? {
325 Some(reader) => reader.requests_closed(),
326 None => return Ok(()),
327 };
328
329 closed.resolve(cx, &Value::undefined_handle());
330 if self.reader_kind == ReaderKind::Default {
331 for request in &*requests {
332 let promise = request.promise();
333 (request.close)(cx, &promise, None)?;
334 }
335 requests.clear();
336 }
337
338 Ok(())
339 }
340
341 pub(crate) fn error(&mut self, cx: &Context, error: &Value) -> Result<()> {
342 if self.state != State::Readable {
343 return Err(Error::new("Cannot Error Stream", None));
344 }
345 self.state = State::Errored;
346 self.error = Some(Heap::boxed(error.get()));
347
348 let (requests, closed) = match self.native_reader(cx)? {
349 Some(reader) => reader.requests_closed(),
350 None => return Ok(()),
351 };
352
353 closed.reject(cx, error);
354 for request in &*requests {
355 let promise = request.promise();
356 (request.error)(cx, &promise, error);
357 }
358 requests.clear();
359
360 Ok(())
361 }
362
363 pub(crate) fn stored_error(&self) -> Value<'_> {
364 self.error.as_ref().map_or_else(Value::undefined_handle, |error| {
365 Value::from(unsafe { Local::from_heap(error) })
366 })
367 }
368
369 pub(crate) fn native_controller(&self, cx: &Context) -> Result<Controller<'_>> {
370 match self.controller_kind {
371 ControllerKind::Default => {
372 let controller = Object::from(unsafe { Local::from_heap(&self.controller) });
373 let controller = DefaultController::get_mut_private(cx, &controller)?;
374 Ok(Controller::Default(controller))
375 }
376 ControllerKind::ByteStream => {
377 let controller = Object::from(unsafe { Local::from_heap(&self.controller) });
378 let controller = ByteStreamController::get_mut_private(cx, &controller)?;
379 Ok(Controller::ByteStream(controller))
380 }
381 }
382 }
383
384 pub(crate) fn native_reader(&self, cx: &Context) -> Result<Option<Reader<'_>>> {
385 match self.reader_kind {
386 ReaderKind::None => Ok(None),
387 ReaderKind::Default => {
388 let reader = Object::from(unsafe { Local::from_heap(self.reader.as_ref().unwrap()) });
389 let reader = DefaultReader::get_mut_private(cx, &reader)?;
390 Ok(Some(Reader::Default(reader)))
391 }
392 ReaderKind::Byob => {
393 let reader = Object::from(unsafe { Local::from_heap(self.reader.as_ref().unwrap()) });
394 let reader = ByobReader::get_mut_private(cx, &reader)?;
395 Ok(Some(Reader::Byob(reader)))
396 }
397 }
398 }
399}