feat: import from forgejo-nc

This commit is contained in:
Aravinth Manivannan 2023-09-27 15:47:04 +05:30
commit 937a41896b
Signed by: realaravinth
GPG key ID: F8F50389936984FF
7 changed files with 778 additions and 0 deletions

3
forgejo/__init__.py Normal file
View file

@ -0,0 +1,3 @@
# SPDX-FileCopyrightText: 2023 Aravinth Manivannan <realaravinth@batsense.net>
#
# SPDX-License-Identifier: AGPL-3.0-or-later

17
forgejo/__main__.py Normal file
View file

@ -0,0 +1,17 @@
# SPDX-FileCopyrightText: 2023 Aravinth Manivannan <realaravinth@batsense.net>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
import argparse
from .cli import Cli
def admin(args):
print(args)
if __name__ == "__main__":
cli = Cli()
opts = cli.parse()
opts.func(opts, c=cli.c)

220
forgejo/cli.py Normal file
View file

@ -0,0 +1,220 @@
# SPDX-FileCopyrightText: 2023 Aravinth Manivannan <realaravinth@batsense.net>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
import argparse
from requests import Session
def forgejo_from_args(args, c: Session):
from .forgejo import Forgejo
return Forgejo(
host=args.host,
username=args.username,
password=args.password,
email=args.email,
c=c,
)
class Forgejo:
def __init__(self, parser, c: Session):
self.c = c
self.parser = parser
self.subparser = self.parser.add_subparsers()
self.install()
self.register()
self.login()
self.create_repository()
self.create_issue()
self.create_comment()
self.create_access_token()
def __add_credentials_parser(self, parser):
group = parser.add_argument_group("credentials", "User credentials")
group.add_argument("username", type=str, help="Forgejo user's username")
group.add_argument("password", type=str, help="Forgejo user's password")
group.add_argument("email", type=str, help="Forgejo user's email")
group.add_argument("host", type=str, help="URI at which Forgejo is running")
def install(self):
def run(args, c: Session):
forgejo = forgejo_from_args(args, c=c)
forgejo.install()
self.install_parser = self.subparser.add_parser(
name="install", description="Install Forgejo", help="Install Forgejo"
)
self.__add_credentials_parser(self.install_parser)
self.install_parser.set_defaults(func=run)
def register(self):
def run(args, c: Session):
forgejo = forgejo_from_args(args, c=c)
forgejo.register()
self.register_parser = self.subparser.add_parser(
name="register",
description="Forgejo user registration",
help="Register a user on Forgejo",
)
self.__add_credentials_parser(self.register_parser)
self.register_parser.set_defaults(func=run)
def login(self):
def run(args, c: Session):
forgejo = forgejo_from_args(args, c=c)
forgejo.login()
self.login_parser = self.subparser.add_parser(
name="login", description="Forgejo user login", help="Login on Forgejo"
)
self.__add_credentials_parser(self.login_parser)
self.login_parser.set_defaults(func=run)
def create_repository(self):
def run(args, c: Session):
forgejo = forgejo_from_args(args, c=c)
forgejo.login()
forgejo.create_repository(name=args.repo_name)
self.create_repository_parser = self.subparser.add_parser(
name="create_repo",
description="Create repository on Forgejo",
help="Create repository on Forgejo",
)
self.__add_credentials_parser(self.create_repository_parser)
self.create_repository_parser.set_defaults(func=run)
self.create_repository_parser.add_argument(
"repo_name", type=str, help="Name of the repository to be created"
)
def create_issue(self):
def run(args, c: Session):
forgejo = forgejo_from_args(args, c=c)
forgejo.login()
forgejo.create_issue(
owner=args.owner, repo=args.repo, title=args.title, body=args.body
)
self.create_issue_parser = self.subparser.add_parser(
name="create_issue",
description="Create issue on a repository owned by someone on Forgejo",
help="Create issue on Forgejo",
)
self.__add_credentials_parser(self.create_issue_parser)
self.create_issue_parser.set_defaults(func=run)
self.create_issue_parser.add_argument(
"owner", type=str, help="Owner of the repo"
)
self.create_issue_parser.add_argument(
"repo", type=str, help="Name of the repository"
)
self.create_issue_parser.add_argument("title", type=str, help="title")
self.create_issue_parser.add_argument("body", type=str, help="body")
def create_comment(self):
def run(args, c: Session):
forgejo = forgejo_from_args(args, c=c)
forgejo.login()
forgejo.create_comment(
owner=args.owner, repo=args.repo, issue=args.issue, body=args.body
)
self.create_comment_parser = self.subparser.add_parser(
name="create_comment",
description="Create comment on a repository owned by someone on Forgejo",
help="Create comment on Forgejo",
)
self.__add_credentials_parser(self.create_comment_parser)
self.create_comment_parser.set_defaults(func=run)
self.create_comment_parser.add_argument(
"owner", type=str, help="Owner of the repo"
)
self.create_comment_parser.add_argument(
"repo", type=str, help="Name of the repository"
)
self.create_comment_parser.add_argument(
"issue", type=int, help="ID of the issue"
)
self.create_comment_parser.add_argument("body", type=str, help="body")
def create_access_token(self):
def run(args, c: Session):
forgejo = forgejo_from_args(args, c=c)
forgejo.login()
forgejo.create_access_token(name=args.name, file=args.file, repo=args.repo)
self.create_access_token_parser = self.subparser.add_parser(
name="create_access_token",
description="Create access token for user",
help="Create access toekn for user",
)
self.__add_credentials_parser(self.create_access_token_parser)
self.create_access_token_parser.set_defaults(func=run)
self.create_access_token_parser.add_argument(
"name", type=str, help="name of the access token"
)
self.create_access_token_parser.add_argument(
"file", type=str, help="filepath to write the token value"
)
self.create_access_token_parser.add_argument(
"repo", type=str, help="repo name write to json file"
)
class Cli:
def __init__(self):
c = Session()
self.c = c
self.parser = argparse.ArgumentParser(
description="Install and Bootstrap Forgejo and Hostea Dashboard"
)
self.subparser = self.parser.add_subparsers()
self.check_env()
self.forgejo()
def __add_credentials_parser(self, parser):
group = parser.add_argument_group("credentials", "User credentials")
group.add_argument("username", type=str, help="Forgejo user's username")
group.add_argument("password", type=str, help="Forgejo user's password")
group.add_argument("email", type=str, help="Forgejo user's email")
def check_env(self):
def run(args, c: Session):
from .forgejo import Forgejo
Forgejo.check_online(host=args.forgejo_host)
self.check_env_parser = self.subparser.add_parser(
name="check_env",
description="Check and block until environment is ready",
help="Check and block until environment is ready",
)
self.check_env_parser.add_argument(
"forgejo_host", type=str, help="URI at which Forgejo is running"
)
self.check_env_parser.set_defaults(func=run)
def forgejo(self):
self.forgejo = self.subparser.add_parser(
name="forgejo",
description="Forgejo",
help="Forgejo-related functionality",
)
Forgejo(parser=self.forgejo, c=self.c)
def parse(self):
return self.parser.parse_args()

