"""
The Client class.
Connect to the matrix server and handle interactions with the server.
"""

import asyncio
import nio
from aiopath import AsyncPath
from typing import (
    Optional,
    NoReturn,
    Union
)

from .async_utils import Aobject
from .utils import (
    Room,
    RoomAlias,
    RoomId
)


class Client(Aobject):
    """
    Connect to the matrix server and handle interactions with the server.

    allowed_rooms: dict of the rooms where the bot is allowed to connect,
        indexed by room id (the name starting with '!'). If set to None,
        the bot connects to every room it is invited to.

    WARNING: the client is initialized asynchronously:
        `client = await Client(...)`
    """

    __client: nio.AsyncClient
    __rooms_by_aliases: dict[RoomAlias, Room]
    __rooms_by_id: dict[RoomId, Room]
    __sync_token_file: AsyncPath
    __sync_token: Optional[str]
    __sync_token_queue: asyncio.Queue[str]
    allowed_rooms: Optional[dict[RoomId, Room]]

    async def __init__(
        self,
        username: str,
        homeserver: str,
        password: str,
        allowed_rooms_names: Optional[list[Union[RoomAlias, RoomId]]] = None,
        sync_token_file: str = "./.sync_token",
    ):
        """
        Initialize the Client and log in to the homeserver.

        username: the username used by the bot.
        homeserver: the matrix home server of the bot
            (e.g. "https://matrix.org").
        password: the password of the user.
        allowed_rooms_names: the rooms where the bot is allowed to connect,
            given by room id (e.g. '!xxx:matrix.org') or by room alias
            (e.g. '#xxx:matrix.org'). None or an empty list means that
            `allowed_rooms` is left to None (no restriction).
        sync_token_file: the file storing the last sync token received from
            the sync responses. This token avoids reloading the whole
            history each time the bot is started.

        Raises RuntimeError if the login fails, and whatever
        `resolve_room` raises for an invalid or unresolvable room name.
        """
        self.__client = nio.AsyncClient(
            homeserver,
            username
        )
        self.__rooms_by_aliases = {}
        self.__rooms_by_id = {}

        # Restore the last saved sync token, if any.
        self.__sync_token_file = AsyncPath(sync_token_file)
        if await self.__sync_token_file.is_file():
            async with self.__sync_token_file.open() as file:
                # An empty file carries no token: normalise it to None so
                # the attribute matches its Optional[str] annotation.
                self.__sync_token = (await file.read()).strip() or None
        else:
            self.__sync_token = None
        self.__sync_token_queue = asyncio.Queue()

        resp = await self.__client.login(password)
        if isinstance(resp, nio.responses.LoginError):
            raise RuntimeError(f"Fail to connect: {resp.message}")

        self.allowed_rooms = None
        if allowed_rooms_names:
            # Resolve all the names concurrently.
            rooms = await asyncio.gather(
                *(
                    self.resolve_room(room_name)
                    for room_name in allowed_rooms_names
                )
            )
            # Room uniqueness is handled by self.resolve_room, so several
            # names of the same room collapse onto a single entry.
            self.allowed_rooms = {room.id: room for room in rooms}

    async def resolve_room(
        self,
        room_name: Union[RoomAlias, RoomId]
    ) -> Room:
        """
        Look up a room from its id or alias.

        If the name has already been resolved by this client, the room is
        returned directly without querying the server. Room ids are never
        sent to the server: an unknown id is simply registered locally.

        room_name: a room id (starting with '!') or a room alias
            (starting with '#').

        Raises ValueError if room_name is empty or is neither an id nor an
        alias, and RuntimeError if the server fails to resolve the alias.
        """
        # Empty name: nothing to look at.
        if len(room_name) == 0:
            raise ValueError(f"Invalid room_name: {room_name}")

        # Room id: known or not, no request is needed.
        if room_name[0] == '!':
            if room_name in self.__rooms_by_id:
                return self.__rooms_by_id[room_name]
            room = Room(id=room_name)
            self.__rooms_by_id[room_name] = room
            return room

        # Neither a room id nor a room alias.
        if room_name[0] != '#':
            raise ValueError(f"Invalid room_name: {room_name}")

        # Known room alias.
        if room_name in self.__rooms_by_aliases:
            return self.__rooms_by_aliases[room_name]

        # Unknown room alias: ask the server.
        resp = await self.__client.room_resolve_alias(room_name)
        if isinstance(resp, nio.responses.RoomResolveAliasError):
            raise RuntimeError(f"Error while resolving alias: {resp.message}")

        if resp.room_id in self.__rooms_by_id:
            # The room is already known under its id or another alias.
            room = self.__rooms_by_id[resp.room_id]
            room.aliases.add(room_name)
        else:
            # The room is unknown: register it.
            room = Room(id=resp.room_id, aliases={room_name})
            self.__rooms_by_id[resp.room_id] = room
        self.__rooms_by_aliases[room_name] = room
        return room

    async def __save_sync_tocken(self) -> NoReturn:
        """
        Save the sync tokens pushed to the queue into the sync token file.

        Runs forever: each token taken from the queue overwrites the
        content of the file.
        """
        while True:
            token = await self.__sync_token_queue.get()
            try:
                async with self.__sync_token_file.open(mode='w') as file:
                    await file.write(token)
            finally:
                # Keep the queue accounting right even if the write fails.
                self.__sync_token_queue.task_done()

    async def __sync(
        self,
        sync_timeout: int = 30
    ) -> NoReturn:
        """
        Sync with the server forever.

        sync_timeout: the max time in sec between each sync.
        """
        while True:
            sync_resp = await self.__client.sync(
                sync_timeout * 1000,  # converted from seconds to ms
                since=self.__sync_token
            )
            if isinstance(sync_resp, nio.responses.SyncError):
                # TODO: use proper logging
                # NOTE(review): the request is retried immediately, with no
                # backoff; consider a delay if the server keeps failing.
                print(f"Error while syncronizing: {sync_resp.message}")
                continue
            self.__sync_token = sync_resp.next_batch
            # Hand the token over to __save_sync_tocken to persist it.
            await self.__sync_token_queue.put(self.__sync_token)

    async def run(
        self,
        sync_timeout: int = 30
    ) -> NoReturn:
        """
        Run the bot: sync with the server and execute callbacks.

        sync_timeout: the max time in sec between each sync.
        """
        await asyncio.gather(
            self.__sync(sync_timeout=sync_timeout),
            self.__save_sync_tocken()
        )