Andrew Ferrazzutti
53d3170c04
Fixes -203 error for MemoChats, and automatically retries channel creation when it fails the first time
987 lines
39 KiB
Python
987 lines
39 KiB
Python
# matrix-appservice-kakaotalk - A Matrix-KakaoTalk puppeting bridge.
|
|
# Copyright (C) 2022 Tulir Asokan, Andrew Ferrazzutti
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
from __future__ import annotations
|
|
|
|
from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Awaitable, cast
|
|
import asyncio
|
|
import time
|
|
|
|
from mautrix.bridge import BaseUser, async_getter_lock
|
|
from mautrix.types import (
|
|
EventID,
|
|
JSON,
|
|
MessageType,
|
|
RoomID,
|
|
SerializerError,
|
|
TextMessageEventContent,
|
|
UserID,
|
|
)
|
|
from mautrix.util.bridge_state import BridgeState, BridgeStateEvent
|
|
from mautrix.util.opt_prometheus import Gauge, Summary, async_time
|
|
from mautrix.util.simple_lock import SimpleLock
|
|
|
|
from . import portal as po, puppet as pu
|
|
from .config import Config
|
|
from .db import User as DBUser, LoginCredential
|
|
|
|
from .kt.client import Client
|
|
from .kt.client.errors import (
|
|
AuthenticationRequired,
|
|
AnotherLogonDetected,
|
|
IncorrectPassword,
|
|
OAuthException,
|
|
ResponseError,
|
|
)
|
|
from .kt.client.types import PortalChannelInfo, SettingsStruct
|
|
from .kt.types.bson import Long
|
|
from .kt.types.channel.channel_info import ChannelInfo, NormalChannelInfo, NormalChannelData
|
|
from .kt.types.channel.channel_type import ChannelType, KnownChannelType
|
|
from .kt.types.chat.chat import Chatlog
|
|
from .kt.types.client.client_session import LoginDataItem, LoginResult
|
|
from .kt.types.oauth import OAuthCredential
|
|
from .kt.types.openlink.open_channel_info import OpenChannelData, OpenChannelInfo
|
|
from .kt.types.openlink.open_link_type import OpenChannelUserPerm
|
|
from .kt.types.openlink.open_link_user_info import OpenLinkChannelUserInfo
|
|
from .kt.types.packet.chat.kickout import KnownKickoutType, KickoutRes
|
|
|
|
# Prometheus metrics for this module.
# The Summary objects below are attached to handlers with the @async_time decorator.
METRIC_CONNECT_AND_SYNC = Summary("bridge_connect_and_sync", "calls to connect_and_sync")

# Chat-level events
METRIC_CHAT = Summary("bridge_on_chat", "calls to on_chat")
METRIC_CHAT_DELETED = Summary("bridge_on_chat_deleted", "calls to on_chat_deleted")
METRIC_CHAT_READ = Summary("bridge_on_chat_read", "calls to on_chat_read")

# Channel metadata / profile / permission events
METRIC_CHANNEL_META_CHANGE = Summary(
    "bridge_on_channel_meta_change",
    "calls to on_channel_meta_change",
)
METRIC_PROFILE_CHANGE = Summary("bridge_on_profile_changed", "calls to on_profile_changed")
METRIC_PERM_CHANGE = Summary("bridge_on_perm_changed", "calls to on_perm_changed")

# Channel membership events
METRIC_CHANNEL_ADDED = Summary("bridge_on_channel_added", "calls to on_channel_added")
METRIC_CHANNEL_JOIN = Summary("bridge_on_channel_join", "calls to on_channel_join")
METRIC_CHANNEL_LEFT = Summary("bridge_on_channel_left", "calls to on_channel_left")
METRIC_CHANNEL_KICKED = Summary("bridge_on_channel_kicked", "calls to on_channel_kicked")
METRIC_USER_JOIN = Summary("bridge_on_user_join", "calls to on_user_join")
METRIC_USER_LEFT = Summary("bridge_on_user_left", "calls to on_user_left")

# Gauges updated through self._track_metric(...)
METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
METRIC_CONNECTED = Gauge("bridge_connected", "Bridge users connected to KakaoTalk")
|
|
|
|
if TYPE_CHECKING:
|
|
from .__main__ import KakaoTalkBridge
|
|
|
|
# Register human-readable descriptions for the bridge-state error codes that
# this module passes as `error` / `error_code` when pushing bridge states.
BridgeState.human_readable_errors.update(
    {
        "kt-reconnection-error": "Failed to reconnect to KakaoTalk",
        # TODO Use for errors in Node backend that cause session to be lost
        #"kt-connection-error": "KakaoTalk disconnected unexpectedly",
        # NOTE(review): "{message}" is presumably filled in from the state's message — confirm
        "kt-auth-error": "Authentication error from KakaoTalk: {message}",
        "logged-out": "You're not logged into KakaoTalk",
    }
)
|
|
|
|
|
|
class User(DBUser, BaseUser):
    """A Matrix user of the bridge: the database row plus runtime KakaoTalk session state."""

    # Overwritten from the "bridge.temporary_disconnect_notices" config value in init_cls
    temp_disconnect_notices: bool = True
    shutdown: bool = False
    config: Config

    # Class-wide caches of loaded users, keyed by Matrix ID and by KakaoTalk ID
    by_mxid: dict[UserID, User] = {}
    by_ktid: dict[int, User] = {}

    # Client for the current session; None until a session is loaded and after logout
    _client: Client | None

    # Guards creation of the notice room, and the ordering of outgoing notices
    _notice_room_lock: asyncio.Lock
    _notice_send_lock: asyncio.Lock
    is_admin: bool
    permission_level: str
    # None means "not checked yet" (see is_logged_in)
    _is_logged_in: bool | None
    # None means "never connected since the session was loaded" (see on_connect)
    _is_connected: bool | None
    # time.monotonic() value of the last change of _is_connected
    _connection_time: float
    _db_instance: DBUser | None
    _sync_lock: SimpleLock
    # Set while a session reload triggered by on_client_disconnect is pending
    _is_rpc_reconnecting: bool
    # Cached result of the settings request, and the time.monotonic() value when it was fetched
    _logged_in_info: SettingsStruct | None
    _logged_in_info_time: float
