# matrix-appservice-kakaotalk - A Matrix-KakaoTalk puppeting bridge.
# Copyright (C) 2022 Tulir Asokan, Andrew Ferrazzutti
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations

from typing import TYPE_CHECKING, ClassVar

from asyncpg import Record
from attr import dataclass

from mautrix.types import ContentURI, RoomID, UserID
from mautrix.util.async_db import Database

from ..kt.types.bson import Long
from ..kt.types.channel.channel_type import ChannelType

# Only type checkers see a real Database here; at runtime this is None, so
# `Portal.db` has to be assigned before any query method is called.
fake_db = Database.create("") if TYPE_CHECKING else None


@dataclass
class Portal:
    """Row model for the ``portal`` table.

    A row is identified by the ``(ktid, kt_receiver)`` pair. Both values are
    ``Long`` objects in Python and are passed to / read from the database as
    bytes.
    """

    db: ClassVar[Database] = fake_db

    # Single source of truth for the column list used by every SELECT and by
    # INSERT. The order must match `_values` and the `$n` placeholders in `save`.
    _columns: ClassVar[str] = (
        "ktid, kt_receiver, kt_type, mxid, name, photo_id, avatar_url, "
        "encrypted, name_set, avatar_set, relay_user_id"
    )

    ktid: Long
    kt_receiver: Long
    kt_type: ChannelType
    mxid: RoomID | None
    name: str | None
    photo_id: str | None
    avatar_url: ContentURI | None
    encrypted: bool
    name_set: bool
    avatar_set: bool
    relay_user_id: UserID | None

    @classmethod
    def _from_row(cls, row: Record) -> Portal:
        """Build a Portal from a fetched row, decoding the two byte-encoded IDs."""
        data = {**row}
        # The ID columns come back as bytes and must be turned into Long
        # before being handed to the constructor; all other columns pass
        # through untouched.
        ktid = Long.from_bytes(data.pop("ktid"))
        kt_receiver = Long.from_bytes(data.pop("kt_receiver"))
        return cls(**data, ktid=ktid, kt_receiver=kt_receiver)

    @classmethod
    def _from_optional_row(cls, row: Record | None) -> Portal | None:
        """Like `_from_row`, but map a missing row (None) to None."""
        return cls._from_row(row) if row is not None else None

    @classmethod
    async def get_by_ktid(cls, ktid: Long, kt_receiver: Long) -> Portal | None:
        """Fetch the portal with the given ``(ktid, kt_receiver)`` key, or None."""
        q = f"SELECT {cls._columns} FROM portal WHERE ktid=$1 AND kt_receiver=$2"
        # NOTE(review): lookups encode with bytes(...) while _values/delete use
        # Long.to_optional_bytes(...) - presumably equivalent for non-None
        # values; confirm against the Long implementation.
        row = await cls.db.fetchrow(q, bytes(ktid), bytes(kt_receiver))
        return cls._from_optional_row(row)

    @classmethod
    async def get_by_mxid(cls, mxid: RoomID) -> Portal | None:
        """Fetch the portal whose ``mxid`` column equals the given room ID, or None."""
        q = f"SELECT {cls._columns} FROM portal WHERE mxid=$1"
        row = await cls.db.fetchrow(q, mxid)
        return cls._from_optional_row(row)

    @classmethod
    async def get_all_by_receiver(cls, kt_receiver: Long) -> list[Portal]:
        """Fetch every portal row with the given ``kt_receiver``."""
        q = f"SELECT {cls._columns} FROM portal WHERE kt_receiver=$1"
        rows = await cls.db.fetch(q, bytes(kt_receiver))
        return [cls._from_row(row) for row in rows]

    @classmethod
    async def all(cls) -> list[Portal]:
        """Fetch every row of the ``portal`` table."""
        q = f"SELECT {cls._columns} FROM portal"
        rows = await cls.db.fetch(q)
        return [cls._from_row(row) for row in rows]

    @property
    def _values(self) -> tuple:
        """Column values in `_columns` order, ready to bind as ``$1``..``$11``."""
        return (
            Long.to_optional_bytes(self.ktid),
            Long.to_optional_bytes(self.kt_receiver),
            self.kt_type,
            self.mxid,
            self.name,
            self.photo_id,
            self.avatar_url,
            self.encrypted,
            self.name_set,
            self.avatar_set,
            self.relay_user_id,
        )

    async def insert(self) -> None:
        """Insert this portal as a new row."""
        q = f"""
        INSERT INTO portal ({self._columns})
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
        """
        await self.db.execute(q, *self._values)

    async def delete(self) -> None:
        """Delete the row matching this portal's ``(ktid, kt_receiver)`` key."""
        q = "DELETE FROM portal WHERE ktid=$1 AND kt_receiver=$2"
        await self.db.execute(
            q,
            Long.to_optional_bytes(self.ktid),
            Long.to_optional_bytes(self.kt_receiver),
        )

    async def save(self) -> None:
        """Write all non-key columns back to the row matching this portal's key."""
        # $1/$2 are the key and $3..$11 the remaining columns, so the full
        # `_values` tuple can be bound as-is.
        q = """
        UPDATE portal SET kt_type=$3, mxid=$4, name=$5, photo_id=$6, avatar_url=$7,
                          encrypted=$8, name_set=$9, avatar_set=$10, relay_user_id=$11
        WHERE ktid=$1 AND kt_receiver=$2
        """
        await self.db.execute(q, *self._values)