I have a method called `provision_new_canister`
which creates canisters in parallel. The error "Canister trapped unreachable" only occurs when `provision_canister_cnt`
is high enough (> 3000).
/// Provisions up to `provision_canister_cnt` individual-user canisters, running at most
/// 10 creations concurrently, and records every created canister id in
/// `CANISTER_DATA.available_canisters`.
///
/// Provisioning stops admitting new work once the number of available canisters exceeds
/// `INDIVIDUAL_USER_CANISTER_SUBNET_THREESHOLD`.
///
/// # Panics
/// Panics if no wasm is registered under `WasmType::IndividualUserWasm`.
async fn provision_new_canister(provision_canister_cnt: u64) {
    let individual_user_template_canister_wasm = CANISTER_DATA.with_borrow(|canister_data| {
        canister_data
            .wasms
            .get(&WasmType::IndividualUserWasm)
            .expect("individual user wasm must be registered before provisioning canisters")
            .clone()
    });

    // Build the futures lazily. Each future owns its own copy of the wasm blob, so
    // collecting all of them into a Vec up front keeps `provision_canister_cnt` copies of
    // the blob alive at the same time. With a lazy iterator a copy is only made when the
    // scheduler pulls the next task, so memory use is bounded by the concurrency limit
    // instead of by the requested count.
    let create_canister_futures = (0..provision_canister_cnt).map(|_| {
        create_users_canister(
            None,
            individual_user_template_canister_wasm.version.clone(),
            individual_user_template_canister_wasm.wasm_blob.clone(),
        )
    });

    // Every finished creation makes its canister available for assignment.
    let result_callback = |canister_id: Principal| {
        CANISTER_DATA.with_borrow_mut(|canister_data| {
            canister_data.available_canisters.insert(canister_id)
        });
    };

    // Stop scheduling further creations once the pool is above the subnet threshold.
    let breaking_condition = || {
        CANISTER_DATA.with_borrow(|canister_data| {
            canister_data.available_canisters.len() as u64
                > INDIVIDUAL_USER_CANISTER_SUBNET_THREESHOLD
        })
    };

    run_task_concurrently(
        create_canister_futures,
        10,
        result_callback,
        breaking_condition,
    )
    .await;
}
/// Drives the futures yielded by `futures` with at most `concurrency` of them in flight
/// at any time, passing each output to `result_callback` as soon as it is ready.
///
/// # Arguments
/// * `futures` - source of tasks; a task is only pulled from the iterator when there is a
///   free slot for it, so a lazy iterator never materializes more than `concurrency` tasks.
/// * `concurrency` - maximum number of tasks in flight. A value of `0` is treated as `1`,
///   because a limit of zero could never make progress.
/// * `result_callback` - invoked once per completed task, in completion order.
/// * `breaking_condition` - checked before every admission step; once it returns `true`
///   no further task is pulled or started.
///
/// Tasks that were already started are always driven to completion and reported through
/// `result_callback`, even after `breaking_condition` has become `true`.
pub async fn run_task_concurrently<T>(
    mut futures: impl Iterator<Item = impl Future<Output = T>>,
    concurrency: usize,
    mut result_callback: impl FnMut(T),
    breaking_condition: impl Fn() -> bool,
) {
    let mut in_progress_futures: FuturesUnordered<Pin<Box<dyn Future<Output = T>>>> =
        FuturesUnordered::new();
    let concurrency = concurrency.max(1);

    // Admission loop: the condition is re-evaluated after every completed task, so a
    // callback that flips it prevents any additional task from being started.
    while !breaking_condition() {
        if in_progress_futures.len() >= concurrency {
            // Window is full: free a slot before admitting the next task.
            if let Some(result) = in_progress_futures.next().await {
                result_callback(result);
            }
            continue;
        }

        let Some(next_future) = futures.next() else {
            break;
        };
        in_progress_futures.push(Box::pin(next_future));
    }

    // Drain whatever is still in flight.
    while let Some(result) = in_progress_futures.next().await {
        result_callback(result);
    }
}
/// Creates a new canister controlled by this canister and installs the individual-user
/// wasm into it.
///
/// # Arguments
/// * `profile_owner` - forwarded unchanged into the init args of the new canister.
/// * `version` - forwarded as the `version` field of the init args.
/// * `individual_user_wasm` - wasm module installed into the new canister.
///
/// # Returns
/// The id of the newly created canister.
///
/// # Panics
/// Panics if the canister cannot be created, if the init args cannot be Candid-encoded,
/// or if the code installation fails.
pub async fn create_users_canister(
    profile_owner: Option<Principal>,
    version: String,
    individual_user_wasm: Vec<u8>,
) -> Principal {
    // * config for provisioning canister: this subnet_orchestrator canister is the only controller
    let settings = CanisterSettings {
        controllers: Some(vec![api::id()]),
        compute_allocation: None,
        memory_allocation: None,
        freezing_threshold: None,
    };
    let create_request = CreateCanisterArgument {
        settings: Some(settings),
    };

    // * provisioned canister
    let creation_response =
        main::create_canister(create_request, INDIVIDUAL_USER_CANISTER_RECHARGE_AMOUNT)
            .await
            .unwrap();
    let canister_id: Principal = creation_response.0.canister_id;

    // * snapshot the configuration after the create call has returned; both values below
    // * are read from this one snapshot
    let configuration =
        CANISTER_DATA.with_borrow(|canister_data| canister_data.configuration.clone());

    let init_args = IndividualUserTemplateInitArgs {
        profile_owner,
        known_principal_ids: Some(configuration.known_principal_ids),
        upgrade_version_number: Some(0),
        version,
        url_to_send_canister_metrics_to: Some(configuration.url_to_send_canister_metrics_to),
    };

    // * encode argument for user canister init lifecycle method
    let encoded_init_args =
        candid::encode_one(init_args).expect("Failed to serialize the install argument.");

    // * install wasm to provisioned canister
    let install_request = InstallCodeArgument {
        mode: CanisterInstallMode::Install,
        canister_id,
        wasm_module: individual_user_wasm,
        arg: encoded_init_args,
    };
    main::install_code(install_request).await.unwrap();

    canister_id
}