webfinger-test/run.py

182 lines
5.4 KiB
Python
Raw Normal View History

#!/bin/env /usr/bin/python
import logging
import os
import requests
from urllib.parse import urlparse, urlunparse
def configure_logger():
logger = logging.getLogger("webfinger")
logger.setLevel(logging.DEBUG)
fh = logging.FileHandler("webfinger.log")
fh.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
fh.setFormatter(formatter)
ch.setFormatter(formatter)
logger.addHandler(fh)
logger.addHandler(ch)
return logger
def get_env(name) -> str:
env = FTEST_AUTH = os.environ.get(name)
logger.info(f"Environment: {name}: {env}")
return env
logger = configure_logger()
FTEST_AUTH = get_env("FTEST_AUTH")
TARGET_HOST = get_env("FTEST_TARGET_HOST")
TEST_USER = get_env("FTEST_USER") # actor ex: john@example.org
TEST_HOST = urlparse(TARGET_HOST).netloc
def query_webfinger():
parsed_target_host = urlparse(TARGET_HOST)
webfinger = urlunparse(
(
parsed_target_host.scheme,
parsed_target_host.netloc,
"/.well-known/webfinger",
"",
f"resource=acct:{TEST_USER}",
"",
)
)
logger.info(f"Query WebFinger: {webfinger}")
res = requests.get(webfinger, headers={"Origin": "http://example.org"})
logger.debug(
f"WebFinger response:\n\nSTATUS: {res.status_code}\n\nHEADERS:\n {res.headers}\n\nRESPONSE PAYLOAD:\n{res.json()}"
)
assert res.status_code == 200
logger.info("[SUCCESS] WebFinger query response is HTTP 200")
return res
def test_main_params(resp):
assert "subject" in resp, "Parameter 'subject' is not present in WebFinger response"
assert "links" in resp, "Parameter 'links' is not present in WebFinger response"
logger.info(
"[SUCCESS] WebFinger response has 'subject', 'aliases' and 'links' parameters'"
)
def test_links(resp):
self_link = None
profile_link = None
for link in resp["links"]:
if link["rel"] == "self":
self_link = link
elif link["rel"] == "http://webfinger.net/rel/profile-page":
profile_link = link
logger.debug(
"'rel==http://webfinger.net/rel/profile-page' is present in 'links' WebFinger response parameter"
)
assert (
self_link is not None
), "'rel==self' is not present in 'links' WebFinger response parameter"
assert (
self_link["rel"] == "self"
), f'[rel==self] expected rel:self, got rel: {self_link["rel"]}'
assert (
self_link["type"] == f"application/activity+json"
), f"[rel==self] expected application/activity+json; got {self_link['type']}"
assert "href" in self_link, "[rel==self] href not present in link item"
logger.info("[SUCESS] rel==self passed schema validation")
if profile_link:
assert (
profile_link["rel"] == "http://webfinger.net/rel/profile-page"
), f"expected http://webfinger.net/rel/profile-page got {profile_link['rel']}"
assert (
profile_link["type"] == "text/html"
), f"expected text/html got {profile_link['type']}"
assert (
"href" in profile_link
), "[rel==profile link] href not present in link item"
logger.info("[SUCESS] rel==profile-page passed schema validation")
logger.info("[SUCESS] 'links' object passed validation")
def test_subject(resp):
subject = f"acct:{TEST_USER}"
assert (
resp["subject"] == subject
), f"Subject parameter doesn't match. Expected {subject} got {resp['subject']}"
def test_access_control_allow_origin(resp):
# request.headers is case insensitive
assert resp.headers[
"access-control-allow-origin"
] == "*", "Access-Control-Allow-Origin header should be '*' to allow any domain to access the resource with CORS. Please see https://www.rfc-editor.org/rfc/rfc7033.html#section-5"
logger.info("[SUCESS] WebFinger endpoint is configured correctly for CORS")
if __name__ == "__main__":
max_score = 5
score = 0
resp = query_webfinger()
json = resp.json()
score += 1
success = []
failures = { }
try:
test_main_params(json)
score += 1
success.append("test_main_params")
except Exception as e:
logger.error(e)
failures["test_main_params"] = e
try:
test_links(json)
score += 1
success.append("test_links")
except Exception as e:
logger.error(e)
failures["test_links"] = e
try:
test_subject(json)
score += 1
success.append("test_subject")
except Exception as e:
logger.error(e)
failures["test_subject"] = e
try:
test_access_control_allow_origin(resp)
score += 1
success.append("test_access_control_allow_origin")
except Exception as e:
logger.error(e)
failures["test_access_control_allow_origin"] = e
print("\n\n===============")
if score == max_score:
print("All tests passed")
elif score > 0:
print(f"Partial success. {score} out of {max_score} tests passed")
print("Summary:\n")
if success:
print(f"Successful tests:\n")
for s in success:
print(f"[OK] {s}")
if failures:
print(f"\n\nFailed tests:\n")
for _, (test, error) in enumerate(failures.items()):
print(f"[FAIL] {test} failed with error:\n{error}\n-----\n")