1
0
Fork 0
ynh-apps_tools/webhooks/webhook.py

243 lines
7 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
import sys
2024-08-14 12:09:39 +02:00
import tomlkit
import hashlib
import argparse
2021-06-13 05:12:34 +02:00
import hmac
from functools import cache
import tempfile
2024-08-14 12:30:21 +02:00
import aiohttp
import logging
from pathlib import Path
from typing import Optional
from git import Actor, Repo, GitCommandError
from sanic import HTTPResponse, Request, Sanic, response
# add apps/tools to sys.path
sys.path.insert(0, str(Path(__file__).parent.parent))
from readme_generator.make_readme import generate_READMEs
2021-06-13 05:12:34 +02:00
# Directory two levels above this file; the GitHub credential files
# (.github_webhook_secret, .github_login, .github_token) are read from here.
TOOLS_DIR = Path(__file__).resolve().parent.parent
# Command-line flags; both are overwritten by main() after argument parsing.
DEBUG = False
UNSAFE = False
# Sanic application on which the /github routes below are registered.
APP = Sanic(__name__)
2024-05-28 19:11:13 +02:00
@cache
def github_webhook_secret() -> str:
    """Return the secret used to verify GitHub webhook signatures.

    The value is read from the ``.github_webhook_secret`` file in
    ``TOOLS_DIR`` and stripped of surrounding whitespace. Thanks to
    ``functools.cache`` the file is not read again after the first
    successful call.

    Raises:
        OSError: If the secret file cannot be opened or read.
    """
    # read_text() opens and closes the file itself, so no handle is leaked.
    secret_file = TOOLS_DIR / ".github_webhook_secret"
    return secret_file.read_text(encoding="utf-8").strip()
2024-03-19 19:02:31 +01:00
@cache
def github_login() -> str:
    """Return the GitHub login used to authenticate git operations.

    The value is read from the ``.github_login`` file in ``TOOLS_DIR`` and
    stripped of surrounding whitespace. Thanks to ``functools.cache`` the
    file is not read again after the first successful call.

    Raises:
        OSError: If the login file cannot be opened or read.
    """
    # read_text() opens and closes the file itself, so no handle is leaked.
    return (TOOLS_DIR / ".github_login").read_text(encoding="utf-8").strip()
2024-03-19 19:02:31 +01:00
@cache
def github_token() -> str:
    """Return the GitHub token used to authenticate git operations.

    The value is read from the ``.github_token`` file in ``TOOLS_DIR`` and
    stripped of surrounding whitespace. Thanks to ``functools.cache`` the
    file is not read again after the first successful call.

    Raises:
        OSError: If the token file cannot be opened or read.
    """
    # read_text() opens and closes the file itself, so no handle is leaked.
    return (TOOLS_DIR / ".github_token").read_text(encoding="utf-8").strip()
2021-06-13 05:12:34 +02:00
@APP.route("/github", methods=["GET"])
async def github_get(request: Request) -> HTTPResponse:
    """Answer GET requests on the webhook endpoint with a plain-text notice.

    The endpoint only does real work for POST deliveries; a GET simply gets
    a short explanatory message back.
    """
    notice = "You aren't supposed to go on this page using a browser, it's for webhooks push instead."
    return response.text(notice)
@APP.route("/github", methods=["POST"])
async def github_post(request: Request) -> HTTPResponse:
    """Dispatch an incoming GitHub webhook delivery to its event handler.

    Unless the server runs with ``UNSAFE`` set, the delivery signature is
    verified first and the error response is returned on failure. The
    ``X-Github-Event`` header then selects the handler:

    * ``push`` is handled by ``on_push``;
    * ``issue_comment`` is handled by ``on_pr_comment``, but only for a
      newly created comment on an open pull request (other comments get an
      empty response);
    * anything else yields a 422 JSON error.
    """
    if UNSAFE:
        # Verification is skipped on purpose (--unsafe, debugging only).
        logging.warning("Unsafe webhook!")
    elif signatures_reply := check_webhook_signatures(request):
        return signatures_reply

    event = request.headers.get("X-Github-Event")
    if event == "push":
        return on_push(request)

    if event == "issue_comment":
        infos = request.json
        valid_pr_comment = (
            infos["action"] == "created"
            and infos["issue"]["state"] == "open"
            and "pull_request" in infos["issue"]
        )
        if not valid_pr_comment:
            return response.empty()

        # Fetch the PR details only once the payload is known to carry a
        # "pull_request" entry; comments on plain issues do not have one.
        pr_infos = await get_pr_infos(request)
        return on_pr_comment(request, pr_infos)

    return response.json({"error": f"Unknown event '{event}'"}, 422)
