155 lines
4.5 KiB
Rust
155 lines
4.5 KiB
Rust
|
use std::collections::HashMap;
|
||
|
use std::fmt::Debug;
|
||
|
use std::hash::Hash;
|
||
|
use std::marker::PhantomData;
|
||
|
use std::sync::Arc;
|
||
|
use std::sync::Mutex;
|
||
|
use std::time::Duration;
|
||
|
|
||
|
use async_trait::async_trait;
|
||
|
use tokio::time::sleep;
|
||
|
//use log::debug;
|
||
|
|
||
|
//use crate::base::tokio;
|
||
|
|
||
|
/// A shared, asynchronously lockable slot holding at most one pooled item.
///
/// The slot is an `Arc` around a `tokio::sync::Mutex`, so it can be cloned
/// cheaply and locked across `.await` points. `None` means the slot has no
/// usable item (not built yet, or the previous one was discarded).
pub type PoolItem<T> = Arc<tokio::sync::Mutex<Option<T>>>;
|
||
|
|
||
|
/// To build or check an item.
|
||
|
///
|
||
|
/// When an item is requested, ItemManager `build()` one for the pool.
|
||
|
/// When an item is reused, ItemManager `check()` if it is still valid.
|
||
|
#[async_trait]
|
||
|
pub trait ItemManager {
|
||
|
type Key;
|
||
|
type Item;
|
||
|
type Error;
|
||
|
|
||
|
/// Make a new item to put into the pool.
|
||
|
///
|
||
|
/// An impl should hold that an item returned by `build()` is passed `check()`.
|
||
|
async fn build(&self, key: &Self::Key) -> Result<Self::Item, Self::Error>;
|
||
|
|
||
|
/// Check if an existent item still valid.
|
||
|
///
|
||
|
/// E.g.: check if a tcp connection still alive.
|
||
|
/// If the item is valid, `check` should return it in a Ok().
|
||
|
/// Otherwise, the item should be dropped and `check` returns an Err().
|
||
|
async fn check(&self, item: Self::Item) -> Result<Self::Item, Self::Error>;
|
||
|
}
|
||
|
|
||
|
/// Pool assumes the items in it is `Clone`, thus it keeps only one item for each key.
|
||
|
#[allow(clippy::type_complexity)]
|
||
|
#[derive(Debug, Clone)]
|
||
|
pub struct Pool<Mgr>
|
||
|
where
|
||
|
Mgr: ItemManager + Debug,
|
||
|
{
|
||
|
/// The first sleep time when `build()` fails.
|
||
|
/// The next sleep time is 2 times of the previous one.
|
||
|
pub initial_retry_interval: Duration,
|
||
|
|
||
|
/// Pooled items indexed by key.
|
||
|
pub items: Arc<Mutex<HashMap<Mgr::Key, PoolItem<Mgr::Item>>>>,
|
||
|
|
||
|
manager: Mgr,
|
||
|
|
||
|
err_type: PhantomData<Mgr::Error>,
|
||
|
|
||
|
n_retries: u32,
|
||
|
}
|
||
|
|
||
|
impl<Mgr> Pool<Mgr>
|
||
|
where
|
||
|
Mgr: ItemManager + Debug,
|
||
|
Mgr::Key: Clone + Eq + Hash + Send + Debug,
|
||
|
Mgr::Item: Clone + Sync + Send + Debug,
|
||
|
Mgr::Error: Sync + Debug,
|
||
|
{
|
||
|
pub fn new(manager: Mgr, initial_retry_interval: Duration) -> Self {
|
||
|
Pool {
|
||
|
initial_retry_interval,
|
||
|
items: Default::default(),
|
||
|
manager,
|
||
|
err_type: Default::default(),
|
||
|
n_retries: 3,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
pub fn with_retries(mut self, retries: u32) -> Self {
|
||
|
self.n_retries = retries;
|
||
|
self
|
||
|
}
|
||
|
|
||
|
pub fn item_manager(&self) -> &Mgr {
|
||
|
&self.manager
|
||
|
}
|
||
|
|
||
|
/// Return an raw pool item.
|
||
|
///
|
||
|
/// The returned one may be an uninitialized one, i.e., it contains a None.
|
||
|
/// The lock for `items` should not be held for long, e.g. when `build()` a new connection, it takes dozen ms.
|
||
|
fn get_pool_item(&self, key: &Mgr::Key) -> PoolItem<Mgr::Item> {
|
||
|
let mut items = self.items.lock().unwrap();
|
||
|
|
||
|
if let Some(item) = items.get(key) {
|
||
|
item.clone()
|
||
|
} else {
|
||
|
let item = PoolItem::default();
|
||
|
items.insert(key.clone(), item.clone());
|
||
|
item
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/// Return a item, by cloning an existent one or making a new one.
|
||
|
///
|
||
|
/// When returning an existent one, `check()` will be called on it to ensure it is still valid.
|
||
|
/// E.g., when returning a tcp connection.
|
||
|
// #[logcall::logcall(err = "debug")]
|
||
|
// #[minitrace::trace]
|
||
|
pub async fn get(&self, key: &Mgr::Key) -> Result<Mgr::Item, Mgr::Error> {
|
||
|
let pool_item = self.get_pool_item(key);
|
||
|
|
||
|
let mut guard = pool_item.lock().await;
|
||
|
let item_opt = (*guard).clone();
|
||
|
|
||
|
if let Some(ref item) = item_opt {
|
||
|
let check_res = self.manager.check(item.clone()).await;
|
||
|
// debug!("check reused item res: {:?}", check_res);
|
||
|
|
||
|
if let Ok(itm) = check_res {
|
||
|
return Ok(itm);
|
||
|
} else {
|
||
|
// mark broken conn as deleted
|
||
|
*guard = None;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
let mut interval = self.initial_retry_interval;
|
||
|
|
||
|
for i in 0..self.n_retries {
|
||
|
// debug!("build new item of key: {:?}", key);
|
||
|
|
||
|
let new_item = self.manager.build(key).await;
|
||
|
|
||
|
// debug!("build new item of key res: {:?}", new_item);
|
||
|
|
||
|
match new_item {
|
||
|
Ok(x) => {
|
||
|
*guard = Some(x.clone());
|
||
|
return Ok(x);
|
||
|
}
|
||
|
Err(err) => {
|
||
|
if i == self.n_retries - 1 {
|
||
|
return Err(err);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
sleep(interval).await;
|
||
|
interval *= 2;
|
||
|
}
|
||
|
|
||
|
unreachable!("the loop should always return!");
|
||
|
}
|
||
|
}
|