use std::sync::Arc; use libmcaptcha::cache::messages as CacheMessages; use libmcaptcha::defense; use libmcaptcha::master::messages as MasterMessages; use libmcaptcha::mcaptcha; use openraft::BasicNode; use serde::de::DeserializeOwned; use serde::Serialize; use tonic::Response; use dcache::dcache_request::DcacheRequest as PipelineReq; use dcache::dcache_response::DcacheResponse as InnerPipelineRes; use dcache::dcache_service_server::DcacheService; use dcache::DcacheResponse as OuterPipelineRes; use dcache::{Learner, RaftReply, RaftRequest}; use crate::app::DcacheApp; use crate::store::{DcacheRequest, DcacheResponse}; pub mod dcache { tonic::include_proto!("dcache"); // The string specified here must match the proto package name } #[derive(Clone)] pub struct MyDcacheImpl { app: Arc, } impl MyDcacheImpl { pub fn new(app: Arc) -> Self { Self { app } } } #[tonic::async_trait] impl DcacheService for MyDcacheImpl { async fn add_learner( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { let req = request.into_inner(); let node_id = req.id; let node = BasicNode { addr: req.addr.clone(), }; println!("Learner added: {:?}", &req.addr); let res = self.app.raft.add_learner(node_id, node, true).await; Ok(Response::new(res.into())) } async fn add_captcha( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { let req = request.into_inner(); let res = self .app .raft .client_write(DcacheRequest::AddCaptcha(req.into())) .await; Ok(Response::new(res.into())) } async fn add_visitor( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { let req = request.into_inner(); let res = self .app .raft .client_write(DcacheRequest::AddVisitor(MasterMessages::AddVisitor( req.id, ))) .await .map_err(|e| { tonic::Status::new(tonic::Code::Internal, serde_json::to_string(&e).unwrap()) })?; match res.data { DcacheResponse::AddVisitorResult(res) => { Ok(Response::new(dcache::OptionAddVisitorResult { result: res.map(|f| f.into()), })) } _ => unimplemented!(), } } async fn rename_captcha( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { let req = request.into_inner(); let res = self .app .raft .client_write(DcacheRequest::RenameCaptcha(req.into())) .await; Ok(Response::new(res.into())) } async fn remove_captcha( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { let req = request.into_inner(); let res = self .app .raft .client_write(DcacheRequest::RemoveCaptcha(MasterMessages::RemoveCaptcha( req.id, ))) .await; Ok(Response::new(res.into())) } async fn cache_pow( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { let req = request.into_inner(); let res = self .app .raft .client_write(DcacheRequest::CachePoW(req.into())) .await; Ok(Response::new(res.into())) } async fn cache_result( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { let req = request.into_inner(); let res = self .app .raft .client_write(DcacheRequest::CacheResult(req.into())) .await; Ok(Response::new(res.into())) } // type PipelineDcacheOpsStream = // Pin> + Send + 'static>>; // async fn pipeline_dcache_ops( // &self, // request: tonic::Request>, // ) -> std::result::Result, tonic::Status> { async fn pipeline_dcache_ops( &self, request: tonic::Request, ) -> Result, tonic::Status> { let mut reqs = request.into_inner(); let mut responses = Vec::with_capacity(reqs.requests.len()); for req in reqs.requests.drain(0..) { let res = match req.dcache_request.unwrap() { PipelineReq::AddCaptcha(add_captcha_req) => { let res = self .app .raft .client_write(DcacheRequest::AddCaptcha(add_captcha_req.into())) .await; OuterPipelineRes { dcache_response: Some(InnerPipelineRes::Other(res.into())), } } PipelineReq::AddVisitor(add_visitor_req) => { let res = self .app .raft .client_write(DcacheRequest::AddVisitor(MasterMessages::AddVisitor( add_visitor_req.id, ))) .await; match res { Err(_) => OuterPipelineRes { dcache_response: None, }, Ok(res) => match res.data { DcacheResponse::AddVisitorResult(res) => { let res = dcache::OptionAddVisitorResult { result: res.map(|f| f.into()), }; OuterPipelineRes { dcache_response: Some( InnerPipelineRes::OptionAddVisitorResult(res), ), } } _ => unimplemented!(), }, } } PipelineReq::RenameCaptcha(rename_captcha_req) => { let res = self .app .raft .client_write(DcacheRequest::RenameCaptcha(rename_captcha_req.into())) .await; OuterPipelineRes { dcache_response: Some(InnerPipelineRes::Other(res.into())), } } PipelineReq::RemoveCaptcha(remove_captcha_req) => { let res = self .app .raft .client_write(DcacheRequest::RemoveCaptcha(MasterMessages::RemoveCaptcha( remove_captcha_req.id, ))) .await; OuterPipelineRes { dcache_response: Some(InnerPipelineRes::Other(res.into())), } } PipelineReq::CachePow(cache_pow_req) => { let res = self .app .raft .client_write(DcacheRequest::CachePoW(cache_pow_req.into())) .await; OuterPipelineRes { dcache_response: Some(InnerPipelineRes::Other(res.into())), } } PipelineReq::CacheResult(cache_result_req) => { let res = self .app .raft .client_write(DcacheRequest::CacheResult(cache_result_req.into())) .await; OuterPipelineRes { dcache_response: Some(InnerPipelineRes::Other(res.into())), } } }; responses.push(res); } Ok(Response::new(dcache::DcacheBatchResponse { responses })) } async fn write( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { let req = request.into_inner(); let req = serde_json::from_str(&req.data).unwrap(); let res = self.app.raft.client_write(req).await; Ok(Response::new(res.into())) } /// / Forward a request to other async fn forward( &self, _request: tonic::Request, ) -> std::result::Result, tonic::Status> { unimplemented!(); } async fn append_entries( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { let req = request.into_inner(); let req = serde_json::from_str(&req.data).unwrap(); let res = self.app.raft.append_entries(req).await; Ok(Response::new(res.into())) } async fn install_snapshot( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { let req = request.into_inner(); let req = serde_json::from_str(&req.data).unwrap(); let res = self.app.raft.install_snapshot(req).await; Ok(Response::new(res.into())) } async fn vote( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { let req = request.into_inner(); let req = serde_json::from_str(&req.data).unwrap(); let res = self.app.raft.vote(req).await; Ok(Response::new(res.into())) } } impl From for Result where T: DeserializeOwned, E: DeserializeOwned, { fn from(msg: RaftReply) -> Self { if !msg.data.is_empty() { let resp: T = serde_json::from_str(&msg.data).expect("fail to deserialize"); Ok(resp) } else { let err: E = serde_json::from_str(&msg.error).expect("fail to deserialize"); Err(err) } } } impl From> for RaftReply where T: Serialize, E: Serialize, { fn from(r: Result) -> Self { match r { Ok(x) => { let data = serde_json::to_string(&x).expect("fail to serialize"); RaftReply { data, error: Default::default(), } } Err(e) => { let error = serde_json::to_string(&e).expect("fail to serialize"); RaftReply { data: Default::default(), error, } } } } } impl From for MasterMessages::AddSite { fn from(value: dcache::AddCaptchaRequest) -> Self { let req_mcaptcha = value.mcaptcha.unwrap(); let mut defense = req_mcaptcha.defense.unwrap(); let mut new_defense = defense::DefenseBuilder::default(); for level in defense.levels.drain(0..) { new_defense .add_level( defense::LevelBuilder::default() .difficulty_factor(level.difficulty_factor) .unwrap() .visitor_threshold(level.visitor_threshold) .build() .unwrap(), ) .unwrap(); } let defense = new_defense.build().unwrap(); let mcaptcha = mcaptcha::MCaptchaBuilder::default() .defense(defense) .duration(req_mcaptcha.duration) .build() .unwrap(); Self { id: value.id, mcaptcha, } } } impl From for dcache::AddVisitorResult { fn from(value: libmcaptcha::master::AddVisitorResult) -> Self { Self { duration: value.duration, difficulty_factor: value.difficulty_factor, } } } impl From for MasterMessages::Rename { fn from(value: dcache::RenameCaptchaRequest) -> Self { Self { name: value.name, rename_to: value.rename_to, } } } impl From for CacheMessages::CachePoW { fn from(value: dcache::CachePowRequest) -> Self { Self { string: value.string, difficulty_factor: value.difficulty_factor, duration: value.duration, key: value.key, } } } impl From for CacheMessages::CacheResult { fn from(value: dcache::CacheResultRequest) -> Self { Self { token: value.token, key: value.key, duration: value.duration, } } }