|
|
|
|
def __init__(
    self,
    mxid: UserID,
    force_login: bool = False,
    was_connected: bool = False,
    ktid: Long | None = None,
    uuid: str | None = None,
    access_token: str | None = None,
    refresh_token: str | None = None,
    notice_room: RoomID | None = None,
) -> None:
    """Create a user from its database fields and initialise runtime-only state.

    ``force_login`` and ``was_connected`` default to ``False`` so that a user
    can be constructed from only its Matrix ID (as ``get_by_mxid`` does for
    users that do not exist in the database yet) instead of raising
    ``TypeError`` for the missing arguments.
    """
    super().__init__(
        mxid=mxid,
        force_login=force_login,
        was_connected=was_connected,
        ktid=ktid,
        uuid=uuid,
        access_token=access_token,
        refresh_token=refresh_token,
        notice_room=notice_room,
    )
    BaseUser.__init__(self)
    self.notice_room = notice_room
    self._notice_room_lock = asyncio.Lock()
    self._notice_send_lock = asyncio.Lock()
    # Permissions are resolved once, from the bridge config, at construction time
    (
        self.relay_whitelisted,
        self.is_whitelisted,
        self.is_admin,
        self.permission_level,
    ) = self.config.get_permissions(mxid)
    self._is_logged_in = None
    self._is_connected = None
    self._connection_time = time.monotonic()
    self._sync_lock = SimpleLock(
        "Waiting for channel sync to finish before handling %s", log=self.log
    )
    self._is_rpc_reconnecting = False
    self._logged_in_info = None
    self._logged_in_info_time = 0

    self._client = None
|
|
|
|
@classmethod
def init_cls(cls, bridge: KakaoTalkBridge) -> AsyncIterable[Awaitable[bool]]:
    """Bind bridge-wide objects to the class and prepare the startup session reloads.

    Returns an async iterable that yields one un-awaited ``reload_session``
    coroutine (with ``is_startup=True``) per user from ``all_logged_in``.
    """
    config = bridge.config
    cls.bridge = bridge
    cls.config = config
    cls.az = bridge.az
    cls.temp_disconnect_notices = config["bridge.temporary_disconnect_notices"]

    users = cls.all_logged_in()

    async def startup_reloads():
        async for user in users:
            yield user.reload_session(is_startup=True)

    return startup_reloads()
|
|
|
|
@property
def client(self) -> Client:
    """The active client. Raises ValueError when no client is set."""
    active = self._client
    if active:
        return active
    raise ValueError("User must be logged in before its client can be used")
|
|
|
|
@property
def is_connected(self) -> bool | None:
    """Current connection flag (``None`` until it is first set)."""
    return self._is_connected


@is_connected.setter
def is_connected(self, val: bool | None) -> None:
    # Only an actual change of value resets the connection timestamp
    if self._is_connected == val:
        return
    self._connection_time = time.monotonic()
    self._is_connected = val
|
|
|
|
@property
def connection_time(self) -> float:
    """``time.monotonic()`` value recorded when ``is_connected`` last changed."""
    last_change = self._connection_time
    return last_change
|
|
|
|
@property
def has_state(self) -> bool:
    """Whether both an access token and a refresh token are stored."""
    # TODO If more state is needed, consider returning a saved LoginResult
    if not self.access_token:
        return False
    return bool(self.refresh_token)
|
|
|
|
# region Database getters
|
|
|
|
def _add_to_cache(self) -> None:
    """Register this user in the class-wide lookup tables."""
    ktid = self.ktid
    # Users without a KakaoTalk ID are only reachable by Matrix ID
    if ktid:
        self.by_ktid[ktid] = self
    self.by_mxid[self.mxid] = self
|
|
|
|
@classmethod
async def all_logged_in(cls) -> AsyncGenerator["User", None]:
    """Yield each user returned by the parent class's ``all_logged_in``.

    An instance that is already cached under the same Matrix ID is yielded in
    place of the freshly loaded one; otherwise the loaded one is cached first.
    """
    for loaded in await super().all_logged_in():
        try:
            result = cls.by_mxid[loaded.mxid]
        except KeyError:
            loaded._add_to_cache()
            result = loaded
        yield result
|
|
|
|
@classmethod
@async_getter_lock
async def get_by_mxid(cls, mxid: UserID, *, create: bool = True) -> User | None:
    """Look up a user by Matrix ID: cache first, then database, then optional creation.

    Returns ``None`` for puppet and bridge-bot Matrix IDs, and for unknown
    users when ``create`` is false.
    """
    # Ghost users and the bridge bot itself are never bridge users
    if pu.Puppet.get_id_from_mxid(mxid) or mxid == cls.az.bot_mxid:
        return None
    try:
        return cls.by_mxid[mxid]
    except KeyError:
        pass

    user = cast(cls, await super().get_by_mxid(mxid))
    if user is not None:
        user._add_to_cache()
        return user

    if create:
        cls.log.debug(f"Creating user instance for {mxid}")
        # The constructor declares force_login and was_connected without
        # defaults, so they must be passed explicitly for a brand-new user.
        user = cls(mxid, force_login=False, was_connected=False)
        await user.insert()
        user._add_to_cache()
        return user

    return None
|
|
|
|
@classmethod
@async_getter_lock
async def get_by_ktid(cls, ktid: int) -> User | None:
    """Look up a user by KakaoTalk ID, checking the cache before the database."""
    if ktid in cls.by_ktid:
        return cls.by_ktid[ktid]

    found = cast(cls, await super().get_by_ktid(ktid))
    if found is None:
        return None

    found._add_to_cache()
    return found
