# SPDX-FileCopyrightText: 2023 Aravinth Manivannan
#
# SPDX-License-Identifier: AGPL-3.0-or-later
import os
import collections.abc
from urllib.parse import urlparse, urlunparse

import requests
import xmltodict
from behave import *

from ftest_common.env import FTEST_USER
from ftest_common.logger import logger
from ftest_common.webfinger import get_webfinger
from ftest_common.obj import get_ap_obj


def query_host_meta(https: bool, timeout: float = 30.0):
    """Fetch ``/.well-known/host-meta`` from the target host.

    The host is the network location of ``ftest_common.env.TARGET_HOST``;
    whatever scheme that value carries is replaced by the one selected with
    ``https``.

    Args:
        https: Query over ``https`` when true, plain ``http`` otherwise.
        timeout: Seconds to wait for the server, so that an unresponsive
            port fails the probe instead of hanging the whole run.

    Returns:
        The ``requests`` response, whatever its status code.

    Raises:
        requests.RequestException: If the request cannot be completed.
    """
    # Imported at call time rather than at module import time.
    from ftest_common.env import TARGET_HOST

    scheme = "https" if https else "http"
    parsed_target_host = urlparse(TARGET_HOST)
    host_meta = urlunparse(
        (
            scheme,
            parsed_target_host.netloc,
            "/.well-known/host-meta",
            "",
            "",
            "",
        )
    )
    logger.debug(f"fetching {host_meta}")
    resp = requests.get(host_meta, timeout=timeout)
    logger.debug(
        f"host-meta response:\n\nSTATUS: {resp.status_code}\n\nHEADERS:\n {resp.headers}\n\nRESPONSE PAYLOAD:\n{resp.content}"
    )
    return resp


def _probe_host_meta(context, https: bool) -> None:
    """Query host-meta over one scheme and store the outcome on ``context``.

    Sets ``context.<scheme>_res`` to the raw response. When the response is
    a 200, the parsed document is stored in ``context.<scheme>_host_meta``
    and also becomes ``context.host_meta`` / ``context.resp``.

    Probing is best effort: any error is logged and swallowed so that the
    other scheme can still be tried.
    """
    prefix, port = ("https", 443) if https else ("http", 80)
    try:
        res = query_host_meta(https)
        setattr(context, f"{prefix}_res", res)
        if res.status_code == 200:
            setattr(context, f"{prefix}_host_meta", xmltodict.parse(res.content))
            logger.info(f"[SUCCESS] host-meta available on {port}")
            context.host_meta = getattr(context, f"{prefix}_host_meta")
            context.resp = res
    except Exception as e:
        # ``Exception`` rather than a bare ``except`` so Ctrl-C and
        # SystemExit still abort the run.
        logger.debug(f"[ERROR] host-meta not available on {port}: {e}")


@given("A Fediverse server")
def step_impl(context):
    """No setup needed; the target server comes from the environment."""
    pass


@when("Querying /.well-known/host-meta")
def step_impl(context):
    """Probe host-meta over http, then https.

    https is probed last, so when both succeed ``context.host_meta`` and
    ``context.resp`` hold the https result.
    """
    _probe_host_meta(context, https=False)
    _probe_host_meta(context, https=True)


@then("The page must resolve at port 80 or 443")
def step_impl(context):
    """Pass when host-meta was fetched and parsed on at least one port."""
    name = "Page must resolve at port 80 or 443"
    try:
        assert "https_host_meta" in context or "http_host_meta" in context, name
        logger.info(f"[SUCCESS] {name}")
    except Exception as e:
        logger.error(e)
        context.failure[name] = e
        raise
    context.success.append(name)


@then("If both port 80 _and_ 443 work, then they SHOULD return same document")
def step_impl(context):
    """Compare the http and https documents when both were retrieved."""
    if "https_host_meta" in context and "http_host_meta" in context:
        # Compare the two per-port documents. ``context.host_meta`` is the
        # https document here, so comparing against it would always pass.
        assert (
            context.http_host_meta == context.https_host_meta
        ), "Port 80 and 443 returned different host-meta documents"


@then('The document SHOULD be served with "application/xrd+xml" media type')
def step_impl(context):
    """Check the Content-Type of every successful host-meta response."""
    name = 'The document SHOULD be served with "application/xrd+xml" media type'
    try:
        if "https_host_meta" in context and "http_host_meta" in context:
            resps = [context.http_res, context.https_res]
        else:
            resps = [context.resp]
        for resp in resps:
            # A missing header is reported as a mismatch, not a KeyError.
            content_type = resp.headers.get("Content-Type", "")
            assert "application/xrd+xml" in content_type, (
                "Document must be served with 'application/xrd+xml' media type."
                f" Got {content_type!r}"
            )
    except Exception as e:
        logger.error(e)
        context.failure[name] = e
        raise
    context.success.append(name)


