# matrix-appservice-kakaotalk - A Matrix-KakaoTalk puppeting bridge. # Copyright (C) 2022 Tulir Asokan, Andrew Ferrazzutti # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from __future__ import annotations from typing import TYPE_CHECKING, ClassVar, List, Optional from asyncpg import Record from attr import dataclass, field from mautrix.types import ContentURI, RoomID, UserID from mautrix.util.async_db import Database from ..kt.types.bson import Long, to_optional_long from ..kt.types.channel.channel_type import ChannelType fake_db = Database.create("") if TYPE_CHECKING else None @dataclass class Portal: db: ClassVar[Database] = fake_db ktid: Long = field(converter=Long) kt_receiver: Long = field(converter=Long) kt_type: ChannelType mxid: Optional[RoomID] name: Optional[str] description: Optional[str] photo_id: Optional[str] avatar_url: Optional[ContentURI] encrypted: bool name_set: bool topic_set: bool avatar_set: bool fully_read_kt_chat: Optional[Long] = field(converter=to_optional_long) relay_user_id: Optional[UserID] @classmethod def _from_row(cls, row: Record) -> Portal: return cls(**row) @classmethod def _from_optional_row(cls, row: Optional[Record]) -> Optional[Portal]: return cls._from_row(row) if row is not None else None _columns = ( "ktid, kt_receiver, kt_type, mxid, name, description, photo_id, avatar_url, encrypted, " "name_set, avatar_set, fully_read_kt_chat, relay_user_id" ) @classmethod async def get_by_ktid(cls, ktid: int, kt_receiver: int) -> Optional[Portal]: q = f"SELECT {cls._columns} FROM portal WHERE ktid=$1 AND kt_receiver=$2" row = await cls.db.fetchrow(q, ktid, kt_receiver) return cls._from_optional_row(row) @classmethod async def get_by_mxid(cls, mxid: RoomID) -> Optional[Portal]: q = f"SELECT {cls._columns} FROM portal WHERE mxid=$1" row = await cls.db.fetchrow(q, mxid) return cls._from_optional_row(row) @classmethod async def get_all_by_receiver(cls, kt_receiver: int) -> List[Portal]: q = f"SELECT {cls._columns} FROM portal WHERE kt_receiver=$1" rows = await cls.db.fetch(q, kt_receiver) return [cls._from_row(row) for row in rows if row] @classmethod async def all(cls) -> List[Portal]: q = f"SELECT {cls._columns} FROM portal" rows = await cls.db.fetch(q) return [cls._from_row(row) for row in rows if row] @property def _values(self): return ( self.ktid, self.kt_receiver, self.kt_type, self.mxid, self.name, self.description, self.photo_id, self.avatar_url, self.encrypted, self.name_set, self.avatar_set, self.fully_read_kt_chat, self.relay_user_id, ) _args = "$" + ", $".join(str(i + 1) for i in range(_columns.count(',') + 1)) async def insert(self) -> None: q = f"INSERT INTO portal ({self._columns}) VALUES ({self._args})" await self.db.execute(q, *self._values) async def delete(self) -> None: q = "DELETE FROM portal WHERE ktid=$1 AND kt_receiver=$2" await self.db.execute(q, self.ktid, self.kt_receiver) _nonkey_column_asgns = ", ".join( map(lambda t: (i:=t[0], word:=t[1], f"{word}=${i + 3}")[-1], enumerate(_columns.split(", ")[2:]) ) ) async def save(self) -> None: q = f"UPDATE portal SET {self._nonkey_column_asgns} WHERE ktid=$1 AND kt_receiver=$2" await self.db.execute(q, *self._values)