|
|
|
|
async def get_uuid(self, force: bool = False) -> str:
    """Return the stored UUID, generating and saving one if missing or if ``force`` is set."""
    if not force and self.uuid is not None:
        return self.uuid
    self.uuid = await self._generate_uuid()
    # TODO Maybe don't save yet
    await self.save()
    return self.uuid
|
|
|
|
@classmethod
async def _generate_uuid(cls) -> str:
    """Ask the client for a new UUID, passing it all UUIDs returned by ``get_all_uuids``."""
    known_uuids = await super().get_all_uuids()
    return await Client.generate_uuid(known_uuids)
|
|
|
|
# endregion
|
|
|
|
@property
def oauth_credential(self) -> OAuthCredential:
    """Build an ``OAuthCredential`` from the stored ID, UUID and tokens.

    All four fields must be set; each one is asserted to be non-``None``.
    """
    assert self.ktid is not None
    assert self.uuid is not None
    assert self.access_token is not None
    assert self.refresh_token is not None
    fields = (self.ktid, self.uuid, self.access_token, self.refresh_token)
    return OAuthCredential(*fields)


@oauth_credential.setter
def oauth_credential(self, oauth_credential: OAuthCredential) -> None:
    # Copy the credential's fields onto the user; a changed device UUID is
    # logged as a warning and then adopted.
    self.ktid = oauth_credential.userId
    self.access_token = oauth_credential.accessToken
    self.refresh_token = oauth_credential.refreshToken
    if oauth_credential.deviceUUID == self.uuid:
        return
    self.log.warning(f"UUID mismatch: expected {self.uuid}, got {oauth_credential.deviceUUID}")
    self.uuid = oauth_credential.deviceUUID
|
|
|
|
async def get_own_info(self, *, force: bool = False) -> SettingsStruct | None:
    """Return the user's settings, refreshing the cached copy when needed.

    The cache is refreshed when ``force`` is set, when nothing is cached yet,
    or when the cached value is more than an hour old.

    Without a client, the cached value (possibly ``None``) is returned as-is.

    Raises:
        ValueError: if ``force`` is set while there is no client. Previously
            this path dereferenced ``None`` and failed with AttributeError.
    """
    client = self._client
    if not client:
        if force:
            raise ValueError("User must be logged in before its client can be used")
        return self._logged_in_info

    # Cached settings are considered stale after one hour
    is_stale = self._logged_in_info_time + 60 * 60 < time.monotonic()
    if force or not self._logged_in_info or is_stale:
        self._logged_in_info = await client.get_settings()
        self._logged_in_info_time = time.monotonic()
    return self._logged_in_info
|
|
|
|
async def _load_session(self, is_startup: bool) -> bool:
    """Try to restore a KakaoTalk session for this user.

    A saved password (if any) is tried first. If that does not succeed,
    token-based relogin is used when the config allows it and tokens are stored.

    Returns:
        True when a client was started, or when the user is already logged in
        at startup. False when no login was attempted or possible.

    Raises:
        The last password-login error, when password login failed and token
        relogin is not available. Errors from ``Client.login`` other than the
        ones handled below, and errors from ``client.start``, also propagate.
    """
    if self._is_logged_in and is_startup:
        return True
    if not self.was_connected and not self.config["bridge.remain_logged_in_on_disconnect"]:
        self.log.warning("Not logging in because last session was disconnected, and login+disconnection is forbidden by config")
        await self.push_bridge_state(
            BridgeStateEvent.LOGGED_OUT,
            error="logged-out",
        )
        return False
    latest_exc: Exception | None = None
    password_ok = False
    try:
        creds = await LoginCredential.get_by_mxid(self.mxid)
    except Exception:
        self.log.exception("Exception while looking for saved password")
        creds = None
    if creds:
        uuid = await self.get_uuid()
        form = creds.get_form()
        oauth_credential = None
        try:
            oauth_credential = await Client.login(uuid=uuid, form=form, forced=False)
        except IncorrectPassword as e:
            latest_exc = e
        except AnotherLogonDetected:
            if not self.force_login:
                return False
            try:
                # Wait a moment to make it look like a user-initiated response
                await asyncio.sleep(2)
                oauth_credential = await Client.login(uuid=uuid, form=form, forced=True)
            except OAuthException as e:
                latest_exc = e
        if oauth_credential:
            self.oauth_credential = oauth_credential
            await self.save()
            password_ok = True
        else:
            try:
                await creds.delete()
            # Not a bare except: cancellation of this task must not be swallowed
            except Exception:
                self.log.exception("Exception while deleting incorrect password")
    if password_ok or self.config["bridge.allow_token_relogin"] and self.has_state:
        if not password_ok:
            self.log.warning("Using token-based relogin after password-based relogin failed")
    elif latest_exc:
        raise latest_exc
    else:
        # If we have a user in the DB with no state, we can assume
        # KT logged us out and the bridge has restarted
        await self.push_bridge_state(
            BridgeStateEvent.BAD_CREDENTIALS,
            error="logged-out",
        )
        return False
    client = Client(self, log=self.log.getChild("ktclient"))
    user_info = await client.start()
    # NOTE On failure, client.start throws instead of returning something falsy
    self.log.info("Loaded session successfully")
    self._client = client
    self._logged_in_info = user_info
    self._logged_in_info_time = time.monotonic()
    self._track_metric(METRIC_LOGGED_IN, True)
    self._is_logged_in = True
    # None (rather than False) marks the next connection as the first one
    self.is_connected = None
    asyncio.create_task(self.post_login(is_startup=is_startup, is_token_login=not password_ok))
    return True
|
|
|
|
async def _send_reset_notice(self, e: OAuthException, edit: EventID | None = None) -> None:
    """Tell the user about an authentication error, then log out while keeping the KakaoTalk ID."""
    notice_text = (
        "Got authentication error from KakaoTalk:\n\n"
        f"> {e.message}\n\n"
        "If you changed your KakaoTalk password, this "
        "is normal and you just need to log in again."
    )
    await self.send_bridge_notice(
        notice_text,
        edit=edit,
        important=True,
        state_event=BridgeStateEvent.BAD_CREDENTIALS,
        error_code="kt-auth-error",
        error_message=str(e),
    )
    await self.logout(remove_ktid=False)
