use std::collections::HashMap; use std::fmt::Debug; use std::hash::Hash; use std::marker::PhantomData; use std::sync::Arc; use std::sync::Mutex; use std::time::Duration; use async_trait::async_trait; use tokio::time::sleep; //use log::debug; //use crate::base::tokio; pub type PoolItem = Arc>>; /// To build or check an item. /// /// When an item is requested, ItemManager `build()` one for the pool. /// When an item is reused, ItemManager `check()` if it is still valid. #[async_trait] pub trait ItemManager { type Key; type Item; type Error; /// Make a new item to put into the pool. /// /// An impl should hold that an item returned by `build()` is passed `check()`. async fn build(&self, key: &Self::Key) -> Result; /// Check if an existent item still valid. /// /// E.g.: check if a tcp connection still alive. /// If the item is valid, `check` should return it in a Ok(). /// Otherwise, the item should be dropped and `check` returns an Err(). async fn check(&self, item: Self::Item) -> Result; } /// Pool assumes the items in it is `Clone`, thus it keeps only one item for each key. #[allow(clippy::type_complexity)] #[derive(Debug, Clone)] pub struct Pool where Mgr: ItemManager + Debug, { /// The first sleep time when `build()` fails. /// The next sleep time is 2 times of the previous one. pub initial_retry_interval: Duration, /// Pooled items indexed by key. pub items: Arc>>>, manager: Mgr, err_type: PhantomData, n_retries: u32, } impl Pool where Mgr: ItemManager + Debug, Mgr::Key: Clone + Eq + Hash + Send + Debug, Mgr::Item: Clone + Sync + Send + Debug, Mgr::Error: Sync + Debug, { pub fn new(manager: Mgr, initial_retry_interval: Duration) -> Self { Pool { initial_retry_interval, items: Default::default(), manager, err_type: Default::default(), n_retries: 3, } } pub fn with_retries(mut self, retries: u32) -> Self { self.n_retries = retries; self } pub fn item_manager(&self) -> &Mgr { &self.manager } /// Return an raw pool item. /// /// The returned one may be an uninitialized one, i.e., it contains a None. /// The lock for `items` should not be held for long, e.g. when `build()` a new connection, it takes dozen ms. fn get_pool_item(&self, key: &Mgr::Key) -> PoolItem { let mut items = self.items.lock().unwrap(); if let Some(item) = items.get(key) { item.clone() } else { let item = PoolItem::default(); items.insert(key.clone(), item.clone()); item } } /// Return a item, by cloning an existent one or making a new one. /// /// When returning an existent one, `check()` will be called on it to ensure it is still valid. /// E.g., when returning a tcp connection. // #[logcall::logcall(err = "debug")] // #[minitrace::trace] pub async fn get(&self, key: &Mgr::Key) -> Result { let pool_item = self.get_pool_item(key); let mut guard = pool_item.lock().await; let item_opt = (*guard).clone(); if let Some(ref item) = item_opt { let check_res = self.manager.check(item.clone()).await; // debug!("check reused item res: {:?}", check_res); if let Ok(itm) = check_res { return Ok(itm); } else { // mark broken conn as deleted *guard = None; } } let mut interval = self.initial_retry_interval; for i in 0..self.n_retries { // debug!("build new item of key: {:?}", key); let new_item = self.manager.build(key).await; // debug!("build new item of key res: {:?}", new_item); match new_item { Ok(x) => { *guard = Some(x.clone()); return Ok(x); } Err(err) => { if i == self.n_retries - 1 { return Err(err); } } } sleep(interval).await; interval *= 2; } unreachable!("the loop should always return!"); } }