2023-09-06 19:42:37 +05:30
#!/bin/env /usr/bin/python
2023-09-06 19:53:23 +05:30
# SPDX-FileCopyrightText: 2023 Aravinth Manivannan <realaravinth@batsense.net>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
2023-09-06 19:42:37 +05:30
import logging
import os
import requests
from urllib . parse import urlparse , urlunparse
2023-09-28 00:06:16 +05:30
# File that receives a copy of every log record (see the FileHandler in
# configure_logger) and that upload_logs_to_ftest reads back as raw logs.
LOG_FILE = "webfinger.log"
2023-09-06 19:42:37 +05:30
def configure_logger():
    """Return the ``webfinger`` logger, wired to LOG_FILE and a stream handler.

    Both handlers emit at DEBUG level and share one format
    (``time - logger name - level - message``).

    The function is idempotent: handlers are attached only when the logger
    has none yet, so calling it again does not duplicate every log line.

    Returns:
        The configured ``logging.Logger`` instance.
    """
    logger = logging.getLogger("webfinger")
    logger.setLevel(logging.DEBUG)

    # logging.getLogger returns the same object for the same name; attaching
    # handlers a second time would make every record appear twice.
    if logger.handlers:
        return logger

    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    for handler in (logging.FileHandler(LOG_FILE), logging.StreamHandler()):
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger
def get_env(name) -> str:
    """Return the value of environment variable *name*.

    The name and value are written to the log at INFO level before the
    presence check, so a missing variable is logged as ``None``.

    Args:
        name: Name of the environment variable to read.

    Returns:
        The variable's value.

    Raises:
        Exception: If the variable is not set.
    """
    env = os.environ.get(name)
    # NOTE(review): this also writes secrets (e.g. FTEST_AUTH) in clear text
    # to the log file -- confirm that is intended.
    logger.info(f"Environment: {name}: {env}")
    if env is None:
        raise Exception(
            f"Please set environment variable {name}. See https://git.batsense.net/ForgeFlux/webfinger-test#environment-variables"
        )
    return env
logger = configure_logger()

# Runtime configuration, read from the environment at import time.
# get_env raises if any of these variables is missing.
FTEST_AUTH = get_env("FTEST_AUTH")
FTEST_HOST = get_env("FTEST_HOST")
TARGET_HOST = get_env("FTEST_TARGET_HOST")
TEST_USER = get_env("FTEST_USER")  # actor ex: john@example.org
# Host (and port, if any) part of TARGET_HOST, without scheme or path.
TEST_HOST = urlparse(TARGET_HOST).netloc
def query_webfinger():
    """Query the target's WebFinger endpoint for the test account.

    Builds ``<scheme>://<host>/.well-known/webfinger?resource=acct:<account>``
    from TARGET_HOST and TEST_USER, sends a GET request with an ``Origin``
    header, logs the response and asserts that the status is HTTP 200.

    Returns:
        The ``requests.Response`` of the query.

    Raises:
        AssertionError: If the response status is not 200.
    """
    parsed_target_host = urlparse(TARGET_HOST)

    # TARGET_HOST is a full URL (it is parsed for scheme/netloc above), so
    # only its netloc may appear after the '@' of an acct: URI.
    # NOTE(review): the comment on TEST_USER suggests it may already be
    # "user@host"; in that case it is used unchanged -- confirm which form
    # FTEST_USER takes.
    if "@" in TEST_USER:
        account = TEST_USER
    else:
        account = f"{TEST_USER}@{parsed_target_host.netloc}"

    webfinger = urlunparse(
        (
            parsed_target_host.scheme,
            parsed_target_host.netloc,
            "/.well-known/webfinger",
            "",
            f"resource=acct:{account}",
            "",
        )
    )
    logger.info(f"Query WebFinger: {webfinger}")

    # Without a timeout requests waits forever on an unresponsive server.
    res = requests.get(
        webfinger,
        headers={"Origin": "http://example.org"},
        timeout=30,
    )

    # A non-JSON body (e.g. an HTML error page) must not blow up the debug
    # log before the status assertion below can report the real problem.
    try:
        payload = res.json()
    except ValueError:
        payload = res.text
    logger.debug(
        f"WebFinger response:\n\nSTATUS: {res.status_code}\n\nHEADERS:\n{res.headers}\n\nRESPONSE PAYLOAD:\n{payload}"
    )

    assert (
        res.status_code == 200
    ), f"WebFinger query expected HTTP 200, got {res.status_code}"
    logger.info("[SUCCESS] WebFinger query response is HTTP 200")
    return res
def test_main_params(resp):
    """Check that the WebFinger document has its required top-level members.

    Args:
        resp: Decoded JSON body of the WebFinger response.

    Raises:
        AssertionError: If ``subject`` or ``links`` is missing.
    """
    for param in ("subject", "links"):
        assert (
            param in resp
        ), f"Parameter '{param}' is not present in WebFinger response"
    # Only the members actually verified above are reported; 'aliases' is
    # not checked here.
    logger.info("[SUCCESS] WebFinger response has 'subject' and 'links' parameters")
def test_links(resp):
    """Validate the ``links`` array of the WebFinger document.

    A ``rel == "self"`` link is mandatory and must have type
    ``application/activity+json`` and an ``href``. A
    ``http://webfinger.net/rel/profile-page`` link is optional; when present
    it must have type ``text/html`` and an ``href``. If several links share
    a ``rel``, the last one is the one validated.

    Args:
        resp: Decoded JSON body of the WebFinger response.

    Raises:
        AssertionError: If a link is missing or fails validation.
    """
    profile_rel = "http://webfinger.net/rel/profile-page"
    self_link = None
    profile_link = None

    for link in resp["links"]:
        # .get: a link item without 'rel' is simply not one we look for,
        # rather than a KeyError with no context.
        rel = link.get("rel")
        if rel == "self":
            self_link = link
        elif rel == profile_rel:
            profile_link = link
            logger.debug(
                "'rel==http://webfinger.net/rel/profile-page' is present in 'links' WebFinger response parameter"
            )

    assert (
        self_link is not None
    ), "'rel==self' is not present in 'links' WebFinger response parameter"
    self_type = self_link.get("type")
    assert (
        self_type == "application/activity+json"
    ), f"[rel==self] expected application/activity+json; got {self_type}"
    assert "href" in self_link, "[rel==self] href not present in link item"
    logger.info("[SUCCESS] rel==self passed schema validation")

    if profile_link:
        profile_type = profile_link.get("type")
        assert (
            profile_type == "text/html"
        ), f"expected text/html got {profile_type}"
        assert (
            "href" in profile_link
        ), "[rel==profile link] href not present in link item"
        logger.info("[SUCCESS] rel==profile-page passed schema validation")

    logger.info("[SUCCESS] 'links' object passed validation")
def test_subject(resp):
    """Check that the ``subject`` member names the queried account.

    Args:
        resp: Decoded JSON body of the WebFinger response.

    Raises:
        AssertionError: If ``subject`` is missing or differs from the
            expected ``acct:`` URI.
    """
    # NOTE(review): TEST_USER is documented as "john@example.org" but the
    # query also appends a host; accept both forms so the expected subject
    # is always a complete acct: URI -- confirm which form FTEST_USER takes.
    account = TEST_USER if "@" in TEST_USER else f"{TEST_USER}@{TEST_HOST}"
    subject = f"acct:{account}"

    # .get: a missing member fails with the message below, not a KeyError.
    actual = resp.get("subject")
    assert (
        actual == subject
    ), f"Subject parameter doesn't match. Expected {subject} got {actual}"
def test_access_control_allow_origin(resp):
    """Check that the WebFinger endpoint allows cross-origin access.

    Args:
        resp: Response object of the WebFinger query; its ``headers``
            mapping is read.

    Raises:
        AssertionError: If ``Access-Control-Allow-Origin`` is absent or is
            not ``*``.
    """
    # The response headers mapping is case insensitive. .get is used so that
    # an absent header produces the explanatory message, not a KeyError.
    allow_origin = resp.headers.get("access-control-allow-origin")
    assert allow_origin == "*", (
        "Access-Control-Allow-Origin header should be '*' to allow any domain to access the resource with CORS. Please see https://www.rfc-editor.org/rfc/rfc7033.html#section-5"
        f" (got: {allow_origin!r})"
    )
    logger.info("[SUCCESS] WebFinger endpoint is configured correctly for CORS")
2023-09-28 00:06:16 +05:30
def upload_logs_to_ftest(success: bool, logs: str):
    """Send the test outcome and the collected logs to the ftest server.

    The target URL is built from the scheme and host of FTEST_HOST. The
    request body carries the ``success`` flag, the summary ``logs`` and the
    full contents of LOG_FILE as ``raw_logs``.

    Args:
        success: Whether the whole suite passed.
        logs: Human-readable summary of the run.

    Returns:
        The ``requests.Response`` of the upload.
    """
    parsed_ftest_host = urlparse(FTEST_HOST)
    # urlunparse needs exactly six components:
    # scheme, netloc, path, params, query, fragment.
    ftest = urlunparse(
        (
            parsed_ftest_host.scheme,
            parsed_ftest_host.netloc,
            # NOTE(review): the empty segment in "/suites//tests/results"
            # looks like a missing identifier -- confirm against the ftest API.
            "/suites//tests/results",
            "",
            "",
            "",
        )
    )

    with open(LOG_FILE) as log_file:
        raw_logs = log_file.read()

    payload = {
        "success": success,
        "logs": logs,
        "raw_logs": raw_logs,
    }
    # NOTE(review): the payload is sent as a JSON body; the original never
    # attached it to the request -- confirm the format ftest expects.
    res = requests.post(
        ftest,
        json=payload,
        headers={"Origin": "http://example.org"},
        timeout=30,
    )
    return res
2023-09-06 19:42:37 +05:30
if __name__ == "__main__":
    # One point for a successful WebFinger query plus one per check below.
    max_score = 5
    score = 0

    # A failed query raises here and aborts the run: nothing else can be
    # checked without a response.
    resp = query_webfinger()
    json = resp.json()
    score += 1

    success = []
    failures = {}

    # Each check and the object it inspects: the decoded JSON document for
    # the schema checks, the raw response for the header check.
    checks = (
        (test_main_params, json),
        (test_links, json),
        (test_subject, json),
        (test_access_control_allow_origin, resp),
    )
    for check, arg in checks:
        try:
            check(arg)
        except Exception as e:
            # Record the failure and continue so that every check is reported.
            logger.error(e)
            failures[check.__name__] = e
        else:
            score += 1
            success.append(check.__name__)

    print("\n\n===============")
    if score == max_score:
        print("All tests passed")
    elif score > 0:
        print(f"Partial success. {score} out of {max_score} tests passed")

    print("Summary:\n")

    # Summary lines, collected in addition to being printed.
    logs = []

    if success:
        print("Successful tests:\n")
        for s in success:
            log = f"[OK] {s}"
            print(log)
            logs.append(log)

    if failures:
        print("\n\nFailed tests:\n")
        for test, error in failures.items():
            log = f"[FAIL] {test} failed with error:\n{error}\n-----\n"
            print(log)
            logs.append(log)