From 0f5762536ba8e6a2bc1d73328d53cb11dfb39d92 Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Thu, 28 Dec 2023 19:15:07 +0530 Subject: [PATCH] debug: pipeline dump --- bench/locustfile.py | 57 ++++++++- dcache_py/dcache_pb2.py | 10 +- dcache_py/dcache_pb2.pyi | 24 ++++ dcache_py/dcache_pb2_grpc.py | 34 +++++- proto/dcache/dcache.proto | 55 +++++---- src/protobuf.rs | 220 +++++++++++++++++++++++++++++++++++ test.py | 24 +++- 7 files changed, 391 insertions(+), 33 deletions(-) diff --git a/bench/locustfile.py b/bench/locustfile.py index c846dfc..cd688f6 100644 --- a/bench/locustfile.py +++ b/bench/locustfile.py @@ -1,4 +1,5 @@ import json +import time import grpc from pprint import pprint from locust import FastHttpUser, between, task @@ -130,23 +131,69 @@ with grpc.insecure_channel(host) as channel: add_captcha(stub=stub, captcha_id=captcha_id) +#pipeline_msgs = [] +#for _ in range(0,1000): +# pipeline_msgs.append(dcache.DcacheRequest(addVisitor=dcache.CaptchaID(id=captcha_id))) +#pipeline_msgs = dcache.DcacheBatchRequest(requests=pipeline_msgs) + +#def pipeline_generate_messages(): +# for msg in pipeline_msgs: +# yield msg + + +#import threading +#threadLock = threading.Lock() +#global_counter = 0 + +def nothing(r): + pass + + class HelloGrpcUser(adaptor.GrpcUser): stub_class = DcacheServiceStub host = host captcha_id = captcha_id - msg = RaftRequest(data=json.dumps({"AddVisitor": captcha_id})) + msg = dcache.CaptchaID(id=captcha_id) +# pipeline_msgs = [] +# for _ in range(0,10): +# pipeline_msgs.append(dcache.DcacheRequest(addVisitor=dcache.CaptchaID(id=captcha_id))) +# pipeline_msgs = dcache.DcacheBatchRequest(requests=pipeline_msgs) + + +# pipeline_vote = [] +# for _ in range(0,1000): +# pipeline_vote.append( +# dcache.DcacheRequest(addVisitor=dcache.CaptchaID(id=captcha_id)) +# ) + + + def add_vote(self, captcha_id: str): - resp = self.stub.Write(self.msg) + resp = self.stub.AddVisitor(self.msg) - # def add_vote_pipeline(self, captcha_id: str): - # resp = self.pipeline_write(data=self.pipeline_vote) - # pprint(resp) +# def add_vote_pipeline(self): +# with threadLock: +# global global_counter +# global_counter += 1 +# print(f"count: {global_counter}") + +# res = self.stub.PipelineDcacheOps(pipeline_msgs) +# for r in res.responses: +# print(r.option_add_visitor_result.result.duration) +# resp = self.pipeline_write(data=self.pipeline_vote) +# #pprint(resp) @task def addVote(self): self.add_vote(self.captcha_id) +# @task +# def addVotePipeline(self): +# self.add_vote_pipeline() + #self.add_vote(self.captcha_id) + #self.stub.PipelineDcacheOps(pipeline_generate_messages()): + # msg = dcache.CaptchaID(id=captcha_id) # resp = self.stub.AddVisitor(msg) diff --git a/dcache_py/dcache_pb2.py b/dcache_py/dcache_pb2.py index 8d7300a..4cd82ab 100644 --- a/dcache_py/dcache_pb2.py +++ b/dcache_py/dcache_pb2.py @@ -14,7 +14,7 @@ _sym_db = _symbol_database.Default() -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x64\x63\x61\x63he.proto\x12\x06\x64\x63\x61\x63he\"?\n\x05Level\x12\x1a\n\x11visitor_threshold\x18\xad\x02 \x01(\r\x12\x1a\n\x11\x64ifficulty_factor\x18\xae\x02 \x01(\r\")\n\x07\x44\x65\x66\x65nse\x12\x1e\n\x06levels\x18\x91\x03 \x03(\x0b\x32\r.dcache.Level\"@\n\x08MCaptcha\x12\x11\n\x08\x64uration\x18\xf6\x03 \x01(\x04\x12!\n\x07\x64\x65\x66\x65nse\x18\xf7\x03 \x01(\x0b\x32\x0f.dcache.Defense\"E\n\x11\x41\x64\x64\x43\x61ptchaRequest\x12\x0b\n\x02id\x18\xd9\x04 \x01(\t\x12#\n\x08mcaptcha\x18\xda\x04 \x01(\x0b\x32\x10.dcache.MCaptcha\"9\n\x14RenameCaptchaRequest\x12\r\n\x04name\x18\xbd\x05 \x01(\t\x12\x12\n\trename_to\x18\xbe\x05 \x01(\t\"_\n\x0f\x43\x61\x63hePowRequest\x12\x0f\n\x06string\x18\xa1\x06 \x01(\t\x12\x1a\n\x11\x64ifficulty_factor\x18\xa2\x06 \x01(\r\x12\x11\n\x08\x64uration\x18\xa3\x06 \x01(\x04\x12\x0c\n\x03key\x18\xa4\x06 \x01(\t\"E\n\x12\x43\x61\x63heResultRequest\x12\x0e\n\x05token\x18\xb1\x06 \x01(\t\x12\x0c\n\x03key\x18\xb2\x06 \x01(\t\x12\x11\n\x08\x64uration\x18\xb3\x06 \x01(\x04\",\n\x1a\x44\x65leteCaptchaResultRequest\x12\x0e\n\x05token\x18\xb5\x06 \x01(\t\"\x17\n\tCaptchaID\x12\n\n\x02id\x18\x01 \x01(\t\"\x12\n\x04PoID\x12\n\n\x02id\x18\x01 \x01(\t\"A\n\x10\x41\x64\x64VisitorResult\x12\x11\n\x08\x64uration\x18\x85\x07 \x01(\x04\x12\x1a\n\x11\x64ifficulty_factor\x18\x86\x07 \x01(\r\"S\n\x16OptionAddVisitorResult\x12.\n\x06result\x18\x8f\x07 \x01(\x0b\x32\x18.dcache.AddVisitorResultH\x00\x88\x01\x01\x42\t\n\x07_result\"\x1b\n\x0bRaftRequest\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\t\"(\n\tRaftReply\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\t\x12\r\n\x05\x65rror\x18\x02 \x01(\t\"#\n\x07Learner\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04\x61\x64\x64r\x18\x02 \x01(\t2\xc7\x05\n\rDcacheService\x12<\n\nAddCaptcha\x12\x19.dcache.AddCaptchaRequest\x1a\x11.dcache.RaftReply\"\x00\x12\x41\n\nAddVisitor\x12\x11.dcache.CaptchaID\x1a\x1e.dcache.OptionAddVisitorResult\"\x00\x12\x42\n\rRenameCaptcha\x12\x1c.dcache.RenameCaptchaRequest\x1a\x11.dcache.RaftReply\"\x00\x12\x37\n\rRemoveCaptcha\x12\x11.dcache.CaptchaID\x1a\x11.dcache.RaftReply\"\x00\x12\x38\n\x08\x43\x61\x63hePow\x12\x17.dcache.CachePowRequest\x1a\x11.dcache.RaftReply\"\x00\x12>\n\x0b\x43\x61\x63heResult\x12\x1a.dcache.CacheResultRequest\x1a\x11.dcache.RaftReply\"\x00\x12\x32\n\nAddLearner\x12\x0f.dcache.Learner\x1a\x11.dcache.RaftReply\"\x00\x12\x31\n\x05Write\x12\x13.dcache.RaftRequest\x1a\x11.dcache.RaftReply\"\x00\x12\x33\n\x07\x46orward\x12\x13.dcache.RaftRequest\x1a\x11.dcache.RaftReply\"\x00\x12\x37\n\rAppendEntries\x12\x13.dcache.RaftRequest\x1a\x11.dcache.RaftReply\x12\x39\n\x0fInstallSnapshot\x12\x13.dcache.RaftRequest\x1a\x11.dcache.RaftReply\x12.\n\x04vote\x12\x13.dcache.RaftRequest\x1a\x11.dcache.RaftReplyb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x64\x63\x61\x63he.proto\x12\x06\x64\x63\x61\x63he\"?\n\x05Level\x12\x1a\n\x11visitor_threshold\x18\xad\x02 \x01(\r\x12\x1a\n\x11\x64ifficulty_factor\x18\xae\x02 \x01(\r\")\n\x07\x44\x65\x66\x65nse\x12\x1e\n\x06levels\x18\x91\x03 \x03(\x0b\x32\r.dcache.Level\"@\n\x08MCaptcha\x12\x11\n\x08\x64uration\x18\xf6\x03 \x01(\x04\x12!\n\x07\x64\x65\x66\x65nse\x18\xf7\x03 \x01(\x0b\x32\x0f.dcache.Defense\"E\n\x11\x41\x64\x64\x43\x61ptchaRequest\x12\x0b\n\x02id\x18\xd9\x04 \x01(\t\x12#\n\x08mcaptcha\x18\xda\x04 \x01(\x0b\x32\x10.dcache.MCaptcha\"9\n\x14RenameCaptchaRequest\x12\r\n\x04name\x18\xbd\x05 \x01(\t\x12\x12\n\trename_to\x18\xbe\x05 \x01(\t\"_\n\x0f\x43\x61\x63hePowRequest\x12\x0f\n\x06string\x18\xa1\x06 \x01(\t\x12\x1a\n\x11\x64ifficulty_factor\x18\xa2\x06 \x01(\r\x12\x11\n\x08\x64uration\x18\xa3\x06 \x01(\x04\x12\x0c\n\x03key\x18\xa4\x06 \x01(\t\"E\n\x12\x43\x61\x63heResultRequest\x12\x0e\n\x05token\x18\xb1\x06 \x01(\t\x12\x0c\n\x03key\x18\xb2\x06 \x01(\t\x12\x11\n\x08\x64uration\x18\xb3\x06 \x01(\x04\",\n\x1a\x44\x65leteCaptchaResultRequest\x12\x0e\n\x05token\x18\xb5\x06 \x01(\t\"\x17\n\tCaptchaID\x12\n\n\x02id\x18\x01 \x01(\t\"\x12\n\x04PoID\x12\n\n\x02id\x18\x01 \x01(\t\"A\n\x10\x41\x64\x64VisitorResult\x12\x11\n\x08\x64uration\x18\x85\x07 \x01(\x04\x12\x1a\n\x11\x64ifficulty_factor\x18\x86\x07 \x01(\r\"S\n\x16OptionAddVisitorResult\x12.\n\x06result\x18\x8f\x07 \x01(\x0b\x32\x18.dcache.AddVisitorResultH\x00\x88\x01\x01\x42\t\n\x07_result\"\x1b\n\x0bRaftRequest\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\t\"(\n\tRaftReply\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\t\x12\r\n\x05\x65rror\x18\x02 \x01(\t\"#\n\x07Learner\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04\x61\x64\x64r\x18\x02 \x01(\t\"\xbd\x02\n\rDcacheRequest\x12/\n\naddCaptcha\x18\x01 \x01(\x0b\x32\x19.dcache.AddCaptchaRequestH\x00\x12\'\n\naddVisitor\x18\x02 \x01(\x0b\x32\x11.dcache.CaptchaIDH\x00\x12\x35\n\rrenameCaptcha\x18\x03 \x01(\x0b\x32\x1c.dcache.RenameCaptchaRequestH\x00\x12*\n\rremoveCaptcha\x18\x04 \x01(\x0b\x32\x11.dcache.CaptchaIDH\x00\x12+\n\x08\x63\x61\x63hePow\x18\x05 \x01(\x0b\x32\x17.dcache.CachePowRequestH\x00\x12\x31\n\x0b\x63\x61\x63heResult\x18\x06 \x01(\x0b\x32\x1a.dcache.CacheResultRequestH\x00\x42\x0f\n\rDcacheRequest\"\x8b\x01\n\x0e\x44\x63\x61\x63heResponse\x12\x43\n\x19option_add_visitor_result\x18\x01 \x01(\x0b\x32\x1e.dcache.OptionAddVisitorResultH\x00\x12\"\n\x05other\x18\x02 \x01(\x0b\x32\x11.dcache.RaftReplyH\x00\x42\x10\n\x0e\x44\x63\x61\x63heResponse2\xc7\x05\n\rDcacheService\x12<\n\nAddCaptcha\x12\x19.dcache.AddCaptchaRequest\x1a\x11.dcache.RaftReply\"\x00\x12\x41\n\nAddVisitor\x12\x11.dcache.CaptchaID\x1a\x1e.dcache.OptionAddVisitorResult\"\x00\x12\x42\n\rRenameCaptcha\x12\x1c.dcache.RenameCaptchaRequest\x1a\x11.dcache.RaftReply\"\x00\x12\x37\n\rRemoveCaptcha\x12\x11.dcache.CaptchaID\x1a\x11.dcache.RaftReply\"\x00\x12\x38\n\x08\x43\x61\x63hePow\x12\x17.dcache.CachePowRequest\x1a\x11.dcache.RaftReply\"\x00\x12>\n\x0b\x43\x61\x63heResult\x12\x1a.dcache.CacheResultRequest\x1a\x11.dcache.RaftReply\"\x00\x12\x32\n\nAddLearner\x12\x0f.dcache.Learner\x1a\x11.dcache.RaftReply\"\x00\x12\x31\n\x05Write\x12\x13.dcache.RaftRequest\x1a\x11.dcache.RaftReply\"\x00\x12\x33\n\x07\x46orward\x12\x13.dcache.RaftRequest\x1a\x11.dcache.RaftReply\"\x00\x12\x37\n\rAppendEntries\x12\x13.dcache.RaftRequest\x1a\x11.dcache.RaftReply\x12\x39\n\x0fInstallSnapshot\x12\x13.dcache.RaftRequest\x1a\x11.dcache.RaftReply\x12.\n\x04vote\x12\x13.dcache.RaftRequest\x1a\x11.dcache.RaftReplyb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -51,6 +51,10 @@ if _descriptor._USE_C_DESCRIPTORS == False: _globals['_RAFTREPLY']._serialized_end=808 _globals['_LEARNER']._serialized_start=810 _globals['_LEARNER']._serialized_end=845 - _globals['_DCACHESERVICE']._serialized_start=848 - _globals['_DCACHESERVICE']._serialized_end=1559 + _globals['_DCACHEREQUEST']._serialized_start=848 + _globals['_DCACHEREQUEST']._serialized_end=1165 + _globals['_DCACHERESPONSE']._serialized_start=1168 + _globals['_DCACHERESPONSE']._serialized_end=1307 + _globals['_DCACHESERVICE']._serialized_start=1310 + _globals['_DCACHESERVICE']._serialized_end=2021 # @@protoc_insertion_point(module_scope) diff --git a/dcache_py/dcache_pb2.pyi b/dcache_py/dcache_pb2.pyi index 6d4d216..55850b8 100644 --- a/dcache_py/dcache_pb2.pyi +++ b/dcache_py/dcache_pb2.pyi @@ -118,3 +118,27 @@ class Learner(_message.Message): id: int addr: str def __init__(self, id: _Optional[int] = ..., addr: _Optional[str] = ...) -> None: ... + +class DcacheRequest(_message.Message): + __slots__ = ("addCaptcha", "addVisitor", "renameCaptcha", "removeCaptcha", "cachePow", "cacheResult") + ADDCAPTCHA_FIELD_NUMBER: _ClassVar[int] + ADDVISITOR_FIELD_NUMBER: _ClassVar[int] + RENAMECAPTCHA_FIELD_NUMBER: _ClassVar[int] + REMOVECAPTCHA_FIELD_NUMBER: _ClassVar[int] + CACHEPOW_FIELD_NUMBER: _ClassVar[int] + CACHERESULT_FIELD_NUMBER: _ClassVar[int] + addCaptcha: AddCaptchaRequest + addVisitor: CaptchaID + renameCaptcha: RenameCaptchaRequest + removeCaptcha: CaptchaID + cachePow: CachePowRequest + cacheResult: CacheResultRequest + def __init__(self, addCaptcha: _Optional[_Union[AddCaptchaRequest, _Mapping]] = ..., addVisitor: _Optional[_Union[CaptchaID, _Mapping]] = ..., renameCaptcha: _Optional[_Union[RenameCaptchaRequest, _Mapping]] = ..., removeCaptcha: _Optional[_Union[CaptchaID, _Mapping]] = ..., cachePow: _Optional[_Union[CachePowRequest, _Mapping]] = ..., cacheResult: _Optional[_Union[CacheResultRequest, _Mapping]] = ...) -> None: ... + +class DcacheResponse(_message.Message): + __slots__ = ("option_add_visitor_result", "other") + OPTION_ADD_VISITOR_RESULT_FIELD_NUMBER: _ClassVar[int] + OTHER_FIELD_NUMBER: _ClassVar[int] + option_add_visitor_result: OptionAddVisitorResult + other: RaftReply + def __init__(self, option_add_visitor_result: _Optional[_Union[OptionAddVisitorResult, _Mapping]] = ..., other: _Optional[_Union[RaftReply, _Mapping]] = ...) -> None: ... diff --git a/dcache_py/dcache_pb2_grpc.py b/dcache_py/dcache_pb2_grpc.py index f75e9ec..910eb89 100644 --- a/dcache_py/dcache_pb2_grpc.py +++ b/dcache_py/dcache_pb2_grpc.py @@ -6,7 +6,15 @@ import dcache_py.dcache_pb2 as dcache__pb2 class DcacheServiceStub(object): - """Missing associated documentation comment in .proto file.""" + """message DcacheBatchRequest { + repeated DcacheRequest requests = 1; + } + + message DcacheBatchResponse { + repeated DcacheResponse responses = 1; + } + + """ def __init__(self, channel): """Constructor. @@ -77,7 +85,15 @@ class DcacheServiceStub(object): class DcacheServiceServicer(object): - """Missing associated documentation comment in .proto file.""" + """message DcacheBatchRequest { + repeated DcacheRequest requests = 1; + } + + message DcacheBatchResponse { + repeated DcacheResponse responses = 1; + } + + """ def AddCaptcha(self, request, context): """Missing associated documentation comment in .proto file.""" @@ -116,7 +132,9 @@ class DcacheServiceServicer(object): raise NotImplementedError('Method not implemented!') def AddLearner(self, request, context): - """Missing associated documentation comment in .proto file.""" + """ rpc PipelineDcacheOps(DcacheBatchRequest) returns (DcacheBatchResponse) {} + + """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') @@ -225,7 +243,15 @@ def add_DcacheServiceServicer_to_server(servicer, server): # This class is part of an EXPERIMENTAL API. class DcacheService(object): - """Missing associated documentation comment in .proto file.""" + """message DcacheBatchRequest { + repeated DcacheRequest requests = 1; + } + + message DcacheBatchResponse { + repeated DcacheResponse responses = 1; + } + + """ @staticmethod def AddCaptcha(request, diff --git a/proto/dcache/dcache.proto b/proto/dcache/dcache.proto index 87d33f2..511a35b 100644 --- a/proto/dcache/dcache.proto +++ b/proto/dcache/dcache.proto @@ -60,25 +60,6 @@ message PoID{ } -//message DcacheRequest { -// oneof DcacheRequest { -// string AddVisitor = 1; -// AddCaptchaRequest add_captcha = 2; -// RenameCaptchaRequest rename_captcha = 3; -// string remove_captcha = 4; -// CachePoWRequest cache_pow = 5; -// string delete_pow = 6; -// CacheResultRequest cache_result = 7; -// } -//} - -//message AddVisitorRequest { -// string id = 1; -//} - - - - message AddVisitorResult { uint64 duration = 901; uint32 difficulty_factor = 902; @@ -118,6 +99,40 @@ message Learner { string addr = 2; } + +//message AddVisitorRequest { +// string id = 1; +//} + + + +message DcacheRequest { + oneof DcacheRequest { + AddCaptchaRequest addCaptcha = 1; + CaptchaID addVisitor = 2; + RenameCaptchaRequest renameCaptcha = 3; + CaptchaID removeCaptcha = 4; + CachePowRequest cachePow = 5; + CacheResultRequest cacheResult = 6; + } +} + + +message DcacheResponse { + oneof DcacheResponse { + OptionAddVisitorResult option_add_visitor_result = 1; + RaftReply other = 2; + } +} + +//message DcacheBatchRequest { +// repeated DcacheRequest requests = 1; +//} +// +//message DcacheBatchResponse { +// repeated DcacheResponse responses = 1; +//} + service DcacheService { rpc AddCaptcha(AddCaptchaRequest) returns (RaftReply) {} rpc AddVisitor(CaptchaID) returns (OptionAddVisitorResult) {} @@ -126,11 +141,11 @@ service DcacheService { rpc CachePow(CachePowRequest) returns (RaftReply) {} rpc CacheResult(CacheResultRequest) returns (RaftReply) {} +// rpc PipelineDcacheOps(DcacheBatchRequest) returns (DcacheBatchResponse) {} rpc AddLearner(Learner) returns (RaftReply) {} rpc Write(RaftRequest) returns (RaftReply) {} - // rpc Get(GetRequest) returns (GetReply) {} /// Forward a request to other rpc Forward(RaftRequest) returns (RaftReply) {} diff --git a/src/protobuf.rs b/src/protobuf.rs index 836b87d..1302068 100644 --- a/src/protobuf.rs +++ b/src/protobuf.rs @@ -9,7 +9,10 @@ use serde::de::DeserializeOwned; use serde::Serialize; use tonic::Response; +use dcache::dcache_request::DcacheRequest as PipelineReq; +use dcache::dcache_response::DcacheResponse as InnerPipelineRes; use dcache::dcache_service_server::DcacheService; +use dcache::DcacheResponse as OuterPipelineRes; use dcache::{Learner, RaftReply, RaftRequest}; use crate::app::DcacheApp; @@ -139,6 +142,223 @@ impl DcacheService for MyDcacheImpl { Ok(Response::new(res.into())) } + // type PipelineDcacheOpsStream = + // Pin> + Send + 'static>>; + + // async fn pipeline_dcache_ops( + // &self, + // request: tonic::Request>, + // ) -> std::result::Result, tonic::Status> { + +//// async fn pipeline_dcache_ops( +//// &self, +//// request: tonic::Request, +//// ) -> Result, tonic::Status> { +//// let mut reqs = request.into_inner(); +//// let mut responses = Vec::with_capacity(reqs.requests.len()); +//// for req in reqs.requests.drain(0..) { +//// let res = match req.dcache_request.unwrap() { +//// PipelineReq::AddCaptcha(add_captcha_req) => { +//// let res = self +//// .app +//// .raft +//// .client_write(DcacheRequest::AddCaptcha(add_captcha_req.into())) +//// .await; +//// OuterPipelineRes { +//// dcache_response: Some(InnerPipelineRes::Other(res.into())), +//// } +//// } +//// PipelineReq::AddVisitor(add_visitor_req) => { +//// let res = self +//// .app +//// .raft +//// .client_write(DcacheRequest::AddVisitor(MasterMessages::AddVisitor( +//// add_visitor_req.id, +//// ))) +//// .await; +//// match res { +//// Err(_) => OuterPipelineRes { +//// dcache_response: None, +//// }, +//// Ok(res) => match res.data { +//// DcacheResponse::AddVisitorResult(res) => { +//// let res = dcache::OptionAddVisitorResult { +//// result: res.map(|f| f.into()), +//// }; +//// OuterPipelineRes { +//// dcache_response: Some( +//// InnerPipelineRes::OptionAddVisitorResult(res), +//// ), +//// } +//// } +//// +//// _ => unimplemented!(), +//// }, +//// } +//// // if res.is_err() { +//// // OuterPipelineRes { +//// // dcache_response: None, +//// // } +//// // } else { +//// // let res = res.unwrap(); +//// // match res.data { +//// // DcacheResponse::AddVisitorResult(res) => { +//// // let res = dcache::OptionAddVisitorResult { +//// // result: res.map(|f| f.into()), +//// // }; +//// // OuterPipelineRes { +//// // dcache_response: Some( +//// // InnerPipelineRes::OptionAddVisitorResult(res), +//// // ), +//// // } +//// // } +//// // +//// // _ => unimplemented!(), +//// // } +//// // } +//// } +//// PipelineReq::RenameCaptcha(rename_captcha_req) => { +//// // let x: u8 = rename_visitor_req; +//// let res = self +//// .app +//// .raft +//// .client_write(DcacheRequest::RenameCaptcha(rename_captcha_req.into())) +//// .await; +//// OuterPipelineRes { +//// dcache_response: Some(InnerPipelineRes::Other(res.into())), +//// } +//// } +//// PipelineReq::RemoveCaptcha(remove_captcha_req) => { +//// let res = self +//// .app +//// .raft +//// .client_write(DcacheRequest::RemoveCaptcha(MasterMessages::RemoveCaptcha( +//// remove_captcha_req.id, +//// ))) +//// .await; +//// OuterPipelineRes { +//// dcache_response: Some(InnerPipelineRes::Other(res.into())), +//// } +//// } +//// PipelineReq::CachePow(cache_pow_req) => { +//// let res = self +//// .app +//// .raft +//// .client_write(DcacheRequest::CachePoW(cache_pow_req.into())) +//// .await; +//// OuterPipelineRes { +//// dcache_response: Some(InnerPipelineRes::Other(res.into())), +//// } +//// } +//// PipelineReq::CacheResult(cache_result_req) => { +//// let res = self +//// .app +//// .raft +//// .client_write(DcacheRequest::CacheResult(cache_result_req.into())) +//// .await; +//// OuterPipelineRes { +//// dcache_response: Some(InnerPipelineRes::Other(res.into())), +//// } +//// } +//// }; +//// responses.push(res); +//// } +//// Ok(Response::new(dcache::DcacheBatchResponse { responses })) +//// } + + // let mut reqs = request.into_inner().requests; + // let mut responses = Vec::with_capacity(reqs.len()); + // for req in reqs.drain(0..) { + // let res = match req.dcache_request.unwrap() { + // PipelineReq::AddCaptcha(add_captcha_req) => { + // let res = self + // .app + // .raft + // .client_write(DcacheRequest::AddCaptcha(add_captcha_req.into())) + // .await; + // OuterPipelineRes { + // dcache_response: Some(InnerPipelineRes::Other(res.into())), + // } + // } + // PipelineReq::AddVisitor(add_visitor_req) => { + // let res = self + // .app + // .raft + // .client_write(DcacheRequest::AddVisitor(MasterMessages::AddVisitor( + // add_visitor_req.id, + // ))) + // .await + // .map_err(|e| { + // tonic::Status::new( + // tonic::Code::Internal, + // serde_json::to_string(&e).unwrap(), + // ) + // })?; + // match res.data { + // DcacheResponse::AddVisitorResult(res) => { + // let res = dcache::OptionAddVisitorResult { + // result: res.map(|f| f.into()), + // }; + // OuterPipelineRes { + // dcache_response: Some(InnerPipelineRes::OptionAddVisitorResult( + // res, + // )), + // } + // } + // + // _ => unimplemented!(), + // } + // } + // PipelineReq::RenameCaptcha(rename_captcha_req) => { + // // let x: u8 = rename_visitor_req; + // let res = self + // .app + // .raft + // .client_write(DcacheRequest::RenameCaptcha(rename_captcha_req.into())) + // .await; + // OuterPipelineRes { + // dcache_response: Some(InnerPipelineRes::Other(res.into())), + // } + // } + // PipelineReq::RemoveCaptcha(remove_captcha_req) => { + // let res = self + // .app + // .raft + // .client_write(DcacheRequest::RemoveCaptcha(MasterMessages::RemoveCaptcha( + // remove_captcha_req.id, + // ))) + // .await; + // OuterPipelineRes { + // dcache_response: Some(InnerPipelineRes::Other(res.into())), + // } + // } + // PipelineReq::CachePow(cache_pow_req) => { + // let res = self + // .app + // .raft + // .client_write(DcacheRequest::CachePoW(cache_pow_req.into())) + // .await; + // OuterPipelineRes { + // dcache_response: Some(InnerPipelineRes::Other(res.into())), + // } + // } + // PipelineReq::CacheResult(cache_result_req) => { + // let res = self + // .app + // .raft + // .client_write(DcacheRequest::CacheResult(cache_result_req.into())) + // .await; + // OuterPipelineRes { + // dcache_response: Some(InnerPipelineRes::Other(res.into())), + // } + // } + // }; + // responses.push(res); + // } + + // Ok(Response::new(dcache::DcacheBatchResponse { responses })) + // } + async fn write( &self, request: tonic::Request, diff --git a/test.py b/test.py index 6b8dfb3..0bc8050 100755 --- a/test.py +++ b/test.py @@ -145,11 +145,33 @@ def grpc_add_captcha(stub: DcacheServiceStub, captcha_id: str): pprint(f"Captcha added {captcha_id}: {resp}") + + +msgs = [] +for _ in range(0,1000): + msgs.append( + dcache.DcacheRequest(addVisitor=dcache.CaptchaID(id=captcha_id)), + ) + +msgs = dcache.DcacheBatchRequest(requests=msgs) + + + +def grpc_pipeline_add_vote(stub): + responses = stub.PipelineDcacheOps(msgs) + for r in responses.responses: + print(f"received respo: {r}") + + + + def grpc_run(): with grpc.insecure_channel(host) as channel: stub = DcacheServiceStub(channel) grpc_add_captcha(stub, captcha_id) - grpc_add_vote(stub, captcha_id) + grpc_pipeline_add_vote(stub) + + #grpc_add_vote(stub, captcha_id) if __name__ == "__main__": grpc_run()