webfinger-test/run.py

225 lines
6.6 KiB
Python
Executable File

#!/bin/env /usr/bin/python
# SPDX-FileCopyrightText: 2023 Aravinth Manivannan <realaravinth@batsense.net>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
import logging
import os
import requests
from urllib.parse import urlparse, urlunparse
LOG_FILE = "webfinger.log"
def configure_logger():
logger = logging.getLogger("webfinger")
logger.setLevel(logging.DEBUG)
fh = logging.FileHandler(LOG_FILE)
fh.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
fh.setFormatter(formatter)
ch.setFormatter(formatter)
logger.addHandler(fh)
logger.addHandler(ch)
return logger
def get_env(name) -> str:
env = FTEST_AUTH = os.environ.get(name)
logger.info(f"Environment: {name}: {env}")
if env is None:
raise Exception(
f"Please set environment variable {name}. See https://git.batsense.net/ForgeFlux/webfinger-test#environment-variables"
)
return env
logger = configure_logger()
FTEST_AUTH = get_env("FTEST_AUTH")
FTEST_HOST = get_env("FTEST_HOST")
TARGET_HOST = get_env("FTEST_TARGET_HOST")
TEST_USER = get_env("FTEST_USER") # actor ex: john@example.org
print(f"TEST USER: {TEST_USER}")
TEST_HOST = urlparse(TARGET_HOST).netloc
def query_webfinger():
parsed_target_host = urlparse(TARGET_HOST)
webfinger = urlunparse(
(
parsed_target_host.scheme,
parsed_target_host.netloc,
"/.well-known/webfinger",
"",
f"resource=acct:{TEST_USER}",
"",
)
)
logger.info(f"Query WebFinger: {webfinger}")
res = requests.get(webfinger)
logger.debug(
f"WebFinger response:\n\nSTATUS: {res.status_code}\n\nHEADERS:\n {res.headers}\n\nRESPONSE PAYLOAD:\n{res.json()}"
)
assert res.status_code == 200
logger.info("[SUCCESS] WebFinger query response is HTTP 200")
return res
def test_main_params(resp):
assert "subject" in resp, "Parameter 'subject' is not present in WebFinger response"
assert "links" in resp, "Parameter 'links' is not present in WebFinger response"
logger.info(
"[SUCCESS] WebFinger response has 'subject', 'aliases' and 'links' parameters"
)
def test_links(resp):
self_link = None
profile_link = None
for link in resp["links"]:
if link["rel"] == "self":
self_link = link
elif link["rel"] == "http://webfinger.net/rel/profile-page":
profile_link = link
logger.debug(
"'rel==http://webfinger.net/rel/profile-page' is present in 'links' WebFinger response parameter"
)
assert (
self_link is not None
), "'rel==self' is not present in 'links' WebFinger response parameter"
assert (
self_link["rel"] == "self"
), f'[rel==self] expected rel:self, got rel: {self_link["rel"]}'
assert (
self_link["type"] == f"application/activity+json"
), f"[rel==self] expected application/activity+json; got {self_link['type']}"
assert "href" in self_link, "[rel==self] href not present in link item"
logger.info("[SUCESS] rel==self passed schema validation")
if profile_link:
assert (
profile_link["rel"] == "http://webfinger.net/rel/profile-page"
), f"expected http://webfinger.net/rel/profile-page got {profile_link['rel']}"
assert (
profile_link["type"] == "text/html"
), f"expected text/html got {profile_link['type']}"
assert (
"href" in profile_link
), "[rel==profile link] href not present in link item"
logger.info("[SUCESS] rel==profile-page passed schema validation")
logger.info("[SUCESS] 'links' object passed validation")
def test_subject(resp):
subject = f"acct:{TEST_USER}"
assert (
resp["subject"] == subject
), f"Subject parameter doesn't match. Expected {subject} got {resp['subject']}"
def test_access_control_allow_origin(resp):
# request.headers is case insensitive
assert (
resp.headers["Access-Control-Allow-Origin"] == "*"
), f"Access-Control-Allow-Origin header should be '*' to allow any domain to access the resource with CORS. Please see https://www.rfc-editor.org/rfc/rfc7033.html#section-5. Got {resp.headers['Access-Control-Allow-Origin']}"
logger.info("[SUCESS] WebFinger endpoint is configured correctly for CORS")
def upload_logs_to_ftest(success: bool, logs: str):
parsed_ftest_host = urlparse(FTEST_HOST)
ftest = urlunparse(
(
parsed_ftest_host.scheme,
parsed_ftest_host.netloc,
f"/api/v1/{FTEST_AUTH}/results",
"",
"",
"",
)
)
logger.info(f"Uploading logs to ftest server {ftest}")
payload = {"success": success, "logs": logs}
res = requests.post(ftest, json=payload)
if res.status_code == 200:
logger.info("Upload successful")
else:
print(res)
if __name__ == "__main__":
max_score = 5
score = 0
resp = query_webfinger()
json = resp.json()
score += 1
success = []
failures = {}
try:
test_main_params(json)
score += 1
success.append("test_main_params")
except Exception as e:
logger.error(e)
failures["test_main_params"] = e
try:
test_links(json)
score += 1
success.append("test_links")
except Exception as e:
logger.error(e)
failures["test_links"] = e
try:
test_subject(json)
score += 1
success.append("test_subject")
except Exception as e:
logger.error(e)
failures["test_subject"] = e
try:
test_access_control_allow_origin(resp)
score += 1
success.append("test_access_control_allow_origin")
except Exception as e:
logger.error(e)
failures["test_access_control_allow_origin"] = e
print("\n\n===============")
if score == max_score:
print("All tests passed")
elif score > 0:
print(f"Partial success. {score} out of {max_score} tests passed")
print("Summary:\n")
logs = ""
if success:
print(f"Successful tests:\n")
for s in success:
log = f"[OK] {s}\n"
print(log)
logs += log
if failures:
print(f"\n\nFailed tests:\n")
for _, (test, error) in enumerate(failures.items()):
log = f"[FAIL] {test} failed with error:\n{error}\n-----\n"
print(log)
logs += log
upload_logs_to_ftest(len(failures) == 0, logs)