#!/usr/bin/env python3
"""Relay webhook events to a Matrix room as HTML notices.

Three cooperating asyncio tasks form a pipeline connected by queues:

1. ``run_app`` accepts ``POST /webhook`` requests and enqueues their JSON body.
2. ``format_events`` renders each event with the matching entry of
   ``TEMPLATES``.
3. ``send_notices`` posts every rendered message to the configured room.
"""
import argparse
import asyncio
import dataclasses
import hmac

import aiohttp.web
import jinja2
import markdown
import nio
import yaml


@dataclasses.dataclass
class Config:
    """Runtime settings; built from the top-level mapping of the YAML file."""

    # Address and port the webhook HTTP server binds to.
    listen_addr: str
    listen_port: int
    # Matrix account used to send the notices, and the destination room.
    matrix_homeserver: str
    matrix_user: str
    matrix_password: str
    matrix_room: str
    # Shared secret callers must pass as the ``token`` query parameter.
    webhook_token: str


# Jinja2 template source per event name.
#
# NOTE(review): the empty "" literals, and the "\n" literals around the
# markdown bodies, are reproduced from the reviewed source, where they look
# like placeholders for markup that was stripped (the "\n" ones contained a
# raw line break, which is not valid inside a Python string literal).
# Restore the original markup here if it is available.
TEMPLATES = {
    "task.create": (
        "[{{ event_data.task.project_name }}] "
        ""
        "{{ event_data.task.title }}"
        " "
        "(#{{ event_data.task.id }}): "
        "{{ event_author }} created the task in "
        "'{{ event_data.task.column_title }}'\n"
        "\n"
        "{{ event_data.task.description | markdown }}"
        "\n"
    ),
    "task.move.column": (
        "[{{ event_data.task.project_name }}] "
        ""
        "{{ event_data.task.title }}"
        " "
        "(#{{ event_data.task.id }}): "
        "{{ event_author }} moved the task to "
        "'{{ event_data.task.column_title }}'"
    ),
    "task.close": (
        "[{{ event_data.task.project_name }}] "
        ""
        "{{ event_data.task.title }}"
        # Separator space added so the title is not glued to "(#id)",
        # matching the other three templates.
        " "
        "(#{{ event_data.task.id }}): "
        "{{ event_author }} closed the task"
    ),
    "comment.create": (
        "[{{ event_data.task.project_name }}] "
        ""
        "{{ event_data.task.title }}"
        " "
        "(#{{ event_data.task.id }}): "
        "{{ event_author }} added a comment:\n"
        "\n"
        "{{ event_data.comment.comment | markdown }}"
        "\n"
    ),
}


async def format_events(
    config: Config, src: asyncio.Queue, dest: asyncio.Queue
) -> None:
    """Render events taken from *src* and put the resulting text on *dest*.

    Runs forever. Events whose ``event_name`` has no entry in ``TEMPLATES``
    (or is missing), and events that fail to render, are reported on stdout
    and skipped so that one malformed payload cannot stop the pipeline.

    Args:
        config: Unused here; kept so all workers share the same call shape.
        src: Queue of decoded webhook payloads.
        dest: Queue receiving one rendered string per recognised event.
    """
    # NOTE(review): autoescaping is off (the jinja2 default), so event fields
    # are inserted into the HTML message verbatim.
    env = jinja2.Environment()
    md = markdown.Markdown(extensions=["fenced_code"])

    def render_markdown(text):
        # reset() clears state left over from the previous document; it
        # returns the Markdown instance, so the call can be chained.
        # No Markup wrapper: it only matters with autoescaping enabled, and
        # jinja2.Markup no longer exists in Jinja 3.1+.
        return md.reset().convert(text)

    env.filters["markdown"] = render_markdown

    # Compile every template once instead of once per event.
    compiled = {
        name: env.from_string(source) for name, source in TEMPLATES.items()
    }

    while True:
        event = await src.get()
        print(event)

        # The payload is arbitrary JSON: it may not be an object, and the
        # event name may be absent or not a string.
        name = event.get("event_name") if isinstance(event, dict) else None
        template = compiled.get(name) if isinstance(name, str) else None
        if template is None:
            print("Unknown message type:", name)
            continue

        try:
            rendered = template.render(**event)
        except Exception as exc:
            # Per-event boundary: report the bad payload and keep serving.
            print("Failed to render event", name, "-", repr(exc))
            continue

        await dest.put(rendered)


