dcache/src/network/raft_network_impl.rs
Aravinth Manivannan 70ef43b720
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
feat: use protobuf for RPC
2023-12-26 14:58:55 +05:30

240 lines
7.2 KiB
Rust

use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::HashSet;
/*
* mCaptcha - A proof of work based DoS protection system
* Copyright © 2023 Aravinth Manivannan <realravinth@batsense.net>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
use std::time::Duration;
use std::time::Instant;
use async_trait::async_trait;
use openraft::error::InstallSnapshotError;
use openraft::error::NetworkError;
use openraft::error::RPCError;
use openraft::error::RaftError;
use openraft::error::RemoteError;
use openraft::raft::AppendEntriesRequest;
use openraft::raft::AppendEntriesResponse;
use openraft::raft::InstallSnapshotRequest;
use openraft::raft::InstallSnapshotResponse;
use openraft::raft::VoteRequest;
use openraft::raft::VoteResponse;
use openraft::BasicNode;
use openraft::RaftNetwork;
use openraft::RaftNetworkFactory;
use reqwest::Client;
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::sync::mpsc::Sender;
use tonic::transport::channel::Channel;
use super::management::HealthStatus;
use super::raft::snapshot;
use crate::DcacheNodeId;
use crate::DcacheTypeConfig;
use crate::server::dcache::dcache_service_client::DcacheServiceClient;
use crate::server::dcache::{RaftReply, RaftRequest};
#[derive(Clone)]
pub struct DcacheNetwork {
pub signal: Sender<HealthStatus>,
pub client: Client,
}
pub enum RPCType {
Vote,
Snapshot,
Append,
}
impl DcacheNetwork {
pub fn new(signal: Sender<HealthStatus>, client: Client) -> Self {
Self { signal, client }
}
pub async fn send_rpc<Req, Resp, Err>(
&self,
target: DcacheNodeId,
target_node: &BasicNode,
uri: &str,
req: Req,
event: RPCType,
) -> Result<Resp, RPCError<DcacheNodeId, BasicNode, Err>>
where
Req: Serialize,
Err: std::error::Error + DeserializeOwned,
Resp: DeserializeOwned,
{
let addr = &target_node.addr;
//let url = format!("http://{}/{}", addr, uri);
let url = format!("http://{}", addr);
let mut client = DcacheServiceClient::connect(url).await.unwrap();
let res = match event {
RPCType::Vote => {
client
.vote(RaftRequest {
data: serde_json::to_string(&req).unwrap(),
})
.await
}
RPCType::Snapshot => {
client
.install_snapshot(RaftRequest {
data: serde_json::to_string(&req).unwrap(),
})
.await
}
RPCType::Append => {
client
.append_entries(RaftRequest {
data: serde_json::to_string(&req).unwrap(),
})
.await
}
};
match res {
Ok(res) => {
let signal2 = self.signal.clone();
let fut = async move {
let _ = signal2.send(HealthStatus::Healthy(target)).await;
};
tokio::spawn(fut);
let res = res.into_inner();
Ok(serde_json::from_str(&res.data).unwrap())
}
Err(e) => {
let _ = self.signal.send(HealthStatus::Down(target)).await;
Err(RPCError::Network(NetworkError::new(&e)))
}
}
// tracing::debug!("send_rpc to url: {}", url);
//
// let resp = match self.client.post(url).json(&req).send().await {
// Ok(resp) => Ok(resp),
// Err(e) => {
// self.signal.send(HealthStatus::Down(target)).await;
// Err(RPCError::Network(NetworkError::new(&e)))
// }
// }?;
//
// tracing::debug!("client.post() is sent");
//
// let res: Result<Resp, Err> = resp
// .json()
// .await
// .map_err(|e| RPCError::Network(NetworkError::new(&e)))?;
//
// let res = res.map_err(|e| RPCError::RemoteError(RemoteError::new(target, e)));
// if res.is_ok() {
// let signal2 = self.signal.clone();
// let fut = async move {
// let _ = signal2.send(HealthStatus::Healthy(target)).await;
// };
// tokio::spawn(fut);
// }
// res
}
}
// NOTE: This could be implemented also on `Arc<DcacheNetwork>`, but since it's empty, implemented
// directly.
#[async_trait]
impl RaftNetworkFactory<DcacheTypeConfig> for Arc<DcacheNetwork> {
type Network = DcacheNetworkConnection;
async fn new_client(&mut self, target: DcacheNodeId, node: &BasicNode) -> Self::Network {
DcacheNetworkConnection {
owner: self.clone(),
target,
target_node: node.clone(),
}
}
}
pub struct DcacheNetworkConnection {
owner: Arc<DcacheNetwork>,
target: DcacheNodeId,
target_node: BasicNode,
}
#[async_trait]
impl RaftNetwork<DcacheTypeConfig> for DcacheNetworkConnection {
async fn send_append_entries(
&mut self,
req: AppendEntriesRequest<DcacheTypeConfig>,
) -> Result<
AppendEntriesResponse<DcacheNodeId>,
RPCError<DcacheNodeId, BasicNode, RaftError<DcacheNodeId>>,
> {
self.owner
.send_rpc(
self.target,
&self.target_node,
"raft-append",
req,
RPCType::Append,
)
.await
}
async fn send_install_snapshot(
&mut self,
req: InstallSnapshotRequest<DcacheTypeConfig>,
) -> Result<
InstallSnapshotResponse<DcacheNodeId>,
RPCError<DcacheNodeId, BasicNode, RaftError<DcacheNodeId, InstallSnapshotError>>,
> {
self.owner
.send_rpc(
self.target,
&self.target_node,
"raft-snapshot",
req,
RPCType::Append,
)
.await
}
async fn send_vote(
&mut self,
req: VoteRequest<DcacheNodeId>,
) -> Result<
VoteResponse<DcacheNodeId>,
RPCError<DcacheNodeId, BasicNode, RaftError<DcacheNodeId>>,
> {
self.owner
.send_rpc(
self.target,
&self.target_node,
"raft-vote",
req,
RPCType::Vote,
)
.await
}
}