feat: count node unreachable errors and remove node from cluster

This commit is contained in:
Aravinth Manivannan 2023-12-17 19:23:01 +05:30
parent 5f4c0818be
commit 8d373c5f53
Signed by: realaravinth
GPG key ID: F8F50389936984FF

View file

@ -17,6 +17,7 @@
*/
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::HashMap;
use actix_web::get;
use actix_web::post;
@ -41,6 +42,7 @@ pub async fn add_learner(
addr: req.0 .1.clone(),
};
let res = app.raft.add_learner(node_id, node, true).await;
Ok(Json(res))
}
@ -69,7 +71,77 @@ pub async fn init(app: Data<DcacheApp>) -> actix_web::Result<impl Responder> {
#[get("/metrics")]
pub async fn metrics(app: Data<DcacheApp>) -> actix_web::Result<impl Responder> {
let metrics = app.raft.metrics().borrow().clone();
let res: Result<RaftMetrics<DcacheNodeId, BasicNode>, Infallible> = Ok(metrics);
Ok(Json(res))
}
use tokio::sync::mpsc;
#[derive(Debug)]
pub enum HealthStatus {
Healthy(DcacheNodeId),
Down(DcacheNodeId),
}
pub struct HealthMetrics;
impl HealthMetrics {
pub async fn spawn(
app: Data<DcacheApp>,
threshold: usize,
mut rx: mpsc::Receiver<HealthStatus>,
) -> tokio::task::JoinHandle<()> {
let mut current_error_rate = HashMap::new();
let fut = async move {
while let Some(msg) = rx.recv().await {
match msg {
HealthStatus::Healthy(id) => {
current_error_rate.insert(id, 0);
}
HealthStatus::Down(id) => {
if let Some(existing_count) =
current_error_rate.get(&id).map(|s| s.to_owned())
{
current_error_rate.insert(id, existing_count + 1);
if existing_count == threshold {
let cluster_metrics = app.raft.metrics().borrow().clone();
let mut new_nodes: Vec<DcacheNodeId> = Vec::new();
for node in cluster_metrics.membership_config.nodes() {
if *node.0 == id {
continue;
}
new_nodes.push(*node.0);
}
let res =
app.raft.change_membership(new_nodes, false).await.unwrap();
}
} else {
current_error_rate.insert(id, 1);
}
}
}
}
};
tokio::spawn(fut)
}
}
//#[get("/self/remove/{id}")]
//pub async fn remove_node(app: Data<DcacheApp>, id: web::Path<u64>) -> actix_web::Result<impl Responder> {
// let cluster_metrics = app.raft.metrics().borrow().clone();
// let remote_id: u64 = 3;
// let mut new_nodes: Vec<DcacheNodeId> = Vec::new();
// for node in cluster_metrics.membership_config.nodes() {
// if *node.0 == remote_id {
// continue;
// }
//
// new_nodes.push(*node.0);
// }
//
// let res = app.raft.change_membership(new_nodes, false).await;
// Ok(Json(res))
//}