@then('The host-meta document root MUST be an "XRD" element')
def step_impl(context):
    """Check that the parsed document has exactly one root, named XRD."""
    name = 'The host-meta document root MUST be an "XRD" element'
    try:
        assert len(context.host_meta) == 1, "Only one root element"
        assert "XRD" in context.host_meta, "'XRD' must be the root element"
    except Exception as e:
        logger.error(e)
        context.failure[name] = e
        raise
    context.success.append(name)


@then('The document SHOULD NOT include a "Subject" element')
def step_impl(context):
    """Check that the XRD root has no Subject child."""
    name = 'The document SHOULD NOT include a "Subject" element'
    try:
        assert (
            "Subject" not in context.host_meta["XRD"]
        ), "Document SHOULD NOT include a 'Subject' element"
    except Exception as e:
        logger.error(e)
        context.failure[name] = e
        raise
    context.success.append(name)


@then('The document SHOULD include "Link" element')
def step_impl(context):
    """Check that the XRD root has at least one Link child."""
    name = 'The document SHOULD include "Link" element'
    try:
        assert (
            "Link" in context.host_meta["XRD"]
        ), "Document SHOULD include a 'Link' element"
    except Exception as e:
        logger.error(e)
        context.failure[name] = e
        raise
    context.success.append(name)
def _host_meta_links(context):
    """Return the host-meta "Link" entries as a list.

    The parsed document holds a list when the element is repeated and a
    single value otherwise; both shapes are normalised to a list. Only
    ``list`` is treated as "many", so a text-only Link (a ``str``) is not
    iterated character by character.

    Raises:
        KeyError: If the document has no ``XRD`` root or no ``Link`` child.
    """
    links = context.host_meta["XRD"]["Link"]
    return links if isinstance(links, list) else [links]


def _is_lrdd_link(link) -> bool:
    """Tell whether ``link`` is a Link element whose ``rel`` is ``lrdd``.

    Links that are not mappings (text-only or empty elements) or that have
    no ``rel`` attribute simply do not match; they never raise.
    """
    return isinstance(link, collections.abc.Mapping) and link.get("@rel") == "lrdd"


@then(
    'The document\'s "Link" element should include either "template" or "href" attributes'
)
def step_impl(context):
    """Check that every Link carries a ``template`` or ``href`` attribute."""
    name = 'The document\'s "Link" element should include either "template" or "href" attributes'
    try:
        for link in _host_meta_links(context):
            has_target = isinstance(link, collections.abc.Mapping) and (
                "@template" in link or "@href" in link
            )
            assert (
                has_target
            ), f'The document\'s "Link" element should include either "template" or "href" attributes. Got {link}'
    except Exception as e:
        logger.error(e)
        context.failure[name] = e
        raise
    context.success.append(name)


@then("The document should have at least on 'lrdd' document")
def step_impl(context):
    """Check that at least one Link has ``rel="lrdd"``."""
    # The step text (including its spelling) must match the feature file,
    # so it is kept verbatim.
    name = "The document should have at least on 'lrdd' document"
    try:
        # The lookup happens inside the try so that a missing Link element
        # is recorded as a failure like any other.
        lrdd_found = any(_is_lrdd_link(link) for link in _host_meta_links(context))
        assert lrdd_found, "No Link element with rel='lrdd' found"
    except Exception as e:
        logger.error(e)
        context.failure[name] = e
        raise
    context.success.append(name)


@then(
    "Fediverse specific: The lrdd document must include template containing WebFinger well-known URI"
)
def step_impl(context):
    """Check that an lrdd Link's template points at the WebFinger endpoint."""

    def _check(link):
        # Only lrdd links with a template attribute can match.
        return (
            _is_lrdd_link(link)
            and "@template" in link
            and "/.well-known/webfinger?resource={uri}" in link["@template"]
        )

    name = "Fediverse specific: The lrdd document must include template containing WebFinger well-known URI"
    try:
        webfinger_found = any(_check(link) for link in _host_meta_links(context))
        assert (
            webfinger_found
        ), "No 'lrdd' Link with a template containing '/.well-known/webfinger?resource={uri}' found"
    except Exception as e:
        logger.error(e)
        context.failure[name] = e
        raise
    context.success.append(name)