feat: start health supervisor

This commit is contained in:
Aravinth Manivannan 2023-12-17 19:25:08 +05:30
parent adadaff463
commit 285469ffed
Signed by: realaravinth
GPG key ID: F8F50389936984FF
3 changed files with 83 additions and 717 deletions

722
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -6,9 +6,10 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
openraft = { version = "0.8.3", features = ["serde"]}
libmcaptcha = { branch = "master", git = "https://github.com/mCaptcha/libmcaptcha", features = ["full"]}
#libmcaptcha = { path="../libmcaptcha/", features = ["full"]}
openraft = { version = "0.8.8", features = ["serde", "single-term-leader"]}
#openraft = { git = "https://github.com/drmingdrmer/openraft", tag = "v0.9.0-alpha.1", features = ["serde", "single-term-leader"]}
#libmcaptcha = { branch = "master", git = "https://github.com/mCaptcha/libmcaptcha", features = ["full"]}
libmcaptcha = { path="/src/atm/code/mcaptcha/libmcaptcha", features=["full"] }
tracing = { version = "0.1.37", features = ["log"] }
serde_json = "1.0.96"
serde = { version = "1.0.163", features = ["derive"] }
@ -19,10 +20,9 @@ futures-util = { version = "0.3.17", default-features = false, features = ["std"
lazy_static = "1.4.0"
pretty_env_logger = "0.4.0"
uuid = { version = "1", features = ["v4"] }
sqlx = { version = "0.5.13", features = [ "runtime-actix-rustls", "postgres", "time", "offline" ] }
actix-web-codegen-const-routes = { version = "0.1.0", tag = "0.1.0", git = "https://github.com/realaravinth/actix-web-codegen-const-routes" }
derive_builder = "0.11.2"
config = "0.11"
config = { version = "0.11", features = ["toml"] }
derive_more = "0.99.17"
url = { version = "2.2.2", features = ["serde"]}
async-trait = "0.1.36"
@ -35,7 +35,6 @@ actix = "0.13.0"
[build-dependencies]
serde_json = "1"
sqlx = { version = "0.5.13", features = [ "runtime-actix-rustls", "postgres", "time", "offline"] }
[dev-dependencies]
actix-rt = "2.7.0"

View file

@ -17,6 +17,7 @@
*/
#![allow(clippy::uninlined_format_args)]
use std::io::Cursor;
use std::sync::Arc;
use actix_web::middleware;
@ -24,6 +25,7 @@ use actix_web::middleware::Logger;
use actix_web::web::Data;
use actix_web::App;
use actix_web::HttpServer;
use openraft::storage::Adaptor;
use openraft::BasicNode;
use openraft::Config;
use openraft::Raft;
@ -38,7 +40,6 @@ use crate::store::DcacheResponse;
use crate::store::DcacheStore;
pub mod app;
//pub mod client;
pub mod network;
pub mod store;
@ -46,10 +47,13 @@ pub type DcacheNodeId = u64;
openraft::declare_raft_types!(
/// Declare the type configuration for example K/V store.
pub DcacheTypeConfig: D = DcacheRequest, R = DcacheResponse, NodeId = DcacheNodeId, Node = BasicNode
pub DcacheTypeConfig: D = DcacheRequest, R = DcacheResponse, NodeId = DcacheNodeId, Node = BasicNode,
Entry = openraft::Entry<DcacheTypeConfig>, SnapshotData = Cursor<Vec<u8>>
);
pub type DcacheRaft = Raft<DcacheTypeConfig, DcacheNetwork, Arc<DcacheStore>>;
pub type LogStore = Adaptor<DcacheTypeConfig, Arc<DcacheStore>>;
pub type StateMachineStore = Adaptor<DcacheTypeConfig, Arc<DcacheStore>>;
pub type DcacheRaft = Raft<DcacheTypeConfig, Arc<DcacheNetwork>, LogStore, StateMachineStore>;
pub mod typ {
use openraft::BasicNode;
@ -73,6 +77,8 @@ pub mod typ {
pub async fn start_example_raft_node(
node_id: DcacheNodeId,
http_addr: String,
introducer_addr: String,
introducer_id: DcacheNodeId,
) -> std::io::Result<()> {
// Create a configuration for the raft instance.
let config = Config {
@ -91,14 +97,30 @@ pub async fn start_example_raft_node(
// Create a instance of where the Raft data will be stored.
let store = Arc::new(DcacheStore::new(salt));
let (log_store, state_machine) = Adaptor::new(store.clone());
let client = reqwest::Client::new();
// Create the network layer that will connect and communicate the raft instances and
// will be used in conjunction with the store created above.
let network = DcacheNetwork {};
let (manager_tx, manager_rx) = tokio::sync::mpsc::channel(1000);
// let health = Arc::new(crate::network::raft_network_impl::HealthLedger::new(manager_tx));
// let network = Arc::new(DcacheNetwork::new(health));
let network = Arc::new(DcacheNetwork::new(manager_tx, client.clone()));
// Create a local raft instance.
let raft = Raft::new(node_id, config.clone(), network, store.clone())
let raft = Raft::new(
node_id,
config.clone(),
network.clone(),
log_store,
state_machine,
)
.await
.unwrap();
raft.enable_heartbeat(true);
raft.enable_elect(true);
// raft.enable_tick(true);
// Create an application that will store all the instances created above, this will
// be later used on the actix-web services.
@ -108,8 +130,14 @@ pub async fn start_example_raft_node(
raft,
store,
config,
network,
});
if introducer_addr == http_addr {
app.init().await.unwrap();
}
let app_copy = app.clone();
// Start the actix-web server.
let server = HttpServer::new(move || {
App::new()
@ -129,11 +157,30 @@ pub async fn start_example_raft_node(
// application API
.service(api::write)
.service(api::state)
// .service(api::read)
.service(api::read)
.service(api::pipeline_read)
.service(api::pipeline_write)
// .service(api::consistent_read)
});
let x = server.bind(http_addr)?;
let x = server.bind(&http_addr)?;
x.run().await
let server_fut = tokio::spawn(x.run());
tokio::time::sleep(std::time::Duration::new(3, 0)).await;
let req: (DcacheNodeId, String) = (node_id, http_addr);
let c = reqwest::Client::new();
c.post(format!("http://{}/add-learner", introducer_addr))
.json(&req)
.send()
.await
.unwrap();
// let health_job = tokio::spawn(DcacheApp::health_job(app_copy));
let health_metrics_handle =
crate::network::management::HealthMetrics::spawn(app_copy, 5, manager_rx).await;
server_fut.await??;
health_metrics_handle.abort();
// health_job.abort();
Ok(())
}