You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

36 lines
794 B

3 years ago
"""The webhook receiving the alerts from Alertmanager."""
import asyncio
from typing import Any

import aiohttp.web
import aiohttp.web_request
# URL path on which the webhook listens for incoming alert notifications.
ENDPOINT: str = "/webhook"
async def run_webhook(
    alert_queue: asyncio.Queue[dict[str, Any]],
    host: str,
    port: int,
) -> None:
    """Run the webhook endpoint and put the alerts in the queue.

    Builds an aiohttp application with a single route on ``ENDPOINT``,
    binds it to ``host``/``port`` and starts listening. The coroutine
    returns as soon as the site has been started; it does not wait for
    the server to shut down.

    Args:
        alert_queue: Queue that receives the decoded JSON body of every
            request handled by the webhook.
        host: Interface/host name the TCP site binds to.
        port: TCP port the site listens on.
    """

    async def handler(
        request: aiohttp.web_request.Request,
    ) -> aiohttp.web.Response:
        """Decode the request body as JSON and enqueue it."""
        alert = await request.json()
        # Awaiting put() means a full (bounded) queue delays the response
        # until a consumer has made room.
        await alert_queue.put(alert)
        # Empty response with aiohttp's default status acknowledges receipt.
        return aiohttp.web.Response()

    app = aiohttp.web.Application()
    # NOTE(review): the route helper and path were lost in the source
    # ("[, handler)]"). POST on ENDPOINT is assumed because Alertmanager
    # delivers webhook notifications via HTTP POST -- confirm.
    app.add_routes([aiohttp.web.post(ENDPOINT, handler)])

    runner = aiohttp.web.AppRunner(app)
    await runner.setup()

    site = aiohttp.web.TCPSite(runner, host, port)
    await site.start()