matrix-appservice-kakaotalk/matrix_appservice_kakaotalk/user.py

1010 lines
40 KiB
Python

# matrix-appservice-kakaotalk - A Matrix-KakaoTalk puppeting bridge.
# Copyright (C) 2022 Tulir Asokan, Andrew Ferrazzutti
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Awaitable, cast
import asyncio
import time
from mautrix.bridge import BaseUser, async_getter_lock
from mautrix.types import (
EventID,
JSON,
MessageType,
RoomID,
SerializerError,
TextMessageEventContent,
UserID,
)
from mautrix.util.bridge_state import BridgeState, BridgeStateEvent
from mautrix.util.opt_prometheus import Gauge, Summary, async_time
from mautrix.util.simple_lock import SimpleLock
from . import portal as po, puppet as pu
from .config import Config
from .db import User as DBUser, LoginCredential
from .kt.client import Client
from .kt.client.errors import (
AuthenticationRequired,
AnotherLogonDetected,
IncorrectPassword,
OAuthException,
ResponseError,
)
from .kt.client.types import PortalChannelInfo, SettingsStruct
from .kt.types.bson import Long
from .kt.types.channel.channel_info import ChannelInfo, NormalChannelInfo, NormalChannelData
from .kt.types.channel.channel_type import ChannelType, KnownChannelType
from .kt.types.chat.chat import Chatlog
from .kt.types.client.client_session import LoginDataItem, LoginResult
from .kt.types.oauth import OAuthCredential
from .kt.types.openlink.open_channel_info import OpenChannelData, OpenChannelInfo
from .kt.types.openlink.open_link_type import OpenChannelUserPerm
from .kt.types.openlink.open_link_user_info import OpenLinkChannelUserInfo
from .kt.types.packet.chat.kickout import KnownKickoutType, KickoutRes
# Timing summaries from mautrix.util.opt_prometheus. Each one is attached below,
# via the @async_time decorator, to the User method named in its description.
METRIC_CONNECT_AND_SYNC = Summary("bridge_connect_and_sync", "calls to connect_and_sync")
METRIC_CHAT = Summary("bridge_on_chat", "calls to on_chat")
METRIC_CHAT_DELETED = Summary("bridge_on_chat_deleted", "calls to on_chat_deleted")
METRIC_CHAT_READ = Summary("bridge_on_chat_read", "calls to on_chat_read")
METRIC_CHANNEL_META_CHANGE = Summary("bridge_on_channel_meta_change", "calls to on_channel_meta_change")
METRIC_PROFILE_CHANGE = Summary("bridge_on_profile_changed", "calls to on_profile_changed")
METRIC_PERM_CHANGE = Summary("bridge_on_perm_changed", "calls to on_perm_changed")
METRIC_CHANNEL_ADDED = Summary("bridge_on_channel_added", "calls to on_channel_added")
METRIC_CHANNEL_JOIN = Summary("bridge_on_channel_join", "calls to on_channel_join")
METRIC_CHANNEL_LEFT = Summary("bridge_on_channel_left", "calls to on_channel_left")
METRIC_CHANNEL_KICKED = Summary("bridge_on_channel_kicked", "calls to on_channel_kicked")
METRIC_USER_JOIN = Summary("bridge_on_user_join", "calls to on_user_join")
METRIC_USER_LEFT = Summary("bridge_on_user_left", "calls to on_user_left")
# Gauges that User methods toggle through self._track_metric(...) whenever a
# user's login state or connection state changes.
METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
METRIC_CONNECTED = Gauge("bridge_connected", "Bridge users connected to KakaoTalk")
if TYPE_CHECKING:
from .__main__ import KakaoTalkBridge
# Register human-readable texts for the bridge-state error codes that this
# module passes as `error=` / `error_code=` when pushing bridge states.
BridgeState.human_readable_errors.update(
{
"kt-reconnection-error": "Failed to reconnect to KakaoTalk",
# TODO Use for errors in Node backend that cause session to be lost
#"kt-connection-error": "KakaoTalk disconnected unexpectedly",
# NOTE(review): "{message}" is presumably filled in from the pushed state's
# message by mautrix - confirm against BridgeState.
"kt-auth-error": "Authentication error from KakaoTalk: {message}",
"logged-out": "You're not logged into KakaoTalk",
}
)
class User(DBUser, BaseUser):
# Whether "connected"/"disconnected" notices are sent to the user; replaced in
# init_cls by the "bridge.temporary_disconnect_notices" config value.
temp_disconnect_notices: bool = True
shutdown: bool = False
# Bridge configuration, assigned on the class by init_cls.
config: Config
# Class-wide caches of User instances, populated by _add_to_cache.
by_mxid: dict[UserID, User] = {}
by_ktid: dict[int, User] = {}
# Backend client; None until a session is loaded and again after logout.
_client: Client | None
# Guards creation of the notice room (see get_notice_room).
_notice_room_lock: asyncio.Lock
# Serializes outgoing bridge notices (see send_bridge_notice).
_notice_send_lock: asyncio.Lock
# Permission flags, filled from config.get_permissions(mxid) in __init__.
is_admin: bool
permission_level: str
# None means the login state has not been checked yet (see is_logged_in).
_is_logged_in: bool | None
# None means no connection attempt has completed yet (see on_connect).
_is_connected: bool | None
# time.monotonic() value of the last change of is_connected.
_connection_time: float
_db_instance: DBUser | None
_sync_lock: SimpleLock
# Set in on_client_disconnect and cleared when reload_session finishes.
_is_rpc_reconnecting: bool
# Cached result of the last settings fetch and its time.monotonic() timestamp.
_logged_in_info: SettingsStruct | None
_logged_in_info_time: float
def __init__(
    self,
    mxid: UserID,
    force_login: bool,
    was_connected: bool,
    ktid: Long | None = None,
    uuid: str | None = None,
    access_token: str | None = None,
    refresh_token: str | None = None,
    notice_room: RoomID | None = None,
) -> None:
    """Build a bridge user from its stored fields and reset its runtime state.

    All arguments are forwarded by keyword to the database-model initializer;
    everything else assigned here is in-memory session state.
    """
    stored_fields = {
        "mxid": mxid,
        "force_login": force_login,
        "was_connected": was_connected,
        "ktid": ktid,
        "uuid": uuid,
        "access_token": access_token,
        "refresh_token": refresh_token,
        "notice_room": notice_room,
    }
    super().__init__(**stored_fields)
    BaseUser.__init__(self)
    self.notice_room = notice_room

    # Locks guarding notice-room creation and notice ordering.
    self._notice_room_lock = asyncio.Lock()
    self._notice_send_lock = asyncio.Lock()

    permissions = self.config.get_permissions(mxid)
    (
        self.relay_whitelisted,
        self.is_whitelisted,
        self.is_admin,
        self.permission_level,
    ) = permissions

    # Nothing is known about the session yet: None marks "not determined".
    self._is_logged_in = None
    self._is_connected = None
    self._connection_time = time.monotonic()
    self._sync_lock = SimpleLock(
        "Waiting for channel sync to finish before handling %s", log=self.log
    )
    self._is_rpc_reconnecting = False
    self._logged_in_info = None
    self._logged_in_info_time = 0
    self._client = None
