253 lines
7.5 KiB
Python
253 lines
7.5 KiB
Python
|
# SPDX-FileCopyrightText: 2023 Aravinth Manivannan <realaravinth@batsense.net>
|
||
|
#
|
||
|
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||
|
|
||
|
import os
|
||
|
import collections.abc
|
||
|
from urllib.parse import urlparse, urlunparse
|
||
|
|
||
|
import requests
|
||
|
import xmltodict
|
||
|
from behave import *
|
||
|
|
||
|
from ftest_common.env import FTEST_USER
|
||
|
from ftest_common.logger import logger
|
||
|
from ftest_common.webfinger import get_webfinger
|
||
|
from ftest_common.obj import get_ap_obj
|
||
|
|
||
|
|
||
|
def query_host_meta(https: bool, timeout: float = 30):
    """Fetch ``/.well-known/host-meta`` from the target host.

    The network location is taken from ``TARGET_HOST``; only the scheme is
    chosen here, so the same host is queried over either protocol.

    Args:
        https: Use the ``https`` scheme when true, ``http`` otherwise.
        timeout: Seconds handed to ``requests.get`` as its ``timeout``.

    Returns:
        The response object returned by ``requests.get``; the status code is
        not inspected here.

    Anything ``requests.get`` raises propagates to the caller, since this
    function does not catch exceptions.
    """
    from ftest_common.env import TARGET_HOST

    scheme = "https" if https else "http"
    parsed_target_host = urlparse(TARGET_HOST)
    host_meta = urlunparse(
        (
            scheme,
            parsed_target_host.netloc,
            "/.well-known/host-meta",
            "",
            "",
            "",
        )
    )
    logger.debug(f"fetching {host_meta}")
    # requests has no default timeout; without one an unresponsive host
    # would block the whole test run.
    resp = requests.get(host_meta, timeout=timeout)
    logger.debug(
        f"host-meta response:\n\nSTATUS: {resp.status_code}\n\nHEADERS:\n {resp.headers}\n\nRESPONSE PAYLOAD:\n{resp.content}"
    )
    return resp
|
||
|
|
||
|
|
||
|
@given("A Fediverse server")
def step_impl(context):
    """Precondition step: nothing to prepare, so this does no work."""
    return None
|
||
|
|
||
|
|
||
|
@when("Querying /.well-known/host-meta")
def step_impl(context):
    """Query host-meta over HTTP and then HTTPS, storing results on ``context``.

    For each scheme the raw response is stored as ``context.http_res`` /
    ``context.https_res``. When the status code is 200 the body is parsed
    with ``xmltodict`` into ``context.http_host_meta`` /
    ``context.https_host_meta``, and ``context.host_meta`` / ``context.resp``
    are pointed at that result. HTTPS is queried last, so it wins when both
    schemes succeed.

    A failure on one scheme is logged and does not abort the step; later
    steps decide whether the overall outcome is acceptable.
    """
    try:
        http_res = query_host_meta(False)
        context.http_res = http_res
        if http_res.status_code == 200:
            context.http_host_meta = xmltodict.parse(http_res.content)
            logger.info("[SUCCESS] host-meta available on 80")
            context.host_meta = context.http_host_meta
            context.resp = context.http_res
    except Exception as e:
        # Best effort: keep going, but do not swallow KeyboardInterrupt or
        # SystemExit, and keep the cause visible in the debug log.
        logger.debug(f"[ERROR] host-meta not available on 80: {e!r}")

    try:
        https_res = query_host_meta(True)
        context.https_res = https_res
        if https_res.status_code == 200:
            context.https_host_meta = xmltodict.parse(https_res.content)
            logger.info("[SUCCESS] host-meta available on 443")
            context.host_meta = context.https_host_meta
            context.resp = context.https_res
    except Exception as e:
        logger.debug(f"[ERROR] host-meta not available on 443: {e!r}")
