Compare commits

...

12 Commits

7 changed files with 334 additions and 50 deletions

View File

@ -16,7 +16,7 @@
* [x] Mentions
* [x] Message redactions<sup>[1]</sup>
* [ ] Message reactions
* [ ] Read receipts
* [x] Read receipts
* [ ] Power level
* [ ] Membership actions
* [ ] Invite
@ -42,11 +42,13 @@
* [ ] Message reactions
* [x] Message history
* [ ] Read receipts
* [ ] On backfill
* [x] On live event
* [ ] Admin status
* [ ] Membership actions
* [ ] Add member
* [ ] Remove member
* [ ] Leave
* [x] Membership actions
* [x] Add member
* [x] Remove member
* [x] Leave
* [ ] Chat metadata changes
* [x] Title
* [ ] Avatar
@ -59,9 +61,9 @@
* Misc
* [x] Multi-user support
* [x] Shared group chat portals
* [ ] Automatic portal creation
* [x] Automatic portal creation
* [x] At startup
* [ ] When added to chat
* [x] When added to chat
* [x] When receiving message
* [ ] Private chat creation by inviting Matrix puppet of KakaoTalk user to new room
* [x] For existing recently-updated KakaoTalk channels

View File

@ -22,7 +22,7 @@ with any other potential backend.
from __future__ import annotations
from typing import TYPE_CHECKING, cast, Type, Optional, Union
from typing import TYPE_CHECKING, cast, Awaitable, Type, Optional, Union
import asyncio
from contextlib import asynccontextmanager
import logging
@ -37,13 +37,15 @@ from mautrix.util.logging import TraceLogger
from ...config import Config
from ...rpc import EventHandler, RPCClient
from ..types.api.struct.profile import ProfileReqStruct, ProfileStruct
from ..types.api.struct import FriendListStruct
from ..types.api.struct.profile import ProfileReqStruct, ProfileStruct
from ..types.bson import Long
from ..types.client.client_session import LoginResult
from ..types.channel.channel_info import ChannelInfo
from ..types.chat import Chatlog, KnownChatType
from ..types.chat.attachment import MentionStruct, ReplyAttachment
from ..types.client.client_session import LoginResult
from ..types.oauth import OAuthCredential, OAuthInfo
from ..types.openlink.open_link_user_info import OpenLinkChannelUserInfo
from ..types.packet.chat.kickout import KnownKickoutType, KickoutRes
from ..types.request import (
deserialize_result,
@ -354,6 +356,17 @@ class Client:
chat_id=chat_id.serialize(),
)
async def mark_read(
self,
channel_props: ChannelProps,
read_until_chat_id: Long,
) -> None:
return await self._api_user_request_void(
"mark_read",
channel_props=channel_props.serialize(),
read_until_chat_id=read_until_chat_id.serialize(),
)
# TODO Combine these into one
@ -402,10 +415,52 @@ class Client:
str(data["channelType"]),
)
""" TODO
async def _on_receipt(self, data: Dict[str, JSON]) -> None:
await self.user.on_receipt(Receipt.deserialize(data["receipt"]))
"""
async def _on_chat_read(self, data: dict[str, JSON]) -> None:
await self.user.on_chat_read(
Long.deserialize(data["chatId"]),
Long.deserialize(data["senderId"]),
Long.deserialize(data["channelId"]),
str(data["channelType"]),
)
async def _on_profile_changed(self, data: dict[str, JSON]) -> None:
await self.user.on_profile_changed(
OpenLinkChannelUserInfo.deserialize(data["info"]),
)
async def _on_channel_join(self, data: dict[str, JSON]) -> None:
await self.user.on_channel_join(
ChannelInfo.deserialize(data["channelInfo"]),
)
async def _on_channel_left(self, data: dict[str, JSON]) -> None:
await self.user.on_channel_left(
Long.deserialize(data["channelId"]),
str(data["channelType"]),
)
async def _on_channel_kicked(self, data: dict[str, JSON]) -> None:
await self.user.on_channel_kicked(
Long.deserialize(data["userId"]),
Long.deserialize(data["senderId"]),
Long.deserialize(data["channelId"]),
str(data["channelType"]),
)
async def _on_user_join(self, data: dict[str, JSON]) -> None:
await self.user.on_user_join(
Long.deserialize(data["userId"]),
Long.deserialize(data["channelId"]),
str(data["channelType"]),
)
async def _on_user_left(self, data: dict[str, JSON]) -> None:
await self.user.on_user_left(
Long.deserialize(data["userId"]),
Long.deserialize(data["channelId"]),
str(data["channelType"]),
)
async def _on_listen_disconnect(self, data: dict[str, JSON]) -> None:
try:
@ -423,13 +478,23 @@ class Client:
self._stop_listen()
await self.user.on_disconnect(res)
def _on_error(self, data: dict[str, JSON]) -> Awaitable[None]:
return self.user.on_error(data)
def _start_listen(self) -> None:
self._add_event_handler("chat", self._on_chat)
self._add_event_handler("chat_deleted", self._on_chat_deleted)
# TODO many more listeners
self._add_event_handler("chat_read", self._on_chat_read)
self._add_event_handler("profile_changed", self._on_profile_changed)
self._add_event_handler("channel_join", self._on_channel_join)
self._add_event_handler("channel_left", self._on_channel_left)
self._add_event_handler("channel_kicked", self._on_channel_kicked)
self._add_event_handler("user_join", self._on_user_join)
self._add_event_handler("user_left", self._on_user_left)
self._add_event_handler("disconnected", self._on_listen_disconnect)
self._add_event_handler("switch_server", self._on_switch_server)
self._add_event_handler("error", self._on_error)
def _stop_listen(self) -> None:
for method in self._handler_methods:

