#!/usr/bin/env python3 import argparse import asyncio import dataclasses import logging import aiohttp.web import jinja2 import nio import yaml @dataclasses.dataclass class Config: matrix_homeserver: str matrix_user: str matrix_password: str matrix_room: str webhook_token: str TEMPLATE_TEXT = ( "{% if status == 'resolved' %}" "RÉSOLU " "{% elif labels.severity == 'critical' %}" "@room CRITIQUE " "{% elif labels.severity == 'warning' %}" "ATTENTION " "{% endif %}" "{{ labels.alertname }} {{ labels.instance }}" "{% if annotations.summary %}" "\n{{ annotations.summary }}" "{% else %}" "{% for key, value in annotations.items() %}" "\n{{ key }} : {{ value }}" "{% endfor %}" "{% endif %}" ) TEMPLATE_HTML = ( "{% if status == 'resolved' %}" "RÉSOLU " "{% elif labels.severity == 'critical' %}" "@room CRITIQUE " "{% elif labels.severity == 'warning' %}" "ATTENTION " "{% endif %}" "{{ labels.alertname }} " "{{ labels.instance }}" "{% if annotations.summary %}" "
" "
{{ annotations.summary }}
" "{% else %}" "
" "" "{% endif %}" ) async def format_messages(config, src, dest): env = jinja2.Environment() template_text = env.from_string(TEMPLATE_TEXT) template_html = env.from_string(TEMPLATE_HTML) while True: message = await src.get() for alert in message["alerts"]: await dest.put( ( template_text.render(**alert), template_html.render(**alert), ) ) async def send_notices(config, messages): client = nio.AsyncClient(config.matrix_homeserver, config.matrix_user) await client.login(config.matrix_password) while True: text, html = await messages.get() await client.room_send( config.matrix_room, message_type="m.room.message", content={ "msgtype": "m.text", "format": "org.matrix.custom.html", "body": text, "formatted_body": html, }, ) async def run_app(config, out, host, port): async def webhook(request): tokens = request.query.getall("token", []) if config.webhook_token not in tokens: logging.warning("Invalid tokens: %s", tokens) raise aiohttp.web.HTTPForbidden("Invalid tokens") message = await request.json() logging.debug("Incoming message: %s", message) await out.put(message) return aiohttp.web.Response() app = aiohttp.web.Application() app.add_routes([aiohttp.web.post("/webhook", webhook)]) runner = aiohttp.web.AppRunner(app) await runner.setup() await aiohttp.web.TCPSite(runner, host, port).start() async def main(): logging.basicConfig(level=logging.INFO) parser = argparse.ArgumentParser() parser.add_argument("-c", "--config", default="config.yaml") parser.add_argument("--host", default="*") parser.add_argument("--port", type=int, default=8000) args = parser.parse_args() with open(args.config) as f: config = yaml.safe_load(f) config = Config(**config) messages = asyncio.Queue() formatted = asyncio.Queue() logging.info("Started Alertbot on %s:%d", args.host, args.port) tasks = ( asyncio.create_task(run_app(config, messages, args.host, args.port)), asyncio.create_task(format_messages(config, messages, formatted)), asyncio.create_task(send_notices(config, formatted)), ) # on propage les exceptions await asyncio.gather(*tasks, return_exceptions=False) if __name__ == "__main__": asyncio.get_event_loop().run_until_complete(main())