dcache/test.py

179 lines
5.2 KiB
Python
Executable File

#!/bin/env /bin/python
# mCaptcha - A proof of work based DoS protection system
# Copyright © 2023 Aravinth Manivannan <realravinth@batsense.net>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from pprint import pprint
import requests
import grpc
import json
from dcache_py import dcache_pb2 as dcache
from dcache_py.dcache_pb2 import RaftRequest
from dcache_py.dcache_pb2_grpc import DcacheServiceStub
# import dcache_py.dcache_resources
def init(host: str):
    """POST to the ``/init`` endpoint of the node at ``host`` and print the status.

    ``host`` is interpolated into a plain ``http://`` URL, so it is expected
    to look like ``localhost:9001``. The response body is ignored; only the
    HTTP status code is printed.
    """
    url = f"http://{host}/init"
    response = requests.post(url)
    print(f"Initialization status: {response.status_code}")
def add_host(host: str, id: int, peer: str):
    """Register ``peer`` with the node at ``host`` through ``/add-learner``.

    The JSON body of the POST is the two-element list ``[id, peer]``. Only
    the HTTP status code of the reply is printed.
    """
    url = f"http://{host}/add-learner"
    response = requests.post(url, json=[id, peer])
    print(f"Adding host {peer}. Status: {response.status_code}")
def switch_to_cluster(host: str, nodes: "list[int]"):
    """Request a membership change through the node at ``host``.

    POSTs ``nodes`` as the JSON body to ``/change-membership`` and prints the
    HTTP status code of the reply; the response body is ignored.

    Args:
        host: ``host:port`` of the node to contact.
        nodes: node ids sent as the new membership list.
    """
    # The annotation is quoted so it is never evaluated at definition time:
    # it stays valid on interpreters without subscriptable builtins, unlike
    # the previous ``[int]`` list literal, which is not a type at all.
    resp = requests.post(f"http://{host}/change-membership", json=nodes)
    print(f"Switching to cluster. Status: {resp.status_code}")
def metrics(host: str):
    """Fetch ``/metrics`` from ``host`` and pretty-print the decoded JSON body."""
    response = requests.get(f"http://{host}/metrics")
    pprint(response.json())
def write(host, data, max_redirects=5):
    """POST ``data`` to ``/write`` on ``host``, following leader redirects.

    The reply is decoded as JSON. A reply without an ``"Err"`` key is treated
    as success and its ``["Ok"]["data"]`` value is returned. A reply of the
    form ``{"Err": {"APIError": {"ForwardToLeader": {"leader_node":
    {"addr": ...}}}}}`` causes the same payload to be re-sent to that address.

    Args:
        host: ``host:port`` of the node to contact first.
        data: JSON-serialisable write request.
        max_redirects: how many leader redirects to follow before giving up.

    Returns:
        The ``data`` member of the ``Ok`` reply.

    Raises:
        RuntimeError: if the node reports an error that is not a usable
            leader redirect, or if the redirect limit is exceeded.
    """
    target = host
    redirects = 0
    # Iterate instead of recursing so a flapping cluster cannot recurse
    # without bound.
    while True:
        resp = requests.post(f"http://{target}/write", json=data)
        print(f"RPC Status: {resp.status_code}")
        body = resp.json()
        if "Err" not in body:
            return body["Ok"]["data"]

        err = body["Err"]
        try:
            leader = err["APIError"]["ForwardToLeader"]["leader_node"]["addr"]
        except (KeyError, TypeError):
            # Either a different error kind, or a redirect without a leader
            # address (presumably when no leader is known yet -- confirm
            # against the server). Surface the payload instead of a bare
            # KeyError/TypeError.
            raise RuntimeError(f"write to {target} failed: {err!r}") from None

        if redirects >= max_redirects:
            raise RuntimeError(
                f"write not accepted after {max_redirects} leader redirects; "
                f"last redirect pointed at {leader}"
            )
        redirects += 1
        print(f"Forwarding write to leader {leader}")
        target = leader
def add_vote(host: str, captcha_id: str):
    """Send an ``AddVisitor`` write for ``captcha_id`` and pretty-print the result."""
    payload = {"AddVisitor": captcha_id}
    pprint(write(host, data=payload))