@classmethod
def init_cls(cls, bridge: KakaoTalkBridge) -> AsyncIterable[Awaitable[bool]]:
    """Attach bridge-wide objects to the class and prepare session reloads.

    Returns an async iterable that yields one un-awaited
    ``reload_session(is_startup=True)`` call per user from ``all_logged_in``.
    """
    bridge_config = bridge.config
    cls.bridge = bridge
    cls.config = bridge_config
    cls.az = bridge.az
    cls.temp_disconnect_notices = bridge_config["bridge.temporary_disconnect_notices"]
    logged_in_users = cls.all_logged_in()
    return (user.reload_session(is_startup=True) async for user in logged_in_users)
@property
def client(self) -> Client:
    """Return the backend client, raising ValueError when none is set."""
    active_client = self._client
    if active_client:
        return active_client
    raise ValueError("User must be logged in before its client can be used")
@property
def is_connected(self) -> bool | None:
    """Last recorded connection state (None until one has been recorded)."""
    return self._is_connected

@is_connected.setter
def is_connected(self, val: bool | None) -> None:
    """Record a new connection state, timestamping only actual changes."""
    if self._is_connected == val:
        return
    self._is_connected = val
    self._connection_time = time.monotonic()
async def is_connected_now(self) -> bool:
    """Ask the client whether it is connected; False when there is no client."""
    current_client = self._client
    if current_client is None:
        return False
    return await current_client.is_connected()
@property
def connection_time(self) -> float:
    """time.monotonic() value recorded when is_connected last changed."""
    last_change = self._connection_time
    return last_change
@property
def has_state(self) -> bool:
    """True when both an access token and a refresh token are stored."""
    # TODO If more state is needed, consider returning a saved LoginResult
    if not self.access_token:
        return False
    return bool(self.refresh_token)
# region Database getters
def _add_to_cache(self) -> None:
self.by_mxid[self.mxid] = self
if self.ktid:
self.by_ktid[self.ktid] = self
@classmethod
async def all_logged_in(cls) -> AsyncGenerator["User", None]:
    """Yield every logged-in user, preferring already-cached instances.

    Users returned by the database layer that are not cached yet are added
    to the caches before being yielded.
    """
    for db_user in await super().all_logged_in():
        if db_user.mxid in cls.by_mxid:
            yield cls.by_mxid[db_user.mxid]
        else:
            db_user._add_to_cache()
            yield db_user
@classmethod
@async_getter_lock
async def get_by_mxid(cls, mxid: UserID, *, create: bool = True) -> User | None:
    """Look up a bridge user by Matrix ID, optionally creating it.

    Returns None for puppet MXIDs and for the bridge bot itself. Otherwise
    the cache is consulted first, then the database; when nothing is found
    and ``create`` is true, a fresh user is inserted and cached.
    """
    if pu.Puppet.get_id_from_mxid(mxid) or mxid == cls.az.bot_mxid:
        return None

    try:
        return cls.by_mxid[mxid]
    except KeyError:
        pass

    user = cast(cls, await super().get_by_mxid(mxid))
    if user is not None:
        user._add_to_cache()
        return user

    if create:
        cls.log.debug(f"Creating user instance for {mxid}")
        # __init__ requires force_login and was_connected; a brand-new user
        # has never been connected and has not asked for a forced login.
        user = cls(mxid, force_login=False, was_connected=False)
        await user.insert()
        user._add_to_cache()
        return user

    return None
@classmethod
@async_getter_lock
async def get_by_ktid(cls, ktid: int) -> User | None:
    """Look up a bridge user by KakaoTalk ID: cache first, then database."""
    if ktid in cls.by_ktid:
        return cls.by_ktid[ktid]

    found = cast(cls, await super().get_by_ktid(ktid))
    if found is None:
        return None
    found._add_to_cache()
    return found
async def get_uuid(self, force: bool = False) -> str:
    """Return the device UUID, generating and saving one if missing or forced."""
    needs_new_uuid = self.uuid is None or force
    if needs_new_uuid:
        self.uuid = await self._generate_uuid()
        # TODO Maybe don't save yet
        await self.save()
    return self.uuid
@classmethod
async def _generate_uuid(cls) -> str:
    """Have the client generate a UUID, handing it all UUIDs already stored."""
    known_uuids = await super().get_all_uuids()
    return await Client.generate_uuid(known_uuids)
# endregion
@property
def oauth_credential(self) -> OAuthCredential:
    """Bundle the stored ID, device UUID and tokens into an OAuthCredential.

    Asserts that all four values are present.
    """
    fields = (self.ktid, self.uuid, self.access_token, self.refresh_token)
    for value in fields:
        assert value is not None
    return OAuthCredential(*fields)

