1
0
Fork 0

Refactor autopatch: use a AppToPatch class that regroups actions, put loops outside the action functions

This commit is contained in:
Félix Piédallu 2024-09-12 18:03:30 +02:00 committed by Salamandar
parent fad36a07dc
commit 0ea25c671e

View file

@ -5,29 +5,37 @@ import json
import os
import subprocess
import sys
import logging
import time
from typing import Optional, TypeVar, Iterable, Generator
from pathlib import Path
import requests
import tqdm
import toml
from git import Repo, Head, Actor
# add apps/tools to sys.path
sys.path.insert(0, str(Path(__file__).parent.parent))
from app_caches import AppDir
from appslib.utils import ( # noqa: E402 pylint: disable=import-error,wrong-import-position
get_catalog,
)
import appslib.get_apps_repo as get_apps_repo
TOOLS_DIR = Path(__file__).resolve().parent.parent
my_env = os.environ.copy()
my_env["GIT_TERMINAL_PROMPT"] = "0"
os.makedirs(".apps_cache", exist_ok=True)
login = (TOOLS_DIR / ".github_login").open("r", encoding="utf-8").read().strip()
token = (TOOLS_DIR / ".github_token").open("r", encoding="utf-8").read().strip()
LOGIN = (TOOLS_DIR / ".github_login").open("r", encoding="utf-8").read().strip()
TOKEN = (TOOLS_DIR / ".github_token").open("r", encoding="utf-8").read().strip()
github_api = "https://api.github.com"
PATCHES_PATH = Path(__file__).resolve().parent / "patches"
def apps(min_level=4):
for app, infos in get_catalog().items():
@ -36,162 +44,128 @@ def apps(min_level=4):
yield infos
def app_cache_folder(app):
return os.path.join(".apps_cache", app)
class AppToPatch:
def __init__(self, id: str, path: Path, info: dict) -> None:
self.id = id
self.path = path
self.info = info
self.patch: Optional[str] = None
self._repo: Optional[Repo] = None
@property
def repo(self) -> Repo:
if self._repo is None:
self._repo = Repo(self.path)
return self._repo
def git(cmd, in_folder=None):
if not isinstance(cmd, list):
cmd = cmd.split()
if in_folder:
cmd = ["-C", in_folder] + cmd
cmd = ["git"] + cmd
return subprocess.check_output(cmd, env=my_env).strip().decode("utf-8")
def cache(self) -> None:
appdir = AppDir(self.id, self.path)
appdir.ensure(self.info["url"], self.info.get("branch", "master"), False, False)
def reset(self) -> None:
if self.get_diff():
logging.warning("%s had local changes, they were stashed.", self.id)
self.repo.git.stash("save")
self.repo.git.checkout("testing")
# Progress bar helper, stolen from https://stackoverflow.com/a/34482761
def progressbar(it, prefix="", size=60, file=sys.stdout):
it = list(it)
count = len(it)
def apply(self, patch: str) -> None:
current_branch = self.repo.active_branch
self.repo.head.reset(f"{current_branch}", index=True, working_tree=True)
subprocess.call([PATCHES_PATH / patch / "patch"], cwd=self.path)
def show(j, name=""):
name += " "
x = int(size * j / count)
file.write(
"%s[%s%s] %i/%i %s\r" % (prefix, "#" * x, "." * (size - x), j, count, name)
)
file.flush()
def get_diff(self) -> str:
return " ".join(str(d) for d in self.repo.index.diff(None, create_patch=True))
show(0)
for i, item in enumerate(it):
yield item
show(i + 1, item["id"])
file.write("\n")
file.flush()
def diff(self) -> None:
diff = self.get_diff()
if not diff:
return
print(80 * "=")
print(f"Changes in : {self.id}")
print(80 * "=")
print()
print(diff)
print("\n\n\n")
def on_github(self) -> bool:
return "github.com/yunohost-apps" in self.info["url"].lower()
def build_cache():
for app in progressbar(apps(), "Git cloning: ", 40):
folder = os.path.join(".apps_cache", app["id"])
reponame = app["url"].rsplit("/", 1)[-1]
git(f"clone --quiet --depth 1 --single-branch {app['url']} {folder}")
git(
f"remote add fork https://{login}:{token}@github.com/{login}/{reponame}",
in_folder=folder,
def fork_if_needed(self, session: requests.Session) -> None:
repo_name = self.info["url"].split("/")[-1]
r = session.get(github_api + f"/repos/{LOGIN}/{repo_name}")
if r.status_code == 200:
return
fork_repo_name = self.info["url"].split("github.com/")[-1]
r = session.post(github_api + f"/repos/{fork_repo_name}/forks")
r.raise_for_status()
time.sleep(2) # to avoid rate limiting lol
def commit(self, patch: str) -> None:
pr_title = (PATCHES_PATH / patch / "pr_title.md").open().read().strip()
title = f"[autopatch] {pr_title}"
self.repo.git.add(all=True)
self.repo.index.commit(title, author=Actor("Yunohost-Bot", None))
def push(self, patch: str, session: requests.Session) -> None:
if not self.get_diff():
return
if not self.on_github():
return
self.fork_if_needed(session)
base_branch = self.repo.active_branch
if patch in self.repo.heads:
self.repo.delete_head(patch)
head_branch = self.repo.create_head(patch)
head_branch.checkout()
self.commit(patch)
if "fork" in self.repo.remotes:
self.repo.delete_remote(self.repo.remote("fork"))
reponame = self.info["url"].rsplit("/", 1)[-1]
self.repo.create_remote(
"fork", url=f"https://{LOGIN}:{TOKEN}@github.com/{LOGIN}/{reponame}"
)
self.repo.remote(name="fork").push(progress=None, force=True)
def apply(patch):
patch_path = os.path.abspath(os.path.join("patches", patch, "patch.sh"))
self.create_pull_request(patch, head_branch, base_branch, session)
for app in progressbar(apps(), "Apply to: ", 40):
folder = os.path.join(".apps_cache", app["id"])
current_branch = git(f"symbolic-ref --short HEAD", in_folder=folder)
git(f"reset --hard origin/{current_branch}", in_folder=folder)
os.system(f"cd {folder} && bash {patch_path}")
def create_pull_request(
self, patch: str, head: Head, base: Head, session: requests.Session
) -> None:
pr_title = (PATCHES_PATH / patch / "pr_title.md").open().read().strip()
pr_body = (PATCHES_PATH / patch / "pr_body.md").open().read().strip()
PR = {
"title": f"[autopatch] {pr_title}",
"body": f"This is an automatic PR\n\n{pr_body}",
"head": f"{LOGIN}:{head.name}",
"base": base.name,
"maintainer_can_modify": True,
}
fork_repo_name = self.info["url"].split("github.com/")[-1]
repo_name = self.info["url"].split("/")[-1]
r = session.post(github_api + f"/repos/{fork_repo_name}/pulls", json.dumps(PR))
r.raise_for_status()
print(json.loads(r.text)["html_url"])
time.sleep(4) # to avoid rate limiting lol
def diff():
for app in apps():
folder = os.path.join(".apps_cache", app["id"])
if bool(
subprocess.check_output(f"cd {folder} && git diff", shell=True)
.strip()
.decode("utf-8")
):
print("\n\n\n")
print("=================================")
print("Changes in : " + app["id"])
print("=================================")
print("\n")
os.system(f"cd {folder} && git --no-pager diff")
IterType = TypeVar("IterType")
def push(patch):
title = (
"[autopatch] "
+ open(os.path.join("patches", patch, "pr_title.md")).read().strip()
)
def diff_not_empty(app):
folder = os.path.join(".apps_cache", app["id"])
return bool(
subprocess.check_output(f"cd {folder} && git diff", shell=True)
.strip()
.decode("utf-8")
)
def app_is_on_github(app):
return "github.com" in app["url"]
apps_to_push = [
app for app in apps() if diff_not_empty(app) and app_is_on_github(app)
]
with requests.Session() as s:
s.headers.update({"Authorization": f"token {token}"})
for app in progressbar(apps_to_push, "Forking: ", 40):
app["repo"] = app["url"][len("https://github.com/") :].strip("/")
fork_if_needed(app["repo"], s)
time.sleep(2) # to avoid rate limiting lol
for app in progressbar(apps_to_push, "Pushing: ", 40):
app["repo"] = app["url"][len("https://github.com/") :].strip("/")
app_repo_name = app["url"].rsplit("/", 1)[-1]
folder = os.path.join(".apps_cache", app["id"])
current_branch = git(f"symbolic-ref --short HEAD", in_folder=folder)
git(f"reset origin/{current_branch}", in_folder=folder)
git(
["commit", "-a", "-m", title, "--author='Yunohost-Bot <>'"],
in_folder=folder,
)
try:
git(f"remote remove fork", in_folder=folder)
except Exception:
pass
git(
f"remote add fork https://{login}:{token}@github.com/{login}/{app_repo_name}",
in_folder=folder,
)
git(f"push fork {current_branch}:{patch} --quiet --force", in_folder=folder)
create_pull_request(app["repo"], patch, current_branch, s)
time.sleep(4) # to avoid rate limiting lol
def progressbar(elements: list[IterType]) -> Generator[IterType, None, None]:
return tqdm.tqdm(elements, total=len(elements), ascii=" ·#")
def fork_if_needed(repo, s):
repo_name = repo.split("/")[-1]
r = s.get(github_api + f"/repos/{login}/{repo_name}")
if r.status_code == 200:
return
r = s.post(github_api + f"/repos/{repo}/forks")
if r.status_code != 200:
print(r.text)
def create_pull_request(repo, patch, base_branch, s):
PR = {
"title": "[autopatch] "
+ open(os.path.join("patches", patch, "pr_title.md")).read().strip(),
"body": "This is an automatic PR\n\n"
+ open(os.path.join("patches", patch, "pr_body.md")).read().strip(),
"head": login + ":" + patch,
"base": base_branch,
"maintainer_can_modify": True,
}
r = s.post(github_api + f"/repos/{repo}/pulls", json.dumps(PR))
if r.status_code != 200:
print(r.text)
else:
json.loads(r.text)["html_url"]
def main():
def main() -> None:
parser = argparse.ArgumentParser()
get_apps_repo.add_args(parser)
parser.add_argument(
"the_patch", type=str, nargs="?", help="The name of the patch to apply"
)
@ -212,24 +186,44 @@ def main():
)
args = parser.parse_args()
get_apps_repo.from_args(args)
cache_path = get_apps_repo.cache_path(args)
cache_path.mkdir(exist_ok=True, parents=True)
if not (args.cache or args.apply or args.diff or args.push):
parser.error("We required --cache, --apply, --diff or --push.")
apps_to_patch: list[AppToPatch] = [
AppToPatch(info["id"], cache_path / info["id"], info) for info in apps()
]
if args.cache:
build_cache()
print("Caching apps...")
for app in progressbar(apps_to_patch):
app.cache()
if args.apply:
if not args.the_patch:
parser.error("--apply requires the patch name to be passed")
apply(args.the_patch)
print(f"Applying patch '{args.the_patch}' to apps...")
for app in progressbar(apps_to_patch):
app.reset()
app.apply(args.the_patch)
if args.diff:
diff()
print("Printing diff of apps...")
for app in progressbar(apps_to_patch):
app.diff()
if args.push:
if not args.the_patch:
parser.error("--push requires the patch name to be passed")
push(args.the_patch)
print("Pushing apps...")
with requests.Session() as session:
session.headers.update({"Authorization": f"token {TOKEN}"})
for app in progressbar(apps_to_patch):
app.push(args.the_patch, session)
main()
if __name__ == "__main__":
main()