dcache/src/pool.rs

155 lines
4.5 KiB
Rust

use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use async_trait::async_trait;
use tokio::time::sleep;
//use log::debug;
//use crate::base::tokio;
/// A shared, asynchronously lockable slot holding at most one pooled item.
///
/// The slot contains `None` when no item has been built for it yet, or after a
/// pooled item failed its validity check and was cleared.
pub type PoolItem<T> = Arc<tokio::sync::Mutex<Option<T>>>;
/// Builds new pool items and checks pooled ones.
///
/// When the pool needs a new item, it calls `build()` to create one.
/// When a pooled item is about to be reused, the pool calls `check()` to find out
/// whether it is still valid.
#[async_trait]
pub trait ItemManager {
/// The key that identifies which item to build; passed to `build()`.
type Key;
/// The type of item produced by `build()` and validated by `check()`.
type Item;
/// The error returned when `build()` or `check()` fails.
type Error;
/// Make a new item to put into the pool.
///
/// An implementation should guarantee that an item returned by `build()` passes `check()`.
async fn build(&self, key: &Self::Key) -> Result<Self::Item, Self::Error>;
/// Check whether an existing item is still valid.
///
/// E.g., check whether a TCP connection is still alive.
/// If the item is valid, `check()` should return it in an `Ok()`.
/// Otherwise, the item should be dropped and `check()` should return an `Err()`.
async fn check(&self, item: Self::Item) -> Result<Self::Item, Self::Error>;
}
/// A pool that keeps a single shared item per key.
///
/// The pool assumes its items are `Clone`, so it keeps only one item for each key
/// and returns clones of it to callers.
// The type of `items` nests several generic types, which trips `type_complexity`.
#[allow(clippy::type_complexity)]
#[derive(Debug, Clone)]
pub struct Pool<Mgr>
where
Mgr: ItemManager + Debug,
{
/// The first sleep time when `build()` fails.
/// Each subsequent sleep time is twice the previous one.
pub initial_retry_interval: Duration,
/// Pooled items indexed by key.
pub items: Arc<Mutex<HashMap<Mgr::Key, PoolItem<Mgr::Item>>>>,
/// Builds new items and checks pooled ones before they are reused.
manager: Mgr,
/// Zero-sized marker for the manager's error type; it stores no data.
err_type: PhantomData<Mgr::Error>,
/// How many times `get()` tries `build()` before giving up; `new()` sets it to 3.
n_retries: u32,
}
impl<Mgr> Pool<Mgr>
where
    Mgr: ItemManager + Debug,
    Mgr::Key: Clone + Eq + Hash + Send + Debug,
    Mgr::Item: Clone + Sync + Send + Debug,
    Mgr::Error: Sync + Debug,
{
    /// Create an empty pool whose items are built and checked by `manager`.
    ///
    /// `initial_retry_interval` is the sleep time after the first failed `build()`;
    /// it doubles after every further failure. A new pool makes up to 3 `build()`
    /// attempts per `get()` call; use [`Pool::with_retries`] to change that.
    pub fn new(manager: Mgr, initial_retry_interval: Duration) -> Self {
        Pool {
            initial_retry_interval,
            items: Default::default(),
            manager,
            err_type: PhantomData,
            n_retries: 3,
        }
    }

    /// Set how many times `get()` attempts `build()` before returning the error.
    ///
    /// A value of 0 is treated as 1: `get()` always makes at least one attempt.
    pub fn with_retries(mut self, retries: u32) -> Self {
        self.n_retries = retries;
        self
    }

    /// Return a reference to the manager that builds and checks the items.
    pub fn item_manager(&self) -> &Mgr {
        &self.manager
    }

    /// Return the raw pool slot for `key`, inserting an empty slot if none exists.
    ///
    /// The returned slot may be uninitialized, i.e., it may contain a `None`.
    /// The lock on `items` is only held for the map lookup/insert and is released
    /// before returning, so it is never held while a slow `build()` is running.
    ///
    /// # Panics
    ///
    /// Panics if the `items` mutex is poisoned.
    fn get_pool_item(&self, key: &Mgr::Key) -> PoolItem<Mgr::Item> {
        let mut items = self.items.lock().expect("pool items mutex is poisoned");
        if let Some(existing) = items.get(key) {
            return Arc::clone(existing);
        }
        // The key is cloned only on a miss, when a new slot has to be stored.
        let slot = PoolItem::default();
        items.insert(key.clone(), Arc::clone(&slot));
        slot
    }

    /// Return an item for `key`, by cloning a pooled one or building a new one.
    ///
    /// When a pooled item exists, `check()` is called on a clone of it to make sure
    /// it is still valid, e.g., that a TCP connection is still alive. If the check
    /// fails, or no item is pooled, `build()` is attempted up to the configured
    /// number of times (at least once), sleeping between failed attempts. The sleep
    /// starts at `initial_retry_interval` and doubles after every failure.
    ///
    /// # Errors
    ///
    /// Returns the error of the last `build()` attempt when every attempt fails.
    ///
    /// # Panics
    ///
    /// Panics if the `items` mutex is poisoned.
    pub async fn get(&self, key: &Mgr::Key) -> Result<Mgr::Item, Mgr::Error> {
        let pool_item = self.get_pool_item(key);
        // The slot lock is held for the rest of this call, including the awaits on
        // `check()` and `build()`, so other callers locking the same slot wait.
        let mut guard = pool_item.lock().await;

        let pooled = (*guard).clone();
        if let Some(pooled) = pooled {
            if let Ok(valid) = self.manager.check(pooled).await {
                return Ok(valid);
            }
            // The pooled item is broken: clear the slot before rebuilding.
            *guard = None;
        }

        // Always try at least once, so a retry count of 0 yields the build error
        // instead of falling through without a result.
        let max_attempts = self.n_retries.max(1);
        let mut interval = self.initial_retry_interval;
        let mut attempt: u32 = 1;
        loop {
            match self.manager.build(key).await {
                Ok(built) => {
                    *guard = Some(built.clone());
                    return Ok(built);
                }
                // Out of attempts: report the last error without sleeping again.
                Err(err) if attempt >= max_attempts => return Err(err),
                Err(_) => {}
            }
            sleep(interval).await;
            // Saturate instead of panicking if doubling overflows `Duration`.
            interval = interval.saturating_mul(2);
            attempt += 1;
        }
    }
}