@oauth_credential.setter
def oauth_credential(self, oauth_credential: OAuthCredential) -> None:
    """Copy ID, tokens and device UUID from a credential onto this user.

    A warning is logged when the credential's device UUID differs from the
    stored one; the stored UUID is then replaced.
    """
    self.ktid = oauth_credential.userId
    self.access_token = oauth_credential.accessToken
    self.refresh_token = oauth_credential.refreshToken
    if self.uuid == oauth_credential.deviceUUID:
        return
    self.log.warning(f"UUID mismatch: expected {self.uuid}, got {oauth_credential.deviceUUID}")
    self.uuid = oauth_credential.deviceUUID
async def get_own_info(self, *, force: bool = False) -> SettingsStruct | None:
    """Return the user's KakaoTalk settings, refreshing the cache when needed.

    The settings are fetched from the client when ``force`` is true, when
    nothing is cached yet, or when the cached value is more than an hour old.
    Without a client nothing can be fetched, so the cached value (possibly
    None) is returned as-is - including when ``force`` is true.
    """
    if self._client:
        cache_is_stale = (
            not self._logged_in_info
            or self._logged_in_info_time + 60 * 60 < time.monotonic()
        )
        if force or cache_is_stale:
            self._logged_in_info = await self._client.get_settings()
            self._logged_in_info_time = time.monotonic()
    return self._logged_in_info
async def _load_session(self, is_startup: bool) -> bool:
    """Try to restore this user's KakaoTalk session.

    A saved password, if any, is tried first; if that does not yield a
    credential, stored tokens are used when the config allows it. On success
    a client is started, post-login actions are scheduled, and True is
    returned.

    Returns False when the session is not (or must not be) restored.
    Re-raises the login error when password login failed and token relogin
    is not possible; errors from starting the client propagate as well.
    """
    if self._is_logged_in and is_startup:
        return True
    if not self.was_connected and not self.config["bridge.remain_logged_in_on_disconnect"]:
        self.log.warning("Not logging in because last session was disconnected, and login+disconnection is forbidden by config")
        await self.push_bridge_state(
            BridgeStateEvent.LOGGED_OUT,
            error="logged-out",
        )
        return False

    latest_exc: Exception | None = None
    password_ok = False
    try:
        creds = await LoginCredential.get_by_mxid(self.mxid)
    except Exception:
        self.log.exception("Exception while looking for saved password")
        creds = None

    if creds:
        uuid = await self.get_uuid()
        form = creds.get_form()
        oauth_credential = None
        try:
            oauth_credential = await Client.login(uuid=uuid, form=form, forced=False)
        except IncorrectPassword as e:
            latest_exc = e
        except AnotherLogonDetected:
            if self.force_login:
                try:
                    # Wait a moment to make it look like a user-initiated response
                    await asyncio.sleep(2)
                    oauth_credential = await Client.login(uuid=uuid, form=form, forced=True)
                except OAuthException as e:
                    latest_exc = e
            else:
                return False
        if oauth_credential:
            self.oauth_credential = oauth_credential
            await self.save()
            password_ok = True
        else:
            # The saved password did not produce a credential, so drop it.
            try:
                await creds.delete()
            except Exception:
                # Not a bare except: task cancellation must keep propagating.
                self.log.exception("Exception while deleting incorrect password")

    if password_ok or self.config["bridge.allow_token_relogin"] and self.has_state:
        if not password_ok:
            self.log.warning("Using token-based relogin after password-based relogin failed")
    elif latest_exc:
        raise latest_exc
    else:
        # If we have a user in the DB with no state, we can assume
        # KT logged us out and the bridge has restarted
        await self.push_bridge_state(
            BridgeStateEvent.BAD_CREDENTIALS,
            error="logged-out",
        )
        return False

    client = Client(self, log=self.log.getChild("ktclient"))
    user_info = await client.start()
    # NOTE On failure, client.start throws instead of returning something falsy
    self.log.info("Loaded session successfully")
    self._client = client
    self._logged_in_info = user_info
    self._logged_in_info_time = time.monotonic()
    self._track_metric(METRIC_LOGGED_IN, True)
    self._is_logged_in = True
    self.is_connected = None
    asyncio.create_task(self.post_login(is_startup=is_startup, is_token_login=not password_ok))
    return True
async def _send_reset_notice(self, e: OAuthException, edit: EventID | None = None) -> None:
    """Report an authentication error to the user, then log out (keeping the ktid)."""
    notice_text = (
        "Got authentication error from KakaoTalk:\n\n"
        f"> {e.message}\n\n"
        "If you changed your KakaoTalk password, this "
        "is normal and you just need to log in again."
    )
    await self.send_bridge_notice(
        notice_text,
        edit=edit,
        important=True,
        state_event=BridgeStateEvent.BAD_CREDENTIALS,
        error_code="kt-auth-error",
        error_message=str(e),
    )
    await self.logout(remove_ktid=False)
async def is_logged_in(self, _override: bool = False) -> bool:
    """Report whether the user is logged in, checking the backend if needed.

    Without stored tokens or a client the answer is always False. Otherwise
    the cached answer is reused unless it is still unknown or ``_override``
    is set, in which case the settings are fetched to find out.
    """
    if not (self.has_state and self._client):
        return False

    must_check = self._is_logged_in is None or _override
    if not must_check:
        return self._is_logged_in

    try:
        self._is_logged_in = bool(await self.get_own_info())
    except SerializerError:
        self.log.exception(
            "Unable to deserialize settings struct, "
            "but didn't get auth error, so treating user as logged in"
        )
        self._is_logged_in = True
    except Exception:
        self.log.exception("Exception checking login status")
        self._is_logged_in = False
    return self._is_logged_in
