Assigned: ICDevs.org Bounty #39 - Async Flow - One Shot - Motoko - $6,000

Async Flow - One Shot - Motoko - #39

Current Status: Discussion

  • Discussion (01/09/2023)
  • Ratification: (01/09/2023)
  • Open for application: (01/09/2023)
  • Assigned
  • In Review
  • Closed

Official Link

Bounty Details

  • Bounty Amount: $6,000 USD of ICP at award date.
  • ICDevs.org Bounty Acceleration: For each 1 ICP sent to f2b488b80eb22bf57c1556683efcb51e090b46c181ca28e167e6f095223dfa3e, ICDevs.org will add .25 ICP to this issue and .75 ICP to fund other ICDevs.org initiatives.
  • Project Type: Individual
  • Opened: 01/09/2023
  • Time Commitment: Days
  • Project Type: Library
  • Experience Type: Intermediate - Motoko

Description

The IC implements an asynchronous messaging system where requests are made to canisters and a transaction id is returned. The canister then queries the state of this transaction and returns the result when it detects that the function is complete. The Rust CDK and Motoko abstract this away from the user in a way that depends on the IC reliably fulfilling the request with certain guarantees.

Sometimes a canister developer may want to do away with this abstraction and implement their own async flow when the results of the called function are not important to the continuation of their code. This is closer to event-based programming, and it is especially useful while the IC still requires functions to return before upgrades can be performed. Future functionality will fix this upgrade issue, but async and event-based programming is still a useful pattern when services are interacting. It removes dependencies and allows the developer to slip into an actor-based frame of mind that more closely mirrors how the IC is actually working under the covers. Specifically, it can keep the developer from making “await” assumptions that open the canister to reentrancy attacks.

In this bounty the user will create a library to handle asynchronous messaging.

When a user initiates an async one-shot call they likely do want to handle some kind of response so that they can confirm that the call was received. In turn, the acknowledger needs to know that the acknowledgment was received. You end up with something that looks a lot like a TCP/IP flow.

This pattern should expose a number of functions from the canister to implement this flow:

Sending Canister:

com_asyncFlow_ack({msg_id: nat; hash: ?nat32; payload: blob}) -> () //confirm that a message was received. payload is candid encoded data

Receiving Canister:

com_asyncFlow_newMessage({msg_id: nat; payload: blob;}) -> () // process a new message - payload is candid encoded data
com_asyncFlow_ackack({msg_id: nat; hash: ?nat32}) -> () //acknowledges that the acknowledgment was received

The library function signature should look something like:

constructor :  ( return_async: (canister_id: principal, sent_payload : blob, received_payload : blob, msg_id: nat, hash: ?nat32) -> async* ()); //this function is called when an ack is received.
call_async(canister_id: principal, payload: blob) -> async* nat; // the nat should be a ULID encoded as a Nat : https://github.com/aviate-labs/ulid.mo/blob/main/src/ULID.mo

Since the IC has a new 5 minute timeout on full queues, the Sender and Receiver should retry this messaging after 5.5 minutes. Therefore the library needs to keep state of sent messages and should discard those messages after the acknowledgement has been made.
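The bookkeeping described above can be sketched as follows. This is a minimal illustration in Rust, not the bounty library's API: `Outbox`, `record_sent`, `take_due` and `acknowledge` are hypothetical names, timestamps are plain seconds supplied by the caller, and the 330-second interval is simply the 5.5 minutes quoted above.

```rust
use std::collections::BTreeMap;

/// Retry interval taken from the bounty text: 5.5 minutes, in seconds.
const RETRY_AFTER_SECS: u64 = 330;

/// A sent message that has not been acknowledged yet.
struct Pending {
    payload: Vec<u8>,
    last_sent_at: u64, // seconds, supplied by the caller
}

/// Hypothetical sender-side store of unacknowledged messages.
#[derive(Default)]
struct Outbox {
    pending: BTreeMap<u128, Pending>,
}

impl Outbox {
    /// Remember a message right after it has been sent.
    fn record_sent(&mut self, msg_id: u128, payload: Vec<u8>, now: u64) {
        self.pending.insert(msg_id, Pending { payload, last_sent_at: now });
    }

    /// Ids whose acknowledgment is overdue; each is marked as re-sent at `now`.
    fn take_due(&mut self, now: u64) -> Vec<u128> {
        let mut due = Vec::new();
        for (id, p) in self.pending.iter_mut() {
            if now.saturating_sub(p.last_sent_at) >= RETRY_AFTER_SECS {
                p.last_sent_at = now;
                due.push(*id);
            }
        }
        due
    }

    /// Drop the stored message once its acknowledgment arrives and return
    /// the original payload so it can be handed to the callback.
    fn acknowledge(&mut self, msg_id: u128) -> Option<Vec<u8>> {
        self.pending.remove(&msg_id).map(|p| p.payload)
    }
}
```

A periodic timer would call `take_due` and re-send every returned id; `acknowledge` is what frees the state.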

The library should also keep track of processed message ids such that if a duplicate message is received, it is not reprocessed and instead, an acknowledgment is sent.
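A rough Rust sketch of this deduplication rule, under the assumption that the receiver stores the acknowledgment payload it produced for each processed id. `Inbox`, `Action` and `receive` are hypothetical names chosen for the example.

```rust
use std::collections::HashMap;

/// What the receiver should do with an incoming message.
#[derive(Debug, PartialEq)]
enum Action {
    /// First delivery: the handler ran; send this acknowledgment.
    ProcessAndAck(Vec<u8>),
    /// Duplicate delivery: the handler was skipped; re-send the stored acknowledgment.
    ResendAck(Vec<u8>),
}

/// Hypothetical receiver-side record of processed ids and their responses.
#[derive(Default)]
struct Inbox {
    processed: HashMap<u128, Vec<u8>>,
}

impl Inbox {
    /// `handler` runs at most once per `msg_id`.
    fn receive<F>(&mut self, msg_id: u128, payload: &[u8], handler: F) -> Action
    where
        F: FnOnce(&[u8]) -> Vec<u8>,
    {
        if let Some(ack) = self.processed.get(&msg_id) {
            return Action::ResendAck(ack.clone());
        }
        let ack = handler(payload);
        self.processed.insert(msg_id, ack.clone());
        Action::ProcessAndAck(ack)
    }
}
```

How long processed ids are retained is left open here; a real library would need some expiry policy.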

The hash is a check to make sure the acknowledgment was of the correct data. If an improper hash is received the library should hand the request to a corruption handler with enough data to manage the error.
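One way to picture the hash check, as a hedged Rust sketch. The bounty text does not say which hash function is used or exactly which bytes are hashed, so two assumptions are made here: the hash is 32-bit FNV-1a, and it is computed over the payload that was originally sent. `check_ack` and `AckCheck` are hypothetical names.

```rust
/// 32-bit FNV-1a. Assumption: the bounty does not name a hash function;
/// this one is used only to keep the sketch self-contained.
fn fnv1a32(data: &[u8]) -> u32 {
    let mut h: u32 = 0x811c_9dc5; // FNV offset basis
    for b in data {
        h ^= u32::from(*b);
        h = h.wrapping_mul(0x0100_0193); // FNV prime
    }
    h
}

#[derive(Debug, PartialEq)]
enum AckCheck {
    /// Hash absent or matching: deliver the acknowledgment normally.
    Accepted,
    /// Hash mismatch: the data a corruption handler would need.
    Corrupted { msg_id: u128, expected: u32, received: u32 },
}

/// Compare the hash carried by an ack with the hash of the payload we sent.
fn check_ack(msg_id: u128, sent_payload: &[u8], hash: Option<u32>) -> AckCheck {
    let expected = fnv1a32(sent_payload);
    match hash {
        Some(received) if received != expected => {
            AckCheck::Corrupted { msg_id, expected, received }
        }
        _ => AckCheck::Accepted,
    }
}
```

Since the hash is optional in the signatures above, a missing hash is treated as "nothing to verify" rather than as corruption.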

This bounty gives the opportunity to

  • learn about Motoko
  • learn about async messaging
  • learn about state management in Motoko

To apply for this bounty you should:

  • Include links to previous work writing tutorials and any other open-source contributions (i.e. your GitHub).
  • Include a brief overview of how you will complete the task. This can include things like which dependencies you will use, how you will make it self-contained, the sacrifices you would have to make to achieve that, or how you will make it simple. Anything that can convince us you are taking a thoughtful and expert approach to this design.
  • Give an estimated timeline on completing the task.
  • Post your application text to the Bounty Thread

Selection Process

The ICDevs.org developer’s advisors will propose a vote to award the bounty and the Developer Advisors will vote.

Bounty Completion

Please keep your ongoing code in a public repository (a fork or branch is OK). Please provide regular (at least weekly) updates. Code commits count as updates if you link to your branch/fork from the bounty thread. We just need to be able to see that you are making progress.

The balance of the bounty will be paid out at completion.

Once you have finished, please alert the dev forum thread that you have completed the work and tell us where we can find it. We will review it and award the bounty if the terms have been met. If any coordination work (like a pull request) or additional documentation is needed before we can award the bounty, we will let you know.

Bounty Abandonment and Re-awarding

If you cease work on the bounty for a prolonged period (at the Developer Advisory Board’s discretion), or if the quality of work degrades to the point that we think someone else should be working on the bounty, we may re-award it. We will be transparent about this and try to work with you to push through and complete the project, but sometimes it may be necessary to move on or to augment your contribution with another resource, which would result in a split bounty.

Funding

The bounty was generously funded by the DFINITY Foundation. If you would like to turbocharge this bounty you can seed additional donations of ICP to f2b488b80eb22bf57c1556683efcb51e090b46c181ca28e167e6f095223dfa3e. ICDevs will match the bounty $40:1 ICP for the first 75 ICP out of the DFINITY grant and then 0.25:1. All donations will be tax deductible for US Citizens and Corporations. If you send a donation and need a donation receipt, please email the hash of your donation transaction, physical address, and name to donations@icdevs.org. More information about how you can contribute can be found at our donations page.

FYI: General Bounty Process

Discussion

The draft bounty is posted to the DFINITY developer’s forum for discussion

Ratification: (01/09/2023)

The developer advisor’s board will propose a bounty be ratified and a vote will take place to ratify the bounty. Until a bounty is ratified by the Dev it hasn’t been officially adopted. Please take this into consideration if you are considering starting early.

Open for application

Developers can submit applications to the Dev Forum post. The council will consider these as they come in and propose a vote to award the bounty to one of the applicants. If you would like to apply anonymously you can send an email to austin at icdevs dot org or send a PM on the dev forum.

Assigned

A developer is currently working on this bounty. You are free to contribute, but any splitting of the award will need to be discussed with the currently assigned developer.

In Review

The Dev Council is reviewing the submission

Awarded

The award has been given and the bounty is closed.

Other ICDevs.org Bounties

Hi.

I’d like to make this.

I have a few questions.

It is unclear to me how the two canisters will be connected. If the library is a class, and that class is instantiated in each of the canisters, the two instances won’t be connected, since each canister holds its own copy of the library’s variables.

There are several options for linking:

  • The library is deployed as a canister of its own. The other canisters then access the library canister by Principal.

  • The library is instantiated as a class in one of the canisters (broadcaster or receiver). This instance is then passed to a function of either canister.

  • The library is instantiated as a class, and the Principals of both canisters (broadcaster and receiver) are specified in the constructor of the library class.

I can provide pseudocode if this is not quite clear.

I would like to clarify these points.

Are there perhaps other architectural solutions?

Note: com_asyncFlow_ackack has been renamed
You may want to coordinate with @GLdev on some of this as he is doing the rust counterpart.

The point of the library is that the canisters are not connected and are likely developed by different (potentially malicious) people.

[Image: mscgenjs_chart (8), a message sequence chart]

my_args could be something like:

type MyArgs = {
     #transfer: TransferArgs;
     #mint: MintArgs;
     #burn: BurnArgs;
}

The sender would instantiate the class:


actor {

// Callback the library invokes when an ack is received.
func handle_return(canister_id: Principal, sent_payload : Blob, received_payload : Blob, msg_id: Nat, hash: ?Nat32) : async* () {
    // from_candid decodes a Blob and yields an optional value
    let my_args : ?MyArgs = from_candid(sent_payload);
    //check hash if desired;
     switch(my_args){
         case(?(#transfer(val))){
           let my_response : ?TransferResponse = from_candid(received_payload);
           //handle transfer
         };
         ///......etc
         case(_){};
     };

     one_shot_handler.finish(msg_id);
};

//instantiate the oneshot handler
let one_shot_handler = OneShot.Handler({
   return_async = ?handle_return;
   request_async = null; // you'd set this if you were a receiver and a sender
});

public shared func com_asyncFlow_ack(id: Nat, hash: ?Nat32, return_args: Blob){
     one_shot_handler.handle_response(id, hash, return_args);
};

public shared func transfer_proxy_async(request: TransferArgs) : async (){
     // target_canister: Principal of the receiving canister, defined elsewhere
     let args : MyArgs = #transfer(request);
     // to_candid encodes a value into a Blob
     let request_id = await* one_shot_handler.call_async(target_canister, to_candid(args));
};

//handle timers upon upgrade
// stable_one_shot_timers must be declared as a stable var in this actor
system func preupgrade(){
    stable_one_shot_timers := one_shot_handler.backup_timers();
};

system func postupgrade(){
    one_shot_handler.hydrate_timers(stable_one_shot_timers);
};


}

The receiver:


actor {

// Callback the library invokes when a new message arrives.
func handle_request(canister_id: Principal, sent_payload : Blob, msg_id: Nat, hash: ?Nat32){
    // from_candid decodes a Blob and yields an optional value
    let my_args : ?MyArgs = from_candid(sent_payload);
    //check hash if desired; <-maybe the library should handle this and this is only for info
     switch(my_args){
         case(?(#transfer(val))){
           //do request transfer
           let response = #transfer(#ok(block)); // block: result of the transfer
           // the original sketch calls this method `return`, which is a reserved word in Motoko
           one_shot_handler.return_(msg_id, to_candid(response));
         };
         ///......etc
         case(_){};
     };
};

//instantiate the oneshot handler
let one_shot_handler = OneShot.Handler({
   return_async = null; // you'd set this if you were a receiver and a sender
   request_async = ?handle_request;
});

public shared({caller}) func com_asyncFlow_newMessage(id: Nat, payload: Blob){
     one_shot_handler.handle_request(caller, id, payload);
};

public shared({caller}) func com_asyncFlow_finish(id: Nat, hash: ?Nat32){
     one_shot_handler.handle_finish(id, hash);
};

public shared func transfer_proxy_async(request: TransferArgs) : async (){
     // target_canister: Principal of the receiving canister, defined elsewhere
     let args : MyArgs = #transfer(request);
     let request_id = await* one_shot_handler.call_async(target_canister, to_candid(args));
};

//handle timers upon upgrade
// stable_one_shot_timers must be declared as a stable var in this actor
system func preupgrade(){
    stable_one_shot_timers := one_shot_handler.backup_timers();
};

system func postupgrade(){
    one_shot_handler.hydrate_timers(stable_one_shot_timers);
};

}

You will likely run across some other maintenance functions that you may need.


I’ve proposed that you get assigned the bounty and I expect it to pass:


Hi,

It’s great that we’ll have a Motoko version as well. The real test of the flow will be with both versions talking to each other.

Over at the Rust version I was thinking of merging all functionality into a single com_asyncFlow() fn and using messages for NEW, ACK and FIN, but I’m good with splitting them again if that’s not something that you’d like to implement. I just thought it would make sense for future expansions without changing the did signature.


enum MessageType {
    // A new message that starts a new flow
    NEW {
        msg_id: u128,
        payload: Vec<u8>,
    },
    // Message signaling that a message was received / processed
    ACK {
        msg_id: u128,
        payload: Option<Vec<u8>>,
    },
    // Message signaling that a flow has ended
    FIN {
        msg_id: u128,
    },
}

For the moment the candid looks something like this:

type MessageType = variant {
  ACK : record { msg_id : nat; payload : opt vec nat8 };
  FIN : record { msg_id : nat };
  NEW : record { msg_id : nat; payload : vec nat8 };
};
service : {
  asyncFlow : (MessageType) -> ();
}

I’m currently mapping all the stages of the flow, with possible failure modes, and will come back shortly with a brief overview. There are two main failure modes: one when sending the notice fails (i.e. if your sender canister queue is full), and one when the expected message does not arrive for whatever reason, in which case you’d need to re-send the previous message.


Ok. I think you will be the guiding star for my realization. But so far, everything is just at the beginning of my work. Thanks!

It looks like I will have similar questions
Why not do a temporary denial of service with the notification of the calling canister?

Great idea, as long as it supports needing only one side, in case the library is only a receiver or only a sender.

Using inspect message may be good here. Dropping by throttling would hopefully be handled gracefully by the sender via retries.

AFAIK inspect message is only for agent → canister calls (i.e. external calls), it doesn’t work on inter-canister calls. We’ll need to implement firewalls / accesslists at the fn level.


A good thing, and a possible solution for avoiding resource drain DoS is that Notify supports sending cycles. So we could add support for verifying cycles, and only accepting NEW messages that include x amount of cycles (calculated by each developer to cover the case of processing the request and sending an ACK w/ a set number of retries, averaged over y amount of time).
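The cycle check could look roughly like this. It is a sketch in plain Rust with no CDK calls: `Pricing` and its fields are hypothetical, every figure is something each developer would have to choose, and reading the attached cycles from the incoming call is left out.

```rust
/// Hypothetical per-canister pricing; all figures are chosen by the developer.
struct Pricing {
    process_cost: u128,   // cycles to process one request
    ack_cost: u128,       // cycles to send one ACK
    max_ack_retries: u32, // how many times the ACK may be re-sent
}

impl Pricing {
    /// Minimum cycles a NEW message must carry: processing, the first ACK,
    /// and every allowed ACK retry.
    fn min_cycles(&self) -> u128 {
        self.process_cost + self.ack_cost * (1 + u128::from(self.max_ack_retries))
    }

    /// Accept a NEW message only if it pays for its own handling.
    fn accepts(&self, attached_cycles: u128) -> bool {
        attached_cycles >= self.min_cycles()
    }
}
```

With `process_cost = 1_000`, `ack_cost = 100` and three retries, a NEW message would need at least 1,400 cycles attached.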

Great idea. If they want the function accessible via ingress they can have a traditional endpoint.

Hi,

I’m commenting here since we’ll need to sync anyway. I’ve mapped the flows, and believe I’ve touched on every step involved. Please let me know if I missed anything.

Brief flows overview

Outgoing

  1. Encapsulate a new message, store payload, etc. Return a msg_id

(can fail due to canister memory, etc. Ignored for the purposes of this lib)

  2. Send Notify(NEW{msg_id, payload})

(Failure1: the notify call can fail if the canister’s queue is full)

[Need to re-attempt this call after a set timeout, with a set retry count]

  3. Waiting for ACK

(Failure2: the ACK does not arrive after a set timeout)

[Need to re-attempt step 2, with a set retry count]

  4. Received ACK, call processing fn

(Failure3: the processing fn call traps / panics. Ignored, should be handled by the other canister)

  5. Send FIN

(Failure4: the notify call can fail if the canister’s queue is full)

[Need to re-attempt this call after a set timeout, with a set retry count]

  6. Wait for a set timeout while maintaining the state

(we need this step in case the other canister doesn’t receive a FIN, and re-sends the ACK. We could simply drop the state at this stage, and reply with FIN to any unknown msg_id. Implementation decision / unsure)
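The outgoing flow above can be read as a small state machine. The Rust sketch below is one possible encoding and not the actual library: the state and event names are made up for the example, step 1 (message creation) happens before the machine starts, and the processing call of step 4 is assumed to run when the ACK arrives.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum OutState {
    SendingNew { retries_left: u32 },  // sending NEW
    AwaitingAck { retries_left: u32 }, // waiting for ACK
    SendingFin { retries_left: u32 },  // ACK processed, sending FIN
    Lingering,                         // keeping state for a while after FIN
    Failed,                            // retries exhausted
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum OutEvent {
    NotifyOk,     // the notify call was queued
    NotifyFailed, // the notify call failed, e.g. queue full
    AckReceived,
    Timeout,
}

/// One transition of the outgoing flow. `max_retries` refills the FIN budget.
fn step_outgoing(state: OutState, event: OutEvent, max_retries: u32) -> OutState {
    use OutEvent::*;
    use OutState::*;
    match (state, event) {
        (SendingNew { retries_left }, NotifyOk) => AwaitingAck { retries_left },
        (SendingNew { retries_left: 0 }, NotifyFailed) => Failed,
        (SendingNew { retries_left }, NotifyFailed) => SendingNew { retries_left: retries_left - 1 },
        (AwaitingAck { .. }, AckReceived) => SendingFin { retries_left: max_retries },
        (AwaitingAck { retries_left: 0 }, Timeout) => Failed,
        // No ACK in time: go back and re-send NEW.
        (AwaitingAck { retries_left }, Timeout) => SendingNew { retries_left: retries_left - 1 },
        (SendingFin { .. }, NotifyOk) => Lingering,
        (SendingFin { retries_left: 0 }, NotifyFailed) => Failed,
        (SendingFin { retries_left }, NotifyFailed) => SendingFin { retries_left: retries_left - 1 },
        // A repeated ACK while lingering suggests the FIN was lost: send it again.
        (Lingering, AckReceived) => SendingFin { retries_left: max_retries },
        // Anything else leaves the state unchanged.
        (s, _) => s,
    }
}
```

Sharing one retry budget between failed notifies and ACK timeouts is a simplification; separate counters would work just as well.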

Incoming

  1. Receive a NEW message. Create an entry, store payload, decide to accept, call processing fn.

(can fail due to canister memory, etc. Ignored for the purposes of this lib)

(Failure1: the processing fn call traps / panics. Ignored, should be handled by the other canister)

  2. Processed (Result). Send ACK

(Failure2: the notify call can fail if the canister’s queue is full)

[Need to re-attempt this call after a set timeout, with a set retry count]

  3. Waiting for FIN

(Failure3: the FIN does not arrive after a set timeout)

[Need to re-attempt step 2, with a set retry count]

  4. Received FIN. Mark task as complete.
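
The incoming side can be encoded the same way. Again this is only a Rust sketch with invented names; it starts after the NEW message has been accepted and processed, so the first state is "sending ACK".

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum InState {
    SendingAck { retries_left: u32 },  // request processed, sending ACK
    AwaitingFin { retries_left: u32 }, // waiting for FIN
    Complete,                          // FIN received
    Failed,                            // retries exhausted
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum InEvent {
    NotifyOk,     // the notify call was queued
    NotifyFailed, // the notify call failed, e.g. queue full
    FinReceived,
    Timeout,
}

/// One transition of the incoming flow.
fn step_incoming(state: InState, event: InEvent) -> InState {
    use InEvent::*;
    use InState::*;
    match (state, event) {
        // A FIN ends the flow regardless of which ACK attempt it answers.
        (SendingAck { .. }, FinReceived) => Complete,
        (AwaitingFin { .. }, FinReceived) => Complete,
        (SendingAck { retries_left }, NotifyOk) => AwaitingFin { retries_left },
        (SendingAck { retries_left: 0 }, NotifyFailed) => Failed,
        (SendingAck { retries_left }, NotifyFailed) => SendingAck { retries_left: retries_left - 1 },
        (AwaitingFin { retries_left: 0 }, Timeout) => Failed,
        // No FIN in time: go back and re-send the ACK.
        (AwaitingFin { retries_left }, Timeout) => SendingAck { retries_left: retries_left - 1 },
        // Anything else leaves the state unchanged.
        (s, _) => s,
    }
}
```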

Hi, colleagues

I have a question about the 5-minute timeout in the IC.

You write here:

Where can I find out more about this?

I would like to continue my reasoning further. Maybe I need to set my own five-minute timeout to send to the recipient (in case the confirmation didn’t arrive). Maybe this is the meaning of the lines above?

Note that it is technically not correct that messages will time out after 5 minutes on full queues.

It is true that requests will time out if they are sitting in queues for 5 minutes.

However, a request that was sent out does not necessarily sit in the queue for that long. Messages in (canister-to-canister) output queues are routed into (subnet-to-subnet) streams as long as there is space in the stream. Only once backpressure from the respective stream builds up messages will remain in queues. Messages in streams can no longer time out.

Given that there is no way for the canister to know whether or not a message made it out of the queue into the stream, one cannot simply conclude that it timed out after seeing no reply for 5.5 minutes. However, if a message times out, a “message timed out” response to the request will eventually arrive (potentially much later than 5.5 minutes after sending the request if the system is badly backlogged). This response could be the trigger for trying to resend. When resending earlier, one might run into a situation where a request arrives twice (unless there is some explicit deduplication done by the canister).


A scenario matrix may be in order.


Thanks, derlerd-dfinity1

It turns out like this:

The request from the canister goes through 2 states (queue → stream).

The 1st state is that it is in the queue

The request can stay in the queue for 300 seconds.
There are two possible outcomes:

  • within <= 300 seconds it gets into the stream
  • 300 seconds have passed. It is discarded. It will never get into the stream.
    There is no further news about it (neither from the IC system nor from the canister being called)

The 2nd state is that the request got into the system’s data exchange stream

There is one possible outcome:

  • 100% processing by the addressed canister. The execution time is unknown.

It looks something like this.

The protocol doesn’t provide any guarantee that a destination canister 100% processes a request. The only guarantee the protocol gives is that every request will get exactly one reply. However this reply can also be system generated in case the request can not be delivered/processed (OOM, queue full on the receiver side, destination canister trapping… etc.). So what you can rely on is that you will eventually receive a reply that will tell you what happened to the request.

This is also true for state 1. If a request times out in the output queue the canister will get a system generated reply that the message timed out. So this statement is also not correct:

To sum up: it doesn’t really make sense to distinguish between the two states you sketch above from a canister’s perspective as there are no delivery guarantees in any of these cases. In both states the only thing you can rely on is that you will get a reply which is gonna tell you what happened to the request. Based on this one can then make a decision on whether or not to retry.
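A hedged Rust sketch of "decide based on the reply". The numeric reject codes 1 to 5 are the ones listed in the IC interface specification; the policy itself (retry only on a transient system reject) is an assumption made for illustration, and `Outcome`, `Decision` and `decide` are hypothetical names.

```rust
/// Reject codes 1 to 5 as listed in the IC interface specification.
#[derive(Debug, Clone, Copy, PartialEq)]
enum RejectCode {
    SysFatal = 1,
    SysTransient = 2,
    DestinationInvalid = 3,
    CanisterReject = 4,
    CanisterError = 5,
}

/// The single reply every request eventually gets.
enum Outcome {
    Replied(Vec<u8>),
    Rejected(RejectCode),
}

#[derive(Debug, PartialEq)]
enum Decision {
    Done,
    Retry,
    GiveUp,
}

/// Assumed policy: only transient system rejects are worth retrying.
fn decide(outcome: &Outcome) -> Decision {
    match outcome {
        Outcome::Replied(_) => Decision::Done,
        Outcome::Rejected(RejectCode::SysTransient) => Decision::Retry,
        Outcome::Rejected(_) => Decision::GiveUp,
    }
}
```

Whether other rejects, such as a trapping destination canister, should also be retried is a judgment call for each application.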


Since we’re working with Notify here, the canister that issues a Notify call doesn’t even get that, correct? The only thing we know from the sender canister is that the Notify was successfully added to the outgoing queue or not (the Notify call can either succeed or fail, in sync mode).

Then there’s the question of how often shall we re-try a call that wasn’t answered (via another Notify from the other canister). My intuition here is that our lib should implement something like retry with back-off. First retry at x seconds, then x*2, etc. for a number of retries and then just give up on the call.
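The back-off schedule described above (x, then x*2, and so on, then give up) fits in a few lines of Rust. `backoff_delay` is a hypothetical helper, with `attempt` counted from 0.

```rust
/// Delay before retry number `attempt` (0-based): base, 2*base, 4*base, ...
/// Returns None once `max_retries` attempts have been used, or if the delay
/// would overflow, which the caller should treat as "give up".
fn backoff_delay(base_secs: u64, attempt: u32, max_retries: u32) -> Option<u64> {
    if attempt >= max_retries {
        return None;
    }
    let factor = 1u64.checked_shl(attempt)?;
    base_secs.checked_mul(factor)
}
```

For example, with a base of 10 seconds and three retries, the delays are 10, 20 and 40 seconds, and the fourth request returns `None`.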

Since we’re generating a unique ID for each new flow, we should be OK even if sometimes 2 messages reach a canister. The lib should cover the case where an identical message_id was received, and not re-process the message, instead re-issue the ACK (as described by Austin in the main bounty proposal).


I’m not familiar with notify but I assume that notify will just pass invalid callback IDs when making the calls, right?

If so, keep in mind that notify will make the canister not see the reply to a notify but this doesn’t mean that there is no reply. The system still makes a reservation for the reply and the notify will consume a slot in the queue until that reply (which may be system generated if the notified canister doesn’t reply explicitly) arrives. It is just that when consuming the reply the invalid callback ID will make sure that no canister state is changed. So if you retry too aggressively you will end up filling up your own queue and enqueuing new requests/notifys will eventually fail.