1
0
Fork 0

Merge pull request #2019 from Salamandar/rework_autoupdater

Rework autoupdater - part 2
This commit is contained in:
Alexandre Aubin 2024-02-15 01:12:44 +01:00 committed by GitHub
commit e6903bf275
3 changed files with 441 additions and 393 deletions

View file

@ -0,0 +1 @@
#!/usr/bin/env python3

808
autoupdate_app_sources/autoupdate_app_sources.py Normal file → Executable file
View file

@ -1,25 +1,28 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import argparse import argparse
import glob
import hashlib import hashlib
import os import multiprocessing
import logging
from typing import Any
import re import re
import sys import sys
import time import textwrap
from pathlib import Path from pathlib import Path
from functools import cache
from datetime import datetime from datetime import datetime
import requests import requests
import toml import toml
import tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm import github
# add apps/tools to sys.path # add apps/tools to sys.path
sys.path.insert(0, str(Path(__file__).parent.parent)) sys.path.insert(0, str(Path(__file__).parent.parent))
from rest_api import GithubAPI, GitlabAPI, GiteaForgejoAPI, RefType from rest_api import GithubAPI, GitlabAPI, GiteaForgejoAPI, RefType # noqa: E402,E501 pylint: disable=import-error,wrong-import-position
from appslib.utils import REPO_APPS_ROOT, get_catalog # pylint: disable=import-error from appslib.utils import REPO_APPS_ROOT, get_catalog # noqa: E402 pylint: disable=import-error,wrong-import-position
from app_caches import app_cache_folder # noqa: E402 pylint: disable=import-error,wrong-import-position
STRATEGIES = [ STRATEGIES = [
@ -34,15 +37,24 @@ STRATEGIES = [
"latest_gitea_commit", "latest_gitea_commit",
"latest_forgejo_release", "latest_forgejo_release",
"latest_forgejo_tag", "latest_forgejo_tag",
"latest_forgejo_commit" "latest_forgejo_commit",
] ]
dry_run = True
# For github authentication @cache
auth = None def get_github() -> tuple[tuple[str, str] | None, github.Github | None, github.InputGitAuthor | None]:
github = None try:
author = None github_login = (REPO_APPS_ROOT / ".github_login").open("r", encoding="utf-8").read().strip()
github_token = (REPO_APPS_ROOT / ".github_token").open("r", encoding="utf-8").read().strip()
github_email = (REPO_APPS_ROOT / ".github_email").open("r", encoding="utf-8").read().strip()
auth = (github_login, github_token)
github_api = github.Github(github_token)
author = github.InputGitAuthor(github_login, github_email)
return auth, github_api, author
except Exception as e:
logging.warning(f"Could not get github: {e}")
return None, None, None
def apps_to_run_auto_update_for(): def apps_to_run_auto_update_for():
@ -53,383 +65,374 @@ def apps_to_run_auto_update_for():
and "/github.com/yunohost-apps" in infos["url"].lower() and "/github.com/yunohost-apps" in infos["url"].lower()
] ]
manifest_tomls = glob.glob( relevant_apps = []
os.path.dirname(__file__) + "/../../.apps_cache/*/manifest.toml" for app in apps_flagged_as_working_and_on_yunohost_apps_org:
) manifest_toml = app_cache_folder(app) / "manifest.toml"
if manifest_toml.exists():
apps_with_manifest_toml = [path.split("/")[-2] for path in manifest_tomls] manifest = toml.load(manifest_toml.open("r", encoding="utf-8"))
sources = manifest.get("resources", {}).get("sources", {})
relevant_apps = list( if any("autoupdate" in source for source in sources.values()):
sorted( relevant_apps.append(app)
set(apps_flagged_as_working_and_on_yunohost_apps_org) return relevant_apps
& set(apps_with_manifest_toml)
)
)
out = []
for app in relevant_apps:
manifest = toml.load(
os.path.dirname(__file__) + f"/../../.apps_cache/{app}/manifest.toml"
)
sources = manifest.get("resources", {}).get("sources", {})
if any("autoupdate" in source for source in sources.values()):
out.append(app)
return out
def filter_and_get_latest_tag(tags, app_id): class LocalOrRemoteRepo:
filter_keywords = ["start", "rc", "beta", "alpha"] def __init__(self, app: str | Path) -> None:
tags = [t for t in tags if not any(keyword in t for keyword in filter_keywords)] self.local = False
self.remote = False
tag_dict = {} self.app = app
for t in tags: if isinstance(app, Path):
t_to_check = t # It's local
if t.startswith(app_id + "-"): self.local = True
t_to_check = t.split("-", 1)[-1] self.manifest_path = app / "manifest.toml"
# Boring special case for dokuwiki...
elif t.startswith("release-"):
t_to_check = t.split("-", 1)[-1].replace("-", ".")
if not re.match(r"^v?[\d\.]*\-?\d$", t_to_check): if not self.manifest_path.exists():
print(f"Ignoring tag {t_to_check}, doesn't look like a version number") raise RuntimeError(f"{app.name}: manifest.toml doesnt exists?")
else: # app is in fact a path
tag_dict[t] = tag_to_int_tuple(t_to_check) self.manifest_raw = (app / "manifest.toml").open("r", encoding="utf-8").read()
tags = sorted(list(tag_dict.keys()), key=tag_dict.get) elif isinstance(app, str):
# It's remote
return tags[-1], ".".join([str(i) for i in tag_dict[tags[-1]]]) self.remote = True
github = get_github()[1]
assert github, "Could not get github authentication!"
def tag_to_int_tuple(tag): self.repo = github.get_repo(f"Yunohost-Apps/{app}_ynh")
tag = tag.strip("v").replace("-", ".").strip(".") self.pr_branch = None
int_tuple = tag.split(".")
assert all(i.isdigit() for i in int_tuple), f"Cant convert {tag} to int tuple :/"
return tuple(int(i) for i in int_tuple)
def sha256_of_remote_file(url):
    """Stream the file at *url* and return its sha256 hex digest.

    Best-effort: any download or hashing error is reported on stdout and
    None is returned instead of raising.
    """
    print(f"Computing sha256sum for {url} ...")
    try:
        response = requests.get(url, stream=True)
        digest = hashlib.sha256()
        # Hash in 8 KiB chunks so huge tarballs never sit fully in memory.
        for chunk in response.iter_content(8192):
            digest.update(chunk)
        return digest.hexdigest()
    except Exception as e:
        print(f"Failed to compute sha256 for {url} : {e}")
        return None
