dcache/tests/dcache.py

126 lines
3.5 KiB
Python

import requests
import grpc
import json
from dcache_py import dcache_pb2 as dcache
from dcache_py.dcache_pb2 import RaftRequest
from dcache_py.dcache_pb2_grpc import DcacheServiceStub
host = "localhost:9001"
def grpc_add_vote(captcha_id: str):
with grpc.insecure_channel(host) as channel:
stub = DcacheServiceStub(channel)
msg = dcache.CaptchaID(id=captcha_id)
resp = stub.AddVisitor(msg)
return resp.result
def grpc_add_captcha(captcha_id: str):
with grpc.insecure_channel(host) as channel:
stub = DcacheServiceStub(channel)
msg = dcache.AddCaptchaRequest(
id=captcha_id,
mcaptcha=dcache.MCaptcha(
duration=5,
defense=dcache.Defense(
levels=[
dcache.Level(visitor_threshold=50, difficulty_factor=50),
dcache.Level(visitor_threshold=500, difficulty_factor=500),
]
),
),
)
resp = stub.AddCaptcha(msg)
return resp
def grpc_captcha_exists(captcha_id: str):
with grpc.insecure_channel(host) as channel:
stub = DcacheServiceStub(channel)
msg = dcache.CaptchaID(id=captcha_id)
resp = stub.CaptchaExists(msg)
return resp.exists
def grpc_rename_captcha(captcha_id: str, new_id: str):
with grpc.insecure_channel(host) as channel:
stub = DcacheServiceStub(channel)
msg = dcache.RenameCaptchaRequest(name=captcha_id, rename_to=new_id)
resp = stub.RenameCaptcha(msg)
def grpc_delete_captcha(captcha_id: str):
with grpc.insecure_channel(host) as channel:
stub = DcacheServiceStub(channel)
msg = dcache.CaptchaID(id=captcha_id)
stub.RemoveCaptcha(msg)
def grpc_get_visitor_count(captcha_id: str):
with grpc.insecure_channel(host) as channel:
stub = DcacheServiceStub(channel)
msg = dcache.CaptchaID(id=captcha_id)
return stub.GetVisitorCount(msg).result
def grpc_add_challenge(token: str, key: str):
with grpc.insecure_channel(host) as channel:
stub = DcacheServiceStub(channel)
msg = dcache.CacheResultRequest(
token=token,
key=key,
duration=5,
)
stub.CacheResult(msg)
def grpc_get_challenge(token: str, key: str):
with grpc.insecure_channel(host) as channel:
stub = DcacheServiceStub(channel)
msg = dcache.RetrievePowRequest(
token=token,
key=key,
)
return stub.VerifyCaptchaResult(msg)
def grpc_delete_challenge(token: str):
with grpc.insecure_channel(host) as channel:
stub = DcacheServiceStub(channel)
msg = dcache.DeleteCaptchaResultRequest(
token=token,
)
stub.DeleteCaptchaResult(msg)
def grpc_add_pow(token: str, string: str):
with grpc.insecure_channel(host) as channel:
stub = DcacheServiceStub(channel)
msg = dcache.CachePowRequest(
key=token, string=string, duration=5, difficulty_factor=500
)
return stub.CachePow(msg)
def grpc_get_pow(token: str, string: str):
with grpc.insecure_channel(host) as channel:
stub = DcacheServiceStub(channel)
msg = dcache.RetrievePowRequest(token=string, key=token)
resp = stub.RetrievePow(msg)
return resp
def grpc_delete_pow(string: str):
with grpc.insecure_channel(host) as channel:
stub = DcacheServiceStub(channel)
msg = dcache.DeletePowRequest(
string=string,
)
stub.DeletePow(msg)