91 lines
2.4 KiB
Python
91 lines
2.4 KiB
Python
|
# SPDX-FileCopyrightText: 2023 Aravinth Manivannan <realaravinth@batsense.net>
|
||
|
#
|
||
|
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||
|
|
||
|
import os
|
||
|
from urllib.parse import urlparse, urlunparse
|
||
|
|
||
|
import requests
|
||
|
from behave import *
|
||
|
|
||
|
from common.env import FTEST_USER
|
||
|
from common import logger
|
||
|
from common.webfinger import get_webfinger
|
||
|
from common.obj import get_ap_obj
|
||
|
|
||
|
|
||
|
@given("A Fediverse server")
|
||
|
def get_fediverse_actor(context):
|
||
|
webfinger = get_webfinger()
|
||
|
actor_url = None
|
||
|
for link in webfinger["links"]:
|
||
|
if "type" in link:
|
||
|
if all(
|
||
|
[link["rel"] == "self", link["type"] == "application/activity+json"]
|
||
|
):
|
||
|
actor_url = link["href"]
|
||
|
|
||
|
assert actor_url is not None
|
||
|
logger.info(f"Actor URL {actor_url}")
|
||
|
context.actor_url = actor_url
|
||
|
|
||
|
|
||
|
@when("Receiving or querying an object")
|
||
|
def check_actor_exists(context):
|
||
|
name = "check_actor_exists"
|
||
|
try:
|
||
|
actor = get_ap_obj(context.actor_url)
|
||
|
assert actor is not None
|
||
|
except Exception as e:
|
||
|
logger.error(e)
|
||
|
context.failure[name] = e
|
||
|
raise e
|
||
|
|
||
|
context.success.append(name)
|
||
|
context.actor = actor
|
||
|
|
||
|
|
||
|
@then("the response must contain an 'id' and a 'type' parameter")
|
||
|
def verify_obj_attrs(context):
|
||
|
name = "verify_obj_attrs"
|
||
|
actor = context.actor
|
||
|
try:
|
||
|
assert "id" in actor, "'id' attribute present in object'"
|
||
|
assert "type" in actor, "'type' attribute present in object'"
|
||
|
except Exception as e:
|
||
|
logger.error(e)
|
||
|
context.failure[name] = e
|
||
|
raise e
|
||
|
|
||
|
context.success.append(name)
|
||
|
|
||
|
|
||
|
@then("The 'id' must resolve to the same object")
|
||
|
def verify_id_resolves(context):
|
||
|
name = "verify_id_resolves"
|
||
|
actor = context.actor
|
||
|
try:
|
||
|
assert get_ap_obj(actor["id"]) == actor, "'id' URL resolves to same object"
|
||
|
except Exception as e:
|
||
|
logger.error(e)
|
||
|
context.failure[name] = e
|
||
|
raise e
|
||
|
|
||
|
|
||
|
@then("context must be ActivityPub")
|
||
|
def ctx_is_ap(context):
|
||
|
name = "ctx_is_ap"
|
||
|
|
||
|
obj_ctx = context.actor["@context"]
|
||
|
activitypub_present = False
|
||
|
for ctx in obj_ctx:
|
||
|
if ctx == "https://www.w3.org/ns/activitystreams":
|
||
|
activitypub_present = True
|
||
|
break
|
||
|
try:
|
||
|
assert activitypub_present is True, "ActivityPub context is included"
|
||
|
except Exception as e:
|
||
|
logger.error(e)
|
||
|
context.failure[name] = e
|
||
|
raise e
|