2023-09-27 15:47:04 +05:30
|
|
|
# SPDX-FileCopyrightText: 2023 Aravinth Manivannan <realaravinth@batsense.net>
|
|
|
|
#
|
|
|
|
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
|
|
|
|
|
|
import os
|
|
|
|
import random
|
|
|
|
import json
|
|
|
|
from urllib.parse import urlunparse, urlparse
|
|
|
|
from html.parser import HTMLParser
|
|
|
|
from time import sleep
|
|
|
|
|
|
|
|
from requests import Session
|
|
|
|
from requests.auth import HTTPBasicAuth
|
|
|
|
import requests
|
|
|
|
|
|
|
|
from .csrf import ParseCSRF
|
2023-10-05 00:22:07 +05:30
|
|
|
from .logger import logger
|
2023-09-27 15:47:04 +05:30
|
|
|
|
|
|
|
# FORGEJO_USER = "root"
|
|
|
|
# FORGEJO_EMAIL = "root@example.com"
|
|
|
|
# FORGEJO_PASSWORD = "foobarpassword"
|
|
|
|
# HOST = "http://localhost:8080"
|
|
|
|
#
|
|
|
|
# REPOS = []
|
|
|
|
|
|
|
|
|
|
|
|
class Forgejo:
|
|
|
|
def __init__(self, host: str, username: str, password: str, email: str, c: Session):
|
|
|
|
self.host = host
|
|
|
|
self.username = username
|
|
|
|
self.password = password
|
|
|
|
self.email = email
|
|
|
|
self.c = c
|
|
|
|
self.__csrf_key = "_csrf"
|
|
|
|
self.__logged_in = False
|
|
|
|
|
|
|
|
def get_uri(self, path: str):
|
|
|
|
parsed = urlparse(self.host)
|
|
|
|
return urlunparse((parsed.scheme, parsed.netloc, path, "", "", ""))
|
|
|
|
|
|
|
|
def get_api_uri(self, path: str):
|
|
|
|
parsed = urlparse(self.host)
|
|
|
|
return urlunparse(
|
|
|
|
(
|
|
|
|
parsed.scheme,
|
|
|
|
f"{self.username}:{self.password}@{parsed.netloc}",
|
|
|
|
path,
|
|
|
|
"",
|
|
|
|
"",
|
|
|
|
"",
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def check_online(host: str):
|
|
|
|
"""
|
|
|
|
Check if Forgejo instance is online
|
|
|
|
"""
|
|
|
|
count = 0
|
|
|
|
parsed = urlparse(host)
|
|
|
|
url = urlunparse((parsed.scheme, parsed.netloc, "/", "", "", ""))
|
|
|
|
|
|
|
|
while True:
|
|
|
|
try:
|
|
|
|
res = requests.get(url, allow_redirects=False)
|
|
|
|
if any([res.status_code == 302, res.status_code == 200]):
|
|
|
|
break
|
|
|
|
except:
|
|
|
|
sleep(2)
|
2023-10-05 00:22:07 +05:30
|
|
|
logger.info(f"Retrying {count} time")
|
2023-09-27 15:47:04 +05:30
|
|
|
count += 1
|
|
|
|
continue
|
|
|
|
|
|
|
|
def get_csrf_token(self, url: str) -> str:
|
|
|
|
"""
|
|
|
|
Get CSRF token at a URI
|
|
|
|
"""
|
|
|
|
resp = self.c.get(url, allow_redirects=False)
|
|
|
|
if resp.status_code != 200 and resp.status_code != 302:
|
2023-10-05 00:22:07 +05:30
|
|
|
logger.info(url, resp.status_code)
|
2023-09-27 15:47:04 +05:30
|
|
|
raise Exception(f"Can't get csrf token: {resp.status_code}")
|
|
|
|
parser = ParseCSRF(name=self.__csrf_key)
|
|
|
|
parser.feed(resp.text)
|
|
|
|
csrf = parser.token
|
|
|
|
return csrf
|
|
|
|
|
|
|
|
def register(self):
|
|
|
|
"""
|
|
|
|
Register User
|
|
|
|
"""
|
|
|
|
url = self.get_uri("/user/sign_up")
|
|
|
|
csrf = self.get_csrf_token(url)
|
|
|
|
payload = {
|
|
|
|
"_csrf": csrf,
|
|
|
|
"user_name": self.username,
|
|
|
|
"password": self.password,
|
|
|
|
"retype": self.password,
|
|
|
|
"email": self.email,
|
|
|
|
}
|
|
|
|
self.c.post(url, data=payload, allow_redirects=False)
|
|
|
|
|
|
|
|
def login(self):
|
|
|
|
"""
|
|
|
|
Login, must be called at least once before performing authenticated
|
|
|
|
operations
|
|
|
|
"""
|
|
|
|
if self.__logged_in:
|
|
|
|
return
|
|
|
|
url = self.get_uri("/user/login")
|
|
|
|
csrf = self.get_csrf_token(url)
|
|
|
|
payload = {
|
|
|
|
"_csrf": csrf,
|
|
|
|
"user_name": self.username,
|
|
|
|
"password": self.password,
|
|
|
|
"remember": "on",
|
|
|
|
}
|
|
|
|
resp = self.c.post(url, data=payload, allow_redirects=False)
|
|
|
|
if any(
|
|
|
|
[resp.status_code == 302, resp.status_code == 200, resp.status_code == 303]
|
|
|
|
):
|
2023-10-05 00:22:07 +05:30
|
|
|
logger.info("User logged in")
|
2023-09-27 15:47:04 +05:30
|
|
|
self.__logged_in = True
|
|
|
|
return
|
|
|
|
|
|
|
|
raise Exception(
|
|
|
|
f"[ERROR] Authentication failed. status code {resp.status_code}"
|
|
|
|
)
|
|
|
|
|
|
|
|
def create_repository(self, name: str):
|
|
|
|
"""
|
|
|
|
Create repository
|
|
|
|
"""
|
|
|
|
self.login()
|
|
|
|
|
|
|
|
def get_repository_payload(csrf: str, name: str, user_id: str):
|
|
|
|
data = {
|
|
|
|
"_csrf": csrf,
|
|
|
|
"uid": user_id,
|
|
|
|
"repo_name": name,
|
|
|
|
"description": f"this repository is named {name}",
|
|
|
|
"repo_template": "",
|
|
|
|
"issue_labels": "",
|
|
|
|
"gitignores": "",
|
|
|
|
"license": "",
|
|
|
|
"readme": "Default",
|
|
|
|
"default_branch": "master",
|
|
|
|
"trust_model": "default",
|
|
|
|
}
|
|
|
|
return data
|
|
|
|
|
|
|
|
url = self.get_uri("/repo/create")
|
|
|
|
user_id = self.c.get(self.get_api_uri("/api/v1/user")).json()["id"]
|
|
|
|
|
|
|
|
csrf = self.get_csrf_token(url)
|
|
|
|
data = get_repository_payload(csrf, name, user_id=user_id)
|
|
|
|
|
|
|
|
resp = self.c.post(url, data=data, allow_redirects=False)
|
2023-10-05 00:22:07 +05:30
|
|
|
logger.info(f"Created repository {name}")
|
2023-09-27 15:47:04 +05:30
|
|
|
if (
|
|
|
|
resp.status_code != 302
|
|
|
|
and resp.status_code != 200
|
|
|
|
and resp.status_code != 303
|
|
|
|
):
|
|
|
|
raise Exception(
|
|
|
|
f"Error while creating repository: {name} {resp.status_code}"
|
|
|
|
)
|
|
|
|
|
|
|
|
def create_issue(self, owner: str, repo: str, title: str, body: str):
|
|
|
|
"""
|
|
|
|
Create issue
|
|
|
|
"""
|
|
|
|
self.login()
|
|
|
|
|
|
|
|
def create_issue_payload(csrf: str, title: str, body: str):
|
|
|
|
data = {
|
|
|
|
"_csrf": csrf,
|
|
|
|
"title": title,
|
|
|
|
"content": body,
|
|
|
|
"search": "",
|
|
|
|
"label_ids": "",
|
|
|
|
"milestone_id": "",
|
|
|
|
"project_id": "",
|
|
|
|
"assignee_ids": "",
|
|
|
|
"redirect_after_creation": "",
|
|
|
|
}
|
|
|
|
|
|
|
|
return data
|
|
|
|
|
|
|
|
url = self.get_uri(f"/{owner}/{repo}/issues/new")
|
|
|
|
csrf = self.get_csrf_token(url)
|
|
|
|
data = create_issue_payload(csrf=csrf, title=title, body=body)
|
|
|
|
|
|
|
|
resp = self.c.post(url, data=data, allow_redirects=False)
|
2023-10-05 00:22:07 +05:30
|
|
|
logger.info(f"Created issue")
|
2023-09-27 15:47:04 +05:30
|
|
|
if (
|
|
|
|
resp.status_code != 302
|
|
|
|
and resp.status_code != 200
|
|
|
|
and resp.status_code != 303
|
|
|
|
):
|
|
|
|
raise Exception(f"Error while creating issue: {resp.status_code}")
|
|
|
|
|
|
|
|
def create_comment(self, owner: str, repo: str, issue: int, body: str):
|
|
|
|
"""
|
|
|
|
Create comment
|
|
|
|
"""
|
|
|
|
self.login()
|
|
|
|
|
|
|
|
def create_comment_payload(csrf: str, body: str):
|
|
|
|
data = {
|
|
|
|
"_csrf": csrf,
|
|
|
|
"content": body,
|
|
|
|
"status": "",
|
|
|
|
}
|
|
|
|
|
|
|
|
return data
|
|
|
|
|
|
|
|
url = self.get_uri(f"/{owner}/{repo}/issues/{issue}")
|
|
|
|
csrf = self.get_csrf_token(url)
|
|
|
|
data = create_comment_payload(csrf, body)
|
|
|
|
|
|
|
|
url = self.get_uri(f"/{owner}/{repo}/issues/{issue}/comments")
|
|
|
|
resp = self.c.post(url, data=data, allow_redirects=False)
|
2023-10-05 00:22:07 +05:30
|
|
|
logger.info(f"Created comment")
|
2023-09-27 15:47:04 +05:30
|
|
|
if (
|
|
|
|
resp.status_code != 302
|
|
|
|
and resp.status_code != 200
|
|
|
|
and resp.status_code != 303
|
|
|
|
):
|
|
|
|
raise Exception(f"Error while creating comment: {resp.status_code}")
|
|
|
|
|
|
|
|
def create_access_token(self, name: str, file: str, repo: str):
|
|
|
|
"""
|
|
|
|
Create access token
|
|
|
|
"""
|
|
|
|
|
|
|
|
def create_access_token_payload(name: str):
|
|
|
|
data = {
|
|
|
|
"name": name,
|
|
|
|
"scopes": [
|
|
|
|
"read:issue",
|
|
|
|
"write:notification",
|
|
|
|
"read:repository",
|
|
|
|
"read:user",
|
|
|
|
],
|
|
|
|
}
|
|
|
|
|
|
|
|
return data
|
|
|
|
|
|
|
|
url = self.get_uri(f"/api/v1/users/{self.username}/tokens")
|
|
|
|
data = create_access_token_payload(name)
|
|
|
|
|
|
|
|
session = Session()
|
|
|
|
session.auth = (self.username, self.password)
|
|
|
|
resp = session.post(url, json=data, allow_redirects=False)
|
|
|
|
|
|
|
|
if resp.status_code != 201:
|
2023-10-05 00:22:07 +05:30
|
|
|
raise Exception(
|
|
|
|
f"Error while creating access token: {resp.status_code} {resp} {resp.text}"
|
|
|
|
)
|
2023-09-27 15:47:04 +05:30
|
|
|
|
2023-10-05 00:22:07 +05:30
|
|
|
logger.info("Created access token")
|
2023-09-27 15:47:04 +05:30
|
|
|
data = resp.json()
|
|
|
|
with open(file, "w") as f:
|
|
|
|
data["login"] = self.username
|
|
|
|
data["email"] = self.email
|
|
|
|
data["repo"] = repo
|
|
|
|
data["forgejo_url"] = self.get_uri("")
|
|
|
|
content = json.dumps(data)
|
|
|
|
f.write(content)
|
2023-10-05 00:22:07 +05:30
|
|
|
logger.info(f"Wrote access token to {file}")
|