2024-08-14 12:30:21 +02:00
async def get_pr_infos(request: Request) -> dict:
    """Download the pull request description an issue-comment payload points to.

    The URL is taken from ``issue.pull_request.url`` in the request's JSON
    body and fetched with a plain GET through a short-lived aiohttp session.

    Returns:
        The decoded JSON body of the response.
    """
    url = request.json["issue"]["pull_request"]["url"]
    async with aiohttp.ClientSession() as http, http.get(url) as reply:
        payload = await reply.json()
    return payload
def check_webhook_signatures(request: Request) -> Optional[HTTPResponse]:
    """Verify the HMAC-SHA1 signature attached to a webhook delivery.

    The ``X-Hub-Signature`` header is expected to look like
    ``sha1=<hexdigest>``, where the digest is the HMAC of the raw request
    body keyed with the webhook secret.

    Returns:
        ``None`` when the signature matches. Otherwise a JSON error
        response: 403 when the header is missing or the digest is wrong,
        501 when the announced algorithm is not ``sha1``.
    """
    header_signature = request.headers.get("X-Hub-Signature")
    if header_signature is None:
        logging.error("no header X-Hub-Signature")
        return response.json({"error": "No X-Hub-Signature"}, 403)

    # partition() never raises, unlike unpacking split("="): a header with
    # no "=" (or several) is rejected below instead of crashing the handler.
    sha_name, _, signature = header_signature.partition("=")
    if sha_name != "sha1":
        logging.error("signing algo isn't sha1, it's '%s'", sha_name)
        return response.json({"error": "Signing algorightm is not sha1 ?!"}, 501)

    # HMAC requires the key to be bytes, but the secret is stored as a string.
    mac = hmac.new(
        github_webhook_secret().encode(), msg=request.body, digestmod=hashlib.sha1
    )

    # Compare as bytes: compare_digest() raises TypeError on non-ASCII str,
    # and the header value comes straight from the network.
    if not hmac.compare_digest(mac.hexdigest().encode(), signature.encode()):
        return response.json({"error": "Bad signature ?!"}, 403)
    return None
2021-06-13 05:12:34 +02:00
2024-06-06 12:11:58 +02:00
def on_push(request: Request) -> HTTPResponse:
    """Handle a ``push`` event: refresh the READMEs of the pushed branch.

    The repository named in the payload is cloned into a temporary
    directory. When the pushed branch is ``master`` or ``testing``, a
    fast-forward of ``testing`` onto ``master`` is attempted first. The
    pushed branch is then checked out and its READMEs regenerated. If either
    step produced a change, all branches are pushed back.

    Returns:
        A text response: ``"nothing to do"`` when nothing changed, ``"ok"``
        after a push.
    """
    payload = request.json
    repository = payload["repository"]["full_name"]
    # "refs/heads/<name>" -> "<name>" (the name itself may contain slashes).
    branch = payload["ref"].split("/", 2)[2]

    logging.info(f"{repository} -> branch '{branch}'")

    with tempfile.TemporaryDirectory() as workdir:
        repo = Repo.clone_from(
            f"https://{github_login()}:{github_token()}@github.com/{repository}",
            to_path=Path(workdir),
        )

        # First rebase the testing branch if possible
        testing_moved = False
        if branch in ("master", "testing"):
            testing_moved = git_repo_rebase_testing_fast_forward(repo)

        repo.git.checkout(branch)
        readmes_changed = generate_and_commit_readmes(repo)

        if not (testing_moved or readmes_changed):
            logging.debug("nothing to do")
            return response.text("nothing to do")

        logging.debug(f"Pushing {repository}")
        repo.remote().push(quiet=False, all=True)

    return response.text("ok")
2024-06-06 12:11:58 +02:00
2024-08-14 12:30:21 +02:00
def on_pr_comment(request: Request, pr_infos: dict) -> HTTPResponse:
    """React to a pull request comment that contains a bump command.

    Only the first 100 characters of the trimmed comment body are searched,
    case-insensitively, for one of the trigger keywords. On a match the
    package revision is bumped.

    Returns:
        A text ``"ok"`` response when a bump was triggered, an empty
        response otherwise.
    """
    comment = request.json["comment"]["body"].strip()[:100].lower()

    # Keywords that trigger a revision bump.
    bump_triggers = ("!bump", "!new_revision", "!newrevision")
    for keyword in bump_triggers:
        if keyword.lower() in comment:
            bump_revision(request, pr_infos)
            return response.text("ok")

    return response.empty()
