diff --git a/src/network/raft_network_impl.rs b/src/network/raft_network_impl.rs index 7e7e093..f8820b6 100644 --- a/src/network/raft_network_impl.rs +++ b/src/network/raft_network_impl.rs @@ -16,7 +16,11 @@ * along with this program. If not, see . */ use std::sync::Arc; +use std::time::Duration; +use super::management::HealthStatus; +use crate::DcacheNodeId; +use crate::DcacheTypeConfig; use async_trait::async_trait; use openraft::error::InstallSnapshotError; use openraft::error::NetworkError; @@ -34,17 +38,38 @@ use openraft::RaftNetworkFactory; use serde::de::DeserializeOwned; use serde::Serialize; use tokio::sync::mpsc::Sender; +use tonic::transport::channel::Channel; +use tower_service::Service; -use super::management::HealthStatus; -use crate::DcacheNodeId; -use crate::DcacheTypeConfig; - +use crate::pool::*; use crate::protobuf::dcache::dcache_service_client::DcacheServiceClient; use crate::protobuf::dcache::RaftRequest; -#[derive(Clone)] +#[derive(Debug)] +struct ChannelManager {} + +#[async_trait] +impl ItemManager for ChannelManager { + type Key = String; + type Item = Channel; + type Error = tonic::transport::Error; + + async fn build(&self, addr: &Self::Key) -> Result { + println!("New connection for {addr}"); + tonic::transport::Endpoint::new(addr.clone())? + .connect() + .await + } + + async fn check(&self, mut ch: Channel) -> Result { + futures::future::poll_fn(|cx| (&mut ch).poll_ready(cx)).await?; + Ok(ch) + } +} + pub struct DcacheNetwork { pub signal: Sender, + conn_pool: Pool, } pub enum RPCType { @@ -55,7 +80,13 @@ pub enum RPCType { impl DcacheNetwork { pub fn new(signal: Sender) -> Self { - Self { signal } + let mgr = ChannelManager {}; + + Self { + signal, + + conn_pool: Pool::new(mgr, Duration::from_millis(50)), + } } pub async fn send_rpc( &self, @@ -69,11 +100,7 @@ impl DcacheNetwork { Err: std::error::Error + DeserializeOwned, Resp: DeserializeOwned, { - let addr = &target_node.addr; - - let url = format!("http://{}", addr); - - let mut client = DcacheServiceClient::connect(url).await.unwrap(); + let mut client = self.make_client(&target, target_node).await; let res = match event { RPCType::Vote => { @@ -117,6 +144,23 @@ impl DcacheNetwork { } } } + + pub async fn make_client( + &self, + target: &DcacheNodeId, + target_node: &BasicNode, + ) -> DcacheServiceClient { + let addr = format!("http://{}", &target_node.addr); + + tracing::debug!("connect: target={}: {}", target, addr); + + let channel = self.conn_pool.get(&addr).await.unwrap(); + let client = DcacheServiceClient::new(channel); + + tracing::info!("connected: target={}: {}", target, addr); + + client + } } // NOTE: This could be implemented also on `Arc`, but since it's empty, implemented diff --git a/src/pool.rs b/src/pool.rs new file mode 100644 index 0000000..781e5d6 --- /dev/null +++ b/src/pool.rs @@ -0,0 +1,154 @@ +use std::collections::HashMap; +use std::fmt::Debug; +use std::hash::Hash; +use std::marker::PhantomData; +use std::sync::Arc; +use std::sync::Mutex; +use std::time::Duration; + +use async_trait::async_trait; +use tokio::time::sleep; +//use log::debug; + +//use crate::base::tokio; + +pub type PoolItem = Arc>>; + +/// To build or check an item. +/// +/// When an item is requested, ItemManager `build()` one for the pool. +/// When an item is reused, ItemManager `check()` if it is still valid. +#[async_trait] +pub trait ItemManager { + type Key; + type Item; + type Error; + + /// Make a new item to put into the pool. + /// + /// An impl should hold that an item returned by `build()` is passed `check()`. + async fn build(&self, key: &Self::Key) -> Result; + + /// Check if an existent item still valid. + /// + /// E.g.: check if a tcp connection still alive. + /// If the item is valid, `check` should return it in a Ok(). + /// Otherwise, the item should be dropped and `check` returns an Err(). + async fn check(&self, item: Self::Item) -> Result; +} + +/// Pool assumes the items in it is `Clone`, thus it keeps only one item for each key. +#[allow(clippy::type_complexity)] +#[derive(Debug, Clone)] +pub struct Pool +where + Mgr: ItemManager + Debug, +{ + /// The first sleep time when `build()` fails. + /// The next sleep time is 2 times of the previous one. + pub initial_retry_interval: Duration, + + /// Pooled items indexed by key. + pub items: Arc>>>, + + manager: Mgr, + + err_type: PhantomData, + + n_retries: u32, +} + +impl Pool +where + Mgr: ItemManager + Debug, + Mgr::Key: Clone + Eq + Hash + Send + Debug, + Mgr::Item: Clone + Sync + Send + Debug, + Mgr::Error: Sync + Debug, +{ + pub fn new(manager: Mgr, initial_retry_interval: Duration) -> Self { + Pool { + initial_retry_interval, + items: Default::default(), + manager, + err_type: Default::default(), + n_retries: 3, + } + } + + pub fn with_retries(mut self, retries: u32) -> Self { + self.n_retries = retries; + self + } + + pub fn item_manager(&self) -> &Mgr { + &self.manager + } + + /// Return an raw pool item. + /// + /// The returned one may be an uninitialized one, i.e., it contains a None. + /// The lock for `items` should not be held for long, e.g. when `build()` a new connection, it takes dozen ms. + fn get_pool_item(&self, key: &Mgr::Key) -> PoolItem { + let mut items = self.items.lock().unwrap(); + + if let Some(item) = items.get(key) { + item.clone() + } else { + let item = PoolItem::default(); + items.insert(key.clone(), item.clone()); + item + } + } + + /// Return a item, by cloning an existent one or making a new one. + /// + /// When returning an existent one, `check()` will be called on it to ensure it is still valid. + /// E.g., when returning a tcp connection. + // #[logcall::logcall(err = "debug")] + // #[minitrace::trace] + pub async fn get(&self, key: &Mgr::Key) -> Result { + let pool_item = self.get_pool_item(key); + + let mut guard = pool_item.lock().await; + let item_opt = (*guard).clone(); + + if let Some(ref item) = item_opt { + let check_res = self.manager.check(item.clone()).await; + // debug!("check reused item res: {:?}", check_res); + + if let Ok(itm) = check_res { + return Ok(itm); + } else { + // mark broken conn as deleted + *guard = None; + } + } + + let mut interval = self.initial_retry_interval; + + for i in 0..self.n_retries { + // debug!("build new item of key: {:?}", key); + + let new_item = self.manager.build(key).await; + + // debug!("build new item of key res: {:?}", new_item); + + match new_item { + Ok(x) => { + *guard = Some(x.clone()); + return Ok(x); + } + Err(err) => { + if i == self.n_retries - 1 { + return Err(err); + } + } + } + + sleep(interval).await; + interval *= 2; + } + + unreachable!("the loop should always return!"); + } +} diff --git a/src/protobuf.rs b/src/protobuf.rs index f049b6f..3a93a4f 100644 --- a/src/protobuf.rs +++ b/src/protobuf.rs @@ -195,7 +195,6 @@ impl DcacheService for MyDcacheImpl { _ => unimplemented!(), }, } - } PipelineReq::RenameCaptcha(rename_captcha_req) => { let res = self