dcache/tests/dcache.py

127 lines
3.5 KiB
Python
Raw Normal View History

2023-12-31 01:19:53 +05:30
import requests
import grpc
import json
from dcache_py import dcache_pb2 as dcache
from dcache_py.dcache_pb2 import RaftRequest
from dcache_py.dcache_pb2_grpc import DcacheServiceStub
# host:port target handed to grpc.insecure_channel() by the gRPC helpers below.
host = "localhost:9001"
2023-12-31 01:28:59 +05:30
2023-12-31 01:19:53 +05:30
def grpc_add_vote(captcha_id: str):
    """Send an ``AddVisitor`` RPC for ``captcha_id`` and return its ``result``.

    A new insecure channel to ``host`` is opened for this single call and is
    closed when the ``with`` block exits.
    """
    with grpc.insecure_channel(host) as conn:
        client = DcacheServiceStub(conn)
        request = dcache.CaptchaID(id=captcha_id)
        return client.AddVisitor(request).result
def grpc_add_captcha(captcha_id: str):
    """Issue an ``AddCaptcha`` RPC for ``captcha_id`` and return the reply.

    The request always carries the same fixed configuration: ``duration=5``
    and two defense levels whose visitor threshold / difficulty factor are
    50/50 and 500/500.
    """
    # Assemble the nested message from the inside out before touching the
    # network; building protobuf messages has no side effects.
    levels = [
        dcache.Level(visitor_threshold=50, difficulty_factor=50),
        dcache.Level(visitor_threshold=500, difficulty_factor=500),
    ]
    mcaptcha = dcache.MCaptcha(
        duration=5,
        defense=dcache.Defense(levels=levels),
    )
    request = dcache.AddCaptchaRequest(id=captcha_id, mcaptcha=mcaptcha)

    with grpc.insecure_channel(host) as conn:
        return DcacheServiceStub(conn).AddCaptcha(request)
def grpc_captcha_exists(captcha_id: str):
    """Send a ``CaptchaExists`` RPC and return the reply's ``exists`` field.

    ``captcha_id`` is wrapped in a ``CaptchaID`` message; the call goes over a
    short-lived insecure channel to ``host``.
    """
    with grpc.insecure_channel(host) as conn:
        client = DcacheServiceStub(conn)
        reply = client.CaptchaExists(dcache.CaptchaID(id=captcha_id))
        return reply.exists
2023-12-31 01:28:59 +05:30
def grpc_rename_captcha(captcha_id: str, new_id: str):
    """Send a ``RenameCaptcha`` RPC asking to rename ``captcha_id`` to ``new_id``.

    Args:
        captcha_id: Sent as the request's ``name`` field.
        new_id: Sent as the request's ``rename_to`` field.

    Returns:
        None. The RPC reply is intentionally discarded, as before; the
        previously unused ``resp`` local has been removed.
    """
    with grpc.insecure_channel(host) as channel:
        stub = DcacheServiceStub(channel)
        msg = dcache.RenameCaptchaRequest(name=captcha_id, rename_to=new_id)
        stub.RenameCaptcha(msg)
2023-12-31 01:28:59 +05:30
def grpc_delete_captcha(captcha_id: str):
    """Send a ``RemoveCaptcha`` RPC for ``captcha_id``; the reply is ignored.

    Returns ``None``.
    """
    request = dcache.CaptchaID(id=captcha_id)
    with grpc.insecure_channel(host) as conn:
        DcacheServiceStub(conn).RemoveCaptcha(request)
def grpc_get_visitor_count(captcha_id: str):
    """Send a ``GetVisitorCount`` RPC and return the reply's ``result`` field.

    ``captcha_id`` is wrapped in a ``CaptchaID`` message for the request.
    """
    with grpc.insecure_channel(host) as conn:
        client = DcacheServiceStub(conn)
        request = dcache.CaptchaID(id=captcha_id)
        reply = client.GetVisitorCount(request)
        return reply.result
def grpc_add_challenge(token: str, key: str, *, duration: int = 5):
    """Send a ``CacheResult`` RPC carrying ``token``, ``key`` and ``duration``.

    Args:
        token: Sent as the request's ``token`` field.
        key: Sent as the request's ``key`` field.
        duration: Sent as the request's ``duration`` field. Keyword-only;
            defaults to 5, the value that used to be hard-coded, so existing
            two-argument callers behave exactly as before.

    Returns:
        None. The RPC reply is discarded.
    """
    with grpc.insecure_channel(host) as channel:
        stub = DcacheServiceStub(channel)
        msg = dcache.CacheResultRequest(
            token=token,
            key=key,
            duration=duration,
        )
        stub.CacheResult(msg)
def grpc_get_challenge(token: str, key: str):
    """Send a ``VerifyCaptchaResult`` RPC and return the raw reply message.

    The request is a ``RetrievePowRequest`` populated with ``token`` and
    ``key``; that is the message type this helper passes to
    ``VerifyCaptchaResult``.
    """
    request = dcache.RetrievePowRequest(token=token, key=key)
    with grpc.insecure_channel(host) as conn:
        client = DcacheServiceStub(conn)
        return client.VerifyCaptchaResult(request)
2023-12-31 01:28:59 +05:30
2023-12-31 01:19:53 +05:30
def grpc_delete_challenge(token: str):
    """Send a ``DeleteCaptchaResult`` RPC for ``token``; the reply is ignored.

    Returns ``None``.
    """
    request = dcache.DeleteCaptchaResultRequest(token=token)
    with grpc.insecure_channel(host) as conn:
        DcacheServiceStub(conn).DeleteCaptchaResult(request)
2023-12-31 01:28:59 +05:30
2023-12-31 01:19:53 +05:30
def grpc_add_pow(
    token: str,
    string: str,
    *,
    duration: int = 5,
    difficulty_factor: int = 500,
):
    """Send a ``CachePow`` RPC and return the reply message.

    Args:
        token: Sent as the request's ``key`` field (note the name mismatch
            between this parameter and the protobuf field).
        string: Sent as the request's ``string`` field.
        duration: Sent as the request's ``duration`` field. Keyword-only;
            defaults to 5, the value that used to be hard-coded.
        difficulty_factor: Sent as the request's ``difficulty_factor`` field.
            Keyword-only; defaults to 500, the value that used to be
            hard-coded.

    Returns:
        Whatever ``DcacheServiceStub.CachePow`` returns.
    """
    with grpc.insecure_channel(host) as channel:
        stub = DcacheServiceStub(channel)
        msg = dcache.CachePowRequest(
            key=token,
            string=string,
            duration=duration,
            difficulty_factor=difficulty_factor,
        )
        return stub.CachePow(msg)
def grpc_get_pow(token: str, string: str):
    """Send a ``RetrievePow`` RPC and return the reply message.

    The arguments are deliberately crossed when filling the request:
    ``string`` goes into the ``token`` field and ``token`` goes into the
    ``key`` field. This mirrors ``grpc_add_pow``, which also sends its
    ``token`` argument as ``key``.
    """
    request = dcache.RetrievePowRequest(key=token, token=string)
    with grpc.insecure_channel(host) as conn:
        client = DcacheServiceStub(conn)
        return client.RetrievePow(request)
2023-12-31 01:28:59 +05:30
2023-12-31 01:19:53 +05:30
def grpc_delete_pow(string: str):
    """Send a ``DeletePow`` RPC for ``string``; the reply is ignored.

    Returns ``None``.
    """
    request = dcache.DeletePowRequest(string=string)
    with grpc.insecure_channel(host) as conn:
        DcacheServiceStub(conn).DeletePow(request)