forgejo-notifications-core/integration/forgejo.py

314 lines
9.5 KiB
Python
Raw Normal View History

2023-09-18 20:44:19 +05:30
# SPDX-FileCopyrightText: 2023 Aravinth Manivannan <realaravinth@batsense.net>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
import os
import random
2023-09-18 17:55:18 +05:30
import json
from urllib.parse import urlunparse, urlparse
from html.parser import HTMLParser
from time import sleep
from requests import Session
from requests.auth import HTTPBasicAuth
import requests
from .csrf import ParseCSRF
# FORGEJO_USER = "root"
# FORGEJO_EMAIL = "root@example.com"
# FORGEJO_PASSWORD = "foobarpassword"
# HOST = "http://localhost:8080"
#
# REPOS = []
class Forgejo:
def __init__(self, host: str, username: str, password: str, email: str, c: Session):
self.host = host
self.username = username
self.password = password
self.email = email
self.c = c
self.__csrf_key = "_csrf"
self.__logged_in = False
def get_uri(self, path: str):
parsed = urlparse(self.host)
return urlunparse((parsed.scheme, parsed.netloc, path, "", "", ""))
def get_api_uri(self, path: str):
parsed = urlparse(self.host)
return urlunparse(
(
parsed.scheme,
f"{self.username}:{self.password}@{parsed.netloc}",
path,
"",
"",
"",
)
)
@staticmethod
def check_online(host: str):
"""
Check if Forgejo instance is online
"""
count = 0
parsed = urlparse(host)
url = urlunparse((parsed.scheme, parsed.netloc, "api/v1/nodeinfo", "", "", ""))
while True:
try:
res = requests.get(url, allow_redirects=False)
if any([res.status_code == 302, res.status_code == 200]):
break
except:
sleep(2)
print(f"Retrying {count} time")
count += 1
continue
def install(self):
"""
Install Forgejo, first form that a user sees when a new instance is
deployed
"""
# cwd = os.environ.get("PWD")
# user = os.environ.get("USER")
payload = {
"db_type": "sqlite3",
"db_host": "localhost:3306",
"db_user": "root",
"db_passwd": "",
"db_name": "forgejo",
"ssl_mode": "disable",
"db_schema": "",
"charset": "utf8",
"db_path": "/data/gitea/gitea.db",
"app_name": "Forgejo:+Beyond+Coding+We+Forge",
"repo_root_path": "/data/git/repositories",
"lfs_root_path": "/data/git/lfs",
"run_user": "git",
"domain": "localhost",
"ssh_port": "22",
"http_port": "3000",
"app_url": "http://localhost:3000/",
"log_root_path": "/data/gitea/log",
"smtp_host": "",
"smtp_from": "",
"smtp_user": "",
"smtp_passwd": "",
"enable_federated_avatar": "on",
"enable_open_id_sign_in": "on",
"enable_open_id_sign_up": "on",
"default_allow_create_organization": "on",
"default_enable_timetracking": "on",
"no_reply_address": "noreply.localhost",
"password_algorithm": "pbkdf2",
"admin_name": "",
"admin_passwd": "",
"admin_confirm_passwd": "",
"admin_email": "",
}
resp = self.c.post(self.get_uri(""), data=payload)
sleep(10)
def get_csrf_token(self, url: str) -> str:
"""
Get CSRF token at a URI
"""
resp = self.c.get(url, allow_redirects=False)
if resp.status_code != 200 and resp.status_code != 302:
print(resp.status_code, resp.text)
raise Exception(f"Can't get csrf token: {resp.status_code}")
parser = ParseCSRF(name=self.__csrf_key)
parser.feed(resp.text)
csrf = parser.token
return csrf
def register(self):
"""
Register User
"""
url = self.get_uri("/user/sign_up")
csrf = self.get_csrf_token(url)
payload = {
"_csrf": csrf,
"user_name": self.username,
"password": self.password,
"retype": self.password,
"email": self.email,
}
self.c.post(url, data=payload, allow_redirects=False)
def login(self):
"""
Login, must be called at least once before performing authenticated
operations
"""
if self.__logged_in:
return
url = self.get_uri("/user/login")
csrf = self.get_csrf_token(url)
payload = {
"_csrf": csrf,
"user_name": self.username,
"password": self.password,
"remember": "on",
}
resp = self.c.post(url, data=payload, allow_redirects=False)
if any(
[resp.status_code == 302, resp.status_code == 200, resp.status_code == 303]
):
print("User logged in")
self.__logged_in = True
return
raise Exception(
f"[ERROR] Authentication failed. status code {resp.status_code}"
)
def create_repository(self, name: str):
"""
Create repository
"""
self.login()
def get_repository_payload(csrf: str, name: str, user_id: str):
data = {
"_csrf": csrf,
"uid": user_id,
"repo_name": name,
"description": f"this repository is named {name}",
"repo_template": "",
"issue_labels": "",
"gitignores": "",
"license": "",
"readme": "Default",
"default_branch": "master",
"trust_model": "default",
}
return data
url = self.get_uri("/repo/create")
user_id = self.c.get(self.get_api_uri("/api/v1/user")).json()["id"]
csrf = self.get_csrf_token(url)
data = get_repository_payload(csrf, name, user_id=user_id)
resp = self.c.post(url, data=data, allow_redirects=False)
print(f"Created repository {name}")
if (
resp.status_code != 302
and resp.status_code != 200
and resp.status_code != 303
):
raise Exception(
f"Error while creating repository: {name} {resp.status_code}"
)
def create_issue(self, owner: str, repo: str, title: str, body: str):
"""
Create issue
"""
self.login()
def create_issue_payload(csrf: str, title: str, body: str):
data = {
"_csrf": csrf,
"title": title,
"content": body,
"search": "",
"label_ids": "",
"milestone_id": "",
"project_id": "",
"assignee_ids": "",
"redirect_after_creation": "",
}
return data
url = self.get_uri(f"/{owner}/{repo}/issues/new")
csrf = self.get_csrf_token(url)
data = create_issue_payload(csrf=csrf, title=title, body=body)
resp = self.c.post(url, data=data, allow_redirects=False)
print(f"Created issue")
if (
resp.status_code != 302
and resp.status_code != 200
and resp.status_code != 303
):
raise Exception(f"Error while creating issue: {resp.status_code}")
def create_comment(self, owner: str, repo: str, issue: int, body: str):
"""
Create comment
"""
self.login()
def create_comment_payload(csrf: str, body: str):
data = {
"_csrf": csrf,
"content": body,
"status": "",
}
return data
url = self.get_uri(f"/{owner}/{repo}/issues/{issue}")
csrf = self.get_csrf_token(url)
data = create_comment_payload(csrf, body)
url = self.get_uri(f"/{owner}/{repo}/issues/{issue}/comments")
resp = self.c.post(url, data=data, allow_redirects=False)
print(f"Created comment")
if (
resp.status_code != 302
and resp.status_code != 200
and resp.status_code != 303
):
raise Exception(f"Error while creating comment: {resp.status_code}")
2023-09-18 17:14:19 +05:30
def create_access_token(self, name: str, file: str, repo: str):
2023-09-18 17:14:19 +05:30
"""
Create access token
"""
def create_access_token_payload(name: str):
data = {
"name": name,
2023-09-18 20:59:40 +05:30
"scopes": [
"read:issue",
"write:notification",
"read:repository",
"read:user",
],
2023-09-18 17:14:19 +05:30
}
return data
url = self.get_uri("/api/v1/users/owner_user/tokens")
data = create_access_token_payload(name)
session = Session()
session.auth = (self.username, self.password)
resp = session.post(url, json=data, allow_redirects=False)
if resp.status_code != 201:
raise Exception(f"Error while creating comment: {resp.status_code}")
print("Created access token")
data = resp.json()
with open(file, "w") as f:
data["login"] = self.username
data["email"] = self.email
data["repo"] = repo
data["forgejo_url"] = self.get_uri("")
2023-09-18 17:55:18 +05:30
content = json.dumps(data)
f.write(content)
2023-09-18 17:14:19 +05:30
print(f"Wrote access token to {file}")