#!/bin/env /usr/bin/python
# SPDX-FileCopyrightText: 2023 Aravinth Manivannan
#
# SPDX-License-Identifier: AGPL-3.0-or-later
"""WebFinger conformance checks whose results are reported to an ftest server.

The script queries ``/.well-known/webfinger`` on the target host for the
configured test user, validates the response body and CORS header, prints a
summary and uploads the outcome to the ftest server.
"""

import logging
import os
from urllib.parse import urlparse, urlunparse

import requests

LOG_FILE = "webfinger.log"

# Upper bound (seconds) for every HTTP call; without one, ``requests`` waits
# forever on an unresponsive server.
REQUEST_TIMEOUT = 30


def configure_logger():
    """Create the ``webfinger`` logger writing to LOG_FILE and to the console.

    Both handlers run at DEBUG level and share one timestamped format.

    Returns:
        The configured ``logging.Logger`` instance.
    """
    logger = logging.getLogger("webfinger")
    logger.setLevel(logging.DEBUG)

    fh = logging.FileHandler(LOG_FILE)
    fh.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)

    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)

    logger.addHandler(fh)
    logger.addHandler(ch)
    return logger


def get_env(name) -> str:
    """Return the value of environment variable *name*.

    Args:
        name: Name of the environment variable to read.

    Returns:
        The value of the variable.

    Raises:
        Exception: If the variable is not set.
    """
    env = os.environ.get(name)
    # NOTE(review): this also writes the value of FTEST_AUTH to the log file
    # and the console - confirm that this is acceptable.
    logger.info(f"Environment: {name}: {env}")
    if env is None:
        raise Exception(
            f"Please set environment variable {name}. See https://git.batsense.net/ForgeFlux/webfinger-test#environment-variables"
        )
    return env


logger = configure_logger()

FTEST_AUTH = get_env("FTEST_AUTH")
FTEST_HOST = get_env("FTEST_HOST")
TARGET_HOST = get_env("FTEST_TARGET_HOST")
TEST_USER = get_env("FTEST_USER")  # actor ex: john@example.org
print(f"TEST USER: {TEST_USER}")
TEST_HOST = urlparse(TARGET_HOST).netloc


def query_webfinger():
    """Query the target host's WebFinger endpoint for TEST_USER.

    Returns:
        The ``requests`` response object of the WebFinger query.

    Raises:
        AssertionError: If the endpoint does not answer with HTTP 200.
    """
    parsed_target_host = urlparse(TARGET_HOST)
    webfinger = urlunparse(
        (
            parsed_target_host.scheme,
            parsed_target_host.netloc,
            "/.well-known/webfinger",
            "",
            f"resource=acct:{TEST_USER}",
            "",
        )
    )
    logger.info(f"Query WebFinger: {webfinger}")
    res = requests.get(
        webfinger,
        headers={"Origin": "http://example.org"},
        timeout=REQUEST_TIMEOUT,
    )
    # Log the raw body: decoding JSON here would raise on a non-JSON error
    # page before the status assertion below gets a chance to report it.
    logger.debug(
        f"WebFinger response:\n\nSTATUS: {res.status_code}\n\nHEADERS:\n {res.headers}\n\nRESPONSE PAYLOAD:\n{res.text}"
    )
    assert res.status_code == 200
    logger.info("[SUCCESS] WebFinger query response is HTTP 200")
    return res


def test_main_params(resp):
    """Assert that the decoded WebFinger document has 'subject' and 'links'.

    Args:
        resp: Decoded JSON body of the WebFinger response.
    """
    assert "subject" in resp, "Parameter 'subject' is not present in WebFinger response"
    assert "links" in resp, "Parameter 'links' is not present in WebFinger response"
    logger.info("[SUCCESS] WebFinger response has 'subject' and 'links' parameters")


