1
0
Fork 0
ynh-apps_tools/webhooks/webhook.py
Félix Piédallu 8cf292fd77
Some checks failed
Auto updates messages.pot for readme_generator / Auto updates messages.pot for readme_generator (push) Has been cancelled
Check / auto apply Black / Check / auto apply black (push) Has been cancelled
Run tests for make_readme.py / Run tests for make_readme.py (push) Has been cancelled
webhooks: was not checking signatures...
2024-09-30 16:09:07 +02:00

330 lines
10 KiB
Python
Executable file

#!/usr/bin/env python3
import sys
import tomlkit
import hashlib
import argparse
import hmac
from functools import cache
import tempfile
import aiohttp
import logging
from pathlib import Path
import re
import requests
from typing import Optional
from git import Actor, Repo, GitCommandError
from sanic import HTTPResponse, Request, Sanic, response
# add apps/tools to sys.path
sys.path.insert(0, str(Path(__file__).parent.parent))
from appslib import get_apps_repo
from readme_generator.make_readme import generate_READMEs
TOOLS_DIR = Path(__file__).resolve().parent.parent
APPS_REPO = None
DEBUG = False
UNSAFE = False
APP = Sanic(__name__)
@cache
def github_webhook_secret() -> str:
return (
(TOOLS_DIR / ".github_webhook_secret")
.open("r", encoding="utf-8")
.read()
.strip()
)
@cache
def github_login() -> str:
return (TOOLS_DIR / ".github_login").open("r", encoding="utf-8").read().strip()
@cache
def github_token() -> str:
return (TOOLS_DIR / ".github_token").open("r", encoding="utf-8").read().strip()
@APP.route("/github", methods=["GET"])
async def github_get(request: Request) -> HTTPResponse:
return response.text(
"You aren't supposed to go on this page using a browser, it's for webhooks push instead."
)
@APP.route("/github", methods=["POST"])
async def github_post(request: Request) -> HTTPResponse:
if UNSAFE:
logging.warning("Unsafe webhook!")
elif signatures_reply := check_webhook_signatures(request):
return signatures_reply
event = request.headers.get("X-Github-Event")
if event == "push":
return on_push(request)
if event == "issue_comment":
infos = request.json
valid_pr_comment = (
infos["action"] == "created"
and infos["issue"]["state"] == "open"
and "pull_request" in infos["issue"]
)
pr_infos = await get_pr_infos(request)
if valid_pr_comment:
return on_pr_comment(request, pr_infos)
else:
return response.empty()
return response.json({"error": f"Unknown event '{event}'"}, 422)
async def get_pr_infos(request: Request) -> dict:
pr_infos_url = request.json["issue"]["pull_request"]["url"]
async with aiohttp.ClientSession() as session:
async with session.get(pr_infos_url) as resp:
pr_infos = await resp.json()
return pr_infos
def check_webhook_signatures(request: Request) -> Optional[HTTPResponse]:
header_signature = request.headers.get("X-Hub-Signature")
if header_signature is None:
logging.error("no header X-Hub-Signature")
return response.json({"error": "No X-Hub-Signature"}, 403)
sha_name, signature = header_signature.split("=")
if sha_name != "sha1":
logging.error("signing algo isn't sha1, it's '%s'" % sha_name)
return response.json({"error": "Signing algorightm is not sha1 ?!"}, 501)
# HMAC requires the key to be bytes, but data is string
mac = hmac.new(
github_webhook_secret().encode(), msg=request.body, digestmod=hashlib.sha1
)
if not hmac.compare_digest(str(mac.hexdigest()), str(signature)):
return response.json({"error": "Bad signature ?!"}, 403)
return None
def on_push(request: Request) -> HTTPResponse:
data = request.json
repository = data["repository"]["full_name"]
branch = data["ref"].split("/", 2)[2]
if repository.startswith("YunoHost-Apps/"):
logging.info(f"{repository} -> branch '{branch}'")
need_push = False
with tempfile.TemporaryDirectory() as folder_str:
folder = Path(folder_str)
repo = Repo.clone_from(
f"https://{github_login()}:{github_token()}@github.com/{repository}",
to_path=folder,
)
# First rebase the testing branch if possible
if branch in ["master", "testing"]:
result = git_repo_rebase_testing_fast_forward(repo)
need_push = need_push or result
repo.git.checkout(branch)
result = generate_and_commit_readmes(repo)
need_push = need_push or result
if not need_push:
logging.debug("nothing to do")
return response.text("nothing to do")
logging.debug(f"Pushing {repository}")
repo.remote().push(quiet=False, all=True)
return response.text("ok")
def on_pr_comment(request: Request, pr_infos: dict) -> HTTPResponse:
body = request.json["comment"]["body"].strip()[:100].lower()
# Check the comment contains proper keyword trigger
BUMP_REV_COMMANDS = ["!bump", "!new_revision", "!newrevision"]
if any(trigger.lower() in body for trigger in BUMP_REV_COMMANDS):
bump_revision(request, pr_infos)
return response.text("ok")
REJECT_WISHLIST_COMMANDS = ["!reject", "!nope"]
if any(trigger.lower() in body for trigger in REJECT_WISHLIST_COMMANDS):
reason = ""
for command in REJECT_WISHLIST_COMMANDS:
try:
reason = re.search(f"{command} (.*)", body).group(1).rstrip()
except:
pass
reject_wishlist(request, pr_infos, reason)
return response.text("ok")
return response.empty()
def bump_revision(request: Request, pr_infos: dict) -> HTTPResponse:
data = request.json
repository = data["repository"]["full_name"]
branch = pr_infos["head"]["ref"]
if repository.startswith("YunoHost-Apps/"):
logging.info(f"Will bump revision on {repository} branch {branch}...")
with tempfile.TemporaryDirectory() as folder_str:
folder = Path(folder_str)
repo = Repo.clone_from(
f"https://{github_login()}:{github_token()}@github.com/{repository}",
to_path=folder,
)
repo.git.checkout(branch)
manifest_file = folder / "manifest.toml"
manifest = tomlkit.load(manifest_file.open("r", encoding="utf-8"))
version, revision = manifest["version"].split("~ynh")
revision = str(int(revision) + 1)
manifest["version"] = "~ynh".join([version, revision])
tomlkit.dump(manifest, manifest_file.open("w", encoding="utf-8"))
repo.git.add("manifest.toml")
repo.index.commit(
"Bump package revision",
author=Actor("yunohost-bot", "yunohost@yunohost.org"),
)
logging.debug(f"Pushing {repository}")
repo.remote().push(quiet=False, all=True)
return response.text("ok")
def reject_wishlist(request: Request, pr_infos: dict, reason=None) -> HTTPResponse:
data = request.json
repository = data["repository"]["full_name"]
branch = pr_infos["head"]["ref"]
pr_number = pr_infos["number"]
if repository == "YunoHost/apps" and branch.startswith("add-to-wishlist"):
logging.info(
f"Will put the suggested app in the rejected list on {repository} branch {branch}..."
)
with tempfile.TemporaryDirectory() as folder_str:
folder = Path(folder_str)
repo = Repo.clone_from(
f"https://{github_login()}:{github_token()}@github.com/{repository}",
to_path=folder,
)
repo.git.checkout(branch)
rejectedlist_file = folder / "rejectedlist.toml"
rejectedlist = tomlkit.load(rejectedlist_file.open("r", encoding="utf-8"))
wishlist_file = folder / "wishlist.toml"
wishlist = tomlkit.load(wishlist_file.open("r", encoding="utf-8"))
suggestedapp_slug = branch.replace("add-to-wishlist-", "")
suggestedapp = {suggestedapp_slug: wishlist[suggestedapp_slug]}
suggestedapp[suggestedapp_slug]["rejection_pr"] = pr_infos["html_url"]
suggestedapp[suggestedapp_slug]["reason"] = reason
wishlist.pop(suggestedapp_slug)
rejectedlist.update(suggestedapp)
tomlkit.dump(rejectedlist, rejectedlist_file.open("w", encoding="utf-8"))
tomlkit.dump(wishlist, wishlist_file.open("w", encoding="utf-8"))
repo.git.add("rejectedlist.toml")
repo.git.add("wishlist.toml")
suggestedapp_name = suggestedapp[suggestedapp_slug]["name"]
repo.index.commit(
f"Reject {suggestedapp_name} from catalog",
author=Actor("yunohost-bot", "yunohost@yunohost.org"),
)
logging.debug(f"Pushing {repository}")
repo.remote().push(quiet=False, all=True, force=True)
new_pr_title = {"title": f"Add {suggestedapp_name} to rejection list"}
with requests.Session() as s:
s.headers.update({"Authorization": f"token {github_token()}"})
r = s.post(
f"https://api.github.com/repos/{repository}/pulls/{pr_number}",
json=new_pr_title,
)
if r.status_code != 200:
logging.info(
f"PR #{pr_number} renaming failed with code {r.status_code}"
)
return response.text("ok")
def generate_and_commit_readmes(repo: Repo) -> bool:
assert repo.working_tree_dir is not None
generate_READMEs(Path(repo.working_tree_dir), APPS_REPO)
repo.git.add("README*.md")
repo.git.add("ALL_README.md")
diff_empty = len(repo.index.diff("HEAD")) == 0
if diff_empty:
return False
repo.index.commit(
"Auto-update READMEs", author=Actor("yunohost-bot", "yunohost@yunohost.org")
)
return True
def git_repo_rebase_testing_fast_forward(repo: Repo) -> bool:
try:
repo.git.checkout("testing")
except GitCommandError:
return False
if not repo.is_ancestor("testing", "master"):
return False
repo.git.merge("master", ff_only=True)
return True
def main() -> None:
parser = argparse.ArgumentParser()
get_apps_repo.add_args(parser)
parser.add_argument("-d", "--debug", action="store_true")
parser.add_argument(
"-u",
"--unsafe",
action="store_true",
help="Disable Github signature checks on webhooks, for debug only.",
)
args = parser.parse_args()
global APPS_REPO
APPS_REPO = get_apps_repo.from_args(args)
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
global DEBUG, UNSAFE
DEBUG = args.debug
UNSAFE = args.unsafe
APP.run(host="127.0.0.1", port=8123, debug=args.debug)
if __name__ == "__main__":
main()