Aravinth Manivannan a288450721
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
feat: batch RPC
2023-12-29 15:58:36 +05:30

403 lines
13 KiB

use std::sync::Arc;
use libmcaptcha::cache::messages as CacheMessages;
use libmcaptcha::defense;
use libmcaptcha::master::messages as MasterMessages;
use libmcaptcha::mcaptcha;
use openraft::BasicNode;
use serde::de::DeserializeOwned;
use serde::Serialize;
use tonic::Response;
use dcache::dcache_request::DcacheRequest as PipelineReq;
use dcache::dcache_response::DcacheResponse as InnerPipelineRes;
use dcache::dcache_service_server::DcacheService;
use dcache::DcacheResponse as OuterPipelineRes;
use dcache::{Learner, RaftReply, RaftRequest};
use crate::app::DcacheApp;
use crate::store::{DcacheRequest, DcacheResponse};
/// Code generated by `tonic` from the `dcache` protobuf package (request/response
/// messages and the `DcacheService` server trait used below).
pub mod dcache {
tonic::include_proto!("dcache"); // The string specified here must match the proto package name
/// gRPC service implementation backed by a shared [`DcacheApp`].
pub struct MyDcacheImpl {
/// Shared application handle; the RPC handlers reach the Raft instance through `app.raft`.
app: Arc<DcacheApp>,
impl MyDcacheImpl {
/// Wraps the given shared application handle in a new service instance.
pub fn new(app: Arc<DcacheApp>) -> Self {
Self { app }
// RPC entry points of the generated `DcacheService` trait. Each handler returns either a
// `tonic::Response` or a `tonic::Status` error.
impl DcacheService for MyDcacheImpl {
/// Registers a node as a Raft learner.
///
/// Builds a `BasicNode` from the address carried in the `Learner` message and hands it,
/// together with the requested node id, to `raft.add_learner`.
async fn add_learner(
request: tonic::Request<Learner>,
) -> std::result::Result<tonic::Response<RaftReply>, tonic::Status> {
let req = request.into_inner();
let node_id = req.id;
let node = BasicNode {
addr: req.addr.clone(),
// This message is printed before the Raft call below is awaited, so it does not by
// itself indicate that adding the learner succeeded.
println!("Learner added: {:?}", &req.addr);
// NOTE(review): the third argument `true` is presumably openraft's `blocking` flag — confirm.
let res = self.app.raft.add_learner(node_id, node, true).await;
/// Handles the "add captcha" RPC: takes an `AddCaptchaRequest` and answers with a
/// `RaftReply` (or a `tonic::Status` on failure).
///
/// NOTE(review): presumably the request is converted and submitted as a Raft client write;
/// the call chain that follows `self` is not visible here — confirm.
async fn add_captcha(
request: tonic::Request<dcache::AddCaptchaRequest>,
) -> std::result::Result<tonic::Response<RaftReply>, tonic::Status> {
let req = request.into_inner();
let res = self
/// Handles the "add visitor" RPC for the captcha identified by `CaptchaId`.
///
/// # Errors
/// A failure of the underlying call is reported as `tonic::Code::Internal`, with the error
/// serialized to JSON as the status message.
///
/// # Panics
/// Panics via `unimplemented!()` if the store answers with anything other than
/// `DcacheResponse::AddVisitorResult`, and via `unwrap()` if the error cannot be serialized.
async fn add_visitor(
request: tonic::Request<dcache::CaptchaId>,
) -> std::result::Result<tonic::Response<dcache::OptionAddVisitorResult>, tonic::Status> {
let req = request.into_inner();
let res = self
.map_err(|e| {
// The error is shipped to the client as a JSON string inside the gRPC status message.
tonic::Status::new(tonic::Code::Internal, serde_json::to_string(&e).unwrap())
match res.data {
DcacheResponse::AddVisitorResult(res) => {
Ok(Response::new(dcache::OptionAddVisitorResult {
// `res` is mapped element-wise into the protobuf `AddVisitorResult` type.
result: res.map(|f| f.into()),
// Any other response variant is treated as a programming error for this RPC.
_ => unimplemented!(),
/// Handles the "rename captcha" RPC: takes a `RenameCaptchaRequest` and answers with a
/// `RaftReply` (or a `tonic::Status` on failure).
///
/// NOTE(review): presumably submitted as a Raft client write; the call chain that follows
/// `self` is not visible here — confirm.
async fn rename_captcha(
request: tonic::Request<dcache::RenameCaptchaRequest>,
) -> std::result::Result<tonic::Response<dcache::RaftReply>, tonic::Status> {
let req = request.into_inner();
let res = self
/// Handles the "remove captcha" RPC for the captcha identified by `CaptchaId` and answers
/// with a `RaftReply` (or a `tonic::Status` on failure).
///
/// NOTE(review): presumably submitted as a Raft client write; the call chain that follows
/// `self` is not visible here — confirm.
async fn remove_captcha(
request: tonic::Request<dcache::CaptchaId>,
) -> std::result::Result<tonic::Response<dcache::RaftReply>, tonic::Status> {
let req = request.into_inner();
let res = self
/// Handles the "cache PoW" RPC: takes a `CachePowRequest` and answers with a `RaftReply`
/// (or a `tonic::Status` on failure).
///
/// NOTE(review): presumably submitted as a Raft client write; the call chain that follows
/// `self` is not visible here — confirm.
async fn cache_pow(
request: tonic::Request<dcache::CachePowRequest>,
) -> std::result::Result<tonic::Response<dcache::RaftReply>, tonic::Status> {
let req = request.into_inner();
let res = self
/// Handles the "cache result" RPC: takes a `CacheResultRequest` and answers with a
/// `RaftReply` (or a `tonic::Status` on failure).
///
/// NOTE(review): presumably submitted as a Raft client write; the call chain that follows
/// `self` is not visible here — confirm.
async fn cache_result(
request: tonic::Request<dcache::CacheResultRequest>,
) -> std::result::Result<tonic::Response<dcache::RaftReply>, tonic::Status> {
let req = request.into_inner();
let res = self
// Commented-out variant of this RPC that took a `tonic::Streaming` request and returned a
// boxed response stream; the active signature below takes a single batch message instead.
// type PipelineDcacheOpsStream =
// Pin<Box<dyn Stream<Item = Result<OuterPipelineRes, tonic::Status>> + Send + 'static>>;
// async fn pipeline_dcache_ops(
// &self,
// request: tonic::Request<tonic::Streaming<dcache::DcacheRequest>>,
// ) -> std::result::Result<tonic::Response<Self::PipelineDcacheOpsStream>, tonic::Status> {
/// Executes a batch of cache operations carried in one `DcacheBatchRequest` and returns the
/// per-operation results in a `DcacheBatchResponse`.
///
/// The requests are drained in order and dispatched on their `dcache_request` variant.
/// For every variant except `AddVisitor`, the outcome is converted with `.into()` and wrapped
/// in `InnerPipelineRes::Other`. For `AddVisitor`, a failed call yields a response whose
/// `dcache_response` is `None`, and a successful one is mapped to an `OptionAddVisitorResult`.
///
/// # Panics
/// Panics via `unwrap()` if a request has no `dcache_request` set, and via
/// `unimplemented!()` if an `AddVisitor` call returns a response variant other than
/// `DcacheResponse::AddVisitorResult`.
async fn pipeline_dcache_ops(
request: tonic::Request<dcache::DcacheBatchRequest>,
) -> Result<Response<dcache::DcacheBatchResponse>, tonic::Status> {
let mut reqs = request.into_inner();
// Pre-size the result vector to the number of requests in the batch.
let mut responses = Vec::with_capacity(reqs.requests.len());
// `drain(0..)` moves every request out of the batch so each can be consumed by value.
for req in reqs.requests.drain(0..) {
let res = match req.dcache_request.unwrap() {
PipelineReq::AddCaptcha(add_captcha_req) => {
let res = self
OuterPipelineRes {
dcache_response: Some(InnerPipelineRes::Other(res.into())),
PipelineReq::AddVisitor(add_visitor_req) => {
let res = self
match res {
// The error itself is discarded; the caller only sees an empty response slot.
Err(_) => OuterPipelineRes {
dcache_response: None,
Ok(res) => match res.data {
DcacheResponse::AddVisitorResult(res) => {
let res = dcache::OptionAddVisitorResult {
result: res.map(|f| f.into()),
OuterPipelineRes {
dcache_response: Some(
// Any other response variant is treated as a programming error for this operation.
_ => unimplemented!(),
PipelineReq::RenameCaptcha(rename_captcha_req) => {
let res = self
OuterPipelineRes {
dcache_response: Some(InnerPipelineRes::Other(res.into())),
PipelineReq::RemoveCaptcha(remove_captcha_req) => {
let res = self
OuterPipelineRes {
dcache_response: Some(InnerPipelineRes::Other(res.into())),
PipelineReq::CachePow(cache_pow_req) => {
let res = self
OuterPipelineRes {
dcache_response: Some(InnerPipelineRes::Other(res.into())),
PipelineReq::CacheResult(cache_result_req) => {
let res = self
OuterPipelineRes {
dcache_response: Some(InnerPipelineRes::Other(res.into())),
Ok(Response::new(dcache::DcacheBatchResponse { responses }))
/// Submits a client write to Raft.
///
/// The payload is the JSON string in `RaftRequest::data`; it is deserialized and passed to
/// `raft.client_write`.
///
/// # Panics
/// Panics via `unwrap()` if `data` is not valid JSON for the expected request type.
async fn write(
request: tonic::Request<RaftRequest>,
) -> std::result::Result<tonic::Response<RaftReply>, tonic::Status> {
let req = request.into_inner();
// The target type of the deserialization is inferred from `client_write`'s parameter.
let req = serde_json::from_str(&req.data).unwrap();
let res = self.app.raft.client_write(req).await;
/// Forward a request to another node.
///
/// NOTE(review): the leading underscore on `_request` suggests the argument is currently
/// unused — confirm against the body.
async fn forward(
_request: tonic::Request<RaftRequest>,
) -> std::result::Result<tonic::Response<RaftReply>, tonic::Status> {
/// Raft-internal RPC: delivers an append-entries request to the local Raft instance.
///
/// The payload is the JSON string in `RaftRequest::data`; it is deserialized and passed to
/// `raft.append_entries`.
///
/// # Panics
/// Panics via `unwrap()` if `data` is not valid JSON for the expected request type.
async fn append_entries(
request: tonic::Request<RaftRequest>,
) -> std::result::Result<tonic::Response<RaftReply>, tonic::Status> {
let req = request.into_inner();
// The target type of the deserialization is inferred from `append_entries`' parameter.
let req = serde_json::from_str(&req.data).unwrap();
let res = self.app.raft.append_entries(req).await;
/// Raft-internal RPC: delivers an install-snapshot request to the local Raft instance.
///
/// The payload is the JSON string in `RaftRequest::data`; it is deserialized and passed to
/// `raft.install_snapshot`.
///
/// # Panics
/// Panics via `unwrap()` if `data` is not valid JSON for the expected request type.
async fn install_snapshot(
request: tonic::Request<RaftRequest>,
) -> std::result::Result<tonic::Response<RaftReply>, tonic::Status> {
let req = request.into_inner();
// The target type of the deserialization is inferred from `install_snapshot`'s parameter.
let req = serde_json::from_str(&req.data).unwrap();
let res = self.app.raft.install_snapshot(req).await;
/// Raft-internal RPC: delivers a vote request to the local Raft instance.
///
/// The payload is the JSON string in `RaftRequest::data`; it is deserialized and passed to
/// `raft.vote`.
///
/// # Panics
/// Panics via `unwrap()` if `data` is not valid JSON for the expected request type.
async fn vote(
request: tonic::Request<RaftRequest>,
) -> std::result::Result<tonic::Response<RaftReply>, tonic::Status> {
let req = request.into_inner();
// The target type of the deserialization is inferred from `vote`'s parameter.
let req = serde_json::from_str(&req.data).unwrap();
let res = self.app.raft.vote(req).await;
/// Decodes a `RaftReply` received over gRPC back into a `Result<T, E>`.
///
/// A non-empty `data` field is parsed as JSON into `T`; otherwise the `error` field is parsed
/// as JSON into `E`. Presumably these become the `Ok` and `Err` values respectively — confirm.
///
/// # Panics
/// Panics with "fail to deserialize" if the selected field is not valid JSON for its type.
impl<T, E> From<RaftReply> for Result<T, E>
T: DeserializeOwned,
E: DeserializeOwned,
fn from(msg: RaftReply) -> Self {
// An empty `data` string is the signal that the reply carries an error instead.
if !msg.data.is_empty() {
let resp: T = serde_json::from_str(&msg.data).expect("fail to deserialize");
} else {
let err: E = serde_json::from_str(&msg.error).expect("fail to deserialize");
/// Encodes a `Result<T, E>` as a `RaftReply` for transport over gRPC.
///
/// `Ok(x)` is serialized to JSON for the `data` field with `error` left at its default;
/// `Err(e)` is serialized to JSON for the `error` field with `data` left at its default.
///
/// # Panics
/// Panics with "fail to serialize" if the value cannot be serialized to JSON.
impl<T, E> From<Result<T, E>> for RaftReply
T: Serialize,
E: Serialize,
fn from(r: Result<T, E>) -> Self {
match r {
Ok(x) => {
let data = serde_json::to_string(&x).expect("fail to serialize");
RaftReply {
// Success: the error field stays at its default value.
error: Default::default(),
Err(e) => {
let error = serde_json::to_string(&e).expect("fail to serialize");
RaftReply {
// Failure: the data field stays at its default value.
data: Default::default(),
/// Converts the protobuf `AddCaptchaRequest` into libmcaptcha's `AddSite` message.
///
/// The request's defense levels are drained into a `DefenseBuilder`, the resulting defense
/// is used to build an `MCaptcha`, and the request's `id` is carried over unchanged.
///
/// # Panics
/// Panics via `unwrap()` if the request has no `mcaptcha` or that has no `defense`, or if
/// building the defense fails.
impl From<dcache::AddCaptchaRequest> for MasterMessages::AddSite {
fn from(value: dcache::AddCaptchaRequest) -> Self {
let req_mcaptcha = value.mcaptcha.unwrap();
let mut defense = req_mcaptcha.defense.unwrap();
let mut new_defense = defense::DefenseBuilder::default();
// `drain(0..)` moves each protobuf level out of the request's defense.
for level in defense.levels.drain(0..) {
// Shadows the protobuf `defense` with the libmcaptcha defense that was just built.
let defense = new_defense.build().unwrap();
let mcaptcha = mcaptcha::MCaptchaBuilder::default()
Self {
id: value.id,
/// Converts libmcaptcha's `AddVisitorResult` into its protobuf counterpart by copying
/// `duration` and `difficulty_factor`.
impl From<libmcaptcha::master::AddVisitorResult> for dcache::AddVisitorResult {
fn from(value: libmcaptcha::master::AddVisitorResult) -> Self {
Self {
duration: value.duration,
difficulty_factor: value.difficulty_factor,
/// Converts the protobuf `RenameCaptchaRequest` into libmcaptcha's `Rename` message by
/// moving `name` and `rename_to` across.
impl From<dcache::RenameCaptchaRequest> for MasterMessages::Rename {
fn from(value: dcache::RenameCaptchaRequest) -> Self {
Self {
name: value.name,
rename_to: value.rename_to,
/// Converts the protobuf `CachePowRequest` into libmcaptcha's `CachePoW` message by moving
/// `string`, `difficulty_factor`, `duration` and `key` across.
impl From<dcache::CachePowRequest> for CacheMessages::CachePoW {
fn from(value: dcache::CachePowRequest) -> Self {
Self {
string: value.string,
difficulty_factor: value.difficulty_factor,
duration: value.duration,
key: value.key,
/// Converts the protobuf `CacheResultRequest` into libmcaptcha's `CacheResult` message by
/// moving `token`, `key` and `duration` across.
impl From<dcache::CacheResultRequest> for CacheMessages::CacheResult {
fn from(value: dcache::CacheResultRequest) -> Self {
Self {
token: value.token,
key: value.key,
duration: value.duration,