async def reload_session(
    self, event_id: EventID | None = None, retries: int = 3, is_startup: bool = False
) -> None:
    """Load the session and tell the user about any failure.

    Authentication errors trigger a reset notice and logout. Response errors
    are retried after a minute while ``retries`` is positive. Any other
    exception is logged and reported as an unknown error. The RPC
    "reconnecting" flag is always cleared on the way out.
    """
    try:
        session_loaded = await self._load_session(is_startup=is_startup)
        if not session_loaded and self.has_state:
            self.log.debug("reload_session failure: wait for connection to Node module before prompting for manual login")
            await Client.wait_for_connection()
            self.log.debug("reload_session failure: now connected to Node module")
            await self.send_bridge_notice(
                "Logged out of KakaoTalk. Must use the `login` command to log back in.",
                important=True,
            )
            await self.logout(remove_ktid=False)
    except OAuthException as e:
        await self._send_reset_notice(e, edit=event_id)
    # TODO Throw a ResponseError on network failures
    except ResponseError as e:
        retry = "Retrying in 1 minute" if retries > 0 else "Not retrying"
        notice = f"Failed to connect to KakaoTalk: unknown response error {e}. {retry}"
        if retries <= 0:
            await self.send_bridge_notice(
                notice,
                edit=event_id,
                important=True,
                state_event=BridgeStateEvent.UNKNOWN_ERROR,
                error_code="kt-reconnection-error",
            )
        else:
            await self.send_bridge_notice(
                notice,
                edit=event_id,
                state_event=BridgeStateEvent.TRANSIENT_DISCONNECT,
            )
            await asyncio.sleep(60)
            # The retry does not forward is_startup, so it uses the default (False).
            await self.reload_session(event_id, retries - 1)
    except Exception:
        self.log.exception("Error connecting to KakaoTalk")
        await self.send_bridge_notice(
            "Failed to connect to KakaoTalk: unknown error (see logs for more details)",
            edit=event_id,
            state_event=BridgeStateEvent.UNKNOWN_ERROR,
            error_code="kt-reconnection-error",
        )
    finally:
        self._is_rpc_reconnecting = False
async def logout(self, *, remove_ktid: bool = True, reset_device: bool = False) -> None:
    """Stop the client and clear this user's session state.

    :param remove_ktid: also push a LOGGED_OUT bridge state, drop the user
        from the by-ktid cache and forget its KakaoTalk ID.
    :param reset_device: generate a new device UUID.

    The tokens are always cleared and the user is saved at the end.
    """
    if self._client:
        await self._client.stop()
    if remove_ktid:
        await self.push_bridge_state(BridgeStateEvent.LOGGED_OUT)
    self._track_metric(METRIC_LOGGED_IN, False)
    if reset_device:
        self.uuid = await self._generate_uuid()
    self.access_token = None
    self.refresh_token = None
    self._is_logged_in = False
    self.is_connected = None
    self.was_connected = False
    self._client = None
    if self.ktid and remove_ktid:
        #await UserPortal.delete_all(self.ktid)
        # The ktid can be set without the user being cached (the
        # oauth_credential setter assigns it; only _add_to_cache indexes it),
        # so tolerate a missing entry instead of raising KeyError.
        self.by_ktid.pop(self.ktid, None)
        self.ktid = None
    await self.save()
async def post_login(self, is_startup: bool, is_token_login: bool) -> None:
    """Run follow-up actions after a session has been established.

    Caches the user, tries to enable the custom puppet automatically, and
    then either connects and syncs or tells the user to run ``sync``.
    """
    self.log.info("Running post-login actions")
    assert self.ktid
    self._add_to_cache()

    try:
        own_puppet = await pu.Puppet.get_by_ktid(self.ktid)
        if own_puppet.custom_mxid != self.mxid and own_puppet.can_auto_login(self.mxid):
            self.log.info(f"Automatically enabling custom puppet")
            await own_puppet.switch_mxid(access_token="auto", mxid=self.mxid)
    except Exception:
        self.log.exception("Failed to automatically enable custom puppet")

    should_connect = not is_token_login or (
        self.was_connected and self.config["bridge.reconnect_on_token_relogin"]
    )
    if not should_connect:
        await self.send_bridge_notice(
            f"Logged into KakaoTalk. To connect to chatroom updates, use the `sync` command."
        )
        self.was_connected = False
        await self.save()
        return

    # TODO Check if things break when a live message comes in during syncing
    if self.config["bridge.sync_on_startup"] or not is_startup:
        sync_count = self.config["bridge.initial_chat_sync"]
    else:
        sync_count = 0
    await self.connect_and_sync(sync_count, force_sync=False)
async def get_direct_chats(self) -> dict[UserID, list[RoomID]]:
    """Map puppet MXIDs to the Matrix rooms of this user's bridged portals.

    Only portals that have a Matrix room are included; the result is empty
    when the user has no KakaoTalk ID.
    """
    if not self.ktid:
        return {}
    direct_chats = {}
    async for portal in po.Portal.get_all_by_receiver(self.ktid):
        if portal.mxid:
            direct_chats[pu.Puppet.get_mxid_from_id(portal.ktid)] = [portal.mxid]
    return direct_chats
@async_time(METRIC_CONNECT_AND_SYNC)
async def connect_and_sync(self, sync_count: int | None, force_sync: bool) -> bool:
    """Connect the client and, when appropriate, sync channels.

    Returns True on success. Authentication failures notify the user and
    log out (keeping the ktid); any other failure is logged and pushed as an
    unknown-error bridge state. Both failure cases return False.
    """
    # TODO Look for a way to sync all channels without (re-)logging in
    try:
        login_result = await self.client.connect()
        should_sync = await self.on_connect(force_sync)
        if login_result and should_sync:
            await self._sync_channels(login_result, sync_count)
    except AuthenticationRequired as e:
        await self.send_bridge_notice(
            f"Got authentication error from KakaoTalk:\n\n> {e.message}\n\n",
            important=True,
            state_event=BridgeStateEvent.BAD_CREDENTIALS,
            error_code="kt-auth-error",
            error_message=str(e),
        )
        await self.logout(remove_ktid=False)
    except Exception as e:
        self.log.exception("Failed to connect and sync")
        await self.push_bridge_state(BridgeStateEvent.UNKNOWN_ERROR, message=str(e))
    else:
        return True
    return False
