You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

36 lines
825 B
Python

"""
The webhook receiving the alerts from alertmanager.
"""
import asyncio
import aiohttp.web
import aiohttp.web_request
from typing import (
Any,
NoReturn
)
from .config import Config
ENDPOINT = "/webhook"
async def run_webhook(
alert_queue: asyncio.Queue[dict[str, Any]],
config: Config
)->NoReturn:
"""
Run the webhook endpoint and put the alerts in the queue.
"""
async def handler(request:aiohttp.web_request.Request):
alert = await request.json()
await alert_queue.put(alert)
return aiohttp.web.Response()
app = aiohttp.web.Application()
app.add_routes([aiohttp.web.post(ENDPOINT, handler)])
runner = aiohttp.web.AppRunner(app)
await runner.setup()
site = aiohttp.web.TCPSite(runner, config.host, config.port)
await site.start()