runtime/globals/streams/readable/
source.rs1use std::rc::Rc;
8
9use bytes::{Buf, Bytes};
10use ion::conversions::{FromValue as _, ToValue as _};
11use ion::function::Opt;
12use ion::typedarray::ArrayBuffer;
13use ion::{Context, Function, JSIterator, Local, Object, Promise, ResultExc, Traceable, Value};
14use mozjs::gc::HandleObject;
15use mozjs::jsapi::{Heap, JSFunction, JSObject};
16use mozjs::jsval::JSVal;
17
18use crate::globals::streams::readable::tee::{TeeBytesState, TeeDefaultState};
19
20#[derive(Traceable)]
21pub enum StreamSource {
22 None,
23 Script {
24 object: Box<Heap<*mut JSObject>>,
25 pull: Option<Box<Heap<*mut JSFunction>>>,
26 cancel: Option<Box<Heap<*mut JSFunction>>>,
27 },
28 Bytes(#[trace(no_trace)] Option<Bytes>),
29 BytesBuf(#[trace(no_trace)] Option<Box<dyn Buf>>),
30 Iterator(#[trace(no_trace)] Box<dyn JSIterator>, Option<Box<Heap<JSVal>>>),
31 TeeDefault(Rc<TeeDefaultState>, bool),
32 TeeBytes(Rc<TeeBytesState>, bool),
33}
34
35impl StreamSource {
36 pub fn source_object(&self) -> Object<'_> {
37 match self {
38 StreamSource::Script { object, .. } => Object::from(unsafe { Local::from_heap(object) }),
39 _ => Object::from(Local::from_handle(HandleObject::null())),
40 }
41 }
42
43 pub fn pull<'cx>(&mut self, cx: &'cx Context, controller: *mut JSObject) -> ResultExc<Option<Promise<'cx>>> {
44 match self {
45 StreamSource::Script { object, pull: Some(pull), .. } => {
46 let pull = Function::from(unsafe { Local::from_heap(pull) });
47 let controller = controller.as_value(cx);
48 let this = Object::from(unsafe { Local::from_heap(object) });
49
50 let result = pull.call(cx, &this, &[controller]).map_err(|report| report.unwrap().exception)?;
51 Ok(Some(
52 Promise::from_value(cx, &result, true, ()).unwrap_or_else(|_| Promise::new(cx)),
53 ))
54 }
55 StreamSource::Script { pull: None, .. } => Ok(Some(Promise::resolved(cx, &Value::undefined_handle()))),
56 StreamSource::Bytes(bytes) => Ok(bytes.take().map(|bytes| {
57 let buffer = ArrayBuffer::copy_from_bytes(cx, &bytes).unwrap();
58 Promise::resolved(cx, &buffer.as_value(cx))
59 })),
60 StreamSource::BytesBuf(Some(buf)) => {
61 if !buf.has_remaining() {
62 return Ok(None);
63 }
64
65 let chunk = buf.chunk();
66 let buffer = ArrayBuffer::copy_from_bytes(cx, chunk).unwrap();
67 buf.advance(chunk.len());
68 Ok(Some(Promise::resolved(cx, &buffer.as_value(cx))))
69 }
70 StreamSource::Iterator(iterator, Some(data)) => {
71 let data = Value::from(unsafe { Local::from_heap(data) });
72 Ok(iterator.next_value(cx, &data).map(|value| Promise::resolved(cx, &value)))
73 }
74 StreamSource::TeeDefault(state, second) => state.pull(cx, *second),
75 StreamSource::TeeBytes(state, second) => state.pull(cx, *second),
76 _ => Ok(None),
77 }
78 }
79
80 pub fn cancel(&mut self, cx: &Context, promise: &mut Promise, reason: Option<Value>) -> ResultExc<()> {
81 match self {
82 StreamSource::Script { object, cancel: Some(cancel), .. } => {
83 let cancel = Function::from(unsafe { Local::from_heap(cancel) });
84 let this = Object::from(unsafe { Local::from_heap(object) });
85 let reason = reason.unwrap_or_else(Value::undefined_handle);
86
87 let result = cancel.call(cx, &this, &[reason]).map_err(|report| report.unwrap().exception)?;
88 if let Ok(result) = Promise::from_value(cx, &result, true, ()) {
89 result.then(cx, |_, _| Ok(Value::undefined_handle()));
90 promise.handle_mut().set(result.get());
91 } else {
92 promise.resolve(cx, &Value::undefined_handle());
93 }
94 }
95 StreamSource::TeeDefault(state, second) => {
96 let reason = reason.unwrap_or_else(Value::undefined_handle);
97
98 let branch = usize::from(*second);
99 state.common.cancelled[branch].set(true);
100 state.common.reason[branch].set(reason.get());
101
102 if state.common.cancelled[usize::from(!*second)].get() {
103 let composite = [state.common.reason[0].get(), state.common.reason[1].get()].as_value(cx);
104 let result = state.common.stream(cx)?.cancel(cx, Opt(Some(composite)))?;
105
106 let cancel_promise = state.common.cancel_promise();
107 cancel_promise.resolve(cx, &result.as_value(cx));
108 }
109
110 promise.handle_mut().set(state.common.cancel_promise.get());
111 }
112 _ => {}
113 }
114
115 Ok(())
116 }
117
118 pub fn clear_algorithms(&mut self) {
119 match self {
120 StreamSource::Script { pull, cancel, .. } => {
121 *pull = None;
122 *cancel = None;
123 }
124 StreamSource::Bytes(bytes) => {
125 *bytes = None;
126 }
127 StreamSource::BytesBuf(buf) => {
128 *buf = None;
129 }
130 StreamSource::Iterator(_, data) => {
131 *data = None;
132 }
133 _ => {}
134 }
135 }
136}