feat: batch RPC
ci/woodpecker/push/woodpecker Pipeline failed Details

This commit is contained in:
Aravinth Manivannan 2023-12-29 15:58:36 +05:30
parent 0f5762536b
commit a288450721
Signed by: realaravinth
GPG Key ID: F8F50389936984FF
5 changed files with 158 additions and 287 deletions

View File

@ -14,7 +14,7 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x64\x63\x61\x63he.proto\x12\x06\x64\x63\x61\x63he\"?\n\x05Level\x12\x1a\n\x11visitor_threshold\x18\xad\x02 \x01(\r\x12\x1a\n\x11\x64ifficulty_factor\x18\xae\x02 \x01(\r\")\n\x07\x44\x65\x66\x65nse\x12\x1e\n\x06levels\x18\x91\x03 \x03(\x0b\x32\r.dcache.Level\"@\n\x08MCaptcha\x12\x11\n\x08\x64uration\x18\xf6\x03 \x01(\x04\x12!\n\x07\x64\x65\x66\x65nse\x18\xf7\x03 \x01(\x0b\x32\x0f.dcache.Defense\"E\n\x11\x41\x64\x64\x43\x61ptchaRequest\x12\x0b\n\x02id\x18\xd9\x04 \x01(\t\x12#\n\x08mcaptcha\x18\xda\x04 \x01(\x0b\x32\x10.dcache.MCaptcha\"9\n\x14RenameCaptchaRequest\x12\r\n\x04name\x18\xbd\x05 \x01(\t\x12\x12\n\trename_to\x18\xbe\x05 \x01(\t\"_\n\x0f\x43\x61\x63hePowRequest\x12\x0f\n\x06string\x18\xa1\x06 \x01(\t\x12\x1a\n\x11\x64ifficulty_factor\x18\xa2\x06 \x01(\r\x12\x11\n\x08\x64uration\x18\xa3\x06 \x01(\x04\x12\x0c\n\x03key\x18\xa4\x06 \x01(\t\"E\n\x12\x43\x61\x63heResultRequest\x12\x0e\n\x05token\x18\xb1\x06 \x01(\t\x12\x0c\n\x03key\x18\xb2\x06 \x01(\t\x12\x11\n\x08\x64uration\x18\xb3\x06 \x01(\x04\",\n\x1a\x44\x65leteCaptchaResultRequest\x12\x0e\n\x05token\x18\xb5\x06 \x01(\t\"\x17\n\tCaptchaID\x12\n\n\x02id\x18\x01 \x01(\t\"\x12\n\x04PoID\x12\n\n\x02id\x18\x01 \x01(\t\"A\n\x10\x41\x64\x64VisitorResult\x12\x11\n\x08\x64uration\x18\x85\x07 \x01(\x04\x12\x1a\n\x11\x64ifficulty_factor\x18\x86\x07 \x01(\r\"S\n\x16OptionAddVisitorResult\x12.\n\x06result\x18\x8f\x07 \x01(\x0b\x32\x18.dcache.AddVisitorResultH\x00\x88\x01\x01\x42\t\n\x07_result\"\x1b\n\x0bRaftRequest\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\t\"(\n\tRaftReply\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\t\x12\r\n\x05\x65rror\x18\x02 \x01(\t\"#\n\x07Learner\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04\x61\x64\x64r\x18\x02 \x01(\t\"\xbd\x02\n\rDcacheRequest\x12/\n\naddCaptcha\x18\x01 \x01(\x0b\x32\x19.dcache.AddCaptchaRequestH\x00\x12\'\n\naddVisitor\x18\x02 \x01(\x0b\x32\x11.dcache.CaptchaIDH\x00\x12\x35\n\rrenameCaptcha\x18\x03 \x01(\x0b\x32\x1c.dcache.RenameCaptchaRequestH\x00\x12*\n\rremoveCaptcha\x18\x04 \x01(\x0b\x32\x11.dcache.CaptchaIDH\x00\x12+\n\x08\x63\x61\x63hePow\x18\x05 \x01(\x0b\x32\x17.dcache.CachePowRequestH\x00\x12\x31\n\x0b\x63\x61\x63heResult\x18\x06 \x01(\x0b\x32\x1a.dcache.CacheResultRequestH\x00\x42\x0f\n\rDcacheRequest\"\x8b\x01\n\x0e\x44\x63\x61\x63heResponse\x12\x43\n\x19option_add_visitor_result\x18\x01 \x01(\x0b\x32\x1e.dcache.OptionAddVisitorResultH\x00\x12\"\n\x05other\x18\x02 \x01(\x0b\x32\x11.dcache.RaftReplyH\x00\x42\x10\n\x0e\x44\x63\x61\x63heResponse2\xc7\x05\n\rDcacheService\x12<\n\nAddCaptcha\x12\x19.dcache.AddCaptchaRequest\x1a\x11.dcache.RaftReply\"\x00\x12\x41\n\nAddVisitor\x12\x11.dcache.CaptchaID\x1a\x1e.dcache.OptionAddVisitorResult\"\x00\x12\x42\n\rRenameCaptcha\x12\x1c.dcache.RenameCaptchaRequest\x1a\x11.dcache.RaftReply\"\x00\x12\x37\n\rRemoveCaptcha\x12\x11.dcache.CaptchaID\x1a\x11.dcache.RaftReply\"\x00\x12\x38\n\x08\x43\x61\x63hePow\x12\x17.dcache.CachePowRequest\x1a\x11.dcache.RaftReply\"\x00\x12>\n\x0b\x43\x61\x63heResult\x12\x1a.dcache.CacheResultRequest\x1a\x11.dcache.RaftReply\"\x00\x12\x32\n\nAddLearner\x12\x0f.dcache.Learner\x1a\x11.dcache.RaftReply\"\x00\x12\x31\n\x05Write\x12\x13.dcache.RaftRequest\x1a\x11.dcache.RaftReply\"\x00\x12\x33\n\x07\x46orward\x12\x13.dcache.RaftRequest\x1a\x11.dcache.RaftReply\"\x00\x12\x37\n\rAppendEntries\x12\x13.dcache.RaftRequest\x1a\x11.dcache.RaftReply\x12\x39\n\x0fInstallSnapshot\x12\x13.dcache.RaftRequest\x1a\x11.dcache.RaftReply\x12.\n\x04vote\x12\x13.dcache.RaftRequest\x1a\x11.dcache.RaftReplyb\x06proto3')
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x64\x63\x61\x63he.proto\x12\x06\x64\x63\x61\x63he\"?\n\x05Level\x12\x1a\n\x11visitor_threshold\x18\xad\x02 \x01(\r\x12\x1a\n\x11\x64ifficulty_factor\x18\xae\x02 \x01(\r\")\n\x07\x44\x65\x66\x65nse\x12\x1e\n\x06levels\x18\x91\x03 \x03(\x0b\x32\r.dcache.Level\"@\n\x08MCaptcha\x12\x11\n\x08\x64uration\x18\xf6\x03 \x01(\x04\x12!\n\x07\x64\x65\x66\x65nse\x18\xf7\x03 \x01(\x0b\x32\x0f.dcache.Defense\"E\n\x11\x41\x64\x64\x43\x61ptchaRequest\x12\x0b\n\x02id\x18\xd9\x04 \x01(\t\x12#\n\x08mcaptcha\x18\xda\x04 \x01(\x0b\x32\x10.dcache.MCaptcha\"9\n\x14RenameCaptchaRequest\x12\r\n\x04name\x18\xbd\x05 \x01(\t\x12\x12\n\trename_to\x18\xbe\x05 \x01(\t\"_\n\x0f\x43\x61\x63hePowRequest\x12\x0f\n\x06string\x18\xa1\x06 \x01(\t\x12\x1a\n\x11\x64ifficulty_factor\x18\xa2\x06 \x01(\r\x12\x11\n\x08\x64uration\x18\xa3\x06 \x01(\x04\x12\x0c\n\x03key\x18\xa4\x06 \x01(\t\"E\n\x12\x43\x61\x63heResultRequest\x12\x0e\n\x05token\x18\xb1\x06 \x01(\t\x12\x0c\n\x03key\x18\xb2\x06 \x01(\t\x12\x11\n\x08\x64uration\x18\xb3\x06 \x01(\x04\",\n\x1a\x44\x65leteCaptchaResultRequest\x12\x0e\n\x05token\x18\xb5\x06 \x01(\t\"\x17\n\tCaptchaID\x12\n\n\x02id\x18\x01 \x01(\t\"\x12\n\x04PoID\x12\n\n\x02id\x18\x01 \x01(\t\"A\n\x10\x41\x64\x64VisitorResult\x12\x11\n\x08\x64uration\x18\x85\x07 \x01(\x04\x12\x1a\n\x11\x64ifficulty_factor\x18\x86\x07 \x01(\r\"S\n\x16OptionAddVisitorResult\x12.\n\x06result\x18\x8f\x07 \x01(\x0b\x32\x18.dcache.AddVisitorResultH\x00\x88\x01\x01\x42\t\n\x07_result\"\x1b\n\x0bRaftRequest\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\t\"(\n\tRaftReply\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\t\x12\r\n\x05\x65rror\x18\x02 \x01(\t\"#\n\x07Learner\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04\x61\x64\x64r\x18\x02 \x01(\t\"\xbd\x02\n\rDcacheRequest\x12/\n\naddCaptcha\x18\x01 \x01(\x0b\x32\x19.dcache.AddCaptchaRequestH\x00\x12\'\n\naddVisitor\x18\x02 \x01(\x0b\x32\x11.dcache.CaptchaIDH\x00\x12\x35\n\rrenameCaptcha\x18\x03 \x01(\x0b\x32\x1c.dcache.RenameCaptchaRequestH\x00\x12*\n\rremoveCaptcha\x18\x04 \x01(\x0b\x32\x11.dcache.CaptchaIDH\x00\x12+\n\x08\x63\x61\x63hePow\x18\x05 \x01(\x0b\x32\x17.dcache.CachePowRequestH\x00\x12\x31\n\x0b\x63\x61\x63heResult\x18\x06 \x01(\x0b\x32\x1a.dcache.CacheResultRequestH\x00\x42\x0f\n\rDcacheRequest\"\x8b\x01\n\x0e\x44\x63\x61\x63heResponse\x12\x43\n\x19option_add_visitor_result\x18\x01 \x01(\x0b\x32\x1e.dcache.OptionAddVisitorResultH\x00\x12\"\n\x05other\x18\x02 \x01(\x0b\x32\x11.dcache.RaftReplyH\x00\x42\x10\n\x0e\x44\x63\x61\x63heResponse\"=\n\x12\x44\x63\x61\x63heBatchRequest\x12\'\n\x08requests\x18\x01 \x03(\x0b\x32\x15.dcache.DcacheRequest\"@\n\x13\x44\x63\x61\x63heBatchResponse\x12)\n\tresponses\x18\x01 \x03(\x0b\x32\x16.dcache.DcacheResponse2\x97\x06\n\rDcacheService\x12<\n\nAddCaptcha\x12\x19.dcache.AddCaptchaRequest\x1a\x11.dcache.RaftReply\"\x00\x12\x41\n\nAddVisitor\x12\x11.dcache.CaptchaID\x1a\x1e.dcache.OptionAddVisitorResult\"\x00\x12\x42\n\rRenameCaptcha\x12\x1c.dcache.RenameCaptchaRequest\x1a\x11.dcache.RaftReply\"\x00\x12\x37\n\rRemoveCaptcha\x12\x11.dcache.CaptchaID\x1a\x11.dcache.RaftReply\"\x00\x12\x38\n\x08\x43\x61\x63hePow\x12\x17.dcache.CachePowRequest\x1a\x11.dcache.RaftReply\"\x00\x12>\n\x0b\x43\x61\x63heResult\x12\x1a.dcache.CacheResultRequest\x1a\x11.dcache.RaftReply\"\x00\x12N\n\x11PipelineDcacheOps\x12\x1a.dcache.DcacheBatchRequest\x1a\x1b.dcache.DcacheBatchResponse\"\x00\x12\x32\n\nAddLearner\x12\x0f.dcache.Learner\x1a\x11.dcache.RaftReply\"\x00\x12\x31\n\x05Write\x12\x13.dcache.RaftRequest\x1a\x11.dcache.RaftReply\"\x00\x12\x33\n\x07\x46orward\x12\x13.dcache.RaftRequest\x1a\x11.dcache.RaftReply\"\x00\x12\x37\n\rAppendEntries\x12\x13.dcache.RaftRequest\x1a\x11.dcache.RaftReply\x12\x39\n\x0fInstallSnapshot\x12\x13.dcache.RaftRequest\x1a\x11.dcache.RaftReply\x12.\n\x04vote\x12\x13.dcache.RaftRequest\x1a\x11.dcache.RaftReplyb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@ -55,6 +55,10 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_globals['_DCACHEREQUEST']._serialized_end=1165
_globals['_DCACHERESPONSE']._serialized_start=1168
_globals['_DCACHERESPONSE']._serialized_end=1307
_globals['_DCACHESERVICE']._serialized_start=1310
_globals['_DCACHESERVICE']._serialized_end=2021
_globals['_DCACHEBATCHREQUEST']._serialized_start=1309
_globals['_DCACHEBATCHREQUEST']._serialized_end=1370
_globals['_DCACHEBATCHRESPONSE']._serialized_start=1372
_globals['_DCACHEBATCHRESPONSE']._serialized_end=1436
_globals['_DCACHESERVICE']._serialized_start=1439
_globals['_DCACHESERVICE']._serialized_end=2230
# @@protoc_insertion_point(module_scope)

