#!/usr/bin/env python3
"""Kanbot: forward webhook events to Matrix rooms as HTML notices.

Data flows through two queues:

    HTTP POST /webhook -> events queue -> format_events
                       -> formatted queue -> send_notices
"""

import argparse
import asyncio
import dataclasses
import logging
import typing

import aiohttp.web
import jinja2
import markdown
import nio
import yaml


@dataclasses.dataclass
class Config:
    """Settings loaded from the YAML configuration file."""

    # Homeserver passed to the Matrix client.
    matrix_homeserver: str
    # User the Matrix client is created for.
    matrix_user: str
    # Password used to log in.
    matrix_password: str
    # Maps a project ID (as found in the event payload) to the room that
    # receives the notices for that project.
    matrix_rooms: typing.Mapping[int, str]
    # Value that must appear as a ``token`` query parameter on webhook calls.
    webhook_token: str


# Prefix shared by every template: "[project] title (#id): ".
# NOTE(review): the source showed empty string fragments around the title,
# which suggests markup (e.g. a link) may have been lost -- confirm against
# the upstream file.
_TASK_HEADER = (
    "[{{ event_data.task.project_name }}] "
    "{{ event_data.task.title }}"
    " "
    "(#{{ event_data.task.id }}): "
)

# Jinja2 template source for each supported event name.  Templates are
# rendered with the event payload's top-level keys as variables.
# NOTE(review): the "\n" fragments around the markdown bodies were raw line
# breaks inside string literals in the source; confirm that no markup was
# lost there as well.
TEMPLATES = {
    "task.create": (
        _TASK_HEADER
        + "{{ event_author }} created the task in "
        "'{{ event_data.task.column_title }}'\n"
        "\n"
        "{{ event_data.task.description | markdown }}"
        "\n"
    ),
    "task.move.column": (
        _TASK_HEADER
        + "{{ event_author }} moved the task to "
        "'{{ event_data.task.column_title }}'"
    ),
    "task.close": (
        _TASK_HEADER
        + "{{ event_author }} closed the task"
    ),
    "comment.create": (
        _TASK_HEADER
        + "{{ event_author }} added a comment:\n"
        "\n"
        "{{ event_data.comment.comment | markdown }}"
        "\n"
    ),
}


async def format_events(config, src, dest):
    """Render incoming events and queue them for delivery.

    Runs forever.  Each event taken from ``src`` is matched to a room via
    ``config.matrix_rooms`` and to a template via ``TEMPLATES``; the result
    is put on ``dest`` as a ``(room, rendered)`` tuple.  Events that cannot
    be routed or rendered are logged and dropped so that a single bad
    payload does not stop the worker.

    Args:
        config: The loaded ``Config``.
        src: Queue of decoded webhook payloads.
        dest: Queue receiving ``(room, rendered)`` tuples.
    """
    env = jinja2.Environment()
    md = markdown.Markdown(extensions=["fenced_code"])

    def render_markdown(text):
        # The Markdown instance is shared between events; clear whatever
        # state the previous conversion left behind.
        md.reset()
        # Autoescaping is off for this environment, so the HTML can be
        # returned as a plain string (``jinja2.Markup`` no longer exists in
        # Jinja2 >= 3.1).
        return md.convert(text)

    env.filters["markdown"] = render_markdown

    # Compile every template once instead of once per event.
    templates = {
        name: env.from_string(source) for name, source in TEMPLATES.items()
    }

    while True:
        event = await src.get()
        logging.debug("Formatting message: %s", event)

        try:
            project = event["event_data"]["project_id"]
        except (KeyError, TypeError):
            # TypeError covers payloads that are not JSON objects.
            logging.warning("Missing project ID: %s", event)
            continue

        try:
            room = config.matrix_rooms[project]
        except (KeyError, TypeError):
            logging.info("Unknown project ID: %s", project)
            continue

        # Look the name up once so that the log call below cannot fail
        # when the key is absent.
        event_name = event.get("event_name")
        try:
            template = templates[event_name]
        except (KeyError, TypeError):
            logging.info("Unknown message type: %s", event_name)
            continue

        try:
            rendered = template.render(**event)
        except Exception:
            # Worker-loop boundary: report the failure and keep running.
            logging.exception(
                "Failed to render %s event: %s", event_name, event
            )
            continue

        await dest.put((room, rendered))


async def send_notices(config, src):
    """Log in to Matrix and send every queued message as an ``m.notice``.

    Runs forever, consuming ``(room, formatted)`` tuples from ``src``.

    Args:
        config: The loaded ``Config``.
        src: Queue of ``(room, formatted)`` tuples.
    """
    client = nio.AsyncClient(config.matrix_homeserver, config.matrix_user)
    await client.login(config.matrix_password)

    while True:
        room, formatted = await src.get()
        content = {
            "msgtype": "m.notice",
            "format": "org.matrix.custom.html",
            # FIXME: to be corrected (the plain-text body currently carries
            # the same HTML as formatted_body).
            "body": formatted,
            "formatted_body": formatted,
        }
        await client.room_send(
            room,
            message_type="m.room.message",
            content=content,
        )


async def run_app(config, dest, host, port):
    """Start the HTTP server that accepts webhook calls.

    Returns once the listening site has been started; decoded payloads are
    put on ``dest`` as they arrive.

    Args:
        config: The loaded ``Config``; supplies ``webhook_token``.
        dest: Queue receiving decoded JSON payloads.
        host: Address to bind.
        port: TCP port to bind.
    """

    async def webhook(request):
        """Check the token, decode the JSON body and enqueue it."""
        tokens = request.query.getall("token", [])
        if config.webhook_token not in tokens:
            logging.warning("Invalid tokens: %s", tokens)
            # aiohttp's HTTP exceptions only take keyword arguments.
            raise aiohttp.web.HTTPForbidden(text="Invalid tokens")

        try:
            event = await request.json()
        except ValueError:
            # Malformed body: answer 400 rather than an internal error.
            raise aiohttp.web.HTTPBadRequest(text="Invalid JSON") from None

        await dest.put(event)
        # Handlers must return a response object.
        return aiohttp.web.Response(text="OK")

    app = aiohttp.web.Application()
    app.add_routes([aiohttp.web.post("/webhook", webhook)])

    runner = aiohttp.web.AppRunner(app)
    await runner.setup()
    site = aiohttp.web.TCPSite(runner, host, port)
    await site.start()


async def main():
    """Parse the command line, load the configuration and run the workers."""
    logging.basicConfig(level=logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config", default="config.yaml")
    parser.add_argument("--host", default="0.0.0.0")
    parser.add_argument("--port", type=int, default=8000)
    args = parser.parse_args()

    with open(args.config, encoding="utf-8") as f:
        config = Config(**yaml.safe_load(f))

    events = asyncio.Queue()
    formatted = asyncio.Queue()

    logging.info("Started Kanbot on %s:%d", args.host, args.port)

    tasks = (
        asyncio.create_task(run_app(config, events, args.host, args.port)),
        asyncio.create_task(format_events(config, events, formatted)),
        asyncio.create_task(send_notices(config, formatted)),
    )
    # Propagate exceptions raised by any of the workers.
    await asyncio.gather(*tasks, return_exceptions=False)


if __name__ == "__main__":
    asyncio.run(main())