class AppAutoUpdater:
def __init__(self, app_id, app_id_is_local_app_dir=False):
if app_id_is_local_app_dir:
if not os.path.exists(app_id + "/manifest.toml"):
raise Exception("manifest.toml doesnt exists?")
# app_id is in fact a path
manifest = toml.load(open(app_id + "/manifest.toml"))
else:
# We actually want to look at the manifest on the "testing" (or default) branch
self.repo = github.get_repo(f"Yunohost-Apps/{app_id}_ynh")
# Determine base branch, either `testing` or default branch # Determine base branch, either `testing` or default branch
try: try:
self.base_branch = self.repo.get_branch("testing").name self.base_branch = self.repo.get_branch("testing").name
except: except Exception:
self.base_branch = self.repo.default_branch self.base_branch = self.repo.default_branch
contents = self.repo.get_contents("manifest.toml", ref=self.base_branch) contents = self.repo.get_contents("manifest.toml", ref=self.base_branch)
assert not isinstance(contents, list)
self.manifest_raw = contents.decoded_content.decode() self.manifest_raw = contents.decoded_content.decode()
self.manifest_raw_sha = contents.sha self.manifest_raw_sha = contents.sha
manifest = toml.loads(self.manifest_raw)
self.app_id = manifest["id"]
self.current_version = manifest["version"].split("~")[0]
self.sources = manifest.get("resources", {}).get("sources")
if not self.sources:
raise Exception("There's no resources.sources in manifest.toml ?")
self.main_upstream = manifest.get("upstream", {}).get("code")
def run(self):
todos = {}
for source, infos in self.sources.items():
if "autoupdate" not in infos:
continue
strategy = infos.get("autoupdate", {}).get("strategy")
if strategy not in STRATEGIES:
raise Exception(
f"Unknown strategy to autoupdate {source}, expected one of {STRATEGIES}, got {strategy}"
)
asset = infos.get("autoupdate", {}).get("asset", "tarball")
print(f"\n Checking {source} ...")
if strategy.endswith("_release"):
(
new_version,
new_asset_urls,
changelog_url,
) = self.get_latest_version_and_asset(strategy, asset, infos, source)
else:
(new_version, new_asset_urls) = self.get_latest_version_and_asset(
strategy, asset, infos, source
)
if source == "main":
print(f"Current version in manifest: {self.current_version}")
print(f"Newest version on upstream: {new_version}")
# Maybe new version is older than current version
# Which can happen for example if we manually release a RC,
# which is ignored by this script
# Though we wrap this in a try/except pass, because don't want to miserably crash
# if the tag can't properly be converted to int tuple ...
try:
if tag_to_int_tuple(self.current_version) > tag_to_int_tuple(
new_version
):
print(
"Up to date (current version appears more recent than newest version found)"
)
continue
except:
pass
if self.current_version == new_version:
print("Up to date")
continue
if (
isinstance(new_asset_urls, dict) and isinstance(infos.get("url"), str)
) or (
isinstance(new_asset_urls, str)
and not isinstance(infos.get("url"), str)
):
raise Exception(
f"It looks like there's an inconsistency between the old asset list and the new ones ... one is arch-specific, the other is not ... Did you forget to define arch-specific regexes ? ... New asset url is/are : {new_asset_urls}"
)
if isinstance(new_asset_urls, str) and infos["url"] == new_asset_urls:
print(f"URL for asset {source} is up to date")
continue
elif isinstance(new_asset_urls, dict) and new_asset_urls == {
k: infos[k]["url"] for k in new_asset_urls.keys()
}:
print(f"URLs for asset {source} are up to date")
continue
else:
print(f"Update needed for {source}")
todos[source] = {
"new_asset_urls": new_asset_urls,
"old_assets": infos,
}
if source == "main":
todos[source]["new_version"] = new_version
if dry_run or not todos:
return bool(todos)
if "main" in todos:
if strategy.endswith("_release"):
title = f"Upgrade to v{new_version}"
message = f"Upgrade to v{new_version}\nChangelog: {changelog_url}"
else:
title = message = f"Upgrade to v{new_version}"
new_version = todos["main"]["new_version"]
new_branch = f"ci-auto-update-{new_version}"
else: else:
title = message = "Upgrade sources" raise TypeError(f"Invalid argument type for app: {type(app)}")
new_branch = "ci-auto-update-sources"
try: def edit_manifest(self, content: str):
# Get the commit base for the new branch, and create it self.manifest_raw = content
commit_sha = self.repo.get_branch(self.base_branch).commit.sha if self.local:
self.repo.create_git_ref(ref=f"refs/heads/{new_branch}", sha=commit_sha) self.manifest_path.open("w", encoding="utf-8").write(content)
except:
print("... Branch already exists, skipping")
return False
manifest_new = self.manifest_raw def commit(self, message: str):
for source, infos in todos.items(): if self.remote:
manifest_new = self.replace_version_and_asset_in_manifest( author = get_github()[2]
manifest_new, assert author, "Could not get Github author!"
infos.get("new_version"), assert self.pr_branch is not None, "Did you forget to create a branch?"
infos["new_asset_urls"], self.repo.update_file(
infos["old_assets"], "manifest.toml",
is_main=source == "main", message=message,
content=self.manifest_raw,
sha=self.manifest_raw_sha,
branch=self.pr_branch,
author=author,
) )
self.repo.update_file( def new_branch(self, name: str) -> bool:
"manifest.toml", if self.local:
message=message, logging.warning("Can't create branches for local repositories")
content=manifest_new, return False
sha=self.manifest_raw_sha, if self.remote:
branch=new_branch, self.pr_branch = name
author=author, commit_sha = self.repo.get_branch(self.base_branch).commit.sha
) if self.pr_branch in [branch.name for branch in self.repo.get_branches()]:
return False
self.repo.create_git_ref(ref=f"refs/heads/{name}", sha=commit_sha)
return True
return False
# Wait a bit to preserve the API rate limit def create_pr(self, branch: str, title: str, message: str) -> str | None:
time.sleep(1.5) if self.local:
logging.warning("Can't create pull requests for local repositories")
return
if self.remote:
# Open the PR
pr = self.repo.create_pull(
title=title, body=message, head=branch, base=self.base_branch
)
return pr.url
# Open the PR def get_pr(self, branch: str) -> str:
pr = self.repo.create_pull( return next(pull.html_url for pull in self.repo.get_pulls(head=branch))
title=title, body=message, head=new_branch, base=self.base_branch
)
print("Created PR " + self.repo.full_name + " updated with PR #" + str(pr.id))
return bool(todos) class AppAutoUpdater:
def __init__(self, app_id: str | Path) -> None:
self.repo = LocalOrRemoteRepo(app_id)
self.manifest = toml.loads(self.repo.manifest_raw)
def get_latest_version_and_asset(self, strategy, asset, infos, source): self.app_id = self.manifest["id"]
upstream = ( self.current_version = self.manifest["version"].split("~")[0]
infos.get("autoupdate", {}).get("upstream", self.main_upstream).strip("/") self.sources = self.manifest.get("resources", {}).get("sources")
) self.main_upstream = self.manifest.get("upstream", {}).get("code")
if "github" in strategy: if not self.sources:
raise RuntimeError("There's no resources.sources in manifest.toml ?")
self.main_upstream = self.manifest.get("upstream", {}).get("code")
def run(self, edit: bool = False, commit: bool = False, pr: bool = False) -> bool | tuple[str | None, str | None, str | None]:
has_updates = False
main_version = None
pr_url = None
# Default message
pr_title = commit_msg = "Upgrade sources"
branch_name = "ci-auto-update-sources"
for source, infos in self.sources.items():
update = self.get_source_update(source, infos)
if update is None:
continue
has_updates = True
version, assets, msg = update
if source == "main":
main_version = version
branch_name = f"ci-auto-update-{version}"
pr_title = commit_msg = f"Upgrade to v{version}"
if msg:
commit_msg += f"\n{msg}"
self.repo.manifest_raw = self.replace_version_and_asset_in_manifest(
self.repo.manifest_raw, version, assets, infos, is_main=source == "main",
)
if not has_updates:
return False
if edit:
self.repo.edit_manifest(self.repo.manifest_raw)
try:
if pr:
self.repo.new_branch(branch_name)
if commit:
self.repo.commit(commit_msg)
if pr:
pr_url = self.repo.create_pr(branch_name, pr_title, commit_msg)
except github.GithubException as e:
if e.status == 422 or e.status == 409:
pr_url = f"already existing pr: {self.repo.get_pr(branch_name)}"
else:
raise
return self.current_version, main_version, pr_url
@staticmethod
def filter_and_get_latest_tag(tags: list[str], app_id: str) -> tuple[str, str]:
    """Keep only tags that look like release version numbers and return the
    newest one as (original tag name, dotted version string).

    Raises RuntimeError when no tag survives the sanity filtering.
    """

    def version_numbers(tag: str) -> tuple[int, ...] | None:
        # Pre-release / development tags are never candidates for auto-update.
        filter_keywords = ["start", "rc", "beta", "alpha"]
        if any(keyword in tag for keyword in filter_keywords):
            logging.debug(f"Tag {tag} contains filtered keyword from {filter_keywords}.")
            return None

        candidate = tag
        if tag.startswith(app_id + "-"):
            candidate = tag.split("-", 1)[-1]
        # Boring special case for dokuwiki...
        elif tag.startswith("release-"):
            candidate = tag.split("-", 1)[-1].replace("-", ".")

        if re.match(r"^v?[\d\.]*\-?\d$", candidate):
            return AppAutoUpdater.tag_to_int_tuple(candidate)
        print(f"Ignoring tag {candidate}, doesn't look like a version number")
        return None

    # Map each parseable version tuple to its tag; when two tags parse to the
    # same tuple, the later one in the input wins.
    versioned: dict[tuple[int, ...], str] = {}
    for tag in tags:
        numbers = version_numbers(tag)
        if numbers is not None:
            versioned[numbers] = tag

    if not versioned:
        raise RuntimeError("No tags were found after sanity filtering!")

    newest = max(versioned)
    return versioned[newest], ".".join(str(i) for i in newest)
