1
0
Fork 0

Merge pull request #1997 from Salamandar/refactor

Python cleanup
This commit is contained in:
Alexandre Aubin 2024-02-07 19:44:18 +01:00 committed by GitHub
commit 265cf0b186
19 changed files with 840 additions and 109 deletions

View file

@ -0,0 +1 @@
#!/usr/bin/env python3

View file

@ -2,12 +2,13 @@
import argparse import argparse
import json import json
import toml
import os import os
from pathlib import Path from pathlib import Path
import toml
from jinja2 import Environment, FileSystemLoader from jinja2 import Environment, FileSystemLoader
def value_for_lang(values, lang): def value_for_lang(values, lang):
if not isinstance(values, dict): if not isinstance(values, dict):
return values return values

View file

@ -1,15 +1,16 @@
import os #!/usr/bin/env python3
import hmac
import shlex
import hashlib
import asyncio import asyncio
import hashlib
import hmac
import os
import shlex
import tempfile import tempfile
from make_readme import generate_READMEs
from sanic import Sanic, response from sanic import Sanic, response
from sanic.response import text from sanic.response import text
from make_readme import generate_READMEs
app = Sanic(__name__) app = Sanic(__name__)
github_webhook_secret = open("github_webhook_secret", "r").read().strip() github_webhook_secret = open("github_webhook_secret", "r").read().strip()

View file

@ -0,0 +1 @@
#!/usr/bin/env python3

95
app_caches.py Executable file
View file

@ -0,0 +1,95 @@
#!/usr/bin/env python3
import argparse
import shutil
import logging
from multiprocessing import Pool
from pathlib import Path
from typing import Any
import tqdm
from appslib.utils import (REPO_APPS_ROOT, # pylint: disable=import-error
get_catalog, git_repo_age)
from git import Repo
def app_cache_folder(app: str) -> Path:
return REPO_APPS_ROOT / ".apps_cache" / app
def app_cache_clone(app: str, infos: dict[str, str]) -> None:
logging.info("Cloning %s...", app)
git_depths = {
"notworking": 5,
"inprogress": 20,
"default": 40,
}
if app_cache_folder(app).exists():
shutil.rmtree(app_cache_folder(app))
Repo.clone_from(
infos["url"],
to_path=app_cache_folder(app),
depth=git_depths.get(infos["state"], git_depths["default"]),
single_branch=True, branch=infos.get("branch", "master"),
)
def app_cache_clone_or_update(app: str, infos: dict[str, str]) -> None:
app_path = app_cache_folder(app)
# Don't refresh if already refreshed during last hour
age = git_repo_age(app_path)
if age is False:
app_cache_clone(app, infos)
return
# if age < 3600:
# logging.info(f"Skipping {app}, it's been updated recently.")
# return
logging.info("Updating %s...", app)
repo = Repo(app_path)
repo.remote("origin").set_url(infos["url"])
branch = infos.get("branch", "master")
if repo.active_branch != branch:
all_branches = [str(b) for b in repo.branches]
if branch in all_branches:
repo.git.checkout(branch, "--force")
else:
repo.git.remote("set-branches", "--add", "origin", branch)
repo.remote("origin").fetch(f"{branch}:{branch}")
repo.remote("origin").fetch(refspec=branch, force=True)
repo.git.reset("--hard", f"origin/{branch}")
def __app_cache_clone_or_update_mapped(data):
name, info = data
try:
app_cache_clone_or_update(name, info)
except Exception as err:
logging.error("Error while updating %s: %s", name, err)
def apps_cache_update_all(apps: dict[str, dict[str, Any]], parallel: int = 8) -> None:
with Pool(processes=parallel) as pool:
tasks = pool.imap_unordered(__app_cache_clone_or_update_mapped, apps.items())
for _ in tqdm.tqdm(tasks, total=len(apps.keys())):
pass
def __run_for_catalog():
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose", action="store_true")
parser.add_argument("-j", "--processes", type=int, default=8)
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.INFO)
apps_cache_update_all(get_catalog(), parallel=args.processes)
if __name__ == "__main__":
__run_for_catalog()

