chore: linting

This commit is contained in:
Aravinth Manivannan 2023-12-31 01:28:59 +05:30
parent e548a532a0
commit 240b5ec13a
Signed by: realaravinth
GPG key ID: F8F50389936984FF
7 changed files with 98 additions and 73 deletions

View file

@ -1,16 +1,16 @@
#!/bin/env /usr/bin/python3
# # Copyright (C) 2021 Aravinth Manivannan <realaravinth@batsense.net>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from asyncio import sleep
@ -21,9 +21,11 @@ from mcaptcha import register
from dcache import grpc_add_vote, grpc_get_visitor_count
def incr(key):
return grpc_add_vote(key)
def get_count(key):
try:
count = grpc_get_visitor_count(key)
@ -31,10 +33,12 @@ def get_count(key):
except:
return 0
def assert_count(expect, key):
count = get_count(key)
assert count == expect
async def incr_one_works():
try:
key = "incr_one"
@ -74,7 +78,7 @@ async def difficulty_works():
register(key)
data = incr(key)
assert data.difficulty_factor == 50
for _ in range(501):
incr(key)
data = incr(key)

View file

@ -8,6 +8,7 @@ from dcache_py.dcache_pb2_grpc import DcacheServiceStub
host = "localhost:9001"
def grpc_add_vote(captcha_id: str):
with grpc.insecure_channel(host) as channel:
stub = DcacheServiceStub(channel)
@ -34,7 +35,6 @@ def grpc_add_captcha(captcha_id: str):
),
)
resp = stub.AddCaptcha(msg)
return resp
@ -47,7 +47,7 @@ def grpc_captcha_exists(captcha_id: str):
return resp.exists
def grpc_rename_captcha( captcha_id: str, new_id: str):
def grpc_rename_captcha(captcha_id: str, new_id: str):
with grpc.insecure_channel(host) as channel:
stub = DcacheServiceStub(channel)
@ -55,7 +55,7 @@ def grpc_rename_captcha( captcha_id: str, new_id: str):
resp = stub.RenameCaptcha(msg)
def grpc_delete_captcha( captcha_id: str):
def grpc_delete_captcha(captcha_id: str):
with grpc.insecure_channel(host) as channel:
stub = DcacheServiceStub(channel)
@ -74,10 +74,10 @@ def grpc_add_challenge(token: str, key: str):
with grpc.insecure_channel(host) as channel:
stub = DcacheServiceStub(channel)
msg = dcache.CacheResultRequest(
token= token,
key= key,
duration = 5,
)
token=token,
key=key,
duration=5,
)
stub.CacheResult(msg)
@ -85,44 +85,42 @@ def grpc_get_challenge(token: str, key: str):
with grpc.insecure_channel(host) as channel:
stub = DcacheServiceStub(channel)
msg = dcache.RetrievePowRequest(
token= token,
key= key,
)
token=token,
key=key,
)
return stub.VerifyCaptchaResult(msg)
def grpc_delete_challenge(token: str):
with grpc.insecure_channel(host) as channel:
stub = DcacheServiceStub(channel)
msg = dcache.DeleteCaptchaResultRequest(
token= token,
)
token=token,
)
stub.DeleteCaptchaResult(msg)
def grpc_add_pow(token: str, string: str):
with grpc.insecure_channel(host) as channel:
stub = DcacheServiceStub(channel)
msg = dcache.CachePowRequest(
key= token,
string= string,
duration = 5,
difficulty_factor= 500
)
key=token, string=string, duration=5, difficulty_factor=500
)
return stub.CachePow(msg)
def grpc_get_pow(token: str, string: str):
with grpc.insecure_channel(host) as channel:
stub = DcacheServiceStub(channel)
msg = dcache.RetrievePowRequest(
token= string,
key= token)
resp= stub.RetrievePow(msg)
msg = dcache.RetrievePowRequest(token=string, key=token)
resp = stub.RetrievePow(msg)
return resp
def grpc_delete_pow(string: str):
with grpc.insecure_channel(host) as channel:
stub = DcacheServiceStub(channel)
msg = dcache.DeletePowRequest(
string= string,
)
string=string,
)
stub.DeletePow(msg)

View file

@ -1,28 +1,34 @@
#!/bin/env /usr/bin/python3
#
# Copyright (C) 2021 Aravinth Manivannan <realaravinth@batsense.net>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import json
import utils
from dcache import grpc_add_captcha, grpc_add_vote, grpc_captcha_exists, grpc_rename_captcha
from dcache import (
grpc_add_captcha,
grpc_add_vote,
grpc_captcha_exists,
grpc_rename_captcha,
)
from dcache import grpc_delete_captcha
from stub import get_stub
def delete_captcha(key):
grpc_delete_captcha(key)
@ -30,6 +36,7 @@ def delete_captcha(key):
def add_captcha(key):
grpc_add_captcha(key)
def rename_captcha(key, new_key):
grpc_rename_captcha(key, new_key)
@ -37,11 +44,13 @@ def rename_captcha(key, new_key):
def captcha_exists(key):
return grpc_captcha_exists(captcha_id=key)
def register(key):
if captcha_exists(key):
delete_captcha(key)
add_captcha(key)
async def captcha_exists_works():
key = "captcha_delete_works"
if captcha_exists(key):
@ -51,12 +60,14 @@ async def captcha_exists_works():
assert captcha_exists(key) is True
print("[*] Captcha delete works")
async def register_captcha_works():
key = "register_captcha_works"
register(key)
assert captcha_exists(key) is True
print("[*] Add captcha works")
async def delete_captcha_works():
key = "delete_captcha_works"
register(key)

