/* * mCaptcha - A proof of work based DoS protection system * Copyright © 2023 Aravinth Manivannan * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #![allow(clippy::uninlined_format_args)] use std::io::Cursor; use std::sync::Arc; use actix_web::middleware; use actix_web::middleware::Logger; use actix_web::web::Data; use actix_web::App; use actix_web::HttpServer; use openraft::storage::Adaptor; use openraft::BasicNode; use openraft::Config; use openraft::Raft; use crate::app::DcacheApp; use crate::network::api; use crate::network::management; use crate::network::raft; use crate::network::raft_network_impl::DcacheNetwork; use crate::store::DcacheRequest; use crate::store::DcacheResponse; use crate::store::DcacheStore; use tonic::transport::Server; pub mod app; pub mod network; mod server; pub mod store; pub mod types; pub type DcacheNodeId = u64; openraft::declare_raft_types!( /// Declare the type configuration for example K/V store. pub DcacheTypeConfig: D = DcacheRequest, R = DcacheResponse, NodeId = DcacheNodeId, Node = BasicNode, Entry = openraft::Entry, SnapshotData = Cursor> ); pub type LogStore = Adaptor>; pub type StateMachineStore = Adaptor>; pub type DcacheRaft = Raft, LogStore, StateMachineStore>; pub mod typ { use openraft::BasicNode; use crate::DcacheNodeId; use crate::DcacheTypeConfig; pub type RaftError = openraft::error::RaftError; pub type RPCError = openraft::error::RPCError>; pub type ClientWriteError = openraft::error::ClientWriteError; pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError; pub type ForwardToLeader = openraft::error::ForwardToLeader; pub type InitializeError = openraft::error::InitializeError; pub type ClientWriteResponse = openraft::raft::ClientWriteResponse; } pub async fn start_example_raft_node( node_id: DcacheNodeId, http_addr: String, introducer_addr: String, introducer_id: DcacheNodeId, ) -> std::io::Result<()> { // Create a configuration for the raft instance. let config = Config { heartbeat_interval: 500, election_timeout_min: 1500, election_timeout_max: 3000, ..Default::default() }; let salt = "foobar12".into(); #[cfg(release)] todo!("set salt"); let config = Arc::new(config.validate().unwrap()); // Create a instance of where the Raft data will be stored. let store = Arc::new(DcacheStore::new(salt)); let (log_store, state_machine) = Adaptor::new(store.clone()); let client = reqwest::Client::new(); // Create the network layer that will connect and communicate the raft instances and // will be used in conjunction with the store created above. let (manager_tx, manager_rx) = tokio::sync::mpsc::channel(1000); // let health = Arc::new(crate::network::raft_network_impl::HealthLedger::new(manager_tx)); // let network = Arc::new(DcacheNetwork::new(health)); let network = Arc::new(DcacheNetwork::new(manager_tx, client.clone())); // Create a local raft instance. let raft = Raft::new( node_id, config.clone(), network.clone(), log_store, state_machine, ) .await .unwrap(); raft.enable_heartbeat(true); raft.enable_elect(true); let captcha = serde_json::json!({ "AddCaptcha": { "id": "test_1", "mcaptcha": { "visitor_threshold": 0, "defense": { "levels": [ {"visitor_threshold": 50, "difficulty_factor": 500}, {"visitor_threshold": 5000, "difficulty_factor": 50000}, ], "current_visitor_threshold": 0, }, "duration": 30, }}}); #[derive(serde::Serialize)] struct X { data: String, } let x = X { data: serde_json::to_string(&captcha).unwrap(), }; println!("{}", serde_json::to_string(&x).unwrap()); // raft.enable_tick(true); // Create an application that will store all the instances created above, this will // be later used on the actix-web services. let app = DcacheApp { id: node_id, addr: http_addr.clone(), raft, store, config, network, }; let app = Data::new(app); let dcache_service = crate::server::MyDcacheImpl::new(app.clone()); if introducer_addr == http_addr { app.init().await.unwrap(); } let app_copy = app.clone(); // Start the actix-web server. // let server = HttpServer::new(move || { // App::new() // .wrap(Logger::default()) // .wrap(Logger::new("%a %{User-Agent}i")) // .wrap(middleware::Compress::default()) // .app_data(app.clone()) // // raft internal RPC // .service(raft::append) // .service(raft::snapshot) // .service(raft::vote) // // admin API // .service(management::init) // .service(management::add_learner) // .service(management::change_membership) // .service(management::metrics) // // application API // .service(api::write) // .service(api::state) // .service(api::read) // .service(api::pipeline_read) // .service(api::pipeline_write) // // .service(api::consistent_read) // }); // // let x = server.bind(&http_addr)?; // let server_fut = tokio::spawn(x.run()); use crate::server::dcache::dcache_service_server::DcacheServiceServer; let svc = DcacheServiceServer::new(dcache_service); let x = Server::builder() .add_service(svc) .serve(http_addr.clone().parse().unwrap()); let server_fut = tokio::spawn(x); tokio::time::sleep(std::time::Duration::new(3, 0)).await; // let req: (DcacheNodeId, String) = (node_id, http_addr); // let c = reqwest::Client::new(); // c.post(format!("http://{}/add-learner", introducer_addr)) // .json(&req) // .send() // .await // .unwrap(); // let health_job = tokio::spawn(DcacheApp::health_job(app_copy)); let url = format!("http://{}", introducer_addr); use crate::server::dcache::dcache_service_client::DcacheServiceClient; use crate::server::dcache::Learner; use crate::server::dcache::RaftRequest; let mut client = DcacheServiceClient::connect(url).await.unwrap(); client .add_learner(Learner { id: node_id, addr: http_addr, }) .await .unwrap(); let health_metrics_handle = crate::network::management::HealthMetrics::spawn(app_copy, 5, manager_rx).await; server_fut.await?.unwrap(); health_metrics_handle.abort(); // health_job.abort(); Ok(()) }