async def _sync_channels(self, login_result: LoginResult, sync_count: int | None) -> None:
if sync_count is None:
sync_count = self.config["bridge.initial_chat_sync"]
if not sync_count:
self.log.debug("Skipping channel syncing")
return
# TODO Sync left channels. It's not login_result.removedChannelIdList...
if not login_result.channelList:
self.log.debug("No channels to sync")
return
num_channels = len(login_result.channelList)
sync_count = num_channels if sync_count < 0 else min(sync_count, num_channels)
await self.push_bridge_state(BridgeStateEvent.BACKFILLING)
self.log.debug(f"Syncing {sync_count} of {num_channels} channels...")
def get_channel_update_time(login_data: LoginDataItem):
channel_info = login_data.channel.info
return channel_info.lastChatLog.sendAt if channel_info.lastChatLog else 0
for login_data in sorted(
login_result.channelList,
reverse=True,
key=get_channel_update_time
)[:sync_count]:
try:
await self._sync_channel_on_login(login_data)
except AuthenticationRequired:
raise
except Exception:
self.log.exception(f"Failed to sync channel {login_data.channel.channelId}")
await self.update_direct_chats()
def _sync_channel_on_login(self, login_data: LoginDataItem) -> Awaitable[None]:
    """Log details of one channel from the login result, then start syncing it.

    Returns the awaitable produced by ``_sync_channel``; nothing is awaited here.
    """
    debug = self.log.debug
    channel_data = login_data.channel
    debug(f"Syncing channel {channel_data.channelId} (last updated at {login_data.lastUpdate})")
    channel_info = channel_data.info

    # Type-specific details first.
    if isinstance(channel_data, NormalChannelData):
        channel_data: NormalChannelData
        channel_info: NormalChannelInfo
        debug(f"Join time: {channel_info.joinTime}")
    elif isinstance(channel_data, OpenChannelData):
        channel_data: OpenChannelData
        debug(f"channel_data link ID: {channel_data.linkId}")
        channel_info: OpenChannelInfo
        debug(f"channel_info link ID: {channel_info.linkId}")
        debug(f"openToken: {channel_info.openToken}")
        debug(f"Is direct channel: {channel_info.directChannel}")
        debug(f"Has OpenLink: {channel_info.openLink is not None}")
    else:
        self.log.error(f"Unexpected channel type: {type(channel_data)}")

    # Details common to every channel type.
    channel_info: ChannelInfo
    debug(f"channel_info channel ID: {channel_info.channelId}")
    debug(f"Channel data/info IDs match: {channel_data.channelId == channel_info.channelId}")
    debug(f"Channel type: {channel_info.type}")
    debug(f"Active user count: {channel_info.activeUserCount}")
    debug(f"New chat count: {channel_info.newChatCount}")
    debug(f"New chat count invalid: {channel_info.newChatCountInvalid}")
    debug(f"Last chat log ID: {channel_info.lastChatLogId}")
    debug(f"Last seen log ID: {channel_info.lastSeenLogId}")
    debug(f"Has last chat log: {channel_info.lastChatLog is not None}")
    debug(f"metaMap: {channel_info.metaMap}")
    debug(f"User count: {len(channel_info.displayUserList)}")
    debug(f"Has push alert: {channel_info.pushAlert}")
    for display_user_info in channel_info.displayUserList:
        debug(f"Member: {display_user_info.nickname} - {display_user_info.profileURL} - {display_user_info.userId}")

    return self._sync_channel(channel_info)
async def _sync_channel(self, channel_info: ChannelInfo) -> None:
# Create or update the Matrix room for one KakaoTalk channel.
#
# Fetches the portal for (channel ID, this user as receiver, channel type),
# asks the client for the portal's channel info, and attaches the given
# channel_info to that result before creating/updating the room.
assert self.ktid
portal = await po.Portal.get_by_ktid(
channel_info.channelId,
kt_receiver=self.ktid,
kt_type=channel_info.type
)
portal_info = await self.client.get_portal_channel_info(portal.channel_props)
portal_info.channel_info = channel_info
# No Matrix room yet -> create one; otherwise refresh the existing room.
if not portal.mxid:
await portal.create_matrix_room(self, portal_info)
else:
await portal.update_matrix_room(self, portal_info)
# NOTE(review): indentation is missing in this copy of the file, so it is not
# visible whether the backfill below belongs to the else branch (existing
# rooms only) or runs after both branches - confirm against the repository.
await portal.backfill(self, is_initial=False, channel_info=channel_info)
async def get_notice_room(self) -> RoomID:
    """Return the bridge-notice room, creating and saving it on first use."""
    if self.notice_room:
        return self.notice_room

    async with self._notice_room_lock:
        # If someone already created the room while this call was waiting,
        # don't make a new room
        if self.notice_room:
            return self.notice_room
        if self.config["bridge.federate_rooms"]:
            creation_content = {}
        else:
            creation_content = {"m.federate": False}
        self.notice_room = await self.az.intent.create_room(
            is_direct=True,
            invitees=[self.mxid],
            topic="KakaoTalk bridge notices",
            creation_content=creation_content,
        )
        await self.save()
    return self.notice_room
