Creating Canister in Concurrently -> Canister Trapped unreachable code

I have a method called provision_new_canister which creates canister in parallel. The error Canister trapped unreachable only comes when provision_canister_cnt is high enough (>3000)

async fn provision_new_canister(provision_canister_cnt: u64) {
    let mut create_canister_futures = vec![];
    let individual_user_template_canister_wasm = CANISTER_DATA.with_borrow(|canister_data| canister_data.wasms.get(&WasmType::IndividualUserWasm).unwrap().clone());

    for _ in 0..provision_canister_cnt {
        create_canister_futures.push(
            create_users_canister(
                None,
                individual_user_template_canister_wasm.version.clone(),
                individual_user_template_canister_wasm.wasm_blob.clone()
            )
        )
    }
    let result_callback = |canister_id: Principal| {
        CANISTER_DATA.with_borrow_mut(|canister_data| canister_data.available_canisters.insert(canister_id));
    };
    let breaking_condition = || {
        CANISTER_DATA.with_borrow(|canister_data| canister_data.available_canisters.len() as u64 > INDIVIDUAL_USER_CANISTER_SUBNET_THREESHOLD)
    };
    run_task_concurrently(create_canister_futures.into_iter(), 10, result_callback, breaking_condition).await;
}
pub async fn run_task_concurrently<T>(
    mut futures: impl Iterator<Item = impl Future<Output = T>>, 
    concurrency: usize,
    mut result_callback: impl FnMut(T), 
    breaking_condition: impl Fn() -> bool)   {


        let mut in_progress_futures: FuturesUnordered<Pin<Box<dyn Future<Output = T>>>> = FuturesUnordered::new();

        for _ in 0..concurrency {
            let next_future = match futures.next() {
                None => break,
                Some(some)=> some,
            };
            if breaking_condition() {
                break;
            }
            in_progress_futures.push(Box::pin(next_future));
        }

        for next_future in futures {
            if breaking_condition() {
                break;
            }
            let result = in_progress_futures.next().await.unwrap();
            result_callback(result);
            in_progress_futures.push(Box::pin(next_future));
        }

        loop {
            match in_progress_futures.next().await {
                None => break,
                Some (result) => result_callback(result)
            }
        }

}
pub async fn create_users_canister(profile_owner: Option<Principal>, version: String, individual_user_wasm: Vec<u8>) -> Principal {
    // * config for provisioning canister
    let arg = CreateCanisterArgument {
        settings: Some(CanisterSettings {
            controllers: Some(vec![
                // * this subnet_orchestrator canister
                api::id(),
            ]),
            compute_allocation: None,
            memory_allocation: None,
            freezing_threshold: None,
        }),
    };

    // * provisioned canister
    let canister_id: Principal =
        main::create_canister(arg, INDIVIDUAL_USER_CANISTER_RECHARGE_AMOUNT)
            .await
            .unwrap()
            .0
            .canister_id;

    let configuration = CANISTER_DATA
        .with(|canister_data_ref_cell| canister_data_ref_cell.borrow().configuration.clone());

    let individual_user_tempalate_init_args = IndividualUserTemplateInitArgs {
        profile_owner: profile_owner,
        known_principal_ids: Some(CANISTER_DATA.with(|canister_data_ref_cell| {
            canister_data_ref_cell.borrow().configuration.known_principal_ids.clone()
        })),
        upgrade_version_number: Some(0),
        version,
        url_to_send_canister_metrics_to: Some(configuration.url_to_send_canister_metrics_to),
    };

    // * encode argument for user canister init lifecycle method
    let arg = candid::encode_one(individual_user_tempalate_init_args)
        .expect("Failed to serialize the install argument.");

    // * install wasm to provisioned canister
    main::install_code(InstallCodeArgument {
        mode: CanisterInstallMode::Install,
        canister_id,
        wasm_module: individual_user_wasm,
        arg,
    })
    .await
    .unwrap();

    canister_id
}

Replacing Vec with iterator solved the issue.

async fn provision_new_canister(provision_canister_cnt: u64, individual_user_template_canister_wasm: CanisterWasm) {
    let create_canister_futures = (0..provision_canister_cnt).map(|_| {
        let future = create_users_canister(
            None,
            individual_user_template_canister_wasm.version.clone(),
            individual_user_template_canister_wasm.wasm_blob.clone()
        );
        future
    });

    let result_callback = |canister_id: Principal| {
        CANISTER_DATA.with_borrow_mut(|canister_data| canister_data.available_canisters.insert(canister_id));
    };

    let breaking_condition = || {
        CANISTER_DATA.with_borrow(|canister_data| canister_data.available_canisters.len() as u64 > INDIVIDUAL_USER_CANISTER_SUBNET_THREESHOLD)
    };

    run_task_concurrently(create_canister_futures.into_iter(), 10, result_callback, breaking_condition).await;
}
1 Like