1
0
Fork 0

New source autoupdate: black

This commit is contained in:
Alexandre Aubin 2023-03-28 00:42:18 +02:00
parent 6244d6fdde
commit 04f318cdf4

View file

@ -23,20 +23,31 @@ def apps_to_run_auto_update_for():
catalog = toml.load(open(os.path.dirname(__file__) + "/../../apps.toml")) catalog = toml.load(open(os.path.dirname(__file__) + "/../../apps.toml"))
apps_flagged_as_working_and_on_yunohost_apps_org = [app apps_flagged_as_working_and_on_yunohost_apps_org = [
app
for app, infos in catalog.items() for app, infos in catalog.items()
if infos["state"] == "working" if infos["state"] == "working"
and "/github.com/yunohost-apps" in infos["url"].lower()] and "/github.com/yunohost-apps" in infos["url"].lower()
]
manifest_tomls = glob.glob(os.path.dirname(__file__) + "/../../.apps_cache/*/manifest.toml") manifest_tomls = glob.glob(
os.path.dirname(__file__) + "/../../.apps_cache/*/manifest.toml"
)
apps_with_manifest_toml = [path.split("/")[-2] for path in manifest_tomls] apps_with_manifest_toml = [path.split("/")[-2] for path in manifest_tomls]
relevant_apps = list(sorted(set(apps_flagged_as_working_and_on_yunohost_apps_org) & set(apps_with_manifest_toml))) relevant_apps = list(
sorted(
set(apps_flagged_as_working_and_on_yunohost_apps_org)
& set(apps_with_manifest_toml)
)
)
out = [] out = []
for app in relevant_apps: for app in relevant_apps:
manifest = toml.load(os.path.dirname(__file__) + f"/../../.apps_cache/{app}/manifest.toml") manifest = toml.load(
os.path.dirname(__file__) + f"/../../.apps_cache/{app}/manifest.toml"
)
sources = manifest.get("resources", {}).get("sources", {}) sources = manifest.get("resources", {}).get("sources", {})
if any("autoupdate" in source for source in sources.values()): if any("autoupdate" in source for source in sources.values()):
out.append(app) out.append(app)
@ -78,11 +89,10 @@ def sha256_of_remote_file(url):
return None return None
class AppAutoUpdater(): class AppAutoUpdater:
def __init__(self, app_id): def __init__(self, app_id):
#if not os.path.exists(app_path + "/manifest.toml"): # if not os.path.exists(app_path + "/manifest.toml"):
# raise Exception("manifest.toml doesnt exists?") # raise Exception("manifest.toml doesnt exists?")
# We actually want to look at the manifest on the "testing" (or default) branch # We actually want to look at the manifest on the "testing" (or default) branch
@ -117,34 +127,49 @@ class AppAutoUpdater():
strategy = infos.get("autoupdate", {}).get("strategy") strategy = infos.get("autoupdate", {}).get("strategy")
if strategy not in STRATEGIES: if strategy not in STRATEGIES:
raise Exception(f"Unknown strategy to autoupdate {source}, expected one of {STRATEGIES}, got {strategy}") raise Exception(
f"Unknown strategy to autoupdate {source}, expected one of {STRATEGIES}, got {strategy}"
)
asset = infos.get("autoupdate", {}).get("asset", "tarball") asset = infos.get("autoupdate", {}).get("asset", "tarball")
print(f"Checking {source} ...") print(f"Checking {source} ...")
new_version, new_asset_urls = self.get_latest_version_and_asset(strategy, asset, infos) new_version, new_asset_urls = self.get_latest_version_and_asset(
strategy, asset, infos
)
print(f"Current version in manifest: {self.current_version}") print(f"Current version in manifest: {self.current_version}")
print(f"Newest version on upstream: {new_version}") print(f"Newest version on upstream: {new_version}")
if source == "main": if source == "main":
if self.current_version == new_version: if self.current_version == new_version:
print(f"Version is still {new_version}, no update required for {source}") print(
f"Version is still {new_version}, no update required for {source}"
)
continue continue
else: else:
print(f"Update needed for {source}") print(f"Update needed for {source}")
todos[source] = {"new_asset_urls": new_asset_urls, "old_assets": infos, "new_version": new_version} todos[source] = {
"new_asset_urls": new_asset_urls,
"old_assets": infos,
"new_version": new_version,
}
else: else:
if isinstance(new_asset_urls, str) and infos["url"] == new_asset_urls: if isinstance(new_asset_urls, str) and infos["url"] == new_asset_urls:
print(f"URL is still up to date for asset {source}") print(f"URL is still up to date for asset {source}")
continue continue
elif isinstance(new_asset_urls, dict) and new_asset_urls == {k: infos[k]["url"] for k in new_asset_urls.keys()}: elif isinstance(new_asset_urls, dict) and new_asset_urls == {
k: infos[k]["url"] for k in new_asset_urls.keys()
}:
print(f"URLs are still up to date for asset {source}") print(f"URLs are still up to date for asset {source}")
continue continue
else: else:
print(f"Update needed for {source}") print(f"Update needed for {source}")
todos[source] = {"new_asset_urls": new_asset_urls, "old_assets": infos} todos[source] = {
"new_asset_urls": new_asset_urls,
"old_assets": infos,
}
if not todos: if not todos:
return return
@ -166,85 +191,144 @@ class AppAutoUpdater():
manifest_new = self.manifest_raw manifest_new = self.manifest_raw
for source, infos in todos.items(): for source, infos in todos.items():
manifest_new = self.replace_version_and_asset_in_manifest(manifest_new, infos.get("new_version"), infos["new_asset_urls"], infos["old_assets"], is_main=source == "main") manifest_new = self.replace_version_and_asset_in_manifest(
manifest_new,
infos.get("new_version"),
infos["new_asset_urls"],
infos["old_assets"],
is_main=source == "main",
)
self.repo.update_file("manifest.toml", self.repo.update_file(
"manifest.toml",
message=message, message=message,
content=manifest_new, content=manifest_new,
sha=self.manifest_raw_sha, sha=self.manifest_raw_sha,
branch=new_branch, branch=new_branch,
author=author) author=author,
)
# Wait a bit to preserve the API rate limit # Wait a bit to preserve the API rate limit
time.sleep(1.5) time.sleep(1.5)
# Open the PR # Open the PR
pr = self.repo.create_pull(title=message, body=message, head=new_branch, base=self.base_branch) pr = self.repo.create_pull(
title=message, body=message, head=new_branch, base=self.base_branch
)
print("Created PR " + self.repo.full_name + " updated with PR #" + str(pr.id)) print("Created PR " + self.repo.full_name + " updated with PR #" + str(pr.id))
def get_latest_version_and_asset(self, strategy, asset, infos): def get_latest_version_and_asset(self, strategy, asset, infos):
if "github" in strategy: if "github" in strategy:
assert self.upstream and self.upstream.startswith("https://github.com/"), "When using strategy {strategy}, having a defined upstream code repo on github.com is required" assert self.upstream and self.upstream.startswith(
self.upstream_repo = self.upstream.replace("https://github.com/", "").strip("/") "https://github.com/"
assert len(self.upstream_repo.split("/")) == 2, "'{self.upstream}' doesn't seem to be a github repository ?" ), "When using strategy {strategy}, having a defined upstream code repo on github.com is required"
self.upstream_repo = self.upstream.replace("https://github.com/", "").strip(
"/"
)
assert (
len(self.upstream_repo.split("/")) == 2
), "'{self.upstream}' doesn't seem to be a github repository ?"
if strategy == "latest_github_release": if strategy == "latest_github_release":
releases = self.github(f"repos/{self.upstream_repo}/releases") releases = self.github(f"repos/{self.upstream_repo}/releases")
tags = [release["tag_name"] for release in releases if not release["draft"] and not release["prerelease"]] tags = [
release["tag_name"]
for release in releases
if not release["draft"] and not release["prerelease"]
]
latest_version = filter_and_get_latest_tag(tags) latest_version = filter_and_get_latest_tag(tags)
if asset == "tarball": if asset == "tarball":
latest_tarball = f"{self.upstream}/archive/refs/tags/{latest_version}.tar.gz" latest_tarball = (
f"{self.upstream}/archive/refs/tags/{latest_version}.tar.gz"
)
return latest_version.strip("v"), latest_tarball return latest_version.strip("v"), latest_tarball
# FIXME # FIXME
else: else:
latest_release = [release for release in releases if release["tag_name"] == latest_version][0] latest_release = [
latest_assets = {a["name"]: a["browser_download_url"] for a in latest_release["assets"] if not a["name"].endswith(".md5")} release
for release in releases
if release["tag_name"] == latest_version
][0]
latest_assets = {
a["name"]: a["browser_download_url"]
for a in latest_release["assets"]
if not a["name"].endswith(".md5")
}
if isinstance(asset, str): if isinstance(asset, str):
matching_assets_urls = [url for name, url in latest_assets.items() if re.match(asset, name)] matching_assets_urls = [
url
for name, url in latest_assets.items()
if re.match(asset, name)
]
if not matching_assets_urls: if not matching_assets_urls:
raise Exception(f"No assets matching regex '{asset}' for release {latest_version} among {list(latest_assets.keys())}") raise Exception(
f"No assets matching regex '{asset}' for release {latest_version} among {list(latest_assets.keys())}"
)
elif len(matching_assets_urls) > 1: elif len(matching_assets_urls) > 1:
raise Exception(f"Too many assets matching regex '{asset}' for release {latest_version} : {matching_assets_urls}") raise Exception(
f"Too many assets matching regex '{asset}' for release {latest_version} : {matching_assets_urls}"
)
return latest_version.strip("v"), matching_assets_urls[0] return latest_version.strip("v"), matching_assets_urls[0]
elif isinstance(asset, dict): elif isinstance(asset, dict):
matching_assets_dicts = {} matching_assets_dicts = {}
for asset_name, asset_regex in asset.items(): for asset_name, asset_regex in asset.items():
matching_assets_urls = [url for name, url in latest_assets.items() if re.match(asset_regex, name)] matching_assets_urls = [
url
for name, url in latest_assets.items()
if re.match(asset_regex, name)
]
if not matching_assets_urls: if not matching_assets_urls:
raise Exception(f"No assets matching regex '{asset}' for release {latest_version} among {list(latest_assets.keys())}") raise Exception(
f"No assets matching regex '{asset}' for release {latest_version} among {list(latest_assets.keys())}"
)
elif len(matching_assets_urls) > 1: elif len(matching_assets_urls) > 1:
raise Exception(f"Too many assets matching regex '{asset}' for release {latest_version} : {matching_assets_urls}") raise Exception(
f"Too many assets matching regex '{asset}' for release {latest_version} : {matching_assets_urls}"
)
matching_assets_dicts[asset_name] = matching_assets_urls[0] matching_assets_dicts[asset_name] = matching_assets_urls[0]
return latest_version.strip("v"), matching_assets_dicts return latest_version.strip("v"), matching_assets_dicts
elif strategy == "latest_github_tag": elif strategy == "latest_github_tag":
if asset != "tarball": if asset != "tarball":
raise Exception("For the latest_github_tag strategy, only asset = 'tarball' is supported") raise Exception(
"For the latest_github_tag strategy, only asset = 'tarball' is supported"
)
tags = self.github(f"repos/{self.upstream_repo}/tags") tags = self.github(f"repos/{self.upstream_repo}/tags")
latest_version = filter_and_get_latest_tag([t["name"] for t in tags]) latest_version = filter_and_get_latest_tag([t["name"] for t in tags])
latest_tarball = f"{self.upstream}/archive/refs/tags/{latest_version}.tar.gz" latest_tarball = (
f"{self.upstream}/archive/refs/tags/{latest_version}.tar.gz"
)
return latest_version.strip("v"), latest_tarball return latest_version.strip("v"), latest_tarball
def github(self, uri): def github(self, uri):
#print(f'https://api.github.com/{uri}') # print(f'https://api.github.com/{uri}')
r = requests.get(f'https://api.github.com/{uri}', auth=(GITHUB_LOGIN, GITHUB_TOKEN)) r = requests.get(
f"https://api.github.com/{uri}", auth=(GITHUB_LOGIN, GITHUB_TOKEN)
)
assert r.status_code == 200, r assert r.status_code == 200, r
return r.json() return r.json()
def replace_version_and_asset_in_manifest(self, content, new_version, new_assets_urls, current_assets, is_main): def replace_version_and_asset_in_manifest(
self, content, new_version, new_assets_urls, current_assets, is_main
):
if isinstance(new_assets_urls, str): if isinstance(new_assets_urls, str):
sha256 = sha256_of_remote_file(new_assets_urls) sha256 = sha256_of_remote_file(new_assets_urls)
elif isinstance(new_assets_urls, dict): elif isinstance(new_assets_urls, dict):
sha256 = {url: sha256_of_remote_file(url) for url in new_assets_urls.values()} sha256 = {
url: sha256_of_remote_file(url) for url in new_assets_urls.values()
}
if is_main: if is_main:
def repl(m): def repl(m):
return m.group(1) + new_version + m.group(3) return m.group(1) + new_version + m.group(3)
content = re.sub(r"(\s*version\s*=\s*[\"\'])([\d\.]+)(\~ynh\d+[\"\'])", repl, content)
content = re.sub(
r"(\s*version\s*=\s*[\"\'])([\d\.]+)(\~ynh\d+[\"\'])", repl, content
)
if isinstance(new_assets_urls, str): if isinstance(new_assets_urls, str):
content = content.replace(current_assets["url"], new_assets_urls) content = content.replace(current_assets["url"], new_assets_urls)
content = content.replace(current_assets["sha256"], sha256) content = content.replace(current_assets["sha256"], sha256)
@ -260,15 +344,19 @@ class AppAutoUpdater():
def progressbar(it, prefix="", size=60, file=sys.stdout): def progressbar(it, prefix="", size=60, file=sys.stdout):
it = list(it) it = list(it)
count = len(it) count = len(it)
def show(j, name=""): def show(j, name=""):
name += " " name += " "
x = int(size*j/count) x = int(size * j / count)
file.write("%s[%s%s] %i/%i %s\r" % (prefix, "#"*x, "."*(size-x), j, count, name)) file.write(
"%s[%s%s] %i/%i %s\r" % (prefix, "#" * x, "." * (size - x), j, count, name)
)
file.flush() file.flush()
show(0) show(0)
for i, item in enumerate(it): for i, item in enumerate(it):
yield item yield item
show(i+1, item) show(i + 1, item)
file.write("\n") file.write("\n")
file.flush() file.flush()