315 lines
9.6 KiB
Python
Executable file
315 lines
9.6 KiB
Python
Executable file
# SPDX-FileCopyrightText: 2023 Aravinth Manivannan <realaravinth@batsense.net>
|
|
#
|
|
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
|
|
import os
|
|
import random
|
|
import json
|
|
from urllib.parse import urlunparse, urlparse
|
|
from html.parser import HTMLParser
|
|
from time import sleep
|
|
|
|
from requests import Session
|
|
from requests.auth import HTTPBasicAuth
|
|
import requests
|
|
|
|
from .csrf import ParseCSRF
|
|
|
|
# FORGEJO_USER = "root"
|
|
# FORGEJO_EMAIL = "root@example.com"
|
|
# FORGEJO_PASSWORD = "foobarpassword"
|
|
# HOST = "http://localhost:8080"
|
|
#
|
|
# REPOS = []
|
|
|
|
|
|
class Forgejo:
|
|
def __init__(self, host: str, username: str, password: str, email: str, c: Session):
|
|
self.host = host
|
|
self.username = username
|
|
self.password = password
|
|
self.email = email
|
|
self.c = c
|
|
self.__csrf_key = "_csrf"
|
|
self.__logged_in = False
|
|
|
|
def get_uri(self, path: str):
|
|
parsed = urlparse(self.host)
|
|
return urlunparse((parsed.scheme, parsed.netloc, path, "", "", ""))
|
|
|
|
def get_api_uri(self, path: str):
|
|
parsed = urlparse(self.host)
|
|
return urlunparse(
|
|
(
|
|
parsed.scheme,
|
|
f"{self.username}:{self.password}@{parsed.netloc}",
|
|
path,
|
|
"",
|
|
"",
|
|
"",
|
|
)
|
|
)
|
|
|
|
@staticmethod
|
|
def check_online(host: str):
|
|
"""
|
|
Check if Forgejo instance is online
|
|
"""
|
|
count = 0
|
|
parsed = urlparse(host)
|
|
url = urlunparse((parsed.scheme, parsed.netloc, "/", "", "", ""))
|
|
|
|
while True:
|
|
try:
|
|
res = requests.get(url, allow_redirects=False)
|
|
if any([res.status_code == 302, res.status_code == 200]):
|
|
break
|
|
except:
|
|
sleep(2)
|
|
print(f"Retrying {count} time")
|
|
count += 1
|
|
continue
|
|
|
|
def install(self):
|
|
"""
|
|
Install Forgejo, first form that a user sees when a new instance is
|
|
deployed
|
|
"""
|
|
|
|
parsed = urlparse(self.host)
|
|
# cwd = os.environ.get("PWD")
|
|
# user = os.environ.get("USER")
|
|
payload = {
|
|
"db_type": "sqlite3",
|
|
"db_host": "localhost:3306",
|
|
"db_user": "root",
|
|
"db_passwd": "",
|
|
"db_name": "forgejo",
|
|
"ssl_mode": "disable",
|
|
"db_schema": "",
|
|
"charset": "utf8",
|
|
"db_path": "/data/gitea/gitea.db",
|
|
"app_name": "Forgejo:+Beyond+Coding+We+Forge",
|
|
"repo_root_path": "/data/git/repositories",
|
|
"lfs_root_path": "/data/git/lfs",
|
|
"run_user": "git",
|
|
"domain": parsed.netloc,
|
|
"ssh_port": "22",
|
|
"http_port": "3000",
|
|
"app_url": self.get_uri(""),
|
|
"log_root_path": "/data/gitea/log",
|
|
"smtp_host": "",
|
|
"smtp_from": "",
|
|
"smtp_user": "",
|
|
"smtp_passwd": "",
|
|
"enable_federated_avatar": "on",
|
|
"enable_open_id_sign_in": "on",
|
|
"enable_open_id_sign_up": "on",
|
|
"default_allow_create_organization": "on",
|
|
"default_enable_timetracking": "on",
|
|
"no_reply_address": "noreply.localhost",
|
|
"password_algorithm": "pbkdf2",
|
|
"admin_name": self.username,
|
|
"admin_passwd": self.password,
|
|
"admin_confirm_passwd": self.password,
|
|
"admin_email": self.email,
|
|
}
|
|
|
|
resp = self.c.post(self.get_uri(""), data=payload, allow_redirects=False)
|
|
sleep(10)
|
|
|
|
def get_csrf_token(self, url: str) -> str:
|
|
"""
|
|
Get CSRF token at a URI
|
|
"""
|
|
resp = self.c.get(url, allow_redirects=False)
|
|
if resp.status_code != 200 and resp.status_code != 302:
|
|
print(url, resp.status_code)
|
|
raise Exception(f"Can't get csrf token: {resp.status_code}")
|
|
parser = ParseCSRF(name=self.__csrf_key)
|
|
parser.feed(resp.text)
|
|
csrf = parser.token
|
|
return csrf
|
|
|
|
def register(self):
|
|
"""
|
|
Register User
|
|
"""
|
|
url = self.get_uri("/user/sign_up")
|
|
csrf = self.get_csrf_token(url)
|
|
payload = {
|
|
"_csrf": csrf,
|
|
"user_name": self.username,
|
|
"password": self.password,
|
|
"retype": self.password,
|
|
"email": self.email,
|
|
}
|
|
self.c.post(url, data=payload, allow_redirects=False)
|
|
|
|
def login(self):
|
|
"""
|
|
Login, must be called at least once before performing authenticated
|
|
operations
|
|
"""
|
|
if self.__logged_in:
|
|
return
|
|
url = self.get_uri("/user/login")
|
|
csrf = self.get_csrf_token(url)
|
|
payload = {
|
|
"_csrf": csrf,
|
|
"user_name": self.username,
|
|
"password": self.password,
|
|
"remember": "on",
|
|
}
|
|
resp = self.c.post(url, data=payload, allow_redirects=False)
|
|
if any(
|
|
[resp.status_code == 302, resp.status_code == 200, resp.status_code == 303]
|
|
):
|
|
print("User logged in")
|
|
self.__logged_in = True
|
|
return
|
|
|
|
raise Exception(
|
|
f"[ERROR] Authentication failed. status code {resp.status_code}"
|
|
)
|
|
|
|
def create_repository(self, name: str):
|
|
"""
|
|
Create repository
|
|
"""
|
|
self.login()
|
|
|
|
def get_repository_payload(csrf: str, name: str, user_id: str):
|
|
data = {
|
|
"_csrf": csrf,
|
|
"uid": user_id,
|
|
"repo_name": name,
|
|
"description": f"this repository is named {name}",
|
|
"repo_template": "",
|
|
"issue_labels": "",
|
|
"gitignores": "",
|
|
"license": "",
|
|
"readme": "Default",
|
|
"default_branch": "master",
|
|
"trust_model": "default",
|
|
}
|
|
return data
|
|
|
|
url = self.get_uri("/repo/create")
|
|
user_id = self.c.get(self.get_api_uri("/api/v1/user")).json()["id"]
|
|
|
|
csrf = self.get_csrf_token(url)
|
|
data = get_repository_payload(csrf, name, user_id=user_id)
|
|
|
|
resp = self.c.post(url, data=data, allow_redirects=False)
|
|
print(f"Created repository {name}")
|
|
if (
|
|
resp.status_code != 302
|
|
and resp.status_code != 200
|
|
and resp.status_code != 303
|
|
):
|
|
raise Exception(
|
|
f"Error while creating repository: {name} {resp.status_code}"
|
|
)
|
|
|
|
def create_issue(self, owner: str, repo: str, title: str, body: str):
|
|
"""
|
|
Create issue
|
|
"""
|
|
self.login()
|
|
|
|
def create_issue_payload(csrf: str, title: str, body: str):
|
|
data = {
|
|
"_csrf": csrf,
|
|
"title": title,
|
|
"content": body,
|
|
"search": "",
|
|
"label_ids": "",
|
|
"milestone_id": "",
|
|
"project_id": "",
|
|
"assignee_ids": "",
|
|
"redirect_after_creation": "",
|
|
}
|
|
|
|
return data
|
|
|
|
url = self.get_uri(f"/{owner}/{repo}/issues/new")
|
|
csrf = self.get_csrf_token(url)
|
|
data = create_issue_payload(csrf=csrf, title=title, body=body)
|
|
|
|
resp = self.c.post(url, data=data, allow_redirects=False)
|
|
print(f"Created issue")
|
|
if (
|
|
resp.status_code != 302
|
|
and resp.status_code != 200
|
|
and resp.status_code != 303
|
|
):
|
|
raise Exception(f"Error while creating issue: {resp.status_code}")
|
|
|
|
def create_comment(self, owner: str, repo: str, issue: int, body: str):
|
|
"""
|
|
Create comment
|
|
"""
|
|
self.login()
|
|
|
|
def create_comment_payload(csrf: str, body: str):
|
|
data = {
|
|
"_csrf": csrf,
|
|
"content": body,
|
|
"status": "",
|
|
}
|
|
|
|
return data
|
|
|
|
url = self.get_uri(f"/{owner}/{repo}/issues/{issue}")
|
|
csrf = self.get_csrf_token(url)
|
|
data = create_comment_payload(csrf, body)
|
|
|
|
url = self.get_uri(f"/{owner}/{repo}/issues/{issue}/comments")
|
|
resp = self.c.post(url, data=data, allow_redirects=False)
|
|
print(f"Created comment")
|
|
if (
|
|
resp.status_code != 302
|
|
and resp.status_code != 200
|
|
and resp.status_code != 303
|
|
):
|
|
raise Exception(f"Error while creating comment: {resp.status_code}")
|
|
|
|
def create_access_token(self, name: str, file: str, repo: str):
|
|
"""
|
|
Create access token
|
|
"""
|
|
|
|
def create_access_token_payload(name: str):
|
|
data = {
|
|
"name": name,
|
|
"scopes": [
|
|
"read:issue",
|
|
"write:notification",
|
|
"read:repository",
|
|
"read:user",
|
|
],
|
|
}
|
|
|
|
return data
|
|
|
|
url = self.get_uri(f"/api/v1/users/{self.username}/tokens")
|
|
data = create_access_token_payload(name)
|
|
|
|
session = Session()
|
|
session.auth = (self.username, self.password)
|
|
resp = session.post(url, json=data, allow_redirects=False)
|
|
|
|
if resp.status_code != 201:
|
|
raise Exception(f"Error while creating access token: {resp.status_code} {resp} {resp.text}")
|
|
|
|
print("Created access token")
|
|
data = resp.json()
|
|
with open(file, "w") as f:
|
|
data["login"] = self.username
|
|
data["email"] = self.email
|
|
data["repo"] = repo
|
|
data["forgejo_url"] = self.get_uri("")
|
|
content = json.dumps(data)
|
|
f.write(content)
|
|
print(f"Wrote access token to {file}")
|