async def send_bridge_notice(
    self,
    text: str,
    edit: EventID | None = None,
    state_event: BridgeStateEvent | None = None,
    important: bool = False,
    error_code: str | None = None,
    error_message: str | None = None,
) -> EventID | None:
    """Send a notice to the user's notice room, optionally pushing a bridge state.

    When ``state_event`` is given it is pushed first; its message is
    ``error_message`` if an ``error_code`` is set, otherwise ``text``.
    Returns None when notices are disabled by config; otherwise ``edit`` if
    given, else the new event's ID (None if sending failed - failures are
    logged, not raised).
    """
    if state_event:
        state_message = error_message if error_code else text
        await self.push_bridge_state(
            state_event,
            error=error_code,
            message=state_message,
        )
    if self.config["bridge.disable_bridge_notices"]:
        return None

    event_id = None
    try:
        self.log.debug("Sending bridge notice: %s", text)
        msgtype = MessageType.TEXT if important else MessageType.NOTICE
        content = TextMessageEventContent(body=text, msgtype=msgtype)
        if edit:
            content.set_edit(edit)
        # This is locked to prevent notices going out in the wrong order
        async with self._notice_send_lock:
            event_id = await self.az.intent.send_message(await self.get_notice_room(), content)
    except Exception:
        self.log.warning("Failed to send bridge notice", exc_info=True)
    return edit or event_id
async def fill_bridge_state(self, state: BridgeState) -> None:
    """Add this user's KakaoTalk ID and puppet name to a bridge state."""
    await super().fill_bridge_state(state)
    if not self.ktid:
        return
    state.remote_id = str(self.ktid)
    own_puppet = await pu.Puppet.get_by_ktid(self.ktid)
    state.remote_name = own_puppet.name
async def get_bridge_states(self) -> list[BridgeState]:
    """Describe the current connection as a one-element bridge-state list.

    Returns an empty list when no tokens are stored. Otherwise the state is
    CONNECTED when connected, TRANSIENT_DISCONNECT while an RPC reconnect is
    pending or a client exists, and UNKNOWN_ERROR in every other case.
    """
    if not self.has_state:
        return []
    current = BridgeState(state_event=BridgeStateEvent.UNKNOWN_ERROR)
    if self.is_connected:
        current.state_event = BridgeStateEvent.CONNECTED
        return [current]
    if self._is_rpc_reconnecting or self._client:
        current.state_event = BridgeStateEvent.TRANSIENT_DISCONNECT
    return [current]
async def get_puppet(self) -> pu.Puppet | None:
    """Return the puppet for this user's own KakaoTalk ID, or None without one."""
    own_ktid = self.ktid
    if own_ktid:
        return await pu.Puppet.get_by_ktid(own_ktid)
    return None
async def get_portal_with(self, puppet: pu.Puppet, create: bool = True) -> po.Portal | None:
    """Find (or create) the portal for a direct chat with ``puppet``.

    For another user the existing DM channel is looked up; for the user's
    own puppet the first memo chat is used. When no channel is found, a
    direct chat is created. Returns None when the user has no KakaoTalk ID,
    is not connected, or no channel could be found or created.
    """
    # TODO Make upstream request to return custom failure message
    if not self.ktid or not self.is_connected:
        return None
    if puppet.ktid != self.ktid:
        kt_type = KnownChannelType.DirectChat
        ktid = await self.client.get_friend_dm_id(puppet.ktid)
    else:
        kt_type = KnownChannelType.MemoChat
        memo_ids = await self.client.get_memo_ids()
        if not memo_ids:
            ktid = None
        else:
            ktid = memo_ids[0]
            if len(memo_ids) > 1:
                self.log.warning("Found multiple memo chats, so using the first one as a fallback")
    if ktid:
        self.log.debug(f"Found existing direct chat with KakaoTalk user {puppet.ktid}")
    else:
        self.log.debug(f"Didn't find an existing direct chat with KakaoTalk user {puppet.ktid}, so will create one")
        try:
            ktid = await self.client.create_direct_chat(puppet.ktid)
        except Exception:
            # Best effort: log and fall through to returning None. This is
            # deliberately not a bare except so that task cancellation
            # still propagates.
            self.log.exception(f"Failed to create direct chat with {puppet.ktid}")
    return await po.Portal.get_by_ktid(
        ktid, kt_receiver=self.ktid, create=create, kt_type=kt_type
    ) if ktid else None
# region Matrix->KakaoTalk commands
async def join_channel(self, url: str) -> None:
    """Ask the client to join the channel behind ``url``."""
    active_client = self.client
    await active_client.join_channel_by_url(url)
    # TODO Get channel ID(s) and sync
async def leave_channel(self, portal: po.Portal) -> None:
    """Leave the portal's channel, then handle it like a channel-left event."""
    props = portal.channel_props
    await self.client.leave_channel(props)
    await self.on_channel_left(portal.ktid, portal.kt_type)
# endregion
# region KakaoTalk event handling
async def on_connect(self, force_sync: bool) -> bool:
    """Record a successful connection and decide whether channels need syncing.

    Syncing is skipped only for a reconnect (not the first connect) that is
    not forced and follows a disconnection shorter than the
    ``bridge.resync_max_disconnected_time`` config value. Returns True when
    the caller should sync.
    """
    # Sample the previous state before is_connected overwrites it.
    now = time.monotonic()
    disconnected_at = self._connection_time
    max_delay = self.config["bridge.resync_max_disconnected_time"]
    first_connect = self.is_connected is None

    self.is_connected = True
    self._track_metric(METRIC_CONNECTED, True)

    duration = int(now - disconnected_at)
    needs_sync = first_connect or force_sync or duration >= max_delay
    if not needs_sync:
        self.log.debug(f"Disconnection lasted {duration} seconds, not re-syncing channels...")
    elif self.temp_disconnect_notices:
        await self.send_bridge_notice("Connected to KakaoTalk chats")

    await self.push_bridge_state(BridgeStateEvent.CONNECTED)
    self.was_connected = True
    await self.save()
    return needs_sync
