import json import time import grpc import gevent from pprint import pprint from locust import FastHttpUser, between, task, events from dcache_py import dcache_pb2 as dcache from dcache_py.dcache_pb2 import RaftRequest from dcache_py.dcache_pb2_grpc import DcacheServiceStub import adaptor host = "localhost:9001" captcha_id = "locust" def add_captcha(stub: DcacheServiceStub, captcha_id: str): msg = dcache.AddCaptchaRequest( id=captcha_id, mcaptcha=dcache.MCaptcha( duration=30, defense=dcache.Defense( levels=[ dcache.Level(visitor_threshold=50, difficulty_factor=500), dcache.Level(visitor_threshold=5000, difficulty_factor=50000), ] ), ), ) resp = stub.AddCaptcha(msg) pprint(f"Captcha added {captcha_id}: {resp}") with grpc.insecure_channel(host) as channel: stub = DcacheServiceStub(channel) add_captcha(stub=stub, captcha_id=captcha_id) pipeline_msgs = [] for _ in range(0,10): pipeline_msgs.append(dcache.DcacheRequest(addVisitor=dcache.CaptchaID(id=captcha_id))) pipeline_msgs = dcache.DcacheBatchRequest(requests=pipeline_msgs) #def pipeline_generate_messages(): # for msg in pipeline_msgs: # yield msg class HelloGrpcUser(adaptor.GrpcUser): stub_class = DcacheServiceStub host = host captcha_id = captcha_id msg = dcache.CaptchaID(id=captcha_id) def add_vote(self, captcha_id: str): resp = self.stub.AddVisitor(self.msg) def add_vote_pipeline(self): res = self.stub.PipelineDcacheOps(pipeline_msgs) # @task # def addVote(self): # self.add_vote(self.captcha_id) @task def addVotePipeline(self): self.add_vote_pipeline()