Merge pull request #2030 from Salamandar/fix_autoupdater
Update the autoupdater
This commit is contained in:
commit
85b61270d1
2 changed files with 85 additions and 51 deletions
|
@ -145,6 +145,7 @@ class LocalOrRemoteRepo:
|
||||||
self.pr_branch = name
|
self.pr_branch = name
|
||||||
commit_sha = self.repo.get_branch(self.base_branch).commit.sha
|
commit_sha = self.repo.get_branch(self.base_branch).commit.sha
|
||||||
if self.pr_branch in [branch.name for branch in self.repo.get_branches()]:
|
if self.pr_branch in [branch.name for branch in self.repo.get_branches()]:
|
||||||
|
print("already existing")
|
||||||
return False
|
return False
|
||||||
self.repo.create_git_ref(ref=f"refs/heads/{name}", sha=commit_sha)
|
self.repo.create_git_ref(ref=f"refs/heads/{name}", sha=commit_sha)
|
||||||
return True
|
return True
|
||||||
|
@ -156,7 +157,7 @@ class LocalOrRemoteRepo:
|
||||||
pr = self.repo.create_pull(
|
pr = self.repo.create_pull(
|
||||||
title=title, body=message, head=branch, base=self.base_branch
|
title=title, body=message, head=branch, base=self.base_branch
|
||||||
)
|
)
|
||||||
return pr.url
|
return pr.html_url
|
||||||
logging.warning("Can't create pull requests for local repositories")
|
logging.warning("Can't create pull requests for local repositories")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@ -223,8 +224,17 @@ class AppAutoUpdater:
|
||||||
try:
|
try:
|
||||||
if pr:
|
if pr:
|
||||||
self.repo.new_branch(branch_name)
|
self.repo.new_branch(branch_name)
|
||||||
|
except github.GithubException as e:
|
||||||
|
if e.status == 409:
|
||||||
|
print("Branch already exists!")
|
||||||
|
|
||||||
|
try:
|
||||||
if commit:
|
if commit:
|
||||||
self.repo.commit(commit_msg)
|
self.repo.commit(commit_msg)
|
||||||
|
except github.GithubException as e:
|
||||||
|
if e.status == 409:
|
||||||
|
print("Commits were already commited on branch!")
|
||||||
|
try:
|
||||||
if pr:
|
if pr:
|
||||||
pr_url = self.repo.create_pr(branch_name, pr_title, commit_msg) or ""
|
pr_url = self.repo.create_pr(branch_name, pr_title, commit_msg) or ""
|
||||||
except github.GithubException as e:
|
except github.GithubException as e:
|
||||||
|
@ -236,7 +246,17 @@ class AppAutoUpdater:
|
||||||
return (state, self.current_version, main_version, pr_url)
|
return (state, self.current_version, main_version, pr_url)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def filter_and_get_latest_tag(tags: list[str], app_id: str) -> tuple[str, str]:
|
def relevant_versions(tags: list[str], app_id: str, version_regex: Optional[str]) -> tuple[str, str]:
|
||||||
|
|
||||||
|
def apply_version_regex(tag: str) -> Optional[str]:
|
||||||
|
# First preprocessing according to the manifest version_regex…
|
||||||
|
if not version_regex:
|
||||||
|
return tag
|
||||||
|
match = re.match(version_regex, tag)
|
||||||
|
if match is None:
|
||||||
|
return None
|
||||||
|
return match.group(1)
|
||||||
|
|
||||||
def version_numbers(tag: str) -> Optional[tuple[int, ...]]:
|
def version_numbers(tag: str) -> Optional[tuple[int, ...]]:
|
||||||
filter_keywords = ["start", "rc", "beta", "alpha"]
|
filter_keywords = ["start", "rc", "beta", "alpha"]
|
||||||
if any(keyword in tag for keyword in filter_keywords):
|
if any(keyword in tag for keyword in filter_keywords):
|
||||||
|
@ -255,16 +275,24 @@ class AppAutoUpdater:
|
||||||
print(f"Ignoring tag {t_to_check}, doesn't look like a version number")
|
print(f"Ignoring tag {t_to_check}, doesn't look like a version number")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# sorted will sort by keys
|
tags_dict: dict[tuple[int, ...], tuple[str, str]] = {}
|
||||||
tags_dict = {version_numbers(tag): tag for tag in tags}
|
for tag in tags:
|
||||||
tags_dict.pop(None, None)
|
tag_clean = apply_version_regex(tag)
|
||||||
|
if tag_clean is None:
|
||||||
|
continue
|
||||||
|
tag_as_ints = version_numbers(tag_clean)
|
||||||
|
if tag_as_ints is None:
|
||||||
|
continue
|
||||||
|
tags_dict[tag_as_ints] = (tag, tag_clean)
|
||||||
|
|
||||||
|
# sorted will sort by keys, tag_as_ints
|
||||||
# reverse=True will set the last release as first element
|
# reverse=True will set the last release as first element
|
||||||
tags_dict = dict(sorted(tags_dict.items(), reverse=True))
|
tags_dict = dict(sorted(tags_dict.items(), reverse=True))
|
||||||
if not tags_dict:
|
if not tags_dict:
|
||||||
raise RuntimeError("No tags were found after sanity filtering!")
|
raise RuntimeError("No tags were found after sanity filtering!")
|
||||||
the_tag_list, the_tag = next(iter(tags_dict.items()))
|
the_tag_list, (the_tag_orig, the_tag_clean) = next(iter(tags_dict.items()))
|
||||||
assert the_tag_list is not None
|
assert the_tag_list is not None
|
||||||
return the_tag, ".".join(str(i) for i in the_tag_list)
|
return the_tag_orig, the_tag_clean
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def tag_to_int_tuple(tag: str) -> tuple[int, ...]:
|
def tag_to_int_tuple(tag: str) -> tuple[int, ...]:
|
||||||
|
@ -287,16 +315,17 @@ class AppAutoUpdater:
|
||||||
|
|
||||||
def get_source_update(self, name: str, infos: dict[str, Any]
|
def get_source_update(self, name: str, infos: dict[str, Any]
|
||||||
) -> Optional[tuple[str, Union[str, dict[str, str]], str]]:
|
) -> Optional[tuple[str, Union[str, dict[str, str]], str]]:
|
||||||
if "autoupdate" not in infos:
|
autoupdate = infos.get("autoupdate")
|
||||||
|
if autoupdate is None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
print(f"\n Checking {name} ...")
|
print(f"\n Checking {name} ...")
|
||||||
asset = infos.get("autoupdate", {}).get("asset", "tarball")
|
asset = autoupdate.get("asset", "tarball")
|
||||||
strategy = infos.get("autoupdate", {}).get("strategy")
|
strategy = autoupdate.get("strategy")
|
||||||
if strategy not in STRATEGIES:
|
if strategy not in STRATEGIES:
|
||||||
raise ValueError(f"Unknown update strategy '{strategy}' for '{name}', expected one of {STRATEGIES}")
|
raise ValueError(f"Unknown update strategy '{strategy}' for '{name}', expected one of {STRATEGIES}")
|
||||||
|
|
||||||
result = self.get_latest_version_and_asset(strategy, asset, infos)
|
result = self.get_latest_version_and_asset(strategy, asset, autoupdate)
|
||||||
if result is None:
|
if result is None:
|
||||||
return None
|
return None
|
||||||
new_version, assets, more_info = result
|
new_version, assets, more_info = result
|
||||||
|
@ -348,9 +377,10 @@ class AppAutoUpdater:
|
||||||
raise RuntimeError(f"Too many assets matching regex '{regex}': {matching_assets}")
|
raise RuntimeError(f"Too many assets matching regex '{regex}': {matching_assets}")
|
||||||
return next(iter(matching_assets.items()))
|
return next(iter(matching_assets.items()))
|
||||||
|
|
||||||
def get_latest_version_and_asset(self, strategy: str, asset: Union[str, dict], infos
|
def get_latest_version_and_asset(self, strategy: str, asset: Union[str, dict], autoupdate
|
||||||
) -> Optional[tuple[str, Union[str, dict[str, str]], str]]:
|
) -> Optional[tuple[str, Union[str, dict[str, str]], str]]:
|
||||||
upstream = (infos.get("autoupdate", {}).get("upstream", self.main_upstream).strip("/"))
|
upstream = autoupdate.get("upstream", self.main_upstream).strip("/")
|
||||||
|
version_re = autoupdate.get("version_regex", None)
|
||||||
_, remote_type, revision_type = strategy.split("_")
|
_, remote_type, revision_type = strategy.split("_")
|
||||||
|
|
||||||
api: Union[GithubAPI, GitlabAPI, GiteaForgejoAPI]
|
api: Union[GithubAPI, GitlabAPI, GiteaForgejoAPI]
|
||||||
|
@ -370,7 +400,7 @@ class AppAutoUpdater:
|
||||||
for release in api.releases()
|
for release in api.releases()
|
||||||
if not release["draft"] and not release["prerelease"]
|
if not release["draft"] and not release["prerelease"]
|
||||||
}
|
}
|
||||||
latest_version_orig, latest_version = self.filter_and_get_latest_tag(list(releases.keys()), self.app_id)
|
latest_version_orig, latest_version = self.relevant_versions(list(releases.keys()), self.app_id, version_re)
|
||||||
latest_release = releases[latest_version_orig]
|
latest_release = releases[latest_version_orig]
|
||||||
latest_assets = {
|
latest_assets = {
|
||||||
a["name"]: a["browser_download_url"]
|
a["name"]: a["browser_download_url"]
|
||||||
|
@ -409,7 +439,7 @@ class AppAutoUpdater:
|
||||||
if asset != "tarball":
|
if asset != "tarball":
|
||||||
raise ValueError("For the latest tag strategies, only asset = 'tarball' is supported")
|
raise ValueError("For the latest tag strategies, only asset = 'tarball' is supported")
|
||||||
tags = [t["name"] for t in api.tags()]
|
tags = [t["name"] for t in api.tags()]
|
||||||
latest_version_orig, latest_version = self.filter_and_get_latest_tag(tags, self.app_id)
|
latest_version_orig, latest_version = self.relevant_versions(tags, self.app_id, version_re)
|
||||||
latest_tarball = api.url_for_ref(latest_version_orig, RefType.tags)
|
latest_tarball = api.url_for_ref(latest_version_orig, RefType.tags)
|
||||||
return latest_version, latest_tarball, ""
|
return latest_version, latest_tarball, ""
|
||||||
|
|
||||||
|
@ -421,7 +451,7 @@ class AppAutoUpdater:
|
||||||
latest_tarball = api.url_for_ref(latest_commit["sha"], RefType.commits)
|
latest_tarball = api.url_for_ref(latest_commit["sha"], RefType.commits)
|
||||||
# Let's have the version as something like "2023.01.23"
|
# Let's have the version as something like "2023.01.23"
|
||||||
latest_commit_date = datetime.strptime(latest_commit["commit"]["author"]["date"][:10], "%Y-%m-%d")
|
latest_commit_date = datetime.strptime(latest_commit["commit"]["author"]["date"][:10], "%Y-%m-%d")
|
||||||
version_format = infos.get("autoupdate", {}).get("force_version", "%Y.%m.%d")
|
version_format = autoupdate.get("force_version", "%Y.%m.%d")
|
||||||
latest_version = latest_commit_date.strftime(version_format)
|
latest_version = latest_commit_date.strftime(version_format)
|
||||||
return latest_version, latest_tarball, ""
|
return latest_version, latest_tarball, ""
|
||||||
return None
|
return None
|
||||||
|
@ -556,27 +586,16 @@ def main() -> None:
|
||||||
result_message += f"\n{'=' * 80}\nApps already with an update PR:"
|
result_message += f"\n{'=' * 80}\nApps already with an update PR:"
|
||||||
for app, info in apps_already.items():
|
for app, info in apps_already.items():
|
||||||
result_message += f"\n- {app}"
|
result_message += f"\n- {app}"
|
||||||
if isinstance(info, tuple):
|
result_message += f" ({info[0]} -> {info[1]})" if info[1] else " (app version did not change)"
|
||||||
result_message += f" ({info[0]} -> {info[1]})"
|
if info[2]:
|
||||||
if info[2] is not None:
|
|
||||||
result_message += f" see {info[2]}"
|
result_message += f" see {info[2]}"
|
||||||
|
|
||||||
if apps_updated:
|
if apps_updated:
|
||||||
result_message += f"\n{'=' * 80}\nApps updated:"
|
result_message += f"\n{'=' * 80}\nApps updated:"
|
||||||
for app, info in apps_updated.items():
|
for app, info in apps_updated.items():
|
||||||
result_message += f"\n- {app}"
|
result_message += f"\n- {app}"
|
||||||
if isinstance(info, tuple):
|
result_message += f" ({info[0]} -> {info[1]})" if info[1] else " (app version did not change)"
|
||||||
result_message += f" ({info[0]} -> {info[1]})"
|
if info[2]:
|
||||||
if info[2] is not None:
|
|
||||||
result_message += f" see {info[2]}"
|
|
||||||
|
|
||||||
if apps_updated:
|
|
||||||
result_message += f"\n{'=' * 80}\nApps updated:"
|
|
||||||
for app, info in apps_updated.items():
|
|
||||||
result_message += f"\n- {app}"
|
|
||||||
if isinstance(info, tuple):
|
|
||||||
result_message += f" ({info[0]} -> {info[1]})"
|
|
||||||
if info[2] is not None:
|
|
||||||
result_message += f" see {info[2]}"
|
result_message += f" see {info[2]}"
|
||||||
|
|
||||||
if apps_failed:
|
if apps_failed:
|
||||||
|
@ -589,7 +608,7 @@ def main() -> None:
|
||||||
logging.error(textwrap.dedent(f"""
|
logging.error(textwrap.dedent(f"""
|
||||||
Failed to run the source auto-update for: {', '.join(apps_failed.keys())}
|
Failed to run the source auto-update for: {', '.join(apps_failed.keys())}
|
||||||
Please run manually the `autoupdate_app_sources.py` script on these apps to debug what is happening!
|
Please run manually the `autoupdate_app_sources.py` script on these apps to debug what is happening!
|
||||||
See the debug log here: {paste_url}"
|
See the debug log here: {paste_url}
|
||||||
"""))
|
"""))
|
||||||
|
|
||||||
print(result_message)
|
print(result_message)
|
||||||
|
|
|
@ -52,11 +52,18 @@ class GithubAPI:
|
||||||
|
|
||||||
class GitlabAPI:
|
class GitlabAPI:
|
||||||
def __init__(self, upstream: str):
|
def __init__(self, upstream: str):
|
||||||
split = re.search("(?P<host>https?://.+)/(?P<group>[^/]+)/(?P<project>[^/]+)/?$", upstream)
|
# Find gitlab api root...
|
||||||
assert split is not None
|
self.forge_root = self.get_forge_root(upstream)
|
||||||
self.upstream = split.group("host")
|
self.project_path = upstream.replace(self.forge_root, "").lstrip("/")
|
||||||
self.upstream_repo = f"{split.group('group')}/{split.group('project')}"
|
self.project_id = self.find_project_id(self.project_path)
|
||||||
self.project_id = self.find_project_id(self.upstream_repo)
|
|
||||||
|
def get_forge_root(self, project_url: str) -> str:
|
||||||
|
"""A small heuristic based on the content of the html page..."""
|
||||||
|
r = requests.get(project_url)
|
||||||
|
r.raise_for_status()
|
||||||
|
match = re.search(r"const url = `(.*)/api/graphql`", r.text)
|
||||||
|
assert match is not None
|
||||||
|
return match.group(1)
|
||||||
|
|
||||||
def find_project_id(self, project: str) -> int:
|
def find_project_id(self, project: str) -> int:
|
||||||
project = self.internal_api(f"projects/{project.replace('/', '%2F')}")
|
project = self.internal_api(f"projects/{project.replace('/', '%2F')}")
|
||||||
|
@ -65,7 +72,7 @@ class GitlabAPI:
|
||||||
return project_id
|
return project_id
|
||||||
|
|
||||||
def internal_api(self, uri: str) -> Any:
|
def internal_api(self, uri: str) -> Any:
|
||||||
url = f"{self.upstream}/api/v4/{uri}"
|
url = f"{self.forge_root}/api/v4/{uri}"
|
||||||
r = requests.get(url)
|
r = requests.get(url)
|
||||||
assert r.status_code == 200, r
|
assert r.status_code == 200, r
|
||||||
return r.json()
|
return r.json()
|
||||||
|
@ -113,35 +120,43 @@ class GitlabAPI:
|
||||||
return retval
|
return retval
|
||||||
|
|
||||||
def url_for_ref(self, ref: str, ref_type: RefType) -> str:
|
def url_for_ref(self, ref: str, ref_type: RefType) -> str:
|
||||||
name = self.upstream_repo.split("/")[-1]
|
name = self.project_path.split("/")[-1]
|
||||||
return f"{self.upstream}/{self.upstream_repo}/-/archive/{ref}/{name}-{ref}.tar.bz2"
|
clean_ref = ref.replace("/", "-")
|
||||||
|
return f"{self.forge_root}/{self.project_path}/-/archive/{ref}/{name}-{clean_ref}.tar.bz2"
|
||||||
|
|
||||||
|
|
||||||
class GiteaForgejoAPI:
|
class GiteaForgejoAPI:
|
||||||
def __init__(self, upstream: str):
|
def __init__(self, upstream: str):
|
||||||
split = re.search("(?P<host>https?://.+)/(?P<group>[^/]+)/(?P<project>[^/]+)/?$", upstream)
|
# Find gitea/forgejo api root...
|
||||||
assert split is not None
|
self.forge_root = self.get_forge_root(upstream)
|
||||||
self.upstream = split.group("host")
|
self.project_path = upstream.replace(self.forge_root, "").lstrip("/")
|
||||||
self.upstream_repo = f"{split.group('group')}/{split.group('project')}"
|
|
||||||
|
def get_forge_root(self, project_url: str) -> str:
|
||||||
|
"""A small heuristic based on the content of the html page..."""
|
||||||
|
r = requests.get(project_url)
|
||||||
|
r.raise_for_status()
|
||||||
|
match = re.search(r"appUrl: '([^']*)',", r.text)
|
||||||
|
assert match is not None
|
||||||
|
return match.group(1).replace("\\", "")
|
||||||
|
|
||||||
def internal_api(self, uri: str):
|
def internal_api(self, uri: str):
|
||||||
url = f"{self.upstream}/api/v1/{uri}"
|
url = f"{self.forge_root}/api/v1/{uri}"
|
||||||
r = requests.get(url)
|
r = requests.get(url)
|
||||||
assert r.status_code == 200, r
|
assert r.status_code == 200, r
|
||||||
return r.json()
|
return r.json()
|
||||||
|
|
||||||
def tags(self) -> list[dict[str, Any]]:
|
def tags(self) -> list[dict[str, Any]]:
|
||||||
"""Get a list of tags for project."""
|
"""Get a list of tags for project."""
|
||||||
return self.internal_api(f"repos/{self.upstream_repo}/tags")
|
return self.internal_api(f"repos/{self.project_path}/tags")
|
||||||
|
|
||||||
def commits(self) -> list[dict[str, Any]]:
|
def commits(self) -> list[dict[str, Any]]:
|
||||||
"""Get a list of commits for project."""
|
"""Get a list of commits for project."""
|
||||||
return self.internal_api(f"repos/{self.upstream_repo}/commits")
|
return self.internal_api(f"repos/{self.project_path}/commits")
|
||||||
|
|
||||||
def releases(self) -> list[dict[str, Any]]:
|
def releases(self) -> list[dict[str, Any]]:
|
||||||
"""Get a list of releases for project."""
|
"""Get a list of releases for project."""
|
||||||
return self.internal_api(f"repos/{self.upstream_repo}/releases")
|
return self.internal_api(f"repos/{self.project_path}/releases")
|
||||||
|
|
||||||
def url_for_ref(self, ref: str, ref_type: RefType) -> str:
|
def url_for_ref(self, ref: str, ref_type: RefType) -> str:
|
||||||
"""Get a URL for a ref."""
|
"""Get a URL for a ref."""
|
||||||
return f"{self.upstream}/{self.upstream_repo}/archive/{ref}.tar.gz"
|
return f"{self.forge_root}/{self.project_path}/archive/{ref}.tar.gz"
|
||||||
|
|
Loading…
Reference in a new issue