|
|
|
|
async def is_logged_in(self, _override: bool = False) -> bool:
    """Report whether the user is logged in, checking with KakaoTalk if not known yet.

    Without stored tokens or without a client the answer is always False.
    Otherwise the cached answer is used unless it is unset or ``_override``
    is true, in which case it is recomputed from ``get_own_info``.
    """
    if not (self.has_state and self._client):
        return False
    if not _override and self._is_logged_in is not None:
        return self._is_logged_in
    try:
        self._is_logged_in = bool(await self.get_own_info())
    except SerializerError:
        self.log.exception(
            "Unable to deserialize settings struct, "
            "but didn't get auth error, so treating user as logged in"
        )
        self._is_logged_in = True
    except Exception:
        self.log.exception("Exception checking login status")
        self._is_logged_in = False
    return self._is_logged_in
|
|
|
|
async def reload_session(
    self, event_id: EventID | None = None, retries: int = 3, is_startup: bool = False
) -> None:
    """Load the user's session, reporting failures to the user as bridge notices.

    Args:
        event_id: Optional notice event to edit instead of sending new notices.
        retries: How many more times to retry after a ``ResponseError``.
        is_startup: Passed through to ``_load_session``, including on retries.

    The ``_is_rpc_reconnecting`` flag is always cleared when this returns.
    """
    try:
        if not await self._load_session(is_startup=is_startup) and self.has_state:
            self.log.debug("reload_session failure: wait for connection to Node module before prompting for manual login")
            await Client.wait_for_connection()
            self.log.debug("reload_session failure: now connected to Node module")
            await self.send_bridge_notice(
                "Logged out of KakaoTalk. Must use the `login` command to log back in.",
                important=True,
            )
            await self.logout(remove_ktid=False)
    except OAuthException as e:
        await self._send_reset_notice(e, edit=event_id)
    # TODO Throw a ResponseError on network failures
    except ResponseError as e:
        will_retry = retries > 0
        retry = "Retrying in 1 minute" if will_retry else "Not retrying"
        notice = f"Failed to connect to KakaoTalk: unknown response error {e}. {retry}"
        if will_retry:
            await self.send_bridge_notice(
                notice,
                edit=event_id,
                state_event=BridgeStateEvent.TRANSIENT_DISCONNECT,
            )
            await asyncio.sleep(60)
            # Keep is_startup on the retry, so a retried startup load is not
            # silently treated as a regular (non-startup) load.
            await self.reload_session(event_id, retries - 1, is_startup)
        else:
            await self.send_bridge_notice(
                notice,
                edit=event_id,
                important=True,
                state_event=BridgeStateEvent.UNKNOWN_ERROR,
                error_code="kt-reconnection-error",
            )
    except Exception:
        self.log.exception("Error connecting to KakaoTalk")
        await self.send_bridge_notice(
            "Failed to connect to KakaoTalk: unknown error (see logs for more details)",
            edit=event_id,
            state_event=BridgeStateEvent.UNKNOWN_ERROR,
            error_code="kt-reconnection-error",
        )
    finally:
        self._is_rpc_reconnecting = False
|
|
|
|
async def logout(self, *, remove_ktid: bool = True, reset_device: bool = False) -> None:
    """Stop the client and clear the user's session state, then save.

    Args:
        remove_ktid: Also push a LOGGED_OUT bridge state, forget the KakaoTalk
            ID and drop the user from the by-KakaoTalk-ID cache.
        reset_device: Also replace the device UUID with a newly generated one.
    """
    if self._client:
        await self._client.stop()
    if remove_ktid:
        await self.push_bridge_state(BridgeStateEvent.LOGGED_OUT)
    self._track_metric(METRIC_LOGGED_IN, False)

    if reset_device:
        self.uuid = await self._generate_uuid()
    self.access_token = None
    self.refresh_token = None

    self._is_logged_in = False
    self.is_connected = None
    self.was_connected = False
    self._client = None

    if self.ktid and remove_ktid:
        #await UserPortal.delete_all(self.ktid)
        # pop() rather than del: the user is only cached by post_login or a
        # getter, so the entry may be absent and logging out must not fail.
        self.by_ktid.pop(self.ktid, None)
        self.ktid = None

    await self.save()
|
|
|
|
async def post_login(self, is_startup: bool, is_token_login: bool) -> None:
    """Run follow-up actions after a session was loaded.

    Caches the user, tries to enable the custom puppet automatically, and then
    either connects and syncs or tells the user how to connect manually.
    """
    self.log.info("Running post-login actions")
    assert self.ktid
    self._add_to_cache()

    try:
        own_puppet = await pu.Puppet.get_by_ktid(self.ktid)

        if own_puppet.custom_mxid != self.mxid and own_puppet.can_auto_login(self.mxid):
            self.log.info("Automatically enabling custom puppet")
            await own_puppet.switch_mxid(access_token="auto", mxid=self.mxid)
    except Exception:
        self.log.exception("Failed to automatically enable custom puppet")

    should_connect = (
        not is_token_login
        or self.was_connected and self.config["bridge.reconnect_on_token_relogin"]
    )
    if not should_connect:
        await self.send_bridge_notice(
            "Logged into KakaoTalk. To connect to chatroom updates, use the `sync` command."
        )
        self.was_connected = False
        await self.save()
        return

    # TODO Check if things break when a live message comes in during syncing
    sync_count = 0
    if self.config["bridge.sync_on_startup"] or not is_startup:
        sync_count = self.config["bridge.initial_chat_sync"]
    await self.connect_and_sync(sync_count, force_sync=False)
