feat: connection pool for log replication

This commit is contained in:
Aravinth Manivannan 2023-12-29 20:07:39 +05:30
parent 41e438828c
commit 59180fd86f
Signed by: realaravinth
GPG key ID: F8F50389936984FF
3 changed files with 209 additions and 12 deletions

View file

@ -16,7 +16,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
use std::sync::Arc;
use std::time::Duration;
use super::management::HealthStatus;
use crate::DcacheNodeId;
use crate::DcacheTypeConfig;
use async_trait::async_trait;
use openraft::error::InstallSnapshotError;
use openraft::error::NetworkError;
@ -34,17 +38,38 @@ use openraft::RaftNetworkFactory;
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::sync::mpsc::Sender;
use tonic::transport::channel::Channel;
use tower_service::Service;
use super::management::HealthStatus;
use crate::DcacheNodeId;
use crate::DcacheTypeConfig;
use crate::pool::*;
use crate::protobuf::dcache::dcache_service_client::DcacheServiceClient;
use crate::protobuf::dcache::RaftRequest;
#[derive(Clone)]
#[derive(Debug)]
struct ChannelManager {}
#[async_trait]
impl ItemManager for ChannelManager {
type Key = String;
type Item = Channel;
type Error = tonic::transport::Error;
async fn build(&self, addr: &Self::Key) -> Result<Channel, tonic::transport::Error> {
println!("New connection for {addr}");
tonic::transport::Endpoint::new(addr.clone())?
.connect()
.await
}
async fn check(&self, mut ch: Channel) -> Result<Channel, tonic::transport::Error> {
futures::future::poll_fn(|cx| (&mut ch).poll_ready(cx)).await?;
Ok(ch)
}
}
pub struct DcacheNetwork {
pub signal: Sender<HealthStatus>,
conn_pool: Pool<ChannelManager>,
}
pub enum RPCType {
@ -55,7 +80,13 @@ pub enum RPCType {
impl DcacheNetwork {
pub fn new(signal: Sender<HealthStatus>) -> Self {
Self { signal }
let mgr = ChannelManager {};
Self {
signal,
conn_pool: Pool::new(mgr, Duration::from_millis(50)),
}
}
pub async fn send_rpc<Req, Resp, Err>(
&self,
@ -69,11 +100,7 @@ impl DcacheNetwork {
Err: std::error::Error + DeserializeOwned,
Resp: DeserializeOwned,
{
let addr = &target_node.addr;
let url = format!("http://{}", addr);
let mut client = DcacheServiceClient::connect(url).await.unwrap();
let mut client = self.make_client(&target, target_node).await;
let res = match event {
RPCType::Vote => {
@ -117,6 +144,23 @@ impl DcacheNetwork {
}
}
}
pub async fn make_client(
&self,
target: &DcacheNodeId,
target_node: &BasicNode,
) -> DcacheServiceClient<Channel> {
let addr = format!("http://{}", &target_node.addr);
tracing::debug!("connect: target={}: {}", target, addr);
let channel = self.conn_pool.get(&addr).await.unwrap();
let client = DcacheServiceClient::new(channel);
tracing::info!("connected: target={}: {}", target, addr);
client
}
}
// NOTE: This could be implemented also on `Arc<DcacheNetwork>`, but since it's empty, implemented

154
src/pool.rs Normal file
View file

@ -0,0 +1,154 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use async_trait::async_trait;
use tokio::time::sleep;
//use log::debug;
//use crate::base::tokio;
pub type PoolItem<T> = Arc<tokio::sync::Mutex<Option<T>>>;
/// To build or check an item.
///
/// When an item is requested, ItemManager `build()` one for the pool.
/// When an item is reused, ItemManager `check()` if it is still valid.
#[async_trait]
pub trait ItemManager {
type Key;
type Item;
type Error;
/// Make a new item to put into the pool.
///
/// An impl should hold that an item returned by `build()` is passed `check()`.
async fn build(&self, key: &Self::Key) -> Result<Self::Item, Self::Error>;
/// Check if an existent item still valid.
///
/// E.g.: check if a tcp connection still alive.
/// If the item is valid, `check` should return it in a Ok().
/// Otherwise, the item should be dropped and `check` returns an Err().
async fn check(&self, item: Self::Item) -> Result<Self::Item, Self::Error>;
}
/// Pool assumes the items in it is `Clone`, thus it keeps only one item for each key.
#[allow(clippy::type_complexity)]
#[derive(Debug, Clone)]
pub struct Pool<Mgr>
where
Mgr: ItemManager + Debug,
{
/// The first sleep time when `build()` fails.
/// The next sleep time is 2 times of the previous one.
pub initial_retry_interval: Duration,
/// Pooled items indexed by key.
pub items: Arc<Mutex<HashMap<Mgr::Key, PoolItem<Mgr::Item>>>>,
manager: Mgr,
err_type: PhantomData<Mgr::Error>,
n_retries: u32,
}
impl<Mgr> Pool<Mgr>
where
Mgr: ItemManager + Debug,
Mgr::Key: Clone + Eq + Hash + Send + Debug,
Mgr::Item: Clone + Sync + Send + Debug,
Mgr::Error: Sync + Debug,
{
pub fn new(manager: Mgr, initial_retry_interval: Duration) -> Self {
Pool {
initial_retry_interval,
items: Default::default(),
manager,
err_type: Default::default(),
n_retries: 3,
}
}
pub fn with_retries(mut self, retries: u32) -> Self {
self.n_retries = retries;
self
}
pub fn item_manager(&self) -> &Mgr {
&self.manager
}
/// Return an raw pool item.
///
/// The returned one may be an uninitialized one, i.e., it contains a None.
/// The lock for `items` should not be held for long, e.g. when `build()` a new connection, it takes dozen ms.
fn get_pool_item(&self, key: &Mgr::Key) -> PoolItem<Mgr::Item> {
let mut items = self.items.lock().unwrap();
if let Some(item) = items.get(key) {
item.clone()
} else {
let item = PoolItem::default();
items.insert(key.clone(), item.clone());
item
}
}
/// Return a item, by cloning an existent one or making a new one.
///
/// When returning an existent one, `check()` will be called on it to ensure it is still valid.
/// E.g., when returning a tcp connection.
// #[logcall::logcall(err = "debug")]
// #[minitrace::trace]
pub async fn get(&self, key: &Mgr::Key) -> Result<Mgr::Item, Mgr::Error> {
let pool_item = self.get_pool_item(key);
let mut guard = pool_item.lock().await;
let item_opt = (*guard).clone();
if let Some(ref item) = item_opt {
let check_res = self.manager.check(item.clone()).await;
// debug!("check reused item res: {:?}", check_res);
if let Ok(itm) = check_res {
return Ok(itm);
} else {
// mark broken conn as deleted
*guard = None;
}
}
let mut interval = self.initial_retry_interval;
for i in 0..self.n_retries {
// debug!("build new item of key: {:?}", key);
let new_item = self.manager.build(key).await;
// debug!("build new item of key res: {:?}", new_item);
match new_item {
Ok(x) => {
*guard = Some(x.clone());
return Ok(x);
}
Err(err) => {
if i == self.n_retries - 1 {
return Err(err);
}
}
}
sleep(interval).await;
interval *= 2;
}
unreachable!("the loop should always return!");
}
}

View file

@ -195,7 +195,6 @@ impl DcacheService for MyDcacheImpl {
_ => unimplemented!(),
},
}
}
PipelineReq::RenameCaptcha(rename_captcha_req) => {
let res = self