runtime/globals/streams/readable/
source.rs

/*
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */
6
7use std::rc::Rc;
8
9use bytes::{Buf, Bytes};
10use ion::conversions::{FromValue as _, ToValue as _};
11use ion::function::Opt;
12use ion::typedarray::ArrayBuffer;
13use ion::{Context, Function, JSIterator, Local, Object, Promise, ResultExc, Traceable, Value};
14use mozjs::gc::HandleObject;
15use mozjs::jsapi::{Heap, JSFunction, JSObject};
16use mozjs::jsval::JSVal;
17
18use crate::globals::streams::readable::tee::{TeeBytesState, TeeDefaultState};
19
20#[derive(Traceable)]
21pub enum StreamSource {
22	None,
23	Script {
24		object: Box<Heap<*mut JSObject>>,
25		pull: Option<Box<Heap<*mut JSFunction>>>,
26		cancel: Option<Box<Heap<*mut JSFunction>>>,
27	},
28	Bytes(#[trace(no_trace)] Option<Bytes>),
29	BytesBuf(#[trace(no_trace)] Option<Box<dyn Buf>>),
30	Iterator(#[trace(no_trace)] Box<dyn JSIterator>, Option<Box<Heap<JSVal>>>),
31	TeeDefault(Rc<TeeDefaultState>, bool),
32	TeeBytes(Rc<TeeBytesState>, bool),
33}
34
35impl StreamSource {
36	pub fn source_object(&self) -> Object<'_> {
37		match self {
38			StreamSource::Script { object, .. } => Object::from(unsafe { Local::from_heap(object) }),
39			_ => Object::from(Local::from_handle(HandleObject::null())),
40		}
41	}
42
43	pub fn pull<'cx>(&mut self, cx: &'cx Context, controller: *mut JSObject) -> ResultExc<Option<Promise<'cx>>> {
44		match self {
45			StreamSource::Script { object, pull: Some(pull), .. } => {
46				let pull = Function::from(unsafe { Local::from_heap(pull) });
47				let controller = controller.as_value(cx);
48				let this = Object::from(unsafe { Local::from_heap(object) });
49
50				let result = pull.call(cx, &this, &[controller]).map_err(|report| report.unwrap().exception)?;
51				Ok(Some(
52					Promise::from_value(cx, &result, true, ()).unwrap_or_else(|_| Promise::new(cx)),
53				))
54			}
55			StreamSource::Script { pull: None, .. } => Ok(Some(Promise::resolved(cx, &Value::undefined_handle()))),
56			StreamSource::Bytes(bytes) => Ok(bytes.take().map(|bytes| {
57				let buffer = ArrayBuffer::copy_from_bytes(cx, &bytes).unwrap();
58				Promise::resolved(cx, &buffer.as_value(cx))
59			})),
60			StreamSource::BytesBuf(Some(buf)) => {
61				if !buf.has_remaining() {
62					return Ok(None);
63				}
64
65				let chunk = buf.chunk();
66				let buffer = ArrayBuffer::copy_from_bytes(cx, chunk).unwrap();
67				buf.advance(chunk.len());
68				Ok(Some(Promise::resolved(cx, &buffer.as_value(cx))))
69			}
70			StreamSource::Iterator(iterator, Some(data)) => {
71				let data = Value::from(unsafe { Local::from_heap(data) });
72				Ok(iterator.next_value(cx, &data).map(|value| Promise::resolved(cx, &value)))
73			}
74			StreamSource::TeeDefault(state, second) => state.pull(cx, *second),
75			StreamSource::TeeBytes(state, second) => state.pull(cx, *second),
76			_ => Ok(None),
77		}
78	}
79
80	pub fn cancel(&mut self, cx: &Context, promise: &mut Promise, reason: Option<Value>) -> ResultExc<()> {
81		match self {
82			StreamSource::Script { object, cancel: Some(cancel), .. } => {
83				let cancel = Function::from(unsafe { Local::from_heap(cancel) });
84				let this = Object::from(unsafe { Local::from_heap(object) });
85				let reason = reason.unwrap_or_else(Value::undefined_handle);
86
87				let result = cancel.call(cx, &this, &[reason]).map_err(|report| report.unwrap().exception)?;
88				if let Ok(result) = Promise::from_value(cx, &result, true, ()) {
89					result.then(cx, |_, _| Ok(Value::undefined_handle()));
90					promise.handle_mut().set(result.get());
91				} else {
92					promise.resolve(cx, &Value::undefined_handle());
93				}
94			}
95			StreamSource::TeeDefault(state, second) => {
96				let reason = reason.unwrap_or_else(Value::undefined_handle);
97
98				let branch = usize::from(*second);
99				state.common.cancelled[branch].set(true);
100				state.common.reason[branch].set(reason.get());
101
102				if state.common.cancelled[usize::from(!*second)].get() {
103					let composite = [state.common.reason[0].get(), state.common.reason[1].get()].as_value(cx);
104					let result = state.common.stream(cx)?.cancel(cx, Opt(Some(composite)))?;
105
106					let cancel_promise = state.common.cancel_promise();
107					cancel_promise.resolve(cx, &result.as_value(cx));
108				}
109
110				promise.handle_mut().set(state.common.cancel_promise.get());
111			}
112			_ => {}
113		}
114
115		Ok(())
116	}
117
118	pub fn clear_algorithms(&mut self) {
119		match self {
120			StreamSource::Script { pull, cancel, .. } => {
121				*pull = None;
122				*cancel = None;
123			}
124			StreamSource::Bytes(bytes) => {
125				*bytes = None;
126			}
127			StreamSource::BytesBuf(buf) => {
128				*buf = None;
129			}
130			StreamSource::Iterator(_, data) => {
131				*data = None;
132			}
133			_ => {}
134		}
135	}
136}