"""Nodeinfo conformance check that reports its outcome to an ftest server.

The script reads ``FTEST_AUTH``, ``FTEST_HOST`` and ``FTEST_TARGET_HOST`` from
the environment, queries ``/.well-known/nodeinfo`` on the target host,
validates every linked document whose ``rel`` is one of ``NODEINFO_VERSIONS``
against the matching JSON schema, and POSTs a success flag plus a textual
summary to ``/api/v1/<FTEST_AUTH>/results`` on the ftest host.
"""

import logging
import os
from urllib.parse import urlparse, urlunparse

import jsonschema
import requests

LOG_FILE = "nodeinfo.log"

# Upper bound (seconds) for every HTTP call so an unresponsive host cannot
# hang the run indefinitely.
REQUEST_TIMEOUT = 30

NODEINFO_VERSIONS = {
    1.0: "http://nodeinfo.diaspora.software/ns/schema/1.0",
    1.1: "http://nodeinfo.diaspora.software/ns/schema/1.1",
    2.0: "http://nodeinfo.diaspora.software/ns/schema/2.0",
    2.1: "http://nodeinfo.diaspora.software/ns/schema/2.1",
}


def configure_logger():
    """Return the "nodeinfo" logger, emitting DEBUG and above to LOG_FILE and a stream handler.

    Handlers are attached only once, so calling this function repeatedly does
    not duplicate every log line.
    """
    logger = logging.getLogger("nodeinfo")
    logger.setLevel(logging.DEBUG)
    if logger.handlers:
        return logger

    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    for handler in (logging.FileHandler(LOG_FILE), logging.StreamHandler()):
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger


def get_env(name) -> str:
    """Return the value of environment variable *name*.

    Raises:
        Exception: if the variable is not set.
    """
    env = os.environ.get(name)
    # NOTE(review): this also writes the value of FTEST_AUTH to the log file
    # and console; confirm that exposing it there is acceptable.
    logger.info(f"Environment: {name}: {env}")
    if env is None:
        raise Exception(
            f"Please set environment variable {name}. See https://git.batsense.net/ForgeFlux/nodeinfo-test#environment-variables"
        )
    return env


logger = configure_logger()

FTEST_AUTH = get_env("FTEST_AUTH")
FTEST_HOST = get_env("FTEST_HOST")
TARGET_HOST = get_env("FTEST_TARGET_HOST")
TEST_HOST = urlparse(TARGET_HOST).netloc


def _url_on_host(base, path):
    """Return *base*'s scheme and netloc combined with *path* (no params, query or fragment)."""
    parts = urlparse(base)
    return urlunparse((parts.scheme, parts.netloc, path, "", "", ""))


def query_nodeinfo():
    """GET ``/.well-known/nodeinfo`` from TARGET_HOST and return the response.

    Raises:
        AssertionError: if the response status is not HTTP 200.
    """
    nodeinfo = _url_on_host(TARGET_HOST, "/.well-known/nodeinfo")
    logger.info(f"Query nodeinfo: {nodeinfo}")
    res = requests.get(nodeinfo, timeout=REQUEST_TIMEOUT)
    # Log the raw body rather than res.json(): a non-JSON error page must not
    # raise a decode error here and mask the status check below.
    logger.debug(
        f"nodeinfo response:\n\nSTATUS: {res.status_code}\n\nHEADERS:\n {res.headers}\n\nRESPONSE PAYLOAD:\n{res.text}"
    )
    # Raised explicitly instead of using `assert`, which is stripped under -O.
    if res.status_code != 200:
        raise AssertionError(
            f"nodeinfo query returned HTTP {res.status_code}, expected 200"
        )
    logger.info("[SUCCESS] nodeinfo query response is HTTP 200")
    return res


def test_links(resp):
    """Check that every item of ``resp["links"]`` has both ``href`` and ``rel``.

    Raises:
        AssertionError: if a link item lacks ``href`` or ``rel``.
        KeyError: if *resp* has no ``links`` key.
    """
    for link in resp["links"]:
        for field in ("href", "rel"):
            if field not in link:
                raise AssertionError(f"'{field}' missing from link item")
    logger.info("[SUCCESS] links passed schema validation")


