/* * mCaptcha - A proof of work based DoS protection system * Copyright © 2023 Aravinth Manivannan * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ use std::sync::Arc; use std::time::Duration; use super::management::HealthStatus; use crate::DcacheNodeId; use crate::DcacheTypeConfig; use async_trait::async_trait; use openraft::error::InstallSnapshotError; use openraft::error::NetworkError; use openraft::error::RPCError; use openraft::error::RaftError; use openraft::raft::AppendEntriesRequest; use openraft::raft::AppendEntriesResponse; use openraft::raft::InstallSnapshotRequest; use openraft::raft::InstallSnapshotResponse; use openraft::raft::VoteRequest; use openraft::raft::VoteResponse; use openraft::BasicNode; use openraft::RaftNetwork; use openraft::RaftNetworkFactory; use serde::de::DeserializeOwned; use serde::Serialize; use tokio::sync::mpsc::Sender; use tonic::transport::channel::Channel; use tower_service::Service; use crate::pool::*; use crate::protobuf::dcache::dcache_service_client::DcacheServiceClient; use crate::protobuf::dcache::RaftRequest; #[derive(Debug)] struct ChannelManager {} #[async_trait] impl ItemManager for ChannelManager { type Key = String; type Item = Channel; type Error = tonic::transport::Error; async fn build(&self, addr: &Self::Key) -> Result { tonic::transport::Endpoint::new(addr.clone())? .connect() .await } async fn check(&self, mut ch: Channel) -> Result { futures::future::poll_fn(|cx| (&mut ch).poll_ready(cx)).await?; Ok(ch) } } pub struct DcacheNetwork { pub signal: Sender, conn_pool: Pool, } pub enum RPCType { Vote, Snapshot, Append, } impl DcacheNetwork { pub fn new(signal: Sender) -> Self { let mgr = ChannelManager {}; Self { signal, conn_pool: Pool::new(mgr, Duration::from_millis(50)), } } pub async fn send_rpc( &self, target: DcacheNodeId, target_node: &BasicNode, req: Req, event: RPCType, ) -> Result> where Req: Serialize, Err: std::error::Error + DeserializeOwned, Resp: DeserializeOwned, { let mut client = self.make_client(&target, target_node).await; let res = match event { RPCType::Vote => { client .vote(RaftRequest { data: serde_json::to_string(&req).unwrap(), }) .await } RPCType::Snapshot => { client .install_snapshot(RaftRequest { data: serde_json::to_string(&req).unwrap(), }) .await } RPCType::Append => { client .append_entries(RaftRequest { data: serde_json::to_string(&req).unwrap(), }) .await } }; match res { Ok(res) => { let signal2 = self.signal.clone(); let fut = async move { let _ = signal2.send(HealthStatus::Healthy(target)).await; }; tokio::spawn(fut); let res = res.into_inner(); Ok(serde_json::from_str(&res.data).unwrap()) } Err(e) => { let _ = self.signal.send(HealthStatus::Down(target)).await; Err(RPCError::Network(NetworkError::new(&e))) } } } pub async fn make_client( &self, target: &DcacheNodeId, target_node: &BasicNode, ) -> DcacheServiceClient { let addr = format!("http://{}", &target_node.addr); tracing::debug!("connect: target={}: {}", target, addr); let channel = self.conn_pool.get(&addr).await.unwrap(); let client = DcacheServiceClient::new(channel); tracing::info!("connected: target={}: {}", target, addr); client } } // NOTE: This could be implemented also on `Arc`, but since it's empty, implemented // directly. #[async_trait] impl RaftNetworkFactory for Arc { type Network = DcacheNetworkConnection; async fn new_client(&mut self, target: DcacheNodeId, node: &BasicNode) -> Self::Network { DcacheNetworkConnection { owner: self.clone(), target, target_node: node.clone(), } } } pub struct DcacheNetworkConnection { owner: Arc, target: DcacheNodeId, target_node: BasicNode, } #[async_trait] impl RaftNetwork for DcacheNetworkConnection { async fn send_append_entries( &mut self, req: AppendEntriesRequest, ) -> Result< AppendEntriesResponse, RPCError>, > { self.owner .send_rpc(self.target, &self.target_node, req, RPCType::Append) .await } async fn send_install_snapshot( &mut self, req: InstallSnapshotRequest, ) -> Result< InstallSnapshotResponse, RPCError>, > { self.owner .send_rpc(self.target, &self.target_node, req, RPCType::Append) .await } async fn send_vote( &mut self, req: VoteRequest, ) -> Result< VoteResponse, RPCError>, > { self.owner .send_rpc(self.target, &self.target_node, req, RPCType::Vote) .await } }