68
appslib/apps_cache.py Normal file
View file

@ -0,0 +1,68 @@
#!/usr/bin/env python3
import logging
from pathlib import Path
import utils
from git import Repo
def apps_cache_path() -> Path:
path = apps_repo_root() / ".apps_cache"
path.mkdir()
return path
def app_cache_path(app: str) -> Path:
path = apps_cache_path() / app
path.mkdir()
return path
# def refresh_all_caches(catalog: dict[str, dict[str, str]]):
# for app, infos
# pass
def app_cache_clone(app: str, infos: dict[str, str]) -> None:
git_depths = {
"notworking": 5,
"inprogress": 20,
"default": 40,
}
Repo.clone_from(
infos["url"],
to_path=app_cache_path(app),
depth=git_depths.get(infos["state"], git_depths["default"]),
single_branch=True, branch=infos.get("branch", "master"),
)
def app_cache_update(app: str, infos: dict[str, str]) -> None:
app_path = app_cache_path(app)
age = utils.git_repo_age(app_path)
if age is False:
return app_cache_clone(app, infos)
if age < 3600:
logging.info(f"Skipping {app}, it's been updated recently.")
return
repo = Repo(app_path)
repo.remote("origin").set_url(infos["url"])
branch = infos.get("branch", "master")
if repo.active_branch != branch:
all_branches = [str(b) for b in repo.branches]
if branch in all_branches:
repo.git.checkout(branch, "--force")
else:
repo.git.remote("set-branches", "--add", "origin", branch)
repo.remote("origin").fetch(f"{branch}:{branch}")
repo.remote("origin").fetch(refspec=branch, force=True)
repo.git.reset("--hard", f"origin/{branch}")
def cache_all_apps(catalog: dict[str, dict[str, str]]) -> None:

72
appslib/utils.py Normal file
View file

@ -0,0 +1,72 @@
#!/usr/bin/env python3
import sys
import subprocess
from typing import Any, TextIO, Generator
import time
from functools import cache
from pathlib import Path
from git import Repo
import toml
REPO_APPS_ROOT = Path(Repo(__file__, search_parent_directories=True).working_dir)
@cache
def apps_repo_root() -> Path:
return Path(__file__).parent.parent.parent
def git(cmd: list[str], cwd: Path | None = None) -> str:
full_cmd = ["git"]
if cwd:
full_cmd.extend(["-C", str(cwd)])
full_cmd.extend(cmd)
return subprocess.check_output(
full_cmd,
# env=my_env,
).strip().decode("utf-8")
def git_repo_age(path: Path) -> bool | int:
for file in [path / ".git" / "FETCH_HEAD", path / ".git" / "HEAD"]:
if file.exists():
return int(time.time() - file.stat().st_mtime)
return False
# Progress bar helper, stolen from https://stackoverflow.com/a/34482761
def progressbar(
it: list[Any],
prefix: str = "",
size: int = 60,
file: TextIO = sys.stdout) -> Generator[Any, None, None]:
count = len(it)
def show(j, name=""):
name += " "
x = int(size * j / count)
file.write(
"%s[%s%s] %i/%i %s\r" % (prefix, "#" * x, "." * (size - x), j, count, name)
)
file.flush()
show(0)
for i, item in enumerate(it):
yield item
show(i + 1, item[0])
file.write("\n")
file.flush()
@cache
def get_catalog(working_only=False):
"""Load the app catalog and filter out the non-working ones"""
catalog = toml.load((REPO_APPS_ROOT / "apps.toml").open("r", encoding="utf-8"))
if working_only:
catalog = {
app: infos for app, infos in catalog.items()
if infos.get("state") != "notworking"
}
return catalog

View file

@ -1,9 +1,11 @@
#!/usr/bin/python3 #!/usr/bin/python3
import json import json
import sys
import requests
import os import os
import subprocess import subprocess
import sys
import requests
catalog = requests.get("https://raw.githubusercontent.com/YunoHost/apps/master/apps.json").json() catalog = requests.get("https://raw.githubusercontent.com/YunoHost/apps/master/apps.json").json()

View file

