From 937a41896b38c048b99fbadc74fbc242316f4464 Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Wed, 27 Sep 2023 15:47:04 +0530 Subject: [PATCH] feat: import from forgejo-nc --- forgejo/__init__.py | 3 + forgejo/__main__.py | 17 +++ forgejo/cli.py | 220 +++++++++++++++++++++++++++++++ forgejo/csrf.py | 34 +++++ forgejo/forgejo.py | 315 ++++++++++++++++++++++++++++++++++++++++++++ lib.sh | 170 ++++++++++++++++++++++++ tests.sh | 19 +++ 7 files changed, 778 insertions(+) create mode 100644 forgejo/__init__.py create mode 100644 forgejo/__main__.py create mode 100644 forgejo/cli.py create mode 100644 forgejo/csrf.py create mode 100755 forgejo/forgejo.py create mode 100755 lib.sh create mode 100755 tests.sh diff --git a/forgejo/__init__.py b/forgejo/__init__.py new file mode 100644 index 0000000..7d2a576 --- /dev/null +++ b/forgejo/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: 2023 Aravinth Manivannan +# +# SPDX-License-Identifier: AGPL-3.0-or-later diff --git a/forgejo/__main__.py b/forgejo/__main__.py new file mode 100644 index 0000000..c2ffd00 --- /dev/null +++ b/forgejo/__main__.py @@ -0,0 +1,17 @@ +# SPDX-FileCopyrightText: 2023 Aravinth Manivannan +# +# SPDX-License-Identifier: AGPL-3.0-or-later + +import argparse + +from .cli import Cli + + +def admin(args): + print(args) + + +if __name__ == "__main__": + cli = Cli() + opts = cli.parse() + opts.func(opts, c=cli.c) diff --git a/forgejo/cli.py b/forgejo/cli.py new file mode 100644 index 0000000..f3334e0 --- /dev/null +++ b/forgejo/cli.py @@ -0,0 +1,220 @@ +# SPDX-FileCopyrightText: 2023 Aravinth Manivannan +# +# SPDX-License-Identifier: AGPL-3.0-or-later + +import argparse + +from requests import Session + + +def forgejo_from_args(args, c: Session): + from .forgejo import Forgejo + + return Forgejo( + host=args.host, + username=args.username, + password=args.password, + email=args.email, + c=c, + ) + + +class Forgejo: + def __init__(self, parser, c: Session): + self.c = c + self.parser = parser + self.subparser = self.parser.add_subparsers() + self.install() + self.register() + self.login() + self.create_repository() + self.create_issue() + self.create_comment() + self.create_access_token() + + def __add_credentials_parser(self, parser): + group = parser.add_argument_group("credentials", "User credentials") + group.add_argument("username", type=str, help="Forgejo user's username") + group.add_argument("password", type=str, help="Forgejo user's password") + group.add_argument("email", type=str, help="Forgejo user's email") + group.add_argument("host", type=str, help="URI at which Forgejo is running") + + def install(self): + def run(args, c: Session): + forgejo = forgejo_from_args(args, c=c) + forgejo.install() + + self.install_parser = self.subparser.add_parser( + name="install", description="Install Forgejo", help="Install Forgejo" + ) + self.__add_credentials_parser(self.install_parser) + self.install_parser.set_defaults(func=run) + + def register(self): + def run(args, c: Session): + forgejo = forgejo_from_args(args, c=c) + forgejo.register() + + self.register_parser = self.subparser.add_parser( + name="register", + description="Forgejo user registration", + help="Register a user on Forgejo", + ) + self.__add_credentials_parser(self.register_parser) + self.register_parser.set_defaults(func=run) + + def login(self): + def run(args, c: Session): + forgejo = forgejo_from_args(args, c=c) + forgejo.login() + + self.login_parser = self.subparser.add_parser( + name="login", description="Forgejo user login", help="Login on Forgejo" + ) + self.__add_credentials_parser(self.login_parser) + self.login_parser.set_defaults(func=run) + + def create_repository(self): + def run(args, c: Session): + forgejo = forgejo_from_args(args, c=c) + forgejo.login() + forgejo.create_repository(name=args.repo_name) + + self.create_repository_parser = self.subparser.add_parser( + name="create_repo", + description="Create repository on Forgejo", + help="Create repository on Forgejo", + ) + self.__add_credentials_parser(self.create_repository_parser) + self.create_repository_parser.set_defaults(func=run) + self.create_repository_parser.add_argument( + "repo_name", type=str, help="Name of the repository to be created" + ) + + def create_issue(self): + def run(args, c: Session): + forgejo = forgejo_from_args(args, c=c) + forgejo.login() + forgejo.create_issue( + owner=args.owner, repo=args.repo, title=args.title, body=args.body + ) + + self.create_issue_parser = self.subparser.add_parser( + name="create_issue", + description="Create issue on a repository owned by someone on Forgejo", + help="Create issue on Forgejo", + ) + self.__add_credentials_parser(self.create_issue_parser) + self.create_issue_parser.set_defaults(func=run) + self.create_issue_parser.add_argument( + "owner", type=str, help="Owner of the repo" + ) + + self.create_issue_parser.add_argument( + "repo", type=str, help="Name of the repository" + ) + + self.create_issue_parser.add_argument("title", type=str, help="title") + + self.create_issue_parser.add_argument("body", type=str, help="body") + + def create_comment(self): + def run(args, c: Session): + forgejo = forgejo_from_args(args, c=c) + forgejo.login() + forgejo.create_comment( + owner=args.owner, repo=args.repo, issue=args.issue, body=args.body + ) + + self.create_comment_parser = self.subparser.add_parser( + name="create_comment", + description="Create comment on a repository owned by someone on Forgejo", + help="Create comment on Forgejo", + ) + self.__add_credentials_parser(self.create_comment_parser) + self.create_comment_parser.set_defaults(func=run) + self.create_comment_parser.add_argument( + "owner", type=str, help="Owner of the repo" + ) + + self.create_comment_parser.add_argument( + "repo", type=str, help="Name of the repository" + ) + + self.create_comment_parser.add_argument( + "issue", type=int, help="ID of the issue" + ) + + self.create_comment_parser.add_argument("body", type=str, help="body") + + def create_access_token(self): + def run(args, c: Session): + forgejo = forgejo_from_args(args, c=c) + forgejo.login() + forgejo.create_access_token(name=args.name, file=args.file, repo=args.repo) + + self.create_access_token_parser = self.subparser.add_parser( + name="create_access_token", + description="Create access token for user", + help="Create access toekn for user", + ) + self.__add_credentials_parser(self.create_access_token_parser) + self.create_access_token_parser.set_defaults(func=run) + self.create_access_token_parser.add_argument( + "name", type=str, help="name of the access token" + ) + + self.create_access_token_parser.add_argument( + "file", type=str, help="filepath to write the token value" + ) + + self.create_access_token_parser.add_argument( + "repo", type=str, help="repo name write to json file" + ) + + +class Cli: + def __init__(self): + c = Session() + self.c = c + self.parser = argparse.ArgumentParser( + description="Install and Bootstrap Forgejo and Hostea Dashboard" + ) + self.subparser = self.parser.add_subparsers() + self.check_env() + self.forgejo() + + def __add_credentials_parser(self, parser): + group = parser.add_argument_group("credentials", "User credentials") + group.add_argument("username", type=str, help="Forgejo user's username") + group.add_argument("password", type=str, help="Forgejo user's password") + group.add_argument("email", type=str, help="Forgejo user's email") + + def check_env(self): + def run(args, c: Session): + from .forgejo import Forgejo + + Forgejo.check_online(host=args.forgejo_host) + + self.check_env_parser = self.subparser.add_parser( + name="check_env", + description="Check and block until environment is ready", + help="Check and block until environment is ready", + ) + + self.check_env_parser.add_argument( + "forgejo_host", type=str, help="URI at which Forgejo is running" + ) + + self.check_env_parser.set_defaults(func=run) + + def forgejo(self): + self.forgejo = self.subparser.add_parser( + name="forgejo", + description="Forgejo", + help="Forgejo-related functionality", + ) + Forgejo(parser=self.forgejo, c=self.c) + + def parse(self): + return self.parser.parse_args() diff --git a/forgejo/csrf.py b/forgejo/csrf.py new file mode 100644 index 0000000..b5ff144 --- /dev/null +++ b/forgejo/csrf.py @@ -0,0 +1,34 @@ +# SPDX-FileCopyrightText: 2023 Aravinth Manivannan +# +# SPDX-License-Identifier: AGPL-3.0-or-later + +from html.parser import HTMLParser + + +class ParseCSRF(HTMLParser): + token: str = None + + def __init__(self, name): + HTMLParser.__init__(self) + self.name = name + + def handle_starttag(self, tag: str, attrs: (str, str)): + if self.token: + return + + if tag != "input": + return + + token = None + for index, (k, v) in enumerate(attrs): + if k == "value": + token = v + + if all([k == "name", v == self.name]): + if token: + self.token = token + return + for inner_index, (nk, nv) in enumerate(attrs, start=index): + if nk == "value": + self.token = nv + return diff --git a/forgejo/forgejo.py b/forgejo/forgejo.py new file mode 100755 index 0000000..fcbfffb --- /dev/null +++ b/forgejo/forgejo.py @@ -0,0 +1,315 @@ +# SPDX-FileCopyrightText: 2023 Aravinth Manivannan +# +# SPDX-License-Identifier: AGPL-3.0-or-later + +import os +import random +import json +from urllib.parse import urlunparse, urlparse +from html.parser import HTMLParser +from time import sleep + +from requests import Session +from requests.auth import HTTPBasicAuth +import requests + +from .csrf import ParseCSRF + +# FORGEJO_USER = "root" +# FORGEJO_EMAIL = "root@example.com" +# FORGEJO_PASSWORD = "foobarpassword" +# HOST = "http://localhost:8080" +# +# REPOS = [] + + +class Forgejo: + def __init__(self, host: str, username: str, password: str, email: str, c: Session): + self.host = host + self.username = username + self.password = password + self.email = email + self.c = c + self.__csrf_key = "_csrf" + self.__logged_in = False + + def get_uri(self, path: str): + parsed = urlparse(self.host) + return urlunparse((parsed.scheme, parsed.netloc, path, "", "", "")) + + def get_api_uri(self, path: str): + parsed = urlparse(self.host) + return urlunparse( + ( + parsed.scheme, + f"{self.username}:{self.password}@{parsed.netloc}", + path, + "", + "", + "", + ) + ) + + @staticmethod + def check_online(host: str): + """ + Check if Forgejo instance is online + """ + count = 0 + parsed = urlparse(host) + url = urlunparse((parsed.scheme, parsed.netloc, "/", "", "", "")) + + while True: + try: + res = requests.get(url, allow_redirects=False) + if any([res.status_code == 302, res.status_code == 200]): + break + except: + sleep(2) + print(f"Retrying {count} time") + count += 1 + continue + + def install(self): + """ + Install Forgejo, first form that a user sees when a new instance is + deployed + """ + + parsed = urlparse(self.host) + # cwd = os.environ.get("PWD") + # user = os.environ.get("USER") + payload = { + "db_type": "sqlite3", + "db_host": "localhost:3306", + "db_user": "root", + "db_passwd": "", + "db_name": "forgejo", + "ssl_mode": "disable", + "db_schema": "", + "charset": "utf8", + "db_path": "/data/gitea/gitea.db", + "app_name": "Forgejo:+Beyond+Coding+We+Forge", + "repo_root_path": "/data/git/repositories", + "lfs_root_path": "/data/git/lfs", + "run_user": "git", + "domain": parsed.netloc, + "ssh_port": "22", + "http_port": "3000", + "app_url": self.get_uri(""), + "log_root_path": "/data/gitea/log", + "smtp_host": "", + "smtp_from": "", + "smtp_user": "", + "smtp_passwd": "", + "enable_federated_avatar": "on", + "enable_open_id_sign_in": "on", + "enable_open_id_sign_up": "on", + "default_allow_create_organization": "on", + "default_enable_timetracking": "on", + "no_reply_address": "noreply.localhost", + "password_algorithm": "pbkdf2", + "admin_name": self.username, + "admin_passwd": self.password, + "admin_confirm_passwd": self.password, + "admin_email": self.email, + } + + resp = self.c.post(self.get_uri(""), data=payload, allow_redirects=False) + sleep(10) + + def get_csrf_token(self, url: str) -> str: + """ + Get CSRF token at a URI + """ + resp = self.c.get(url, allow_redirects=False) + if resp.status_code != 200 and resp.status_code != 302: + print(url, resp.status_code) + raise Exception(f"Can't get csrf token: {resp.status_code}") + parser = ParseCSRF(name=self.__csrf_key) + parser.feed(resp.text) + csrf = parser.token + return csrf + + def register(self): + """ + Register User + """ + url = self.get_uri("/user/sign_up") + csrf = self.get_csrf_token(url) + payload = { + "_csrf": csrf, + "user_name": self.username, + "password": self.password, + "retype": self.password, + "email": self.email, + } + self.c.post(url, data=payload, allow_redirects=False) + + def login(self): + """ + Login, must be called at least once before performing authenticated + operations + """ + if self.__logged_in: + return + url = self.get_uri("/user/login") + csrf = self.get_csrf_token(url) + payload = { + "_csrf": csrf, + "user_name": self.username, + "password": self.password, + "remember": "on", + } + resp = self.c.post(url, data=payload, allow_redirects=False) + if any( + [resp.status_code == 302, resp.status_code == 200, resp.status_code == 303] + ): + print("User logged in") + self.__logged_in = True + return + + raise Exception( + f"[ERROR] Authentication failed. status code {resp.status_code}" + ) + + def create_repository(self, name: str): + """ + Create repository + """ + self.login() + + def get_repository_payload(csrf: str, name: str, user_id: str): + data = { + "_csrf": csrf, + "uid": user_id, + "repo_name": name, + "description": f"this repository is named {name}", + "repo_template": "", + "issue_labels": "", + "gitignores": "", + "license": "", + "readme": "Default", + "default_branch": "master", + "trust_model": "default", + } + return data + + url = self.get_uri("/repo/create") + user_id = self.c.get(self.get_api_uri("/api/v1/user")).json()["id"] + + csrf = self.get_csrf_token(url) + data = get_repository_payload(csrf, name, user_id=user_id) + + resp = self.c.post(url, data=data, allow_redirects=False) + print(f"Created repository {name}") + if ( + resp.status_code != 302 + and resp.status_code != 200 + and resp.status_code != 303 + ): + raise Exception( + f"Error while creating repository: {name} {resp.status_code}" + ) + + def create_issue(self, owner: str, repo: str, title: str, body: str): + """ + Create issue + """ + self.login() + + def create_issue_payload(csrf: str, title: str, body: str): + data = { + "_csrf": csrf, + "title": title, + "content": body, + "search": "", + "label_ids": "", + "milestone_id": "", + "project_id": "", + "assignee_ids": "", + "redirect_after_creation": "", + } + + return data + + url = self.get_uri(f"/{owner}/{repo}/issues/new") + csrf = self.get_csrf_token(url) + data = create_issue_payload(csrf=csrf, title=title, body=body) + + resp = self.c.post(url, data=data, allow_redirects=False) + print(f"Created issue") + if ( + resp.status_code != 302 + and resp.status_code != 200 + and resp.status_code != 303 + ): + raise Exception(f"Error while creating issue: {resp.status_code}") + + def create_comment(self, owner: str, repo: str, issue: int, body: str): + """ + Create comment + """ + self.login() + + def create_comment_payload(csrf: str, body: str): + data = { + "_csrf": csrf, + "content": body, + "status": "", + } + + return data + + url = self.get_uri(f"/{owner}/{repo}/issues/{issue}") + csrf = self.get_csrf_token(url) + data = create_comment_payload(csrf, body) + + url = self.get_uri(f"/{owner}/{repo}/issues/{issue}/comments") + resp = self.c.post(url, data=data, allow_redirects=False) + print(f"Created comment") + if ( + resp.status_code != 302 + and resp.status_code != 200 + and resp.status_code != 303 + ): + raise Exception(f"Error while creating comment: {resp.status_code}") + + def create_access_token(self, name: str, file: str, repo: str): + """ + Create access token + """ + + def create_access_token_payload(name: str): + data = { + "name": name, + "scopes": [ + "read:issue", + "write:notification", + "read:repository", + "read:user", + ], + } + + return data + + url = self.get_uri(f"/api/v1/users/{self.username}/tokens") + data = create_access_token_payload(name) + + session = Session() + session.auth = (self.username, self.password) + resp = session.post(url, json=data, allow_redirects=False) + + if resp.status_code != 201: + raise Exception(f"Error while creating access token: {resp.status_code} {resp} {resp.text}") + + print("Created access token") + data = resp.json() + with open(file, "w") as f: + data["login"] = self.username + data["email"] = self.email + data["repo"] = repo + data["forgejo_url"] = self.get_uri("") + content = json.dumps(data) + f.write(content) + print(f"Wrote access token to {file}") diff --git a/lib.sh b/lib.sh new file mode 100755 index 0000000..d6add58 --- /dev/null +++ b/lib.sh @@ -0,0 +1,170 @@ +# SPDX-FileCopyrightText: 2023 Aravinth Manivannan +# +# SPDX-License-Identifier: AGPL-3.0-or-later + +is_ci(){ + if [ -z ${CI+x} ]; + then + return 1 + else + return 0 + fi +} + +if is_ci +then + mkdir /tmp/forgejo-init-script + FORGEJO_USER1_ACCESS_TOKEN_FILE_PATH=/tmp/forgeo-init-scrit/user1-accesstoken.json +else + FORGEJO_USER1_ACCESS_TOKEN_FILE_PATH=tmp/user1-accesstoken.json +fi + +readonly FORGEJO_ROOT_USERNAME=root +readonly FORGEJO_ROOT_EMAIL="$FORGEJO_ROOT_USERNAME@example.org" +readonly FORGEJO_ROOT_PASSOWRD=supercomplicatedpassword + +readonly FORGEJO_USER1_USERNAME=john +readonly FORGEJO_USER1_PASSWORD=supercomplicatedpassword +readonly FORGEJO_USER1_EMAIL="$FORGEJO_USER1_USERNAME@example.org" +readonly FORGEJO_USER1_SUPPORT_REPO="test_repo" +readonly FORGEJO_USER1_ACCESS_TOKEN_NAME="coreaccesstoken" +rm $FORGEJO_USER1_ACCESS_TOKEN_FILE_PATH || true + +readonly FORGEJO_TESTUSER_USERNAME=alice +readonly FORGEJO_TESTUSER_PASSWORD=supercomplicatedpassword +readonly FORGEJO_TESTUSER_EMAIL="$FORGEJO_TESTUSER_USERNAME@example.org" + + +wait_for_env() { + python -m forgejo \ + check_env $FORGEJO_URL +} + +# register root user on Forgejo to simulate Hoste admin and integrate SSO +forgejo_root(){ +# python -m forgejo \ +# forgejo install \ +# $FORGEJO_ROOT_USERNAME $FORGEJO_ROOT_PASSOWRD \ +# $FORGEJO_ROOT_EMAIL \ +# $FORGEJO_URL + python -m forgejo \ + forgejo register \ + $FORGEJO_ROOT_USERNAME $FORGEJO_ROOT_PASSOWRD \ + $FORGEJO_ROOT_EMAIL \ + $FORGEJO_URL + python -m forgejo \ + forgejo login \ + $FORGEJO_ROOT_USERNAME $FORGEJO_ROOT_PASSOWRD \ + $FORGEJO_ROOT_EMAIL \ + $FORGEJO_URL +} + + +# register user "Hostea" on Forgejo and create support repository +init_users_repo() { + python -m forgejo \ + forgejo register \ + $FORGEJO_USER1_USERNAME $FORGEJO_USER1_PASSWORD \ + $FORGEJO_USER1_EMAIL \ + $FORGEJO_URL + python -m forgejo \ + forgejo login \ + $FORGEJO_USER1_USERNAME $FORGEJO_USER1_PASSWORD \ + $FORGEJO_USER1_EMAIL \ + $FORGEJO_URL + python -m forgejo \ + forgejo create_repo \ + $FORGEJO_USER1_USERNAME $FORGEJO_USER1_PASSWORD \ + $FORGEJO_USER1_EMAIL \ + $FORGEJO_URL \ + $FORGEJO_USER1_SUPPORT_REPO + + python -m forgejo \ + forgejo register \ + $FORGEJO_TESTUSER_USERNAME $FORGEJO_TESTUSER_PASSWORD \ + $FORGEJO_TESTUSER_EMAIL \ + $FORGEJO_URL + + python -m forgejo \ + forgejo create_issue \ + $FORGEJO_TESTUSER_USERNAME $FORGEJO_TESTUSER_PASSWORD \ + $FORGEJO_USER1_EMAIL \ + $FORGEJO_URL \ + $FORGEJO_USER1_USERNAME \ + $FORGEJO_USER1_SUPPORT_REPO \ + "normal issue title" "normal issue body" + + python -m forgejo \ + forgejo create_comment \ + $FORGEJO_TESTUSER_USERNAME $FORGEJO_TESTUSER_PASSWORD \ + $FORGEJO_USER1_EMAIL \ + $FORGEJO_URL \ + $FORGEJO_USER1_USERNAME \ + $FORGEJO_USER1_SUPPORT_REPO \ + 1 "normal body" + + python -m forgejo \ + forgejo create_comment \ + $FORGEJO_TESTUSER_USERNAME $FORGEJO_TESTUSER_PASSWORD \ + $FORGEJO_USER1_EMAIL \ + $FORGEJO_URL \ + $FORGEJO_USER1_USERNAME \ + $FORGEJO_USER1_SUPPORT_REPO \ + 1 "mention body @$FORGEJO_USER1_USERNAME" + + python -m forgejo \ + forgejo create_issue \ + $FORGEJO_TESTUSER_USERNAME $FORGEJO_TESTUSER_PASSWORD \ + $FORGEJO_USER1_EMAIL \ + $FORGEJO_URL \ + $FORGEJO_USER1_USERNAME \ + $FORGEJO_USER1_SUPPORT_REPO \ + "normal issue title" "mention issue @$FORGEJO_USER1_USERNAME" + + python -m forgejo \ + forgejo create_issue \ + $FORGEJO_TESTUSER_USERNAME $FORGEJO_TESTUSER_PASSWORD \ + $FORGEJO_USER1_EMAIL \ + $FORGEJO_URL \ + $FORGEJO_USER1_USERNAME \ + $FORGEJO_USER1_SUPPORT_REPO \ + "mention issue @$FORGEJO_USER1_USERNAME" "normal issue body" + + + python -m forgejo \ + forgejo create_issue \ + $FORGEJO_TESTUSER_USERNAME $FORGEJO_TESTUSER_PASSWORD \ + $FORGEJO_USER1_EMAIL \ + $FORGEJO_URL \ + $FORGEJO_USER1_USERNAME \ + $FORGEJO_USER1_SUPPORT_REPO \ + "mention issue @$FORGEJO_USER1_USERNAME" "mention issue @$FORGEJO_USER1_USERNAME" + + python -m forgejo \ + forgejo create_issue \ + $FORGEJO_TESTUSER_USERNAME $FORGEJO_TESTUSER_PASSWORD \ + $FORGEJO_USER1_EMAIL \ + $FORGEJO_URL \ + $FORGEJO_USER1_USERNAME \ + $FORGEJO_USER1_SUPPORT_REPO \ + "normal issue title and normal body" "normal body" + + python -m forgejo \ + forgejo create_access_token \ + $FORGEJO_USER1_USERNAME $FORGEJO_USER1_PASSWORD \ + $FORGEJO_USER1_EMAIL \ + $FORGEJO_URL \ + $FORGEJO_USER1_ACCESS_TOKEN_NAME \ + $FORGEJO_USER1_ACCESS_TOKEN_FILE_PATH \ + $FORGEJO_USER1_SUPPORT_REPO +} + +docker_compose_up() { + echo "[*] Starting Forgejo" + docker-compose -f docker-compose-dev-deps.yml up +} + +docker_compose_down() { + docker-compose -f docker-compose-dev-deps.yml down + docker-compose -f docker-compose-dev-deps.yml down --remove-orphans +} diff --git a/tests.sh b/tests.sh new file mode 100755 index 0000000..beccd7c --- /dev/null +++ b/tests.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +# SPDX-FileCopyrightText: 2023 Aravinth Manivannan +# +# SPDX-License-Identifier: AGPL-3.0-or-later + +set -Exeuo pipefail + +source lib.sh + +main() { + wait_for_env + forgejo_root + init_users_repo + echo "All Good! :)" +} + +main +exit 0