feat: use grpc within locust
Some checks failed
ci/woodpecker/pr/woodpecker Pipeline is pending
ci/woodpecker/push/woodpecker Pipeline failed

This commit is contained in:
Aravinth Manivannan 2023-12-28 13:44:21 +05:30
parent e50b7a5751
commit 337f89f25a
Signed by: realaravinth
GPG key ID: F8F50389936984FF
7 changed files with 784 additions and 64 deletions

64
bench/adaptor.py Normal file
View file

@ -0,0 +1,64 @@
import time
from typing import Any, Callable
import grpc
import grpc.experimental.gevent as grpc_gevent
from grpc_interceptor import ClientInterceptor
from locust import User
from locust.exception import LocustError
# patch grpc so that it uses gevent instead of asyncio
grpc_gevent.init_gevent()
class LocustInterceptor(ClientInterceptor):
def __init__(self, environment, *args, **kwargs):
super().__init__(*args, **kwargs)
self.env = environment
def intercept(
self,
method: Callable,
request_or_iterator: Any,
call_details: grpc.ClientCallDetails,
):
response = None
exception = None
start_perf_counter = time.perf_counter()
response_length = 0
try:
response = method(request_or_iterator, call_details)
response_length = response.result().ByteSize()
except grpc.RpcError as e:
exception = e
self.env.events.request.fire(
request_type="grpc",
name=call_details.method,
response_time=(time.perf_counter() - start_perf_counter) * 1000,
response_length=response_length,
response=response,
context=None,
exception=exception,
)
return response
class GrpcUser(User):
abstract = True
stub_class = None
def __init__(self, environment):
super().__init__(environment)
for attr_value, attr_name in (
(self.host, "host"),
(self.stub_class, "stub_class"),
):
if attr_value is None:
raise LocustError(f"You must specify the {attr_name}.")
self._channel = grpc.insecure_channel(self.host)
interceptor = LocustInterceptor(environment=environment)
self._channel = grpc.intercept_channel(self._channel, interceptor)
self.stub = self.stub_class(self._channel)

View file

@ -1,66 +1,125 @@
import json
from pprint import pprint
from locust import FastHttpUser, between, task
password = "fooobarasdfasdf"
username = "realaravinth"
from dcache_py import dcache_pb2 as dcache
from dcache_py.dcache_pb2 import RaftRequest
from dcache_py.dcache_pb2_grpc import DcacheServiceStub
class Unprotected(FastHttpUser):
# wait_time = between(5, 15)
peers = [
"http://localhost:9001",
"http://localhost:9002",
"http://localhost:9003",
]
leader = "http://localhost:9001"
host = leader
captcha_id="locust"
pipeline_vote = []
for _ in range(0,1000):
pipeline_vote.append({"AddVisitor": captcha_id})
# def on_start(self):
# resp = self.client.get(f"{self.leader}/metrics")
# data = resp.json()
# leader = data["Ok"]["membership_config"]["log_id"]["leader_id"]["node_id"]
# self.leader = self.peers[leader - 1]
# self.host = self.leader
# print(f"Leader: {self.host}")
# self.add_captcha(captcha_id="locust")
def write(self, data):
resp = self.client.post(f"{self.host}/write", json=data)
print(f"RPC Status: {resp.status_code}")
resp = resp.json()
if "Err" in resp:
leader = resp["Err"]["APIError"]["ForwardToLeader"]["leader_node"]["addr"]
print(f"Forwarding write to leader {leader}")
return write(leader, data)
return resp["Ok"]["data"]
def pipeline_write(self, data):
resp = self.client.post(f"{self.host}/pipeline/write", json=data)
# password = "fooobarasdfasdf"
# username = "realaravinth"
#
#
# class Unprotected(FastHttpUser):
# # wait_time = between(5, 15)
# peers = [
# "http://localhost:9001",
# "http://localhost:9002",
# "http://localhost:9003",
# ]
# leader = "http://localhost:9001"
# host = leader
# captcha_id="locust"
#
# pipeline_vote = []
# for _ in range(0,1000):
# pipeline_vote.append({"AddVisitor": captcha_id})
#
## def on_start(self):
## resp = self.client.get(f"{self.leader}/metrics")
## data = resp.json()
## leader = data["Ok"]["membership_config"]["log_id"]["leader_id"]["node_id"]
## self.leader = self.peers[leader - 1]
## self.host = self.leader
## print(f"Leader: {self.host}")
## self.add_captcha(captcha_id="locust")
#
#
#
# def write(self, data):
# resp = self.client.post(f"{self.host}/write", json=data)
# print(f"RPC Status: {resp.status_code}")
resp = resp.json()
if "Err" in resp:
leader = resp["Err"]["APIError"]["ForwardToLeader"]["leader_node"]["addr"]
print(f"Forwarding write to leader {leader}")
return write(leader, data)
return resp
# resp = resp.json()
# if "Err" in resp:
# leader = resp["Err"]["APIError"]["ForwardToLeader"]["leader_node"]["addr"]
# print(f"Forwarding write to leader {leader}")
# return write(leader, data)
# return resp["Ok"]["data"]
#
# def pipeline_write(self, data):
# resp = self.client.post(f"{self.host}/pipeline/write", json=data)
## print(f"RPC Status: {resp.status_code}")
# resp = resp.json()
# if "Err" in resp:
# leader = resp["Err"]["APIError"]["ForwardToLeader"]["leader_node"]["addr"]
# print(f"Forwarding write to leader {leader}")
# return write(leader, data)
# return resp
#
#
#
#
# def add_vote(self, captcha_id: str):
# resp = self.write(data={"AddVisitor": captcha_id})
# pprint(resp)
#
# def add_vote_pipeline(self, captcha_id: str):
# resp = self.pipeline_write(data=self.pipeline_vote)
## pprint(resp)
#
# def add_captcha(self, captcha_id: str):
# params = {
# "AddCaptcha": {
# "id": captcha_id,
# "mcaptcha": {
# "visitor_threshold": 0,
# "defense": {
# "levels": [
# {"visitor_threshold": 50, "difficulty_factor": 500},
# {"visitor_threshold": 5000, "difficulty_factor": 50000},
# ],
# "current_visitor_threshold": 0,
# },
# "duration": 30,
# },
# }
# }
# resp = self.write(data=params)
# pprint(f"Captcha added {captcha_id}: {resp}")
#
#
# @task
# def unprotected(self):
# self.add_vote_pipeline(captcha_id=self.captcha_id)
# ##self.add_vote(captcha_id="locust")
## data = {
## "username": username,
## "password": username,
## "confirm_password": username,
## }
## self.client.post("/unprotected", data=data)
#
import gevent
import adaptor
from locust import events, task
class HelloGrpcUser(adaptor.GrpcUser):
stub_class = DcacheServiceStub
leader = "localhost:9001"
host = leader
captcha_id = "locust"
msg = RaftRequest(data=json.dumps({"AddVisitor": captcha_id}))
def on_start(self):
self.add_captcha(captcha_id=self.captcha_id)
def add_vote(self, captcha_id: str):
resp = self.write(data={"AddVisitor": captcha_id})
pprint(resp)
resp = self.stub.Write(self.msg)
def add_vote_pipeline(self, captcha_id: str):
resp = self.pipeline_write(data=self.pipeline_vote)
# pprint(resp)
# def add_vote_pipeline(self, captcha_id: str):
# resp = self.pipeline_write(data=self.pipeline_vote)
# pprint(resp)
def add_captcha(self, captcha_id: str):
params = {
@ -79,17 +138,16 @@ class Unprotected(FastHttpUser):
},
}
}
resp = self.write(data=params)
msg = RaftRequest(data=json.dumps(params))
resp = self.stub.Write(msg)
pprint(f"Captcha added {captcha_id}: {resp}")
@task
def unprotected(self):
self.add_vote_pipeline(captcha_id=self.captcha_id)
##self.add_vote(captcha_id="locust")
# data = {
# "username": username,
# "password": username,
# "confirm_password": username,
# }
# self.client.post("/unprotected", data=data)
def addVote(self):
self.add_vote(self.captcha_id)
# msg = dcache.CaptchaID(id=captcha_id)
# resp = self.stub.AddVisitor(msg)
# pprint(resp)

