runtime/event_loop/
future.rs1use std::task;
8use std::task::Poll;
9
10use futures_util::StreamExt as _;
11use futures_util::stream::FuturesUnordered;
12use ion::conversions::BoxedIntoValue;
13use ion::{Context, Error, ErrorKind, ErrorReport, Promise, ThrowException as _, Value};
14use mozjs::jsapi::JSObject;
15use tokio::task::JoinHandle;
16
17type FutureOutput = (Result<BoxedIntoValue, BoxedIntoValue>, *mut JSObject);
18
19#[derive(Default)]
20pub struct FutureQueue {
21 queue: FuturesUnordered<JoinHandle<FutureOutput>>,
22}
23
24impl FutureQueue {
25 pub fn run_futures(&mut self, cx: &Context, wcx: &mut task::Context) -> Result<(), Option<ErrorReport>> {
26 let mut results = Vec::new();
27
28 while let Poll::Ready(Some(item)) = self.queue.poll_next_unpin(wcx) {
29 match item {
30 Ok(item) => results.push(item),
31 Err(error) => {
32 Error::new(error.to_string(), ErrorKind::Normal).throw(cx);
33 return Err(None);
34 }
35 }
36 }
37
38 for (result, promise) in results {
39 let mut value = Value::undefined(cx);
40 let promise = Promise::from(cx.root(promise)).unwrap();
41
42 let result = match result {
43 Ok(o) => {
44 o.into_value(cx, &mut value);
45 promise.resolve(cx, &value)
46 }
47 Err(e) => {
48 e.into_value(cx, &mut value);
49 promise.reject(cx, &value)
50 }
51 };
52
53 if !result {
54 return Err(ErrorReport::new_with_exception_stack(cx).unwrap());
55 }
56 }
57
58 Ok(())
59 }
60
61 pub fn enqueue(&self, handle: JoinHandle<FutureOutput>) {
62 self.queue.push(handle);
63 }
64
65 pub fn is_empty(&self) -> bool {
66 self.queue.is_empty()
67 }
68}