Use atomic types to speedup variable difficulty alogirthm #3

Merged
realaravinth merged 9 commits from optimize-libmcaptha into master 2023-12-31 01:31:22 +05:30
25 changed files with 4638 additions and 544 deletions

806
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -13,8 +13,8 @@ openraft = { version = "0.8.8", features = ["serde", "single-term-leader"]}
#libmcaptcha = { path="/src/atm/code/mcaptcha/libmcaptcha", features=["full"] }
libmcaptcha = { git = "https://github.com/mcaptcha/libmcaptcha", branch = "feat-dcache", features = ["full"]}
tracing = { version = "0.1.37", features = ["log"] }
serde_json = "1.0.96"
serde = { version = "1.0.163", features = ["derive"] }
serde_json = "1"
serde = { version = "1", features = ["derive"] }
byteorder = "1.4.3"
futures-util = { version = "0.3.17", default-features = false, features = ["std"] }
lazy_static = "1.4.0"
@ -26,7 +26,7 @@ derive_more = "0.99.17"
url = { version = "2.2.2", features = ["serde"]}
async-trait = "0.1.36"
clap = { version = "4.1.11", features = ["derive", "env"] }
tokio = { version = "1.0", default-features = false, features = ["sync", "macros", "rt-multi-thread"] }
tokio = { version = "1.0", default-features = false, features = ["sync", "macros", "rt-multi-thread", "time"] }
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
actix = "0.13.0"
tonic = { version = "0.10.2", features = ["transport", "channel"] }
@ -34,6 +34,9 @@ prost = "0.12.3"
tokio-stream = "0.1.14"
async-stream = "0.3.5"
actix-rt = "2.9.0"
futures = "0.3.30"
tower-service = "0.3.2"
dashmap = { version = "5.5.3", features = ["serde"] }
[build-dependencies]
@ -44,3 +47,6 @@ tonic-build = "0.10.2"
base64 = "0.13.0"
anyhow = "1.0.63"
maplit = "1.0.2"
#[profile.release]
#debug = true

View file

@ -0,0 +1 @@
Count,Message,Traceback,Nodes
1 Count Message Traceback Nodes

View file

@ -0,0 +1 @@
Method,Name,Error,Occurrences
1 Method Name Error Occurrences

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,3 @@
Type,Name,Request Count,Failure Count,Median Response Time,Average Response Time,Min Response Time,Max Response Time,Average Content Size,Requests/s,Failures/s,50%,66%,75%,80%,90%,95%,98%,99%,99.9%,99.99%,100%
grpc,/dcache.DcacheService/PipelineDcacheOps,3480,0,98,104.35343347919283,85.40578499378171,842.1087349997833,14999.985632183909,95.67244900465325,0.0,98,99,100,100,100,110,120,360,840,840,840
,Aggregated,3480,0,98,104.35343347919283,85.40578499378171,842.1087349997833,14999.985632183909,95.67244900465325,0.0,98,99,100,100,100,110,120,360,840,840,840
1 Type Name Request Count Failure Count Median Response Time Average Response Time Min Response Time Max Response Time Average Content Size Requests/s Failures/s 50% 66% 75% 80% 90% 95% 98% 99% 99.9% 99.99% 100%
2 grpc /dcache.DcacheService/PipelineDcacheOps 3480 0 98 104.35343347919283 85.40578499378171 842.1087349997833 14999.985632183909 95.67244900465325 0.0 98 99 100 100 100 110 120 360 840 840 840
3 Aggregated 3480 0 98 104.35343347919283 85.40578499378171 842.1087349997833 14999.985632183909 95.67244900465325 0.0 98 99 100 100 100 110 120 360 840 840 840

View file

@ -0,0 +1 @@
Count,Message,Traceback,Nodes
1 Count Message Traceback Nodes

View file

@ -0,0 +1 @@
Method,Name,Error,Occurrences
1 Method Name Error Occurrences

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,3 @@
Type,Name,Request Count,Failure Count,Median Response Time,Average Response Time,Min Response Time,Max Response Time,Average Content Size,Requests/s,Failures/s,50%,66%,75%,80%,90%,95%,98%,99%,99.9%,99.99%,100%
grpc,/dcache.DcacheService/AddVisitor,186109,0,79,74.60541254397303,3.7561320059467107,119.94536400015932,10.999731340236098,4816.33283284295,0.0,79,83,86,89,93,97,100,110,120,120,120
,Aggregated,186109,0,79,74.60541254397303,3.7561320059467107,119.94536400015932,10.999731340236098,4816.33283284295,0.0,79,83,86,89,93,97,100,110,120,120,120
1 Type Name Request Count Failure Count Median Response Time Average Response Time Min Response Time Max Response Time Average Content Size Requests/s Failures/s 50% 66% 75% 80% 90% 95% 98% 99% 99.9% 99.99% 100%
2 grpc /dcache.DcacheService/AddVisitor 186109 0 79 74.60541254397303 3.7561320059467107 119.94536400015932 10.999731340236098 4816.33283284295 0.0 79 83 86 89 93 97 100 110 120 120 120
3 Aggregated 186109 0 79 74.60541254397303 3.7561320059467107 119.94536400015932 10.999731340236098 4816.33283284295 0.0 79 83 86 89 93 97 100 110 120 120 120

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 674 KiB

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 1.2 MiB

File diff suppressed because one or more lines are too long

View file

@ -119,29 +119,55 @@ class Learner(_message.Message):
addr: str
def __init__(self, id: _Optional[int] = ..., addr: _Optional[str] = ...) -> None: ...
class CaptchaExistsResponse(_message.Message):
__slots__ = ("exists",)
EXISTS_FIELD_NUMBER: _ClassVar[int]
exists: bool
def __init__(self, exists: bool = ...) -> None: ...
class GetVisitorCountResponse(_message.Message):
__slots__ = ("visitors",)
VISITORS_FIELD_NUMBER: _ClassVar[int]
visitors: int
def __init__(self, visitors: _Optional[int] = ...) -> None: ...
class OptionGetVisitorCountResponse(_message.Message):
__slots__ = ("result",)
RESULT_FIELD_NUMBER: _ClassVar[int]
result: GetVisitorCountResponse
def __init__(self, result: _Optional[_Union[GetVisitorCountResponse, _Mapping]] = ...) -> None: ...
class DcacheRequest(_message.Message):
__slots__ = ("addCaptcha", "addVisitor", "renameCaptcha", "removeCaptcha", "cachePow", "cacheResult")
__slots__ = ("addCaptcha", "addVisitor", "renameCaptcha", "removeCaptcha", "cachePow", "cacheResult", "captchaExists", "getVisitorCount")
ADDCAPTCHA_FIELD_NUMBER: _ClassVar[int]
ADDVISITOR_FIELD_NUMBER: _ClassVar[int]
RENAMECAPTCHA_FIELD_NUMBER: _ClassVar[int]
REMOVECAPTCHA_FIELD_NUMBER: _ClassVar[int]
CACHEPOW_FIELD_NUMBER: _ClassVar[int]
CACHERESULT_FIELD_NUMBER: _ClassVar[int]
CAPTCHAEXISTS_FIELD_NUMBER: _ClassVar[int]
GETVISITORCOUNT_FIELD_NUMBER: _ClassVar[int]
addCaptcha: AddCaptchaRequest
addVisitor: CaptchaID
renameCaptcha: RenameCaptchaRequest
removeCaptcha: CaptchaID
cachePow: CachePowRequest
cacheResult: CacheResultRequest
def __init__(self, addCaptcha: _Optional[_Union[AddCaptchaRequest, _Mapping]] = ..., addVisitor: _Optional[_Union[CaptchaID, _Mapping]] = ..., renameCaptcha: _Optional[_Union[RenameCaptchaRequest, _Mapping]] = ..., removeCaptcha: _Optional[_Union[CaptchaID, _Mapping]] = ..., cachePow: _Optional[_Union[CachePowRequest, _Mapping]] = ..., cacheResult: _Optional[_Union[CacheResultRequest, _Mapping]] = ...) -> None: ...
captchaExists: CaptchaID
getVisitorCount: CaptchaID
def __init__(self, addCaptcha: _Optional[_Union[AddCaptchaRequest, _Mapping]] = ..., addVisitor: _Optional[_Union[CaptchaID, _Mapping]] = ..., renameCaptcha: _Optional[_Union[RenameCaptchaRequest, _Mapping]] = ..., removeCaptcha: _Optional[_Union[CaptchaID, _Mapping]] = ..., cachePow: _Optional[_Union[CachePowRequest, _Mapping]] = ..., cacheResult: _Optional[_Union[CacheResultRequest, _Mapping]] = ..., captchaExists: _Optional[_Union[CaptchaID, _Mapping]] = ..., getVisitorCount: _Optional[_Union[CaptchaID, _Mapping]] = ...) -> None: ...
class DcacheResponse(_message.Message):
__slots__ = ("option_add_visitor_result", "other")
__slots__ = ("option_add_visitor_result", "other", "captcha_exists", "get_visitor_count")
OPTION_ADD_VISITOR_RESULT_FIELD_NUMBER: _ClassVar[int]
OTHER_FIELD_NUMBER: _ClassVar[int]
CAPTCHA_EXISTS_FIELD_NUMBER: _ClassVar[int]
GET_VISITOR_COUNT_FIELD_NUMBER: _ClassVar[int]
option_add_visitor_result: OptionAddVisitorResult
other: RaftReply
def __init__(self, option_add_visitor_result: _Optional[_Union[OptionAddVisitorResult, _Mapping]] = ..., other: _Optional[_Union[RaftReply, _Mapping]] = ...) -> None: ...
captcha_exists: CaptchaExistsResponse
get_visitor_count: OptionGetVisitorCountResponse
def __init__(self, option_add_visitor_result: _Optional[_Union[OptionAddVisitorResult, _Mapping]] = ..., other: _Optional[_Union[RaftReply, _Mapping]] = ..., captcha_exists: _Optional[_Union[CaptchaExistsResponse, _Mapping]] = ..., get_visitor_count: _Optional[_Union[OptionGetVisitorCountResponse, _Mapping]] = ...) -> None: ...
class DcacheBatchRequest(_message.Message):
__slots__ = ("requests",)
@ -154,3 +180,39 @@ class DcacheBatchResponse(_message.Message):
RESPONSES_FIELD_NUMBER: _ClassVar[int]
responses: _containers.RepeatedCompositeFieldContainer[DcacheResponse]
def __init__(self, responses: _Optional[_Iterable[_Union[DcacheResponse, _Mapping]]] = ...) -> None: ...
class RetrievePowRequest(_message.Message):
__slots__ = ("token", "key")
TOKEN_FIELD_NUMBER: _ClassVar[int]
KEY_FIELD_NUMBER: _ClassVar[int]
token: str
key: str
def __init__(self, token: _Optional[str] = ..., key: _Optional[str] = ...) -> None: ...
class RetrievePowResponse(_message.Message):
__slots__ = ("difficulty_factor", "duration", "key")
DIFFICULTY_FACTOR_FIELD_NUMBER: _ClassVar[int]
DURATION_FIELD_NUMBER: _ClassVar[int]
KEY_FIELD_NUMBER: _ClassVar[int]
difficulty_factor: int
duration: int
key: str
def __init__(self, difficulty_factor: _Optional[int] = ..., duration: _Optional[int] = ..., key: _Optional[str] = ...) -> None: ...
class CaptchaResultVerified(_message.Message):
__slots__ = ("verified",)
VERIFIED_FIELD_NUMBER: _ClassVar[int]
verified: bool
def __init__(self, verified: bool = ...) -> None: ...
class DeletePowRequest(_message.Message):
__slots__ = ("string",)
STRING_FIELD_NUMBER: _ClassVar[int]
string: str
def __init__(self, string: _Optional[str] = ...) -> None: ...
class OptionalRetrievePoWResponse(_message.Message):
__slots__ = ("result",)
RESULT_FIELD_NUMBER: _ClassVar[int]
result: RetrievePowResponse
def __init__(self, result: _Optional[_Union[RetrievePowResponse, _Mapping]] = ...) -> None: ...