View file

@ -0,0 +1 @@
Count,Message,Traceback,Nodes
1 Count Message Traceback Nodes

View file

@ -0,0 +1 @@
Method,Name,Error,Occurrences
1 Method Name Error Occurrences

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,3 @@
Type,Name,Request Count,Failure Count,Median Response Time,Average Response Time,Min Response Time,Max Response Time,Average Content Size,Requests/s,Failures/s,50%,66%,75%,80%,90%,95%,98%,99%,99.9%,99.99%,100%
grpc,/dcache.DcacheService/Write,96465,0,600.0,530.5241670541676,3.931416000114041,2860.153126999876,130.11822940963043,732.274667601832,0.0,600,720,830,880,1100,1200,1300,1500,2300,2900,2900
,Aggregated,96465,0,600.0,530.5241670541676,3.931416000114041,2860.153126999876,130.11822940963043,732.274667601832,0.0,600,720,830,880,1100,1200,1300,1500,2300,2900,2900
1 Type Name Request Count Failure Count Median Response Time Average Response Time Min Response Time Max Response Time Average Content Size Requests/s Failures/s 50% 66% 75% 80% 90% 95% 98% 99% 99.9% 99.99% 100%
2 grpc /dcache.DcacheService/Write 96465 0 600.0 530.5241670541676 3.931416000114041 2860.153126999876 130.11822940963043 732.274667601832 0.0 600 720 830 880 1100 1200 1300 1500 2300 2900 2900
3 Aggregated 96465 0 600.0 530.5241670541676 3.931416000114041 2860.153126999876 130.11822940963043 732.274667601832 0.0 600 720 830 880 1100 1200 1300 1500 2300 2900 2900

View file

@ -135,7 +135,8 @@ def grpc_add_captcha(stub: DcacheServiceStub, captcha_id: str):
def grpc_run():
with grpc.insecure_channel(host) as channel:
stub = DcacheServiceStub(channel)
grpc_add_captcha(stub, captcha_id)
# grpc_add_captcha(stub, captcha_id)
grpc_add_vote(stub, captcha_id)
if __name__ == "__main__":
grpc_run()