Updated semantics of `CallFuture` in Rust CDK

We have updated the semantics of the CallFuture produced by ic_cdk::api::call in the Rust CDK so that the inter-canister call is made (i.e., ic0::call_new, …, ic0::call_perform are invoked) only when the future is actually awaited. This aligns with Rust's semantics for futures, which are not supposed to do anything unless awaited (the compiler warns if you don't await a future). Please refer to the PR "fix: lazy CallFuture" (Pull Request #391 by mraszyk in dfinity/cdk-rs on GitHub) for further motivation.

In particular, if a future is never awaited, then the inter-canister call is not performed at all. Otherwise, firing the call is postponed to the code location that awaits the future.
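To make the "never awaited, never fired" behaviour concrete, here is a minimal sketch in plain Rust that does not use the CDK at all. `LazyCall`, `block_on`, and `fired_calls` are hypothetical names invented for this illustration; `LazyCall` only mimics the lazy behaviour described above by recording its "call" on the first poll.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a lazy call future (not the real CallFuture):
/// the "call" is recorded on the first poll, not at construction time.
struct LazyCall {
    name: &'static str,
    log: Rc<RefCell<Vec<&'static str>>>,
    fired: bool,
}

impl Future for LazyCall {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if !self.fired {
            self.fired = true;
            // This is where a real implementation would perform the call.
            self.log.borrow_mut().push(self.name);
        }
        Poll::Ready(())
    }
}

/// Waker that does nothing; enough for the busy-polling executor below.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Tiny executor for the demo: polls the future until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Creates two futures, awaits only one, and returns the calls that fired.
fn fired_calls() -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let never_awaited = LazyCall { name: "a", log: log.clone(), fired: false };
    let awaited = LazyCall { name: "b", log: log.clone(), fired: false };

    block_on(async move { awaited.await });
    drop(never_awaited); // dropped without ever being polled

    let fired = log.borrow().to_vec();
    fired
}
```

Calling `fired_calls()` returns only the name of the future that was awaited; the one that was created and dropped leaves no trace in the log.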

This means that patterns such as

let mut futs = vec![];
for m in messages.iter() { futs.push(call(m)); }
for f in futs { f.await; } // iterate by value: a future must be owned to be awaited

do not fire the inter-canister calls in parallel, but rather sequentially (as the inter-canister calls are awaited sequentially).

To fire the inter-canister calls in parallel, patterns such as

use futures::StreamExt; // brings buffer_unordered and collect into scope
let mut futs = vec![];
for m in messages.iter() { futs.push(call(m)); }
let stream = futures::stream::iter(futs).buffer_unordered(16);
stream.collect::<Vec<_>>().await;

must be used.
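The difference between the two patterns can be sketched without the CDK or the futures crate. In the sketch below, `LazyCall`, `JoinAll`, `block_on`, `sequential`, and `concurrent` are all hypothetical names made up for this illustration: `LazyCall` "fires" on its first poll and completes on its second, and `JoinAll` is a deliberately simplified stand-in for combinators such as `buffer_unordered` that poll all pending futures on each pass.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type Log = Rc<RefCell<Vec<String>>>;

/// Hypothetical lazy call: fires on the first poll, completes on the second.
struct LazyCall { name: &'static str, log: Log, fired: bool }

impl Future for LazyCall {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if !self.fired {
            self.fired = true;
            self.log.borrow_mut().push(format!("fire {}", self.name));
            cx.waker().wake_by_ref(); // ask to be polled again
            return Poll::Pending;
        }
        self.log.borrow_mut().push(format!("done {}", self.name));
        Poll::Ready(())
    }
}

/// Simplified stand-in for a join combinator: polls every pending future on each pass.
struct JoinAll { futs: Vec<Option<LazyCall>> }

impl Future for JoinAll {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        for slot in self.futs.iter_mut() {
            let done = match slot.as_mut() {
                Some(f) => Pin::new(f).poll(cx).is_ready(),
                None => false,
            };
            if done { *slot = None; }
        }
        if self.futs.iter().all(Option::is_none) { Poll::Ready(()) } else { Poll::Pending }
    }
}

struct NoopWake;
impl Wake for NoopWake { fn wake(self: Arc<Self>) {} }

/// Tiny busy-polling executor for the demo.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) { return v; }
    }
}

fn call(name: &'static str, log: &Log) -> LazyCall {
    LazyCall { name, log: log.clone(), fired: false }
}

/// Awaits the futures one after another: each call fires only when its turn comes.
fn sequential() -> Vec<String> {
    let log: Log = Rc::default();
    let futs = vec![call("a", &log), call("b", &log)];
    block_on(async move { for f in futs { f.await; } });
    let out = log.borrow().to_vec();
    out
}

/// Drives all futures together: every call fires before any of them completes.
fn concurrent() -> Vec<String> {
    let log: Log = Rc::default();
    let futs = vec![Some(call("a", &log)), Some(call("b", &log))];
    block_on(JoinAll { futs });
    let out = log.borrow().to_vec();
    out
}
```

In this model, `sequential()` logs fire a, done a, fire b, done b, whereas `concurrent()` logs fire a, fire b, done a, done b. The real `buffer_unordered(16)` additionally caps the number of in-flight futures at 16, which this sketch does not model.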

Note. This change will be released in the next Rust CDK release.


Hey there @mraszyk
Great job, thanks!

Is it possible to define a custom future on the IC?
I have a use case where I want to suspend the execution of a user-called canister method until another canister method is called.

Here is my code:


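use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::{Poll, Waker};

// Assumed CDK imports for time(), println! and #[update].
use ic_cdk::{api::time, println, update};

#[derive(Default)]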
pub struct SharedState {
    completed: bool,
    waker: Option<Waker>,
}

pub struct MyFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

impl MyFuture {
    pub fn new() -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState::default()));

        unsafe { FUTURE_STATE = Some(shared_state.clone()) };

        Self { shared_state }
    }
}

impl Future for MyFuture {
    type Output = ();

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let mut shared_state = self.shared_state.lock().unwrap();

        if shared_state.completed {
            Poll::Ready(())
        } else {
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

static mut FUTURE_STATE: Option<Arc<Mutex<SharedState>>> = None;

#[update]
async fn start() {
    println!("[{}] start", time());
    let f = MyFuture::new();

    println!("[{}] future created, awaiting...", time());

    f.await;

    println!("[{}] await finished", time());
}

#[update]
fn resolve() {
    println!("[{}] resolving the future...", time());

    let arc = unsafe { FUTURE_STATE.as_mut().unwrap() };
    let mut future_state = arc.lock().unwrap();
    future_state.completed = true;

    if let Some(waker) = future_state.waker.take() {
        waker.wake();
    }

    println!("[{}] future resolved", time());
}

Calling start returns:

Reject code: 5
Reject text: Canister rrkah-fqaaa-aaaaa-aaaaq-cai did not reply to the call

Node’s logs are:

[Canister rrkah-fqaaa-aaaaa-aaaaq-cai] [1684325125083617071] start
[Canister rrkah-fqaaa-aaaaa-aaaaq-cai] [1684325125083617071] future created, awaiting...

Any help is appreciated!

You’re getting the error “Canister rrkah-fqaaa-aaaaa-aaaaq-cai did not reply to the call” because start does not make any inter-canister call, so its call context is starved after it yields control at the await statement. Without further refactoring, the only approach I can see is to replace f.await with a loop of the form

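loop {
    // Self-call to a no-op method: awaiting it yields control while keeping the call context alive.
    let _ = call(id(), "noop").await;
    // Re-check the shared flag after each round trip.
    if future_state.completed {
        break;
    }
}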

making an inter-canister call to a “no-op” method of the same canister to yield control. However, this would call “noop” in a tight loop and might result in a high cycle cost.


Thanks a lot!
Could you please elaborate on what I would need to do in order to implement this behaviour efficiently?

I would recommend refactoring the code after f.await in start into a separate method that is called by resolve once the condition is met.
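A minimal sketch of that refactoring, written as plain Rust so it can be read without the CDK: `State`, `after_resolve`, and `log_snapshot` are hypothetical names, and in a canister `start` and `resolve` would be the `#[update]` methods from the question. Note one assumption that changes behaviour: here `start` returns (and would reply to its caller) immediately instead of staying suspended until `resolve` is called.

```rust
use std::cell::RefCell;

/// Hypothetical canister state: whether a `start` is waiting, plus a log for the demo.
#[derive(Default)]
struct State {
    pending: bool,
    log: Vec<String>,
}

thread_local! {
    static STATE: RefCell<State> = RefCell::new(State::default());
}

/// Would be an `#[update]` method: records that work is pending and returns right away.
fn start() {
    STATE.with(|s| {
        let mut s = s.borrow_mut();
        s.pending = true;
        s.log.push("start: waiting for resolve".to_string());
    });
}

/// The code that used to follow `f.await` in `start`.
fn after_resolve() {
    STATE.with(|s| s.borrow_mut().log.push("continuation ran".to_string()));
}

/// Would be an `#[update]` method: runs the continuation if a `start` is pending.
fn resolve() {
    // Clear the flag first so the continuation runs at most once per `start`.
    let was_pending = STATE.with(|s| std::mem::take(&mut s.borrow_mut().pending));
    if was_pending {
        after_resolve();
    }
}

/// Helper for inspecting the log in this sketch.
fn log_snapshot() -> Vec<String> {
    STATE.with(|s| s.borrow().log.clone())
}
```

With this shape no custom future or waker is needed: calling `start()` and then `resolve()` runs the continuation exactly once, and a second `resolve()` does nothing.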