View File

@ -45,7 +45,7 @@ class ChannelInfo(Channel):
type: ChannelType
activeUserCount: int
newChatCount: int
newChatCountInvalid: bool
newChatCountInvalid: Optional[bool] = None # NOTE Made optional
lastChatLogId: Long
lastSeenLogId: Long
lastChatLog: Optional[Chatlog] = None

View File

@ -24,6 +24,7 @@ from mautrix.types import (
EventType,
ReactionEvent,
ReactionEventContent,
ReceiptEvent,
RedactionEvent,
RelationType,
RoomID,
@ -32,6 +33,7 @@ from mautrix.types import (
)
from . import portal as po, user as u
from .db import Message as DBMessage
if TYPE_CHECKING:
from .__main__ import KakaoTalkBridge
@ -47,7 +49,12 @@ class MatrixHandler(BaseMatrixHandler):
@staticmethod
async def allow_bridging_message(user: u.User, portal: po.Portal) -> bool:
return user.is_connected or (user.relay_whitelisted and portal.has_relay)
if user.is_connected:
return True
if user.relay_whitelisted and portal.has_relay:
relay_user = await portal.get_relay_user()
return relay_user and relay_user.is_connected
return False
async def send_welcome_message(self, room_id: RoomID, inviter: u.User) -> None:
await super().send_welcome_message(room_id, inviter)
@ -148,19 +155,16 @@ class MatrixHandler(BaseMatrixHandler):
event_id: EventID,
data: SingleReceiptEventContent,
) -> None:
self.log.info("TODO: handle_read_receipt")
"""
if not user.mqtt:
if not user.is_connected:
return
timestamp = data.get("ts", int(time.time() * 1000))
message = await DBMessage.get_by_mxid(event_id, portal.mxid)
await user.mqtt.mark_read(
portal.ktid,
True, # TODO
#portal.fb_type != ThreadType.USER,
read_to=message.timestamp if message else timestamp,
)
"""
await user.client.mark_read(portal.channel_props, message.ktid)
async def handle_ephemeral_event(
self, evt: ReceiptEvent | Event
) -> None:
if evt.type == EventType.RECEIPT:
await self.handle_receipt(evt)
async def handle_event(self, evt: Event) -> None:
if evt.type == EventType.ROOM_REDACTION:

