// dcache/src/protobuf.rs (Rust source, 517 lines, 20 KiB)
use std::sync::Arc;
2023-12-27 22:56:54 +05:30
use libmcaptcha::cache::messages as CacheMessages;
use libmcaptcha::defense;
use libmcaptcha::master::messages as MasterMessages;
use libmcaptcha::mcaptcha;
2023-12-26 14:58:55 +05:30
use openraft::BasicNode;
2023-12-26 15:13:41 +05:30
use serde::de::DeserializeOwned;
use serde::Serialize;
use tonic::Response;
2023-12-26 14:58:55 +05:30
2023-12-28 19:15:07 +05:30
use dcache::dcache_request::DcacheRequest as PipelineReq;
use dcache::dcache_response::DcacheResponse as InnerPipelineRes;
2023-12-26 14:58:55 +05:30
use dcache::dcache_service_server::DcacheService;
2023-12-28 19:15:07 +05:30
use dcache::DcacheResponse as OuterPipelineRes;
2023-12-26 14:58:55 +05:30
use dcache::{Learner, RaftReply, RaftRequest};
use crate::app::DcacheApp;
2023-12-27 22:56:54 +05:30
use crate::store::{DcacheRequest, DcacheResponse};
2023-12-26 14:58:55 +05:30
/// Items generated from the `dcache` protobuf package, pulled in through
/// `tonic::include_proto!`.
pub mod dcache {
tonic::include_proto!("dcache"); // The string specified here must match the proto package name
}
#[derive(Clone)]
pub struct MyDcacheImpl {
2023-12-26 15:13:41 +05:30
app: Arc<DcacheApp>,
2023-12-26 14:58:55 +05:30
}
impl MyDcacheImpl {
2023-12-26 15:13:41 +05:30
pub fn new(app: Arc<DcacheApp>) -> Self {
2023-12-26 14:58:55 +05:30
Self { app }
}
}
#[tonic::async_trait]
impl DcacheService for MyDcacheImpl {
async fn add_learner(
&self,
request: tonic::Request<Learner>,
) -> std::result::Result<tonic::Response<RaftReply>, tonic::Status> {
let req = request.into_inner();
let node_id = req.id;
let node = BasicNode {
addr: req.addr.clone(),
};
2023-12-28 13:43:53 +05:30
println!("Learner added: {:?}", &req.addr);
2023-12-26 14:58:55 +05:30
let res = self.app.raft.add_learner(node_id, node, true).await;
Ok(Response::new(res.into()))
}
2023-12-27 22:56:54 +05:30
/// Submits an `AddCaptcha` command through Raft.
///
/// Whatever `client_write` yields (success or error) is serialized into the
/// returned `RaftReply`.
///
/// NOTE(review): the `into()` conversion of the request presumably panics when
/// the `mcaptcha`/`defense` sub-messages are missing — confirm against the
/// `From<AddCaptchaRequest>` implementation.
async fn add_captcha(
    &self,
    request: tonic::Request<dcache::AddCaptchaRequest>,
) -> std::result::Result<tonic::Response<RaftReply>, tonic::Status> {
    let command = DcacheRequest::AddCaptcha(request.into_inner().into());
    let outcome = self.app.raft.client_write(command).await;
    let reply: RaftReply = outcome.into();
    Ok(Response::new(reply))
}
/// Submits an `AddVisitor` command for the captcha named in the request.
///
/// # Errors
///
/// Returns `tonic::Code::Internal` when the Raft write fails (the error is
/// sent as JSON in the status message) or when the state machine answers
/// with a response variant other than `AddVisitorResult`. Neither case
/// panics the handler.
async fn add_visitor(
    &self,
    request: tonic::Request<dcache::CaptchaId>,
) -> std::result::Result<tonic::Response<dcache::OptionAddVisitorResult>, tonic::Status> {
    let req = request.into_inner();
    let res = self
        .app
        .raft
        .client_write(DcacheRequest::AddVisitor(MasterMessages::AddVisitor(
            req.id,
        )))
        .await
        .map_err(|e| {
            // Serialization of the error should not fail, but if it does we
            // still want to report *something* instead of panicking.
            let message = serde_json::to_string(&e).unwrap_or_else(|ser_err| {
                format!("failed to serialize raft error: {}", ser_err)
            });
            tonic::Status::internal(message)
        })?;
    match res.data {
        DcacheResponse::AddVisitorResult(res) => {
            Ok(Response::new(dcache::OptionAddVisitorResult {
                result: res.map(|f| f.into()),
            }))
        }
        // Any other variant means the state machine replied to a different
        // command; report it as a server-side fault.
        _ => Err(tonic::Status::internal(
            "unexpected state machine response to AddVisitor",
        )),
    }
}
/// Submits a `RenameCaptcha` command through Raft.
///
/// The `client_write` result, success or error, is serialized into the
/// returned `RaftReply`.
async fn rename_captcha(
    &self,
    request: tonic::Request<dcache::RenameCaptchaRequest>,
) -> std::result::Result<tonic::Response<dcache::RaftReply>, tonic::Status> {
    let command = DcacheRequest::RenameCaptcha(request.into_inner().into());
    let outcome = self.app.raft.client_write(command).await;
    let reply: dcache::RaftReply = outcome.into();
    Ok(Response::new(reply))
}
/// Submits a `RemoveCaptcha` command for the captcha named in the request.
///
/// The `client_write` result, success or error, is serialized into the
/// returned `RaftReply`.
async fn remove_captcha(
    &self,
    request: tonic::Request<dcache::CaptchaId>,
) -> std::result::Result<tonic::Response<dcache::RaftReply>, tonic::Status> {
    let captcha_id = request.into_inner().id;
    let command = DcacheRequest::RemoveCaptcha(MasterMessages::RemoveCaptcha(captcha_id));
    let outcome = self.app.raft.client_write(command).await;
    let reply: dcache::RaftReply = outcome.into();
    Ok(Response::new(reply))
}
/// Submits a `CachePoW` command through Raft.
///
/// The `client_write` result, success or error, is serialized into the
/// returned `RaftReply`.
async fn cache_pow(
    &self,
    request: tonic::Request<dcache::CachePowRequest>,
) -> std::result::Result<tonic::Response<dcache::RaftReply>, tonic::Status> {
    let command = DcacheRequest::CachePoW(request.into_inner().into());
    let outcome = self.app.raft.client_write(command).await;
    let reply: dcache::RaftReply = outcome.into();
    Ok(Response::new(reply))
}
/// Submits a `CacheResult` command through Raft.
///
/// The `client_write` result, success or error, is serialized into the
/// returned `RaftReply`.
async fn cache_result(
    &self,
    request: tonic::Request<dcache::CacheResultRequest>,
) -> std::result::Result<tonic::Response<dcache::RaftReply>, tonic::Status> {
    let command = DcacheRequest::CacheResult(request.into_inner().into());
    let outcome = self.app.raft.client_write(command).await;
    let reply: dcache::RaftReply = outcome.into();
    Ok(Response::new(reply))
}
2023-12-28 19:15:07 +05:30
// type PipelineDcacheOpsStream =
// Pin<Box<dyn Stream<Item = Result<OuterPipelineRes, tonic::Status>> + Send + 'static>>;
// async fn pipeline_dcache_ops(
// &self,
// request: tonic::Request<tonic::Streaming<dcache::DcacheRequest>>,
// ) -> std::result::Result<tonic::Response<Self::PipelineDcacheOpsStream>, tonic::Status> {
//// async fn pipeline_dcache_ops(
//// &self,
//// request: tonic::Request<dcache::DcacheBatchRequest>,
//// ) -> Result<Response<dcache::DcacheBatchResponse>, tonic::Status> {
//// let mut reqs = request.into_inner();
//// let mut responses = Vec::with_capacity(reqs.requests.len());
//// for req in reqs.requests.drain(0..) {
//// let res = match req.dcache_request.unwrap() {
//// PipelineReq::AddCaptcha(add_captcha_req) => {
//// let res = self
//// .app
//// .raft
//// .client_write(DcacheRequest::AddCaptcha(add_captcha_req.into()))
//// .await;
//// OuterPipelineRes {
//// dcache_response: Some(InnerPipelineRes::Other(res.into())),
//// }
//// }
//// PipelineReq::AddVisitor(add_visitor_req) => {
//// let res = self
//// .app
//// .raft
//// .client_write(DcacheRequest::AddVisitor(MasterMessages::AddVisitor(
//// add_visitor_req.id,
//// )))
//// .await;
//// match res {
//// Err(_) => OuterPipelineRes {
//// dcache_response: None,
//// },
//// Ok(res) => match res.data {
//// DcacheResponse::AddVisitorResult(res) => {
//// let res = dcache::OptionAddVisitorResult {
//// result: res.map(|f| f.into()),
//// };
//// OuterPipelineRes {
//// dcache_response: Some(
//// InnerPipelineRes::OptionAddVisitorResult(res),
//// ),
//// }
//// }
////
//// _ => unimplemented!(),
//// },
//// }
//// // if res.is_err() {
//// // OuterPipelineRes {
//// // dcache_response: None,
//// // }
//// // } else {
//// // let res = res.unwrap();
//// // match res.data {
//// // DcacheResponse::AddVisitorResult(res) => {
//// // let res = dcache::OptionAddVisitorResult {
//// // result: res.map(|f| f.into()),
//// // };
//// // OuterPipelineRes {
//// // dcache_response: Some(
//// // InnerPipelineRes::OptionAddVisitorResult(res),
//// // ),
//// // }
//// // }
//// //
//// // _ => unimplemented!(),
//// // }
//// // }
//// }
//// PipelineReq::RenameCaptcha(rename_captcha_req) => {
//// // let x: u8 = rename_visitor_req;
//// let res = self
//// .app
//// .raft
//// .client_write(DcacheRequest::RenameCaptcha(rename_captcha_req.into()))
//// .await;
//// OuterPipelineRes {
//// dcache_response: Some(InnerPipelineRes::Other(res.into())),
//// }
//// }
//// PipelineReq::RemoveCaptcha(remove_captcha_req) => {
//// let res = self
//// .app
//// .raft
//// .client_write(DcacheRequest::RemoveCaptcha(MasterMessages::RemoveCaptcha(
//// remove_captcha_req.id,
//// )))
//// .await;
//// OuterPipelineRes {
//// dcache_response: Some(InnerPipelineRes::Other(res.into())),
//// }
//// }
//// PipelineReq::CachePow(cache_pow_req) => {
//// let res = self
//// .app
//// .raft
//// .client_write(DcacheRequest::CachePoW(cache_pow_req.into()))
//// .await;
//// OuterPipelineRes {
//// dcache_response: Some(InnerPipelineRes::Other(res.into())),
//// }
//// }
//// PipelineReq::CacheResult(cache_result_req) => {
//// let res = self
//// .app
//// .raft
//// .client_write(DcacheRequest::CacheResult(cache_result_req.into()))
//// .await;
//// OuterPipelineRes {
//// dcache_response: Some(InnerPipelineRes::Other(res.into())),
//// }
//// }
//// };
//// responses.push(res);
//// }
//// Ok(Response::new(dcache::DcacheBatchResponse { responses }))
//// }
// let mut reqs = request.into_inner().requests;
// let mut responses = Vec::with_capacity(reqs.len());
// for req in reqs.drain(0..) {
// let res = match req.dcache_request.unwrap() {
// PipelineReq::AddCaptcha(add_captcha_req) => {
// let res = self
// .app
// .raft
// .client_write(DcacheRequest::AddCaptcha(add_captcha_req.into()))
// .await;
// OuterPipelineRes {
// dcache_response: Some(InnerPipelineRes::Other(res.into())),
// }
// }
// PipelineReq::AddVisitor(add_visitor_req) => {
// let res = self
// .app
// .raft
// .client_write(DcacheRequest::AddVisitor(MasterMessages::AddVisitor(
// add_visitor_req.id,
// )))
// .await
// .map_err(|e| {
// tonic::Status::new(
// tonic::Code::Internal,
// serde_json::to_string(&e).unwrap(),
// )
// })?;
// match res.data {
// DcacheResponse::AddVisitorResult(res) => {
// let res = dcache::OptionAddVisitorResult {
// result: res.map(|f| f.into()),
// };
// OuterPipelineRes {
// dcache_response: Some(InnerPipelineRes::OptionAddVisitorResult(
// res,
// )),
// }
// }
//
// _ => unimplemented!(),
// }
// }
// PipelineReq::RenameCaptcha(rename_captcha_req) => {
// // let x: u8 = rename_visitor_req;
// let res = self
// .app
// .raft
// .client_write(DcacheRequest::RenameCaptcha(rename_captcha_req.into()))
// .await;
// OuterPipelineRes {
// dcache_response: Some(InnerPipelineRes::Other(res.into())),
// }
// }
// PipelineReq::RemoveCaptcha(remove_captcha_req) => {
// let res = self
// .app
// .raft
// .client_write(DcacheRequest::RemoveCaptcha(MasterMessages::RemoveCaptcha(
// remove_captcha_req.id,
// )))
// .await;
// OuterPipelineRes {
// dcache_response: Some(InnerPipelineRes::Other(res.into())),
// }
// }
// PipelineReq::CachePow(cache_pow_req) => {
// let res = self
// .app
// .raft
// .client_write(DcacheRequest::CachePoW(cache_pow_req.into()))
// .await;
// OuterPipelineRes {
// dcache_response: Some(InnerPipelineRes::Other(res.into())),
// }
// }
// PipelineReq::CacheResult(cache_result_req) => {
// let res = self
// .app
// .raft
// .client_write(DcacheRequest::CacheResult(cache_result_req.into()))
// .await;
// OuterPipelineRes {
// dcache_response: Some(InnerPipelineRes::Other(res.into())),
// }
// }
// };
// responses.push(res);
// }
// Ok(Response::new(dcache::DcacheBatchResponse { responses }))
// }
2023-12-26 14:58:55 +05:30
/// Applies a JSON-encoded client write request through Raft.
///
/// `RaftRequest::data` is decoded with `serde_json`; the outcome of
/// `client_write` is serialized into the returned `RaftReply`.
///
/// # Errors
///
/// Returns `tonic::Code::InvalidArgument` when the payload is not valid JSON
/// for the expected request type, rather than panicking on input that
/// arrives over the network.
async fn write(
    &self,
    request: tonic::Request<RaftRequest>,
) -> std::result::Result<tonic::Response<RaftReply>, tonic::Status> {
    let req = request.into_inner();
    let req = serde_json::from_str(&req.data).map_err(|e| {
        tonic::Status::invalid_argument(format!("invalid write payload: {}", e))
    })?;
    let res = self.app.raft.client_write(req).await;
    Ok(Response::new(res.into()))
}
/// / Forward a request to other
async fn forward(
&self,
2023-12-26 15:13:41 +05:30
_request: tonic::Request<RaftRequest>,
2023-12-26 14:58:55 +05:30
) -> std::result::Result<tonic::Response<RaftReply>, tonic::Status> {
unimplemented!();
}
/// Handles a JSON-encoded Raft `append_entries` RPC.
///
/// `RaftRequest::data` is decoded with `serde_json`; the outcome of the Raft
/// call is serialized into the returned `RaftReply`.
///
/// # Errors
///
/// Returns `tonic::Code::InvalidArgument` when the payload cannot be decoded,
/// rather than panicking on input that arrives over the network.
async fn append_entries(
    &self,
    request: tonic::Request<RaftRequest>,
) -> std::result::Result<tonic::Response<RaftReply>, tonic::Status> {
    let req = request.into_inner();
    let req = serde_json::from_str(&req.data).map_err(|e| {
        tonic::Status::invalid_argument(format!("invalid append_entries payload: {}", e))
    })?;
    let res = self.app.raft.append_entries(req).await;
    Ok(Response::new(res.into()))
}
/// Handles a JSON-encoded Raft `install_snapshot` RPC.
///
/// `RaftRequest::data` is decoded with `serde_json`; the outcome of the Raft
/// call is serialized into the returned `RaftReply`.
///
/// # Errors
///
/// Returns `tonic::Code::InvalidArgument` when the payload cannot be decoded,
/// rather than panicking on input that arrives over the network.
async fn install_snapshot(
    &self,
    request: tonic::Request<RaftRequest>,
) -> std::result::Result<tonic::Response<RaftReply>, tonic::Status> {
    let req = request.into_inner();
    let req = serde_json::from_str(&req.data).map_err(|e| {
        tonic::Status::invalid_argument(format!("invalid install_snapshot payload: {}", e))
    })?;
    let res = self.app.raft.install_snapshot(req).await;
    Ok(Response::new(res.into()))
}
/// Handles a JSON-encoded Raft `vote` RPC.
///
/// `RaftRequest::data` is decoded with `serde_json`; the outcome of the Raft
/// call is serialized into the returned `RaftReply`.
///
/// # Errors
///
/// Returns `tonic::Code::InvalidArgument` when the payload cannot be decoded,
/// rather than panicking on input that arrives over the network.
async fn vote(
    &self,
    request: tonic::Request<RaftRequest>,
) -> std::result::Result<tonic::Response<RaftReply>, tonic::Status> {
    let req = request.into_inner();
    let req = serde_json::from_str(&req.data).map_err(|e| {
        tonic::Status::invalid_argument(format!("invalid vote payload: {}", e))
    })?;
    let res = self.app.raft.vote(req).await;
    Ok(Response::new(res.into()))
}
}
2023-12-26 15:13:41 +05:30
impl<T, E> From<RaftReply> for Result<T, E>
where
T: DeserializeOwned,
E: DeserializeOwned,
{
fn from(msg: RaftReply) -> Self {
if !msg.data.is_empty() {
let resp: T = serde_json::from_str(&msg.data).expect("fail to deserialize");
Ok(resp)
} else {
let err: E = serde_json::from_str(&msg.error).expect("fail to deserialize");
Err(err)
}
}
}
impl<T, E> From<Result<T, E>> for RaftReply
where
T: Serialize,
E: Serialize,
{
fn from(r: Result<T, E>) -> Self {
match r {
Ok(x) => {
let data = serde_json::to_string(&x).expect("fail to serialize");
RaftReply {
data,
error: Default::default(),
}
}
Err(e) => {
let error = serde_json::to_string(&e).expect("fail to serialize");
RaftReply {
data: Default::default(),
error,
}
}
}
}
}
2023-12-27 22:56:54 +05:30
/// Builds an `AddSite` master message from the protobuf `AddCaptchaRequest`.
///
/// Every level in the request's defense is rebuilt through
/// `defense::LevelBuilder` and added, in order, to a
/// `defense::DefenseBuilder`; the resulting defense and the request's
/// duration are then fed to `mcaptcha::MCaptchaBuilder`.
///
/// # Panics
///
/// Panics if the request has no `mcaptcha` or no `defense`, or if any of the
/// builders rejects the supplied values.
impl From<dcache::AddCaptchaRequest> for MasterMessages::AddSite {
    fn from(value: dcache::AddCaptchaRequest) -> Self {
        let proto_mcaptcha = value.mcaptcha.unwrap();
        let proto_defense = proto_mcaptcha.defense.unwrap();

        let mut defense_builder = defense::DefenseBuilder::default();
        for proto_level in proto_defense.levels {
            let level = defense::LevelBuilder::default()
                .difficulty_factor(proto_level.difficulty_factor)
                .unwrap()
                .visitor_threshold(proto_level.visitor_threshold)
                .build()
                .unwrap();
            defense_builder.add_level(level).unwrap();
        }
        let built_defense = defense_builder.build().unwrap();

        let mcaptcha = mcaptcha::MCaptchaBuilder::default()
            .defense(built_defense)
            .duration(proto_mcaptcha.duration)
            .build()
            .unwrap();

        Self {
            id: value.id,
            mcaptcha,
        }
    }
}
/// Copies `duration` and `difficulty_factor` from the libmcaptcha result into
/// the protobuf `AddVisitorResult`.
impl From<libmcaptcha::master::AddVisitorResult> for dcache::AddVisitorResult {
    fn from(value: libmcaptcha::master::AddVisitorResult) -> Self {
        let duration = value.duration;
        let difficulty_factor = value.difficulty_factor;
        Self {
            duration,
            difficulty_factor,
        }
    }
}
/// Moves `name` and `rename_to` from the protobuf request into a `Rename`
/// master message.
impl From<dcache::RenameCaptchaRequest> for MasterMessages::Rename {
    fn from(value: dcache::RenameCaptchaRequest) -> Self {
        let dcache::RenameCaptchaRequest {
            name, rename_to, ..
        } = value;
        Self { name, rename_to }
    }
}
/// Moves `string`, `difficulty_factor`, `duration` and `key` from the
/// protobuf request into a `CachePoW` cache message.
impl From<dcache::CachePowRequest> for CacheMessages::CachePoW {
    fn from(value: dcache::CachePowRequest) -> Self {
        let dcache::CachePowRequest {
            string,
            difficulty_factor,
            duration,
            key,
            ..
        } = value;
        Self {
            string,
            difficulty_factor,
            duration,
            key,
        }
    }
}
/// Moves `token`, `key` and `duration` from the protobuf request into a
/// `CacheResult` cache message.
impl From<dcache::CacheResultRequest> for CacheMessages::CacheResult {
    fn from(value: dcache::CacheResultRequest) -> Self {
        let dcache::CacheResultRequest {
            token,
            key,
            duration,
            ..
        } = value;
        Self {
            token,
            key,
            duration,
        }
    }
}