|
||
|
|
||
|
|
||
|
@then("The page must resolve at port 80 or 443")
def step_impl(context):
    """Assert that host-meta was parsed for at least one scheme.

    Passes when ``https_host_meta`` or ``http_host_meta`` is present on
    ``context``. On failure the exception is logged, stored in
    ``context.failure`` under the check name and re-raised; on success the
    name is appended to ``context.success``.
    """
    name = "Page must resolve at port 80 or 443"

    try:
        assert "https_host_meta" in context or "http_host_meta" in context, name
        # f-string so the check name is interpolated rather than logged
        # as the literal text "{name}".
        logger.info(f"[SUCCESS] {name}")
    except Exception as e:
        logger.error(e)
        context.failure[name] = e
        raise
    context.success.append(name)
|
||
|
|
||
|
|
||
|
@then("If both port 80 _and_ 443 work, then they SHOULD return same document")
def step_impl(context):
    """Assert that the HTTP and HTTPS host-meta documents are equal.

    The comparison runs only when both ``http_host_meta`` and
    ``https_host_meta`` are present on ``context``; otherwise the step
    passes without checking anything.
    """
    if "https_host_meta" in context and "http_host_meta" in context:
        # Compare the two parsed documents directly. ``context.host_meta``
        # is overwritten with the HTTPS document when both queries succeed,
        # so comparing it against ``https_host_meta`` would always pass.
        assert (
            context.http_host_meta == context.https_host_meta
        ), "Both 80 and 443 returned the same response"
|
||
|
|
||
|
|
||
|
@then('The document SHOULD be served with "application/xrd+xml" media type')
def step_impl(context):
    """Check the ``Content-Type`` header of the host-meta response(s).

    When both the HTTP and HTTPS documents are on ``context``, both stored
    responses are checked; otherwise only ``context.resp`` is. A failure is
    logged, stored in ``context.failure`` under the check name and
    re-raised; on success the name is appended to ``context.success``.
    """
    name = 'The document SHOULD be served with "application/xrd+xml" media type'
    try:
        both_schemes = "https_host_meta" in context and "http_host_meta" in context
        if both_schemes:
            responses = [context.http_res, context.https_res]
        else:
            responses = [context.resp]

        for response in responses:
            content_type = response.headers["Content-Type"]
            assert (
                "application/xrd+xml" in content_type
            ), "Document served with 'application/xrd+xml' media type"
    except Exception as err:
        logger.error(err)
        context.failure[name] = err
        raise err
    context.success.append(name)
|
||
|
|
||
|
|
||
|
@then('The host-meta document root MUST be an "XRD" element')
def step_impl(context):
    """Assert that the parsed host-meta has a single root key named ``XRD``.

    A failure is logged, stored in ``context.failure`` under the check name
    and re-raised; on success the name is appended to ``context.success``.
    """
    name = 'The host-meta document root MUST be an "XRD" element'
    try:
        assert len(context.host_meta) == 1, "Only one root element"
        # The message previously read 'XRX', which did not match the
        # element actually being checked.
        assert "XRD" in context.host_meta, "'XRD' is the root element"
    except Exception as e:
        logger.error(e)
        context.failure[name] = e
        raise
    context.success.append(name)
|
||
|
|
||
|
|
||
|
@then('The document SHOULD NOT include a "Subject" element')
def step_impl(context):
    """Assert that the ``XRD`` root has no ``Subject`` child.

    A failure is logged, stored in ``context.failure`` under the check name
    and re-raised; on success the name is appended to ``context.success``.
    """
    name = 'The document SHOULD NOT include a "Subject" element'
    try:
        xrd = context.host_meta["XRD"]
        has_subject = "Subject" in xrd
        assert not has_subject, "Document SHOULD NOT include a 'Subject' element"
    except Exception as err:
        logger.error(err)
        context.failure[name] = err
        raise err
    context.success.append(name)
|
||
|
|
||
|
|
||
|
@then('The document SHOULD include "Link" element')
def step_impl(context):
    """Assert that the ``XRD`` root has a ``Link`` child.

    A failure is logged, stored in ``context.failure`` under the check name
    and re-raised; on success the name is appended to ``context.success``.
    """
    name = 'The document SHOULD include "Link" element'
    try:
        xrd = context.host_meta["XRD"]
        has_link = "Link" in xrd
        assert has_link, "Document SHOULD include a 'Link' element"
    except Exception as err:
        logger.error(err)
        context.failure[name] = err
        raise err
    context.success.append(name)