@ -1,7 +1,8 @@
#!/usr/bin/python3 #!/usr/bin/python3
import json
import csv import csv
import json
def find_cpe(app_id): def find_cpe(app_id):
with open("../../patches/add-cpe/cpe.csv", newline='') as f: with open("../../patches/add-cpe/cpe.csv", newline='') as f:

View file

@ -1,13 +1,15 @@
import time #!/usr/bin/env python3
import glob
import hashlib import hashlib
import os
import re import re
import sys import sys
import requests import time
import toml
import os
import glob
from datetime import datetime from datetime import datetime
import requests
import toml
from rest_api import GithubAPI, GitlabAPI, RefType from rest_api import GithubAPI, GitlabAPI, RefType
STRATEGIES = [ STRATEGIES = [

View file

@ -1,8 +1,11 @@
from enum import Enum #!/usr/bin/env python3
import re import re
import requests from enum import Enum
from typing import List from typing import List
import requests
class RefType(Enum): class RefType(Enum):
tags = 1 tags = 1

View file

@ -1,14 +1,17 @@
#!venv/bin/python3 #!/usr/bin/env python3
import sys, os, time import json
import urllib.request, json import os
import re import re
import sys
import time
import urllib.request
from github import Github
import github import github
from github import Github
# Debug # Debug
from rich.traceback import install from rich.traceback import install
install(width=150, show_locals=True, locals_max_length=None, locals_max_string=None) install(width=150, show_locals=True, locals_max_length=None, locals_max_string=None)
##### #####

View file

@ -1,4 +1,4 @@
#!venv/bin/python3 #!/usr/bin/env python3
# Obtained with `pip install PyGithub`, better within a venv # Obtained with `pip install PyGithub`, better within a venv
from github import Github from github import Github

View file

@ -2,10 +2,10 @@
import json import json
import sys import sys
from difflib import SequenceMatcher
from functools import cache from functools import cache
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Generator, List, Tuple from typing import Any, Dict, Generator, List, Tuple
from difflib import SequenceMatcher
import jsonschema import jsonschema
import toml import toml

355
list_builder.py Executable file
View file

@ -0,0 +1,355 @@
#!/usr/bin/python3
import copy
import json
import os
import re
import subprocess
import sys
import time
from collections import OrderedDict
from pathlib import Path
from shutil import which
from typing import Any, Generator, TextIO
import toml
from git import Repo
from packaging_v2.convert_v1_manifest_to_v2_for_catalog import \
convert_v1_manifest_to_v2_for_catalog # pylint: disable=import-error
now = time.time()
REPO_APPS_PATH = Path(__file__).parent.parent
# Load categories and reformat the structure to have a list with an "id" key
categories = toml.load((REPO_APPS_PATH / "categories.toml").open("r", encoding="utf-8"))
for category_id, infos in categories.items():
infos["id"] = category_id
for subtag_id, subtag_infos in infos.get("subtags", {}).items():
subtag_infos["id"] = subtag_id
infos["subtags"] = list(infos.get('subtags', {}).values())
categories = list(categories.values())
# (Same for antifeatures)
antifeatures = toml.load((REPO_APPS_PATH / "antifeatures.toml").open("r", encoding="utf-8"))
for antifeature_id, infos in antifeatures.items():
infos["id"] = antifeature_id
antifeatures = list(antifeatures.values())
# Load the app catalog and filter out the non-working ones
catalog = toml.load((REPO_APPS_PATH / "apps.toml").open("r", encoding="utf-8"))
catalog = {
app: infos for app, infos in catalog.items() if infos.get("state") != "notworking"
}
my_env = os.environ.copy()
my_env["GIT_TERMINAL_PROMPT"] = "0"
(REPO_APPS_PATH / ".apps_cache").mkdir(exist_ok=True)
(REPO_APPS_PATH / "builds").mkdir(exist_ok=True)
def error(msg: str) -> None:
msg = "[Applist builder error] " + msg
if which("sendxmpppy") is not None:
subprocess.call(["sendxmpppy", msg], stdout=open(os.devnull, "wb"))
print(msg + "\n")
# Progress bar helper, stolen from https://stackoverflow.com/a/34482761
def progressbar(it: list[Any], prefix: str = "", size: int = 60, file: TextIO = sys.stdout
) -> Generator[Any, None, None]:
count = len(it)
def show(j, name=""):
name += " "
x = int(size * j / count)
file.write(
"%s[%s%s] %i/%i %s\r" % (prefix, "#" * x, "." * (size - x), j, count, name)
)
file.flush()
show(0)
for i, item in enumerate(it):
yield item
show(i + 1, item[0])
file.write("\n")
file.flush()
###################################
# App git clones cache management #
###################################
def app_cache_folder(app: str) -> Path:
return REPO_APPS_PATH / ".apps_cache" / app
def refresh_all_caches() -> None:
for app, infos in progressbar(sorted(catalog.items()), "Updating git clones: ", 40):
app = app.lower()
if not app_cache_folder(app).exists():
try:
init_cache(app, infos)
except Exception as e:
error("Failed to init cache for %s" % app)
else:
try:
refresh_cache(app, infos)
except Exception as e:
error("Failed to not refresh cache for %s: %s" % (app, e))
raise e
def init_cache(app: str, infos: dict[str, str]) -> None:
git_depths = {
"notworking": 5,
"inprogress": 20,
"default": 40,
}
Repo.clone_from(
infos["url"],
to_path=app_cache_folder(app),
depth=git_depths.get(infos["state"], git_depths["default"]),
single_branch=True, branch=infos.get("branch", "master"),
)
def git_repo_age(path: Path) -> bool | int:
fetch_head = path / ".git" / "FETCH_HEAD"
if fetch_head.exists():
return int(time.time() - fetch_head.stat().st_mtime)
return False
def refresh_cache(app: str, infos: dict[str, str]) -> None:
app_path = app_cache_folder(app)
# Don't refresh if already refreshed during last hour
age = git_repo_age(app_path)
if age is not False and age < 3600:
return
try:
repo = Repo(app_path)
repo.remote("origin").set_url(infos["url"])
branch = infos.get("branch", "master")
if repo.active_branch != branch:
all_branches = [str(b) for b in repo.branches]
if branch in all_branches:
repo.git.checkout(branch, "--force")
else:
repo.git.remote("set-branches", "--add", "origin", branch)
repo.remote("origin").fetch(f"{branch}:{branch}")
repo.remote("origin").fetch(refspec=branch, force=True)
repo.git.reset("--hard", f"origin/{branch}")
except:
# Sometimes there are tmp issue such that the refresh cache ..
# we don't trigger an error unless the cache hasnt been updated since more than 24 hours
age = git_repo_age(app_path)
if age is not False and age < 24 * 3600:
pass
else:
raise
################################
# Actual list build management #
################################
def build_catalog():
result_dict = {}
for app, infos in progressbar(sorted(catalog.items()), "Processing: ", 40):
app = app.lower()
try:
app_dict = build_app_dict(app, infos)
except Exception as e:
error("Processing %s failed: %s" % (app, str(e)))
continue
result_dict[app_dict["id"]] = app_dict
#############################
# Current catalog API v2 #
#############################
result_dict_with_manifest_v1 = copy.deepcopy(result_dict)
result_dict_with_manifest_v1 = {name: infos for name, infos in result_dict_with_manifest_v1.items() if float(str(infos["manifest"].get("packaging_format", "")).strip() or "0") < 2}
os.system("mkdir -p ./builds/default/v2/")
with open("builds/default/v2/apps.json", "w") as f:
f.write(
json.dumps(
{
"apps": result_dict_with_manifest_v1,
"categories": categories,
"antifeatures": antifeatures,
},
sort_keys=True,
)
)
#############################################
# Catalog catalog API v3 (with manifest v2) #
#############################################
result_dict_with_manifest_v2 = copy.deepcopy(result_dict)
for app in result_dict_with_manifest_v2.values():
packaging_format = float(str(app["manifest"].get("packaging_format", "")).strip() or "0")
if packaging_format < 2:
app["manifest"] = convert_v1_manifest_to_v2_for_catalog(app["manifest"])
# We also remove the app install question and resources parts which aint needed anymore by webadmin etc (or at least we think ;P)
for app in result_dict_with_manifest_v2.values():
if "manifest" in app and "install" in app["manifest"]:
del app["manifest"]["install"]
if "manifest" in app and "resources" in app["manifest"]:
del app["manifest"]["resources"]
for appid, app in result_dict_with_manifest_v2.items():
appid = appid.lower()
if (REPO_APPS_PATH / "logos" / f"{appid}.png").exists():
logo_hash = subprocess.check_output(["sha256sum", f"logos/{appid}.png"]).strip().decode("utf-8").split()[0]
os.system(f"cp logos/{appid}.png builds/default/v3/logos/{logo_hash}.png")
# FIXME: implement something to cleanup old logo stuf in the builds/.../logos/ folder somehow
else:
logo_hash = None
app["logo_hash"] = logo_hash
os.system("mkdir -p ./builds/default/v3/")
with open("builds/default/v3/apps.json", "w") as f:
f.write(
json.dumps(
{
"apps": result_dict_with_manifest_v2,
"categories": categories,
"antifeatures": antifeatures,
},
sort_keys=True,
)
)
##############################
# Version for catalog in doc #
##############################
os.system("mkdir -p ./builds/default/doc_catalog")
def infos_for_doc_catalog(infos):
level = infos.get("level")
if not isinstance(level, int):
level = -1
return {
"id": infos["id"],
"category": infos["category"],
"url": infos["git"]["url"],
"name": infos["manifest"]["name"],
"description": infos["manifest"]["description"],
"state": infos["state"],
"level": level,
"broken": level <= 0,
"good_quality": level >= 8,
"bad_quality": level <= 5,
"antifeatures": infos.get("antifeatures"),
"potential_alternative_to": infos.get("potential_alternative_to", []),
}
result_dict_doc = {
k: infos_for_doc_catalog(v)
for k, v in result_dict.items()
if v["state"] == "working"
}
with open("builds/default/doc_catalog/apps.json", "w") as f:
f.write(
json.dumps(
{"apps": result_dict_doc, "categories": categories}, sort_keys=True
)
)
def build_app_dict(app, infos):
# Make sure we have some cache
this_app_cache = app_cache_folder(app)
assert this_app_cache.exists(), "No cache yet for %s" % app
repo = Repo(this_app_cache)
commit_timestamps_for_this_app_in_catalog = \
repo.git.log("-G", f"cinny", "--first-parent", "--reverse", "--date=unix",
"--format=%cd", "--", "apps.json", "apps.toml")
# Assume the first entry we get (= the oldest) is the time the app was added
infos["added_in_catalog"] = int(commit_timestamps_for_this_app_in_catalog.split("\n")[0])
infos["branch"] = infos.get("branch", "master")
infos["revision"] = infos.get("revision", "HEAD")
# If using head, find the most recent meaningful commit in logs
if infos["revision"] == "HEAD":
relevant_files = [
"manifest.json",
"manifest.toml",
"config_panel.toml",
"hooks/",
"scripts/",
"conf/",
"sources/",
]
relevant_commits = repo.iter_commits(paths=relevant_files, full_history=True, all=True)
infos["revision"] = next(relevant_commits).hexsha
# Otherwise, validate commit exists
else:
try:
_ = repo.commit(infos["revision"])
except ValueError as err:
raise RuntimeError(f"Revision ain't in history ? {infos['revision']}") from err
# Find timestamp corresponding to that commit
timestamp = repo.commit(infos["revision"]).committed_date
# Build the dict with all the infos
if (this_app_cache / "manifest.toml").exists():
manifest = toml.load((this_app_cache / "manifest.toml").open("r"), _dict=OrderedDict)
else:
manifest = json.load((this_app_cache / "manifest.json").open("r"))
return {
"id": manifest["id"],
"git": {
"branch": infos["branch"],
"revision": infos["revision"],
"url": infos["url"],
},
"added_in_catalog": infos["added_in_catalog"],
"lastUpdate": timestamp,
"manifest": manifest,
"state": infos["state"],
"level": infos.get("level", "?"),
"maintained": not 'package-not-maintained' in infos.get('antifeatures', []),
"high_quality": infos.get("high_quality", False),
"featured": infos.get("featured", False),
"category": infos.get("category", None),
"subtags": infos.get("subtags", []),
"potential_alternative_to": infos.get("potential_alternative_to", []),
"antifeatures": list(
set(list(manifest.get("antifeatures", {}).keys()) + infos.get("antifeatures", []))
),
}
if __name__ == "__main__":
refresh_all_caches()
build_catalog()

View file

@ -0,0 +1 @@
#!/usr/bin/env python3

View file

@ -1,7 +1,9 @@
#!/usr/bin/env python3
import argparse import argparse
import json
import os import os
import re import re
import json
import subprocess import subprocess
from glob import glob from glob import glob
@ -226,7 +228,8 @@ def _convert_v1_manifest_to_v2(app_path):
def _dump_v2_manifest_as_toml(manifest): def _dump_v2_manifest_as_toml(manifest):
import re import re
from tomlkit import document, nl, table, dumps, comment
from tomlkit import comment, document, dumps, nl, table
toml_manifest = document() toml_manifest = document()
toml_manifest.add("packaging_format", 2) toml_manifest.add("packaging_format", 2)

View file

@ -1,3 +1,5 @@
#!/usr/bin/env python3
import copy import copy

254
update_app_levels/update_app_levels.py Normal file → Executable file
View file

@ -1,26 +1,74 @@
import time #!/usr/bin/env python3
import toml """
import requests Update app catalog: commit, and create a merge request
"""
import argparse
import logging
import tempfile import tempfile
import os import textwrap
import sys import time
import json
from collections import OrderedDict from collections import OrderedDict
from typing import Any
token = open(os.path.dirname(__file__) + "/../../.github_token").read().strip() from pathlib import Path
import jinja2
import requests
import toml
from git import Repo
tmpdir = tempfile.mkdtemp(prefix="update_app_levels_") # APPS_REPO = "YunoHost/apps"
os.system(f"git clone 'https://oauth2:{token}@github.com/yunohost/apps' {tmpdir}") APPS_REPO = "Salamandar/apps"
os.system(f"git -C {tmpdir} checkout -b update_app_levels")
# Load the app catalog and filter out the non-working ones
catalog = toml.load(open(f"{tmpdir}/apps.toml"))
# Fetch results from the CI
CI_RESULTS_URL = "https://ci-apps.yunohost.org/ci/api/results" CI_RESULTS_URL = "https://ci-apps.yunohost.org/ci/api/results"
ci_results = requests.get(CI_RESULTS_URL).json()
comment = { REPO_APPS_ROOT = Path(Repo(__file__, search_parent_directories=True).working_dir)
def github_token() -> str | None:
github_token_path = REPO_APPS_ROOT.parent / ".github_token"
if github_token_path.exists():
return github_token_path.open("r", encoding="utf-8").read().strip()
return None
def get_ci_results() -> dict[str, dict[str, Any]]:
return requests.get(CI_RESULTS_URL, timeout=10).json()
def ci_result_is_outdated(result) -> bool:
# 3600 * 24 * 60 = ~2 months
return (int(time.time()) - result.get("timestamp", 0)) > 3600 * 24 * 60
def update_catalog(catalog, ci_results) -> dict:
"""
Actually change the catalog data
"""
# Re-sort the catalog keys / subkeys
for app, infos in catalog.items():
catalog[app] = OrderedDict(sorted(infos.items()))
catalog = OrderedDict(sorted(catalog.items()))
def app_level(app):
if app not in ci_results:
return 0
if ci_result_is_outdated(ci_results[app]):
return 0
return ci_results[app]["level"]
for app, info in catalog.items():
info["level"] = app_level(app)
return catalog
def list_changes(catalog, ci_results) -> dict[str, list[tuple[str, int, int]]]:
"""
Lists changes for a pull request
"""
changes = {
"major_regressions": [], "major_regressions": [],
"minor_regressions": [], "minor_regressions": [],
"improvements": [], "improvements": [],
@ -29,17 +77,15 @@ comment = {
} }
for app, infos in catalog.items(): for app, infos in catalog.items():
if infos.get("state") != "working": if infos.get("state") != "working":
continue continue
if app not in ci_results: if app not in ci_results:
comment["missing"].append(app) changes["missing"].append(app)
continue continue
# 3600 * 24 * 60 = ~2 months if ci_result_is_outdated(ci_results[app]):
if (int(time.time()) - ci_results[app].get("timestamp", 0)) > 3600 * 24 * 60: changes["outdated"].append(app)
comment["outdated"].append(app)
continue continue
ci_level = ci_results[app]["level"] ci_level = ci_results[app]["level"]
@ -47,60 +93,134 @@ for app, infos in catalog.items():
if ci_level == current_level: if ci_level == current_level:
continue continue
elif current_level is None or ci_level > current_level:
comment["improvements"].append((app, current_level, ci_level)) if current_level is None or ci_level > current_level:
elif ci_level < current_level: changes["improvements"].append((app, current_level, ci_level))
if ci_level <= 4 and current_level > 4: continue
comment["major_regressions"].append((app, current_level, ci_level))
if ci_level < current_level:
if ci_level <= 4 < current_level:
changes["major_regressions"].append((app, current_level, ci_level))
else: else:
comment["minor_regressions"].append((app, current_level, ci_level)) changes["minor_regressions"].append((app, current_level, ci_level))
infos["level"] = ci_level return changes
# Also re-sort the catalog keys / subkeys
for app, infos in catalog.items():
catalog[app] = OrderedDict(sorted(infos.items()))
catalog = OrderedDict(sorted(catalog.items()))
updated_catalog = toml.dumps(catalog) def pretty_changes(changes: dict[str, list[tuple[str, int, int]]]) -> str:
updated_catalog = updated_catalog.replace(",]", " ]") pr_body_template = textwrap.dedent("""
open(f"{tmpdir}/apps.toml", "w").write(updated_catalog) {%- if changes["major_regressions"] %}
### Major regressions 😭
{% for app in changes["major_regressions"] %}
- [ ] [{{app.0}}: {{app.1}} {{app.2}}](https://ci-apps.yunohost.org/ci/apps/{{app.0}}/latestjob)
{%- endfor %}
{% endif %}
{%- if changes["minor_regressions"] %}
### Minor regressions 😬
{% for app in changes["minor_regressions"] %}
- [ ] [{{app.0}}: {{app.1}} {{app.2}}](https://ci-apps.yunohost.org/ci/apps/{{app.0}}/latestjob)
{%- endfor %}
{% endif %}
{%- if changes["improvements"] %}
### Improvements 🥳
{% for app in changes["improvements"] %}
- [{{app.0}}: {{app.1}} {{app.2}}](https://ci-apps.yunohost.org/ci/apps/{{app.0}}/latestjob)
{%- endfor %}
{% endif %}
{%- if changes["missing"] %}
### Missing 🫠
{% for app in changes["missing"] %}
- [{{app}} (See latest job if it exists)](https://ci-apps.yunohost.org/ci/apps/{{app.0}}/latestjob)
{%- endfor %}
{% endif %}
{%- if changes["outdated"] %}
### Outdated ⏰
{% for app in changes["outdated"] %}
- [ ] [{{app}} (See latest job if it exists)](https://ci-apps.yunohost.org/ci/apps/{{app.0}}/latestjob)
{%- endfor %}
{% endif %}
""")
os.system(f"git -C {tmpdir} commit apps.toml -m 'Update app levels according to CI results'") return jinja2.Environment().from_string(pr_body_template).render(changes=changes)
os.system(f"git -C {tmpdir} push origin update_app_levels --force")
os.system(f"rm -rf {tmpdir}")
PR_body = ""
if comment["major_regressions"]:
PR_body += "\n### Major regressions\n\n"
for app, current_level, new_level in comment['major_regressions']:
PR_body += f"- [ ] {app} | {current_level} -> {new_level} | https://ci-apps.yunohost.org/ci/apps/{app}/latestjob\n"
if comment["minor_regressions"]:
PR_body += "\n### Minor regressions\n\n"
for app, current_level, new_level in comment['minor_regressions']:
PR_body += f"- [ ] {app} | {current_level} -> {new_level} | https://ci-apps.yunohost.org/ci/apps/{app}/latestjob\n"
if comment["improvements"]:
PR_body += "\n### Improvements\n\n"
for app, current_level, new_level in comment['improvements']:
PR_body += f"- {app} | {current_level} -> {new_level} | https://ci-apps.yunohost.org/ci/apps/{app}/latestjob\n"
if comment["missing"]:
PR_body += "\n### Missing results\n\n"
for app in comment['missing']:
PR_body += f"- {app} | https://ci-apps.yunohost.org/ci/apps/{app}/latestjob\n"
if comment["outdated"]:
PR_body += "\n### Outdated results\n\n"
for app in comment['outdated']:
PR_body += f"- [ ] {app} | https://ci-apps.yunohost.org/ci/apps/{app}/latestjob\n"
PR = {"title": "Update app levels according to CI results", def make_pull_request(pr_body: str) -> None:
"body": PR_body, pr_data = {
"title": "Update app levels according to CI results",
"body": pr_body,
"head": "update_app_levels", "head": "update_app_levels",
"base": "master"} "base": "master"
}
with requests.Session() as s: with requests.Session() as s:
s.headers.update({"Authorization": f"token {token}"}) s.headers.update({"Authorization": f"token {github_token()}"})
r = s.post("https://api.github.com/repos/yunohost/apps/pulls", json.dumps(PR)) response = s.post(f"https://api.github.com/repos/{APPS_REPO}/pulls", json=pr_data)
if r.status_code != 200: if response.status_code == 422:
print(r.text) response = s.get(f"https://api.github.com/repos/{APPS_REPO}/pulls", data={"head": "update_app_levels"})
sys.exit(1) response.raise_for_status()
pr_number = response.json()[0]["number"]
# head can't be updated
del pr_data["head"]
response = s.patch(f"https://api.github.com/repos/{APPS_REPO}/pulls/{pr_number}", json=pr_data)
response.raise_for_status()
existing_url = response.json()["html_url"]
logging.warning(f"An existing Pull Request has been updated at {existing_url} !")
else:
response.raise_for_status()
new_url = response.json()["html_url"]
logging.info(f"Opened a Pull Request at {new_url} !")
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--commit", action=argparse.BooleanOptionalAction, default=True)
parser.add_argument("--pr", action=argparse.BooleanOptionalAction, default=True)
parser.add_argument("-v", "--verbose", action=argparse.BooleanOptionalAction)
args = parser.parse_args()
logging.getLogger().setLevel(logging.INFO)
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
with tempfile.TemporaryDirectory(prefix="update_app_levels_") as tmpdir:
logging.info("Cloning the repository...")
apps_repo = Repo.clone_from(f"git@github.com:{APPS_REPO}", to_path=tmpdir)
# Load the app catalog and filter out the non-working ones
catalog = toml.load((Path(apps_repo.working_tree_dir) / "apps.toml").open("r", encoding="utf-8"))
new_branch = apps_repo.create_head("update_app_levels", apps_repo.refs.master)
apps_repo.head.reference = new_branch
logging.info("Retrieving the CI results...")
ci_results = get_ci_results()
# Now compute changes, then update the catalog
changes = list_changes(catalog, ci_results)
pr_body = pretty_changes(changes)
catalog = update_catalog(catalog, ci_results)
# Save the new catalog
updated_catalog = toml.dumps(catalog)
updated_catalog = updated_catalog.replace(",]", " ]")
(Path(apps_repo.working_tree_dir) / "apps.toml").open("w", encoding="utf-8").write(updated_catalog)
if args.commit:
logging.info("Committing and pushing the new catalog...")
apps_repo.index.add("apps.toml")
apps_repo.index.commit("Update app levels according to CI results")
apps_repo.remote().push(force=True)
if args.verbose:
print(pr_body)
if args.pr:
logging.info("Opening a pull request...")
make_pull_request(pr_body)
if __name__ == "__main__":
main()