View File

@ -190,6 +190,7 @@ class Portal(DBPortal, BasePortal):
# TODO More
cls._chat_type_handler_map = {
KnownChatType.FEED: cls._handle_kakaotalk_feed,
KnownChatType.TEXT: cls._handle_kakaotalk_text,
KnownChatType.REPLY: cls._handle_kakaotalk_reply,
KnownChatType.PHOTO: cls._handle_kakaotalk_photo,
@ -527,8 +528,7 @@ class Portal(DBPortal, BasePortal):
if did_join and self.is_direct:
await source.update_direct_chats({self.main_intent.mxid: [self.mxid]})
# TODO
#await self._sync_read_receipts(info.read_receipts.nodes)
# TODO Sync read receipts?
"""
async def _sync_read_receipts(self, receipts: list[None]) -> None:
@ -702,8 +702,7 @@ class Portal(DBPortal, BasePortal):
except Exception:
self.log.exception("Failed to backfill new portal")
# TODO
#await self._sync_read_receipts(info.read_receipts.nodes)
# TODO Sync read receipts?
return self.mxid
@ -775,8 +774,8 @@ class Portal(DBPortal, BasePortal):
sender, is_relay = await self.get_relay_sender(orig_sender, f"message {event_id}")
if not sender:
raise Exception("not logged in")
elif not sender.has_state:
raise Exception("not connected to KakaoTalk")
elif not sender.is_connected:
raise Exception("not connected to KakaoTalk chats")
elif is_relay:
await self.apply_relay_message_format(orig_sender, message)
if message.msgtype == MessageType.TEXT or message.msgtype == MessageType.NOTICE:
@ -1090,6 +1089,7 @@ class Portal(DBPortal, BasePortal):
if chat_text:
events = await self._handle_kakaotalk_text(
intent=intent,
attachment=None,
timestamp=timestamp,
chat_text=chat_text,
)
@ -1104,6 +1104,15 @@ class Portal(DBPortal, BasePortal):
events.append(await self._send_message(intent, content, timestamp=timestamp))
return events
async def _handle_kakaotalk_feed(
self,
timestamp: int,
chat_text: str | None,
**_
) -> list[EventID]:
self.log.info("Got feed message at %s: %s", timestamp, chat_text or "none")
return []
async def _handle_kakaotalk_text(
self,
intent: IntentAPI,
@ -1266,7 +1275,25 @@ class Portal(DBPortal, BasePortal):
await self.main_intent.redact(message.mx_room, message.mxid, timestamp=timestamp)
await message.delete()
# TODO Many more remote handlers
async def handle_kakaotalk_user_join(
self, source: u.User, user: p.Puppet
) -> None:
await self.main_intent.ensure_joined(self.mxid)
if not user.name:
self.schedule_resync(source, user)
async def handle_kakaotalk_user_left(
self, source: u.User, sender: p.Puppet, removed: p.Puppet
) -> None:
if sender == removed:
await removed.intent_for(self).leave_room(self.mxid)
else:
try:
await sender.intent_for(self).kick_user(self.mxid, removed.mxid)
except MForbidden:
await self.main_intent.kick_user(
self.mxid, removed.mxid, reason=f"Kicked by {sender.name}"
)
# endregion
@ -1303,7 +1330,6 @@ class Portal(DBPortal, BasePortal):
source,
limit,
most_recent.ktid if most_recent else None,
channel_info=channel_info,
)
async def _backfill(
@ -1311,7 +1337,6 @@ class Portal(DBPortal, BasePortal):
source: u.User,
limit: int | None,
after_log_id: Long | None,
channel_info: ChannelInfo,
) -> None:
self.log.debug(f"Backfilling history through {source.mxid}")
self.log.debug(f"Fetching {f'up to {limit}' if limit else 'all'} messages through {source.ktid}")

View File

@ -22,6 +22,7 @@ import time
from mautrix.bridge import BaseUser, async_getter_lock
from mautrix.types import (
EventID,
JSON,
MessageType,
RoomID,
TextMessageEventContent,
@ -45,11 +46,19 @@ from .kt.types.chat.chat import Chatlog
from .kt.types.client.client_session import LoginDataItem, LoginResult
from .kt.types.oauth import OAuthCredential
from .kt.types.openlink.open_channel_info import OpenChannelData, OpenChannelInfo
from .kt.types.openlink.open_link_user_info import OpenLinkChannelUserInfo
from .kt.types.packet.chat.kickout import KnownKickoutType, KickoutRes
METRIC_CONNECT_AND_SYNC = Summary("bridge_connect_and_sync", "calls to connect_and_sync")
METRIC_CHAT = Summary("bridge_on_chat", "calls to on_chat")
METRIC_CHAT_DELETED = Summary("bridge_on_chat_deleted", "calls to on_chat_deleted")
METRIC_CHAT_READ = Summary("bridge_on_chat_read", "calls to on_chat_read")
METRIC_PROFILE_CHANGE = Summary("bridge_on_profile_changed", "calls to on_profile_changed")
METRIC_CHANNEL_JOIN = Summary("bridge_on_channel_join", "calls to on_channel_join")
METRIC_CHANNEL_LEFT = Summary("bridge_on_channel_left", "calls to on_channel_left")
METRIC_CHANNEL_KICKED = Summary("bridge_on_channel_kicked", "calls to on_channel_kicked")
METRIC_USER_JOIN = Summary("bridge_on_user_join", "calls to on_user_join")
METRIC_USER_LEFT = Summary("bridge_on_user_left", "calls to on_user_left")
METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
METRIC_CONNECTED = Gauge("bridge_connected", "Bridge users connected to KakaoTalk")
@ -438,7 +447,7 @@ class User(DBUser, BaseUser):
key=get_channel_update_time
)[:sync_count]:
try:
await self._sync_channel(login_data)
await self._sync_channel_on_login(login_data)
except AuthenticationRequired:
raise
except Exception:
@ -446,7 +455,7 @@ class User(DBUser, BaseUser):
await self.update_direct_chats()
async def _sync_channel(self, login_data: LoginDataItem) -> None:
def _sync_channel_on_login(self, login_data: LoginDataItem) -> Awaitable[None]:
channel_data = login_data.channel
self.log.debug(f"Syncing channel {channel_data.channelId} (last updated at {login_data.lastUpdate})")
channel_info = channel_data.info
@ -481,6 +490,9 @@ class User(DBUser, BaseUser):
for display_user_info in channel_info.displayUserList:
self.log.debug(f"Member: {display_user_info.nickname} - {display_user_info.profileURL} - {display_user_info.userId}")
return self._sync_channel(channel_info)
async def _sync_channel(self, channel_info: ChannelInfo):
portal = await po.Portal.get_by_ktid(
channel_info.channelId,
kt_receiver=self.ktid,
@ -633,6 +645,14 @@ class User(DBUser, BaseUser):
await self.logout()
await self.send_bridge_notice(f"Disconnected from KakaoTalk: {reason_str} {reason_suffix}")
def on_error(self, error: JSON) -> Awaitable[None]:
return self.send_bridge_notice(
f"Got error event from KakaoTalk:\n\n> {error}",
# TODO Which error code to use?
#error_code="kt-connection-error",
error_message=str(error),
)
async def on_client_disconnect(self) -> None:
self.is_connected = False
self._track_metric(METRIC_CONNECTED, False)
@ -664,7 +684,7 @@ class User(DBUser, BaseUser):
portal = await po.Portal.get_by_ktid(
channel_id,
kt_receiver=self.ktid,
kt_type=channel_type
kt_type=channel_type,
)
puppet = await pu.Puppet.get_by_ktid(chat.sender.userId)
await portal.backfill_lock.wait(chat.logId)
@ -685,13 +705,103 @@ class User(DBUser, BaseUser):
channel_id,
kt_receiver=self.ktid,
kt_type=channel_type,
create=False
create=False,
)
if portal and portal.mxid:
await portal.backfill_lock.wait(f"redaction of {chat_id}")
puppet = await pu.Puppet.get_by_ktid(sender_id)
await portal.handle_kakaotalk_chat_delete(puppet, chat_id, timestamp)
# TODO Many more handlers
@async_time(METRIC_CHAT_READ)
async def on_chat_read(
self,
chat_id: Long,
sender_id: Long,
channel_id: Long,
channel_type: ChannelType,
) -> None:
puppet = await pu.Puppet.get_by_ktid(sender_id)
portal = await po.Portal.get_by_ktid(
channel_id,
kt_receiver=self.ktid,
kt_type=channel_type,
create=False,
)
if portal and portal.mxid:
await portal.backfill_lock.wait(f"read receipt from {sender_id}")
await portal.handle_kakaotalk_read(self, puppet, chat_id)
@async_time(METRIC_PROFILE_CHANGE)
async def on_profile_changed(self, info: OpenLinkChannelUserInfo) -> None:
puppet = await pu.Puppet.get_by_ktid(info.userId)
if puppet:
await puppet.update_info_from_participant(self, info)
@async_time(METRIC_CHANNEL_JOIN)
def on_channel_join(self, channel_info: ChannelInfo) -> Awaitable[None]:
return self._sync_channel(channel_info)
@async_time(METRIC_CHANNEL_LEFT)
async def on_channel_left(self, channel_id: Long, channel_type: ChannelType) -> None:
portal = await po.Portal.get_by_ktid(
channel_id,
kt_receiver=self.ktid,
kt_type=channel_type,
)
if portal.mxid:
await portal.main_intent.kick_user(portal.mxid, self.mxid, "Left this channel from KakaoTalk")
@async_time(METRIC_CHANNEL_KICKED)
async def on_channel_kicked(
self,
user_id: Long,
sender_id: Long,
channel_id: Long,
channel_type: ChannelType
) -> None:
portal = await po.Portal.get_by_ktid(
channel_id,
kt_receiver=self.ktid,
kt_type=channel_type,
)
if portal.mxid:
sender = await pu.Puppet.get_by_ktid(sender_id)
user = await pu.Puppet.get_by_ktid(user_id)
await portal.backfill_lock.wait("channel kicked")
await portal.handle_kakaotalk_user_left(self, sender, user)
@async_time(METRIC_USER_JOIN)
async def on_user_join(
self,
user_id: Long,
channel_id: Long,
channel_type: ChannelType
) -> None:
portal = await po.Portal.get_by_ktid(
channel_id,
kt_receiver=self.ktid,
kt_type=channel_type,
)
if portal.mxid:
user = await pu.Puppet.get_by_ktid(user_id)
await portal.backfill_lock.wait("user join")
await portal.handle_kakaotalk_user_join(self, user)
@async_time(METRIC_USER_LEFT)
async def on_user_left(
self,
user_id: Long,
channel_id: Long,
channel_type: ChannelType
) -> None:
portal = await po.Portal.get_by_ktid(
channel_id,
kt_receiver=self.ktid,
kt_type=channel_type,
)
if portal.mxid:
user = await pu.Puppet.get_by_ktid(user_id)
await portal.backfill_lock.wait("user left")
await portal.handle_kakaotalk_user_left(self, user, user)
# endregion

View File

@ -48,13 +48,13 @@ ServiceApiClient.prototype.requestFriendList = async function() {
{
phone_number_type: 1,
}
);
)
return {
status: res.status,
success: res.status === 0,
result: res,
};
}
}
@ -93,7 +93,7 @@ class UserClient {
})
this.#talkClient.on("chat_deleted", (feedChatlog, channel, feed) => {
this.log(`${feed.logId} deleted in channel ${channel.channelId} by user ${feedChatlog.sender.userId}`);
this.log(`${feed.logId} deleted in channel ${channel.channelId} by user ${feedChatlog.sender.userId}`)
return this.write("chat_deleted", {
chatId: feed.logId,
senderId: feedChatlog.sender.userId,
@ -104,7 +104,7 @@ class UserClient {
})
this.#talkClient.on("message_hidden", (hideLog, channel, feed) => {
this.log(`Message ${hideLog.logId} hid from channel ${channel.channelId} by user ${hideLog.sender.userId}`);
this.log(`Message ${feed.logId} hid from channel ${channel.channelId} by user ${hideLog.sender.userId}`)
return this.write("chat_deleted", {
chatId: feed.logId,
senderId: hideLog.sender.userId,
@ -114,12 +114,69 @@ class UserClient {
})
})
/* TODO Many more listeners
this.#talkClient.on("chat_read", (chat, channel, reader) => {
this.log(`chat_read in channel ${channel.channelId}`)
//chat.logId
this.log(`${chat.logId} read in channel ${channel.channelId} by ${reader.userId}`)
return this.write("chat_read", {
chatId: chat.logId,
senderId: reader.userId,
channelId: channel.channelId,
channelType: channel.info.type,
})
})
this.#talkClient.on("profile_changed", (channel, lastInfo, user) => {
this.log(`Profile of ${user.userId} changed (channel: ${channel ? channel.channelId : "None"})`)
return this.write("profile_changed", {
info: user,
/* TODO Is this ever a per-channel profile change?
channelId: channel.channelId,
channelType: channel.info.type,
*/
})
})
this.#talkClient.on("channel_join", channel => {
this.log(`Joined channel ${channel.channelId}`)
return this.write("channel_join", {
channelInfo: channel.info,
})
})
this.#talkClient.on("channel_left", channel => {
this.log(`Left channel ${channel.channelId}`)
return this.write("channel_left", {
channelId: channel.channelId,
channelType: channel.info.type,
})
})
this.#talkClient.on("channel_kicked", (kickedLog, channel, feed) => {
this.log(`User ${feed.member.userId} kicked from channel ${channel.channelId} by user ${kickedLog.sender.userId}`)
return this.write("channel_kicked", {
userId: feed.member.userId,
senderId: kickedLog.sender.userId,
channelId: channel.channelId,
channelType: channel.info.type,
})
})
this.#talkClient.on("user_join", (joinLog, channel, user, feed) => {
this.log(`User ${user.userId} joined channel ${channel.channelId}`)
return this.write("user_join", {
userId: user.userId,
channelId: channel.channelId,
channelType: channel.info.type,
})
})
this.#talkClient.on("user_left", (leftLog, channel, user, feed) => {
this.log(`User ${user.userId} left channel ${channel.channelId}`)
return this.write("user_left", {
userId: user.userId,
channelId: channel.channelId,
channelType: channel.info.type,
})
})
*/
this.#talkClient.on("disconnected", (reason) => {
this.log(`Disconnected (reason=${reason})`)
@ -135,6 +192,13 @@ class UserClient {
is_sequential: true,
})
})
this.#talkClient.on("error", (err) => {
this.log(`Client error: ${err}`)
return this.write("error", {
error: err,
})
})
}
/**
@ -584,6 +648,20 @@ export default class PeerClient {
})
}
/**
* @param {Object} req
* @param {string} req.mxid
* @param {ChannelProps} req.channel_props
* @param {Long} req.read_until_chat_id
*/
markRead = async (req) => {
const talkChannel = await this.#getUserChannel(req.mxid, req.channel_props)
return await talkChannel.markRead({
logId: req.read_until_chat_id,
})
}
#makeCommandResult(result) {
return {
success: true,