|
|
|
|
async def get_direct_chats(self) -> dict[UserID, list[RoomID]]:
    """Map puppet Matrix IDs to the Matrix rooms of this user's portals.

    Only portals that have a Matrix room are included. Without a KakaoTalk ID
    the result is empty.
    """
    if not self.ktid:
        return {}
    direct_chats = {}
    async for portal in po.Portal.get_all_by_receiver(self.ktid):
        if portal.mxid:
            direct_chats[pu.Puppet.get_mxid_from_id(portal.ktid)] = [portal.mxid]
    return direct_chats
|
|
|
|
@async_time(METRIC_CONNECT_AND_SYNC)
async def connect_and_sync(self, sync_count: int | None, force_sync: bool) -> bool:
    """Connect the client and, when ``on_connect`` asks for it, sync channels.

    Returns True when connecting (and any syncing) finished without raising,
    False otherwise. An authentication error additionally logs the user out
    while keeping the KakaoTalk ID.
    """
    # TODO Look for a way to sync all channels without (re-)logging in
    try:
        login_result = await self.client.connect()
        needs_sync = await self.on_connect(force_sync)
        if needs_sync and login_result:
            await self._sync_channels(login_result, sync_count)
        return True
    except AuthenticationRequired as e:
        await self.send_bridge_notice(
            f"Got authentication error from KakaoTalk:\n\n> {e.message}\n\n",
            important=True,
            state_event=BridgeStateEvent.BAD_CREDENTIALS,
            error_code="kt-auth-error",
            error_message=str(e),
        )
        await self.logout(remove_ktid=False)
        return False
    except Exception as e:
        self.log.exception("Failed to connect and sync")
        await self.push_bridge_state(BridgeStateEvent.UNKNOWN_ERROR, message=str(e))
        return False
|
|
|
|
async def _sync_channels(self, login_result: LoginResult, sync_count: int | None) -> None:
    """Sync the most recently active channels from a login result.

    ``sync_count`` of ``None`` means "use the configured initial_chat_sync";
    a negative value means "all channels"; zero skips syncing entirely.
    A failure to sync one channel is logged and does not stop the others,
    except for ``AuthenticationRequired``, which is re-raised.
    """
    if sync_count is None:
        sync_count = self.config["bridge.initial_chat_sync"]
    if not sync_count:
        self.log.debug("Skipping channel syncing")
        return
    # TODO Sync left channels. It's not login_result.removedChannelIdList...
    channels = login_result.channelList
    if not channels:
        self.log.debug("No channels to sync")
        return

    num_channels = len(channels)
    if sync_count < 0 or sync_count > num_channels:
        sync_count = num_channels
    await self.push_bridge_state(BridgeStateEvent.BACKFILLING)
    self.log.debug(f"Syncing {sync_count} of {num_channels} channels...")

    def last_chat_time(item: LoginDataItem):
        # Channels without a last chat log sort as the oldest
        last_chat = item.channel.info.lastChatLog
        return last_chat.sendAt if last_chat else 0

    newest_first = sorted(channels, key=last_chat_time, reverse=True)
    for login_data in newest_first[:sync_count]:
        try:
            await self._sync_channel_on_login(login_data)
        except AuthenticationRequired:
            raise
        except Exception:
            self.log.exception(f"Failed to sync channel {login_data.channel.channelId}")

    await self.update_direct_chats()
|
|
|
|
def _sync_channel_on_login(self, login_data: LoginDataItem) -> Awaitable[None]:
    """Log the details of one channel from the login result, then sync it.

    Returns the awaitable produced by ``_sync_channel``; nothing is awaited here.
    """
    debug = self.log.debug
    channel_data = login_data.channel
    debug(f"Syncing channel {channel_data.channelId} (last updated at {login_data.lastUpdate})")
    channel_info = channel_data.info

    # Type-specific details
    if isinstance(channel_data, NormalChannelData):
        debug(f"Join time: {channel_info.joinTime}")
    elif isinstance(channel_data, OpenChannelData):
        debug(f"channel_data link ID: {channel_data.linkId}")
        debug(f"channel_info link ID: {channel_info.linkId}")
        debug(f"openToken: {channel_info.openToken}")
        debug(f"Is direct channel: {channel_info.directChannel}")
        debug(f"Has OpenLink: {channel_info.openLink is not None}")
    else:
        self.log.error(f"Unexpected channel type: {type(channel_data)}")

    # Details common to every channel type
    debug(f"channel_info channel ID: {channel_info.channelId}")
    debug(f"Channel data/info IDs match: {channel_data.channelId == channel_info.channelId}")
    debug(f"Channel type: {channel_info.type}")
    debug(f"Active user count: {channel_info.activeUserCount}")
    debug(f"New chat count: {channel_info.newChatCount}")
    debug(f"New chat count invalid: {channel_info.newChatCountInvalid}")
    debug(f"Last chat log ID: {channel_info.lastChatLogId}")
    debug(f"Last seen log ID: {channel_info.lastSeenLogId}")
    debug(f"Has last chat log: {channel_info.lastChatLog is not None}")
    debug(f"metaMap: {channel_info.metaMap}")
    debug(f"User count: {len(channel_info.displayUserList)}")
    debug(f"Has push alert: {channel_info.pushAlert}")
    for member in channel_info.displayUserList:
        debug(f"Member: {member.nickname} - {member.profileURL} - {member.userId}")

    return self._sync_channel(channel_info)
|
|
|
|
async def _sync_channel(self, channel_info: ChannelInfo) -> None:
    """Create the Matrix room for a channel, or update and backfill an existing one."""
    assert self.ktid
    portal = await po.Portal.get_by_ktid(
        channel_info.channelId,
        kt_receiver=self.ktid,
        kt_type=channel_info.type,
    )
    portal_info = await self.client.get_portal_channel_info(portal.channel_props)
    portal_info.channel_info = channel_info
    if portal.mxid:
        await portal.update_matrix_room(self, portal_info)
        await portal.backfill(self, is_initial=False, channel_info=channel_info)
    else:
        await portal.create_matrix_room(self, portal_info)
