/* * mCaptcha - A proof of work based DoS protection system * Copyright © 2023 Aravinth Manivannan * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ use std::collections::HashMap; use std::sync::Arc; //use actix_web::web; //use actix_web::web::Data; use crate::app::DcacheApp; use crate::DcacheNodeId; use tokio::sync::mpsc; #[derive(Debug)] pub enum HealthStatus { Healthy(DcacheNodeId), Down(DcacheNodeId), } pub struct HealthMetrics; impl HealthMetrics { pub async fn spawn( app: Arc, threshold: usize, mut rx: mpsc::Receiver, ) -> tokio::task::JoinHandle<()> { let mut current_error_rate = HashMap::new(); let fut = async move { while let Some(msg) = rx.recv().await { match msg { HealthStatus::Healthy(id) => { current_error_rate.insert(id, 0); } HealthStatus::Down(id) => { if let Some(existing_count) = current_error_rate.get(&id).map(|s| s.to_owned()) { current_error_rate.insert(id, existing_count + 1); if existing_count == threshold { let cluster_metrics = app.raft.metrics().borrow().clone(); let mut new_nodes: Vec = Vec::new(); for node in cluster_metrics.membership_config.nodes() { if *node.0 == id { continue; } new_nodes.push(*node.0); } let _res = app.raft.change_membership(new_nodes, false).await.unwrap(); } } else { current_error_rate.insert(id, 1); } } } } }; tokio::spawn(fut) } }