# SPDX-FileCopyrightText: 2023 Aravinth Manivannan
#
# SPDX-License-Identifier: AGPL-3.0-or-later
import os
import random
import json
from urllib.parse import quote, urlunparse, urlparse
from html.parser import HTMLParser
from time import sleep

from requests import Session
from requests.auth import HTTPBasicAuth
import requests

from .csrf import ParseCSRF

# FORGEJO_USER = "root"
# FORGEJO_EMAIL = "root@example.com"
# FORGEJO_PASSWORD = "foobarpassword"
# HOST = "http://localhost:8080"
#
# REPOS = []


class Forgejo:
    """
    Thin client that drives a Forgejo instance through its web forms and API.

    All web-form requests go through the ``requests.Session`` passed in as
    ``c``, so cookies obtained by ``login`` are reused by later calls.
    """

    # Status codes the form-posting helpers accept as success.
    _SUCCESS_STATUS = (200, 302, 303)

    def __init__(self, host: str, username: str, password: str, email: str, c: Session):
        self.host = host
        self.username = username
        self.password = password
        self.email = email
        self.c = c
        self.__csrf_key = "_csrf"
        self.__logged_in = False

    def get_uri(self, path: str) -> str:
        """
        Build a URL from the scheme and netloc of ``self.host`` and ``path``
        """
        parsed = urlparse(self.host)
        return urlunparse((parsed.scheme, parsed.netloc, path, "", "", ""))

    def get_api_uri(self, path: str) -> str:
        """
        Same as ``get_uri`` but with ``username:password@`` embedded in the netloc
        """
        parsed = urlparse(self.host)
        # Percent-encode the credentials so characters such as "@", ":" or "/"
        # cannot be mistaken for URL delimiters.
        user = quote(self.username, safe="")
        password = quote(self.password, safe="")
        netloc = f"{user}:{password}@{parsed.netloc}"
        return urlunparse((parsed.scheme, netloc, path, "", "", ""))

    @staticmethod
    def check_online(host: str) -> None:
        """
        Check if Forgejo instance is online

        Blocks, polling "/" on ``host`` every two seconds, until the server
        answers with status 200 or 302.
        """
        parsed = urlparse(host)
        url = urlunparse((parsed.scheme, parsed.netloc, "/", "", "", ""))
        count = 0
        while True:
            try:
                # The timeout keeps a hung connection from blocking the poll forever.
                res = requests.get(url, allow_redirects=False, timeout=10)
            except requests.exceptions.RequestException:
                # Server not reachable yet; fall through to the retry below.
                # KeyboardInterrupt/SystemExit are deliberately NOT caught.
                pass
            else:
                if res.status_code in (200, 302):
                    return
            # Back off on connection errors AND on unexpected status codes so
            # the loop never spins without a pause.
            sleep(2)
            print(f"Retrying {count} time")
            count += 1

    def install(self) -> None:
        """
        Install Forgejo, first form that a user sees when a new instance is deployed

        Posts the installation form to the root URL and then sleeps for ten
        seconds. The response is not inspected.
        """
        parsed = urlparse(self.host)
        payload = {
            "db_type": "sqlite3",
            "db_host": "localhost:3306",
            "db_user": "root",
            "db_passwd": "",
            "db_name": "forgejo",
            "ssl_mode": "disable",
            "db_schema": "",
            "charset": "utf8",
            "db_path": "/data/gitea/gitea.db",
            "app_name": "Forgejo:+Beyond+Coding+We+Forge",
            "repo_root_path": "/data/git/repositories",
            "lfs_root_path": "/data/git/lfs",
            "run_user": "git",
            "domain": parsed.netloc,
            "ssh_port": "22",
            "http_port": "3000",
            "app_url": self.get_uri(""),
            "log_root_path": "/data/gitea/log",
            "smtp_host": "",
            "smtp_from": "",
            "smtp_user": "",
            "smtp_passwd": "",
            "enable_federated_avatar": "on",
            "enable_open_id_sign_in": "on",
            "enable_open_id_sign_up": "on",
            "default_allow_create_organization": "on",
            "default_enable_timetracking": "on",
            "no_reply_address": "noreply.localhost",
            "password_algorithm": "pbkdf2",
            "admin_name": self.username,
            "admin_passwd": self.password,
            "admin_confirm_passwd": self.password,
            "admin_email": self.email,
        }
        self.c.post(self.get_uri(""), data=payload, allow_redirects=False)
        # NOTE(review): presumably gives the instance time to finish setting
        # itself up before the next request -- confirm.
        sleep(10)

    def get_csrf_token(self, url: str) -> str:
        """
        Get CSRF token at a URI

        Raises ``Exception`` when the page does not answer with 200 or 302.
        """
        resp = self.c.get(url, allow_redirects=False)
        if resp.status_code not in (200, 302):
            print(url, resp.status_code)
            raise Exception(f"Can't get csrf token: {resp.status_code}")
        parser = ParseCSRF(name=self.__csrf_key)
        parser.feed(resp.text)
        return parser.token

    def register(self) -> None:
        """
        Register User

        The response of the sign-up request is not inspected.
        """
        url = self.get_uri("/user/sign_up")
        csrf = self.get_csrf_token(url)
        payload = {
            "_csrf": csrf,
            "user_name": self.username,
            "password": self.password,
            "retype": self.password,
            "email": self.email,
        }
        self.c.post(url, data=payload, allow_redirects=False)

    def login(self) -> None:
        """
        Login, must be called at least once before performing authenticated
        operations

        Does nothing when a previous call already succeeded. Raises
        ``Exception`` when the server answers with anything other than
        200, 302 or 303.
        """
        if self.__logged_in:
            return
        url = self.get_uri("/user/login")
        csrf = self.get_csrf_token(url)
        payload = {
            "_csrf": csrf,
            "user_name": self.username,
            "password": self.password,
            "remember": "on",
        }
        resp = self.c.post(url, data=payload, allow_redirects=False)
        if resp.status_code not in self._SUCCESS_STATUS:
            raise Exception(
                f"[ERROR] Authentication failed. status code {resp.status_code}"
            )
        print("User logged in")
        self.__logged_in = True

    def create_repository(self, name: str) -> None:
        """
        Create repository

        Logs in if necessary, looks up the current user's id through
        ``/api/v1/user`` and submits the repository creation form. Raises
        ``Exception`` on a status other than 200, 302 or 303.
        """
        self.login()
        url = self.get_uri("/repo/create")
        user_id = self.c.get(self.get_api_uri("/api/v1/user")).json()["id"]
        csrf = self.get_csrf_token(url)
        data = {
            "_csrf": csrf,
            "uid": user_id,
            "repo_name": name,
            "description": f"this repository is named {name}",
            "repo_template": "",
            "issue_labels": "",
            "gitignores": "",
            "license": "",
            "readme": "Default",
            "default_branch": "master",
            "trust_model": "default",
        }
        resp = self.c.post(url, data=data, allow_redirects=False)
        if resp.status_code not in self._SUCCESS_STATUS:
            raise Exception(
                f"Error while creating repository: {name} {resp.status_code}"
            )
        # Only report success once the status code has been checked.
        print(f"Created repository {name}")

    def create_issue(self, owner: str, repo: str, title: str, body: str) -> None:
        """
        Create issue

        Logs in if necessary and submits the new-issue form of
        ``owner/repo``. Raises ``Exception`` on a status other than 200, 302
        or 303.
        """
        self.login()
        url = self.get_uri(f"/{owner}/{repo}/issues/new")
        csrf = self.get_csrf_token(url)
        data = {
            "_csrf": csrf,
            "title": title,
            "content": body,
            "search": "",
            "label_ids": "",
            "milestone_id": "",
            "project_id": "",
            "assignee_ids": "",
            "redirect_after_creation": "",
        }
        resp = self.c.post(url, data=data, allow_redirects=False)
        if resp.status_code not in self._SUCCESS_STATUS:
            raise Exception(f"Error while creating issue: {resp.status_code}")
        # Only report success once the status code has been checked.
        print("Created issue")

    def create_comment(self, owner: str, repo: str, issue: int, body: str) -> None:
        """
        Create comment

        Logs in if necessary, reads the CSRF token from the issue page and
        posts the comment to the issue's ``/comments`` URL. Raises
        ``Exception`` on a status other than 200, 302 or 303.
        """
        self.login()
        # The token is read from the issue page; the form is posted elsewhere.
        issue_url = self.get_uri(f"/{owner}/{repo}/issues/{issue}")
        csrf = self.get_csrf_token(issue_url)
        data = {
            "_csrf": csrf,
            "content": body,
            "status": "",
        }
        url = self.get_uri(f"/{owner}/{repo}/issues/{issue}/comments")
        resp = self.c.post(url, data=data, allow_redirects=False)
        if resp.status_code not in self._SUCCESS_STATUS:
            raise Exception(f"Error while creating comment: {resp.status_code}")
        # Only report success once the status code has been checked.
        print("Created comment")

    def create_access_token(self, name: str, file: str, repo: str) -> None:
        """
        Create access token

        Requests a token called ``name`` using HTTP basic auth on a fresh
        session, adds ``login``, ``email``, ``repo`` and ``forgejo_url`` to
        the JSON response and writes the result to ``file``. Raises
        ``Exception`` when the API does not answer with 201.
        """
        # NOTE(review): the user segment is hard-coded to "owner_user" while
        # the request authenticates as self.username -- confirm this is
        # intentional before changing it.
        url = self.get_uri("/api/v1/users/owner_user/tokens")
        payload = {
            "name": name,
            "scopes": [
                "read:issue",
                "write:notification",
                "read:repository",
                "read:user",
            ],
        }
        # Context manager releases the session's connections when done.
        with Session() as session:
            session.auth = (self.username, self.password)
            resp = session.post(url, json=payload, allow_redirects=False)

        if resp.status_code != 201:
            raise Exception(f"Error while creating access token: {resp.status_code}")
        print("Created access token")

        data = resp.json()
        data["login"] = self.username
        data["email"] = self.email
        data["repo"] = repo
        data["forgejo_url"] = self.get_uri("")
        with open(file, "w", encoding="utf-8") as f:
            f.write(json.dumps(data))
        print(f"Wrote access token to {file}")