|
|
|
|
async def get_notice_room(self) -> RoomID:
    """Return the user's notice room, creating and saving it on first use."""
    if self.notice_room:
        return self.notice_room
    async with self._notice_room_lock:
        # If someone already created the room while this call was waiting,
        # don't make a new room
        if self.notice_room:
            return self.notice_room
        if self.config["bridge.federate_rooms"]:
            creation_content = {}
        else:
            creation_content = {"m.federate": False}
        self.notice_room = await self.az.intent.create_room(
            is_direct=True,
            invitees=[self.mxid],
            topic="KakaoTalk bridge notices",
            creation_content=creation_content,
        )
        await self.save()
    return self.notice_room
|
|
|
|
async def send_bridge_notice(
    self,
    text: str,
    edit: EventID | None = None,
    state_event: BridgeStateEvent | None = None,
    important: bool = False,
    error_code: str | None = None,
    error_message: str | None = None,
) -> EventID | None:
    """Optionally push a bridge state, then send ``text`` to the user's notice room.

    The bridge state is pushed even when notices are disabled by config.
    Failures to send the notice are logged, not raised.

    Returns:
        ``None`` when notices are disabled; otherwise ``edit`` if given, else
        the ID of the sent event (``None`` if sending failed).
    """
    if state_event:
        state_message = error_message if error_code else text
        await self.push_bridge_state(
            state_event,
            error=error_code,
            message=state_message,
        )
    if self.config["bridge.disable_bridge_notices"]:
        return None
    sent_event_id = None
    try:
        self.log.debug("Sending bridge notice: %s", text)
        content = TextMessageEventContent(
            body=text,
            msgtype=(MessageType.TEXT if important else MessageType.NOTICE),
        )
        if edit:
            content.set_edit(edit)
        # This is locked to prevent notices going out in the wrong order
        async with self._notice_send_lock:
            notice_room = await self.get_notice_room()
            sent_event_id = await self.az.intent.send_message(notice_room, content)
    except Exception:
        self.log.warning("Failed to send bridge notice", exc_info=True)
    return edit or sent_event_id
|
|
|
|
async def fill_bridge_state(self, state: BridgeState) -> None:
    """Add the KakaoTalk ID and the own puppet's name to a bridge state, when a KakaoTalk ID is set."""
    await super().fill_bridge_state(state)
    if not self.ktid:
        return
    state.remote_id = str(self.ktid)
    own_puppet = await pu.Puppet.get_by_ktid(self.ktid)
    state.remote_name = own_puppet.name
|
|
|
|
async def get_bridge_states(self) -> list[BridgeState]:
    """Return the user's current bridge state as a one-element list (empty without stored tokens).

    The state is CONNECTED when connected, TRANSIENT_DISCONNECT while a
    reconnect is pending or a client exists, and UNKNOWN_ERROR otherwise.
    """
    if not self.has_state:
        return []
    current = BridgeState(state_event=BridgeStateEvent.UNKNOWN_ERROR)
    if self.is_connected:
        current.state_event = BridgeStateEvent.CONNECTED
    elif self._is_rpc_reconnecting or self._client:
        current.state_event = BridgeStateEvent.TRANSIENT_DISCONNECT
    return [current]
|
|
|
|
async def get_puppet(self) -> pu.Puppet | None:
    """Return the puppet for this user's own KakaoTalk ID, or ``None`` if no ID is set."""
    own_ktid = self.ktid
    if own_ktid:
        return await pu.Puppet.get_by_ktid(self.ktid)
    return None
|
|
|
|
async def get_portal_with(self, puppet: pu.Puppet, create: bool = True) -> po.Portal | None:
    """Find (or create) the portal for a direct chat with ``puppet``.

    A puppet with the user's own KakaoTalk ID maps to a memo chat; any other
    puppet maps to a direct chat. If no existing chat is found, one is created
    through the client.

    Returns ``None`` when the user has no KakaoTalk ID or is not connected,
    and when no channel could be found or created.
    """
    # TODO Make upstream request to return custom failure message
    if not self.ktid or not self.is_connected:
        return None
    if puppet.ktid != self.ktid:
        kt_type = KnownChannelType.DirectChat
        ktid = await self.client.get_friend_dm_id(puppet.ktid)
    else:
        kt_type = KnownChannelType.MemoChat
        memo_ids = await self.client.get_memo_ids()
        if not memo_ids:
            ktid = None
        else:
            ktid = memo_ids[0]
            if len(memo_ids) > 1:
                self.log.warning("Found multiple memo chats, so using the first one as a fallback")
    if ktid:
        self.log.debug(f"Found existing direct chat with KakaoTalk user {puppet.ktid}")
    else:
        self.log.debug(f"Didn't find an existing direct chat with KakaoTalk user {puppet.ktid}, so will create one")
        try:
            ktid = await self.client.create_direct_chat(puppet.ktid)
        # Not a bare except: cancellation of this task must not be swallowed
        except Exception:
            self.log.exception(f"Failed to create direct chat with {puppet.ktid}")
    if not ktid:
        return None
    return await po.Portal.get_by_ktid(
        ktid, kt_receiver=self.ktid, create=create, kt_type=kt_type
    )
|
|
|
|
# region Matrix->KakaoTalk commands
|
|
|
|
async def leave_channel(self, portal: po.Portal) -> None:
    """Leave the portal's channel, then run the same handling as for a channel-left event."""
    props = portal.channel_props
    await self.client.leave_channel(props)
    await self.on_channel_left(portal.ktid, portal.kt_type)
|
|
|
|
# endregion
|
|
# region KakaoTalk event handling
|
|
|
|
async def on_connect(self, force_sync: bool) -> bool:
    """Record a successful connection and decide whether channels should be synced.

    Syncing is skipped only for a reconnection (not the first connection) that
    was not forced and whose disconnection was shorter than the configured
    ``bridge.resync_max_disconnected_time``.

    Returns True when the caller should sync channels.
    """
    now = time.monotonic()
    # Read before setting is_connected, since the setter resets this timestamp
    disconnected_at = self._connection_time
    max_delay = self.config["bridge.resync_max_disconnected_time"]
    first_connect = self.is_connected is None
    self.is_connected = True
    self._track_metric(METRIC_CONNECTED, True)

    duration = int(now - disconnected_at)
    skip_sync = not (first_connect or force_sync) and duration < max_delay
    if skip_sync:
        self.log.debug(f"Disconnection lasted {duration} seconds, not re-syncing channels...")
    elif self.temp_disconnect_notices:
        await self.send_bridge_notice("Connected to KakaoTalk chats")

    await self.push_bridge_state(BridgeStateEvent.CONNECTED)
    self.was_connected = True
    await self.save()
    return not skip_sync