View file

@ -39,11 +39,41 @@ class DcacheServiceStub(object):
request_serializer=dcache__pb2.CachePowRequest.SerializeToString,
response_deserializer=dcache__pb2.RaftReply.FromString,
)
self.RetrievePow = channel.unary_unary(
'/dcache.DcacheService/RetrievePow',
request_serializer=dcache__pb2.RetrievePowRequest.SerializeToString,
response_deserializer=dcache__pb2.OptionalRetrievePoWResponse.FromString,
)
self.DeletePow = channel.unary_unary(
'/dcache.DcacheService/DeletePow',
request_serializer=dcache__pb2.DeletePowRequest.SerializeToString,
response_deserializer=dcache__pb2.RaftReply.FromString,
)
self.CacheResult = channel.unary_unary(
'/dcache.DcacheService/CacheResult',
request_serializer=dcache__pb2.CacheResultRequest.SerializeToString,
response_deserializer=dcache__pb2.RaftReply.FromString,
)
self.VerifyCaptchaResult = channel.unary_unary(
'/dcache.DcacheService/VerifyCaptchaResult',
request_serializer=dcache__pb2.RetrievePowRequest.SerializeToString,
response_deserializer=dcache__pb2.CaptchaResultVerified.FromString,
)
self.DeleteCaptchaResult = channel.unary_unary(
'/dcache.DcacheService/DeleteCaptchaResult',
request_serializer=dcache__pb2.DeleteCaptchaResultRequest.SerializeToString,
response_deserializer=dcache__pb2.RaftReply.FromString,
)
self.CaptchaExists = channel.unary_unary(
'/dcache.DcacheService/CaptchaExists',
request_serializer=dcache__pb2.CaptchaID.SerializeToString,
response_deserializer=dcache__pb2.CaptchaExistsResponse.FromString,
)
self.GetVisitorCount = channel.unary_unary(
'/dcache.DcacheService/GetVisitorCount',
request_serializer=dcache__pb2.CaptchaID.SerializeToString,
response_deserializer=dcache__pb2.OptionGetVisitorCountResponse.FromString,
)
self.PipelineDcacheOps = channel.unary_unary(
'/dcache.DcacheService/PipelineDcacheOps',
request_serializer=dcache__pb2.DcacheBatchRequest.SerializeToString,
@ -114,12 +144,48 @@ class DcacheServiceServicer(object):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def RetrievePow(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def DeletePow(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def CacheResult(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def VerifyCaptchaResult(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def DeleteCaptchaResult(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def CaptchaExists(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def GetVisitorCount(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def PipelineDcacheOps(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
@ -193,11 +259,41 @@ def add_DcacheServiceServicer_to_server(servicer, server):
request_deserializer=dcache__pb2.CachePowRequest.FromString,
response_serializer=dcache__pb2.RaftReply.SerializeToString,
),
'RetrievePow': grpc.unary_unary_rpc_method_handler(
servicer.RetrievePow,
request_deserializer=dcache__pb2.RetrievePowRequest.FromString,
response_serializer=dcache__pb2.OptionalRetrievePoWResponse.SerializeToString,
),
'DeletePow': grpc.unary_unary_rpc_method_handler(
servicer.DeletePow,
request_deserializer=dcache__pb2.DeletePowRequest.FromString,
response_serializer=dcache__pb2.RaftReply.SerializeToString,
),
'CacheResult': grpc.unary_unary_rpc_method_handler(
servicer.CacheResult,
request_deserializer=dcache__pb2.CacheResultRequest.FromString,
response_serializer=dcache__pb2.RaftReply.SerializeToString,
),
'VerifyCaptchaResult': grpc.unary_unary_rpc_method_handler(
servicer.VerifyCaptchaResult,
request_deserializer=dcache__pb2.RetrievePowRequest.FromString,
response_serializer=dcache__pb2.CaptchaResultVerified.SerializeToString,
),
'DeleteCaptchaResult': grpc.unary_unary_rpc_method_handler(
servicer.DeleteCaptchaResult,
request_deserializer=dcache__pb2.DeleteCaptchaResultRequest.FromString,
response_serializer=dcache__pb2.RaftReply.SerializeToString,
),
'CaptchaExists': grpc.unary_unary_rpc_method_handler(
servicer.CaptchaExists,
request_deserializer=dcache__pb2.CaptchaID.FromString,
response_serializer=dcache__pb2.CaptchaExistsResponse.SerializeToString,
),
'GetVisitorCount': grpc.unary_unary_rpc_method_handler(
servicer.GetVisitorCount,
request_deserializer=dcache__pb2.CaptchaID.FromString,
response_serializer=dcache__pb2.OptionGetVisitorCountResponse.SerializeToString,
),
'PipelineDcacheOps': grpc.unary_unary_rpc_method_handler(
servicer.PipelineDcacheOps,
request_deserializer=dcache__pb2.DcacheBatchRequest.FromString,
@ -328,6 +424,40 @@ class DcacheService(object):
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def RetrievePow(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/dcache.DcacheService/RetrievePow',
dcache__pb2.RetrievePowRequest.SerializeToString,
dcache__pb2.OptionalRetrievePoWResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def DeletePow(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/dcache.DcacheService/DeletePow',
dcache__pb2.DeletePowRequest.SerializeToString,
dcache__pb2.RaftReply.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def CacheResult(request,
target,
@ -345,6 +475,74 @@ class DcacheService(object):
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def VerifyCaptchaResult(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/dcache.DcacheService/VerifyCaptchaResult',
dcache__pb2.RetrievePowRequest.SerializeToString,
dcache__pb2.CaptchaResultVerified.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def DeleteCaptchaResult(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/dcache.DcacheService/DeleteCaptchaResult',
dcache__pb2.DeleteCaptchaResultRequest.SerializeToString,
dcache__pb2.RaftReply.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def CaptchaExists(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/dcache.DcacheService/CaptchaExists',
dcache__pb2.CaptchaID.SerializeToString,
dcache__pb2.CaptchaExistsResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def GetVisitorCount(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/dcache.DcacheService/GetVisitorCount',
dcache__pb2.CaptchaID.SerializeToString,
dcache__pb2.OptionGetVisitorCountResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def PipelineDcacheOps(request,
target,

View file

@ -77,6 +77,20 @@ message Learner {
}
message CaptchaExistsResponse {
bool exists = 1;
}
message GetVisitorCountResponse {
uint32 visitors = 1;
}
message OptionGetVisitorCountResponse {
optional GetVisitorCountResponse result = 1;
}
message DcacheRequest {
oneof DcacheRequest {
AddCaptchaRequest addCaptcha = 1;
@ -85,6 +99,8 @@ message DcacheRequest {
CaptchaID removeCaptcha = 4;
CachePowRequest cachePow = 5;
CacheResultRequest cacheResult = 6;
CaptchaID captchaExists = 7;
CaptchaID getVisitorCount = 8;
}
}
@ -93,6 +109,8 @@ message DcacheResponse {
oneof DcacheResponse {
OptionAddVisitorResult option_add_visitor_result = 1;
RaftReply other = 2;
CaptchaExistsResponse captcha_exists = 3;
OptionGetVisitorCountResponse get_visitor_count = 4;
}
}
@ -104,13 +122,43 @@ message DcacheBatchResponse {
repeated DcacheResponse responses = 1;
}
message RetrievePowRequest {
string token = 1;
string key = 2;
}
message RetrievePowResponse {
uint32 difficulty_factor = 1;
uint64 duration = 2;
string key = 3;
}
message CaptchaResultVerified {
bool verified = 1;
}
message DeletePowRequest {
string string = 1;
}
message OptionalRetrievePoWResponse {
optional RetrievePowResponse result = 1;
}
service DcacheService {
rpc AddCaptcha(AddCaptchaRequest) returns (RaftReply) {}
rpc AddVisitor(CaptchaID) returns (OptionAddVisitorResult) {}
rpc RenameCaptcha(RenameCaptchaRequest) returns (RaftReply) {}
rpc RemoveCaptcha(CaptchaID) returns (RaftReply) {}
rpc CachePow(CachePowRequest) returns (RaftReply) {}
rpc RetrievePow(RetrievePowRequest) returns (OptionalRetrievePoWResponse) {}
rpc DeletePow(DeletePowRequest) returns (RaftReply) {}
rpc CacheResult(CacheResultRequest) returns (RaftReply) {}
rpc VerifyCaptchaResult(RetrievePowRequest) returns (CaptchaResultVerified) {}
rpc DeleteCaptchaResult(DeleteCaptchaResultRequest) returns (RaftReply) {}
rpc CaptchaExists(CaptchaID) returns (CaptchaExistsResponse) {}
rpc GetVisitorCount(CaptchaID) returns (OptionGetVisitorCountResponse) {}
rpc PipelineDcacheOps(DcacheBatchRequest) returns (DcacheBatchResponse) {}

View file

@ -36,7 +36,9 @@ use crate::store::DcacheResponse;
use crate::store::DcacheStore;
pub mod app;
mod mcaptcha;
pub mod network;
mod pool;
mod protobuf;
pub mod store;
@ -117,29 +119,6 @@ pub async fn start_example_raft_node(
raft.enable_heartbeat(true);
raft.enable_elect(true);
let captcha = serde_json::json!({
"AddCaptcha": {
"id": "test_1",
"mcaptcha": {
"visitor_threshold": 0,
"defense": {
"levels": [
{"visitor_threshold": 50, "difficulty_factor": 500},
{"visitor_threshold": 5000, "difficulty_factor": 50000},
],
"current_visitor_threshold": 0,
},
"duration": 30,
}}});
#[derive(serde::Serialize)]
struct X {
data: String,
}
let x = X {
data: serde_json::to_string(&captcha).unwrap(),
};
println!("{}", serde_json::to_string(&x).unwrap());
// raft.enable_tick(true);
// Create an application that will store all the instances created above, this will

330
src/mcaptcha/cache.rs Normal file
View file

@ -0,0 +1,330 @@
/*
* mCaptcha - A proof of work based DoS protection system
* Copyright © 2021 Aravinth Manivannan <realravinth@batsense.net>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//! In-memory cache implementation that uses [HashMap]
use std::sync::Arc;
use std::time::Duration;
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use libmcaptcha::cache::messages::*;
use libmcaptcha::errors::*;
#[derive(Clone, Default, Serialize, Deserialize)]
/// cache datastructure implementing [Save]
pub struct HashCache {
difficulty_map: Arc<DashMap<String, CachedPoWConfig>>,
result_map: Arc<DashMap<String, (String, u64)>>,
}
impl HashCache {
// save [PoWConfig] to cache
fn save_pow_config(&self, config: CachePoW) -> CaptchaResult<()> {
let challenge = config.string;
let config: CachedPoWConfig = CachedPoWConfig {
key: config.key,
difficulty_factor: config.difficulty_factor,
duration: config.duration,
};
if self.difficulty_map.get(&challenge).is_none() {
self.difficulty_map.insert(challenge, config);
Ok(())
} else {
Err(CaptchaError::InvalidPoW)
}
}
pub async fn clean_all_after_cold_start(&self, updated: HashCache) {
updated.difficulty_map.iter().for_each(|x| {
self.difficulty_map
.insert(x.key().to_owned(), x.value().to_owned());
});
updated.result_map.iter().for_each(|x| {
self.result_map
.insert(x.key().to_owned(), x.value().to_owned());
});
let cache = self.clone();
let fut = async move {
for values in cache.result_map.iter() {
let inner_cache = cache.clone();
let duration = values.value().1;
let key = values.key().to_owned();
let inner_fut = async move {
tokio::time::sleep(Duration::new(duration, 0)).await;
inner_cache.remove_cache_result(&key);
};
tokio::spawn(inner_fut);
}
for values in cache.difficulty_map.iter() {
let inner_cache = cache.clone();
let duration = values.value().duration;
let key = values.key().to_owned();
let inner_fut = async move {
tokio::time::sleep(Duration::new(duration, 0)).await;
inner_cache.remove_pow_config(&key);
};
tokio::spawn(inner_fut);
}
};
tokio::spawn(fut);
}
// retrieve [PoWConfig] from cache. Deletes config post retrival
pub fn retrieve_pow_config(&self, msg: VerifyCaptchaResult) -> Option<CachedPoWConfig> {
if let Some(difficulty_factor) = self.remove_pow_config(&msg.token) {
Some(difficulty_factor)
} else {
None
}
}
// delete [PoWConfig] from cache
pub fn remove_pow_config(&self, string: &str) -> Option<CachedPoWConfig> {
self.difficulty_map.remove(string).map(|x| x.1)
}
// save captcha result
fn save_captcha_result(&self, res: CacheResult) {
self.result_map.insert(res.token, (res.key, res.duration));
}
// verify captcha result
pub fn verify_captcha_result(&self, challenge: VerifyCaptchaResult) -> bool {
if let Some(captcha_id) = self.remove_cache_result(&challenge.token) {
if captcha_id == challenge.key {
true
} else {
false
}
} else {
false
}
}
// delete cache result
pub fn remove_cache_result(&self, string: &str) -> Option<String> {
self.result_map.remove(string).map(|x| x.1 .0)
}
pub fn cache_pow(&self, msg: CachePoW) {
use std::time::Duration;
use tokio::time::sleep;
let duration: Duration = Duration::new(msg.duration, 0);
let string = msg.string.clone();
let cache = self.clone();
let wait_for = async move {
sleep(duration).await;
//delay_for(duration).await;
cache.remove_pow_config(&string);
};
let _ = self.save_pow_config(msg);
tokio::spawn(wait_for);
}
/// cache PoW result
pub fn cache_result(&self, msg: CacheResult) {
use std::time::Duration;
use tokio::time::sleep;
let token = msg.token.clone();
msg.token.clone();
msg.token.clone();
msg.token.clone();
let duration: Duration = Duration::new(msg.duration, 0);
let cache = self.clone();
let wait_for = async move {
sleep(duration).await;
//delay_for(duration).await;
cache.remove_cache_result(&token);
};
tokio::spawn(wait_for);
let _ = self.save_captcha_result(msg);
}
}
#[cfg(test)]
mod tests {
use super::*;
use libmcaptcha::master::AddVisitorResult;
use libmcaptcha::pow::PoWConfig;
use std::time::Duration;
#[actix_rt::test]
async fn merge_works() {
const DIFFICULTY_FACTOR: u32 = 54;
const RES: &str = "b";
const DURATION: u64 = 5;
const KEY: &str = "mcaptchakey";
let pow: PoWConfig = PoWConfig::new(DIFFICULTY_FACTOR, KEY.into()); //salt is dummy here
let cache = HashCache::default();
let new_cache = HashCache::default();
let visitor_result = AddVisitorResult {
difficulty_factor: DIFFICULTY_FACTOR,
duration: DURATION,
};
let string = pow.string.clone();
let msg = CachePoWBuilder::default()
.string(pow.string.clone())
.difficulty_factor(DIFFICULTY_FACTOR)
.duration(visitor_result.duration)
.key(KEY.into())
.build()
.unwrap();
cache.cache_pow(msg);
let add_cache = CacheResult {
key: KEY.into(),
token: RES.into(),
duration: DURATION,
};
cache.cache_result(add_cache.clone());
new_cache.clean_all_after_cold_start(cache.clone()).await;
let msg = VerifyCaptchaResult {
token: string.clone(),
key: KEY.into(),
};
let cache_difficulty_factor = cache.retrieve_pow_config(msg.clone()).unwrap();
let new_cache_difficulty_factor = new_cache.retrieve_pow_config(msg.clone()).unwrap();
assert_eq!(DIFFICULTY_FACTOR, cache_difficulty_factor.difficulty_factor);
assert_eq!(
DIFFICULTY_FACTOR,
new_cache_difficulty_factor.difficulty_factor
);
let verify_msg = VerifyCaptchaResult {
key: KEY.into(),
token: RES.into(),
};
assert!(new_cache.verify_captcha_result(verify_msg.clone()));
assert!(!new_cache.verify_captcha_result(verify_msg.clone()));
let duration: Duration = Duration::new(5, 0);
//sleep(DURATION + DURATION).await;
tokio::time::sleep(duration + duration).await;
let expired_string = cache.retrieve_pow_config(msg.clone());
assert_eq!(None, expired_string);
let expired_string = new_cache.retrieve_pow_config(msg);
assert_eq!(None, expired_string);
cache.cache_result(add_cache);
new_cache.clean_all_after_cold_start(cache.clone()).await;
tokio::time::sleep(duration + duration).await;
assert!(!new_cache.verify_captcha_result(verify_msg.clone()));
assert!(!cache.verify_captcha_result(verify_msg));
}
#[actix_rt::test]
async fn hashcache_pow_cache_works() {
const DIFFICULTY_FACTOR: u32 = 54;
const DURATION: u64 = 5;
const KEY: &str = "mcaptchakey";
let cache = HashCache::default();
let pow: PoWConfig = PoWConfig::new(DIFFICULTY_FACTOR, KEY.into()); //salt is dummy here
let visitor_result = AddVisitorResult {
difficulty_factor: DIFFICULTY_FACTOR,
duration: DURATION,
};
let string = pow.string.clone();
let msg = CachePoWBuilder::default()
.string(pow.string.clone())
.difficulty_factor(DIFFICULTY_FACTOR)
.duration(visitor_result.duration)
.key(KEY.into())
.build()
.unwrap();
cache.cache_pow(msg);
let msg = VerifyCaptchaResult {
token: string.clone(),
key: KEY.into(),
};
let cache_difficulty_factor = cache.retrieve_pow_config(msg.clone()).unwrap();
assert_eq!(DIFFICULTY_FACTOR, cache_difficulty_factor.difficulty_factor);
let duration: Duration = Duration::new(5, 0);
//sleep(DURATION + DURATION).await;
tokio::time::sleep(duration + duration).await;
let expired_string = cache.retrieve_pow_config(msg);
assert_eq!(None, expired_string);
}
#[actix_rt::test]
async fn hashcache_result_cache_works() {
const DURATION: u64 = 5;
const KEY: &str = "a";
const RES: &str = "b";
let cache = HashCache::default();
// send value to cache
// send another value to cache for auto delete
// verify_captcha_result
// delete
// wait for timeout and verify_captcha_result against second value
let add_cache = CacheResult {
key: KEY.into(),
token: RES.into(),
duration: DURATION,
};
cache.cache_result(add_cache);
let verify_msg = VerifyCaptchaResult {
key: KEY.into(),
token: RES.into(),
};
assert!(cache.verify_captcha_result(verify_msg.clone()));
// duplicate
assert!(!cache.verify_captcha_result(verify_msg));
let verify_msg = VerifyCaptchaResult {
key: "cz".into(),
token: RES.into(),
};
assert!(!cache.verify_captcha_result(verify_msg));
let duration: Duration = Duration::new(5, 0);
tokio::time::sleep(duration + duration).await;
let verify_msg = VerifyCaptchaResult {
key: KEY.into(),
token: RES.into(),
};
assert!(!cache.verify_captcha_result(verify_msg));
}
}

398
src/mcaptcha/defense.rs Normal file
View file

@ -0,0 +1,398 @@
/*
* mCaptcha - A proof of work based DoS protection system
* Copyright © 2021 Aravinth Manivannan <realravinth@batsense.net>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
use serde::{Deserialize, Serialize};
use libmcaptcha::defense::Level;
use libmcaptcha::errors::*;
//
///// Level struct that describes threshold-difficulty factor mapping
//#[derive(Debug, Deserialize, Serialize, Copy, Clone, PartialEq)]
//pub struct Level {
// pub visitor_threshold: u32,
// pub difficulty_factor: u32,
//}
//
///// Bulder struct for [Level] to describe threshold-difficulty factor mapping
//#[derive(Debug, Copy, Clone, PartialEq)]
//pub struct LevelBuilder {
// visitor_threshold: Option<u32>,
// difficulty_factor: Option<u32>,
//}
//
//impl Default for LevelBuilder {
// fn default() -> Self {
// LevelBuilder {
// visitor_threshold: None,
// difficulty_factor: None,
// }
// }
//}
//
//impl LevelBuilder {
// /// set visitor count for level
// pub fn visitor_threshold(&mut self, visitor_threshold: u32) -> &mut Self {
// self.visitor_threshold = Some(visitor_threshold);
// self
// }
//
// /// set difficulty factor for level. difficulty_factor can't be zero because
// /// Difficulty is calculated as:
// /// ```no_run
// /// let difficulty_factor = 500;
// /// let difficulty = u128::max_value() - u128::max_value() / difficulty_factor;
// /// ```
// /// the higher the `difficulty_factor`, the higher the difficulty.
// pub fn difficulty_factor(&mut self, difficulty_factor: u32) -> CaptchaResult<&mut Self> {
// if difficulty_factor > 0 {
// self.difficulty_factor = Some(difficulty_factor);
// Ok(self)
// } else {
// Err(CaptchaError::DifficultyFactorZero)
// }
// }
//
// /// build Level struct
// pub fn build(&mut self) -> CaptchaResult<Level> {
// if self.visitor_threshold.is_none() {
// Err(CaptchaError::SetVisitorThreshold)
// } else if self.difficulty_factor.is_none() {
// Err(CaptchaError::SetDifficultyFactor)
// } else {
// Ok(Level {
// difficulty_factor: self.difficulty_factor.unwrap(),
// visitor_threshold: self.visitor_threshold.unwrap(),
// })
// }
// }
//}
//
/// Builder struct for [Defense]
#[derive(Debug, Clone, PartialEq)]
pub struct DefenseBuilder {
levels: Vec<Level>,
}
impl Default for DefenseBuilder {
fn default() -> Self {
DefenseBuilder { levels: vec![] }
}
}
impl DefenseBuilder {
/// add a level to [Defense]
pub fn add_level(&mut self, level: Level) -> CaptchaResult<&mut Self> {
for i in self.levels.iter() {
if i.visitor_threshold == level.visitor_threshold {
return Err(CaptchaError::DuplicateVisitorCount);
}
}
self.levels.push(level);
Ok(self)
}
/// Build [Defense]
pub fn build(&mut self) -> CaptchaResult<Defense> {
if !self.levels.is_empty() {
// sort levels to arrange in ascending order
self.levels.sort_by_key(|a| a.visitor_threshold);
for level in self.levels.iter() {
if level.difficulty_factor == 0 {
return Err(CaptchaError::DifficultyFactorZero);
}
}
// as visitor count increases, difficulty_factor too should increse
// if it decreses, an error must be thrown
for i in 0..self.levels.len() - 1 {
if self.levels[i].difficulty_factor > self.levels[i + 1].difficulty_factor {
return Err(CaptchaError::DecreaseingDifficultyFactor);
}
}
Ok(Defense {
levels: self.levels.to_owned(),
})
} else {
Err(CaptchaError::LevelEmpty)
}
}
}
/// struct describes all the different [Level]s at which an mCaptcha system operates
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct Defense {
levels: Vec<Level>,
// index of current visitor threshold
}
impl From<Defense> for Vec<Level> {
fn from(d: Defense) -> Self {
d.levels
}
}
impl Defense {
///! Difficulty is calculated as:
///! ```rust
///! let difficulty = u128::max_value() - u128::max_value() / difficulty_factor;
///! ```
///! The higher the `difficulty_factor`, the higher the difficulty.
// /// Get difficulty factor of current level of defense
// pub fn get_difficulty(&self, current_visitor_threshold: usize) -> u32 {
// self.levels[current_visitor_threshold].difficulty_factor
// }
//
// /// tighten up defense. Increases defense level by a factor of one.
// /// When defense is at max level, calling this method will have no effect
// pub fn tighten_up(&mut self) {
// if self.current_visitor_threshold < self.levels.len() - 1 {
// self.current_visitor_threshold += 1;
// }
// }
// /// Loosen up defense. Decreases defense level by a factor of one.
// /// When defense is at the lowest level, calling this method will have no effect.
// pub fn loosen_up(&mut self) {
// if self.current_visitor_threshold > 0 {
// self.current_visitor_threshold -= 1;
// }
// }
//
// /// Set defense to maximum level
// pub fn max_defense(&mut self) {
// self.current_visitor_threshold = self.levels.len() - 1;
// }
//
// /// Set defense to minimum level
// pub fn min_defense(&mut self) {
// self.current_visitor_threshold = 0;
// }
//
pub fn get_levels(&self) -> Vec<Level> {
self.levels.clone()
}
/// Get current level's visitor threshold
pub fn current_level(&self, current_visitor_level: u32) -> &Level {
for level in self.levels.iter() {
if current_visitor_level <= level.visitor_threshold {
return level;
}
}
self.levels.last().as_ref().unwrap()
// &self.levels[self.current_visitor_threshold]
}
//
// /// Get current level's visitor threshold
// pub fn visitor_threshold(&self) -> u32 {
// self.levels[self.current_visitor_threshold].difficulty_factor
// }
}
#[cfg(test)]
mod tests {
use super::*;
use libmcaptcha::defense::Level;
use libmcaptcha::LevelBuilder;
#[test]
fn defense_builder_duplicate_visitor_threshold() {
let mut defense_builder = DefenseBuilder::default();
let err = defense_builder
.add_level(
LevelBuilder::default()
.visitor_threshold(50)
.difficulty_factor(50)
.unwrap()
.build()
.unwrap(),
)
.unwrap()
.add_level(
LevelBuilder::default()
.visitor_threshold(50)
.difficulty_factor(50)
.unwrap()
.build()
.unwrap(),
);
assert_eq!(err, Err(CaptchaError::DuplicateVisitorCount));
}
#[test]
fn defense_builder_decreasing_difficulty_factor() {
let mut defense_builder = DefenseBuilder::default();
let err = defense_builder
.add_level(
LevelBuilder::default()
.visitor_threshold(50)
.difficulty_factor(50)
.unwrap()
.build()
.unwrap(),
)
.unwrap()
.add_level(
LevelBuilder::default()
.visitor_threshold(500)
.difficulty_factor(10)
.unwrap()
.build()
.unwrap(),
)
.unwrap()
.build();
assert_eq!(err, Err(CaptchaError::DecreaseingDifficultyFactor));
}
#[test]
fn checking_for_integer_overflow() {
let mut defense = DefenseBuilder::default()
.add_level(
LevelBuilder::default()
.visitor_threshold(5)
.difficulty_factor(5)
.unwrap()
.build()
.unwrap(),
)
.unwrap()
.add_level(
LevelBuilder::default()
.visitor_threshold(10)
.difficulty_factor(50)
.unwrap()
.build()
.unwrap(),
)
.unwrap()
.add_level(
LevelBuilder::default()
.visitor_threshold(20)
.difficulty_factor(60)
.unwrap()
.build()
.unwrap(),
)
.unwrap()
.add_level(
LevelBuilder::default()
.visitor_threshold(30)
.difficulty_factor(65)
.unwrap()
.build()
.unwrap(),
)
.unwrap()
.build()
.unwrap();
// for _ in 0..500 {
// defense.tighten_up();
// }
//
// defense.get_difficulty();
// for _ in 0..500000 {
// defense.tighten_up();
// }
//
defense.current_level(10_000_000);
}
fn get_defense() -> Defense {
DefenseBuilder::default()
.add_level(
LevelBuilder::default()
.visitor_threshold(50)
.difficulty_factor(50)
.unwrap()
.build()
.unwrap(),
)
.unwrap()
.add_level(
LevelBuilder::default()
.visitor_threshold(500)
.difficulty_factor(5000)
.unwrap()
.build()
.unwrap(),
)
.unwrap()
.add_level(
LevelBuilder::default()
.visitor_threshold(5000)
.difficulty_factor(50000)
.unwrap()
.build()
.unwrap(),
)
.unwrap()
.add_level(
LevelBuilder::default()
.visitor_threshold(50000)
.difficulty_factor(500000)
.unwrap()
.build()
.unwrap(),
)
.unwrap()
.add_level(
LevelBuilder::default()
.visitor_threshold(500000)
.difficulty_factor(5000000)
.unwrap()
.build()
.unwrap(),
)
.unwrap()
.build()
.unwrap()
}
#[test]
fn defense_builder_works() {
let defense = get_defense();
assert_eq!(defense.levels[0].difficulty_factor, 50);
assert_eq!(defense.levels[1].difficulty_factor, 5000);
assert_eq!(defense.levels[2].difficulty_factor, 50_000);
assert_eq!(defense.levels[3].difficulty_factor, 500_000);
assert_eq!(defense.levels[4].difficulty_factor, 5_000_000);
}
#[test]
fn tighten_up_works() {
let defense = get_defense();
assert_eq!(defense.current_level(0).difficulty_factor, 50);
assert_eq!(defense.current_level(500).difficulty_factor, 5_000);
assert_eq!(defense.current_level(501).difficulty_factor, 50_000);
assert_eq!(defense.current_level(5_000).difficulty_factor, 50_000);
assert_eq!(defense.current_level(5_001).difficulty_factor, 500_000);
assert_eq!(defense.current_level(50_000).difficulty_factor, 500_000);
assert_eq!(defense.current_level(50_001).difficulty_factor, 5_000_000);
assert_eq!(defense.current_level(500_000).difficulty_factor, 5_000_000);
assert_eq!(defense.current_level(500_001).difficulty_factor, 5_000_000);
}
}

595
src/mcaptcha/mcaptcha.rs Normal file
View file

@ -0,0 +1,595 @@
/* mCaptcha - A proof of work based DoS protection system
* Copyright © 2021 Aravinth Manivannan <realravinth@batsense.net>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use super::defense::Defense;
use libmcaptcha::errors::*;
use libmcaptcha::master::messages as ManagerMessages;
/// Builder for [MCaptcha]
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct MCaptchaBuilder {
visitor_threshold: u32,
defense: Option<Defense>,
duration: Option<u64>,
}
impl Default for MCaptchaBuilder {
fn default() -> Self {
MCaptchaBuilder {
visitor_threshold: 0,
defense: None,
duration: None,
}
}
}
impl MCaptchaBuilder {
/// set defense
pub fn defense(&mut self, d: Defense) -> &mut Self {
self.defense = Some(d);
self
}
/// set duration
pub fn duration(&mut self, d: u64) -> &mut Self {
self.duration = Some(d);
self
}
/// Builds new [MCaptcha]
pub fn build(self: &mut MCaptchaBuilder) -> CaptchaResult<MCaptcha> {
if self.duration.is_none() {
Err(CaptchaError::PleaseSetValue("duration".into()))
} else if self.defense.is_none() {
Err(CaptchaError::PleaseSetValue("defense".into()))
} else if self.duration <= Some(0) {
Err(CaptchaError::CaptchaDurationZero)
} else {
let m = MCaptcha {
duration: self.duration.unwrap(),
defense: self.defense.clone().unwrap(),
visitor_threshold: Arc::new(AtomicU32::new(self.visitor_threshold)),
};
Ok(m)
}
}
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct MCaptcha {
visitor_threshold: Arc<AtomicU32>,
defense: Defense,
duration: u64,
}
impl MCaptcha {
/// increments the visitor count by one
#[inline]
pub fn add_visitor(&self) -> u32 {
// self.visitor_threshold += 1;
let current_visitor_level = self.visitor_threshold.fetch_add(1, Ordering::SeqCst) + 1;
let current_level = self.defense.current_level(current_visitor_level);
current_level.difficulty_factor
}
/// decrements the visitor count by specified count
#[inline]
pub fn set_visitor_count(&self, new_current: u32) {
self.visitor_threshold
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |mut current| {
if current != new_current {
Some(new_current)
} else {
None
}
});
}
/// decrements the visitor count by specified count
#[inline]
pub fn decrement_visitor_by(&self, count: u32) {
self.visitor_threshold
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |mut current| {
if current > 0 {
if current >= count {
current -= count;
} else {
current = 0;
}
Some(current)
} else {
None
}
});
}
/// get [Counter]'s current visitor_threshold
pub fn get_visitors(&self) -> u32 {
self.visitor_threshold.load(Ordering::SeqCst)
}
}
#[derive(Clone, Serialize, Deserialize)]
pub struct Manager {
pub captchas: Arc<DashMap<String, Arc<MCaptcha>>>,
pub gc: u64,
}
impl Manager {
/// add [Counter] actor to [Manager]
pub fn add_captcha(&self, m: Arc<MCaptcha>, id: String) {
self.captchas.insert(id, m);
}
/// create new master
/// accepts a `u64` to configure garbage collection period
pub fn new(gc: u64) -> Self {
Manager {
captchas: Arc::new(DashMap::new()),
gc,
}
}
fn gc(captchas: Arc<DashMap<String, Arc<MCaptcha>>>) {
for captcha in captchas.iter() {
let visitor = { captcha.value().get_visitors() };
if visitor == 0 {
captchas.remove(captcha.key());
}
}
}
/// get [Counter] actor from [Manager]
pub fn get_captcha(&self, id: &str) -> Option<Arc<MCaptcha>> {
if let Some(captcha) = self.captchas.get(id) {
Some(captcha.clone())
} else {
None
}
}
/// removes [Counter] actor from [Manager]
pub fn rm_captcha(&self, id: &str) -> Option<(String, Arc<MCaptcha>)> {
self.captchas.remove(id)
}
/// renames [Counter] actor
pub fn rename(&self, current_id: &str, new_id: String) {
// If actor isn't present, it's okay to not throw an error
// since actors are lazyily initialized and are cleaned up when inactive
if let Some((_, captcha)) = self.captchas.remove(current_id) {
self.add_captcha(captcha, new_id);
}
}
pub async fn clean_all_after_cold_start(&self, updated: Manager) {
updated.captchas.iter().for_each(|x| {
self.captchas
.insert(x.key().to_owned(), x.value().to_owned());
});
let captchas = self.clone();
let keys: Vec<String> = captchas
.captchas
.clone()
.iter()
.map(|x| x.key().to_owned())
.collect();
let fut = async move {
tokio::time::sleep(Duration::new(captchas.gc, 0)).await;
for key in keys.iter() {
captchas.rm_captcha(key);
}
};
tokio::spawn(fut);
}
pub fn add_visitor(
&self,
msg: &ManagerMessages::AddVisitor,
) -> Option<libmcaptcha::master::AddVisitorResult> {
if let Some(captcha) = self.captchas.get(&msg.0) {
let difficulty_factor = captcha.add_visitor();
// let id = msg.0.clone();
let c = captcha.clone();
let captchas = self.captchas.clone();
let fut = async move {
tokio::time::sleep(Duration::new(c.duration, 0)).await;
c.decrement_visitor_by(1);
// Self::gc(captchas);
// if c.get_visitors() == 0 {
// println!("Removing captcha addvivi");
// captchas.remove(&id);
// }
};
tokio::spawn(fut);
Some(libmcaptcha::master::AddVisitorResult {
duration: captcha.duration,
difficulty_factor,
})
} else {
None
}
}
pub fn get_internal_data(&self) -> HashMap<String, libmcaptcha::mcaptcha::MCaptcha> {
let mut res = HashMap::with_capacity(self.captchas.len());
for value in self.captchas.iter() {
res.insert(value.key().to_owned(), value.value().as_ref().into());
}
res
}
pub fn set_internal_data(&self, mut map: HashMap<String, libmcaptcha::mcaptcha::MCaptcha>) {
for (id, captcha) in map.drain() {
let visitors = captcha.get_visitors();
let new_captcha: MCaptcha = (&captcha).into();
let new_captcha = Arc::new(new_captcha);
self.captchas.insert(id.clone(), new_captcha.clone());
let msg = ManagerMessages::AddVisitor(id);
for _ in 0..visitors {
self.add_visitor(&msg);
}
}
}
}
impl From<&libmcaptcha::mcaptcha::MCaptcha> for MCaptcha {
fn from(value: &libmcaptcha::mcaptcha::MCaptcha) -> Self {
let mut defense = super::defense::DefenseBuilder::default();
for level in value.get_defense().get_levels() {
let _ = defense.add_level(level);
}
let defense = defense.build().unwrap();
let new_captcha = MCaptchaBuilder::default()
.defense(defense)
.duration(value.get_duration())
.build()
.unwrap();
// for _ in 0..value.get_visitors() {
// new_captcha.add_visitor();
// }
new_captcha
}
}
impl From<&MCaptcha> for libmcaptcha::mcaptcha::MCaptcha {
fn from(value: &MCaptcha) -> Self {
let mut defense = libmcaptcha::defense::DefenseBuilder::default();
for level in value.defense.get_levels().drain(0..) {
let _ = defense.add_level(level);
}
let defense = defense.build().unwrap();
let mut new_captcha = libmcaptcha::mcaptcha::MCaptchaBuilder::default()
.defense(defense)
.duration(value.duration)
.build()
.unwrap();
for _ in 0..value.get_visitors() {
new_captcha.add_visitor();
}
new_captcha
}
}
#[cfg(test)]
mod tests {
use super::*;
use libmcaptcha::defense::LevelBuilder;
use libmcaptcha::master::messages::*;
pub const LEVEL_1: (u32, u32) = (50, 50);
pub const LEVEL_2: (u32, u32) = (500, 500);
pub const DURATION: u64 = 5;
use crate::mcaptcha::defense::*;
pub fn get_defense() -> Defense {
DefenseBuilder::default()
.add_level(
LevelBuilder::default()
.visitor_threshold(LEVEL_1.0)
.difficulty_factor(LEVEL_1.1)
.unwrap()
.build()
.unwrap(),
)
.unwrap()
.add_level(
LevelBuilder::default()
.visitor_threshold(LEVEL_2.0)
.difficulty_factor(LEVEL_2.1)
.unwrap()
.build()
.unwrap(),
)
.unwrap()
.build()
.unwrap()
}
async fn race(manager: &Manager, id: String, count: (u32, u32)) {
let msg = ManagerMessages::AddVisitor(id);
for _ in 0..count.0 as usize - 1 {
manager.add_visitor(&msg);
}
}
// pub fn get_counter() -> Counter {
// get_mcaptcha().into()
// }
pub fn get_mcaptcha() -> MCaptcha {
MCaptchaBuilder::default()
.defense(get_defense())
.duration(DURATION)
.build()
.unwrap()
}
#[actix_rt::test]
async fn manager_works() {
let manager = Manager::new(1);
// let get_add_site_msg = |id: String, mcaptcha: MCaptcha| {
// AddSiteBuilder::default()
// .id(id)
// .mcaptcha(mcaptcha)
// .build()
// .unwrap()
// };
let id = "yo";
manager.add_captcha(Arc::new(get_mcaptcha()), id.into());
let mcaptcha_addr = manager.get_captcha(id);
assert!(mcaptcha_addr.is_some());
let mut mcaptcha_data = manager.get_internal_data();
mcaptcha_data.get_mut(id).unwrap().add_visitor();
mcaptcha_data.get_mut(id).unwrap().add_visitor();
mcaptcha_data.get_mut(id).unwrap().add_visitor();
// let mcaptcha_data: HashMap<String, libmcaptcha::mcaptcha::MCaptcha> = {
// let serialized = serde_json::to_string(&mcaptcha_data).unwrap();
// serde_json::from_str(&serialized).unwrap()
// };
// println!("{:?}", mcaptcha_data);
manager.set_internal_data(mcaptcha_data);
let mcaptcha_data = manager.get_internal_data();
assert_eq!(
manager.get_captcha(id).unwrap().get_visitors(),
mcaptcha_data.get(id).unwrap().get_visitors()
);
let new_id = "yoyo";
manager.rename(id, new_id.into());
{
let mcaptcha_addr = manager.get_captcha(new_id);
assert!(mcaptcha_addr.is_some());
let addr_doesnt_exist = manager.get_captcha(id);
assert!(addr_doesnt_exist.is_none());
let timer_expire = Duration::new(DURATION, 0);
tokio::time::sleep(timer_expire).await;
tokio::time::sleep(timer_expire).await;
}
// Manager::gc(manager.captchas.clone());
// let mcaptcha_addr = manager.get_captcha(new_id);
// assert_eq!(mcaptcha_addr.as_ref().unwrap().get_visitors(), 0);
// assert!(mcaptcha_addr.is_none());
//
// assert!(
// manager.rm_captcha(new_id.into()).is_some());
}
#[actix_rt::test]
async fn counter_defense_works() {
let manager = Manager::new(1);
let id = "yo";
manager.add_captcha(Arc::new(get_mcaptcha()), id.into());
let mut mcaptcha = manager
.add_visitor(&ManagerMessages::AddVisitor(id.to_string()))
.unwrap();
assert_eq!(mcaptcha.difficulty_factor, LEVEL_1.0);
race(&manager, id.to_string(), LEVEL_2).await;
mcaptcha = manager
.add_visitor(&ManagerMessages::AddVisitor(id.to_string()))
.unwrap();
assert_eq!(mcaptcha.difficulty_factor, LEVEL_2.1);
tokio::time::sleep(Duration::new(DURATION * 2, 0)).await;
assert_eq!(manager.get_captcha(id).unwrap().get_visitors(), 0);
}
}
//
//#[cfg(test)]
//pub mod tests {
// use super::*;
// use crate::defense::*;
// use crate::errors::*;
// use crate::mcaptcha;
// use crate::mcaptcha::MCaptchaBuilder;
//
// // constants for testing
// // (visitor count, level)
// pub const LEVEL_1: (u32, u32) = (50, 50);
// pub const LEVEL_2: (u32, u32) = (500, 500);
// pub const DURATION: u64 = 5;
//
// type MyActor = Addr<Counter>;
//
// pub fn get_defense() -> Defense {
// DefenseBuilder::default()
// .add_level(
// LevelBuilder::default()
// .visitor_threshold(LEVEL_1.0)
// .difficulty_factor(LEVEL_1.1)
// .unwrap()
// .build()
// .unwrap(),
// )
// .unwrap()
// .add_level(
// LevelBuilder::default()
// .visitor_threshold(LEVEL_2.0)
// .difficulty_factor(LEVEL_2.1)
// .unwrap()
// .build()
// .unwrap(),
// )
// .unwrap()
// .build()
// .unwrap()
// }
//
// async fn race(addr: Addr<Counter>, count: (u32, u32)) {
// for _ in 0..count.0 as usize - 1 {
// let _ = addr.send(AddVisitor).await.unwrap();
// }
// }
//
// pub fn get_counter() -> Counter {
// get_mcaptcha().into()
// }
//
// pub fn get_mcaptcha() -> MCaptcha {
// MCaptchaBuilder::default()
// .defense(get_defense())
// .duration(DURATION)
// .build()
// .unwrap()
// }
//
// #[test]
// fn mcaptcha_decrement_by_works() {
// let mut m = get_mcaptcha();
// for _ in 0..100 {
// m.add_visitor();
// }
// m.decrement_visitor_by(50);
// assert_eq!(m.get_visitors(), 50);
// m.decrement_visitor_by(500);
// assert_eq!(m.get_visitors(), 0);
// }
//
//
// #[actix_rt::test]
// async fn counter_defense_loosenup_works() {
// //use actix::clock::sleep;
// //use actix::clock::delay_for;
// let addr: MyActor = get_counter().start();
//
// race(addr.clone(), LEVEL_2).await;
// race(addr.clone(), LEVEL_2).await;
// let mut mcaptcha = addr.send(AddVisitor).await.unwrap();
// assert_eq!(mcaptcha.difficulty_factor, LEVEL_2.1);
//
// let duration = Duration::new(DURATION, 0);
// sleep(duration).await;
// //delay_for(duration).await;
//
// mcaptcha = addr.send(AddVisitor).await.unwrap();
// assert_eq!(mcaptcha.difficulty_factor, LEVEL_1.1);
// }
//
// #[test]
// fn test_mcatcptha_builder() {
// let defense = get_defense();
// let m = MCaptchaBuilder::default()
// .duration(0)
// .defense(defense.clone())
// .build();
//
// assert_eq!(m.err(), Some(CaptchaError::CaptchaDurationZero));
//
// let m = MCaptchaBuilder::default().duration(30).build();
// assert_eq!(
// m.err(),
// Some(CaptchaError::PleaseSetValue("defense".into()))
// );
//
// let m = MCaptchaBuilder::default().defense(defense).build();
// assert_eq!(
// m.err(),
// Some(CaptchaError::PleaseSetValue("duration".into()))
// );
// }
//
// #[actix_rt::test]
// async fn get_current_visitor_count_works() {
// let addr: MyActor = get_counter().start();
//
// addr.send(AddVisitor).await.unwrap();
// addr.send(AddVisitor).await.unwrap();
// addr.send(AddVisitor).await.unwrap();
// addr.send(AddVisitor).await.unwrap();
// let count = addr.send(GetCurrentVisitorCount).await.unwrap();
//
// assert_eq!(count, 4);
// }
//
// #[actix_rt::test]
// #[should_panic]
// async fn stop_works() {
// let addr: MyActor = get_counter().start();
// addr.send(Stop).await.unwrap();
// addr.send(AddVisitor).await.unwrap();
// }
//
// #[actix_rt::test]
// async fn get_set_internal_data_works() {
// let addr: MyActor = get_counter().start();
// let mut mcaptcha = addr.send(GetInternalData).await.unwrap();
// mcaptcha.add_visitor();
// addr.send(SetInternalData(mcaptcha.clone())).await.unwrap();
// assert_eq!(
// addr.send(GetInternalData).await.unwrap().get_visitors(),
// mcaptcha.get_visitors()
// );
//
// let duration = Duration::new(mcaptcha.get_duration() + 3, 0);
// sleep(duration).await;
// assert_eq!(addr.send(GetCurrentVisitorCount).await.unwrap(), 0);
// }
//
// #[actix_rt::test]
// async fn bulk_delete_works() {
// let addr: MyActor = get_counter().start();
// addr.send(AddVisitor).await.unwrap();
// addr.send(AddVisitor).await.unwrap();
// assert_eq!(addr.send(GetCurrentVisitorCount).await.unwrap(), 2);
// addr.send(BulkDecrement(3)).await.unwrap();
// assert_eq!(addr.send(GetCurrentVisitorCount).await.unwrap(), 0);
// }
//}

3
src/mcaptcha/mod.rs Normal file
View file

@ -0,0 +1,3 @@
pub mod cache;
mod defense;
pub mod mcaptcha;

View file

@ -16,7 +16,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
use std::sync::Arc;
use std::time::Duration;
use super::management::HealthStatus;
use crate::DcacheNodeId;
use crate::DcacheTypeConfig;
use async_trait::async_trait;
use openraft::error::InstallSnapshotError;
use openraft::error::NetworkError;
@ -34,17 +38,37 @@ use openraft::RaftNetworkFactory;
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::sync::mpsc::Sender;
use tonic::transport::channel::Channel;
use tower_service::Service;
use super::management::HealthStatus;
use crate::DcacheNodeId;
use crate::DcacheTypeConfig;
use crate::pool::*;
use crate::protobuf::dcache::dcache_service_client::DcacheServiceClient;
use crate::protobuf::dcache::RaftRequest;
#[derive(Clone)]
#[derive(Debug)]
struct ChannelManager {}
#[async_trait]
impl ItemManager for ChannelManager {
type Key = String;
type Item = Channel;
type Error = tonic::transport::Error;
async fn build(&self, addr: &Self::Key) -> Result<Channel, tonic::transport::Error> {
tonic::transport::Endpoint::new(addr.clone())?
.connect()
.await
}
async fn check(&self, mut ch: Channel) -> Result<Channel, tonic::transport::Error> {
futures::future::poll_fn(|cx| (&mut ch).poll_ready(cx)).await?;
Ok(ch)
}
}
pub struct DcacheNetwork {
pub signal: Sender<HealthStatus>,
conn_pool: Pool<ChannelManager>,
}
pub enum RPCType {
@ -55,7 +79,13 @@ pub enum RPCType {
impl DcacheNetwork {
pub fn new(signal: Sender<HealthStatus>) -> Self {
Self { signal }
let mgr = ChannelManager {};
Self {
signal,
conn_pool: Pool::new(mgr, Duration::from_millis(50)),
}
}
pub async fn send_rpc<Req, Resp, Err>(
&self,
@ -69,11 +99,7 @@ impl DcacheNetwork {
Err: std::error::Error + DeserializeOwned,
Resp: DeserializeOwned,
{
let addr = &target_node.addr;
let url = format!("http://{}", addr);
let mut client = DcacheServiceClient::connect(url).await.unwrap();
let mut client = self.make_client(&target, target_node).await;
let res = match event {
RPCType::Vote => {
@ -117,6 +143,23 @@ impl DcacheNetwork {
}
}
}
pub async fn make_client(
&self,
target: &DcacheNodeId,
target_node: &BasicNode,
) -> DcacheServiceClient<Channel> {
let addr = format!("http://{}", &target_node.addr);
tracing::debug!("connect: target={}: {}", target, addr);
let channel = self.conn_pool.get(&addr).await.unwrap();
let client = DcacheServiceClient::new(channel);
tracing::info!("connected: target={}: {}", target, addr);
client
}
}
// NOTE: This could be implemented also on `Arc<DcacheNetwork>`, but since it's empty, implemented

154
src/pool.rs Normal file
View file

@ -0,0 +1,154 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use async_trait::async_trait;
use tokio::time::sleep;
//use log::debug;
//use crate::base::tokio;
pub type PoolItem<T> = Arc<tokio::sync::Mutex<Option<T>>>;
/// To build or check an item.
///
/// When an item is requested, ItemManager `build()` one for the pool.
/// When an item is reused, ItemManager `check()` if it is still valid.
#[async_trait]
pub trait ItemManager {
type Key;
type Item;
type Error;
/// Make a new item to put into the pool.
///
/// An impl should hold that an item returned by `build()` is passed `check()`.
async fn build(&self, key: &Self::Key) -> Result<Self::Item, Self::Error>;
/// Check if an existent item still valid.
///
/// E.g.: check if a tcp connection still alive.
/// If the item is valid, `check` should return it in a Ok().
/// Otherwise, the item should be dropped and `check` returns an Err().
async fn check(&self, item: Self::Item) -> Result<Self::Item, Self::Error>;
}
/// Pool assumes the items in it is `Clone`, thus it keeps only one item for each key.
#[allow(clippy::type_complexity)]
#[derive(Debug, Clone)]
pub struct Pool<Mgr>
where
Mgr: ItemManager + Debug,
{
/// The first sleep time when `build()` fails.
/// The next sleep time is 2 times of the previous one.
pub initial_retry_interval: Duration,
/// Pooled items indexed by key.
pub items: Arc<Mutex<HashMap<Mgr::Key, PoolItem<Mgr::Item>>>>,
manager: Mgr,
err_type: PhantomData<Mgr::Error>,
n_retries: u32,
}
impl<Mgr> Pool<Mgr>
where
Mgr: ItemManager + Debug,
Mgr::Key: Clone + Eq + Hash + Send + Debug,
Mgr::Item: Clone + Sync + Send + Debug,
Mgr::Error: Sync + Debug,
{
pub fn new(manager: Mgr, initial_retry_interval: Duration) -> Self {
Pool {
initial_retry_interval,
items: Default::default(),
manager,
err_type: Default::default(),
n_retries: 3,
}
}
pub fn with_retries(mut self, retries: u32) -> Self {
self.n_retries = retries;
self
}
pub fn item_manager(&self) -> &Mgr {
&self.manager
}
/// Return an raw pool item.
///
/// The returned one may be an uninitialized one, i.e., it contains a None.
/// The lock for `items` should not be held for long, e.g. when `build()` a new connection, it takes dozen ms.
fn get_pool_item(&self, key: &Mgr::Key) -> PoolItem<Mgr::Item> {
let mut items = self.items.lock().unwrap();
if let Some(item) = items.get(key) {
item.clone()
} else {
let item = PoolItem::default();
items.insert(key.clone(), item.clone());
item
}
}
/// Return a item, by cloning an existent one or making a new one.
///
/// When returning an existent one, `check()` will be called on it to ensure it is still valid.
/// E.g., when returning a tcp connection.
// #[logcall::logcall(err = "debug")]
// #[minitrace::trace]
pub async fn get(&self, key: &Mgr::Key) -> Result<Mgr::Item, Mgr::Error> {
let pool_item = self.get_pool_item(key);
let mut guard = pool_item.lock().await;
let item_opt = (*guard).clone();
if let Some(ref item) = item_opt {
let check_res = self.manager.check(item.clone()).await;
// debug!("check reused item res: {:?}", check_res);
if let Ok(itm) = check_res {
return Ok(itm);
} else {
// mark broken conn as deleted
*guard = None;
}
}
let mut interval = self.initial_retry_interval;
for i in 0..self.n_retries {
// debug!("build new item of key: {:?}", key);
let new_item = self.manager.build(key).await;
// debug!("build new item of key res: {:?}", new_item);
match new_item {
Ok(x) => {
*guard = Some(x.clone());
return Ok(x);
}
Err(err) => {
if i == self.n_retries - 1 {
return Err(err);
}
}
}
sleep(interval).await;
interval *= 2;
}
unreachable!("the loop should always return!");
}
}

View file

@ -129,6 +129,36 @@ impl DcacheService for MyDcacheImpl {
Ok(Response::new(res.into()))
}
async fn retrieve_pow(
&self,
request: tonic::Request<dcache::RetrievePowRequest>,
) -> std::result::Result<tonic::Response<dcache::OptionalRetrievePoWResponse>, tonic::Status>
{
let req = request.into_inner();
let sm = self.app.store.state_machine.read().await;
let res = sm.results.retrieve_pow_config(req.into());
Ok(Response::new(dcache::OptionalRetrievePoWResponse {
result: res.map(|x| x.into()),
}))
}
async fn delete_pow(
&self,
request: tonic::Request<dcache::DeletePowRequest>,
) -> std::result::Result<tonic::Response<dcache::RaftReply>, tonic::Status> {
let req = request.into_inner();
let res = self
.app
.raft
.client_write(DcacheRequest::DeletePoW(CacheMessages::DeletePoW(
req.string,
)))
.await;
Ok(Response::new(res.into()))
}
async fn cache_result(
&self,
request: tonic::Request<dcache::CacheResultRequest>,
@ -142,6 +172,62 @@ impl DcacheService for MyDcacheImpl {
Ok(Response::new(res.into()))
}
async fn verify_captcha_result(
&self,
request: tonic::Request<dcache::RetrievePowRequest>,
) -> std::result::Result<tonic::Response<dcache::CaptchaResultVerified>, tonic::Status> {
let req = request.into_inner();
let sm = self.app.store.state_machine.read().await;
let verified = sm.results.verify_captcha_result(req.into());
Ok(Response::new(dcache::CaptchaResultVerified { verified }))
}
async fn delete_captcha_result(
&self,
request: tonic::Request<dcache::DeleteCaptchaResultRequest>,
) -> std::result::Result<tonic::Response<dcache::RaftReply>, tonic::Status> {
let req = request.into_inner();
let res = self
.app
.raft
.client_write(DcacheRequest::DeleteCaptchaResult(
CacheMessages::DeleteCaptchaResult { token: req.token },
))
.await;
Ok(Response::new(res.into()))
}
async fn captcha_exists(
&self,
request: tonic::Request<dcache::CaptchaId>,
) -> std::result::Result<tonic::Response<dcache::CaptchaExistsResponse>, tonic::Status> {
let req = request.into_inner();
let sm = self.app.store.state_machine.read().await;
let exists = sm.counter.get_captcha(&req.id).is_some();
Ok(Response::new(dcache::CaptchaExistsResponse { exists }))
}
async fn get_visitor_count(
&self,
request: tonic::Request<dcache::CaptchaId>,
) -> std::result::Result<tonic::Response<dcache::OptionGetVisitorCountResponse>, tonic::Status>
{
let req = request.into_inner();
let sm = self.app.store.state_machine.read().await;
if let Some(captcha) = sm.counter.get_captcha(&req.id) {
let res = captcha.get_visitors();
Ok(Response::new(dcache::OptionGetVisitorCountResponse {
result: Some(dcache::GetVisitorCountResponse { visitors: res }),
}))
} else {
Ok(Response::new(dcache::OptionGetVisitorCountResponse {
result: None,
}))
}
}
// type PipelineDcacheOpsStream =
// Pin<Box<dyn Stream<Item = Result<OuterPipelineRes, tonic::Status>> + Send + 'static>>;
@ -195,7 +281,6 @@ impl DcacheService for MyDcacheImpl {
_ => unimplemented!(),
},
}
}
PipelineReq::RenameCaptcha(rename_captcha_req) => {
let res = self
@ -239,6 +324,35 @@ impl DcacheService for MyDcacheImpl {
dcache_response: Some(InnerPipelineRes::Other(res.into())),
}
}
PipelineReq::CaptchaExists(captcha_exists_req) => {
let sm = self.app.store.state_machine.read().await;
let exists = sm.counter.get_captcha(&captcha_exists_req.id).is_some();
let res = dcache::CaptchaExistsResponse { exists };
drop(sm);
OuterPipelineRes {
dcache_response: Some(InnerPipelineRes::CaptchaExists(res)),
}
}
PipelineReq::GetVisitorCount(get_visitor_count_req) => {
let sm = self.app.store.state_machine.read().await;
if let Some(captcha) = sm.counter.get_captcha(&get_visitor_count_req.id) {
let res = captcha.get_visitors();
OuterPipelineRes {
dcache_response: Some(InnerPipelineRes::GetVisitorCount(
dcache::OptionGetVisitorCountResponse {
result: Some(dcache::GetVisitorCountResponse { visitors: res }),
},
)),
}
} else {
OuterPipelineRes {
dcache_response: Some(InnerPipelineRes::GetVisitorCount(
dcache::OptionGetVisitorCountResponse { result: None },
)),
}
}
}
};
responses.push(res);
}
@ -391,6 +505,27 @@ impl From<dcache::CachePowRequest> for CacheMessages::CachePoW {
}
}
impl From<CacheMessages::CachePoW> for dcache::CachePowRequest {
fn from(value: CacheMessages::CachePoW) -> Self {
Self {
string: value.string,
difficulty_factor: value.difficulty_factor,
duration: value.duration,
key: value.key,
}
}
}
impl From<CacheMessages::CachedPoWConfig> for dcache::RetrievePowResponse {
fn from(value: CacheMessages::CachedPoWConfig) -> Self {
Self {
difficulty_factor: value.difficulty_factor,
duration: value.duration,
key: value.key,
}
}
}
impl From<dcache::CacheResultRequest> for CacheMessages::CacheResult {
fn from(value: dcache::CacheResultRequest) -> Self {
Self {
@ -400,3 +535,12 @@ impl From<dcache::CacheResultRequest> for CacheMessages::CacheResult {
}
}
}
impl From<dcache::RetrievePowRequest> for CacheMessages::VerifyCaptchaResult {
fn from(value: dcache::RetrievePowRequest) -> Self {
Self {
token: value.token,
key: value.key,
}
}
}

View file

@ -94,7 +94,9 @@ pub struct DcacheStateMachine {
pub last_membership: StoredMembership<DcacheNodeId, BasicNode>,
/// Application data.
pub data: Arc<System<HashCache, EmbeddedMaster>>,
// pub data: Arc<System<HashCache, EmbeddedMaster>>,
pub counter: crate::mcaptcha::mcaptcha::Manager,
pub results: crate::mcaptcha::cache::HashCache,
}
#[derive(Serialize, Deserialize, Clone)]
@ -103,42 +105,34 @@ struct PersistableStateMachine {
last_membership: StoredMembership<DcacheNodeId, BasicNode>,
/// Application data.
data: HashMap<String, MCaptcha>,
counter: crate::mcaptcha::mcaptcha::Manager,
results: crate::mcaptcha::cache::HashCache,
}
impl PersistableStateMachine {
async fn from_statemachine(m: &DcacheStateMachine) -> Self {
let internal_data = m
.data
.master
.send(GetInternalData)
.await
.unwrap()
.await
.unwrap()
.unwrap();
let counter = m.counter.clone();
let results = m.results.clone();
Self {
last_applied_log: m.last_applied_log,
last_membership: m.last_membership.clone(),
data: internal_data,
counter,
results,
}
}
async fn to_statemachine(
self,
data: Arc<System<HashCache, EmbeddedMaster>>,
counter: crate::mcaptcha::mcaptcha::Manager,
results: crate::mcaptcha::cache::HashCache,
) -> DcacheStateMachine {
data.master
.send(SetInternalData {
mcaptcha: self.data,
})
.await
.unwrap();
self.counter.clean_all_after_cold_start(counter).await;
self.results.clean_all_after_cold_start(results).await;
DcacheStateMachine {
last_applied_log: self.last_applied_log,
last_membership: self.last_membership,
data,
results: self.results,
counter: self.counter,
}
}
}
@ -165,7 +159,8 @@ impl DcacheStore {
let state_machine = RwLock::new(DcacheStateMachine {
last_applied_log: Default::default(),
last_membership: Default::default(),
data: system::init_system(salt),
counter: crate::mcaptcha::mcaptcha::Manager::new(30),
results: crate::mcaptcha::cache::HashCache::default(),
});
Self {
@ -391,83 +386,42 @@ impl RaftStorage<DcacheTypeConfig> for Arc<DcacheStore> {
EntryPayload::Blank => res.push(DcacheResponse::Empty),
EntryPayload::Normal(ref req) => match req {
DcacheRequest::AddVisitor(msg) => {
let r = sm
.data
.master
.send(msg.clone())
.await
.unwrap()
.await
.unwrap()
.unwrap();
let r = sm.counter.add_visitor(msg);
res.push(DcacheResponse::AddVisitorResult(r));
}
DcacheRequest::AddCaptcha(msg) => {
sm.data
.master
.send(msg.clone())
.await
.unwrap()
.await
.unwrap()
.unwrap();
sm.counter
.add_captcha(Arc::new((&msg.mcaptcha).into()), msg.id.clone());
res.push(DcacheResponse::Empty);
}
DcacheRequest::RenameCaptcha(msg) => {
sm.data
.master
.send(msg.clone())
.await
.unwrap()
.await
.unwrap()
.unwrap();
sm.counter.rename(&msg.name, msg.rename_to.clone());
res.push(DcacheResponse::Empty);
}
DcacheRequest::RemoveCaptcha(msg) => {
sm.data
.master
.send(msg.clone())
.await
.unwrap()
.await
.unwrap()
.unwrap();
sm.counter.rm_captcha(&msg.0);
res.push(DcacheResponse::Empty);
}
// cache
DcacheRequest::CachePoW(msg) => {
sm.data
.cache
.send(msg.clone())
.await
.unwrap()
.await
.unwrap()
.unwrap();
sm.results.cache_pow(msg.clone());
res.push(DcacheResponse::Empty);
}
DcacheRequest::DeletePoW(msg) => {
sm.data.cache.send(msg.clone()).await.unwrap().unwrap();
sm.results.remove_pow_config(&msg.0);
// sm.data.cache.send(msg.clone()).await.unwrap().unwrap();
res.push(DcacheResponse::Empty);
}
DcacheRequest::CacheResult(msg) => {
sm.data
.cache
.send(msg.clone())
.await
.unwrap()
.await
.unwrap()
.unwrap();
sm.results.cache_result(msg.clone());
res.push(DcacheResponse::Empty);
}
DcacheRequest::DeleteCaptchaResult(msg) => {
sm.data.cache.send(msg.clone()).await.unwrap().unwrap();
sm.results.remove_cache_result(&msg.token);
res.push(DcacheResponse::Empty);
}
},
@ -514,7 +468,7 @@ impl RaftStorage<DcacheTypeConfig> for Arc<DcacheStore> {
})?;
let mut state_machine = self.state_machine.write().await;
let updated_state_machine = updated_state_machine
.to_statemachine(state_machine.data.clone())
.to_statemachine(state_machine.counter.clone(), state_machine.results.clone())
.await;
*state_machine = updated_state_machine;
}
@ -549,3 +503,19 @@ impl RaftStorage<DcacheTypeConfig> for Arc<DcacheStore> {
self.clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
async fn provision_dcache_store() -> Arc<DcacheStore> {
Arc::new(DcacheStore::new(
"adsfasdfasdfadsfadfadfadfadsfasdfasdfasdfasdf".into(),
))
}
#[test]
fn test_dcache_store() {
openraft::testing::Suite::test_all(provision_dcache_store).unwrap();
}
}