We have updated the semantics of CallFuture
produced by ic_cdk::api::call
in Rust CDK to only make the inter-canister call (i.e., call ic0::call_new
, …, ic0::call_perform
) when the future is actually awaited. This aligns with the Rust semantics of futures that are not supposed to do anything unless awaited (and there is a compiler warning about that if you don’t await a future). Please refer to the PR (fix: lazy CallFuture by mraszyk · Pull Request #391 · dfinity/cdk-rs · GitHub) for further motivation.
In particular, if a future is never awaited, then the inter-canister call is not performed at all. Otherwise, firing the call is postponed to the code location that awaits the future.
This means that patterns such as
let mut futs = vec![];
for m in messages.iter() { futs.push(call(m)); }
for f in futs.iter() { f.await; }
do not fire the inter-canister calls in parallel, but rather sequentially (as the inter-canister calls are awaited sequentially).
To fire the inter-canister calls in parallel, patterns such as
let mut futs = vec![];
for m in messages.iter() { futs.push(call(m)); }
let stream = futures::stream::iter(futs).buffer_unordered(16);
stream.collect::<Vec<_>>().await;
must be used.
Note. This change will be released in the next Rust CDK release.
5 Likes
Hey there @mraszyk
Great job, thanks!
Is it possible to define a custom future on the IC?
I have a use-case, when I want to suspend the execution of a user-called canister method, until another canister method is called.
Here is my code:
#[derive(Default)]
pub struct SharedState {
completed: bool,
waker: Option<Waker>,
}
pub struct MyFuture {
shared_state: Arc<Mutex<SharedState>>,
}
impl MyFuture {
pub fn new() -> Self {
let shared_state = Arc::new(Mutex::new(SharedState::default()));
unsafe { FUTURE_STATE = Some(shared_state.clone()) };
Self { shared_state }
}
}
impl Future for MyFuture {
type Output = ();
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.completed {
Poll::Ready(())
} else {
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
static mut FUTURE_STATE: Option<Arc<Mutex<SharedState>>> = None;
#[update]
async fn start() {
println!("[{}] start", time());
let f = MyFuture::new();
println!("[{}] future created, awaiting...", time());
f.await;
println!("[{}] await finished", time());
}
#[update]
fn resolve() {
println!("[{}] resolving the future...", time());
let arc = unsafe { FUTURE_STATE.as_mut().unwrap() };
let mut future_state = arc.lock().unwrap();
future_state.completed = true;
if let Some(waker) = future_state.waker.take() {
waker.wake();
}
println!("[{}] future resolved", time());
}
Calling start
returns:
Reject code: 5
Reject text: Canister rrkah-fqaaa-aaaaa-aaaaq-cai did not reply to the call
Node’s logs are:
[Canister rrkah-fqaaa-aaaaa-aaaaq-cai] [1684325125083617071] start
[Canister rrkah-fqaaa-aaaaa-aaaaq-cai] [1684325125083617071] future created, awaiting...
Any help is appreciated!
You’re getting the error Canister rrkah-fqaaa-aaaaa-aaaaq-cai did not reply to the call
because start
does not make any inter-canister call and thus its call context is starved after yielding control at the await statement. Without further refactoring, the only approach I can see is to replace f.await
by a loop of the form
loop {
call(id(), "noop").await;
if future_state.completed
{break;}
}
making an inter-canister call to a “no-op” method of the same canister to yield control. However, this would call “noop” in a tight loop and might result in a high cycle cost.
1 Like
Thanks a lot!
Could you please elaborate on what would I need to do in order to implement this behaviour efficiently?
I would recommend to refactor the code after f.await
in start
into a separate method that is called by resolve
once the condition is met.
1 Like