@staticmethod
def tag_to_int_tuple(tag: str) -> tuple[int, ...]:
    """Convert a version tag like 'v1.2-3' into an int tuple (1, 2, 3).

    Asserts that every dot-separated component is numeric after stripping
    the leading 'v' and normalizing dashes to dots.
    """
    cleaned = tag.strip("v").replace("-", ".").strip(".")
    parts = cleaned.split(".")
    assert all(part.isdigit() for part in parts), f"Cant convert {cleaned} to int tuple :/"
    return tuple(map(int, parts))
@staticmethod
def sha256_of_remote_file(url: str) -> str:
    """Stream the file at *url* and return its sha256 hex digest.

    Raises RuntimeError (chained from the underlying error) when the
    download or hashing fails.
    """
    print(f"Computing sha256sum for {url} ...")
    try:
        response = requests.get(url, stream=True)
        digest = hashlib.sha256()
        # Hash in 8 KiB chunks so huge tarballs never sit fully in memory.
        for chunk in response.iter_content(8192):
            digest.update(chunk)
        return digest.hexdigest()
    except Exception as e:
        raise RuntimeError(f"Failed to compute sha256 for {url} : {e}") from e
def get_source_update(self, name: str, infos: dict[str, Any]) -> tuple[str, str | dict[str, str], str] | None:
    """Check one manifest source for an upstream update.

    Returns None when the source has no autoupdate config or is already up
    to date; otherwise (new version, new asset url(s), extra info message).
    Raises ValueError on an unknown strategy, RuntimeError when old and new
    asset lists disagree on being arch-specific.
    """
    if "autoupdate" not in infos:
        return None

    print(f"\n Checking {name} ...")
    autoupdate = infos.get("autoupdate", {})
    asset = autoupdate.get("asset", "tarball")
    strategy = autoupdate.get("strategy")
    if strategy not in STRATEGIES:
        raise ValueError(f"Unknown update strategy '{strategy}' for '{name}', expected one of {STRATEGIES}")

    result = self.get_latest_version_and_asset(strategy, asset, infos)
    if result is None:
        return None
    new_version, assets, more_info = result

    if name == "main":
        print(f"Current version in manifest: {self.current_version}")
        print(f"Newest version on upstream: {new_version}")

        if self.current_version == new_version:
            print("Up to date")
            return None
        # Maybe new version is older than current version, which can happen
        # for example if we manually release a RC, which is ignored by this
        # script. Wrapped in try/except because we don't want to miserably
        # crash if the tag can't properly be converted to an int tuple.
        try:
            if self.tag_to_int_tuple(self.current_version) > self.tag_to_int_tuple(new_version):
                print("Up to date (current version appears more recent than newest version found)")
                return None
        except (AssertionError, ValueError):
            pass

    new_is_dict = isinstance(assets, dict)
    old_is_str = isinstance(infos.get("url"), str)
    if (new_is_dict and old_is_str) or (isinstance(assets, str) and not old_is_str):
        raise RuntimeError(
            "It looks like there's an inconsistency between the old asset list and the new ones... "
            "One is arch-specific, the other is not... Did you forget to define arch-specific regexes? "
            f"New asset url is/are : {assets}"
        )

    if isinstance(assets, str) and infos["url"] == assets:
        print(f"URL for asset {name} is up to date")
        return
    if new_is_dict and assets == {k: infos[k]["url"] for k in assets.keys()}:
        print(f"URLs for asset {name} are up to date")
        return

    print(f"Update needed for {name}")
    return new_version, assets, more_info