34
forgejo/csrf.py Normal file
View file

@ -0,0 +1,34 @@
# SPDX-FileCopyrightText: 2023 Aravinth Manivannan <realaravinth@batsense.net>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
from html.parser import HTMLParser
class ParseCSRF(HTMLParser):
token: str = None
def __init__(self, name):
HTMLParser.__init__(self)
self.name = name
def handle_starttag(self, tag: str, attrs: (str, str)):
if self.token:
return
if tag != "input":
return
token = None
for index, (k, v) in enumerate(attrs):
if k == "value":
token = v
if all([k == "name", v == self.name]):
if token:
self.token = token
return
for inner_index, (nk, nv) in enumerate(attrs, start=index):
if nk == "value":
self.token = nv
return

315
forgejo/forgejo.py Executable file
View file

@ -0,0 +1,315 @@
# SPDX-FileCopyrightText: 2023 Aravinth Manivannan <realaravinth@batsense.net>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
import os
import random
import json
from urllib.parse import urlunparse, urlparse
from html.parser import HTMLParser
from time import sleep
from requests import Session
from requests.auth import HTTPBasicAuth
import requests
from .csrf import ParseCSRF
# FORGEJO_USER = "root"
# FORGEJO_EMAIL = "root@example.com"
# FORGEJO_PASSWORD = "foobarpassword"
# HOST = "http://localhost:8080"
#
# REPOS = []
class Forgejo:
def __init__(self, host: str, username: str, password: str, email: str, c: Session):
self.host = host
self.username = username
self.password = password
self.email = email
self.c = c
self.__csrf_key = "_csrf"
self.__logged_in = False
def get_uri(self, path: str):
parsed = urlparse(self.host)
return urlunparse((parsed.scheme, parsed.netloc, path, "", "", ""))
def get_api_uri(self, path: str):
parsed = urlparse(self.host)
return urlunparse(
(
parsed.scheme,
f"{self.username}:{self.password}@{parsed.netloc}",
path,
"",
"",
"",
)
)
@staticmethod
def check_online(host: str):
"""
Check if Forgejo instance is online
"""
count = 0
parsed = urlparse(host)
url = urlunparse((parsed.scheme, parsed.netloc, "/", "", "", ""))
while True:
try:
res = requests.get(url, allow_redirects=False)
if any([res.status_code == 302, res.status_code == 200]):
break
except:
sleep(2)
print(f"Retrying {count} time")
count += 1
continue
def install(self):
"""
Install Forgejo, first form that a user sees when a new instance is
deployed
"""
parsed = urlparse(self.host)
# cwd = os.environ.get("PWD")
# user = os.environ.get("USER")
payload = {
"db_type": "sqlite3",
"db_host": "localhost:3306",
"db_user": "root",
"db_passwd": "",
"db_name": "forgejo",
"ssl_mode": "disable",
"db_schema": "",
"charset": "utf8",
"db_path": "/data/gitea/gitea.db",
"app_name": "Forgejo:+Beyond+Coding+We+Forge",
"repo_root_path": "/data/git/repositories",
"lfs_root_path": "/data/git/lfs",
"run_user": "git",
"domain": parsed.netloc,
"ssh_port": "22",
"http_port": "3000",
"app_url": self.get_uri(""),
"log_root_path": "/data/gitea/log",
"smtp_host": "",
"smtp_from": "",
"smtp_user": "",
"smtp_passwd": "",
"enable_federated_avatar": "on",
"enable_open_id_sign_in": "on",
"enable_open_id_sign_up": "on",
"default_allow_create_organization": "on",
"default_enable_timetracking": "on",
"no_reply_address": "noreply.localhost",
"password_algorithm": "pbkdf2",
"admin_name": self.username,
"admin_passwd": self.password,
"admin_confirm_passwd": self.password,
"admin_email": self.email,
}
resp = self.c.post(self.get_uri(""), data=payload, allow_redirects=False)
sleep(10)
def get_csrf_token(self, url: str) -> str:
"""
Get CSRF token at a URI
"""
resp = self.c.get(url, allow_redirects=False)
if resp.status_code != 200 and resp.status_code != 302:
print(url, resp.status_code)
raise Exception(f"Can't get csrf token: {resp.status_code}")
parser = ParseCSRF(name=self.__csrf_key)
parser.feed(resp.text)
csrf = parser.token
return csrf
def register(self):
"""
Register User
"""
url = self.get_uri("/user/sign_up")
csrf = self.get_csrf_token(url)
payload = {
"_csrf": csrf,
"user_name": self.username,
"password": self.password,
"retype": self.password,
"email": self.email,
}
self.c.post(url, data=payload, allow_redirects=False)
def login(self):
"""
Login, must be called at least once before performing authenticated
operations
"""
if self.__logged_in:
return
url = self.get_uri("/user/login")
csrf = self.get_csrf_token(url)
payload = {
"_csrf": csrf,
"user_name": self.username,
"password": self.password,
"remember": "on",
}
resp = self.c.post(url, data=payload, allow_redirects=False)
if any(
[resp.status_code == 302, resp.status_code == 200, resp.status_code == 303]
):
print("User logged in")
self.__logged_in = True
return
raise Exception(
f"[ERROR] Authentication failed. status code {resp.status_code}"
)
def create_repository(self, name: str):
"""
Create repository
"""
self.login()
def get_repository_payload(csrf: str, name: str, user_id: str):
data = {
"_csrf": csrf,
"uid": user_id,
"repo_name": name,
"description": f"this repository is named {name}",
"repo_template": "",
"issue_labels": "",
"gitignores": "",
"license": "",
"readme": "Default",
"default_branch": "master",
"trust_model": "default",
}
return data
url = self.get_uri("/repo/create")
user_id = self.c.get(self.get_api_uri("/api/v1/user")).json()["id"]
csrf = self.get_csrf_token(url)
data = get_repository_payload(csrf, name, user_id=user_id)
resp = self.c.post(url, data=data, allow_redirects=False)
print(f"Created repository {name}")
if (
resp.status_code != 302
and resp.status_code != 200
and resp.status_code != 303
):
raise Exception(
f"Error while creating repository: {name} {resp.status_code}"
)
def create_issue(self, owner: str, repo: str, title: str, body: str):
"""
Create issue
"""
self.login()
def create_issue_payload(csrf: str, title: str, body: str):
data = {
"_csrf": csrf,
"title": title,
"content": body,
"search": "",
"label_ids": "",
"milestone_id": "",
"project_id": "",
"assignee_ids": "",
"redirect_after_creation": "",
}
return data
url = self.get_uri(f"/{owner}/{repo}/issues/new")
csrf = self.get_csrf_token(url)
data = create_issue_payload(csrf=csrf, title=title, body=body)
resp = self.c.post(url, data=data, allow_redirects=False)
print(f"Created issue")
if (
resp.status_code != 302
and resp.status_code != 200
and resp.status_code != 303
):
raise Exception(f"Error while creating issue: {resp.status_code}")
def create_comment(self, owner: str, repo: str, issue: int, body: str):
"""
Create comment
"""
self.login()
def create_comment_payload(csrf: str, body: str):
data = {
"_csrf": csrf,
"content": body,
"status": "",
}
return data
url = self.get_uri(f"/{owner}/{repo}/issues/{issue}")
csrf = self.get_csrf_token(url)
data = create_comment_payload(csrf, body)
url = self.get_uri(f"/{owner}/{repo}/issues/{issue}/comments")
resp = self.c.post(url, data=data, allow_redirects=False)
print(f"Created comment")
if (
resp.status_code != 302
and resp.status_code != 200
and resp.status_code != 303
):
raise Exception(f"Error while creating comment: {resp.status_code}")
def create_access_token(self, name: str, file: str, repo: str):
"""
Create access token
"""
def create_access_token_payload(name: str):
data = {
"name": name,
"scopes": [
"read:issue",
"write:notification",
"read:repository",
"read:user",
],
}
return data
url = self.get_uri(f"/api/v1/users/{self.username}/tokens")
data = create_access_token_payload(name)
session = Session()
session.auth = (self.username, self.password)
resp = session.post(url, json=data, allow_redirects=False)
if resp.status_code != 201:
raise Exception(f"Error while creating access token: {resp.status_code} {resp} {resp.text}")
print("Created access token")
data = resp.json()
with open(file, "w") as f:
data["login"] = self.username
data["email"] = self.email
data["repo"] = repo
data["forgejo_url"] = self.get_uri("")
content = json.dumps(data)
f.write(content)
print(f"Wrote access token to {file}")

