From 8d373c5f5358a6eddc11edfa8bb97fb5998c1d4a Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Sun, 17 Dec 2023 19:23:01 +0530 Subject: [PATCH] feat: count node unreachable errors and remove node from cluster --- src/network/management.rs | 74 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 73 insertions(+), 1 deletion(-) diff --git a/src/network/management.rs b/src/network/management.rs index 9ff4205..d3616a7 100644 --- a/src/network/management.rs +++ b/src/network/management.rs @@ -17,6 +17,7 @@ */ use std::collections::BTreeMap; use std::collections::BTreeSet; +use std::collections::HashMap; use actix_web::get; use actix_web::post; @@ -41,6 +42,7 @@ pub async fn add_learner( addr: req.0 .1.clone(), }; let res = app.raft.add_learner(node_id, node, true).await; + Ok(Json(res)) } @@ -69,7 +71,77 @@ pub async fn init(app: Data) -> actix_web::Result { #[get("/metrics")] pub async fn metrics(app: Data) -> actix_web::Result { let metrics = app.raft.metrics().borrow().clone(); - let res: Result, Infallible> = Ok(metrics); Ok(Json(res)) } + +use tokio::sync::mpsc; + +#[derive(Debug)] +pub enum HealthStatus { + Healthy(DcacheNodeId), + Down(DcacheNodeId), +} + +pub struct HealthMetrics; + +impl HealthMetrics { + pub async fn spawn( + app: Data, + threshold: usize, + mut rx: mpsc::Receiver, + ) -> tokio::task::JoinHandle<()> { + let mut current_error_rate = HashMap::new(); + let fut = async move { + while let Some(msg) = rx.recv().await { + match msg { + HealthStatus::Healthy(id) => { + current_error_rate.insert(id, 0); + } + HealthStatus::Down(id) => { + if let Some(existing_count) = + current_error_rate.get(&id).map(|s| s.to_owned()) + { + current_error_rate.insert(id, existing_count + 1); + if existing_count == threshold { + let cluster_metrics = app.raft.metrics().borrow().clone(); + let mut new_nodes: Vec = Vec::new(); + for node in cluster_metrics.membership_config.nodes() { + if *node.0 == id { + continue; + } + + new_nodes.push(*node.0); + } + + let res = + app.raft.change_membership(new_nodes, false).await.unwrap(); + } + } else { + current_error_rate.insert(id, 1); + } + } + } + } + }; + + tokio::spawn(fut) + } +} + +//#[get("/self/remove/{id}")] +//pub async fn remove_node(app: Data, id: web::Path) -> actix_web::Result { +// let cluster_metrics = app.raft.metrics().borrow().clone(); +// let remote_id: u64 = 3; +// let mut new_nodes: Vec = Vec::new(); +// for node in cluster_metrics.membership_config.nodes() { +// if *node.0 == remote_id { +// continue; +// } +// +// new_nodes.push(*node.0); +// } +// +// let res = app.raft.change_membership(new_nodes, false).await; +// Ok(Json(res)) +//}