2024-08-14 12:30:21 +02:00
def bump_revision(request: Request, pr_infos: dict) -> HTTPResponse:
    """Increment the ``~ynh`` revision in a pull request's ``manifest.toml``.

    The repository named in the webhook payload is cloned into a temporary
    directory and the PR head branch (``pr_infos["head"]["ref"]``) is
    checked out. The manifest's ``version`` is split on ``~ynh``, the
    numeric part after it is incremented by one, and the change is committed
    as ``yunohost-bot`` and pushed.

    Args:
        request: The webhook request; its JSON body supplies the repository
            full name.
        pr_infos: Pull request description; only ``head.ref`` is read.

    Returns:
        A text ``"ok"`` response once the push has been issued.

    Raises:
        ValueError: If ``version`` does not contain exactly one ``~ynh``
            separator followed by an integer.
    """
    data = request.json
    repository = data["repository"]["full_name"]
    branch = pr_infos["head"]["ref"]

    logging.info(f"Will bump revision on {repository} branch {branch}...")
    with tempfile.TemporaryDirectory() as folder_str:
        folder = Path(folder_str)
        repo = Repo.clone_from(
            f"https://{github_login()}:{github_token()}@github.com/{repository}",
            to_path=folder,
        )
        repo.git.checkout(branch)

        manifest_file = folder / "manifest.toml"
        with manifest_file.open("r", encoding="utf-8") as manifest_in:
            manifest = tomlkit.load(manifest_in)

        version, revision = manifest["version"].split("~ynh")
        revision = str(int(revision) + 1)
        manifest["version"] = "~ynh".join([version, revision])

        # Close (and therefore flush) the file before git reads it back.
        with manifest_file.open("w", encoding="utf-8") as manifest_out:
            tomlkit.dump(manifest, manifest_out)

        repo.git.add("manifest.toml")
        repo.index.commit(
            "Bump package revision",
            author=Actor("yunohost-bot", "yunohost@yunohost.org"),
        )

        logging.debug(f"Pushing {repository}")
        repo.remote().push(quiet=False, all=True)
        return response.text("ok")
def generate_and_commit_readmes(repo: Repo) -> bool:
    """Regenerate the READMEs of a checked-out repository and commit them.

    The README generator runs on the repository's working tree, then
    ``README*.md`` and ``ALL_README.md`` are staged. A commit authored by
    ``yunohost-bot`` is created only when the index differs from ``HEAD``.

    Returns:
        ``True`` if a commit was created, ``False`` if nothing changed.
    """
    worktree = repo.working_tree_dir
    assert worktree is not None
    generate_READMEs(Path(worktree))

    for pattern in ("README*.md", "ALL_README.md"):
        repo.git.add(pattern)

    staged_changes = repo.index.diff("HEAD")
    if len(staged_changes) > 0:
        bot = Actor("yunohost-bot", "yunohost@yunohost.org")
        repo.index.commit("Auto-update READMEs", author=bot)
        return True
    return False
def git_repo_rebase_testing_fast_forward(repo: Repo) -> bool:
    """Fast-forward the ``testing`` branch onto ``master`` when possible.

    Leaves ``testing`` checked out whenever the checkout succeeds.

    Returns:
        ``True`` if the fast-forward-only merge of ``master`` was run;
        ``False`` if ``testing`` could not be checked out or
        ``repo.is_ancestor("testing", "master")`` does not hold.
    """
    try:
        repo.git.checkout("testing")
    except GitCommandError:
        # The checkout failed (presumably no such branch): nothing to do.
        return False

    if repo.is_ancestor("testing", "master"):
        repo.git.merge("master", ff_only=True)
        return True
    return False
def main() -> None:
    """Parse the command-line flags and start the webhook server.

    ``-d/--debug`` raises the root logger to DEBUG and runs Sanic in debug
    mode; ``-u/--unsafe`` is stored in the module-level ``UNSAFE`` flag. The
    server listens on 127.0.0.1:8123.
    """
    global DEBUG, UNSAFE

    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--debug", action="store_true")
    parser.add_argument(
        "-u",
        "--unsafe",
        action="store_true",
        help="Disable Github signature checks on webhooks, for debug only.",
    )
    options = parser.parse_args()

    DEBUG, UNSAFE = options.debug, options.unsafe
    if DEBUG:
        logging.getLogger().setLevel(logging.DEBUG)

    APP.run(host="127.0.0.1", port=8123, debug=DEBUG)
2024-06-06 12:11:58 +02:00
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()