View file

@ -1,17 +1,17 @@
#!/bin/env /usr/bin/python3
#
# Copyright (C) 2023 Aravinth Manivannan <realaravinth@batsense.net>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from asyncio import sleep
@ -25,32 +25,36 @@ from dcache import grpc_add_pow, grpc_get_pow, grpc_delete_pow
# 4. Read pow
# 5. Read expired pow
def add_pow(captcha, pow):
"""Add pow to """
try :
"""Add pow to"""
try:
res = grpc_add_pow(captcha, pow)
return res
except Exception as e:
return e
def get_pow_from(captcha, pow):
"""Add pow to """
try :
"""Add pow to"""
try:
res = grpc_get_pow(captcha, pow)
if res.HasField('result'):
if res.HasField("result"):
return res.result
else:
return None
except Exception as e:
return e
def delete_pow(captcha, pow):
"""Add pow to """
try :
"""Add pow to"""
try:
grpc_delete_pow(pow)
except Exception as e:
return e
async def add_pow_works():
"""Test: Add pow"""
try:
@ -66,6 +70,7 @@ async def add_pow_works():
except Exception as e:
raise e
async def pow_ttl_works():
"""Test: pow TTL"""
try:
@ -102,14 +107,13 @@ async def delete_pow_works():
try:
pow_name = "delete_pow"
key = "delete_pow_key"
#pow = get_pow(pow_name)
# pow = get_pow(pow_name)
add_pow(key, pow_name)
delete_pow(key, pow_name)
error = get_pow_from(key, pow_name)
assert error is None
print("[*] Delete pow works")
except Exception as e:
raise e

View file

@ -1,17 +1,17 @@
#!/bin/env /usr/bin/python3
#
# Copyright (C) 2023 Aravinth Manivannan <realaravinth@batsense.net>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from asyncio import sleep
@ -28,36 +28,40 @@ from dcache import grpc_add_challenge, grpc_get_challenge, grpc_delete_challenge
COMMANDS = {
"ADD" :"MCAPTCHA_CACHE.ADD_result",
"GET" :"MCAPTCHA_CACHE.GET_result",
"DEL" :"MCAPTCHA_CACHE.DELETE_result"
"ADD": "MCAPTCHA_CACHE.ADD_result",
"GET": "MCAPTCHA_CACHE.GET_result",
"DEL": "MCAPTCHA_CACHE.DELETE_result",
}
result_NOT_FOUND = "result not found"
DUPLICATE_result = "result already exists"
REDIS_OK = bytes("OK", 'utf-8')
REDIS_OK = bytes("OK", "utf-8")
def add_result(captcha, result):
"""Add result to """
try :
grpc_add_challenge(captcha,result)
except Exception as e:
return e
def get_result_from(captcha, result):
"""Add result to """
try :
return grpc_get_challenge(captcha,result)
"""Add result to"""
try:
grpc_add_challenge(captcha, result)
except Exception as e:
return e
def get_result_from(captcha, result):
"""Add result to"""
try:
return grpc_get_challenge(captcha, result)
except Exception as e:
return e
def delete_result(captcha, result):
"""Add result to """
try :
"""Add result to"""
try:
grpc_delete_challenge(captcha)
except Exception as e:
return e
async def add_result_works():
"""Test: Add result"""
try:
@ -72,6 +76,7 @@ async def add_result_works():
except Exception as e:
raise e
async def result_ttl_works():
"""Test: result TTL"""
try:
@ -82,7 +87,7 @@ async def result_ttl_works():
await sleep(5 + 2)
error = get_result_from(key, result_name)
# assert str(error) == result_NOT_FOUND
# assert str(error) == result_NOT_FOUND
print("[*] result TTL works")
except Exception as e:

View file

@ -1,16 +1,16 @@
#!/bin/env /usr/bin/python3
# Copyright (C) 2023 Aravinth Manivannan <realaravinth@batsense.net>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from threading import Thread
@ -18,13 +18,15 @@ import asyncio
import importlib.util
import sys
sys.path.append('/home/atm/code/mcaptcha/dcache/')
sys.path.append("/home/atm/code/mcaptcha/dcache/")
import bucket
import mcaptcha
import result
import pow
class Runner(object):
__fn = [
bucket.incr_one_works,
@ -46,12 +48,11 @@ class Runner(object):
__tasks = []
async def __register(self):
""" Register functions to be run"""
"""Register functions to be run"""
for fn in self.__fn:
task = asyncio.create_task(fn())
self.__tasks.append(task)
async def run(self):
"""Wait for registered functions to finish executing"""
await self.__register()
@ -59,5 +60,6 @@ class Runner(object):
await task
"""Runs in separate threads"""
def __init__(self):
super(Runner, self).__init__()

View file

@ -1,17 +1,17 @@
#!/bin/env /usr/bin/python3
#
# Copyright (C) 2021 Aravinth Manivannan <realaravinth@batsense.net>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import asyncio
@ -25,5 +25,6 @@ async def main():
await runner.run()
print("All tests passed")
if __name__ == "__main__":
asyncio.run(main())