@staticmethod
def find_matching_asset(assets: dict[str, str], regex: str) -> tuple[str, str]:
    """Return the single (name, url) pair whose name matches *regex*.

    Raises RuntimeError when zero or more than one asset matches.
    """
    hits = {name: url for name, url in assets.items() if re.match(regex, name)}
    if not hits:
        raise RuntimeError(f"No assets matching regex '{regex}' in {list(assets.keys())}")
    if len(hits) > 1:
        raise RuntimeError(f"Too many assets matching regex '{regex}': {hits}")
    return next(iter(hits.items()))
def get_latest_version_and_asset(self, strategy: str, asset: str | dict, infos
) -> tuple[str, str | dict[str, str], str] | None:
upstream = (infos.get("autoupdate", {}).get("upstream", self.main_upstream).strip("/"))
_, remote_type, revision_type = strategy.split("_")
if remote_type == "github":
assert ( assert (
upstream and upstream.startswith("https://github.com/") upstream and upstream.startswith("https://github.com/")
), f"When using strategy {strategy}, having a defined upstream code repo on github.com is required" ), f"When using strategy {strategy}, having a defined upstream code repo on github.com is required"
api = GithubAPI(upstream, auth=auth) api = GithubAPI(upstream, auth=get_github()[0])
elif "gitlab" in strategy: if remote_type == "gitlab":
api = GitlabAPI(upstream) api = GitlabAPI(upstream)
elif "gitea" in strategy or "forgejo" in strategy: if remote_type in ["gitea", "forgejo"]:
api = GiteaForgejoAPI(upstream) api = GiteaForgejoAPI(upstream)
if strategy.endswith("_release"): if revision_type == "release":
releases = api.releases() releases: dict[str, dict[str, Any]] = {
tags = [ release["tag_name"]: release
release["tag_name"] for release in api.releases()
for release in releases
if not release["draft"] and not release["prerelease"] if not release["draft"] and not release["prerelease"]
] }
latest_version_orig, latest_version = filter_and_get_latest_tag( latest_version_orig, latest_version = self.filter_and_get_latest_tag(list(releases.keys()), self.app_id)
tags, self.app_id latest_release = releases[latest_version_orig]
)
latest_release = [
release
for release in releases
if release["tag_name"] == latest_version_orig
][0]
latest_assets = { latest_assets = {
a["name"]: a["browser_download_url"] a["name"]: a["browser_download_url"]
for a in latest_release["assets"] for a in latest_release["assets"]
if not a["name"].endswith(".md5") if not a["name"].endswith(".md5")
} }
if ("gitea" in strategy or "forgejo" in strategy) and latest_assets == "": if remote_type in ["gitea", "forgejo"] and latest_assets == "":
# if empty (so only the base asset), take the tarball_url # if empty (so only the base asset), take the tarball_url
latest_assets = latest_release["tarball_url"] latest_assets = latest_release["tarball_url"]
# get the release changelog link # get the release changelog link
latest_release_html_url = latest_release["html_url"] latest_release_html_url = latest_release["html_url"]
if asset == "tarball": if asset == "tarball":
latest_tarball = ( latest_tarball = api.url_for_ref(latest_version_orig, RefType.tags)
api.url_for_ref(latest_version_orig, RefType.tags)
)
return latest_version, latest_tarball, latest_release_html_url return latest_version, latest_tarball, latest_release_html_url
# FIXME # FIXME
else: if isinstance(asset, str):
if isinstance(asset, str): try:
matching_assets_urls = [ _, url = self.find_matching_asset(latest_assets, asset)
url return latest_version, url, latest_release_html_url
for name, url in latest_assets.items() except RuntimeError as e:
if re.match(asset, name) raise RuntimeError(f"{e}.\nFull release details on {latest_release_html_url}.") from e
]
if not matching_assets_urls:
raise Exception(
f"No assets matching regex '{asset}' for release {latest_version} among {list(latest_assets.keys())}. Full release details on {latest_release_html_url}"
)
elif len(matching_assets_urls) > 1:
raise Exception(
f"Too many assets matching regex '{asset}' for release {latest_version} : {matching_assets_urls}. Full release details on {latest_release_html_url}"
)
return (
latest_version,
matching_assets_urls[0],
latest_release_html_url,
)
elif isinstance(asset, dict):
matching_assets_dicts = {}
for asset_name, asset_regex in asset.items():
matching_assets_urls = [
url
for name, url in latest_assets.items()
if re.match(asset_regex, name)
]
if not matching_assets_urls:
raise Exception(
f"No assets matching regex '{asset_regex}' for release {latest_version} among {list(latest_assets.keys())}. Full release details on {latest_release_html_url}"
)
elif len(matching_assets_urls) > 1:
raise Exception(
f"Too many assets matching regex '{asset}' for release {latest_version} : {matching_assets_urls}. Full release details on {latest_release_html_url}"
)
matching_assets_dicts[asset_name] = matching_assets_urls[0]
return (
latest_version.strip("v"),
matching_assets_dicts,
latest_release_html_url,
)
elif strategy.endswith("_tag"): if isinstance(asset, dict):
new_assets = {}
for asset_name, asset_regex in asset.items():
try:
_, url = self.find_matching_asset(latest_assets, asset_regex)
new_assets[asset_name] = url
except RuntimeError as e:
raise RuntimeError(f"{e}.\nFull release details on {latest_release_html_url}.") from e
return latest_version, new_assets, latest_release_html_url
return None
if revision_type == "tag":
if asset != "tarball": if asset != "tarball":
raise Exception( raise ValueError("For the latest tag strategies, only asset = 'tarball' is supported")
"For the latest tag strategy, only asset = 'tarball' is supported" tags = [t["name"] for t in api.tags()]
) latest_version_orig, latest_version = self.filter_and_get_latest_tag(tags, self.app_id)
tags = api.tags()
latest_version_orig, latest_version = filter_and_get_latest_tag(
[t["name"] for t in tags], self.app_id
)
latest_tarball = api.url_for_ref(latest_version_orig, RefType.tags) latest_tarball = api.url_for_ref(latest_version_orig, RefType.tags)
return latest_version, latest_tarball return latest_version, latest_tarball, ""
elif strategy.endswith("_commit"): if revision_type == "commit":
if asset != "tarball": if asset != "tarball":
raise Exception( raise ValueError("For the latest commit strategies, only asset = 'tarball' is supported")
"For the latest release strategy, only asset = 'tarball' is supported"
)
commits = api.commits() commits = api.commits()
latest_commit = commits[0] latest_commit = commits[0]
latest_tarball = api.url_for_ref(latest_commit["sha"], RefType.commits) latest_tarball = api.url_for_ref(latest_commit["sha"], RefType.commits)
# Let's have the version as something like "2023.01.23" # Let's have the version as something like "2023.01.23"
latest_commit_date = datetime.strptime( latest_commit_date = datetime.strptime(latest_commit["commit"]["author"]["date"][:10], "%Y-%m-%d")
latest_commit["commit"]["author"]["date"][:10], "%Y-%m-%d" version_format = infos.get("autoupdate", {}).get("force_version", "%Y.%m.%d")
)
version_format = infos.get("autoupdate", {}).get(
"force_version", "%Y.%m.%d"
)
latest_version = latest_commit_date.strftime(version_format) latest_version = latest_commit_date.strftime(version_format)
return latest_version, latest_tarball, ""
return latest_version, latest_tarball def replace_version_and_asset_in_manifest(self, content: str, new_version: str, new_assets_urls: str | dict,
current_assets: dict, is_main: bool):
def replace_version_and_asset_in_manifest( replacements = []
self, content, new_version, new_assets_urls, current_assets, is_main
):
if isinstance(new_assets_urls, str): if isinstance(new_assets_urls, str):
sha256 = sha256_of_remote_file(new_assets_urls) replacements = [
elif isinstance(new_assets_urls, dict): (current_assets["url"], new_assets_urls),
sha256 = { (current_assets["sha256"], self.sha256_of_remote_file(new_assets_urls)),
url: sha256_of_remote_file(url) for url in new_assets_urls.values() ]
} if isinstance(new_assets_urls, dict):
replacements = [
repl
for key, url in new_assets_urls.items() for repl in (
(current_assets[key]["url"], url),
(current_assets[key]["sha256"], self.sha256_of_remote_file(url))
)
]
if is_main: if is_main:
def repl(m: re.Match) -> str:
def repl(m):
return m.group(1) + new_version + '~ynh1"' return m.group(1) + new_version + '~ynh1"'
content = re.sub(r"(\s*version\s*=\s*[\"\'])([\d\.]+)(\~ynh\d+[\"\'])", repl, content)
content = re.sub( for old, new in replacements:
r"(\s*version\s*=\s*[\"\'])([\d\.]+)(\~ynh\d+[\"\'])", repl, content content = content.replace(old, new)
)
if isinstance(new_assets_urls, str):
content = content.replace(current_assets["url"], new_assets_urls)
content = content.replace(current_assets["sha256"], sha256)
elif isinstance(new_assets_urls, dict):
for key, url in new_assets_urls.items():
content = content.replace(current_assets[key]["url"], url)
content = content.replace(current_assets[key]["sha256"], sha256[url])
return content return content
@ -438,75 +441,116 @@ def paste_on_haste(data):
# NB: we hardcode this here and can't use the yunopaste command # NB: we hardcode this here and can't use the yunopaste command
# because this script runs on the same machine than haste is hosted on... # because this script runs on the same machine than haste is hosted on...
# and doesn't have the proper front-end LE cert in this context # and doesn't have the proper front-end LE cert in this context
SERVER_URL = "http://paste.yunohost.org" SERVER_HOST = "http://paste.yunohost.org"
TIMEOUT = 3 TIMEOUT = 3
try: try:
url = SERVER_URL + "/documents" url = f"{SERVER_HOST}/documents"
response = requests.post(url, data=data.encode("utf-8"), timeout=TIMEOUT) response = requests.post(url, data=data.encode("utf-8"), timeout=TIMEOUT)
response.raise_for_status() response.raise_for_status()
dockey = response.json()["key"] dockey = response.json()["key"]
return SERVER_URL + "/raw/" + dockey return f"{SERVER_HOST}/raw/{dockey}"
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
print("\033[31mError: {}\033[0m".format(e)) logging.error("\033[31mError: {}\033[0m".format(e))
sys.exit(1) raise
class StdoutSwitch:
    """Redirect sys.stdout into an in-memory buffer until reset() is called.

    Capture starts at construction time. reset() returns everything captured
    so far and restores the real stdout. Also usable as a context manager;
    the original code declared __exit__ with the wrong signature (no
    exc_type/exc_value/traceback) and had no __enter__, so `with` statements
    would crash — fixed here.
    """

    class DummyFile:
        """Minimal file-like sink that accumulates writes into .result."""

        def __init__(self) -> None:
            self.result = ""

        def write(self, x: str) -> None:
            self.result += x

    def __init__(self) -> None:
        # Capture begins immediately, not at __enter__, so the plain
        # (non-`with`) usage keeps working.
        self.save_stdout = sys.stdout
        sys.stdout = self.DummyFile()

    def __enter__(self) -> "StdoutSwitch":
        return self

    def reset(self) -> str:
        """Restore the saved stdout and return the captured text ("" if
        stdout was already restored)."""
        result = ""
        if isinstance(sys.stdout, self.DummyFile):
            result = sys.stdout.result
        sys.stdout = self.save_stdout
        return result

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        sys.stdout = self.save_stdout
def run_autoupdate_for_multiprocessing(data) -> tuple[bool, str, Any] | None:
    """Worker entry point for the multiprocessing pool.

    *data* is an (app, edit, commit, pr) tuple. Returns:
      - (True, app, run_result) when the app had updates,
      - (False, app, captured-stdout + traceback) when it failed,
      - None when the app is already up to date (run() returned False).
    """
    app, edit, commit, pr = data
    stdoutswitch = StdoutSwitch()
    try:
        result = AppAutoUpdater(app).run(edit=edit, commit=commit, pr=pr)
    except Exception:
        captured = stdoutswitch.reset()
        import traceback
        t = traceback.format_exc()
        return False, app, f"{captured}\n{t}"
    # Bug fix: always restore stdout. Previously reset() was only called on
    # the exception path, so a successful run left this worker's stdout
    # silently redirected to the dummy buffer forever.
    stdoutswitch.reset()
    if result is not False:
        return True, app, result
    return None
