# matrix-appservice-kakaotalk - A Matrix-KakaoTalk puppeting bridge. # Copyright (C) 2022 Tulir Asokan, Andrew Ferrazzutti # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from __future__ import annotations from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Awaitable, cast import asyncio import time from mautrix.bridge import BaseUser, async_getter_lock from mautrix.types import ( EventID, JSON, MessageType, RoomID, SerializerError, TextMessageEventContent, UserID, ) from mautrix.util.bridge_state import BridgeState, BridgeStateEvent from mautrix.util.opt_prometheus import Gauge, Summary, async_time from mautrix.util.simple_lock import SimpleLock from . import portal as po, puppet as pu from .config import Config from .db import User as DBUser, LoginCredential from .kt.client import Client from .kt.client.errors import ( AuthenticationRequired, AnotherLogonDetected, IncorrectPassword, OAuthException, ResponseError, ) from .kt.client.types import PortalChannelInfo, SettingsStruct from .kt.types.bson import Long from .kt.types.channel.channel_info import ChannelInfo, NormalChannelInfo, NormalChannelData from .kt.types.channel.channel_type import ChannelType, KnownChannelType from .kt.types.chat.chat import Chatlog from .kt.types.client.client_session import LoginDataItem, LoginResult from .kt.types.oauth import OAuthCredential from .kt.types.openlink.open_channel_info import OpenChannelData, OpenChannelInfo from .kt.types.openlink.open_link_type import OpenChannelUserPerm from .kt.types.openlink.open_link_user_info import OpenLinkChannelUserInfo from .kt.types.packet.chat.kickout import KnownKickoutType, KickoutRes METRIC_CONNECT_AND_SYNC = Summary("bridge_connect_and_sync", "calls to connect_and_sync") METRIC_CHAT = Summary("bridge_on_chat", "calls to on_chat") METRIC_CHAT_DELETED = Summary("bridge_on_chat_deleted", "calls to on_chat_deleted") METRIC_CHAT_READ = Summary("bridge_on_chat_read", "calls to on_chat_read") METRIC_CHANNEL_META_CHANGE = Summary("bridge_on_channel_meta_change", "calls to on_channel_meta_change") METRIC_PROFILE_CHANGE = Summary("bridge_on_profile_changed", "calls to on_profile_changed") METRIC_PERM_CHANGE = Summary("bridge_on_perm_changed", "calls to on_perm_changed") METRIC_CHANNEL_ADDED = Summary("bridge_on_channel_added", "calls to on_channel_added") METRIC_CHANNEL_JOIN = Summary("bridge_on_channel_join", "calls to on_channel_join") METRIC_CHANNEL_LEFT = Summary("bridge_on_channel_left", "calls to on_channel_left") METRIC_CHANNEL_KICKED = Summary("bridge_on_channel_kicked", "calls to on_channel_kicked") METRIC_USER_JOIN = Summary("bridge_on_user_join", "calls to on_user_join") METRIC_USER_LEFT = Summary("bridge_on_user_left", "calls to on_user_left") METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge") METRIC_CONNECTED = Gauge("bridge_connected", "Bridge users connected to KakaoTalk") if TYPE_CHECKING: from .__main__ import KakaoTalkBridge BridgeState.human_readable_errors.update( { "kt-reconnection-error": "Failed to reconnect to KakaoTalk", # TODO Use for errors in Node backend that cause session to be lost #"kt-connection-error": "KakaoTalk disconnected unexpectedly", "kt-auth-error": "Authentication error from KakaoTalk: {message}", "logged-out": "You're not logged into KakaoTalk", } ) class User(DBUser, BaseUser): temp_disconnect_notices: bool = True shutdown: bool = False config: Config by_mxid: dict[UserID, User] = {} by_ktid: dict[int, User] = {} _client: Client | None _notice_room_lock: asyncio.Lock _notice_send_lock: asyncio.Lock is_admin: bool permission_level: str _is_logged_in: bool | None _is_connected: bool | None _connection_time: float _db_instance: DBUser | None _sync_lock: SimpleLock _is_rpc_reconnecting: bool _logged_in_info: SettingsStruct | None _logged_in_info_time: float def __init__( self, mxid: UserID, force_login: bool, was_connected: bool, ktid: Long | None = None, uuid: str | None = None, access_token: str | None = None, refresh_token: str | None = None, notice_room: RoomID | None = None, ) -> None: super().__init__( mxid=mxid, force_login=force_login, was_connected=was_connected, ktid=ktid, uuid=uuid, access_token=access_token, refresh_token=refresh_token, notice_room=notice_room ) BaseUser.__init__(self) self.notice_room = notice_room self._notice_room_lock = asyncio.Lock() self._notice_send_lock = asyncio.Lock() ( self.relay_whitelisted, self.is_whitelisted, self.is_admin, self.permission_level, ) = self.config.get_permissions(mxid) self._is_logged_in = None self._is_connected = None self._connection_time = time.monotonic() self._sync_lock = SimpleLock( "Waiting for channel sync to finish before handling %s", log=self.log ) self._is_rpc_reconnecting = False self._logged_in_info = None self._logged_in_info_time = 0 self._client = None @classmethod def init_cls(cls, bridge: KakaoTalkBridge) -> AsyncIterable[Awaitable[bool]]: cls.bridge = bridge cls.config = bridge.config cls.az = bridge.az cls.temp_disconnect_notices = bridge.config["bridge.temporary_disconnect_notices"] return (user.reload_session(is_startup=True) async for user in cls.all_logged_in()) @property def client(self) -> Client: if not self._client: raise ValueError("User must be logged in before its client can be used") return self._client @property def is_connected(self) -> bool | None: return self._is_connected @is_connected.setter def is_connected(self, val: bool | None) -> None: if self._is_connected != val: self._is_connected = val self._connection_time = time.monotonic() async def is_connected_now(self) -> bool: return self._client is not None and await self._client.is_connected() @property def connection_time(self) -> float: return self._connection_time @property def has_state(self) -> bool: # TODO If more state is needed, consider returning a saved LoginResult return bool(self.access_token and self.refresh_token) # region Database getters def _add_to_cache(self) -> None: self.by_mxid[self.mxid] = self if self.ktid: self.by_ktid[self.ktid] = self @classmethod async def all_logged_in(cls) -> AsyncGenerator["User", None]: users = await super().all_logged_in() user: cls for user in users: try: yield cls.by_mxid[user.mxid] except KeyError: user._add_to_cache() yield user @classmethod @async_getter_lock async def get_by_mxid(cls, mxid: UserID, *, create: bool = True) -> User | None: if pu.Puppet.get_id_from_mxid(mxid) or mxid == cls.az.bot_mxid: return None try: return cls.by_mxid[mxid] except KeyError: pass user = cast(cls, await super().get_by_mxid(mxid)) if user is not None: user._add_to_cache() return user if create: cls.log.debug(f"Creating user instance for {mxid}") user = cls(mxid, False, False) await user.insert() user._add_to_cache() return user return None @classmethod @async_getter_lock async def get_by_ktid(cls, ktid: int) -> User | None: try: return cls.by_ktid[ktid] except KeyError: pass user = cast(cls, await super().get_by_ktid(ktid)) if user is not None: user._add_to_cache() return user return None async def get_uuid(self, force: bool = False) -> str: if self.uuid is None or force: self.uuid = await self._generate_uuid() # TODO Maybe don't save yet await self.save() return self.uuid @classmethod async def _generate_uuid(cls) -> str: return await Client.generate_uuid(await super().get_all_uuids()) # endregion @property def oauth_credential(self) -> OAuthCredential: assert self.ktid is not None assert self.uuid is not None assert self.access_token is not None assert self.refresh_token is not None return OAuthCredential( self.ktid, self.uuid, self.access_token, self.refresh_token, ) @oauth_credential.setter def oauth_credential(self, oauth_credential: OAuthCredential) -> None: self.ktid = oauth_credential.userId self.access_token = oauth_credential.accessToken self.refresh_token = oauth_credential.refreshToken if self.uuid != oauth_credential.deviceUUID: self.log.warning(f"UUID mismatch: expected {self.uuid}, got {oauth_credential.deviceUUID}") self.uuid = oauth_credential.deviceUUID async def get_own_info(self, *, force: bool = False) -> SettingsStruct | None: if force or self._client and (not self._logged_in_info or self._logged_in_info_time + 60 * 60 < time.monotonic()): self._logged_in_info = await self._client.get_settings() self._logged_in_info_time = time.monotonic() return self._logged_in_info async def _load_session(self, is_startup: bool) -> bool: if self._is_logged_in and is_startup: return True if not self.was_connected and not self.config["bridge.remain_logged_in_on_disconnect"]: self.log.warning("Not logging in because last session was disconnected, and login+disconnection is forbidden by config") await self.push_bridge_state( BridgeStateEvent.LOGGED_OUT, error="logged-out", ) return False latest_exc: Exception | None = None password_ok = False try: creds = await LoginCredential.get_by_mxid(self.mxid) except Exception as e: self.log.exception("Exception while looking for saved password") creds = None if creds: uuid = await self.get_uuid() form = creds.get_form() oauth_credential = None try: oauth_credential = await Client.login(uuid=uuid, form=form, forced=False) except IncorrectPassword as e: latest_exc = e except AnotherLogonDetected as e: if self.force_login: try: # Wait a moment to make it look like a user-initiated response await asyncio.sleep(2) oauth_credential = await Client.login(uuid=uuid, form=form, forced=True) except OAuthException as e: latest_exc = e else: return False if oauth_credential: self.oauth_credential = oauth_credential await self.save() password_ok = True else: try: await creds.delete() except: self.log.exception("Exception while deleting incorrect password") if password_ok or self.config["bridge.allow_token_relogin"] and self.has_state: if not password_ok: self.log.warning("Using token-based relogin after password-based relogin failed") elif latest_exc: raise latest_exc else: # If we have a user in the DB with no state, we can assume # KT logged us out and the bridge has restarted await self.push_bridge_state( BridgeStateEvent.BAD_CREDENTIALS, error="logged-out", ) return False client = Client(self, log=self.log.getChild("ktclient")) user_info = await client.start() # NOTE On failure, client.start throws instead of returning something falsy self.log.info("Loaded session successfully") self._client = client self._logged_in_info = user_info self._logged_in_info_time = time.monotonic() self._track_metric(METRIC_LOGGED_IN, True) self._is_logged_in = True self.is_connected = None asyncio.create_task(self.post_login(is_startup=is_startup, is_token_login=not password_ok)) return True async def _send_reset_notice(self, e: OAuthException, edit: EventID | None = None) -> None: await self.send_bridge_notice( "Got authentication error from KakaoTalk:\n\n" f"> {e.message}\n\n" "If you changed your KakaoTalk password, this " "is normal and you just need to log in again.", edit=edit, important=True, state_event=BridgeStateEvent.BAD_CREDENTIALS, error_code="kt-auth-error", error_message=str(e), ) await self.logout(remove_ktid=False) async def is_logged_in(self, _override: bool = False) -> bool: if not self.has_state or not self._client: return False if self._is_logged_in is None or _override: try: self._is_logged_in = bool(await self.get_own_info()) except SerializerError: self.log.exception( "Unable to deserialize settings struct, " "but didn't get auth error, so treating user as logged in" ) self._is_logged_in = True except Exception: self.log.exception("Exception checking login status") self._is_logged_in = False return self._is_logged_in async def reload_session( self, event_id: EventID | None = None, retries: int = 3, is_startup: bool = False ) -> None: try: if not await self._load_session(is_startup=is_startup) and self.has_state: self.log.debug("reload_session failure: wait for connection to Node module before prompting for manual login") await Client.wait_for_connection() self.log.debug("reload_session failure: now connected to Node module") await self.send_bridge_notice( "Logged out of KakaoTalk. Must use the `login` command to log back in.", important=True, ) await self.logout(remove_ktid=False) except OAuthException as e: await self._send_reset_notice(e, edit=event_id) # TODO Throw a ResponseError on network failures except ResponseError as e: will_retry = retries > 0 retry = "Retrying in 1 minute" if will_retry else "Not retrying" notice = f"Failed to connect to KakaoTalk: unknown response error {e}. {retry}" if will_retry: await self.send_bridge_notice( notice, edit=event_id, state_event=BridgeStateEvent.TRANSIENT_DISCONNECT, ) await asyncio.sleep(60) await self.reload_session(event_id, retries - 1) else: await self.send_bridge_notice( notice, edit=event_id, important=True, state_event=BridgeStateEvent.UNKNOWN_ERROR, error_code="kt-reconnection-error", ) except Exception as e: self.log.exception("Error connecting to KakaoTalk") await self.send_bridge_notice( f"Failed to connect to KakaoTalk: {e.message}", edit=event_id, state_event=BridgeStateEvent.UNKNOWN_ERROR, error_code="kt-reconnection-error", ) finally: self._is_rpc_reconnecting = False async def logout(self, *, remove_ktid: bool = True, reset_device: bool = False) -> None: if self._client: await self._client.stop() if remove_ktid: await self.push_bridge_state(BridgeStateEvent.LOGGED_OUT) self._track_metric(METRIC_LOGGED_IN, False) if reset_device: self.uuid = await self._generate_uuid() self.access_token = None self.refresh_token = None self._is_logged_in = False self.is_connected = None self.was_connected = False self._client = None if self.ktid and remove_ktid: #await UserPortal.delete_all(self.ktid) del self.by_ktid[self.ktid] self.ktid = None await self.save() async def post_login(self, is_startup: bool, is_token_login: bool) -> None: self.log.info("Running post-login actions") assert self.ktid self._add_to_cache() try: puppet = await pu.Puppet.get_by_ktid(self.ktid) if puppet.custom_mxid != self.mxid and puppet.can_auto_login(self.mxid): self.log.info(f"Automatically enabling custom puppet") await puppet.switch_mxid(access_token="auto", mxid=self.mxid) except Exception: self.log.exception("Failed to automatically enable custom puppet") if not is_token_login or self.was_connected and self.config["bridge.reconnect_on_token_relogin"]: # TODO Check if things break when a live message comes in during syncing if self.config["bridge.sync_on_startup"] or not is_startup: sync_count = self.config["bridge.initial_chat_sync"] else: sync_count = 0 await self.connect_and_sync(sync_count, force_sync=False) else: await self.send_bridge_notice( f"Logged into KakaoTalk. To connect to chatroom updates, use the `sync` command." ) self.was_connected = False await self.save() async def get_direct_chats(self) -> dict[UserID, list[RoomID]]: return { pu.Puppet.get_mxid_from_id(portal.ktid): [portal.mxid] async for portal in po.Portal.get_all_by_receiver(self.ktid) if portal.mxid } if self.ktid else {} @async_time(METRIC_CONNECT_AND_SYNC) async def connect_and_sync(self, sync_count: int | None, force_sync: bool) -> bool: # TODO Look for a way to sync all channels without (re-)logging in try: login_result = await self.client.connect() should_sync = await self.on_connect(force_sync) if login_result and should_sync: await self._sync_channels(login_result, sync_count) return True except AuthenticationRequired as e: await self.send_bridge_notice( f"Got authentication error from KakaoTalk:\n\n> {e.message}\n\n", important=True, state_event=BridgeStateEvent.BAD_CREDENTIALS, error_code="kt-auth-error", error_message=str(e), ) await self.logout(remove_ktid=False) except Exception as e: self.log.exception("Failed to connect and sync") await self.push_bridge_state(BridgeStateEvent.UNKNOWN_ERROR, message=str(e)) return False async def _sync_channels(self, login_result: LoginResult, sync_count: int | None) -> None: if sync_count is None: sync_count = self.config["bridge.initial_chat_sync"] if not sync_count: self.log.debug("Skipping channel syncing") return # TODO Sync left channels. It's not login_result.removedChannelIdList... if not login_result.channelList: self.log.debug("No channels to sync") return num_channels = len(login_result.channelList) sync_count = num_channels if sync_count < 0 else min(sync_count, num_channels) await self.push_bridge_state(BridgeStateEvent.BACKFILLING) self.log.debug(f"Syncing {sync_count} of {num_channels} channels...") def get_channel_update_time(login_data: LoginDataItem): channel_info = login_data.channel.info return channel_info.lastChatLog.sendAt if channel_info.lastChatLog else 0 for login_data in sorted( login_result.channelList, reverse=True, key=get_channel_update_time )[:sync_count]: try: await self._sync_channel_on_login(login_data) except AuthenticationRequired: raise except Exception: self.log.exception(f"Failed to sync channel {login_data.channel.channelId}") await self.update_direct_chats() def _sync_channel_on_login(self, login_data: LoginDataItem) -> Awaitable[None]: channel_data = login_data.channel self.log.debug(f"Syncing channel {channel_data.channelId} (last updated at {login_data.lastUpdate})") channel_info = channel_data.info if isinstance(channel_data, NormalChannelData): channel_data: NormalChannelData channel_info: NormalChannelInfo self.log.debug(f"Join time: {channel_info.joinTime}") elif isinstance(channel_data, OpenChannelData): channel_data: OpenChannelData self.log.debug(f"channel_data link ID: {channel_data.linkId}") channel_info: OpenChannelInfo self.log.debug(f"channel_info link ID: {channel_info.linkId}") self.log.debug(f"openToken: {channel_info.openToken}") self.log.debug(f"Is direct channel: {channel_info.directChannel}") self.log.debug(f"Has OpenLink: {channel_info.openLink is not None}") else: self.log.error(f"Unexpected channel type: {type(channel_data)}") channel_info: ChannelInfo self.log.debug(f"channel_info channel ID: {channel_info.channelId}") self.log.debug(f"Channel data/info IDs match: {channel_data.channelId == channel_info.channelId}") self.log.debug(f"Channel type: {channel_info.type}") self.log.debug(f"Active user count: {channel_info.activeUserCount}") self.log.debug(f"New chat count: {channel_info.newChatCount}") self.log.debug(f"New chat count invalid: {channel_info.newChatCountInvalid}") self.log.debug(f"Last chat log ID: {channel_info.lastChatLogId}") self.log.debug(f"Last seen log ID: {channel_info.lastSeenLogId}") self.log.debug(f"Has last chat log: {channel_info.lastChatLog is not None}") self.log.debug(f"metaMap: {channel_info.metaMap}") self.log.debug(f"User count: {len(channel_info.displayUserList)}") self.log.debug(f"Has push alert: {channel_info.pushAlert}") for display_user_info in channel_info.displayUserList: self.log.debug(f"Member: {display_user_info.nickname} - {display_user_info.profileURL} - {display_user_info.userId}") return self._sync_channel(channel_info) async def _sync_channel(self, channel_info: ChannelInfo) -> None: assert self.ktid portal = await po.Portal.get_by_ktid( channel_info.channelId, kt_receiver=self.ktid, kt_type=channel_info.type ) portal_info = await self.client.get_portal_channel_info(portal.channel_props) portal_info.channel_info = channel_info if not portal.mxid: await portal.create_matrix_room(self, portal_info) else: await portal.update_matrix_room(self, portal_info) await portal.backfill(self, is_initial=False, channel_info=channel_info) async def get_notice_room(self) -> RoomID: if not self.notice_room: async with self._notice_room_lock: # If someone already created the room while this call was waiting, # don't make a new room if self.notice_room: return self.notice_room creation_content = {} if not self.config["bridge.federate_rooms"]: creation_content["m.federate"] = False self.notice_room = await self.az.intent.create_room( is_direct=True, invitees=[self.mxid], topic="KakaoTalk bridge notices", creation_content=creation_content, ) await self.save() return self.notice_room async def send_bridge_notice( self, text: str, edit: EventID | None = None, state_event: BridgeStateEvent | None = None, important: bool = False, error_code: str | None = None, error_message: str | None = None, ) -> EventID | None: if state_event: await self.push_bridge_state( state_event, error=error_code, message=error_message if error_code else text, ) if self.config["bridge.disable_bridge_notices"]: return None event_id = None try: self.log.debug("Sending bridge notice: %s", text) content = TextMessageEventContent( body=text, msgtype=(MessageType.TEXT if important else MessageType.NOTICE), ) if edit: content.set_edit(edit) # This is locked to prevent notices going out in the wrong order async with self._notice_send_lock: event_id = await self.az.intent.send_message(await self.get_notice_room(), content) except Exception: self.log.warning("Failed to send bridge notice", exc_info=True) return edit or event_id async def fill_bridge_state(self, state: BridgeState) -> None: await super().fill_bridge_state(state) if self.ktid: state.remote_id = str(self.ktid) puppet = await pu.Puppet.get_by_ktid(self.ktid) state.remote_name = puppet.name async def get_bridge_states(self) -> list[BridgeState]: if not self.has_state: return [] state = BridgeState(state_event=BridgeStateEvent.UNKNOWN_ERROR) if self.is_connected: state.state_event = BridgeStateEvent.CONNECTED elif self._is_rpc_reconnecting or self._client: state.state_event = BridgeStateEvent.TRANSIENT_DISCONNECT return [state] async def get_puppet(self) -> pu.Puppet | None: if not self.ktid: return None return await pu.Puppet.get_by_ktid(self.ktid) async def get_portal_with(self, puppet: pu.Puppet, create: bool = True) -> po.Portal | None: # TODO Make upstream request to return custom failure message if not self.ktid or not self.is_connected: return None if puppet.ktid != self.ktid: kt_type = KnownChannelType.DirectChat ktid = await self.client.get_friend_dm_id(puppet.ktid) else: kt_type = KnownChannelType.MemoChat memo_ids = await self.client.get_memo_ids() if not memo_ids: ktid = None else: ktid = memo_ids[0] if len(memo_ids) > 1: self.log.warning("Found multiple memo chats, so using the first one as a fallback") if ktid: self.log.debug(f"Found existing direct chat with KakaoTalk user {puppet.ktid}") else: self.log.debug(f"Didn't find an existing direct chat with KakaoTalk user {puppet.ktid}, so will create one") try: ktid = await self.client.create_direct_chat(puppet.ktid) except: self.log.exception(f"Failed to create direct chat with {puppet.ktid}") return await po.Portal.get_by_ktid( ktid, kt_receiver=self.ktid, create=create, kt_type=kt_type ) if ktid else None # region Matrix->KakaoTalk commands async def leave_channel(self, portal: po.Portal) -> None: await self.client.leave_channel(portal.channel_props) await self.on_channel_left(portal.ktid, portal.kt_type) # endregion # region KakaoTalk event handling async def on_connect(self, force_sync: bool) -> bool: now = time.monotonic() disconnected_at = self._connection_time max_delay = self.config["bridge.resync_max_disconnected_time"] first_connect = self.is_connected is None self.is_connected = True self._track_metric(METRIC_CONNECTED, True) duration = int(now - disconnected_at) skip_sync = not first_connect and not force_sync and duration < max_delay if skip_sync: self.log.debug(f"Disconnection lasted {duration} seconds, not re-syncing channels...") elif self.temp_disconnect_notices: await self.send_bridge_notice("Connected to KakaoTalk chats") await self.push_bridge_state(BridgeStateEvent.CONNECTED) self.was_connected = True await self.save() return not skip_sync async def on_disconnect(self, res: KickoutRes | None) -> None: self.is_connected = False self._track_metric(METRIC_CONNECTED, False) logout = not self.config["bridge.remain_logged_in_on_disconnect"] if res: if res.reason == KnownKickoutType.LOGIN_ANOTHER: reason_str = "Logged in on a PC or another bridge." elif res.reason == KnownKickoutType.CHANGE_SERVER: # TODO Reconnect automatically instead reason_str = "KakaoTalk backend server changed." elif res.reason == KnownKickoutType.ACCOUNT_DELETED: reason_str = "Your KakaoTalk account was deleted!" logout = True else: reason_str = f"Unknown reason ({res.reason})." else: reason_str = None if not logout: # TODO What bridge state to push? self.was_connected = False await self.save() else: await self.logout() if reason_str: if not logout: reason_suffix = "To reconnect, use the `sync` command." else: reason_suffix = "You are now logged out. To log back in, use the `login` command." await self.send_bridge_notice( f"Disconnected from KakaoTalk: {reason_str} {reason_suffix}", important=True, ) async def on_error(self, error: JSON) -> None: await self.send_bridge_notice( f"Got error event from KakaoTalk:\n\n> {error}", # TODO Which error code to use? #error_code="kt-connection-error", error_message=str(error), ) async def on_unexpected_disconnect(self) -> None: self.is_connected = False self._track_metric(METRIC_CONNECTED, False) if self.config["bridge.remain_logged_in_on_disconnect"]: # TODO What bridge state to push? self.was_connected = False await self.save() reason_suffix = "To reconnect, use the `sync` command." else: await self.logout() reason_suffix = "You are now logged out. To log back in, use the `login` command." await self.send_bridge_notice( f"Disconnected from KakaoTalk: unexpected error in backend helper module. {reason_suffix}", important=True, ) async def on_client_disconnect(self) -> None: self.is_connected = False self._track_metric(METRIC_CONNECTED, False) self._client = None if self._is_logged_in and self.was_connected: if self.temp_disconnect_notices: await self.send_bridge_notice( "Disconnected from KakaoTalk: backend helper module exited. " "Will reconnect once the module resumes." ) await self.push_bridge_state(BridgeStateEvent.TRANSIENT_DISCONNECT) self._is_rpc_reconnecting = True asyncio.create_task(self.reload_session()) async def on_logged_in(self, oauth_credential: OAuthCredential) -> None: self.log.debug(f"Successfully logged in as {oauth_credential.userId}") self.oauth_credential = oauth_credential await self.push_bridge_state(BridgeStateEvent.CONNECTING) self._client = Client(self, log=self.log.getChild("ktclient")) await self.save() self._is_logged_in = True # TODO Retry network connection failures here, or in the client (like token refreshes are)? # Should also catch unlikely authentication errors self._logged_in_info = await self._client.start() self._logged_in_info_time = time.monotonic() asyncio.create_task(self.post_login(is_startup=True, is_token_login=False)) @async_time(METRIC_CHAT) async def on_chat(self, chat: Chatlog, channel_id: Long, channel_type: ChannelType) -> None: assert self.ktid portal = await po.Portal.get_by_ktid( channel_id, kt_receiver=self.ktid, kt_type=channel_type, ) puppet = await pu.Puppet.get_by_ktid(chat.sender.userId) await portal.backfill_lock.wait(chat.logId) if not puppet.name: portal.schedule_resync(self, puppet) await portal.handle_kakaotalk_chat(self, puppet, chat) @async_time(METRIC_CHAT_DELETED) async def on_chat_deleted( self, chat_id: Long, sender_id: Long, timestamp: int, channel_id: Long, channel_type: ChannelType, ) -> None: assert self.ktid portal = await po.Portal.get_by_ktid( channel_id, kt_receiver=self.ktid, kt_type=channel_type, create=False, ) if portal and portal.mxid: await portal.backfill_lock.wait(f"redaction of {chat_id}") puppet = await pu.Puppet.get_by_ktid(sender_id) await portal.handle_kakaotalk_chat_delete(puppet, chat_id, timestamp) @async_time(METRIC_CHAT_READ) async def on_chat_read( self, chat_id: Long, sender_id: Long, channel_id: Long, channel_type: ChannelType, ) -> None: assert self.ktid puppet = await pu.Puppet.get_by_ktid(sender_id) portal = await po.Portal.get_by_ktid( channel_id, kt_receiver=self.ktid, kt_type=channel_type, create=False, ) if portal and portal.mxid: await portal.backfill_lock.wait(f"read receipt from {sender_id}") await portal.handle_kakaotalk_chat_read(self, puppet, chat_id) @async_time(METRIC_CHANNEL_META_CHANGE) async def on_channel_meta_change( self, info: PortalChannelInfo, channel_id: Long, channel_type: ChannelType, ) -> None: assert self.ktid portal = await po.Portal.get_by_ktid( channel_id, kt_receiver=self.ktid, kt_type=channel_type, ) await portal.backfill_lock.wait("meta change") await portal.update_info(self, info) @async_time(METRIC_PROFILE_CHANGE) async def on_profile_changed(self, info: OpenLinkChannelUserInfo) -> None: puppet = await pu.Puppet.get_by_ktid(info.userId) if puppet: await puppet.update_info_from_participant(self, info) @async_time(METRIC_PERM_CHANGE) async def on_perm_changed( self, user_id: Long, perm: OpenChannelUserPerm, sender_id: Long, channel_id: Long, channel_type: ChannelType, ) -> None: assert self.ktid portal = await po.Portal.get_by_ktid( channel_id, kt_receiver=self.ktid, kt_type=channel_type, create=False, ) if portal and portal.mxid: sender = await pu.Puppet.get_by_ktid(sender_id) await portal.backfill_lock.wait("perm changed") await portal.handle_kakaotalk_perm_changed(self, sender, user_id, perm) @async_time(METRIC_CHANNEL_ADDED) def on_channel_added(self, channel_info: ChannelInfo) -> Awaitable[None]: return self._sync_channel(channel_info) @async_time(METRIC_CHANNEL_JOIN) async def on_channel_join(self, channel_info: ChannelInfo) -> None: assert self.ktid portal = await po.Portal.get_by_ktid( channel_info.channelId, kt_receiver=self.ktid, kt_type=channel_info.type, ) if portal.mxid: user = await pu.Puppet.get_by_ktid(self.ktid) await portal.backfill_lock.wait("channel join") await portal.handle_kakaotalk_user_join(self, user) else: await self._sync_channel(channel_info) @async_time(METRIC_CHANNEL_LEFT) async def on_channel_left(self, channel_id: Long, channel_type: ChannelType | None) -> None: assert self.ktid portal = await po.Portal.get_by_ktid( channel_id, kt_receiver=self.ktid, kt_type=channel_type, ) if portal.mxid: user = await pu.Puppet.get_by_ktid(self.ktid) await portal.backfill_lock.wait("channel left") await portal.handle_kakaotalk_user_left(self, user, user) @async_time(METRIC_CHANNEL_KICKED) async def on_channel_kicked( self, user_id: Long, sender_id: Long, channel_id: Long, channel_type: ChannelType ) -> None: assert self.ktid portal = await po.Portal.get_by_ktid( channel_id, kt_receiver=self.ktid, kt_type=channel_type, ) if portal.mxid: sender = await pu.Puppet.get_by_ktid(sender_id) user = await pu.Puppet.get_by_ktid(user_id) await portal.backfill_lock.wait("channel kicked") await portal.handle_kakaotalk_user_left(self, sender, user) @async_time(METRIC_USER_JOIN) async def on_user_join( self, user_id: Long, channel_id: Long, channel_type: ChannelType ) -> None: assert self.ktid portal = await po.Portal.get_by_ktid( channel_id, kt_receiver=self.ktid, kt_type=channel_type, ) if portal.mxid: user = await pu.Puppet.get_by_ktid(user_id) await portal.backfill_lock.wait("user join") await portal.handle_kakaotalk_user_join(self, user) @async_time(METRIC_USER_LEFT) async def on_user_left( self, user_id: Long, channel_id: Long, channel_type: ChannelType ) -> None: assert self.ktid portal = await po.Portal.get_by_ktid( channel_id, kt_receiver=self.ktid, kt_type=channel_type, ) if portal.mxid: user = await pu.Puppet.get_by_ktid(user_id) await portal.backfill_lock.wait("user left") await portal.handle_kakaotalk_user_left(self, user, user) # endregion