"""
The webhook receiving the alerts from alertmanager.
"""

import asyncio
import ssl
from typing import Any, NoReturn

import aiohttp.web
import aiohttp.web_request

from .config import Config


def load_ssl_context(config: Config) -> ssl.SSLContext:
    """
    Load the server-side SSL context from the config.

    The server certificate chain is read from ``config.tls_crt`` and
    ``config.tls_key``. When ``config.tls_auth`` is set, ``config.ca_crt``
    is used as the CA file and the context's verify mode is set to
    ``ssl.CERT_REQUIRED`` so that peers must present a certificate.

    :param config: Configuration providing ``tls_auth``, ``ca_crt``,
        ``tls_crt`` and ``tls_key``.
    :return: The configured SSL context.
    """
    # The CA file is only relevant when client certificates are verified.
    ca_path = config.ca_crt if config.tls_auth else None

    ssl_context = ssl.create_default_context(
        purpose=ssl.Purpose.CLIENT_AUTH,
        cafile=ca_path,
    )
    if config.tls_auth:
        ssl_context.verify_mode = ssl.CERT_REQUIRED

    ssl_context.load_cert_chain(config.tls_crt, config.tls_key)
    return ssl_context


async def run_webhook(
    alert_queue: asyncio.Queue[dict[str, Any]],
    config: Config
) -> NoReturn:
    """
    Run the webhook endpoint and put the alerts in the queue.

    Serves ``POST config.endpoint`` (alert payloads) and ``GET /health``
    on ``config.host``:``config.port``, using TLS when ``config.tls`` is
    set. The coroutine keeps running until it is cancelled; the aiohttp
    runner is cleaned up on the way out.

    :param alert_queue: Queue that receives every accepted JSON object.
    :param config: Configuration providing ``endpoint``, ``host``,
        ``port``, ``tls`` and the TLS settings used by
        :func:`load_ssl_context`.
    """

    async def handler(
        request: aiohttp.web_request.Request
    ) -> aiohttp.web.Response:
        """ Parse the posted alert and enqueue it. """
        try:
            alert = await request.json()
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError are both ValueErrors;
            # a malformed body is a client error, not a server failure.
            return aiohttp.web.Response(
                status=400, text="Invalid JSON payload"
            )
        if not isinstance(alert, dict):
            # The queue is typed for JSON objects only.
            return aiohttp.web.Response(
                status=400, text="Expected a JSON object"
            )
        await alert_queue.put(alert)
        return aiohttp.web.Response()

    async def health(
        request: aiohttp.web_request.Request
    ) -> aiohttp.web.Response:
        """ Liveness probe. """
        return aiohttp.web.Response(text="OK")

    app = aiohttp.web.Application()
    app.add_routes([
        aiohttp.web.post(config.endpoint, handler),
        aiohttp.web.get("/health", health),
    ])

    # Build the SSL context first so that a bad certificate configuration
    # fails before any runner resources are allocated.
    ssl_context = load_ssl_context(config) if config.tls else None

    runner = aiohttp.web.AppRunner(app)
    await runner.setup()
    try:
        site = aiohttp.web.TCPSite(
            runner, config.host, config.port, ssl_context=ssl_context
        )
        await site.start()

        # Honour the NoReturn contract: stay alive until cancelled.
        while True:
            await asyncio.sleep(3600)
    finally:
        # Runs on cancellation or start-up failure; releases the sockets.
        await runner.cleanup()