170
lib.sh Executable file
View file

@ -0,0 +1,170 @@
# SPDX-FileCopyrightText: 2023 Aravinth Manivannan <realaravinth@batsense.net>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
is_ci(){
if [ -z ${CI+x} ];
then
return 1
else
return 0
fi
}
if is_ci
then
mkdir /tmp/forgejo-init-script
FORGEJO_USER1_ACCESS_TOKEN_FILE_PATH=/tmp/forgeo-init-scrit/user1-accesstoken.json
else
FORGEJO_USER1_ACCESS_TOKEN_FILE_PATH=tmp/user1-accesstoken.json
fi
readonly FORGEJO_ROOT_USERNAME=root
readonly FORGEJO_ROOT_EMAIL="$FORGEJO_ROOT_USERNAME@example.org"
readonly FORGEJO_ROOT_PASSOWRD=supercomplicatedpassword
readonly FORGEJO_USER1_USERNAME=john
readonly FORGEJO_USER1_PASSWORD=supercomplicatedpassword
readonly FORGEJO_USER1_EMAIL="$FORGEJO_USER1_USERNAME@example.org"
readonly FORGEJO_USER1_SUPPORT_REPO="test_repo"
readonly FORGEJO_USER1_ACCESS_TOKEN_NAME="coreaccesstoken"
rm $FORGEJO_USER1_ACCESS_TOKEN_FILE_PATH || true
readonly FORGEJO_TESTUSER_USERNAME=alice
readonly FORGEJO_TESTUSER_PASSWORD=supercomplicatedpassword
readonly FORGEJO_TESTUSER_EMAIL="$FORGEJO_TESTUSER_USERNAME@example.org"
wait_for_env() {
python -m forgejo \
check_env $FORGEJO_URL
}
# register root user on Forgejo to simulate Hoste admin and integrate SSO
forgejo_root(){
# python -m forgejo \
# forgejo install \
# $FORGEJO_ROOT_USERNAME $FORGEJO_ROOT_PASSOWRD \
# $FORGEJO_ROOT_EMAIL \
# $FORGEJO_URL
python -m forgejo \
forgejo register \
$FORGEJO_ROOT_USERNAME $FORGEJO_ROOT_PASSOWRD \
$FORGEJO_ROOT_EMAIL \
$FORGEJO_URL
python -m forgejo \
forgejo login \
$FORGEJO_ROOT_USERNAME $FORGEJO_ROOT_PASSOWRD \
$FORGEJO_ROOT_EMAIL \
$FORGEJO_URL
}
# register user "Hostea" on Forgejo and create support repository
init_users_repo() {
python -m forgejo \
forgejo register \
$FORGEJO_USER1_USERNAME $FORGEJO_USER1_PASSWORD \
$FORGEJO_USER1_EMAIL \
$FORGEJO_URL
python -m forgejo \
forgejo login \
$FORGEJO_USER1_USERNAME $FORGEJO_USER1_PASSWORD \
$FORGEJO_USER1_EMAIL \
$FORGEJO_URL
python -m forgejo \
forgejo create_repo \
$FORGEJO_USER1_USERNAME $FORGEJO_USER1_PASSWORD \
$FORGEJO_USER1_EMAIL \
$FORGEJO_URL \
$FORGEJO_USER1_SUPPORT_REPO
python -m forgejo \
forgejo register \
$FORGEJO_TESTUSER_USERNAME $FORGEJO_TESTUSER_PASSWORD \
$FORGEJO_TESTUSER_EMAIL \
$FORGEJO_URL
python -m forgejo \
forgejo create_issue \
$FORGEJO_TESTUSER_USERNAME $FORGEJO_TESTUSER_PASSWORD \
$FORGEJO_USER1_EMAIL \
$FORGEJO_URL \
$FORGEJO_USER1_USERNAME \
$FORGEJO_USER1_SUPPORT_REPO \
"normal issue title" "normal issue body"
python -m forgejo \
forgejo create_comment \
$FORGEJO_TESTUSER_USERNAME $FORGEJO_TESTUSER_PASSWORD \
$FORGEJO_USER1_EMAIL \
$FORGEJO_URL \
$FORGEJO_USER1_USERNAME \
$FORGEJO_USER1_SUPPORT_REPO \
1 "normal body"
python -m forgejo \
forgejo create_comment \
$FORGEJO_TESTUSER_USERNAME $FORGEJO_TESTUSER_PASSWORD \
$FORGEJO_USER1_EMAIL \
$FORGEJO_URL \
$FORGEJO_USER1_USERNAME \
$FORGEJO_USER1_SUPPORT_REPO \
1 "mention body @$FORGEJO_USER1_USERNAME"
python -m forgejo \
forgejo create_issue \
$FORGEJO_TESTUSER_USERNAME $FORGEJO_TESTUSER_PASSWORD \
$FORGEJO_USER1_EMAIL \
$FORGEJO_URL \
$FORGEJO_USER1_USERNAME \
$FORGEJO_USER1_SUPPORT_REPO \
"normal issue title" "mention issue @$FORGEJO_USER1_USERNAME"
python -m forgejo \
forgejo create_issue \
$FORGEJO_TESTUSER_USERNAME $FORGEJO_TESTUSER_PASSWORD \
$FORGEJO_USER1_EMAIL \
$FORGEJO_URL \
$FORGEJO_USER1_USERNAME \
$FORGEJO_USER1_SUPPORT_REPO \
"mention issue @$FORGEJO_USER1_USERNAME" "normal issue body"
python -m forgejo \
forgejo create_issue \
$FORGEJO_TESTUSER_USERNAME $FORGEJO_TESTUSER_PASSWORD \
$FORGEJO_USER1_EMAIL \
$FORGEJO_URL \
$FORGEJO_USER1_USERNAME \
$FORGEJO_USER1_SUPPORT_REPO \
"mention issue @$FORGEJO_USER1_USERNAME" "mention issue @$FORGEJO_USER1_USERNAME"
python -m forgejo \
forgejo create_issue \
$FORGEJO_TESTUSER_USERNAME $FORGEJO_TESTUSER_PASSWORD \
$FORGEJO_USER1_EMAIL \
$FORGEJO_URL \
$FORGEJO_USER1_USERNAME \
$FORGEJO_USER1_SUPPORT_REPO \
"normal issue title and normal body" "normal body"
python -m forgejo \
forgejo create_access_token \
$FORGEJO_USER1_USERNAME $FORGEJO_USER1_PASSWORD \
$FORGEJO_USER1_EMAIL \
$FORGEJO_URL \
$FORGEJO_USER1_ACCESS_TOKEN_NAME \
$FORGEJO_USER1_ACCESS_TOKEN_FILE_PATH \
$FORGEJO_USER1_SUPPORT_REPO
}
docker_compose_up() {
echo "[*] Starting Forgejo"
docker-compose -f docker-compose-dev-deps.yml up
}
docker_compose_down() {
docker-compose -f docker-compose-dev-deps.yml down
docker-compose -f docker-compose-dev-deps.yml down --remove-orphans
}

19
tests.sh Executable file
View file

@ -0,0 +1,19 @@
#!/bin/bash
# SPDX-FileCopyrightText: 2023 Aravinth Manivannan <realaravinth@batsense.net>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
set -Exeuo pipefail
source lib.sh
main() {
wait_for_env
forgejo_root
init_users_repo
echo "All Good! :)"
}
main
exit 0