def add_captcha(host: str, captcha_id: str):
    """Send an ``AddCaptcha`` write registering ``captcha_id`` with fixed settings.

    The captcha is created with two defense levels (50 -> 500 and
    5000 -> 50000), a duration of 30 and both visitor-threshold counters at
    zero. The value returned by ``write`` is printed afterwards.
    """
    levels = [
        {"visitor_threshold": 50, "difficulty_factor": 500},
        {"visitor_threshold": 5000, "difficulty_factor": 50000},
    ]
    defense = {"levels": levels, "current_visitor_threshold": 0}
    mcaptcha = {"visitor_threshold": 0, "defense": defense, "duration": 30}
    request = {"AddCaptcha": {"id": captcha_id, "mcaptcha": mcaptcha}}

    result = write(host, data=request)
    # pprint of a str prints its repr, so the message appears quoted.
    pprint(f"Captcha added {captcha_id}: {result}")
# Address of the node that initialize_cluster() initialises and grpc_run() dials.
host = "localhost:9001"
# (node id, address) pairs passed to add_host(); node N listens on port 9000 + N.
peers = [(node_id, f"localhost:{9000 + node_id}") for node_id in (2, 3, 4)]
# Identifier of the captcha used by every helper in this script.
captcha_id = "test_1"
def initialize_cluster():
    """Bring up the cluster over HTTP and seed it with one captcha and its votes.

    Steps, in order: initialise ``host``, add every entry of ``peers`` as a
    learner, switch membership to nodes 1-4, register ``captcha_id``, then
    cast one vote followed by 600 more.
    """
    init(host)
    for node_id, address in peers:
        add_host(host=host, id=node_id, peer=address)

    switch_to_cluster(host, nodes=[1, 2, 3, 4])
    add_captcha(host, captcha_id)

    add_vote(host, captcha_id)
    for _ in range(600):
        add_vote(host, captcha_id)
def grpc_add_vote(stub: DcacheServiceStub, captcha_id: str):
    """Call the ``AddVisitor`` RPC for ``captcha_id`` and pretty-print the reply."""
    # Earlier variant, kept for reference: wrap {"AddVisitor": captcha_id} as
    # JSON in a RaftRequest and send it with stub.Write instead.
    reply = stub.AddVisitor(dcache.CaptchaID(id=captcha_id))
    pprint(reply)
def grpc_add_captcha(stub: DcacheServiceStub, captcha_id: str):
    """Call the ``AddCaptcha`` RPC to register ``captcha_id`` and print the reply.

    The request carries a duration of 30 and two defense levels
    (50 -> 500 and 5000 -> 50000), built from the typed protobuf messages.
    """
    # Earlier variant, kept for reference: the same settings expressed as an
    # {"AddCaptcha": {...}} dict (with "current_visitor_threshold": 0),
    # JSON-encoded into RaftRequest(data=...).
    levels = [
        dcache.Level(visitor_threshold=50, difficulty_factor=500),
        dcache.Level(visitor_threshold=5000, difficulty_factor=50000),
    ]
    settings = dcache.MCaptcha(
        duration=30,
        defense=dcache.Defense(levels=levels),
    )
    request = dcache.AddCaptchaRequest(id=captcha_id, mcaptcha=settings)

    reply = stub.AddCaptcha(request)
    # pprint of a str prints its repr, so the message appears quoted.
    pprint(f"Captcha added {captcha_id}: {reply}")
# Batch of 1000 identical AddVisitor requests for captcha_id, built once at
# import time and sent by grpc_pipeline_add_vote().
msgs = dcache.DcacheBatchRequest(
    requests=[
        dcache.DcacheRequest(addVisitor=dcache.CaptchaID(id=captcha_id))
        for _ in range(1000)
    ]
)
def grpc_pipeline_add_vote(stub):
    """Send the module-level ``msgs`` batch via ``PipelineDcacheOps``.

    Each entry of the reply's ``responses`` field is printed on its own line.
    """
    batch_reply = stub.PipelineDcacheOps(msgs)
    for item in batch_reply.responses:
        print(f"received respo: {item}")
def grpc_run():
    """Open an insecure gRPC channel to ``host`` and run the gRPC scenario.

    Registers ``captcha_id`` and then sends the pipelined vote batch. The
    channel is closed when the ``with`` block exits.
    """
    with grpc.insecure_channel(host) as conn:
        client = DcacheServiceStub(conn)
        grpc_add_captcha(client, captcha_id)
        grpc_pipeline_add_vote(client)
        # A single non-pipelined vote can be sent instead with:
        # grpc_add_vote(client, captcha_id)
# Script entry point: only the gRPC scenario runs by default.
if __name__ == "__main__":
    grpc_run()
    # HTTP alternative, casting one vote through node 2:
    # add_vote("localhost:9002", captcha_id)