def test_schema(source, version):
    """Validate the document at *source* against the Nodeinfo *version* schema.

    *version* must be a key of NODEINFO_VERSIONS. Both the schema and the
    document are downloaded; HTTP errors, JSON decode errors and
    ``jsonschema`` validation errors propagate to the caller.
    """
    schema_res = requests.get(NODEINFO_VERSIONS[version], timeout=REQUEST_TIMEOUT)
    schema_res.raise_for_status()
    doc_res = requests.get(source, timeout=REQUEST_TIMEOUT)
    doc_res.raise_for_status()
    jsonschema.validate(doc_res.json(), schema_res.json())
    logger.info(f"[SUCCESS] passed Nodeinfo {version} schema validation")


def upload_logs_to_ftest(success: bool, logs: str):
    """POST the overall *success* flag and the *logs* summary to the ftest server."""
    ftest = _url_on_host(FTEST_HOST, f"/api/v1/{FTEST_AUTH}/results")
    # NOTE(review): this URL embeds FTEST_AUTH and is written to the logs.
    logger.info(f"Uploading logs to ftest server {ftest}")
    payload = {"success": success, "logs": logs}
    res = requests.post(
        ftest,
        json=payload,
        headers={"Origin": "http://example.org"},
        timeout=REQUEST_TIMEOUT,
    )
    if res.status_code == 200:
        logger.info("Upload successful")
    else:
        logger.error(f"Upload failed: HTTP {res.status_code} ({res})")


def _run_schema_tests(payload, success, failures):
    """Run test_schema for each link with a recognised ``rel``; return how many were attempted.

    Passing test names are appended to *success*; failing ones are stored in
    *failures* together with the exception.
    """
    version_by_rel = {rel: version for version, rel in NODEINFO_VERSIONS.items()}
    links = payload.get("links", []) if isinstance(payload, dict) else []
    attempted = 0
    for link in links:
        # Malformed items were already reported by test_links; skip them here
        # instead of crashing before the result can be uploaded.
        rel = link.get("rel") if isinstance(link, dict) else None
        if not isinstance(rel, str) or rel not in version_by_rel:
            continue
        version = version_by_rel[rel]
        test_name = f"test_schema_Nodeinfo_{version}"
        attempted += 1
        try:
            test_schema(link["href"], version)
            success.append(test_name)
        except Exception as e:
            logger.error(e)
            failures[test_name] = e
    return attempted


def main():
    """Run all checks, log a summary and upload the result to the ftest server."""
    success = []
    failures = {}

    try:
        payload = query_nodeinfo().json()
    except Exception as e:
        # Report the failure to ftest before propagating, so the server is not
        # left without a result when the target is unreachable or misbehaving.
        logger.error(e)
        upload_logs_to_ftest(
            False, f"[FAIL] query_nodeinfo failed with error:\n{e}\n-----\n"
        )
        raise

    try:
        test_links(payload)
        success.append("test_links")
    except Exception as e:
        logger.error(e)
        failures["test_links"] = e

    # Only links with a recognised `rel` can score, so only they count toward
    # the maximum; otherwise "All tests passed" could be unreachable.
    max_score = 1 + _run_schema_tests(payload, success, failures)
    score = len(success)

    print("\n\n===============")
    if score == max_score:
        logger.info("All tests passed")
    elif score > 0:
        logger.info(f"Partial success. {score} out of {max_score} tests passed")
    else:
        logger.error(f"No tests passed. 0 out of {max_score} tests passed")

    logger.info("Summary:\n")
    log_lines = []
    if success:
        logger.info("Successful tests:\n")
        for name in success:
            line = f"[OK] {name}\n"
            logger.info(line)
            log_lines.append(line)

    if failures:
        logger.error("\n\nFailed tests:\n")
        for test, error in failures.items():
            line = f"[FAIL] {test} failed with error:\n{error}\n-----\n"
            logger.error(line)
            log_lines.append(line)

    upload_logs_to_ftest(not failures, "".join(log_lines))


if __name__ == "__main__":
    main()