View File

@ -142,3 +142,15 @@ class DcacheResponse(_message.Message):
option_add_visitor_result: OptionAddVisitorResult
other: RaftReply
def __init__(self, option_add_visitor_result: _Optional[_Union[OptionAddVisitorResult, _Mapping]] = ..., other: _Optional[_Union[RaftReply, _Mapping]] = ...) -> None: ...
class DcacheBatchRequest(_message.Message):
__slots__ = ("requests",)
REQUESTS_FIELD_NUMBER: _ClassVar[int]
requests: _containers.RepeatedCompositeFieldContainer[DcacheRequest]
def __init__(self, requests: _Optional[_Iterable[_Union[DcacheRequest, _Mapping]]] = ...) -> None: ...
class DcacheBatchResponse(_message.Message):
__slots__ = ("responses",)
RESPONSES_FIELD_NUMBER: _ClassVar[int]
responses: _containers.RepeatedCompositeFieldContainer[DcacheResponse]
def __init__(self, responses: _Optional[_Iterable[_Union[DcacheResponse, _Mapping]]] = ...) -> None: ...

View File

@ -2,19 +2,11 @@
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import dcache_py.dcache_pb2 as dcache__pb2
import dcache_pb2 as dcache__pb2
class DcacheServiceStub(object):
"""message DcacheBatchRequest {
repeated DcacheRequest requests = 1;
}
message DcacheBatchResponse {
repeated DcacheResponse responses = 1;
}
"""
"""Missing associated documentation comment in .proto file."""
def __init__(self, channel):
"""Constructor.
@ -52,6 +44,11 @@ class DcacheServiceStub(object):
request_serializer=dcache__pb2.CacheResultRequest.SerializeToString,
response_deserializer=dcache__pb2.RaftReply.FromString,
)
self.PipelineDcacheOps = channel.unary_unary(
'/dcache.DcacheService/PipelineDcacheOps',
request_serializer=dcache__pb2.DcacheBatchRequest.SerializeToString,
response_deserializer=dcache__pb2.DcacheBatchResponse.FromString,
)
self.AddLearner = channel.unary_unary(
'/dcache.DcacheService/AddLearner',
request_serializer=dcache__pb2.Learner.SerializeToString,
@ -85,15 +82,7 @@ class DcacheServiceStub(object):
class DcacheServiceServicer(object):
"""message DcacheBatchRequest {
repeated DcacheRequest requests = 1;
}
message DcacheBatchResponse {
repeated DcacheResponse responses = 1;
}
"""
"""Missing associated documentation comment in .proto file."""
def AddCaptcha(self, request, context):
"""Missing associated documentation comment in .proto file."""
@ -131,10 +120,14 @@ class DcacheServiceServicer(object):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def AddLearner(self, request, context):
""" rpc PipelineDcacheOps(DcacheBatchRequest) returns (DcacheBatchResponse) {}
def PipelineDcacheOps(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
"""
def AddLearner(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
@ -205,6 +198,11 @@ def add_DcacheServiceServicer_to_server(servicer, server):
request_deserializer=dcache__pb2.CacheResultRequest.FromString,
response_serializer=dcache__pb2.RaftReply.SerializeToString,
),
'PipelineDcacheOps': grpc.unary_unary_rpc_method_handler(
servicer.PipelineDcacheOps,
request_deserializer=dcache__pb2.DcacheBatchRequest.FromString,
response_serializer=dcache__pb2.DcacheBatchResponse.SerializeToString,
),
'AddLearner': grpc.unary_unary_rpc_method_handler(
servicer.AddLearner,
request_deserializer=dcache__pb2.Learner.FromString,
@ -243,15 +241,7 @@ def add_DcacheServiceServicer_to_server(servicer, server):
# This class is part of an EXPERIMENTAL API.
class DcacheService(object):
"""message DcacheBatchRequest {
repeated DcacheRequest requests = 1;
}
message DcacheBatchResponse {
repeated DcacheResponse responses = 1;
}
"""
"""Missing associated documentation comment in .proto file."""
@staticmethod
def AddCaptcha(request,
@ -355,6 +345,23 @@ class DcacheService(object):
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def PipelineDcacheOps(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/dcache.DcacheService/PipelineDcacheOps',
dcache__pb2.DcacheBatchRequest.SerializeToString,
dcache__pb2.DcacheBatchResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def AddLearner(request,
target,

View File

@ -2,13 +2,6 @@ syntax = "proto3";
package dcache;
//message GetRequest { string key = 1; }
//
//message GetReply {
// bool ok = 1;
// string value = 2;
//}
message Level {
uint32 visitor_threshold = 301;
uint32 difficulty_factor= 302;
@ -69,13 +62,6 @@ message OptionAddVisitorResult {
optional AddVisitorResult result = 911;
}
//message DcacheResponse {
// oneof DcacheResponse {
// OptionAddVisitorResult option_add_visitor_result = 1;
// uint32 empty = 2;
// }
//}
message RaftRequest {
string data = 1;
}
@ -84,28 +70,13 @@ message RaftReply {
string data = 1;
string error = 2;
}
//
//message HandshakeRequest {
// uint64 protocol_version = 1;
// bytes payload = 2;
//}
//
//message HandshakeResponse {
// uint64 protocol_version = 1;
// bytes payload = 2;
//}
message Learner {
uint64 id = 1;
string addr = 2;
}
//message AddVisitorRequest {
// string id = 1;
//}
message DcacheRequest {
oneof DcacheRequest {
AddCaptchaRequest addCaptcha = 1;
@ -125,13 +96,13 @@ message DcacheResponse {
}
}
//message DcacheBatchRequest {
// repeated DcacheRequest requests = 1;
//}
//
//message DcacheBatchResponse {
// repeated DcacheResponse responses = 1;
//}
message DcacheBatchRequest {
repeated DcacheRequest requests = 1;
}
message DcacheBatchResponse {
repeated DcacheResponse responses = 1;
}
service DcacheService {
rpc AddCaptcha(AddCaptchaRequest) returns (RaftReply) {}
@ -141,7 +112,7 @@ service DcacheService {
rpc CachePow(CachePowRequest) returns (RaftReply) {}
rpc CacheResult(CacheResultRequest) returns (RaftReply) {}
// rpc PipelineDcacheOps(DcacheBatchRequest) returns (DcacheBatchResponse) {}
rpc PipelineDcacheOps(DcacheBatchRequest) returns (DcacheBatchResponse) {}
rpc AddLearner(Learner) returns (RaftReply) {}
@ -156,12 +127,3 @@ service DcacheService {
rpc InstallSnapshot(RaftRequest) returns (RaftReply);
rpc vote(RaftRequest) returns (RaftReply);
}
//service Dcache {
// // handshake
// rpc Handshake(HandshakeRequest) returns (HandshakeResponse);
//
// // message
// rpc WriteMsg(RaftRequest) returns (RaftReply);
//// rpc ReadMsg(GetRequest) returns (GetReply);
//}

View File

@ -150,214 +150,100 @@ impl DcacheService for MyDcacheImpl {
// request: tonic::Request<tonic::Streaming<dcache::DcacheRequest>>,
// ) -> std::result::Result<tonic::Response<Self::PipelineDcacheOpsStream>, tonic::Status> {
//// async fn pipeline_dcache_ops(
//// &self,
//// request: tonic::Request<dcache::DcacheBatchRequest>,
//// ) -> Result<Response<dcache::DcacheBatchResponse>, tonic::Status> {
//// let mut reqs = request.into_inner();
//// let mut responses = Vec::with_capacity(reqs.requests.len());
//// for req in reqs.requests.drain(0..) {
//// let res = match req.dcache_request.unwrap() {
//// PipelineReq::AddCaptcha(add_captcha_req) => {
//// let res = self
//// .app
//// .raft
//// .client_write(DcacheRequest::AddCaptcha(add_captcha_req.into()))
//// .await;
//// OuterPipelineRes {
//// dcache_response: Some(InnerPipelineRes::Other(res.into())),
//// }
//// }
//// PipelineReq::AddVisitor(add_visitor_req) => {
//// let res = self
//// .app
//// .raft
//// .client_write(DcacheRequest::AddVisitor(MasterMessages::AddVisitor(
//// add_visitor_req.id,
//// )))
//// .await;
//// match res {
//// Err(_) => OuterPipelineRes {
//// dcache_response: None,
//// },
//// Ok(res) => match res.data {
//// DcacheResponse::AddVisitorResult(res) => {
//// let res = dcache::OptionAddVisitorResult {
//// result: res.map(|f| f.into()),
//// };
//// OuterPipelineRes {
//// dcache_response: Some(
//// InnerPipelineRes::OptionAddVisitorResult(res),
//// ),
//// }
//// }
////
//// _ => unimplemented!(),
//// },
//// }
//// // if res.is_err() {
//// // OuterPipelineRes {
//// // dcache_response: None,
//// // }
//// // } else {
//// // let res = res.unwrap();
//// // match res.data {
//// // DcacheResponse::AddVisitorResult(res) => {
//// // let res = dcache::OptionAddVisitorResult {
//// // result: res.map(|f| f.into()),
//// // };
//// // OuterPipelineRes {
//// // dcache_response: Some(
//// // InnerPipelineRes::OptionAddVisitorResult(res),
//// // ),
//// // }
//// // }
//// //
//// // _ => unimplemented!(),
//// // }
//// // }
//// }
//// PipelineReq::RenameCaptcha(rename_captcha_req) => {
//// // let x: u8 = rename_visitor_req;
//// let res = self
//// .app
//// .raft
//// .client_write(DcacheRequest::RenameCaptcha(rename_captcha_req.into()))
//// .await;
//// OuterPipelineRes {
//// dcache_response: Some(InnerPipelineRes::Other(res.into())),
//// }
//// }
//// PipelineReq::RemoveCaptcha(remove_captcha_req) => {
//// let res = self
//// .app
//// .raft
//// .client_write(DcacheRequest::RemoveCaptcha(MasterMessages::RemoveCaptcha(
//// remove_captcha_req.id,
//// )))
//// .await;
//// OuterPipelineRes {
//// dcache_response: Some(InnerPipelineRes::Other(res.into())),
//// }
//// }
//// PipelineReq::CachePow(cache_pow_req) => {
//// let res = self
//// .app
//// .raft
//// .client_write(DcacheRequest::CachePoW(cache_pow_req.into()))
//// .await;
//// OuterPipelineRes {
//// dcache_response: Some(InnerPipelineRes::Other(res.into())),
//// }
//// }
//// PipelineReq::CacheResult(cache_result_req) => {
//// let res = self
//// .app
//// .raft
//// .client_write(DcacheRequest::CacheResult(cache_result_req.into()))
//// .await;
//// OuterPipelineRes {
//// dcache_response: Some(InnerPipelineRes::Other(res.into())),
//// }
//// }
//// };
//// responses.push(res);
//// }
//// Ok(Response::new(dcache::DcacheBatchResponse { responses }))
//// }
async fn pipeline_dcache_ops(
&self,
request: tonic::Request<dcache::DcacheBatchRequest>,
) -> Result<Response<dcache::DcacheBatchResponse>, tonic::Status> {
let mut reqs = request.into_inner();
let mut responses = Vec::with_capacity(reqs.requests.len());
for req in reqs.requests.drain(0..) {
let res = match req.dcache_request.unwrap() {
PipelineReq::AddCaptcha(add_captcha_req) => {
let res = self
.app
.raft
.client_write(DcacheRequest::AddCaptcha(add_captcha_req.into()))
.await;
OuterPipelineRes {
dcache_response: Some(InnerPipelineRes::Other(res.into())),
}
}
PipelineReq::AddVisitor(add_visitor_req) => {
let res = self
.app
.raft
.client_write(DcacheRequest::AddVisitor(MasterMessages::AddVisitor(
add_visitor_req.id,
)))
.await;
match res {
Err(_) => OuterPipelineRes {
dcache_response: None,
},
Ok(res) => match res.data {
DcacheResponse::AddVisitorResult(res) => {
let res = dcache::OptionAddVisitorResult {
result: res.map(|f| f.into()),
};
OuterPipelineRes {
dcache_response: Some(
InnerPipelineRes::OptionAddVisitorResult(res),
),
}
}
// let mut reqs = request.into_inner().requests;
// let mut responses = Vec::with_capacity(reqs.len());
// for req in reqs.drain(0..) {
// let res = match req.dcache_request.unwrap() {
// PipelineReq::AddCaptcha(add_captcha_req) => {
// let res = self
// .app
// .raft
// .client_write(DcacheRequest::AddCaptcha(add_captcha_req.into()))
// .await;
// OuterPipelineRes {
// dcache_response: Some(InnerPipelineRes::Other(res.into())),
// }
// }
// PipelineReq::AddVisitor(add_visitor_req) => {
// let res = self
// .app
// .raft
// .client_write(DcacheRequest::AddVisitor(MasterMessages::AddVisitor(
// add_visitor_req.id,
// )))
// .await
// .map_err(|e| {
// tonic::Status::new(
// tonic::Code::Internal,
// serde_json::to_string(&e).unwrap(),
// )
// })?;
// match res.data {
// DcacheResponse::AddVisitorResult(res) => {
// let res = dcache::OptionAddVisitorResult {
// result: res.map(|f| f.into()),
// };
// OuterPipelineRes {
// dcache_response: Some(InnerPipelineRes::OptionAddVisitorResult(
// res,
// )),
// }
// }
//
// _ => unimplemented!(),
// }
// }
// PipelineReq::RenameCaptcha(rename_captcha_req) => {
// // let x: u8 = rename_visitor_req;
// let res = self
// .app
// .raft
// .client_write(DcacheRequest::RenameCaptcha(rename_captcha_req.into()))
// .await;
// OuterPipelineRes {
// dcache_response: Some(InnerPipelineRes::Other(res.into())),
// }
// }
// PipelineReq::RemoveCaptcha(remove_captcha_req) => {
// let res = self
// .app
// .raft
// .client_write(DcacheRequest::RemoveCaptcha(MasterMessages::RemoveCaptcha(
// remove_captcha_req.id,
// )))
// .await;
// OuterPipelineRes {
// dcache_response: Some(InnerPipelineRes::Other(res.into())),
// }
// }
// PipelineReq::CachePow(cache_pow_req) => {
// let res = self
// .app
// .raft
// .client_write(DcacheRequest::CachePoW(cache_pow_req.into()))
// .await;
// OuterPipelineRes {
// dcache_response: Some(InnerPipelineRes::Other(res.into())),
// }
// }
// PipelineReq::CacheResult(cache_result_req) => {
// let res = self
// .app
// .raft
// .client_write(DcacheRequest::CacheResult(cache_result_req.into()))
// .await;
// OuterPipelineRes {
// dcache_response: Some(InnerPipelineRes::Other(res.into())),
// }
// }
// };
// responses.push(res);
// }
_ => unimplemented!(),
},
}
// Ok(Response::new(dcache::DcacheBatchResponse { responses }))
// }
}
PipelineReq::RenameCaptcha(rename_captcha_req) => {
let res = self
.app
.raft
.client_write(DcacheRequest::RenameCaptcha(rename_captcha_req.into()))
.await;
OuterPipelineRes {
dcache_response: Some(InnerPipelineRes::Other(res.into())),
}
}
PipelineReq::RemoveCaptcha(remove_captcha_req) => {
let res = self
.app
.raft
.client_write(DcacheRequest::RemoveCaptcha(MasterMessages::RemoveCaptcha(
remove_captcha_req.id,
)))
.await;
OuterPipelineRes {
dcache_response: Some(InnerPipelineRes::Other(res.into())),
}
}
PipelineReq::CachePow(cache_pow_req) => {
let res = self
.app
.raft
.client_write(DcacheRequest::CachePoW(cache_pow_req.into()))
.await;
OuterPipelineRes {
dcache_response: Some(InnerPipelineRes::Other(res.into())),
}
}
PipelineReq::CacheResult(cache_result_req) => {
let res = self
.app
.raft
.client_write(DcacheRequest::CacheResult(cache_result_req.into()))
.await;
OuterPipelineRes {
dcache_response: Some(InnerPipelineRes::Other(res.into())),
}
}
};
responses.push(res);
}
Ok(Response::new(dcache::DcacheBatchResponse { responses }))
}
async fn write(
&self,