diff --git a/src/network/raft_network_impl.rs b/src/network/raft_network_impl.rs index 11a16ae..7bc2c8d 100644 --- a/src/network/raft_network_impl.rs +++ b/src/network/raft_network_impl.rs @@ -1,3 +1,6 @@ +use std::collections::BTreeMap; +use std::collections::BTreeSet; +use std::collections::HashSet; /* * mCaptcha - A proof of work based DoS protection system * Copyright © 2023 Aravinth Manivannan @@ -15,6 +18,12 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::RwLock; +use std::time::Duration; +use std::time::Instant; + use async_trait::async_trait; use openraft::error::InstallSnapshotError; use openraft::error::NetworkError; @@ -30,15 +39,25 @@ use openraft::raft::VoteResponse; use openraft::BasicNode; use openraft::RaftNetwork; use openraft::RaftNetworkFactory; +use reqwest::Client; use serde::de::DeserializeOwned; use serde::Serialize; +use tokio::sync::mpsc::Sender; +use super::management::HealthStatus; use crate::DcacheNodeId; use crate::DcacheTypeConfig; -pub struct DcacheNetwork {} +#[derive(Clone)] +pub struct DcacheNetwork { + pub signal: Sender, + pub client: Client, +} impl DcacheNetwork { + pub fn new(signal: Sender, client: Client) -> Self { + Self { signal, client } + } pub async fn send_rpc( &self, target: DcacheNodeId, @@ -57,16 +76,13 @@ impl DcacheNetwork { tracing::debug!("send_rpc to url: {}", url); - let client = reqwest::Client::new(); - - tracing::debug!("client is created for: {}", url); - - let resp = client - .post(url) - .json(&req) - .send() - .await - .map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + let resp = match self.client.post(url).json(&req).send().await { + Ok(resp) => Ok(resp), + Err(e) => { + self.signal.send(HealthStatus::Down(target)).await; + Err(RPCError::Network(NetworkError::new(&e))) + } + }?; tracing::debug!("client.post() is sent"); @@ -75,19 +91,27 @@ impl DcacheNetwork { .await .map_err(|e| RPCError::Network(NetworkError::new(&e)))?; - res.map_err(|e| RPCError::RemoteError(RemoteError::new(target, e))) + let res = res.map_err(|e| RPCError::RemoteError(RemoteError::new(target, e))); + if res.is_ok() { + let signal2 = self.signal.clone(); + let fut = async move { + let _ = signal2.send(HealthStatus::Healthy(target)).await; + }; + tokio::spawn(fut); + } + res } } // NOTE: This could be implemented also on `Arc`, but since it's empty, implemented // directly. #[async_trait] -impl RaftNetworkFactory for DcacheNetwork { +impl RaftNetworkFactory for Arc { type Network = DcacheNetworkConnection; async fn new_client(&mut self, target: DcacheNodeId, node: &BasicNode) -> Self::Network { DcacheNetworkConnection { - owner: DcacheNetwork {}, + owner: self.clone(), target, target_node: node.clone(), } @@ -95,7 +119,7 @@ impl RaftNetworkFactory for DcacheNetwork { } pub struct DcacheNetworkConnection { - owner: DcacheNetwork, + owner: Arc, target: DcacheNodeId, target_node: BasicNode, }