|
|
|
|
async def on_disconnect(self, res: KickoutRes | None) -> None:
    """Handle a disconnection from KakaoTalk.

    Depending on config (and always for a deleted account) the user is either
    logged out or only marked as disconnected. When a kickout reason is given,
    the user is told about it in a notice.
    """
    self.is_connected = False
    self._track_metric(METRIC_CONNECTED, False)
    should_logout = not self.config["bridge.remain_logged_in_on_disconnect"]

    reason_str = None
    if res:
        if res.reason == KnownKickoutType.LOGIN_ANOTHER:
            reason_str = "Logged in on a PC or another bridge."
        elif res.reason == KnownKickoutType.CHANGE_SERVER:
            # TODO Reconnect automatically instead
            reason_str = "KakaoTalk backend server changed."
        elif res.reason == KnownKickoutType.ACCOUNT_DELETED:
            reason_str = "Your KakaoTalk account was deleted!"
            should_logout = True
        else:
            reason_str = f"Unknown reason ({res.reason})."

    if should_logout:
        await self.logout()
    else:
        # TODO What bridge state to push?
        self.was_connected = False
        await self.save()

    if not reason_str:
        return
    if should_logout:
        reason_suffix = "You are now logged out. To log back in, use the `login` command."
    else:
        reason_suffix = "To reconnect, use the `sync` command."
    await self.send_bridge_notice(
        f"Disconnected from KakaoTalk: {reason_str} {reason_suffix}",
        important=True,
    )
|
|
|
|
async def on_error(self, error: JSON) -> None:
    """Forward an error event from KakaoTalk to the user as a bridge notice."""
    notice_text = f"Got error event from KakaoTalk:\n\n> {error}"
    await self.send_bridge_notice(
        notice_text,
        # TODO Which error code to use?
        #error_code="kt-connection-error",
        error_message=str(error),
    )
|
|
|
|
async def on_client_disconnect(self) -> None:
    """Handle the loss of the client connection.

    The client is dropped. If the user was logged in and connected, a session
    reload is scheduled in the background.
    """
    self.is_connected = False
    self._track_metric(METRIC_CONNECTED, False)
    self._client = None
    if not (self._is_logged_in and self.was_connected):
        return
    if self.temp_disconnect_notices:
        await self.send_bridge_notice(
            "Disconnected from KakaoTalk: backend helper module exited. "
            "Will reconnect once the module resumes."
        )
    await self.push_bridge_state(BridgeStateEvent.TRANSIENT_DISCONNECT)
    # Cleared again in the finally clause of reload_session
    self._is_rpc_reconnecting = True
    asyncio.create_task(self.reload_session())
|
|
|
|
async def on_logged_in(self, oauth_credential: OAuthCredential) -> None:
    """Store new credentials, start a client for them and schedule the post-login actions."""
    self.log.debug(f"Successfully logged in as {oauth_credential.userId}")
    self.oauth_credential = oauth_credential
    await self.push_bridge_state(BridgeStateEvent.CONNECTING)
    client_log = self.log.getChild("ktclient")
    self._client = Client(self, log=client_log)
    await self.save()
    self._is_logged_in = True
    # TODO Retry network connection failures here, or in the client (like token refreshes are)?
    #      Should also catch unlikely authentication errors
    self._logged_in_info = await self._client.start()
    self._logged_in_info_time = time.monotonic()
    post_login_actions = self.post_login(is_startup=True, is_token_login=False)
    asyncio.create_task(post_login_actions)
|
|
|
|
@async_time(METRIC_CHAT)
async def on_chat(self, chat: Chatlog, channel_id: Long, channel_type: ChannelType) -> None:
    """Pass an incoming chat to its portal, after waiting on the portal's backfill lock."""
    assert self.ktid
    portal = await po.Portal.get_by_ktid(
        channel_id, kt_receiver=self.ktid, kt_type=channel_type
    )
    sender = await pu.Puppet.get_by_ktid(chat.sender.userId)
    await portal.backfill_lock.wait(chat.logId)
    # A sender without a name triggers a resync of the portal
    if not sender.name:
        portal.schedule_resync(self, sender)
    await portal.handle_kakaotalk_chat(self, sender, chat)
|
|
|
|
@async_time(METRIC_CHAT_DELETED)
async def on_chat_deleted(
    self,
    chat_id: Long,
    sender_id: Long,
    timestamp: int,
    channel_id: Long,
    channel_type: ChannelType,
) -> None:
    """Pass a chat deletion to its portal, if the portal exists and has a Matrix room."""
    assert self.ktid
    portal = await po.Portal.get_by_ktid(
        channel_id,
        kt_receiver=self.ktid,
        kt_type=channel_type,
        create=False,
    )
    if not (portal and portal.mxid):
        return
    await portal.backfill_lock.wait(f"redaction of {chat_id}")
    deleter = await pu.Puppet.get_by_ktid(sender_id)
    await portal.handle_kakaotalk_chat_delete(deleter, chat_id, timestamp)
|
|
|
|
@async_time(METRIC_CHAT_READ)
async def on_chat_read(
    self,
    chat_id: Long,
    sender_id: Long,
    channel_id: Long,
    channel_type: ChannelType,
) -> None:
    """Pass a read receipt to its portal, if the portal exists and has a Matrix room."""
    assert self.ktid
    reader = await pu.Puppet.get_by_ktid(sender_id)
    portal = await po.Portal.get_by_ktid(
        channel_id,
        kt_receiver=self.ktid,
        kt_type=channel_type,
        create=False,
    )
    if not (portal and portal.mxid):
        return
    await portal.backfill_lock.wait(f"read receipt from {sender_id}")
    await portal.handle_kakaotalk_chat_read(self, reader, chat_id)
