# matrix-appservice-kakaotalk - A Matrix-KakaoTalk puppeting bridge.
# Copyright (C) 2022 Tulir Asokan, Andrew Ferrazzutti
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations

from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Awaitable, cast
import asyncio
import time

from mautrix.bridge import BaseUser, async_getter_lock
from mautrix.types import (
    EventID,
    JSON,
    MessageType,
    RoomID,
    SerializerError,
    TextMessageEventContent,
    UserID,
)
from mautrix.util.bridge_state import BridgeState, BridgeStateEvent
from mautrix.util.opt_prometheus import Gauge, Summary, async_time
from mautrix.util.simple_lock import SimpleLock

from . import portal as po, puppet as pu
from .config import Config
from .db import User as DBUser, LoginCredential

from .kt.client import Client
from .kt.client.errors import (
    AuthenticationRequired,
    AnotherLogonDetected,
    IncorrectPassword,
    OAuthException,
    ResponseError,
)
from .kt.client.types import PortalChannelInfo, SettingsStruct
from .kt.types.bson import Long
from .kt.types.channel.channel_info import ChannelInfo, NormalChannelInfo, NormalChannelData
from .kt.types.channel.channel_type import ChannelType, KnownChannelType
from .kt.types.chat.chat import Chatlog
from .kt.types.client.client_session import LoginDataItem, LoginResult
from .kt.types.oauth import OAuthCredential
from .kt.types.openlink.open_channel_info import OpenChannelData, OpenChannelInfo
from .kt.types.openlink.open_link_type import OpenChannelUserPerm
from .kt.types.openlink.open_link_user_info import OpenLinkChannelUserInfo
from .kt.types.packet.chat.kickout import KnownKickoutType, KickoutRes

# Prometheus metrics. Each Summary is described as timing calls to one handler
# (those visible in this file are attached with the async_time decorator); the
# two Gauges are updated through _track_metric on login/logout and on
# connect/disconnect.
METRIC_CONNECT_AND_SYNC = Summary("bridge_connect_and_sync", "calls to connect_and_sync")
METRIC_CHAT = Summary("bridge_on_chat", "calls to on_chat")
METRIC_CHAT_DELETED = Summary("bridge_on_chat_deleted", "calls to on_chat_deleted")
METRIC_CHAT_READ = Summary("bridge_on_chat_read", "calls to on_chat_read")
METRIC_CHANNEL_META_CHANGE = Summary("bridge_on_channel_meta_change", "calls to on_channel_meta_change")
METRIC_PROFILE_CHANGE = Summary("bridge_on_profile_changed", "calls to on_profile_changed")
METRIC_PERM_CHANGE = Summary("bridge_on_perm_changed", "calls to on_perm_changed")
METRIC_CHANNEL_ADDED = Summary("bridge_on_channel_added", "calls to on_channel_added")
METRIC_CHANNEL_JOIN = Summary("bridge_on_channel_join", "calls to on_channel_join")
METRIC_CHANNEL_LEFT = Summary("bridge_on_channel_left", "calls to on_channel_left")
METRIC_CHANNEL_KICKED = Summary("bridge_on_channel_kicked", "calls to on_channel_kicked")
METRIC_USER_JOIN = Summary("bridge_on_user_join", "calls to on_user_join")
METRIC_USER_LEFT = Summary("bridge_on_user_left", "calls to on_user_left")
METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
METRIC_CONNECTED = Gauge("bridge_connected", "Bridge users connected to KakaoTalk")

if TYPE_CHECKING:
    from .__main__ import KakaoTalkBridge

# Register human-readable messages for the bridge-state error codes that this
# module passes to push_bridge_state / send_bridge_notice.
BridgeState.human_readable_errors.update(
    {
        "kt-reconnection-error": "Failed to reconnect to KakaoTalk",
        # TODO Use for errors in Node backend that cause session to be lost
        #"kt-connection-error": "KakaoTalk disconnected unexpectedly",
        "kt-auth-error": "Authentication error from KakaoTalk: {message}",
        "logged-out": "You're not logged into KakaoTalk",
    }
)


class User(DBUser, BaseUser):
    """A Matrix user of the bridge together with their KakaoTalk session state.

    Combines the database row (``DBUser``) with mautrix's ``BaseUser`` and keeps
    live instances in the class-level ``by_mxid`` / ``by_ktid`` caches.
    """

    # Gates the "Connected to KakaoTalk chats" and backend-helper disconnect
    # notices; overwritten from config in init_cls.
    temp_disconnect_notices: bool = True
    shutdown: bool = False
    config: Config

    # Class-wide instance caches, keyed by Matrix user ID and KakaoTalk user ID.
    by_mxid: dict[UserID, User] = {}
    by_ktid: dict[int, User] = {}

    # Backend client; None until a session is loaded and again after logout.
    _client: Client | None

    # Guards creation of the notice room / ordering of sent notices.
    _notice_room_lock: asyncio.Lock
    _notice_send_lock: asyncio.Lock
    is_admin: bool
    permission_level: str
    # Tri-state flags: None means "not determined yet".
    _is_logged_in: bool | None
    _is_connected: bool | None
    # time.monotonic() value of the last change of _is_connected.
    _connection_time: float
    _db_instance: DBUser | None
    _sync_lock: SimpleLock
    # True while a reload_session() triggered by a backend disconnect is pending.
    _is_rpc_reconnecting: bool
    # Cached result of the client's settings/start call and when it was fetched.
    _logged_in_info: SettingsStruct | None
    _logged_in_info_time: float

    def __init__(
        self,
        mxid: UserID,
        force_login: bool,
        was_connected: bool,
        ktid: Long | None = None,
        uuid: str | None = None,
        access_token: str | None = None,
        refresh_token: str | None = None,
        notice_room: RoomID | None = None,
    ) -> None:
        """Initialise the database fields and the in-memory session state.

        The database-backed fields are forwarded to ``DBUser``; everything else
        (locks, permission flags, connection flags, cached info, client) starts
        in its "nothing known yet" state.
        """
        db_fields = {
            "mxid": mxid,
            "force_login": force_login,
            "was_connected": was_connected,
            "ktid": ktid,
            "uuid": uuid,
            "access_token": access_token,
            "refresh_token": refresh_token,
            "notice_room": notice_room,
        }
        super().__init__(**db_fields)
        BaseUser.__init__(self)

        # Notice room handling
        self.notice_room = notice_room
        self._notice_room_lock = asyncio.Lock()
        self._notice_send_lock = asyncio.Lock()

        # Permissions come from the bridge config, keyed by Matrix ID
        permissions = self.config.get_permissions(mxid)
        (
            self.relay_whitelisted,
            self.is_whitelisted,
            self.is_admin,
            self.permission_level,
        ) = permissions

        # Login/connection state; None means "not determined yet"
        self._is_logged_in = None
        self._is_connected = None
        self._connection_time = time.monotonic()
        self._is_rpc_reconnecting = False
        self._sync_lock = SimpleLock(
            "Waiting for channel sync to finish before handling %s", log=self.log
        )

        # Cached account info and the backend client, both filled in on login
        self._logged_in_info = None
        self._logged_in_info_time = 0
        self._client = None

    @classmethod
    def init_cls(cls, bridge: KakaoTalkBridge) -> AsyncIterable[Awaitable[bool]]:
        """Bind the class to the running bridge and prepare startup session reloads.

        Returns an async iterable that yields one un-awaited
        ``reload_session(is_startup=True)`` coroutine per logged-in user.
        """
        cls.bridge = bridge
        cls.config = bridge.config
        cls.az = bridge.az
        notices_enabled = bridge.config["bridge.temporary_disconnect_notices"]
        cls.temp_disconnect_notices = notices_enabled
        startup_reloads = (
            user.reload_session(is_startup=True) async for user in cls.all_logged_in()
        )
        return startup_reloads

    @property
    def client(self) -> Client:
        """The backend client of this user.

        Raises:
            ValueError: if no client has been set up yet.
        """
        if self._client:
            return self._client
        raise ValueError("User must be logged in before its client can be used")

    @property
    def is_connected(self) -> bool | None:
        """Connection flag; ``None`` until a connection state has been recorded."""
        return self._is_connected

    @is_connected.setter
    def is_connected(self, val: bool | None) -> None:
        """Store the flag and timestamp the change; same-value writes are ignored."""
        if self._is_connected == val:
            return
        self._is_connected = val
        self._connection_time = time.monotonic()

    @property
    def connection_time(self) -> float:
        """``time.monotonic()`` value recorded when ``is_connected`` last changed."""
        return self._connection_time

    @property
    def has_state(self) -> bool:
        """Whether both an access token and a refresh token are stored."""
        # TODO If more state is needed, consider returning a saved LoginResult
        if not self.access_token:
            return False
        return bool(self.refresh_token)

    # region Database getters

    def _add_to_cache(self) -> None:
        """Register this instance in ``by_mxid`` and, when a ktid is set, ``by_ktid``."""
        self.by_mxid[self.mxid] = self
        ktid = self.ktid
        if ktid:
            self.by_ktid[ktid] = self

    @classmethod
    async def all_logged_in(cls) -> AsyncGenerator["User", None]:
        """Yield every logged-in user, preferring already-cached instances.

        Rows without a cached instance are added to the cache and yielded as-is.
        """
        for row in await super().all_logged_in():
            try:
                instance = cls.by_mxid[row.mxid]
            except KeyError:
                row._add_to_cache()
                instance = row
            yield instance

    @classmethod
    @async_getter_lock
    async def get_by_mxid(cls, mxid: UserID, *, create: bool = True) -> User | None:
        """Look up a user by Matrix ID, optionally creating a new database row.

        Args:
            mxid: The Matrix user ID to look up.
            create: Whether to insert and return a new user when none exists.

        Returns:
            The cached or loaded user, a freshly created one when ``create`` is
            set, or ``None`` for puppet/bot Matrix IDs and for unknown users
            when ``create`` is false.
        """
        # Bridge ghosts and the bridge bot are never bridge users
        if pu.Puppet.get_id_from_mxid(mxid) or mxid == cls.az.bot_mxid:
            return None
        try:
            return cls.by_mxid[mxid]
        except KeyError:
            pass

        user = cast(cls, await super().get_by_mxid(mxid))
        if user is not None:
            user._add_to_cache()
            return user

        if create:
            cls.log.debug(f"Creating user instance for {mxid}")
            # __init__ requires force_login and was_connected; a brand-new user
            # has never connected and has not asked for forced logins.
            user = cls(mxid, force_login=False, was_connected=False)
            await user.insert()
            user._add_to_cache()
            return user

        return None

    @classmethod
    @async_getter_lock
    async def get_by_ktid(cls, ktid: int) -> User | None:
        """Look up a user by KakaoTalk ID, from the cache first and then the database.

        Returns ``None`` when no such user exists; this never creates a user.
        """
        if ktid in cls.by_ktid:
            return cls.by_ktid[ktid]

        user = cast(cls, await super().get_by_ktid(ktid))
        if user is None:
            return None

        user._add_to_cache()
        return user

    async def get_uuid(self, force: bool = False) -> str:
        """Return the device UUID, generating and saving one if missing or ``force``d."""
        if not (force or self.uuid is None):
            return self.uuid
        self.uuid = await self._generate_uuid()
        # TODO Maybe don't save yet
        await self.save()
        return self.uuid

    @classmethod
    async def _generate_uuid(cls) -> str:
        """Ask the client for a new device UUID, passing it all stored UUIDs."""
        known_uuids = await super().get_all_uuids()
        return await Client.generate_uuid(known_uuids)

    # endregion

    @property
    def oauth_credential(self) -> OAuthCredential:
        """The stored credential fields bundled as an ``OAuthCredential``.

        Asserts that ktid, uuid, access token and refresh token are all set.
        """
        ktid = self.ktid
        uuid = self.uuid
        access_token = self.access_token
        refresh_token = self.refresh_token
        assert ktid is not None
        assert uuid is not None
        assert access_token is not None
        assert refresh_token is not None
        return OAuthCredential(ktid, uuid, access_token, refresh_token)

    @oauth_credential.setter
    def oauth_credential(self, oauth_credential: OAuthCredential) -> None:
        """Copy the credential's fields onto this user (not saved to the database here).

        A device UUID that differs from the stored one is logged and adopted.
        """
        self.ktid = oauth_credential.userId
        self.access_token = oauth_credential.accessToken
        self.refresh_token = oauth_credential.refreshToken
        device_uuid = oauth_credential.deviceUUID
        if self.uuid == device_uuid:
            return
        self.log.warning(f"UUID mismatch: expected {self.uuid}, got {device_uuid}")
        self.uuid = device_uuid

    async def get_own_info(self) -> SettingsStruct | None:
        """Return the cached account settings, refreshing them when missing or over an hour old.

        Without a client, whatever is cached (possibly ``None``) is returned.
        """
        client = self._client
        if not client:
            return self._logged_in_info
        max_age = 60 * 60
        is_stale = (
            not self._logged_in_info
            or self._logged_in_info_time + max_age < time.monotonic()
        )
        if is_stale:
            self._logged_in_info = await client.get_settings()
            self._logged_in_info_time = time.monotonic()
        return self._logged_in_info

    async def _load_session(self, is_startup: bool) -> bool:
        """Log in with saved credentials and start the backend client.

        A saved password is tried first; if that does not yield a credential,
        the saved password is deleted and, when the config allows it and tokens
        are stored, the stored tokens are used instead.

        Args:
            is_startup: Whether this runs as part of bridge startup. Forwarded
                to ``post_login``; also short-circuits when already logged in.

        Returns:
            ``True`` once the client has started (or the user was already
            logged in at startup), ``False`` when no login was attempted or
            possible.

        Raises:
            Exception: the last login error when password login failed and
                token relogin is not available; anything ``client.start()``
                raises also propagates.
        """
        if self._is_logged_in and is_startup:
            return True
        if not self.was_connected and not self.config["bridge.remain_logged_in_on_disconnect"]:
            self.log.warning("Not logging in because last session was disconnected, and login+disconnection is forbidden by config")
            await self.push_bridge_state(
                BridgeStateEvent.LOGGED_OUT,
                error="logged-out",
            )
            return False
        latest_exc: Exception | None = None
        password_ok = False
        try:
            creds = await LoginCredential.get_by_mxid(self.mxid)
        except Exception:
            # Best effort: a failing lookup is treated as "no saved password"
            self.log.exception("Exception while looking for saved password")
            creds = None
        if creds:
            uuid = await self.get_uuid()
            form = creds.get_form()
            oauth_credential = None
            try:
                oauth_credential = await Client.login(uuid=uuid, form=form, forced=False)
            except IncorrectPassword as e:
                latest_exc = e
            except AnotherLogonDetected:
                if self.force_login:
                    try:
                        # Wait a moment to make it look like a user-initiated response
                        await asyncio.sleep(2)
                        oauth_credential = await Client.login(uuid=uuid, form=form, forced=True)
                    except OAuthException as e:
                        latest_exc = e
            if oauth_credential:
                self.oauth_credential = oauth_credential
                await self.save()
                password_ok = True
            else:
                try:
                    await creds.delete()
                except Exception:
                    # Not a bare except: cancellation and interpreter-exit
                    # signals must not be swallowed here.
                    self.log.exception("Exception while deleting incorrect password")
        if password_ok or self.config["bridge.allow_token_relogin"] and self.has_state:
            if not password_ok:
                self.log.warning("Using token-based relogin after password-based relogin failed")
        elif latest_exc:
            raise latest_exc
        else:
            # If we have a user in the DB with no state, we can assume
            # KT logged us out and the bridge has restarted
            await self.push_bridge_state(
                BridgeStateEvent.BAD_CREDENTIALS,
                error="logged-out",
            )
            return False
        client = Client(self, log=self.log.getChild("ktclient"))
        user_info = await client.start()
        # NOTE On failure, client.start throws instead of returning something falsy
        self.log.info("Loaded session successfully")
        self._client = client
        self._logged_in_info = user_info
        self._logged_in_info_time = time.monotonic()
        self._track_metric(METRIC_LOGGED_IN, True)
        self._is_logged_in = True
        self.is_connected = None
        # Post-login work (custom puppet, connect and sync) runs in the background
        asyncio.create_task(self.post_login(is_startup=is_startup, is_token_login=not password_ok))
        return True

    async def _send_reset_notice(self, e: OAuthException, edit: EventID | None = None) -> None:
        """Tell the user about an authentication error, then log them out (keeping the ktid)."""
        notice = (
            "Got authentication error from KakaoTalk:\n\n"
            f"> {e.message}\n\n"
            "If you changed your KakaoTalk password, this "
            "is normal and you just need to log in again."
        )
        await self.send_bridge_notice(
            notice,
            edit=edit,
            important=True,
            state_event=BridgeStateEvent.BAD_CREDENTIALS,
            error_code="kt-auth-error",
            error_message=str(e),
        )
        await self.logout(remove_ktid=False)

    async def is_logged_in(self, _override: bool = False) -> bool:
        """Report whether the user is logged in, probing the backend when unknown.

        Without stored tokens or a client the answer is always ``False``. The
        cached answer is reused unless it is ``None`` or ``_override`` is set.
        """
        if not (self.has_state and self._client):
            return False
        if not _override and self._is_logged_in is not None:
            return self._is_logged_in
        try:
            own_info = await self.get_own_info()
            self._is_logged_in = bool(own_info)
        except SerializerError:
            self.log.exception(
                "Unable to deserialize settings struct, "
                "but didn't get auth error, so treating user as logged in"
            )
            self._is_logged_in = True
        except Exception:
            self.log.exception("Exception checking login status")
            self._is_logged_in = False
        return self._is_logged_in

    async def reload_session(
        self, event_id: EventID | None = None, retries: int = 3, is_startup: bool = False
    ) -> None:
        """Load the session and report any failure to the user as a bridge notice.

        Args:
            event_id: Existing notice event to edit instead of sending a new one.
            retries: How many more times to retry after a ``ResponseError``.
            is_startup: Forwarded to ``_load_session``. Note that the retry call
                below does not forward it, so retries run with the default.

        All ``Exception``s are handled here; ``_is_rpc_reconnecting`` is cleared
        on every exit path.
        """
        try:
            # A falsy result with tokens still stored means the session is gone:
            # tell the user and drop the tokens (but keep the ktid).
            if not await self._load_session(is_startup=is_startup) and self.has_state:
                await self.send_bridge_notice(
                    "Logged out of KakaoTalk. Must use the `login` command to log back in.",
                    important=True,
                )
                await self.logout(remove_ktid=False)
        except OAuthException as e:
            await self._send_reset_notice(e, edit=event_id)
        # TODO Throw a ResponseError on network failures
        except ResponseError as e:
            will_retry = retries > 0
            retry = "Retrying in 1 minute" if will_retry else "Not retrying"
            notice = f"Failed to connect to KakaoTalk: unknown response error {e}. {retry}"
            if will_retry:
                await self.send_bridge_notice(
                    notice,
                    edit=event_id,
                    state_event=BridgeStateEvent.TRANSIENT_DISCONNECT,
                )
                await asyncio.sleep(60)
                # Recursive retry with one fewer attempt left
                await self.reload_session(event_id, retries - 1)
            else:
                await self.send_bridge_notice(
                    notice,
                    edit=event_id,
                    important=True,
                    state_event=BridgeStateEvent.UNKNOWN_ERROR,
                    error_code="kt-reconnection-error",
                )
        except Exception:
            self.log.exception("Error connecting to KakaoTalk")
            await self.send_bridge_notice(
                "Failed to connect to KakaoTalk: unknown error (see logs for more details)",
                edit=event_id,
                state_event=BridgeStateEvent.UNKNOWN_ERROR,
                error_code="kt-reconnection-error",
            )
        finally:
            self._is_rpc_reconnecting = False

    async def logout(self, *, remove_ktid: bool = True, reset_device: bool = False) -> None:
        """Stop the client, clear the stored tokens and session flags, and save.

        Args:
            remove_ktid: Also push a LOGGED_OUT bridge state, drop this user
                from the ``by_ktid`` cache and clear the stored ktid.
            reset_device: Replace the device UUID with a newly generated one.
        """
        if self._client:
            # TODO Look for a logout API call
            await self._client.stop()
        if remove_ktid:
            await self.push_bridge_state(BridgeStateEvent.LOGGED_OUT)
        self._track_metric(METRIC_LOGGED_IN, False)

        if reset_device:
            self.uuid = await self._generate_uuid()
        self.access_token = None
        self.refresh_token = None

        self._is_logged_in = False
        self.is_connected = None
        self.was_connected = False
        self._client = None

        if self.ktid and remove_ktid:
            #await UserPortal.delete_all(self.ktid)
            # The user may hold a ktid without being cached under it (the cache
            # entry is only added by _add_to_cache), so a missing key must not
            # abort the logout before the state is saved.
            self.by_ktid.pop(self.ktid, None)
            self.ktid = None

        await self.save()

    async def post_login(self, is_startup: bool, is_token_login: bool) -> None:
        """Run the follow-up work after a successful login.

        Caches the user, tries to enable the custom puppet, and then either
        connects and syncs or tells the user to run ``sync`` manually.

        Args:
            is_startup: Whether the login happened during bridge startup.
            is_token_login: Whether the login used stored tokens, not a password.
        """
        self.log.info("Running post-login actions")
        assert self.ktid
        self._add_to_cache()

        try:
            own_puppet = await pu.Puppet.get_by_ktid(self.ktid)
            needs_switch = own_puppet.custom_mxid != self.mxid
            if needs_switch and own_puppet.can_auto_login(self.mxid):
                self.log.info("Automatically enabling custom puppet")
                await own_puppet.switch_mxid(access_token="auto", mxid=self.mxid)
        except Exception:
            self.log.exception("Failed to automatically enable custom puppet")

        should_connect = (
            not is_token_login
            or self.was_connected and self.config["bridge.reconnect_on_token_relogin"]
        )
        if not should_connect:
            await self.send_bridge_notice(
                "Logged into KakaoTalk. To connect to chatroom updates, use the `sync` command."
            )
            self.was_connected = False
            await self.save()
            return

        # TODO Check if things break when a live message comes in during syncing
        sync_count = (
            self.config["bridge.initial_chat_sync"]
            if self.config["bridge.sync_on_startup"] or not is_startup
            else None
        )
        await self.connect_and_sync(sync_count)

    async def get_direct_chats(self) -> dict[UserID, list[RoomID]]:
        """Map puppet Matrix IDs to the rooms of this user's portals that have a room.

        Returns an empty dict when the user has no ktid.
        """
        if not self.ktid:
            return {}
        direct_chats = {}
        async for portal in po.Portal.get_all_by_receiver(self.ktid):
            if not portal.mxid:
                continue
            puppet_mxid = pu.Puppet.get_mxid_from_id(portal.ktid)
            direct_chats[puppet_mxid] = [portal.mxid]
        return direct_chats

    @async_time(METRIC_CONNECT_AND_SYNC)
    async def connect_and_sync(self, sync_count: int | None) -> bool:
        """Connect the client and, when appropriate, sync channels.

        Args:
            sync_count: Passed through to ``_sync_channels``.

        Returns:
            ``True`` on success; ``False`` after an authentication error (which
            also logs the user out) or any other exception.
        """
        # TODO Look for a way to sync all channels without (re-)logging in
        try:
            login_result = await self.client.connect()
            needs_sync = await self.on_connect()
            if needs_sync and login_result:
                await self._sync_channels(login_result, sync_count)
            return True
        except AuthenticationRequired as auth_err:
            await self.send_bridge_notice(
                f"Got authentication error from KakaoTalk:\n\n> {auth_err.message}\n\n",
                important=True,
                state_event=BridgeStateEvent.BAD_CREDENTIALS,
                error_code="kt-auth-error",
                error_message=str(auth_err),
            )
            await self.logout(remove_ktid=False)
        except Exception as err:
            self.log.exception("Failed to connect and sync")
            await self.push_bridge_state(BridgeStateEvent.UNKNOWN_ERROR, message=str(err))
        return False

    async def _sync_channels(self, login_result: LoginResult, sync_count: int | None) -> None:
        """Sync the most recently active channels from a login result.

        Args:
            login_result: Result of connecting; its ``channelList`` is synced.
            sync_count: Maximum number of channels; ``None`` uses the configured
                default, a falsy value skips syncing, a negative value means all.

        ``AuthenticationRequired`` is re-raised; other per-channel errors are
        logged and the remaining channels are still synced.
        """
        if sync_count is None:
            sync_count = self.config["bridge.initial_chat_sync"]
        if not sync_count:
            self.log.debug("Skipping channel syncing")
            return
        # TODO Sync left channels. It's not login_result.removedChannelIdList...
        channels = login_result.channelList
        if not channels:
            self.log.debug("No channels to sync")
            return

        total = len(channels)
        limit = total if sync_count < 0 else min(sync_count, total)
        await self.push_bridge_state(BridgeStateEvent.BACKFILLING)
        self.log.debug(f"Syncing {limit} of {total} channels...")

        def last_activity(item: LoginDataItem):
            # Channels without a last chat log sort as oldest
            last_log = item.channel.info.lastChatLog
            return last_log.sendAt if last_log else 0

        newest_first = sorted(channels, reverse=True, key=last_activity)
        for login_data in newest_first[:limit]:
            try:
                await self._sync_channel_on_login(login_data)
            except AuthenticationRequired:
                raise
            except Exception:
                self.log.exception(f"Failed to sync channel {login_data.channel.channelId}")

        await self.update_direct_chats()

    def _sync_channel_on_login(self, login_data: LoginDataItem) -> Awaitable[None]:
        """Log the details of one channel from the login data, then start syncing it.

        This is a plain (non-async) function: the debug logging runs when it is
        called, and the awaitable returned by ``_sync_channel`` is handed back
        for the caller to await.
        """
        channel_data = login_data.channel
        self.log.debug(f"Syncing channel {channel_data.channelId} (last updated at {login_data.lastUpdate})")
        channel_info = channel_data.info
        # Type-specific details; the bare annotations only narrow types for readers/checkers
        if isinstance(channel_data, NormalChannelData):
            channel_data: NormalChannelData
            channel_info: NormalChannelInfo
            self.log.debug(f"Join time: {channel_info.joinTime}")
        elif isinstance(channel_data, OpenChannelData):
            channel_data: OpenChannelData
            self.log.debug(f"channel_data link ID: {channel_data.linkId}")
            channel_info: OpenChannelInfo
            self.log.debug(f"channel_info link ID: {channel_info.linkId}")
            self.log.debug(f"openToken: {channel_info.openToken}")
            self.log.debug(f"Is direct channel: {channel_info.directChannel}")
            self.log.debug(f"Has OpenLink: {channel_info.openLink is not None}")
        else:
            # Unexpected types are only reported; the channel is still synced below
            self.log.error(f"Unexpected channel type: {type(channel_data)}")

        # Details common to every channel type
        channel_info: ChannelInfo
        self.log.debug(f"channel_info channel ID: {channel_info.channelId}")
        self.log.debug(f"Channel data/info IDs match: {channel_data.channelId == channel_info.channelId}")
        self.log.debug(f"Channel type: {channel_info.type}")
        self.log.debug(f"Active user count: {channel_info.activeUserCount}")
        self.log.debug(f"New chat count: {channel_info.newChatCount}")
        self.log.debug(f"New chat count invalid: {channel_info.newChatCountInvalid}")
        self.log.debug(f"Last chat log ID: {channel_info.lastChatLogId}")
        self.log.debug(f"Last seen log ID: {channel_info.lastSeenLogId}")
        self.log.debug(f"Has last chat log: {channel_info.lastChatLog is not None}")
        self.log.debug(f"metaMap: {channel_info.metaMap}")
        self.log.debug(f"User count: {len(channel_info.displayUserList)}")
        self.log.debug(f"Has push alert: {channel_info.pushAlert}")
        for display_user_info in channel_info.displayUserList:
            self.log.debug(f"Member: {display_user_info.nickname} - {display_user_info.profileURL} - {display_user_info.userId}")

        return self._sync_channel(channel_info)

    async def _sync_channel(self, channel_info: ChannelInfo) -> None:
        """Create the Matrix room for a channel, or update and backfill an existing one."""
        assert self.ktid
        portal = await po.Portal.get_by_ktid(
            channel_info.channelId,
            kt_receiver=self.ktid,
            kt_type=channel_info.type,
        )
        portal_info = await self.client.get_portal_channel_info(portal.channel_props)
        # Attach the channel info we already have to the fetched portal info
        portal_info.channel_info = channel_info
        if portal.mxid:
            await portal.update_matrix_room(self, portal_info)
            await portal.backfill(self, is_initial=False, channel_info=channel_info)
        else:
            await portal.create_matrix_room(self, portal_info)

    async def get_notice_room(self) -> RoomID:
        """Return the bridge notice room, creating and saving it on first use."""
        if self.notice_room:
            return self.notice_room
        async with self._notice_room_lock:
            # If someone already created the room while this call was waiting,
            # don't make a new room
            if self.notice_room:
                return self.notice_room
            federate = self.config["bridge.federate_rooms"]
            creation_content = {} if federate else {"m.federate": False}
            self.notice_room = await self.az.intent.create_room(
                is_direct=True,
                invitees=[self.mxid],
                topic="KakaoTalk bridge notices",
                creation_content=creation_content,
            )
            await self.save()
        return self.notice_room

    async def send_bridge_notice(
        self,
        text: str,
        edit: EventID | None = None,
        state_event: BridgeStateEvent | None = None,
        important: bool = False,
        error_code: str | None = None,
        error_message: str | None = None,
    ) -> EventID | None:
        """Send a notice to the user's notice room, optionally pushing a bridge state.

        Args:
            text: Notice body.
            edit: Event to edit instead of sending a new message.
            state_event: Bridge state to push before sending, if any.
            important: Send as a text message rather than a notice.
            error_code: Error code for the pushed bridge state.
            error_message: State message used when ``error_code`` is set;
                otherwise ``text`` is used.

        Returns:
            ``edit`` when given, else the sent event ID; ``None`` when notices
            are disabled or sending failed.
        """
        if state_event:
            state_message = error_message if error_code else text
            await self.push_bridge_state(
                state_event,
                error=error_code,
                message=state_message,
            )
        if self.config["bridge.disable_bridge_notices"]:
            return None
        sent_event_id = None
        try:
            self.log.debug("Sending bridge notice: %s", text)
            msgtype = MessageType.TEXT if important else MessageType.NOTICE
            content = TextMessageEventContent(body=text, msgtype=msgtype)
            if edit:
                content.set_edit(edit)
            # This is locked to prevent notices going out in the wrong order
            async with self._notice_send_lock:
                room_id = await self.get_notice_room()
                sent_event_id = await self.az.intent.send_message(room_id, content)
        except Exception:
            self.log.warning("Failed to send bridge notice", exc_info=True)
        return edit or sent_event_id

    async def fill_bridge_state(self, state: BridgeState) -> None:
        """Fill in the base state, then the remote ID and name when a ktid is set."""
        await super().fill_bridge_state(state)
        if not self.ktid:
            return
        state.remote_id = str(self.ktid)
        own_puppet = await pu.Puppet.get_by_ktid(self.ktid)
        state.remote_name = own_puppet.name

    async def get_bridge_states(self) -> list[BridgeState]:
        """Return the current bridge state as a one-element list, or ``[]`` without tokens.

        Connected users report CONNECTED, users that are reconnecting or still
        hold a client report TRANSIENT_DISCONNECT, everyone else UNKNOWN_ERROR.
        """
        if not self.has_state:
            return []
        if self.is_connected:
            event = BridgeStateEvent.CONNECTED
        elif self._is_rpc_reconnecting or self._client:
            event = BridgeStateEvent.TRANSIENT_DISCONNECT
        else:
            event = BridgeStateEvent.UNKNOWN_ERROR
        return [BridgeState(state_event=event)]

    async def get_puppet(self) -> pu.Puppet | None:
        """Return the puppet for this user's own ktid, or ``None`` when no ktid is set."""
        return await pu.Puppet.get_by_ktid(self.ktid) if self.ktid else None

    async def get_portal_with(self, puppet: pu.Puppet, create: bool = True) -> po.Portal | None:
        """Find the portal for a direct chat with ``puppet`` (or the memo chat for oneself).

        Returns ``None`` when the user has no ktid, is not connected, or no
        existing channel was found.
        """
        # TODO Make upstream request to return custom failure message
        if not self.ktid or not self.is_connected:
            return None

        if puppet.ktid == self.ktid:
            # Chat with oneself: use the memo chat
            kt_type = KnownChannelType.MemoChat
            memo_ids = await self.client.get_memo_ids()
            if memo_ids:
                ktid = memo_ids[0]
                if len(memo_ids) > 1:
                    self.log.info("Found multiple memo chats, so using the first one as a fallback")
            else:
                ktid = Long(0)
        else:
            kt_type = KnownChannelType.DirectChat
            ktid = await self.client.get_friend_dm_id(puppet.ktid)

        if not ktid:
            self.log.warning(f"Didn't find an existing DM channel with KakaoTalk user {puppet.ktid}, so not creating one")
            return None
        return await po.Portal.get_by_ktid(
            ktid, kt_receiver=self.ktid, create=create, kt_type=kt_type
        )

    # region KakaoTalk event handling

    async def on_connect(self) -> bool:
        """Record a successful connection and decide whether channels need syncing.

        Returns:
            ``False`` when this is a reconnection after less than the configured
            maximum disconnected time, ``True`` otherwise.
        """
        connected_at = time.monotonic()
        # Must be read before the is_connected setter refreshes the timestamp
        last_change = self._connection_time
        resync_threshold = self.config["bridge.resync_max_disconnected_time"]
        is_first_connect = self.is_connected is None
        self.is_connected = True
        self._track_metric(METRIC_CONNECTED, True)
        downtime = int(connected_at - last_change)
        should_sync = is_first_connect or not downtime < resync_threshold
        if not should_sync:
            self.log.debug(f"Disconnection lasted {downtime} seconds, not re-syncing channels...")
        elif self.temp_disconnect_notices:
            await self.send_bridge_notice("Connected to KakaoTalk chats")
        await self.push_bridge_state(BridgeStateEvent.CONNECTED)
        self.was_connected = True
        await self.save()
        return should_sync

    async def on_disconnect(self, res: KickoutRes | None) -> None:
        """Handle a disconnection, logging out when required by config or kickout reason.

        Args:
            res: Kickout details, if any. Without them no notice is sent.
        """
        self.is_connected = False
        self._track_metric(METRIC_CONNECTED, False)
        logout = not self.config["bridge.remain_logged_in_on_disconnect"]

        reason_str = None
        if res:
            reason = res.reason
            if reason == KnownKickoutType.LOGIN_ANOTHER:
                reason_str = "Logged in on a PC or another bridge."
            elif reason == KnownKickoutType.CHANGE_SERVER:
                # TODO Reconnect automatically instead
                reason_str = "KakaoTalk backend server changed."
            elif reason == KnownKickoutType.ACCOUNT_DELETED:
                reason_str = "Your KakaoTalk account was deleted!"
                # A deleted account always logs the user out
                logout = True
            else:
                reason_str = f"Unknown reason ({reason})."

        if logout:
            await self.logout()
        else:
            # TODO What bridge state to push?
            self.was_connected = False
            await self.save()

        if reason_str:
            reason_suffix = (
                "You are now logged out. To log back in, use the `login` command."
                if logout
                else "To reconnect, use the `sync` command."
            )
            await self.send_bridge_notice(f"Disconnected from KakaoTalk: {reason_str} {reason_suffix}")
    async def on_error(self, error: JSON) -> None:
        """Relay an error event from KakaoTalk to the user as a bridge notice."""
        notice = f"Got error event from KakaoTalk:\n\n> {error}"
        await self.send_bridge_notice(
            notice,
            # TODO Which error code to use?
            #error_code="kt-connection-error",
            error_message=str(error),
        )

    async def on_client_disconnect(self) -> None:
        """Handle loss of the backend client and schedule a session reload.

        The reload is only scheduled when the user was logged in and connected.
        """
        self.is_connected = False
        self._track_metric(METRIC_CONNECTED, False)
        self._client = None
        if not (self._is_logged_in and self.was_connected):
            return
        if self.temp_disconnect_notices:
            await self.send_bridge_notice(
                "Disconnected from KakaoTalk: backend helper module exited. "
                "Will reconnect once the module resumes."
            )
        await self.push_bridge_state(BridgeStateEvent.TRANSIENT_DISCONNECT)
        self._is_rpc_reconnecting = True
        # Reload in the background; reload_session clears the reconnecting flag
        asyncio.create_task(self.reload_session())
    async def on_logged_in(self, oauth_credential: OAuthCredential) -> None:
        """Store a new credential, start a client for it and schedule post-login work.

        Args:
            oauth_credential: Credential to adopt; its fields are copied onto
                this user and saved.

        Exceptions from ``client.start()`` propagate to the caller.
        """
        self.log.debug(f"Successfully logged in as {oauth_credential.userId}")
        self.oauth_credential = oauth_credential
        await self.push_bridge_state(BridgeStateEvent.CONNECTING)
        self._client = Client(self, log=self.log.getChild("ktclient"))
        await self.save()
        self._is_logged_in = True
        # TODO Retry network connection failures here, or in the client (like token refreshes are)?
        #      Should also catch unlikely authentication errors
        self._logged_in_info = await self._client.start()
        self._logged_in_info_time = time.monotonic()
        # Post-login work runs in the background, as a non-token (password) login
        asyncio.create_task(self.post_login(is_startup=True, is_token_login=False))

    @async_time(METRIC_CHAT)
    async def on_chat(self, chat: Chatlog, channel_id: Long, channel_type: ChannelType) -> None:
        """Pass an incoming chat to the portal of the channel it belongs to.

        Args:
            chat: The chat log entry; its sender's user ID and its log ID are read here.
            channel_id: ID used to look up the portal.
            channel_type: Channel type forwarded to the portal lookup.
        """
        assert self.ktid
        target_portal = await po.Portal.get_by_ktid(
            channel_id, kt_receiver=self.ktid, kt_type=channel_type
        )
        sender = await pu.Puppet.get_by_ktid(chat.sender.userId)
        await target_portal.backfill_lock.wait(chat.logId)
        # A sender puppet without a name triggers a scheduled resync on the portal.
        if not sender.name:
            target_portal.schedule_resync(self, sender)
        await target_portal.handle_kakaotalk_chat(self, sender, chat)

    @async_time(METRIC_CHAT_DELETED)
    async def on_chat_deleted(
        self,
        chat_id: Long,
        sender_id: Long,
        timestamp: int,
        channel_id: Long,
        channel_type: ChannelType,
    ) -> None:
        """Forward a chat deletion to the channel's portal.

        Nothing happens unless a portal already exists (the lookup uses
        ``create=False``) and it has an ``mxid``.

        Args:
            chat_id: ID of the deleted chat.
            sender_id: ID used to fetch the puppet handed to the portal.
            timestamp: Passed through to the portal's delete handler unchanged.
            channel_id: ID used to look up the portal.
            channel_type: Channel type forwarded to the portal lookup.
        """
        assert self.ktid
        room = await po.Portal.get_by_ktid(
            channel_id, kt_receiver=self.ktid, kt_type=channel_type, create=False
        )
        if not room or not room.mxid:
            return
        await room.backfill_lock.wait(f"redaction of {chat_id}")
        deleter = await pu.Puppet.get_by_ktid(sender_id)
        await room.handle_kakaotalk_chat_delete(deleter, chat_id, timestamp)

    @async_time(METRIC_CHAT_READ)
    async def on_chat_read(
        self,
        chat_id: Long,
        sender_id: Long,
        channel_id: Long,
        channel_type: ChannelType,
    ) -> None:
        """Forward a read receipt to the channel's portal.

        The reader's puppet is fetched first; the receipt is then only handled if
        a portal already exists (the lookup uses ``create=False``) and it has an
        ``mxid``.

        Args:
            chat_id: ID of the chat the receipt refers to.
            sender_id: ID used to fetch the puppet handed to the portal.
            channel_id: ID used to look up the portal.
            channel_type: Channel type forwarded to the portal lookup.
        """
        assert self.ktid
        reader = await pu.Puppet.get_by_ktid(sender_id)
        room = await po.Portal.get_by_ktid(
            channel_id, kt_receiver=self.ktid, kt_type=channel_type, create=False
        )
        if not room or not room.mxid:
            return
        await room.backfill_lock.wait(f"read receipt from {sender_id}")
        await room.handle_kakaotalk_chat_read(self, reader, chat_id)

    @async_time(METRIC_CHANNEL_META_CHANGE)
    async def on_channel_meta_change(
        self,
        info: PortalChannelInfo,
        channel_id: Long,
        channel_type: ChannelType,
    ) -> None:
        """Apply changed channel info to the channel's portal.

        Args:
            info: The channel info handed to the portal's ``update_info``.
            channel_id: ID used to look up the portal.
            channel_type: Channel type forwarded to the portal lookup.
        """
        assert self.ktid
        room = await po.Portal.get_by_ktid(
            channel_id, kt_receiver=self.ktid, kt_type=channel_type
        )
        await room.backfill_lock.wait("meta change")
        await room.update_info(self, info)

    @async_time(METRIC_PROFILE_CHANGE)
    async def on_profile_changed(self, info: OpenLinkChannelUserInfo) -> None:
        """Update the puppet identified by ``info.userId`` from the given participant info.

        Args:
            info: Participant info; its ``userId`` selects the puppet and the whole
                object is passed to the puppet's update method.
        """
        changed_puppet = await pu.Puppet.get_by_ktid(info.userId)
        if not changed_puppet:
            return
        await changed_puppet.update_info_from_participant(self, info)

    @async_time(METRIC_PERM_CHANGE)
    async def on_perm_changed(
        self,
        user_id: Long,
        perm: OpenChannelUserPerm,
        sender_id: Long,
        channel_id: Long,
        channel_type: ChannelType,
    ) -> None:
        """Forward a permission change to the channel's portal.

        Nothing happens unless a portal already exists (the lookup uses
        ``create=False``) and it has an ``mxid``.

        Args:
            user_id: Passed to the portal handler as-is.
            perm: Passed to the portal handler as-is.
            sender_id: ID used to fetch the puppet handed to the portal.
            channel_id: ID used to look up the portal.
            channel_type: Channel type forwarded to the portal lookup.
        """
        assert self.ktid
        room = await po.Portal.get_by_ktid(
            channel_id, kt_receiver=self.ktid, kt_type=channel_type, create=False
        )
        if not room or not room.mxid:
            return
        actor = await pu.Puppet.get_by_ktid(sender_id)
        await room.backfill_lock.wait("perm changed")
        await room.handle_kakaotalk_perm_changed(self, actor, user_id, perm)

    @async_time(METRIC_CHANNEL_ADDED)
    def on_channel_added(self, channel_info: ChannelInfo) -> Awaitable[None]:
        """Handle a channel-added event by delegating to ``_sync_channel``.

        This is a plain function, not a coroutine function: it returns whatever
        ``_sync_channel`` returns without awaiting it.

        Args:
            channel_info: Passed to ``_sync_channel`` unchanged.
        """
        pending_sync = self._sync_channel(channel_info)
        return pending_sync

    @async_time(METRIC_CHANNEL_JOIN)
    async def on_channel_join(self, channel_info: ChannelInfo) -> None:
        """Handle this user joining a channel.

        If the channel's portal has no ``mxid``, the channel is synced via
        ``_sync_channel``. Otherwise the puppet for this user's own ``ktid`` is
        reported to the portal as having joined.

        Args:
            channel_info: Supplies the channel ID and type for the portal lookup,
                and is passed to ``_sync_channel`` when a sync is needed.
        """
        assert self.ktid
        room = await po.Portal.get_by_ktid(
            channel_info.channelId, kt_receiver=self.ktid, kt_type=channel_info.type
        )
        if not room.mxid:
            await self._sync_channel(channel_info)
            return
        own_puppet = await pu.Puppet.get_by_ktid(self.ktid)
        await room.backfill_lock.wait("channel join")
        await room.handle_kakaotalk_user_join(self, own_puppet)

    @async_time(METRIC_CHANNEL_LEFT)
    async def on_channel_left(self, channel_id: Long, channel_type: ChannelType | None) -> None:
        """Handle this user leaving a channel.

        When the channel's portal has an ``mxid``, the puppet for this user's own
        ``ktid`` is passed to the portal's user-left handler twice (as both puppet
        arguments).

        Args:
            channel_id: ID used to look up the portal.
            channel_type: Channel type forwarded to the portal lookup; may be ``None``.
        """
        assert self.ktid
        room = await po.Portal.get_by_ktid(
            channel_id, kt_receiver=self.ktid, kt_type=channel_type
        )
        if not room.mxid:
            return
        own_puppet = await pu.Puppet.get_by_ktid(self.ktid)
        await room.backfill_lock.wait("channel left")
        await room.handle_kakaotalk_user_left(self, own_puppet, own_puppet)

    @async_time(METRIC_CHANNEL_KICKED)
    async def on_channel_kicked(
        self,
        user_id: Long,
        sender_id: Long,
        channel_id: Long,
        channel_type: ChannelType,
    ) -> None:
        """Forward a kick event to the channel's portal.

        When the portal has an ``mxid``, the puppets for ``sender_id`` and
        ``user_id`` are passed, in that order, to the portal's user-left handler.

        Args:
            user_id: ID used to fetch the second puppet argument.
            sender_id: ID used to fetch the first puppet argument.
            channel_id: ID used to look up the portal.
            channel_type: Channel type forwarded to the portal lookup.
        """
        assert self.ktid
        room = await po.Portal.get_by_ktid(
            channel_id, kt_receiver=self.ktid, kt_type=channel_type
        )
        if not room.mxid:
            return
        kicker = await pu.Puppet.get_by_ktid(sender_id)
        kicked = await pu.Puppet.get_by_ktid(user_id)
        await room.backfill_lock.wait("channel kicked")
        await room.handle_kakaotalk_user_left(self, kicker, kicked)

    @async_time(METRIC_USER_JOIN)
    async def on_user_join(
        self,
        user_id: Long,
        channel_id: Long,
        channel_type: ChannelType,
    ) -> None:
        """Forward a user-join event to the channel's portal.

        Only acts when the portal has an ``mxid``.

        Args:
            user_id: ID used to fetch the puppet handed to the portal.
            channel_id: ID used to look up the portal.
            channel_type: Channel type forwarded to the portal lookup.
        """
        assert self.ktid
        room = await po.Portal.get_by_ktid(
            channel_id, kt_receiver=self.ktid, kt_type=channel_type
        )
        if not room.mxid:
            return
        joiner = await pu.Puppet.get_by_ktid(user_id)
        await room.backfill_lock.wait("user join")
        await room.handle_kakaotalk_user_join(self, joiner)

    @async_time(METRIC_USER_LEFT)
    async def on_user_left(
        self,
        user_id: Long,
        channel_id: Long,
        channel_type: ChannelType,
    ) -> None:
        """Forward a user-left event to the channel's portal.

        Only acts when the portal has an ``mxid``. The same puppet is passed to the
        portal's user-left handler twice (as both puppet arguments).

        Args:
            user_id: ID used to fetch the puppet handed to the portal.
            channel_id: ID used to look up the portal.
            channel_type: Channel type forwarded to the portal lookup.
        """
        assert self.ktid
        room = await po.Portal.get_by_ktid(
            channel_id, kt_receiver=self.ktid, kt_type=channel_type
        )
        if not room.mxid:
            return
        leaver = await pu.Puppet.get_by_ktid(user_id)
        await room.backfill_lock.wait("user left")
        await room.handle_kakaotalk_user_left(self, leaver, leaver)

    # endregion