async def on_disconnect(self, res: KickoutRes | None) -> None:
    """Handle a disconnect, optionally with a kickout response.

    Depending on the ``bridge.remain_logged_in_on_disconnect`` config value
    (and always for a deleted account) the user is logged out; otherwise
    only ``was_connected`` is cleared. A notice is sent when a kickout
    reason is available.
    """
    self.is_connected = False
    self._track_metric(METRIC_CONNECTED, False)
    logout = not self.config["bridge.remain_logged_in_on_disconnect"]

    reason_str = None
    if res:
        if res.reason == KnownKickoutType.LOGIN_ANOTHER:
            reason_str = "Logged in on a PC or another bridge."
        elif res.reason == KnownKickoutType.CHANGE_SERVER:
            # TODO Reconnect automatically instead
            reason_str = "KakaoTalk backend server changed."
        elif res.reason == KnownKickoutType.ACCOUNT_DELETED:
            reason_str = "Your KakaoTalk account was deleted!"
            logout = True
        else:
            reason_str = f"Unknown reason ({res.reason})."

    if logout:
        await self.logout()
    else:
        # TODO What bridge state to push?
        self.was_connected = False
        await self.save()

    if not reason_str:
        return
    if logout:
        reason_suffix = "You are now logged out. To log back in, use the `login` command."
    else:
        reason_suffix = "To reconnect, use the `sync` command."
    await self.send_bridge_notice(
        f"Disconnected from KakaoTalk: {reason_str} {reason_suffix}",
        important=True,
    )
async def on_error(self, error: JSON) -> None:
    """Forward an error event from KakaoTalk to the user as a bridge notice."""
    notice_text = f"Got error event from KakaoTalk:\n\n> {error}"
    # TODO Which error code to use?
    #error_code="kt-connection-error",
    await self.send_bridge_notice(notice_text, error_message=str(error))
async def on_unexpected_disconnect(self) -> None:
    """Handle a disconnect caused by an error in the backend helper module.

    Either clears ``was_connected`` or logs out, depending on the
    ``bridge.remain_logged_in_on_disconnect`` config value, then notifies
    the user.
    """
    self.is_connected = False
    self._track_metric(METRIC_CONNECTED, False)

    stay_logged_in = self.config["bridge.remain_logged_in_on_disconnect"]
    if not stay_logged_in:
        await self.logout()
        reason_suffix = "You are now logged out. To log back in, use the `login` command."
    else:
        # TODO What bridge state to push?
        self.was_connected = False
        await self.save()
        reason_suffix = "To reconnect, use the `sync` command."

    await self.send_bridge_notice(
        f"Disconnected from KakaoTalk: unexpected error in backend helper module. {reason_suffix}",
        important=True,
    )
async def on_client_disconnect(self) -> None:
# Handle the backend helper module going away: mark the user as
# disconnected, drop the client, and schedule a session reload.
self.is_connected = False
self._track_metric(METRIC_CONNECTED, False)
self._client = None
if self._is_logged_in and self.was_connected:
if self.temp_disconnect_notices:
await self.send_bridge_notice(
"Disconnected from KakaoTalk: backend helper module exited. "
"Will reconnect once the module resumes."
)
# NOTE(review): indentation is missing in this copy of the file, so it is not
# visible whether the three statements below are guarded by the
# "logged in and was connected" check above - confirm against the repository.
await self.push_bridge_state(BridgeStateEvent.TRANSIENT_DISCONNECT)
# Reported as TRANSIENT_DISCONNECT by get_bridge_states until
# reload_session clears the flag again.
self._is_rpc_reconnecting = True
# NOTE(review): the task returned by create_task is not stored anywhere.
asyncio.create_task(self.reload_session())
async def on_logged_in(self, oauth_credential: OAuthCredential) -> None:
    """Store a fresh credential, start a client and schedule post-login actions."""
    self.log.debug(f"Successfully logged in as {oauth_credential.userId}")
    self.oauth_credential = oauth_credential
    await self.push_bridge_state(BridgeStateEvent.CONNECTING)

    client_log = self.log.getChild("ktclient")
    self._client = Client(self, log=client_log)
    await self.save()
    self._is_logged_in = True

    # TODO Retry network connection failures here, or in the client (like token refreshes are)?
    #      Should also catch unlikely authentication errors
    self._logged_in_info = await self._client.start()
    self._logged_in_info_time = time.monotonic()
    asyncio.create_task(self.post_login(is_startup=True, is_token_login=False))
@async_time(METRIC_CHAT)
async def on_chat(self, chat: Chatlog, channel_id: Long, channel_type: ChannelType) -> None:
    """Relay an incoming KakaoTalk chat to its portal."""
    assert self.ktid
    target_portal = await po.Portal.get_by_ktid(
        channel_id, kt_receiver=self.ktid, kt_type=channel_type
    )
    sender = await pu.Puppet.get_by_ktid(chat.sender.userId)
    await target_portal.backfill_lock.wait(chat.logId)
    # A sender without a name triggers a resync of the portal.
    if not sender.name:
        target_portal.schedule_resync(self, sender)
    await target_portal.handle_kakaotalk_chat(self, sender, chat)
@async_time(METRIC_CHAT_DELETED)
async def on_chat_deleted(
    self,
    chat_id: Long,
    sender_id: Long,
    timestamp: int,
    channel_id: Long,
    channel_type: ChannelType,
) -> None:
    """Relay a chat deletion to its portal, if that portal has a Matrix room."""
    assert self.ktid
    target_portal = await po.Portal.get_by_ktid(
        channel_id, kt_receiver=self.ktid, kt_type=channel_type, create=False
    )
    if not (target_portal and target_portal.mxid):
        return
    await target_portal.backfill_lock.wait(f"redaction of {chat_id}")
    deleter = await pu.Puppet.get_by_ktid(sender_id)
    await target_portal.handle_kakaotalk_chat_delete(deleter, chat_id, timestamp)
