diff --git a/autoupdate_app_sources/autoupdate_app_sources.py b/autoupdate_app_sources/autoupdate_app_sources.py index 1dc4e43..3b7fbf8 100755 --- a/autoupdate_app_sources/autoupdate_app_sources.py +++ b/autoupdate_app_sources/autoupdate_app_sources.py @@ -145,6 +145,7 @@ class LocalOrRemoteRepo: self.pr_branch = name commit_sha = self.repo.get_branch(self.base_branch).commit.sha if self.pr_branch in [branch.name for branch in self.repo.get_branches()]: + print("already existing") return False self.repo.create_git_ref(ref=f"refs/heads/{name}", sha=commit_sha) return True @@ -156,7 +157,7 @@ class LocalOrRemoteRepo: pr = self.repo.create_pull( title=title, body=message, head=branch, base=self.base_branch ) - return pr.url + return pr.html_url logging.warning("Can't create pull requests for local repositories") return None @@ -223,8 +224,17 @@ class AppAutoUpdater: try: if pr: self.repo.new_branch(branch_name) + except github.GithubException as e: + if e.status == 409: + print("Branch already exists!") + + try: if commit: self.repo.commit(commit_msg) + except github.GithubException as e: + if e.status == 409: + print("Commits were already commited on branch!") + try: if pr: pr_url = self.repo.create_pr(branch_name, pr_title, commit_msg) or "" except github.GithubException as e: @@ -236,7 +246,17 @@ class AppAutoUpdater: return (state, self.current_version, main_version, pr_url) @staticmethod - def filter_and_get_latest_tag(tags: list[str], app_id: str) -> tuple[str, str]: + def relevant_versions(tags: list[str], app_id: str, version_regex: Optional[str]) -> tuple[str, str]: + + def apply_version_regex(tag: str) -> Optional[str]: + # First preprocessing according to the manifest version_regex… + if not version_regex: + return tag + match = re.match(version_regex, tag) + if match is None: + return None + return match.group(1) + def version_numbers(tag: str) -> Optional[tuple[int, ...]]: filter_keywords = ["start", "rc", "beta", "alpha"] if any(keyword in tag for keyword in filter_keywords): @@ -255,16 +275,24 @@ class AppAutoUpdater: print(f"Ignoring tag {t_to_check}, doesn't look like a version number") return None - # sorted will sort by keys - tags_dict = {version_numbers(tag): tag for tag in tags} - tags_dict.pop(None, None) + tags_dict: dict[tuple[int, ...], tuple[str, str]] = {} + for tag in tags: + tag_clean = apply_version_regex(tag) + if tag_clean is None: + continue + tag_as_ints = version_numbers(tag_clean) + if tag_as_ints is None: + continue + tags_dict[tag_as_ints] = (tag, tag_clean) + + # sorted will sort by keys, tag_as_ints # reverse=True will set the last release as first element tags_dict = dict(sorted(tags_dict.items(), reverse=True)) if not tags_dict: raise RuntimeError("No tags were found after sanity filtering!") - the_tag_list, the_tag = next(iter(tags_dict.items())) + the_tag_list, (the_tag_orig, the_tag_clean) = next(iter(tags_dict.items())) assert the_tag_list is not None - return the_tag, ".".join(str(i) for i in the_tag_list) + return the_tag_orig, the_tag_clean @staticmethod def tag_to_int_tuple(tag: str) -> tuple[int, ...]: @@ -287,16 +315,17 @@ class AppAutoUpdater: def get_source_update(self, name: str, infos: dict[str, Any] ) -> Optional[tuple[str, Union[str, dict[str, str]], str]]: - if "autoupdate" not in infos: + autoupdate = infos.get("autoupdate") + if autoupdate is None: return None print(f"\n Checking {name} ...") - asset = infos.get("autoupdate", {}).get("asset", "tarball") - strategy = infos.get("autoupdate", {}).get("strategy") + asset = autoupdate.get("asset", "tarball") + strategy = autoupdate.get("strategy") if strategy not in STRATEGIES: raise ValueError(f"Unknown update strategy '{strategy}' for '{name}', expected one of {STRATEGIES}") - result = self.get_latest_version_and_asset(strategy, asset, infos) + result = self.get_latest_version_and_asset(strategy, asset, autoupdate) if result is None: return None new_version, assets, more_info = result @@ -348,9 +377,10 @@ class AppAutoUpdater: raise RuntimeError(f"Too many assets matching regex '{regex}': {matching_assets}") return next(iter(matching_assets.items())) - def get_latest_version_and_asset(self, strategy: str, asset: Union[str, dict], infos + def get_latest_version_and_asset(self, strategy: str, asset: Union[str, dict], autoupdate ) -> Optional[tuple[str, Union[str, dict[str, str]], str]]: - upstream = (infos.get("autoupdate", {}).get("upstream", self.main_upstream).strip("/")) + upstream = autoupdate.get("upstream", self.main_upstream).strip("/") + version_re = autoupdate.get("version_regex", None) _, remote_type, revision_type = strategy.split("_") api: Union[GithubAPI, GitlabAPI, GiteaForgejoAPI] @@ -370,7 +400,7 @@ class AppAutoUpdater: for release in api.releases() if not release["draft"] and not release["prerelease"] } - latest_version_orig, latest_version = self.filter_and_get_latest_tag(list(releases.keys()), self.app_id) + latest_version_orig, latest_version = self.relevant_versions(list(releases.keys()), self.app_id, version_re) latest_release = releases[latest_version_orig] latest_assets = { a["name"]: a["browser_download_url"] @@ -409,7 +439,7 @@ class AppAutoUpdater: if asset != "tarball": raise ValueError("For the latest tag strategies, only asset = 'tarball' is supported") tags = [t["name"] for t in api.tags()] - latest_version_orig, latest_version = self.filter_and_get_latest_tag(tags, self.app_id) + latest_version_orig, latest_version = self.relevant_versions(tags, self.app_id, version_re) latest_tarball = api.url_for_ref(latest_version_orig, RefType.tags) return latest_version, latest_tarball, "" @@ -421,7 +451,7 @@ class AppAutoUpdater: latest_tarball = api.url_for_ref(latest_commit["sha"], RefType.commits) # Let's have the version as something like "2023.01.23" latest_commit_date = datetime.strptime(latest_commit["commit"]["author"]["date"][:10], "%Y-%m-%d") - version_format = infos.get("autoupdate", {}).get("force_version", "%Y.%m.%d") + version_format = autoupdate.get("force_version", "%Y.%m.%d") latest_version = latest_commit_date.strftime(version_format) return latest_version, latest_tarball, "" return None @@ -556,28 +586,17 @@ def main() -> None: result_message += f"\n{'=' * 80}\nApps already with an update PR:" for app, info in apps_already.items(): result_message += f"\n- {app}" - if isinstance(info, tuple): - result_message += f" ({info[0]} -> {info[1]})" - if info[2] is not None: - result_message += f" see {info[2]}" + result_message += f" ({info[0]} -> {info[1]})" if info[1] else " (app version did not change)" + if info[2]: + result_message += f" see {info[2]}" if apps_updated: result_message += f"\n{'=' * 80}\nApps updated:" for app, info in apps_updated.items(): result_message += f"\n- {app}" - if isinstance(info, tuple): - result_message += f" ({info[0]} -> {info[1]})" - if info[2] is not None: - result_message += f" see {info[2]}" - - if apps_updated: - result_message += f"\n{'=' * 80}\nApps updated:" - for app, info in apps_updated.items(): - result_message += f"\n- {app}" - if isinstance(info, tuple): - result_message += f" ({info[0]} -> {info[1]})" - if info[2] is not None: - result_message += f" see {info[2]}" + result_message += f" ({info[0]} -> {info[1]})" if info[1] else " (app version did not change)" + if info[2]: + result_message += f" see {info[2]}" if apps_failed: result_message += f"\n{'=' * 80}\nApps failed:" @@ -589,7 +608,7 @@ def main() -> None: logging.error(textwrap.dedent(f""" Failed to run the source auto-update for: {', '.join(apps_failed.keys())} Please run manually the `autoupdate_app_sources.py` script on these apps to debug what is happening! - See the debug log here: {paste_url}" + See the debug log here: {paste_url} """)) print(result_message) diff --git a/autoupdate_app_sources/rest_api.py b/autoupdate_app_sources/rest_api.py index 09c44c6..6cf6d43 100644 --- a/autoupdate_app_sources/rest_api.py +++ b/autoupdate_app_sources/rest_api.py @@ -52,11 +52,18 @@ class GithubAPI: class GitlabAPI: def __init__(self, upstream: str): - split = re.search("(?Phttps?://.+)/(?P[^/]+)/(?P[^/]+)/?$", upstream) - assert split is not None - self.upstream = split.group("host") - self.upstream_repo = f"{split.group('group')}/{split.group('project')}" - self.project_id = self.find_project_id(self.upstream_repo) + # Find gitlab api root... + self.forge_root = self.get_forge_root(upstream) + self.project_path = upstream.replace(self.forge_root, "").lstrip("/") + self.project_id = self.find_project_id(self.project_path) + + def get_forge_root(self, project_url: str) -> str: + """A small heuristic based on the content of the html page...""" + r = requests.get(project_url) + r.raise_for_status() + match = re.search(r"const url = `(.*)/api/graphql`", r.text) + assert match is not None + return match.group(1) def find_project_id(self, project: str) -> int: project = self.internal_api(f"projects/{project.replace('/', '%2F')}") @@ -65,7 +72,7 @@ class GitlabAPI: return project_id def internal_api(self, uri: str) -> Any: - url = f"{self.upstream}/api/v4/{uri}" + url = f"{self.forge_root}/api/v4/{uri}" r = requests.get(url) assert r.status_code == 200, r return r.json() @@ -113,35 +120,43 @@ class GitlabAPI: return retval def url_for_ref(self, ref: str, ref_type: RefType) -> str: - name = self.upstream_repo.split("/")[-1] - return f"{self.upstream}/{self.upstream_repo}/-/archive/{ref}/{name}-{ref}.tar.bz2" + name = self.project_path.split("/")[-1] + clean_ref = ref.replace("/", "-") + return f"{self.forge_root}/{self.project_path}/-/archive/{ref}/{name}-{clean_ref}.tar.bz2" class GiteaForgejoAPI: def __init__(self, upstream: str): - split = re.search("(?Phttps?://.+)/(?P[^/]+)/(?P[^/]+)/?$", upstream) - assert split is not None - self.upstream = split.group("host") - self.upstream_repo = f"{split.group('group')}/{split.group('project')}" + # Find gitea/forgejo api root... + self.forge_root = self.get_forge_root(upstream) + self.project_path = upstream.replace(self.forge_root, "").lstrip("/") + + def get_forge_root(self, project_url: str) -> str: + """A small heuristic based on the content of the html page...""" + r = requests.get(project_url) + r.raise_for_status() + match = re.search(r"appUrl: '([^']*)',", r.text) + assert match is not None + return match.group(1).replace("\\", "") def internal_api(self, uri: str): - url = f"{self.upstream}/api/v1/{uri}" + url = f"{self.forge_root}/api/v1/{uri}" r = requests.get(url) assert r.status_code == 200, r return r.json() def tags(self) -> list[dict[str, Any]]: """Get a list of tags for project.""" - return self.internal_api(f"repos/{self.upstream_repo}/tags") + return self.internal_api(f"repos/{self.project_path}/tags") def commits(self) -> list[dict[str, Any]]: """Get a list of commits for project.""" - return self.internal_api(f"repos/{self.upstream_repo}/commits") + return self.internal_api(f"repos/{self.project_path}/commits") def releases(self) -> list[dict[str, Any]]: """Get a list of releases for project.""" - return self.internal_api(f"repos/{self.upstream_repo}/releases") + return self.internal_api(f"repos/{self.project_path}/releases") def url_for_ref(self, ref: str, ref_type: RefType) -> str: """Get a URL for a ref.""" - return f"{self.upstream}/{self.upstream_repo}/archive/{ref}.tar.gz" + return f"{self.forge_root}/{self.project_path}/archive/{ref}.tar.gz"