async def send_notices(config: Config, src: asyncio.Queue) -> None:
    """Log in to Matrix and send every message from *src* as an ``m.notice``.

    Runs forever once logged in.

    Args:
        config: Supplies homeserver, user, password and target room.
        src: Queue of rendered messages.

    Raises:
        RuntimeError: If the homeserver rejects the login.
    """
    client = nio.AsyncClient(config.matrix_homeserver, config.matrix_user)
    try:
        # nio reports a failed login by returning an error object rather
        # than raising; without this check every later send would fail.
        login_response = await client.login(config.matrix_password)
        if isinstance(login_response, nio.LoginError):
            raise RuntimeError(
                f"Matrix login failed: {login_response.message}"
            )

        while True:
            formatted = str(await src.get())
            await client.room_send(
                config.matrix_room,
                message_type="m.room.message",
                content={
                    "msgtype": "m.notice",
                    "format": "org.matrix.custom.html",
                    # TODO: to be fixed - "body" currently carries the same
                    # HTML as "formatted_body".
                    "body": formatted,
                    "formatted_body": formatted,
                },
            )
    finally:
        # Release the client's HTTP session on error or cancellation.
        await client.close()


async def run_app(config: Config, dest: asyncio.Queue) -> None:
    """Start the HTTP server that feeds webhook payloads into *dest*.

    Returns as soon as the listening socket is started; requests are then
    served by the event loop in the background.

    Args:
        config: Supplies the listen address/port and the expected token.
        dest: Queue receiving the decoded JSON body of each accepted request.
    """
    expected_token = config.webhook_token.encode("utf-8")

    async def webhook(request):
        """Authenticate the request by its ``token`` query parameter(s)."""
        supplied = request.query.getall("token", [])
        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII str arguments.
        authorized = any(
            hmac.compare_digest(token.encode("utf-8"), expected_token)
            for token in supplied
        )
        if not authorized:
            # The exception constructors are keyword-only; a positional
            # message raises TypeError and turns the 403 into a 500.
            raise aiohttp.web.HTTPForbidden(text="Invalid token")

        try:
            event = await request.json()
        except ValueError:
            # Covers JSON syntax errors and undecodable bodies.
            raise aiohttp.web.HTTPBadRequest(text="Invalid JSON body") from None

        await dest.put(event)
        # A handler must return a response object, not None.
        return aiohttp.web.Response(text="OK")

    app = aiohttp.web.Application()
    app.add_routes([aiohttp.web.post("/webhook", webhook)])

    runner = aiohttp.web.AppRunner(app)
    await runner.setup()
    site = aiohttp.web.TCPSite(runner, config.listen_addr, config.listen_port)
    await site.start()


async def main() -> None:
    """Parse arguments, load the configuration and run the three workers."""
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config-file", default="config.yaml")
    args = parser.parse_args()

    with open(args.config_file, encoding="utf-8") as f:
        config = Config(**yaml.safe_load(f))

    events = asyncio.Queue()
    formatted = asyncio.Queue()

    tasks = (
        asyncio.create_task(run_app(config, events)),
        asyncio.create_task(format_events(config, events, formatted)),
        asyncio.create_task(send_notices(config, formatted)),
    )

    # Propagate the first exception raised by any worker.
    await asyncio.gather(*tasks, return_exceptions=False)


if __name__ == "__main__":
    asyncio.run(main())