|
|
|
|
@async_time(METRIC_CHANNEL_META_CHANGE)
async def on_channel_meta_change(
    self,
    info: PortalChannelInfo,
    channel_id: Long,
    channel_type: ChannelType,
) -> None:
    """Apply changed channel info to the channel's portal."""
    assert self.ktid
    target = await po.Portal.get_by_ktid(
        channel_id, kt_receiver=self.ktid, kt_type=channel_type
    )
    await target.backfill_lock.wait("meta change")
    await target.update_info(self, info)
|
|
|
|
@async_time(METRIC_PROFILE_CHANGE)
async def on_profile_changed(self, info: OpenLinkChannelUserInfo) -> None:
    """Update the puppet of the user whose profile changed, if such a puppet is found."""
    changed = await pu.Puppet.get_by_ktid(info.userId)
    if not changed:
        return
    await changed.update_info_from_participant(self, info)
|
|
|
|
@async_time(METRIC_PERM_CHANGE)
async def on_perm_changed(
    self,
    user_id: Long,
    perm: OpenChannelUserPerm,
    sender_id: Long,
    channel_id: Long,
    channel_type: ChannelType,
) -> None:
    """Pass a permission change to its portal, if the portal exists and has a Matrix room."""
    assert self.ktid
    portal = await po.Portal.get_by_ktid(
        channel_id,
        kt_receiver=self.ktid,
        kt_type=channel_type,
        create=False,
    )
    if not (portal and portal.mxid):
        return
    changer = await pu.Puppet.get_by_ktid(sender_id)
    await portal.backfill_lock.wait("perm changed")
    await portal.handle_kakaotalk_perm_changed(self, changer, user_id, perm)
|
|
|
|
@async_time(METRIC_CHANNEL_ADDED)
def on_channel_added(self, channel_info: ChannelInfo) -> Awaitable[None]:
    """Sync a newly added channel; returns the awaitable from ``_sync_channel``."""
    sync = self._sync_channel(channel_info)
    return sync
|
|
|
|
@async_time(METRIC_CHANNEL_JOIN)
async def on_channel_join(self, channel_info: ChannelInfo) -> None:
    """Handle this user joining a channel.

    A portal that has no Matrix room yet is synced; otherwise the join of the
    user's own puppet is passed to the portal.
    """
    assert self.ktid
    portal = await po.Portal.get_by_ktid(
        channel_info.channelId,
        kt_receiver=self.ktid,
        kt_type=channel_info.type,
    )
    if not portal.mxid:
        await self._sync_channel(channel_info)
        return
    own_puppet = await pu.Puppet.get_by_ktid(self.ktid)
    await portal.backfill_lock.wait("channel join")
    await portal.handle_kakaotalk_user_join(self, own_puppet)
|
|
|
|
@async_time(METRIC_CHANNEL_LEFT)
async def on_channel_left(self, channel_id: Long, channel_type: ChannelType | None) -> None:
    """Handle this user leaving a channel, if its portal has a Matrix room."""
    assert self.ktid
    portal = await po.Portal.get_by_ktid(
        channel_id, kt_receiver=self.ktid, kt_type=channel_type
    )
    if not portal.mxid:
        return
    own_puppet = await pu.Puppet.get_by_ktid(self.ktid)
    await portal.backfill_lock.wait("channel left")
    # The user's own puppet is passed twice, in the positions that
    # on_channel_kicked fills with the kicking and the kicked user.
    await portal.handle_kakaotalk_user_left(self, own_puppet, own_puppet)
|
|
|
|
@async_time(METRIC_CHANNEL_KICKED)
async def on_channel_kicked(
    self,
    user_id: Long,
    sender_id: Long,
    channel_id: Long,
    channel_type: ChannelType
) -> None:
    """Handle a user being kicked from a channel, if its portal has a Matrix room."""
    assert self.ktid
    portal = await po.Portal.get_by_ktid(
        channel_id, kt_receiver=self.ktid, kt_type=channel_type
    )
    if not portal.mxid:
        return
    kicker = await pu.Puppet.get_by_ktid(sender_id)
    kicked = await pu.Puppet.get_by_ktid(user_id)
    await portal.backfill_lock.wait("channel kicked")
    await portal.handle_kakaotalk_user_left(self, kicker, kicked)
|
|
|
|
@async_time(METRIC_USER_JOIN)
async def on_user_join(
    self,
    user_id: Long,
    channel_id: Long,
    channel_type: ChannelType
) -> None:
    """Handle another user joining a channel, if its portal has a Matrix room."""
    assert self.ktid
    portal = await po.Portal.get_by_ktid(
        channel_id, kt_receiver=self.ktid, kt_type=channel_type
    )
    if not portal.mxid:
        return
    joiner = await pu.Puppet.get_by_ktid(user_id)
    await portal.backfill_lock.wait("user join")
    await portal.handle_kakaotalk_user_join(self, joiner)
|
|
|
|
@async_time(METRIC_USER_LEFT)
async def on_user_left(
    self,
    user_id: Long,
    channel_id: Long,
    channel_type: ChannelType
) -> None:
    """Handle another user leaving a channel, if its portal has a Matrix room."""
    assert self.ktid
    portal = await po.Portal.get_by_ktid(
        channel_id, kt_receiver=self.ktid, kt_type=channel_type
    )
    if not portal.mxid:
        return
    leaver = await pu.Puppet.get_by_ktid(user_id)
    await portal.backfill_lock.wait("user left")
    # The leaving user is passed twice, in the positions that
    # on_channel_kicked fills with the kicking and the kicked user.
    await portal.handle_kakaotalk_user_left(self, leaver, leaver)
|
|
|
|
# endregion
|