// runtime/globals/streams/readable/tee.rs

/*
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

7use std::cell::Cell;
8use std::rc::Rc;
9
10use ion::class::NativeObject as _;
11use ion::clone::StructuredCloneBuffer;
12use ion::conversions::{FromValue as _, ToValue as _};
13use ion::function::Opt;
14use ion::typedarray::{ArrayBufferView, Uint8Array};
15use ion::{ClassDefinition as _, Context, Exception, Local, Object, Promise, ResultExc, Traceable, TracedHeap, Value};
16use mozjs::jsapi::{CloneDataPolicy, Heap, JSObject, StructuredCloneScope};
17use mozjs::jsval::{JSVal, UndefinedValue};
18
19use crate::globals::clone::{STRUCTURED_CLONE_CALLBACKS, StructuredCloneDataHolder};
20use crate::globals::streams::readable::controller::ControllerInternals as _;
21use crate::globals::streams::readable::reader::{ChunkErrorClosure, CloseClosure, ReaderKind, Request};
22use crate::globals::streams::readable::{ByobRequest, ByteStreamController, ReadableStream, ReaderOptions};
23
/// State shared by both tee flavours in this file (`TeeDefaultState` and `TeeBytesState`).
///
/// Every JS pointer is stored in a boxed `Heap`, and the struct derives `Traceable`.
#[derive(Traceable)]
pub(crate) struct TeeCommonState {
	/// Reflector of the stream being teed (taken from `stream.reflector` in `new`).
	stream: Box<Heap<*mut JSObject>>,
	/// Reflectors of the two branch streams: index 0 is the first branch, index 1 the second.
	/// Default-initialised by `new`; presumably filled in by the tee setup code outside this file.
	pub(crate) branch: [Box<Heap<*mut JSObject>>; 2],

	/// `true` while a read on the teed stream is in flight; set by the `pull` methods and cleared
	/// by the chunk, close and error callbacks.
	pub(crate) reading: Cell<bool>,
	/// Per-branch cancellation flags (same indexing as `branch`). Only read in this file;
	/// presumably set by the branch cancel handlers — verify against the caller.
	pub(crate) cancelled: [Cell<bool>; 2],

	/// Per-branch values, initialised to `undefined`. Not read in this file; presumably the
	/// reasons passed when each branch is cancelled — TODO confirm.
	pub(crate) reason: [Box<Heap<JSVal>>; 2],
	/// Promise created in `new` and resolved through `cancel`.
	pub(crate) cancel_promise: Box<Heap<*mut JSObject>>,
}
35
36impl TeeCommonState {
37	pub(crate) fn new(cx: &Context, stream: &ReadableStream) -> TeeCommonState {
38		let promise = Promise::new(cx);
39		TeeCommonState {
40			stream: Heap::boxed(stream.reflector.get()),
41			branch: [Box::default(), Box::default()],
42
43			reading: Cell::new(false),
44			cancelled: [Cell::new(false), Cell::new(false)],
45
46			reason: [Heap::boxed(UndefinedValue()), Heap::boxed(UndefinedValue())],
47			cancel_promise: Heap::boxed(promise.get()),
48		}
49	}
50
51	#[expect(clippy::mut_from_ref)]
52	pub(crate) fn stream(&self, cx: &Context) -> ion::Result<&mut ReadableStream> {
53		let stream = Object::from(unsafe { Local::from_heap(&self.stream) });
54		ReadableStream::get_mut_private(cx, &stream)
55	}
56
57	#[expect(clippy::mut_from_ref)]
58	pub(crate) fn branch(&self, cx: &Context, second: bool) -> ion::Result<&mut ReadableStream> {
59		let stream = Object::from(unsafe { Local::from_heap(&self.branch[usize::from(second)]) });
60		ReadableStream::get_mut_private(cx, &stream)
61	}
62
63	pub(crate) fn cancel_promise(&self) -> Promise<'_> {
64		Promise::from(unsafe { Local::from_heap(&self.cancel_promise) }).unwrap()
65	}
66
67	pub(crate) fn cancel(&self, cx: &Context, value: &Value) {
68		if !self.cancelled[0].get() || !self.cancelled[1].get() {
69			self.cancel_promise().resolve(cx, value);
70		}
71	}
72}
73
/// Tee state for streams read through a default reader.
#[derive(Traceable)]
pub struct TeeDefaultState {
	/// State shared with the byte-stream flavour of tee.
	pub(crate) common: TeeCommonState,
	/// When `true`, each chunk is structured-cloned before it is enqueued on the second branch
	/// (unless that branch is cancelled).
	pub(crate) clone_branch_2: bool,
	/// Set when `pull` is called while a read is still in flight; checked once the current chunk
	/// has been delivered to decide whether to pull again.
	pub(crate) read_again: Cell<bool>,
}
80
81impl TeeDefaultState {
82	pub(crate) fn new(cx: &Context, stream: &ReadableStream, clone_branch_2: bool) -> TeeDefaultState {
83		TeeDefaultState {
84			common: TeeCommonState::new(cx, stream),
85			clone_branch_2,
86			read_again: Cell::new(false),
87		}
88	}
89
90	pub(crate) fn pull<'cx>(self: &Rc<Self>, cx: &'cx Context, second: bool) -> ResultExc<Option<Promise<'cx>>> {
91		if self.common.reading.get() {
92			self.read_again.set(true);
93			return Ok(Some(Promise::resolved(cx, &Value::undefined_handle())));
94		}
95
96		self.common.reading.set(true);
97
98		let promise = Promise::new(cx);
99		let request = Request {
100			promise: Heap::boxed(promise.get()),
101			chunk: Rc::clone(self).chunk_closure(second),
102			close: Rc::clone(self).close_closure(),
103			error: error_closure(Rc::clone(self)),
104		};
105
106		let reader = self.common.stream(cx)?.native_reader(cx)?.unwrap().into_default().unwrap();
107		reader.read_internal(cx, request)?;
108
109		promise.resolve(cx, &Value::undefined_handle());
110		Ok(Some(promise))
111	}
112
113	fn chunk_closure(self: Rc<Self>, second: bool) -> Box<ChunkErrorClosure> {
114		Box::new(move |cx, _, chunk| {
115			let promise = Promise::resolved(cx, &Value::undefined_handle());
116			let chunk = TracedHeap::new(chunk.get());
117
118			let state = Rc::clone(&self);
119			promise.then(cx, move |cx, _| {
120				state.read_again.set(false);
121				let chunk = Value::from(chunk.to_local());
122				let mut chunk2 = None;
123
124				if !state.common.cancelled[1].get() && state.clone_branch_2 {
125					let policy = CloneDataPolicy {
126						allowIntraClusterClonableSharedObjects_: false,
127						allowSharedMemoryObjects_: true,
128					};
129
130					let mut buffer = StructuredCloneBuffer::new(
131						StructuredCloneScope::SameProcess,
132						&STRUCTURED_CLONE_CALLBACKS,
133						Some(Box::new(StructuredCloneDataHolder::default())),
134					);
135					let result = buffer.write(cx, &chunk, None, &policy).and_then(|_| buffer.read(cx, &policy));
136
137					match result {
138						Ok(chunk) => {
139							chunk2 = Some(chunk);
140						}
141						Err(e) => {
142							let value = e.as_value(cx);
143							let branch1 = state.common.branch(cx, false)?;
144							let controller1 = branch1.native_controller(cx)?.into_default().unwrap();
145							controller1.error_internal(cx, &value)?;
146
147							let branch2 = state.common.branch(cx, true)?;
148							let controller2 = branch2.native_controller(cx)?.into_default().unwrap();
149							controller2.error_internal(cx, &value)?;
150
151							state.common.cancel(cx, &value);
152							return Ok(Value::undefined_handle());
153						}
154					}
155				}
156
157				if !state.common.cancelled[0].get() {
158					let branch = state.common.branch(cx, false)?;
159					let controller = branch.native_controller(cx)?.into_default().unwrap();
160					controller.enqueue_internal(cx, &chunk)?;
161				}
162				if !state.common.cancelled[1].get() {
163					let branch = state.common.branch(cx, true)?;
164					let controller = branch.native_controller(cx)?.into_default().unwrap();
165					controller.enqueue_internal(cx, chunk2.as_ref().unwrap_or(&chunk))?;
166				}
167
168				state.common.reading.set(false);
169				if state.read_again.get() {
170					let branch = state.common.branch(cx, second)?;
171					let controller = branch.native_controller(cx)?.into_default().unwrap();
172					controller.common.source.pull(cx, controller.reflector().get())?;
173				}
174				Ok(Value::undefined_handle())
175			});
176		})
177	}
178
179	fn close_closure(self: Rc<Self>) -> Box<CloseClosure> {
180		Box::new(move |cx, _, _| {
181			self.common.reading.set(false);
182
183			if !self.common.cancelled[0].get() {
184				let branch = self.common.branch(cx, false)?;
185				branch.native_controller(cx)?.into_default().unwrap().close(cx)?;
186			}
187			if !self.common.cancelled[1].get() {
188				let branch = self.common.branch(cx, true)?;
189				branch.native_controller(cx)?.into_default().unwrap().close(cx)?;
190			}
191
192			self.common.cancel(cx, &Value::undefined_handle());
193			Ok(())
194		})
195	}
196}
197
/// Tee state for readable byte streams.
#[derive(Traceable)]
pub struct TeeBytesState {
	/// State shared with the default flavour of tee.
	pub(crate) common: TeeCommonState,
	/// Per-branch flags (index 0: first branch, index 1: second), set when that branch's `pull`
	/// arrives while a read is still in flight.
	pub(crate) read_again: [Cell<bool>; 2],
}
203
204impl TeeBytesState {
205	pub(crate) fn new(cx: &Context, stream: &ReadableStream) -> TeeBytesState {
206		TeeBytesState {
207			common: TeeCommonState::new(cx, stream),
208			read_again: [Cell::new(false), Cell::new(false)],
209		}
210	}
211
212	pub(crate) fn pull<'cx>(self: &Rc<Self>, cx: &'cx Context, second: bool) -> ResultExc<Option<Promise<'cx>>> {
213		if self.common.reading.get() {
214			self.read_again[usize::from(second)].set(true);
215			return Ok(Some(Promise::resolved(cx, &Value::undefined_handle())));
216		}
217
218		self.common.reading.set(true);
219
220		let stream = self.common.stream(cx)?;
221		let controller = stream.native_controller(cx)?.into_byte_stream().unwrap();
222		let byob_request = controller.get_byob_request(cx);
223
224		let promise = Promise::new(cx);
225		if byob_request.is_null() {
226			if stream.reader_kind == ReaderKind::Byob {
227				{
228					let reader = stream.native_reader(cx)?.unwrap().into_byob().unwrap();
229					assert!(reader.common.requests.is_empty());
230					reader.release_lock(cx)?;
231				}
232				stream.get_reader(cx, Opt(None))?;
233
234				let reader = stream.native_reader(cx)?.unwrap().into_default().unwrap();
235				forward_reader_error(cx, &reader.common.closed(), Rc::clone(self))?;
236			}
237
238			let request = Request {
239				promise: Heap::boxed(promise.get()),
240				chunk: Rc::clone(self).chunk_closure(),
241				close: Rc::clone(self).close_closure(),
242				error: error_closure(Rc::clone(self)),
243			};
244
245			let reader = self.common.stream(cx)?.native_reader(cx)?.unwrap().into_default().unwrap();
246			reader.read_internal(cx, request)?;
247		} else {
248			if stream.reader_kind == ReaderKind::Default {
249				{
250					let reader = stream.native_reader(cx)?.unwrap().into_default().unwrap();
251					assert!(reader.common.requests.is_empty());
252					reader.release_lock(cx)?;
253				}
254				stream.get_reader(cx, Opt(Some(ReaderOptions { mode: Some(String::from("byob")) })))?;
255
256				let reader = stream.native_reader(cx)?.unwrap().into_byob().unwrap();
257				forward_reader_error(cx, &reader.common.closed(), Rc::clone(self))?;
258			}
259
260			let request = Request {
261				promise: Heap::boxed(promise.get()),
262				chunk: Rc::clone(self).chunk_byob_closure(second),
263				close: Rc::clone(self).close_closure(),
264				error: error_closure(Rc::clone(self)),
265			};
266
267			let reader = self.common.stream(cx)?.native_reader(cx)?.unwrap().into_byob().unwrap();
268			let byob_request = Object::from(cx.root(byob_request));
269			let byob_request = ByobRequest::get_private(cx, &byob_request)?;
270
271			let view = ArrayBufferView::from(cx.root(byob_request.get_view())).unwrap();
272			reader.read_internal(cx, view, 1, request)?;
273		}
274
275		promise.resolve(cx, &Value::undefined_handle());
276		Ok(Some(promise))
277	}
278
279	fn chunk_closure(self: Rc<Self>) -> Box<ChunkErrorClosure> {
280		Box::new(move |cx, _, chunk| {
281			let promise = Promise::resolved(cx, &Value::undefined_handle());
282			let chunk = TracedHeap::new(chunk.get());
283
284			let state = Rc::clone(&self);
285			promise.then(cx, move |cx, _| {
286				state.read_again[0].set(false);
287				state.read_again[1].set(false);
288
289				let chunk = Value::from(chunk.to_local());
290				let chunk = ArrayBufferView::from_value(cx, &chunk, true, ())?;
291				let controller1 = state.common.branch(cx, false)?.native_controller(cx)?.into_byte_stream().unwrap();
292				let controller2 = state.common.branch(cx, true)?.native_controller(cx)?.into_byte_stream().unwrap();
293
294				match (state.common.cancelled[0].get(), state.common.cancelled[1].get()) {
295					(false, false) => {
296						let chunk2 = clone_bytes_chunk(cx, &chunk);
297
298						if let Some(chunk2) = chunk2 {
299							controller1.enqueue(cx, chunk)?;
300							controller2.enqueue(cx, chunk2)?;
301						} else {
302							state.chunk_error(cx, [controller1, controller2])?;
303						}
304
305						state.common.reading.set(false);
306					}
307					(false, true) => controller1.enqueue(cx, chunk)?,
308					(true, false) => controller2.enqueue(cx, chunk)?,
309					_ => {}
310				}
311
312				state.common.reading.set(false);
313				if state.read_again[0].get() {
314					controller1.common.source.pull(cx, controller1.reflector().get())?;
315				} else if state.read_again[1].get() {
316					controller2.common.source.pull(cx, controller2.reflector().get())?;
317				}
318
319				Ok(Value::undefined_handle())
320			});
321		})
322	}
323
324	fn chunk_byob_closure(self: Rc<Self>, second: bool) -> Box<ChunkErrorClosure> {
325		Box::new(move |cx, _, chunk| {
326			let promise = Promise::resolved(cx, &Value::undefined_handle());
327			let chunk = TracedHeap::new(chunk.get());
328
329			let state = Rc::clone(&self);
330			promise.then(cx, move |cx, _| {
331				state.read_again[0].set(false);
332				state.read_again[1].set(false);
333
334				let chunk = Value::from(chunk.to_local());
335				let chunk = ArrayBufferView::from_value(cx, &chunk, true, ())?;
336
337				let byob_controller =
338					state.common.branch(cx, second)?.native_controller(cx)?.into_byte_stream().unwrap();
339				let other_controller =
340					state.common.branch(cx, !second)?.native_controller(cx)?.into_byte_stream().unwrap();
341
342				if !state.common.cancelled[usize::from(!second)].get() {
343					let chunk2 = clone_bytes_chunk(cx, &chunk);
344
345					if let Some(chunk2) = chunk2 {
346						other_controller.enqueue(cx, chunk2)?;
347					} else {
348						state.chunk_error(cx, [byob_controller, other_controller])?;
349					}
350				}
351				if !state.common.cancelled[usize::from(second)].get() {
352					byob_controller.respond_with_new_view(cx, chunk)?;
353				}
354
355				state.common.reading.set(false);
356				if state.read_again[usize::from(second)].get() {
357					byob_controller.common.source.pull(cx, byob_controller.reflector().get())?;
358				} else if state.read_again[usize::from(!second)].get() {
359					other_controller.common.source.pull(cx, other_controller.reflector().get())?;
360				}
361
362				Ok(Value::undefined_handle())
363			});
364		})
365	}
366
367	fn close_closure(self: Rc<Self>) -> Box<CloseClosure> {
368		Box::new(move |cx, _, _| {
369			self.common.reading.set(false);
370
371			let controller1 = self.common.branch(cx, false)?.native_controller(cx)?.into_byte_stream().unwrap();
372			let controller2 = self.common.branch(cx, true)?.native_controller(cx)?.into_byte_stream().unwrap();
373
374			if !self.common.cancelled[0].get() {
375				controller1.close(cx)?;
376			}
377			if !self.common.cancelled[1].get() {
378				controller2.close(cx)?;
379			}
380
381			if !controller1.pending_descriptors.is_empty() {
382				controller1.respond(cx, 0)?;
383			}
384			if !controller2.pending_descriptors.is_empty() {
385				controller2.respond(cx, 0)?;
386			}
387
388			self.common.cancel(cx, &Value::undefined_handle());
389			Ok(())
390		})
391	}
392
393	fn chunk_error(&self, cx: &Context, controllers: [&mut ByteStreamController; 2]) -> ResultExc<()> {
394		let exception = Exception::new(cx).unwrap().as_value(cx);
395		for controller in controllers {
396			controller.error_internal(cx, &exception)?;
397		}
398		self.common.cancel(cx, &Value::undefined_handle());
399		Ok(())
400	}
401}
402
/// Gives access to the `TeeCommonState` embedded in a tee state, so helpers such as
/// `error_closure` can work with either tee flavour.
pub(crate) trait TeeState {
	/// Returns the shared part of the tee state.
	fn common(&self) -> &TeeCommonState;
}
406
407impl TeeState for TeeDefaultState {
408	fn common(&self) -> &TeeCommonState {
409		&self.common
410	}
411}
412
413impl TeeState for TeeBytesState {
414	fn common(&self) -> &TeeCommonState {
415		&self.common
416	}
417}
418
419impl<T: TeeState> TeeState for Rc<T> {
420	fn common(&self) -> &TeeCommonState {
421		(**self).common()
422	}
423}
424
425fn clone_bytes_chunk<'cx>(cx: &'cx Context, chunk: &ArrayBufferView) -> Option<ArrayBufferView<'cx>> {
426	chunk
427		.buffer(cx)
428		.clone(cx, chunk.offset(), chunk.len())
429		.and_then(|buffer| Uint8Array::with_array_buffer(cx, &buffer, 0, buffer.len()))
430		.map(|array| ArrayBufferView::from(array.into_local()).unwrap())
431}
432
433fn error_closure<T: TeeState + 'static>(state: T) -> Box<ChunkErrorClosure> {
434	Box::new(move |_, _, _| {
435		state.common().reading.set(false);
436	})
437}
438
439pub(crate) fn forward_reader_error(
440	cx: &Context, closed_promise: &Promise, state: Rc<TeeBytesState>,
441) -> ion::Result<()> {
442	let controller1 = TracedHeap::new(state.common.branch(cx, false)?.controller.get());
443	let controller2 = TracedHeap::new(state.common.branch(cx, true)?.controller.get());
444
445	closed_promise.catch(cx, move |cx, reason| {
446		ByteStreamController::from_traced_heap(cx, &controller1)?.error_internal(cx, reason)?;
447		ByteStreamController::from_traced_heap(cx, &controller2)?.error_internal(cx, reason)?;
448		state.common.cancel(cx, &Value::undefined_handle());
449		Ok(Value::undefined_handle())
450	});
451	Ok(())
452}