feat: use connection pooling and report unreachable errors to supervisor

This commit is contained in:
Aravinth Manivannan 2023-12-17 19:23:34 +05:30
parent 8d373c5f53
commit b84e3ef275
Signed by: realaravinth
GPG key ID: F8F50389936984FF

View file

@ -1,3 +1,6 @@
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::HashSet;
/* /*
* mCaptcha - A proof of work based DoS protection system * mCaptcha - A proof of work based DoS protection system
* Copyright © 2023 Aravinth Manivannan <realravinth@batsense.net> * Copyright © 2023 Aravinth Manivannan <realravinth@batsense.net>
@ -15,6 +18,12 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
use std::time::Duration;
use std::time::Instant;
use async_trait::async_trait; use async_trait::async_trait;
use openraft::error::InstallSnapshotError; use openraft::error::InstallSnapshotError;
use openraft::error::NetworkError; use openraft::error::NetworkError;
@ -30,15 +39,25 @@ use openraft::raft::VoteResponse;
use openraft::BasicNode; use openraft::BasicNode;
use openraft::RaftNetwork; use openraft::RaftNetwork;
use openraft::RaftNetworkFactory; use openraft::RaftNetworkFactory;
use reqwest::Client;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
use tokio::sync::mpsc::Sender;
use super::management::HealthStatus;
use crate::DcacheNodeId; use crate::DcacheNodeId;
use crate::DcacheTypeConfig; use crate::DcacheTypeConfig;
pub struct DcacheNetwork {} #[derive(Clone)]
pub struct DcacheNetwork {
pub signal: Sender<HealthStatus>,
pub client: Client,
}
impl DcacheNetwork { impl DcacheNetwork {
pub fn new(signal: Sender<HealthStatus>, client: Client) -> Self {
Self { signal, client }
}
pub async fn send_rpc<Req, Resp, Err>( pub async fn send_rpc<Req, Resp, Err>(
&self, &self,
target: DcacheNodeId, target: DcacheNodeId,
@ -57,16 +76,13 @@ impl DcacheNetwork {
tracing::debug!("send_rpc to url: {}", url); tracing::debug!("send_rpc to url: {}", url);
let client = reqwest::Client::new(); let resp = match self.client.post(url).json(&req).send().await {
Ok(resp) => Ok(resp),
tracing::debug!("client is created for: {}", url); Err(e) => {
self.signal.send(HealthStatus::Down(target)).await;
let resp = client Err(RPCError::Network(NetworkError::new(&e)))
.post(url) }
.json(&req) }?;
.send()
.await
.map_err(|e| RPCError::Network(NetworkError::new(&e)))?;
tracing::debug!("client.post() is sent"); tracing::debug!("client.post() is sent");
@ -75,19 +91,27 @@ impl DcacheNetwork {
.await .await
.map_err(|e| RPCError::Network(NetworkError::new(&e)))?; .map_err(|e| RPCError::Network(NetworkError::new(&e)))?;
res.map_err(|e| RPCError::RemoteError(RemoteError::new(target, e))) let res = res.map_err(|e| RPCError::RemoteError(RemoteError::new(target, e)));
if res.is_ok() {
let signal2 = self.signal.clone();
let fut = async move {
let _ = signal2.send(HealthStatus::Healthy(target)).await;
};
tokio::spawn(fut);
}
res
} }
} }
// NOTE: This could be implemented also on `Arc<DcacheNetwork>`, but since it's empty, implemented // NOTE: This could be implemented also on `Arc<DcacheNetwork>`, but since it's empty, implemented
// directly. // directly.
#[async_trait] #[async_trait]
impl RaftNetworkFactory<DcacheTypeConfig> for DcacheNetwork { impl RaftNetworkFactory<DcacheTypeConfig> for Arc<DcacheNetwork> {
type Network = DcacheNetworkConnection; type Network = DcacheNetworkConnection;
async fn new_client(&mut self, target: DcacheNodeId, node: &BasicNode) -> Self::Network { async fn new_client(&mut self, target: DcacheNodeId, node: &BasicNode) -> Self::Network {
DcacheNetworkConnection { DcacheNetworkConnection {
owner: DcacheNetwork {}, owner: self.clone(),
target, target,
target_node: node.clone(), target_node: node.clone(),
} }
@ -95,7 +119,7 @@ impl RaftNetworkFactory<DcacheTypeConfig> for DcacheNetwork {
} }
pub struct DcacheNetworkConnection { pub struct DcacheNetworkConnection {
owner: DcacheNetwork, owner: Arc<DcacheNetwork>,
target: DcacheNodeId, target: DcacheNodeId,
target_node: BasicNode, target_node: BasicNode,
} }