matrix-appservice-kakaotalk/matrix_appservice_kakaotalk/db/puppet.py

136 lines
4.7 KiB
Python
Raw Normal View History

2022-02-25 02:22:50 -05:00
# matrix-appservice-kakaotalk - A Matrix-KakaoTalk puppeting bridge.
# Copyright (C) 2022 Tulir Asokan, Andrew Ferrazzutti
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from typing import TYPE_CHECKING, ClassVar
from asyncpg import Record
from attr import dataclass
from yarl import URL
from mautrix.types import ContentURI, SyncToken, UserID
from mautrix.util.async_db import Database
from ..kt.types.bson import Long
fake_db = Database.create("") if TYPE_CHECKING else None
@dataclass
class Puppet:
db: ClassVar[Database] = fake_db
ktid: Long
name: str | None
photo_id: str | None
photo_mxc: ContentURI | None
name_set: bool
avatar_set: bool
is_registered: bool
custom_mxid: UserID | None
access_token: str | None
next_batch: SyncToken | None
base_url: URL | None
@classmethod
def _from_row(cls, row: Record) -> Puppet:
data = {**row}
ktid = data.pop("ktid")
base_url = data.pop("base_url", None)
return cls(**data, ktid=Long.from_optional_bytes(ktid), base_url=URL(base_url) if base_url else None)
@classmethod
def _from_optional_row(cls, row: Record | None) -> Puppet | None:
return cls._from_row(row) if row is not None else None
@classmethod
async def get_by_ktid(cls, ktid: Long) -> Puppet | None:
q = (
"SELECT ktid, name, photo_id, photo_mxc, name_set, avatar_set, is_registered, "
" custom_mxid, access_token, next_batch, base_url "
"FROM puppet WHERE ktid=$1"
)
row = await cls.db.fetchrow(q, bytes(ktid))
return cls._from_optional_row(row)
@classmethod
async def get_by_name(cls, name: str) -> Puppet | None:
q = (
"SELECT ktid, name, photo_id, photo_mxc, name_set, avatar_set, is_registered, "
" custom_mxid, access_token, next_batch, base_url "
"FROM puppet WHERE name=$1"
)
row = await cls.db.fetchrow(q, name)
return cls._from_optional_row(row)
@classmethod
async def get_by_custom_mxid(cls, mxid: UserID) -> Puppet | None:
q = (
"SELECT ktid, name, photo_id, photo_mxc, name_set, avatar_set, is_registered, "
" custom_mxid, access_token, next_batch, base_url "
"FROM puppet WHERE custom_mxid=$1"
)
row = await cls.db.fetchrow(q, mxid)
return cls._from_optional_row(row)
@classmethod
async def get_all_with_custom_mxid(cls) -> list[Puppet]:
q = (
"SELECT ktid, name, photo_id, photo_mxc, name_set, avatar_set, is_registered, "
" custom_mxid, access_token, next_batch, base_url "
"FROM puppet WHERE custom_mxid<>''"
)
rows = await cls.db.fetch(q)
return [cls._from_row(row) for row in rows if row]
@property
def _values(self):
return (
bytes(self.ktid),
self.name,
self.photo_id,
self.photo_mxc,
self.name_set,
self.avatar_set,
self.is_registered,
self.custom_mxid,
self.access_token,
self.next_batch,
str(self.base_url) if self.base_url else None,
)
async def insert(self) -> None:
q = """
INSERT INTO puppet (ktid, name, photo_id, photo_mxc, name_set, avatar_set,
is_registered, custom_mxid, access_token, next_batch, base_url)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
"""
await self.db.execute(q, *self._values)
async def delete(self) -> None:
q = "DELETE FROM puppet WHERE ktid=$1"
await self.db.execute(q, bytes(self.ktid))
async def save(self) -> None:
q = """
UPDATE puppet SET name=$2, photo_id=$3, photo_mxc=$4, name_set=$5, avatar_set=$6,
is_registered=$7, custom_mxid=$8, access_token=$9, next_batch=$10,
base_url=$11
WHERE ktid=$1
"""
await self.db.execute(q, *self._values)