1use std::cell::Cell;
8use std::rc::Rc;
9
10use ion::class::NativeObject as _;
11use ion::clone::StructuredCloneBuffer;
12use ion::conversions::{FromValue as _, ToValue as _};
13use ion::function::Opt;
14use ion::typedarray::{ArrayBufferView, Uint8Array};
15use ion::{ClassDefinition as _, Context, Exception, Local, Object, Promise, ResultExc, Traceable, TracedHeap, Value};
16use mozjs::jsapi::{CloneDataPolicy, Heap, JSObject, StructuredCloneScope};
17use mozjs::jsval::{JSVal, UndefinedValue};
18
19use crate::globals::clone::{STRUCTURED_CLONE_CALLBACKS, StructuredCloneDataHolder};
20use crate::globals::streams::readable::controller::ControllerInternals as _;
21use crate::globals::streams::readable::reader::{ChunkErrorClosure, CloseClosure, ReaderKind, Request};
22use crate::globals::streams::readable::{ByobRequest, ByteStreamController, ReadableStream, ReaderOptions};
23
24#[derive(Traceable)]
25pub(crate) struct TeeCommonState {
26 stream: Box<Heap<*mut JSObject>>,
27 pub(crate) branch: [Box<Heap<*mut JSObject>>; 2],
28
29 pub(crate) reading: Cell<bool>,
30 pub(crate) cancelled: [Cell<bool>; 2],
31
32 pub(crate) reason: [Box<Heap<JSVal>>; 2],
33 pub(crate) cancel_promise: Box<Heap<*mut JSObject>>,
34}
35
36impl TeeCommonState {
37 pub(crate) fn new(cx: &Context, stream: &ReadableStream) -> TeeCommonState {
38 let promise = Promise::new(cx);
39 TeeCommonState {
40 stream: Heap::boxed(stream.reflector.get()),
41 branch: [Box::default(), Box::default()],
42
43 reading: Cell::new(false),
44 cancelled: [Cell::new(false), Cell::new(false)],
45
46 reason: [Heap::boxed(UndefinedValue()), Heap::boxed(UndefinedValue())],
47 cancel_promise: Heap::boxed(promise.get()),
48 }
49 }
50
51 #[expect(clippy::mut_from_ref)]
52 pub(crate) fn stream(&self, cx: &Context) -> ion::Result<&mut ReadableStream> {
53 let stream = Object::from(unsafe { Local::from_heap(&self.stream) });
54 ReadableStream::get_mut_private(cx, &stream)
55 }
56
57 #[expect(clippy::mut_from_ref)]
58 pub(crate) fn branch(&self, cx: &Context, second: bool) -> ion::Result<&mut ReadableStream> {
59 let stream = Object::from(unsafe { Local::from_heap(&self.branch[usize::from(second)]) });
60 ReadableStream::get_mut_private(cx, &stream)
61 }
62
63 pub(crate) fn cancel_promise(&self) -> Promise<'_> {
64 Promise::from(unsafe { Local::from_heap(&self.cancel_promise) }).unwrap()
65 }
66
67 pub(crate) fn cancel(&self, cx: &Context, value: &Value) {
68 if !self.cancelled[0].get() || !self.cancelled[1].get() {
69 self.cancel_promise().resolve(cx, value);
70 }
71 }
72}
73
74#[derive(Traceable)]
75pub struct TeeDefaultState {
76 pub(crate) common: TeeCommonState,
77 pub(crate) clone_branch_2: bool,
78 pub(crate) read_again: Cell<bool>,
79}
80
81impl TeeDefaultState {
82 pub(crate) fn new(cx: &Context, stream: &ReadableStream, clone_branch_2: bool) -> TeeDefaultState {
83 TeeDefaultState {
84 common: TeeCommonState::new(cx, stream),
85 clone_branch_2,
86 read_again: Cell::new(false),
87 }
88 }
89
90 pub(crate) fn pull<'cx>(self: &Rc<Self>, cx: &'cx Context, second: bool) -> ResultExc<Option<Promise<'cx>>> {
91 if self.common.reading.get() {
92 self.read_again.set(true);
93 return Ok(Some(Promise::resolved(cx, &Value::undefined_handle())));
94 }
95
96 self.common.reading.set(true);
97
98 let promise = Promise::new(cx);
99 let request = Request {
100 promise: Heap::boxed(promise.get()),
101 chunk: Rc::clone(self).chunk_closure(second),
102 close: Rc::clone(self).close_closure(),
103 error: error_closure(Rc::clone(self)),
104 };
105
106 let reader = self.common.stream(cx)?.native_reader(cx)?.unwrap().into_default().unwrap();
107 reader.read_internal(cx, request)?;
108
109 promise.resolve(cx, &Value::undefined_handle());
110 Ok(Some(promise))
111 }
112
113 fn chunk_closure(self: Rc<Self>, second: bool) -> Box<ChunkErrorClosure> {
114 Box::new(move |cx, _, chunk| {
115 let promise = Promise::resolved(cx, &Value::undefined_handle());
116 let chunk = TracedHeap::new(chunk.get());
117
118 let state = Rc::clone(&self);
119 promise.then(cx, move |cx, _| {
120 state.read_again.set(false);
121 let chunk = Value::from(chunk.to_local());
122 let mut chunk2 = None;
123
124 if !state.common.cancelled[1].get() && state.clone_branch_2 {
125 let policy = CloneDataPolicy {
126 allowIntraClusterClonableSharedObjects_: false,
127 allowSharedMemoryObjects_: true,
128 };
129
130 let mut buffer = StructuredCloneBuffer::new(
131 StructuredCloneScope::SameProcess,
132 &STRUCTURED_CLONE_CALLBACKS,
133 Some(Box::new(StructuredCloneDataHolder::default())),
134 );
135 let result = buffer.write(cx, &chunk, None, &policy).and_then(|_| buffer.read(cx, &policy));
136
137 match result {
138 Ok(chunk) => {
139 chunk2 = Some(chunk);
140 }
141 Err(e) => {
142 let value = e.as_value(cx);
143 let branch1 = state.common.branch(cx, false)?;
144 let controller1 = branch1.native_controller(cx)?.into_default().unwrap();
145 controller1.error_internal(cx, &value)?;
146
147 let branch2 = state.common.branch(cx, true)?;
148 let controller2 = branch2.native_controller(cx)?.into_default().unwrap();
149 controller2.error_internal(cx, &value)?;
150
151 state.common.cancel(cx, &value);
152 return Ok(Value::undefined_handle());
153 }
154 }
155 }
156
157 if !state.common.cancelled[0].get() {
158 let branch = state.common.branch(cx, false)?;
159 let controller = branch.native_controller(cx)?.into_default().unwrap();
160 controller.enqueue_internal(cx, &chunk)?;
161 }
162 if !state.common.cancelled[1].get() {
163 let branch = state.common.branch(cx, true)?;
164 let controller = branch.native_controller(cx)?.into_default().unwrap();
165 controller.enqueue_internal(cx, chunk2.as_ref().unwrap_or(&chunk))?;
166 }
167
168 state.common.reading.set(false);
169 if state.read_again.get() {
170 let branch = state.common.branch(cx, second)?;
171 let controller = branch.native_controller(cx)?.into_default().unwrap();
172 controller.common.source.pull(cx, controller.reflector().get())?;
173 }
174 Ok(Value::undefined_handle())
175 });
176 })
177 }
178
179 fn close_closure(self: Rc<Self>) -> Box<CloseClosure> {
180 Box::new(move |cx, _, _| {
181 self.common.reading.set(false);
182
183 if !self.common.cancelled[0].get() {
184 let branch = self.common.branch(cx, false)?;
185 branch.native_controller(cx)?.into_default().unwrap().close(cx)?;
186 }
187 if !self.common.cancelled[1].get() {
188 let branch = self.common.branch(cx, true)?;
189 branch.native_controller(cx)?.into_default().unwrap().close(cx)?;
190 }
191
192 self.common.cancel(cx, &Value::undefined_handle());
193 Ok(())
194 })
195 }
196}
197
198#[derive(Traceable)]
199pub struct TeeBytesState {
200 pub(crate) common: TeeCommonState,
201 pub(crate) read_again: [Cell<bool>; 2],
202}
203
204impl TeeBytesState {
205 pub(crate) fn new(cx: &Context, stream: &ReadableStream) -> TeeBytesState {
206 TeeBytesState {
207 common: TeeCommonState::new(cx, stream),
208 read_again: [Cell::new(false), Cell::new(false)],
209 }
210 }
211
212 pub(crate) fn pull<'cx>(self: &Rc<Self>, cx: &'cx Context, second: bool) -> ResultExc<Option<Promise<'cx>>> {
213 if self.common.reading.get() {
214 self.read_again[usize::from(second)].set(true);
215 return Ok(Some(Promise::resolved(cx, &Value::undefined_handle())));
216 }
217
218 self.common.reading.set(true);
219
220 let stream = self.common.stream(cx)?;
221 let controller = stream.native_controller(cx)?.into_byte_stream().unwrap();
222 let byob_request = controller.get_byob_request(cx);
223
224 let promise = Promise::new(cx);
225 if byob_request.is_null() {
226 if stream.reader_kind == ReaderKind::Byob {
227 {
228 let reader = stream.native_reader(cx)?.unwrap().into_byob().unwrap();
229 assert!(reader.common.requests.is_empty());
230 reader.release_lock(cx)?;
231 }
232 stream.get_reader(cx, Opt(None))?;
233
234 let reader = stream.native_reader(cx)?.unwrap().into_default().unwrap();
235 forward_reader_error(cx, &reader.common.closed(), Rc::clone(self))?;
236 }
237
238 let request = Request {
239 promise: Heap::boxed(promise.get()),
240 chunk: Rc::clone(self).chunk_closure(),
241 close: Rc::clone(self).close_closure(),
242 error: error_closure(Rc::clone(self)),
243 };
244
245 let reader = self.common.stream(cx)?.native_reader(cx)?.unwrap().into_default().unwrap();
246 reader.read_internal(cx, request)?;
247 } else {
248 if stream.reader_kind == ReaderKind::Default {
249 {
250 let reader = stream.native_reader(cx)?.unwrap().into_default().unwrap();
251 assert!(reader.common.requests.is_empty());
252 reader.release_lock(cx)?;
253 }
254 stream.get_reader(cx, Opt(Some(ReaderOptions { mode: Some(String::from("byob")) })))?;
255
256 let reader = stream.native_reader(cx)?.unwrap().into_byob().unwrap();
257 forward_reader_error(cx, &reader.common.closed(), Rc::clone(self))?;
258 }
259
260 let request = Request {
261 promise: Heap::boxed(promise.get()),
262 chunk: Rc::clone(self).chunk_byob_closure(second),
263 close: Rc::clone(self).close_closure(),
264 error: error_closure(Rc::clone(self)),
265 };
266
267 let reader = self.common.stream(cx)?.native_reader(cx)?.unwrap().into_byob().unwrap();
268 let byob_request = Object::from(cx.root(byob_request));
269 let byob_request = ByobRequest::get_private(cx, &byob_request)?;
270
271 let view = ArrayBufferView::from(cx.root(byob_request.get_view())).unwrap();
272 reader.read_internal(cx, view, 1, request)?;
273 }
274
275 promise.resolve(cx, &Value::undefined_handle());
276 Ok(Some(promise))
277 }
278
279 fn chunk_closure(self: Rc<Self>) -> Box<ChunkErrorClosure> {
280 Box::new(move |cx, _, chunk| {
281 let promise = Promise::resolved(cx, &Value::undefined_handle());
282 let chunk = TracedHeap::new(chunk.get());
283
284 let state = Rc::clone(&self);
285 promise.then(cx, move |cx, _| {
286 state.read_again[0].set(false);
287 state.read_again[1].set(false);
288
289 let chunk = Value::from(chunk.to_local());
290 let chunk = ArrayBufferView::from_value(cx, &chunk, true, ())?;
291 let controller1 = state.common.branch(cx, false)?.native_controller(cx)?.into_byte_stream().unwrap();
292 let controller2 = state.common.branch(cx, true)?.native_controller(cx)?.into_byte_stream().unwrap();
293
294 match (state.common.cancelled[0].get(), state.common.cancelled[1].get()) {
295 (false, false) => {
296 let chunk2 = clone_bytes_chunk(cx, &chunk);
297
298 if let Some(chunk2) = chunk2 {
299 controller1.enqueue(cx, chunk)?;
300 controller2.enqueue(cx, chunk2)?;
301 } else {
302 state.chunk_error(cx, [controller1, controller2])?;
303 }
304
305 state.common.reading.set(false);
306 }
307 (false, true) => controller1.enqueue(cx, chunk)?,
308 (true, false) => controller2.enqueue(cx, chunk)?,
309 _ => {}
310 }
311
312 state.common.reading.set(false);
313 if state.read_again[0].get() {
314 controller1.common.source.pull(cx, controller1.reflector().get())?;
315 } else if state.read_again[1].get() {
316 controller2.common.source.pull(cx, controller2.reflector().get())?;
317 }
318
319 Ok(Value::undefined_handle())
320 });
321 })
322 }
323
324 fn chunk_byob_closure(self: Rc<Self>, second: bool) -> Box<ChunkErrorClosure> {
325 Box::new(move |cx, _, chunk| {
326 let promise = Promise::resolved(cx, &Value::undefined_handle());
327 let chunk = TracedHeap::new(chunk.get());
328
329 let state = Rc::clone(&self);
330 promise.then(cx, move |cx, _| {
331 state.read_again[0].set(false);
332 state.read_again[1].set(false);
333
334 let chunk = Value::from(chunk.to_local());
335 let chunk = ArrayBufferView::from_value(cx, &chunk, true, ())?;
336
337 let byob_controller =
338 state.common.branch(cx, second)?.native_controller(cx)?.into_byte_stream().unwrap();
339 let other_controller =
340 state.common.branch(cx, !second)?.native_controller(cx)?.into_byte_stream().unwrap();
341
342 if !state.common.cancelled[usize::from(!second)].get() {
343 let chunk2 = clone_bytes_chunk(cx, &chunk);
344
345 if let Some(chunk2) = chunk2 {
346 other_controller.enqueue(cx, chunk2)?;
347 } else {
348 state.chunk_error(cx, [byob_controller, other_controller])?;
349 }
350 }
351 if !state.common.cancelled[usize::from(second)].get() {
352 byob_controller.respond_with_new_view(cx, chunk)?;
353 }
354
355 state.common.reading.set(false);
356 if state.read_again[usize::from(second)].get() {
357 byob_controller.common.source.pull(cx, byob_controller.reflector().get())?;
358 } else if state.read_again[usize::from(!second)].get() {
359 other_controller.common.source.pull(cx, other_controller.reflector().get())?;
360 }
361
362 Ok(Value::undefined_handle())
363 });
364 })
365 }
366
367 fn close_closure(self: Rc<Self>) -> Box<CloseClosure> {
368 Box::new(move |cx, _, _| {
369 self.common.reading.set(false);
370
371 let controller1 = self.common.branch(cx, false)?.native_controller(cx)?.into_byte_stream().unwrap();
372 let controller2 = self.common.branch(cx, true)?.native_controller(cx)?.into_byte_stream().unwrap();
373
374 if !self.common.cancelled[0].get() {
375 controller1.close(cx)?;
376 }
377 if !self.common.cancelled[1].get() {
378 controller2.close(cx)?;
379 }
380
381 if !controller1.pending_descriptors.is_empty() {
382 controller1.respond(cx, 0)?;
383 }
384 if !controller2.pending_descriptors.is_empty() {
385 controller2.respond(cx, 0)?;
386 }
387
388 self.common.cancel(cx, &Value::undefined_handle());
389 Ok(())
390 })
391 }
392
393 fn chunk_error(&self, cx: &Context, controllers: [&mut ByteStreamController; 2]) -> ResultExc<()> {
394 let exception = Exception::new(cx).unwrap().as_value(cx);
395 for controller in controllers {
396 controller.error_internal(cx, &exception)?;
397 }
398 self.common.cancel(cx, &Value::undefined_handle());
399 Ok(())
400 }
401}
402
403pub(crate) trait TeeState {
404 fn common(&self) -> &TeeCommonState;
405}
406
407impl TeeState for TeeDefaultState {
408 fn common(&self) -> &TeeCommonState {
409 &self.common
410 }
411}
412
413impl TeeState for TeeBytesState {
414 fn common(&self) -> &TeeCommonState {
415 &self.common
416 }
417}
418
419impl<T: TeeState> TeeState for Rc<T> {
420 fn common(&self) -> &TeeCommonState {
421 (**self).common()
422 }
423}
424
425fn clone_bytes_chunk<'cx>(cx: &'cx Context, chunk: &ArrayBufferView) -> Option<ArrayBufferView<'cx>> {
426 chunk
427 .buffer(cx)
428 .clone(cx, chunk.offset(), chunk.len())
429 .and_then(|buffer| Uint8Array::with_array_buffer(cx, &buffer, 0, buffer.len()))
430 .map(|array| ArrayBufferView::from(array.into_local()).unwrap())
431}
432
433fn error_closure<T: TeeState + 'static>(state: T) -> Box<ChunkErrorClosure> {
434 Box::new(move |_, _, _| {
435 state.common().reading.set(false);
436 })
437}
438
439pub(crate) fn forward_reader_error(
440 cx: &Context, closed_promise: &Promise, state: Rc<TeeBytesState>,
441) -> ion::Result<()> {
442 let controller1 = TracedHeap::new(state.common.branch(cx, false)?.controller.get());
443 let controller2 = TracedHeap::new(state.common.branch(cx, true)?.controller.get());
444
445 closed_promise.catch(cx, move |cx, reason| {
446 ByteStreamController::from_traced_heap(cx, &controller1)?.error_internal(cx, reason)?;
447 ByteStreamController::from_traced_heap(cx, &controller2)?.error_internal(cx, reason)?;
448 state.common.cancel(cx, &Value::undefined_handle());
449 Ok(Value::undefined_handle())
450 });
451 Ok(())
452}