@async_time(METRIC_CHAT_READ)
async def on_chat_read(
    self,
    chat_id: Long,
    sender_id: Long,
    channel_id: Long,
    channel_type: ChannelType,
) -> None:
    """Relay a read receipt to its portal, if that portal has a Matrix room."""
    assert self.ktid
    reader = await pu.Puppet.get_by_ktid(sender_id)
    target_portal = await po.Portal.get_by_ktid(
        channel_id, kt_receiver=self.ktid, kt_type=channel_type, create=False
    )
    if not (target_portal and target_portal.mxid):
        return
    await target_portal.backfill_lock.wait(f"read receipt from {sender_id}")
    await target_portal.handle_kakaotalk_chat_read(self, reader, chat_id)
@async_time(METRIC_CHANNEL_META_CHANGE)
async def on_channel_meta_change(
    self,
    info: PortalChannelInfo,
    channel_id: Long,
    channel_type: ChannelType,
) -> None:
    """Apply changed channel metadata to the matching portal."""
    assert self.ktid
    target_portal = await po.Portal.get_by_ktid(
        channel_id, kt_receiver=self.ktid, kt_type=channel_type
    )
    await target_portal.backfill_lock.wait("meta change")
    await target_portal.update_info(self, info)
@async_time(METRIC_PROFILE_CHANGE)
async def on_profile_changed(self, info: OpenLinkChannelUserInfo) -> None:
    """Update the puppet whose profile changed, if such a puppet is found."""
    changed_puppet = await pu.Puppet.get_by_ktid(info.userId)
    if not changed_puppet:
        return
    await changed_puppet.update_info_from_participant(self, info)
@async_time(METRIC_PERM_CHANGE)
async def on_perm_changed(
    self,
    user_id: Long,
    perm: OpenChannelUserPerm,
    sender_id: Long,
    channel_id: Long,
    channel_type: ChannelType,
) -> None:
    """Relay a permission change to its portal, if that portal has a Matrix room."""
    assert self.ktid
    target_portal = await po.Portal.get_by_ktid(
        channel_id, kt_receiver=self.ktid, kt_type=channel_type, create=False
    )
    if not (target_portal and target_portal.mxid):
        return
    changer = await pu.Puppet.get_by_ktid(sender_id)
    await target_portal.backfill_lock.wait("perm changed")
    await target_portal.handle_kakaotalk_perm_changed(self, changer, user_id, perm)
@async_time(METRIC_CHANNEL_ADDED)
def on_channel_added(self, channel_info: ChannelInfo) -> Awaitable[None]:
    """Sync a newly added channel; returns the awaitable from _sync_channel."""
    pending_sync = self._sync_channel(channel_info)
    return pending_sync
@async_time(METRIC_CHANNEL_JOIN)
async def on_channel_join(self, channel_info: ChannelInfo) -> None:
    """Handle this user joining a channel.

    A portal that already has a Matrix room gets a user-join event for the
    user's own puppet; otherwise the channel is synced.
    """
    assert self.ktid
    target_portal = await po.Portal.get_by_ktid(
        channel_info.channelId, kt_receiver=self.ktid, kt_type=channel_info.type
    )
    if not target_portal.mxid:
        await self._sync_channel(channel_info)
        return
    own_puppet = await pu.Puppet.get_by_ktid(self.ktid)
    await target_portal.backfill_lock.wait("channel join")
    await target_portal.handle_kakaotalk_user_join(self, own_puppet)
@async_time(METRIC_CHANNEL_LEFT)
async def on_channel_left(self, channel_id: Long, channel_type: ChannelType | None) -> None:
    """Handle this user leaving a channel whose portal has a Matrix room."""
    assert self.ktid
    target_portal = await po.Portal.get_by_ktid(
        channel_id, kt_receiver=self.ktid, kt_type=channel_type
    )
    if not target_portal.mxid:
        return
    own_puppet = await pu.Puppet.get_by_ktid(self.ktid)
    await target_portal.backfill_lock.wait("channel left")
    # The user's own puppet is passed both as the sender and as the one leaving.
    await target_portal.handle_kakaotalk_user_left(self, own_puppet, own_puppet)
@async_time(METRIC_CHANNEL_KICKED)
async def on_channel_kicked(
    self,
    user_id: Long,
    sender_id: Long,
    channel_id: Long,
    channel_type: ChannelType
) -> None:
    """Relay a kick (``sender_id`` removing ``user_id``) to its portal's Matrix room."""
    assert self.ktid
    target_portal = await po.Portal.get_by_ktid(
        channel_id, kt_receiver=self.ktid, kt_type=channel_type
    )
    if not target_portal.mxid:
        return
    kicker = await pu.Puppet.get_by_ktid(sender_id)
    kicked = await pu.Puppet.get_by_ktid(user_id)
    await target_portal.backfill_lock.wait("channel kicked")
    await target_portal.handle_kakaotalk_user_left(self, kicker, kicked)
@async_time(METRIC_USER_JOIN)
async def on_user_join(
    self,
    user_id: Long,
    channel_id: Long,
    channel_type: ChannelType
) -> None:
    """Relay another user's join to the portal, if it has a Matrix room."""
    assert self.ktid
    target_portal = await po.Portal.get_by_ktid(
        channel_id, kt_receiver=self.ktid, kt_type=channel_type
    )
    if not target_portal.mxid:
        return
    joiner = await pu.Puppet.get_by_ktid(user_id)
    await target_portal.backfill_lock.wait("user join")
    await target_portal.handle_kakaotalk_user_join(self, joiner)
@async_time(METRIC_USER_LEFT)
async def on_user_left(
    self,
    user_id: Long,
    channel_id: Long,
    channel_type: ChannelType
) -> None:
    """Relay another user's departure to the portal, if it has a Matrix room."""
    assert self.ktid
    target_portal = await po.Portal.get_by_ktid(
        channel_id, kt_receiver=self.ktid, kt_type=channel_type
    )
    if not target_portal.mxid:
        return
    leaver = await pu.Puppet.get_by_ktid(user_id)
    await target_portal.backfill_lock.wait("user left")
    # The leaving puppet is passed both as the sender and as the one leaving.
    await target_portal.handle_kakaotalk_user_left(self, leaver, leaver)
# endregion