def main() -> None: def main() -> None:
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("app_dir", nargs="?", type=Path) parser.add_argument("apps", nargs="*", type=Path,
parser.add_argument("--commit-and-create-PR", action="store_true") help="If not passed, the script will run on the catalog. Github keys required.")
parser.add_argument("--edit", action=argparse.BooleanOptionalAction, help="Edit the local files", default=True)
parser.add_argument("--commit", action=argparse.BooleanOptionalAction, help="Create a commit with the changes")
parser.add_argument("--pr", action=argparse.BooleanOptionalAction, help="Create a pull request with the changes")
parser.add_argument("--paste", action="store_true")
parser.add_argument("-j", "--processes", type=int, default=multiprocessing.cpu_count())
args = parser.parse_args() args = parser.parse_args()
global dry_run, auth, github, author if args.commit and not args.edit:
dry_run = args.commit_and_create_PR parser.error("--commit requires --edit")
if args.pr and not args.commit:
parser.error("--pr requires --commit")
if args.app_dir: # Handle apps or no apps
AppAutoUpdater(str(args.app_dir), app_id_is_local_app_dir=True).run() apps = list(args.apps) if args.apps else apps_to_run_auto_update_for()
else: apps_failed = {}
GITHUB_LOGIN = (REPO_APPS_ROOT / ".github_login").open("r", encoding="utf-8").read().strip() apps_updated = {}
GITHUB_TOKEN = (REPO_APPS_ROOT / ".github_token").open("r", encoding="utf-8").read().strip()
GITHUB_EMAIL = (REPO_APPS_ROOT / ".github_email").open("r", encoding="utf-8").read().strip()
from github import Github, InputGitAuthor with multiprocessing.Pool(processes=args.processes) as pool:
tasks = pool.imap(run_autoupdate_for_multiprocessing,
((app, args.edit, args.commit, args.pr) for app in apps))
for result in tqdm.tqdm(tasks, total=len(apps), ascii=" ·#"):
if result is None:
continue
is_ok, app, info = result
if is_ok:
apps_updated[app] = info
else:
apps_failed[app] = info
pass
auth = (GITHUB_LOGIN, GITHUB_TOKEN) result_message = ""
github = Github(GITHUB_TOKEN) if apps_updated:
author = InputGitAuthor(GITHUB_LOGIN, GITHUB_EMAIL) result_message += f"\n{'=' * 80}\nApps updated:"
for app, info in apps_updated.items():
result_message += f"\n- {app}"
if isinstance(info, tuple):
print(info)
result_message += f" ({info[0]} -> {info[1]})"
if info[2] is not None:
result_message += f" see {info[2]}"
apps_failed = [] if apps_failed:
apps_failed_details = {} result_message += f"\n{'=' * 80}\nApps failed:"
apps_updated = [] for app, info in apps_failed.items():
result_message += f"\n{'='*40}\n{app}\n{'-'*40}\n{info}\n\n"
with logging_redirect_tqdm(): if apps_failed and args.paste:
for app in tqdm.tqdm(apps_to_run_auto_update_for(), ascii=" ·#"): paste_url = paste_on_haste(result_message)
try: logging.error(textwrap.dedent(f"""
updated = AppAutoUpdater(app).run() Failed to run the source auto-update for: {', '.join(apps_failed.keys())}
except Exception as e: Please run manually the `autoupdate_app_sources.py` script on these apps to debug what is happening!
apps_failed.append(app) See the debug log here: {paste_url}"
import traceback """))
t = traceback.format_exc() print(result_message)
apps_failed_details[app] = t
print(t)
else:
if updated:
apps_updated.append(app)
if apps_failed:
print(f"Apps failed: {', '.join(apps_failed)}")
if os.path.exists("/usr/bin/sendxmpppy"):
paste = "\n=========\n".join(
[
app + "\n-------\n" + trace + "\n\n"
for app, trace in apps_failed_details.items()
]
)
paste_url = paste_on_haste(paste)
os.system(
f"/usr/bin/sendxmpppy 'Failed to run the source auto-update for : {', '.join(apps_failed)}. Please run manually the `autoupdate_app_sources.py` script on these apps to debug what is happening! Debug log : {paste_url}'"
)
if apps_updated:
print(f"Apps updated: {', '.join(apps_updated)}")
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -2,7 +2,7 @@
import re import re
from enum import Enum from enum import Enum
from typing import List from typing import Any
import requests import requests
@ -13,7 +13,7 @@ class RefType(Enum):
class GithubAPI: class GithubAPI:
def __init__(self, upstream: str, auth: tuple[str, str] = None): def __init__(self, upstream: str, auth: tuple[str, str] | None = None):
self.upstream = upstream self.upstream = upstream
self.upstream_repo = upstream.replace("https://github.com/", "")\ self.upstream_repo = upstream.replace("https://github.com/", "")\
.strip("/") .strip("/")
@ -22,13 +22,13 @@ class GithubAPI:
), f"'{upstream}' doesn't seem to be a github repository ?" ), f"'{upstream}' doesn't seem to be a github repository ?"
self.auth = auth self.auth = auth
def internal_api(self, uri: str): def internal_api(self, uri: str) -> Any:
url = f"https://api.github.com/{uri}" url = f"https://api.github.com/{uri}"
r = requests.get(url, auth=self.auth) r = requests.get(url, auth=self.auth)
assert r.status_code == 200, r assert r.status_code == 200, r
return r.json() return r.json()
def tags(self) -> List[str]: def tags(self) -> list[dict[str, str]]:
"""Get a list of tags for project.""" """Get a list of tags for project."""
return self.internal_api(f"repos/{self.upstream_repo}/tags") return self.internal_api(f"repos/{self.upstream_repo}/tags")
@ -53,25 +53,28 @@ class GithubAPI:
class GitlabAPI: class GitlabAPI:
def __init__(self, upstream: str): def __init__(self, upstream: str):
split = re.search("(?P<host>https?://.+)/(?P<group>[^/]+)/(?P<project>[^/]+)/?$", upstream) split = re.search("(?P<host>https?://.+)/(?P<group>[^/]+)/(?P<project>[^/]+)/?$", upstream)
assert split is not None
self.upstream = split.group("host") self.upstream = split.group("host")
self.upstream_repo = f"{split.group('group')}/{split.group('project')}" self.upstream_repo = f"{split.group('group')}/{split.group('project')}"
self.project_id = self.find_project_id(self.upstream_repo) self.project_id = self.find_project_id(self.upstream_repo)
def find_project_id(self, project: str) -> int: def find_project_id(self, project: str) -> int:
project = self.internal_api(f"projects/{project.replace('/', '%2F')}") project = self.internal_api(f"projects/{project.replace('/', '%2F')}")
return project["id"] assert isinstance(project, dict)
project_id = project.get("id", None)
return project_id
def internal_api(self, uri: str): def internal_api(self, uri: str) -> Any:
url = f"{self.upstream}/api/v4/{uri}" url = f"{self.upstream}/api/v4/{uri}"
r = requests.get(url) r = requests.get(url)
assert r.status_code == 200, r assert r.status_code == 200, r
return r.json() return r.json()
def tags(self) -> List[str]: def tags(self) -> list[dict[str, str]]:
"""Get a list of tags for project.""" """Get a list of tags for project."""
return self.internal_api(f"projects/{self.project_id}/repository/tags") return self.internal_api(f"projects/{self.project_id}/repository/tags")
def commits(self) -> List[str]: def commits(self) -> list[dict[str, Any]]:
"""Get a list of commits for project.""" """Get a list of commits for project."""
return [ return [
{ {
@ -79,13 +82,13 @@ class GitlabAPI:
"commit": { "commit": {
"author": { "author": {
"date": commit["committed_date"] "date": commit["committed_date"]
}
} }
}
} }
for commit in self.internal_api(f"projects/{self.project_id}/repository/commits") for commit in self.internal_api(f"projects/{self.project_id}/repository/commits")
] ]
def releases(self) -> List[str]: def releases(self) -> list[dict[str, Any]]:
"""Get a list of releases for project.""" """Get a list of releases for project."""
releases = self.internal_api(f"projects/{self.project_id}/releases") releases = self.internal_api(f"projects/{self.project_id}/releases")
retval = [] retval = []