def test_links(resp):
    """Validate the 'self' and (optional) profile-page entries of 'links'.

    The 'self' link is mandatory and must be of type
    ``application/activity+json``; the profile-page link is only validated
    when it is present.

    Args:
        resp: Decoded JSON body of the WebFinger response.
    """
    self_link = None
    profile_link = None
    # .get() keeps a missing key from surfacing as a bare KeyError; the
    # assertions below then fail with their descriptive messages instead.
    for link in resp.get("links", []):
        rel = link.get("rel")
        if rel == "self":
            self_link = link
        elif rel == "http://webfinger.net/rel/profile-page":
            profile_link = link
            logger.debug(
                "'rel==http://webfinger.net/rel/profile-page' is present in 'links' WebFinger response parameter"
            )

    assert (
        self_link is not None
    ), "'rel==self' is not present in 'links' WebFinger response parameter"
    assert (
        self_link["rel"] == "self"
    ), f'[rel==self] expected rel:self, got rel: {self_link["rel"]}'
    assert (
        self_link.get("type") == "application/activity+json"
    ), f"[rel==self] expected application/activity+json; got {self_link.get('type')}"
    assert "href" in self_link, "[rel==self] href not present in link item"
    logger.info("[SUCESS] rel==self passed schema validation")

    if profile_link:
        assert (
            profile_link["rel"] == "http://webfinger.net/rel/profile-page"
        ), f"expected http://webfinger.net/rel/profile-page got {profile_link['rel']}"
        assert (
            profile_link.get("type") == "text/html"
        ), f"expected text/html got {profile_link.get('type')}"
        assert (
            "href" in profile_link
        ), "[rel==profile link] href not present in link item"
        logger.info("[SUCESS] rel==profile-page passed schema validation")
    logger.info("[SUCESS] 'links' object passed validation")


def test_subject(resp):
    """Assert that 'subject' equals ``acct:`` followed by TEST_USER.

    Args:
        resp: Decoded JSON body of the WebFinger response.
    """
    subject = f"acct:{TEST_USER}"
    assert (
        resp.get("subject") == subject
    ), f"Subject parameter doesn't match. Expected {subject} got {resp.get('subject')}"


def test_access_control_allow_origin(resp):
    """Assert that the endpoint answers with ``Access-Control-Allow-Origin: *``.

    Args:
        resp: The ``requests`` response object of the WebFinger query.
    """
    # request.headers is case insensitive; .get() makes an absent header fail
    # the assertion with the explanatory message rather than a KeyError.
    assert (
        resp.headers.get("access-control-allow-origin") == "*"
    ), "Access-Control-Allow-Origin header should be '*' to allow any domain to access the resource with CORS. Please see https://www.rfc-editor.org/rfc/rfc7033.html#section-5"
    logger.info("[SUCESS] WebFinger endpoint is configured correctly for CORS")


def upload_logs_to_ftest(success: bool, logs: str):
    """Upload the test outcome to the ftest server's results endpoint.

    Args:
        success: Whether the test run passed.
        logs: Text summary of the run.
    """
    parsed_ftest_host = urlparse(FTEST_HOST)
    ftest = urlunparse(
        (
            parsed_ftest_host.scheme,
            parsed_ftest_host.netloc,
            f"/api/v1/{FTEST_AUTH}/results",
            "",
            "",
            "",
        )
    )
    logger.info(f"Uploading logs to ftest server {ftest}")

    payload = {"success": success, "logs": logs}
    # Post to the results endpoint built above, not to the bare host URL.
    res = requests.post(
        ftest,
        json=payload,
        headers={"Origin": "http://example.org"},
        timeout=REQUEST_TIMEOUT,
    )
    if res.status_code == 200:
        logger.info("Upload successful")
    else:
        logger.error(f"Upload failed: {res}")


def main():
    """Run every check, print a summary and upload the result to ftest."""
    max_score = 5
    resp = query_webfinger()
    payload = resp.json()
    # query_webfinger() asserting HTTP 200 counts as the first passed test.
    score = 1

    # Each check paired with the object it inspects: the decoded JSON body
    # for the schema checks, the raw response for the header check.
    checks = (
        (test_main_params, payload),
        (test_links, payload),
        (test_subject, payload),
        (test_access_control_allow_origin, resp),
    )

    success = []
    failures = {}
    for check, arg in checks:
        name = check.__name__
        try:
            check(arg)
        except Exception as e:
            # Top-level test boundary: record the failure and keep going so
            # the remaining checks still run.
            logger.error(e)
            failures[name] = e
        else:
            score += 1
            success.append(name)

    print("\n\n===============")
    if score == max_score:
        print("All tests passed")
    elif score > 0:
        print(f"Partial success. {score} out of {max_score} tests passed")

    print("Summary:\n")

    logs = []

    if success:
        print("Successful tests:\n")
        for s in success:
            log = f"[OK] {s}"
            print(log)
            logs.append(log)

    if failures:
        print("\n\nFailed tests:\n")
        for test, error in failures.items():
            log = f"[FAIL] {test} failed with error:\n{error}\n-----\n"
            print(log)
            logs.append(log)

    # upload_logs_to_ftest() takes an overall pass/fail flag and a single
    # string, so collapse the per-test bookkeeping accordingly.
    upload_logs_to_ftest(not failures, "\n".join(logs))


if __name__ == "__main__":
    main()