|
||
|
|
||
|
|
||
|
@then(
    'The document\'s "Link" element should include either "template" or "href" attributes'
)
def step_impl(context):
    """Assert that every ``Link`` entry has a ``@template`` or ``@href`` key.

    ``Link`` may be a single entry or a sequence of entries; a single entry
    is wrapped in a list so both shapes go through the same loop. A failure
    is logged, stored in ``context.failure`` under the check name and
    re-raised; on success the name is appended to ``context.success``.
    """
    name = 'The document\'s "Link" element should include either "template" or "href" attributes'

    def _has_target(entry):
        return "@template" in entry or "@href" in entry

    try:
        links = context.host_meta["XRD"]["Link"]
        if not isinstance(links, collections.abc.Sequence):
            links = [links]
        for link in links:
            assert _has_target(
                link
            ), f'The document\'s "Link" element should include either "template" or "href" attributes. Got {link}'
    except Exception as err:
        logger.error(err)
        context.failure[name] = err
        raise err
    context.success.append(name)
|
||
|
|
||
|
|
||
|
@then("The document should have at least on 'lrdd' document")
def step_impl(context):
    """Assert that at least one ``Link`` entry has ``rel="lrdd"``.

    ``Link`` may be a single entry or a sequence of entries. Entries that
    are not mappings, or that have no ``@rel`` key, count as non-matching
    instead of raising. Any failure, including a missing ``XRD`` or
    ``Link`` key, is logged, stored in ``context.failure`` under the check
    name and re-raised; on success the name is appended to
    ``context.success``.
    """
    name = "The document should have at least on 'lrdd' document"

    def _is_lrdd(link):
        # .get() so a Link without a rel attribute is skipped, not a KeyError.
        return (
            isinstance(link, collections.abc.Mapping) and link.get("@rel") == "lrdd"
        )

    try:
        # The lookup sits inside the try so a malformed document is
        # recorded as a failure of this check like any other error.
        links = context.host_meta["XRD"]["Link"]
        if not isinstance(links, collections.abc.Sequence):
            links = [links]
        # The check name is the message so the logged and recorded
        # AssertionError is not empty.
        assert any(_is_lrdd(link) for link in links), name
    except Exception as e:
        logger.error(e)
        context.failure[name] = e
        raise
    context.success.append(name)
|
||
|
|
||
|
|
||
|
@then(
    "Fediverse specific: The lrdd document must include template containing WebFinger well-known URI"
)
def step_impl(context):
    """Assert that an ``lrdd`` Link carries the WebFinger URI template.

    A Link matches when its ``@rel`` is ``"lrdd"`` and its ``@template``
    contains ``/.well-known/webfinger?resource={uri}``. ``Link`` may be a
    single entry or a sequence of entries. Entries that are not mappings,
    or that lack ``@rel`` or ``@template``, count as non-matching instead
    of raising. Any failure, including a missing ``XRD`` or ``Link`` key,
    is logged, stored in ``context.failure`` under the check name and
    re-raised; on success the name is appended to ``context.success``.
    """
    name = "Fediverse specific: The lrdd document must include template containing WebFinger well-known URI"

    def _is_webfinger_lrdd(link):
        if not isinstance(link, collections.abc.Mapping):
            return False
        # .get() so a Link without a rel attribute is skipped, not a KeyError.
        if link.get("@rel") != "lrdd":
            return False
        template = link.get("@template") or ""
        return "/.well-known/webfinger?resource={uri}" in template

    try:
        # The lookup sits inside the try so a malformed document is
        # recorded as a failure of this check like any other error.
        links = context.host_meta["XRD"]["Link"]
        if not isinstance(links, collections.abc.Sequence):
            links = [links]
        # The check name is the message so the logged and recorded
        # AssertionError is not empty.
        assert any(_is_webfinger_lrdd(link) for link in links), name
    except Exception as e:
        logger.error(e)
        context.failure[name] = e
        raise
    context.success.append(name)
|