1
0
Fork 0

Fix: need to get pr infos

This commit is contained in:
Félix Piédallu 2024-08-14 12:30:21 +02:00 committed by Salamandar
parent c6f680a1bc
commit 4131b654ab

View file

@ -7,6 +7,7 @@ import argparse
import hmac import hmac
from functools import cache from functools import cache
import tempfile import tempfile
import aiohttp
import logging import logging
from pathlib import Path from pathlib import Path
@ -68,13 +69,22 @@ async def github_post(request: Request) -> HTTPResponse:
valid_pr_comment = ( valid_pr_comment = (
infos["action"] == "created" and infos["issue"]["state"] == "open" infos["action"] == "created" and infos["issue"]["state"] == "open"
and "pull_request" in infos["issue"] ) and "pull_request" in infos["issue"] )
pr_infos = await get_pr_infos(request)
if valid_pr_comment: if valid_pr_comment:
return on_pr_comment(request) return on_pr_comment(request, pr_infos)
return response.json({"error": f"Unknown event '{event}'"}, 422) return response.json({"error": f"Unknown event '{event}'"}, 422)
async def get_pr_infos(request: Request) -> dict:
pr_infos_url = request.json["issue"]["pull_request"]["url"]
async with aiohttp.ClientSession() as session:
async with session.get(pr_infos_url) as resp:
pr_infos = await resp.json()
return pr_infos
def check_webhook_signatures(request: Request) -> Optional[HTTPResponse]: def check_webhook_signatures(request: Request) -> Optional[HTTPResponse]:
logging.warning("Unsafe webhook!") logging.warning("Unsafe webhook!")
header_signature = request.headers.get("X-Hub-Signature") header_signature = request.headers.get("X-Hub-Signature")
@ -131,22 +141,22 @@ def on_push(request: Request) -> HTTPResponse:
return response.text("ok") return response.text("ok")
def on_pr_comment(request: Request) -> HTTPResponse: def on_pr_comment(request: Request, pr_infos: dict) -> HTTPResponse:
body = request.json["comment"]["body"].strip()[:100].lower() body = request.json["comment"]["body"].strip()[:100].lower()
# Check the comment contains proper keyword trigger # Check the comment contains proper keyword trigger
BUMP_REV_COMMANDS = ["!bump", "!new_revision", "!newrevision"] BUMP_REV_COMMANDS = ["!bump", "!new_revision", "!newrevision"]
if any(trigger.lower() in body for trigger in BUMP_REV_COMMANDS): if any(trigger.lower() in body for trigger in BUMP_REV_COMMANDS):
bump_revision(request) bump_revision(request, pr_infos)
return response.text("ok") return response.text("ok")
return response.empty() return response.empty()
def bump_revision(request: Request) -> HTTPResponse: def bump_revision(request: Request, pr_infos: dict) -> HTTPResponse:
data = request.json data = request.json
repository = data["repository"]["full_name"] repository = data["repository"]["full_name"]
branch = data["ref"].split("/", 2)[2] branch = pr_infos["head"]["ref"]
logging.info(f"Will bump revision on {repository} branch {branch}...") logging.info(f"Will bump revision on {repository} branch {branch}...")
with tempfile.TemporaryDirectory() as folder_str: with tempfile.TemporaryDirectory() as folder_str: