Aravinth Manivannan
ec2eabe19b
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
222 lines
6.5 KiB
Python
Executable file
222 lines
6.5 KiB
Python
Executable file
#!/bin/env /usr/bin/python
|
|
|
|
# SPDX-FileCopyrightText: 2023 Aravinth Manivannan <realaravinth@batsense.net>
|
|
#
|
|
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
|
|
import logging
|
|
import os
|
|
import requests
|
|
from urllib.parse import urlparse, urlunparse
|
|
|
|
LOG_FILE = "webfinger.log"
|
|
|
|
|
|
def configure_logger():
|
|
logger = logging.getLogger("webfinger")
|
|
logger.setLevel(logging.DEBUG)
|
|
fh = logging.FileHandler(LOG_FILE)
|
|
fh.setLevel(logging.DEBUG)
|
|
ch = logging.StreamHandler()
|
|
ch.setLevel(logging.DEBUG)
|
|
formatter = logging.Formatter(
|
|
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
|
|
)
|
|
fh.setFormatter(formatter)
|
|
ch.setFormatter(formatter)
|
|
logger.addHandler(fh)
|
|
logger.addHandler(ch)
|
|
return logger
|
|
|
|
|
|
def get_env(name) -> str:
|
|
env = FTEST_AUTH = os.environ.get(name)
|
|
logger.info(f"Environment: {name}: {env}")
|
|
if env is None:
|
|
raise Exception(
|
|
f"Please set environment variable {name}. See https://git.batsense.net/ForgeFlux/webfinger-test#environment-variables"
|
|
)
|
|
return env
|
|
|
|
|
|
logger = configure_logger()
|
|
|
|
FTEST_AUTH = get_env("FTEST_AUTH")
|
|
FTEST_HOST = get_env("FTEST_HOST")
|
|
TARGET_HOST = get_env("FTEST_TARGET_HOST")
|
|
TEST_USER = get_env("FTEST_USER") # actor ex: john@example.org
|
|
print(f"TEST USER: {TEST_USER}")
|
|
TEST_HOST = urlparse(TARGET_HOST).netloc
|
|
|
|
|
|
def query_webfinger():
|
|
parsed_target_host = urlparse(TARGET_HOST)
|
|
webfinger = urlunparse(
|
|
(
|
|
parsed_target_host.scheme,
|
|
parsed_target_host.netloc,
|
|
"/.well-known/webfinger",
|
|
"",
|
|
f"resource=acct:{TEST_USER}",
|
|
"",
|
|
)
|
|
)
|
|
logger.info(f"Query WebFinger: {webfinger}")
|
|
res = requests.get(webfinger, headers={"Origin": "http://example.org"})
|
|
logger.debug(
|
|
f"WebFinger response:\n\nSTATUS: {res.status_code}\n\nHEADERS:\n {res.headers}\n\nRESPONSE PAYLOAD:\n{res.json()}"
|
|
)
|
|
assert res.status_code == 200
|
|
logger.info("[SUCCESS] WebFinger query response is HTTP 200")
|
|
return res
|
|
|
|
|
|
def test_main_params(resp):
|
|
assert "subject" in resp, "Parameter 'subject' is not present in WebFinger response"
|
|
assert "links" in resp, "Parameter 'links' is not present in WebFinger response"
|
|
logger.info(
|
|
"[SUCCESS] WebFinger response has 'subject', 'aliases' and 'links' parameters"
|
|
)
|
|
|
|
|
|
def test_links(resp):
|
|
self_link = None
|
|
profile_link = None
|
|
|
|
for link in resp["links"]:
|
|
if link["rel"] == "self":
|
|
self_link = link
|
|
elif link["rel"] == "http://webfinger.net/rel/profile-page":
|
|
profile_link = link
|
|
logger.debug(
|
|
"'rel==http://webfinger.net/rel/profile-page' is present in 'links' WebFinger response parameter"
|
|
)
|
|
|
|
assert (
|
|
self_link is not None
|
|
), "'rel==self' is not present in 'links' WebFinger response parameter"
|
|
assert (
|
|
self_link["rel"] == "self"
|
|
), f'[rel==self] expected rel:self, got rel: {self_link["rel"]}'
|
|
assert (
|
|
self_link["type"] == f"application/activity+json"
|
|
), f"[rel==self] expected application/activity+json; got {self_link['type']}"
|
|
assert "href" in self_link, "[rel==self] href not present in link item"
|
|
logger.info("[SUCESS] rel==self passed schema validation")
|
|
|
|
if profile_link:
|
|
assert (
|
|
profile_link["rel"] == "http://webfinger.net/rel/profile-page"
|
|
), f"expected http://webfinger.net/rel/profile-page got {profile_link['rel']}"
|
|
assert (
|
|
profile_link["type"] == "text/html"
|
|
), f"expected text/html got {profile_link['type']}"
|
|
assert (
|
|
"href" in profile_link
|
|
), "[rel==profile link] href not present in link item"
|
|
logger.info("[SUCESS] rel==profile-page passed schema validation")
|
|
logger.info("[SUCESS] 'links' object passed validation")
|
|
|
|
|
|
def test_subject(resp):
|
|
subject = f"acct:{TEST_USER}"
|
|
assert (
|
|
resp["subject"] == subject
|
|
), f"Subject parameter doesn't match. Expected {subject} got {resp['subject']}"
|
|
|
|
|
|
def test_access_control_allow_origin(resp):
|
|
# request.headers is case insensitive
|
|
assert (
|
|
resp.headers["access-control-allow-origin"] == "*"
|
|
), "Access-Control-Allow-Origin header should be '*' to allow any domain to access the resource with CORS. Please see https://www.rfc-editor.org/rfc/rfc7033.html#section-5"
|
|
logger.info("[SUCESS] WebFinger endpoint is configured correctly for CORS")
|
|
|
|
|
|
def upload_logs_to_ftest(success: bool, logs: str):
|
|
parsed_ftest_host = urlparse(FTEST_HOST)
|
|
ftest = urlunparse(
|
|
(
|
|
parsed_ftest_host.scheme,
|
|
parsed_ftest_host.netloc,
|
|
"/api/v1/results",
|
|
"",
|
|
"" "",
|
|
)
|
|
)
|
|
logger.info("Uploading logs to ftest server {ftest}")
|
|
|
|
payload = {"success": success, "logs": logs, "auth": FTEST_AUTH}
|
|
|
|
res = requests.post(
|
|
FTEST_HOST, json=payload, headers={"Origin": "http://example.org"}
|
|
)
|
|
logger.info("Upload successful")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
max_score = 5
|
|
score = 0
|
|
resp = query_webfinger()
|
|
json = resp.json()
|
|
score += 1
|
|
|
|
success = []
|
|
failures = {}
|
|
|
|
try:
|
|
test_main_params(json)
|
|
score += 1
|
|
success.append("test_main_params")
|
|
except Exception as e:
|
|
logger.error(e)
|
|
failures["test_main_params"] = e
|
|
try:
|
|
test_links(json)
|
|
score += 1
|
|
success.append("test_links")
|
|
except Exception as e:
|
|
logger.error(e)
|
|
failures["test_links"] = e
|
|
|
|
try:
|
|
test_subject(json)
|
|
score += 1
|
|
success.append("test_subject")
|
|
except Exception as e:
|
|
logger.error(e)
|
|
failures["test_subject"] = e
|
|
|
|
try:
|
|
test_access_control_allow_origin(resp)
|
|
score += 1
|
|
success.append("test_access_control_allow_origin")
|
|
except Exception as e:
|
|
logger.error(e)
|
|
failures["test_access_control_allow_origin"] = e
|
|
|
|
print("\n\n===============")
|
|
if score == max_score:
|
|
print("All tests passed")
|
|
elif score > 0:
|
|
print(f"Partial success. {score} out of {max_score} tests passed")
|
|
|
|
print("Summary:\n")
|
|
|
|
logs = []
|
|
|
|
if success:
|
|
print(f"Successful tests:\n")
|
|
for s in success:
|
|
log = f"[OK] {s}"
|
|
print(log)
|
|
logs.append(log)
|
|
|
|
if failures:
|
|
print(f"\n\nFailed tests:\n")
|
|
for _, (test, error) in enumerate(failures.items()):
|
|
log = f"[FAIL] {test} failed with error:\n{error}\n-----\n"
|
|
print(log)
|
|
logs.append(log)
|
|
|
|
upload_logs_to_ftest(success, logs)
|