Inbound channel/user join/leave

This commit is contained in:
Andrew Ferrazzutti 2022-04-10 04:30:26 -04:00
parent 4a68796fdf
commit c9189d17cf
4 changed files with 182 additions and 10 deletions

View File

@ -37,12 +37,13 @@ from mautrix.util.logging import TraceLogger
from ...config import Config
from ...rpc import EventHandler, RPCClient
from ..types.api.struct.profile import ProfileReqStruct, ProfileStruct
from ..types.api.struct import FriendListStruct
from ..types.api.struct.profile import ProfileReqStruct, ProfileStruct
from ..types.bson import Long
from ..types.client.client_session import LoginResult
from ..types.channel.channel_info import ChannelInfo
from ..types.chat import Chatlog, KnownChatType
from ..types.chat.attachment import MentionStruct, ReplyAttachment
from ..types.client.client_session import LoginResult
from ..types.oauth import OAuthCredential, OAuthInfo
from ..types.openlink.open_link_user_info import OpenLinkChannelUserInfo
from ..types.packet.chat.kickout import KnownKickoutType, KickoutRes
@ -427,6 +428,40 @@ class Client:
OpenLinkChannelUserInfo.deserialize(data["info"]),
)
async def _on_channel_join(self, data: dict[str, JSON]) -> None:
await self.user.on_channel_join(
ChannelInfo.deserialize(data["channelInfo"]),
)
async def _on_channel_left(self, data: dict[str, JSON]) -> None:
await self.user.on_channel_left(
Long.deserialize(data["channelId"]),
str(data["channelType"]),
)
async def _on_channel_kicked(self, data: dict[str, JSON]) -> None:
await self.user.on_channel_kicked(
Long.deserialize(data["userId"]),
Long.deserialize(data["senderId"]),
Long.deserialize(data["channelId"]),
str(data["channelType"]),
)
async def _on_user_join(self, data: dict[str, JSON]) -> None:
await self.user.on_user_join(
Long.deserialize(data["userId"]),
Long.deserialize(data["channelId"]),
str(data["channelType"]),
)
async def _on_user_left(self, data: dict[str, JSON]) -> None:
await self.user.on_user_left(
Long.deserialize(data["userId"]),
Long.deserialize(data["channelId"]),
str(data["channelType"]),
)
async def _on_listen_disconnect(self, data: dict[str, JSON]) -> None:
try:
res = KickoutRes.deserialize(data)
@ -452,6 +487,11 @@ class Client:
self._add_event_handler("chat_deleted", self._on_chat_deleted)
self._add_event_handler("chat_read", self._on_chat_read)
self._add_event_handler("profile_changed", self._on_profile_changed)
self._add_event_handler("channel_join", self._on_channel_join)
self._add_event_handler("channel_left", self._on_channel_left)
self._add_event_handler("channel_kicked", self._on_channel_kicked)
self._add_event_handler("user_join", self._on_user_join)
self._add_event_handler("user_left", self._on_user_left)
self._add_event_handler("disconnected", self._on_listen_disconnect)
self._add_event_handler("switch_server", self._on_switch_server)
self._add_event_handler("error", self._on_error)

View File

@ -1264,7 +1264,25 @@ class Portal(DBPortal, BasePortal):
await self.main_intent.redact(message.mx_room, message.mxid, timestamp=timestamp)
await message.delete()
# TODO Many more remote handlers
async def handle_kakaotalk_user_join(
self, source: u.User, user: p.Puppet
) -> None:
await self.main_intent.ensure_joined(self.mxid)
if not user.name:
self.schedule_resync(source, user)
async def handle_kakaotalk_user_left(
self, source: u.User, sender: p.Puppet, removed: p.Puppet
) -> None:
if sender == removed:
await removed.intent_for(self).leave_room(self.mxid)
else:
try:
await sender.intent_for(self).kick_user(self.mxid, removed.mxid)
except MForbidden:
await self.main_intent.kick_user(
self.mxid, removed.mxid, reason=f"Kicked by {sender.name}"
)
# endregion
@ -1301,7 +1319,6 @@ class Portal(DBPortal, BasePortal):
source,
limit,
most_recent.ktid if most_recent else None,
channel_info=channel_info,
)
async def _backfill(
@ -1309,7 +1326,6 @@ class Portal(DBPortal, BasePortal):
source: u.User,
limit: int | None,
after_log_id: Long | None,
channel_info: ChannelInfo,
) -> None:
self.log.debug(f"Backfilling history through {source.mxid}")
self.log.debug(f"Fetching {f'up to {limit}' if limit else 'all'} messages through {source.ktid}")

View File

@ -54,6 +54,11 @@ METRIC_CHAT = Summary("bridge_on_chat", "calls to on_chat")
METRIC_CHAT_DELETED = Summary("bridge_on_chat_deleted", "calls to on_chat_deleted")
METRIC_CHAT_READ = Summary("bridge_on_chat_read", "calls to on_chat_read")
METRIC_PROFILE_CHANGE = Summary("bridge_on_profile_changed", "calls to on_profile_changed")
METRIC_CHANNEL_JOIN = Summary("bridge_on_channel_join", "calls to on_channel_join")
METRIC_CHANNEL_LEFT = Summary("bridge_on_channel_left", "calls to on_channel_left")
METRIC_CHANNEL_KICKED = Summary("bridge_on_channel_kicked", "calls to on_channel_kicked")
METRIC_USER_JOIN = Summary("bridge_on_user_join", "calls to on_user_join")
METRIC_USER_LEFT = Summary("bridge_on_user_left", "calls to on_user_left")
METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
METRIC_CONNECTED = Gauge("bridge_connected", "Bridge users connected to KakaoTalk")
@ -442,7 +447,7 @@ class User(DBUser, BaseUser):
key=get_channel_update_time
)[:sync_count]:
try:
await self._sync_channel(login_data)
await self._sync_channel_on_login(login_data)
except AuthenticationRequired:
raise
except Exception:
@ -450,7 +455,7 @@ class User(DBUser, BaseUser):
await self.update_direct_chats()
async def _sync_channel(self, login_data: LoginDataItem) -> None:
def _sync_channel_on_login(self, login_data: LoginDataItem) -> Awaitable[None]:
channel_data = login_data.channel
self.log.debug(f"Syncing channel {channel_data.channelId} (last updated at {login_data.lastUpdate})")
channel_info = channel_data.info
@ -485,6 +490,9 @@ class User(DBUser, BaseUser):
for display_user_info in channel_info.displayUserList:
self.log.debug(f"Member: {display_user_info.nickname} - {display_user_info.profileURL} - {display_user_info.userId}")
return self._sync_channel(channel_info)
async def _sync_channel(self, channel_info: ChannelInfo):
portal = await po.Portal.get_by_ktid(
channel_info.channelId,
kt_receiver=self.ktid,
@ -729,6 +737,71 @@ class User(DBUser, BaseUser):
if puppet:
await puppet.update_info_from_participant(self, info)
# TODO Many more handlers
@async_time(METRIC_CHANNEL_JOIN)
def on_channel_join(self, channel_info: ChannelInfo) -> Awaitable[None]:
return self._sync_channel(channel_info)
@async_time(METRIC_CHANNEL_LEFT)
async def on_channel_left(self, channel_id: Long, channel_type: ChannelType) -> None:
portal = await po.Portal.get_by_ktid(
channel_id,
kt_receiver=self.ktid,
kt_type=channel_type,
)
if portal.mxid:
await portal.main_intent.kick_user(portal.mxid, self.mxid, "Left this channel from KakaoTalk")
@async_time(METRIC_CHANNEL_KICKED)
async def on_channel_kicked(
self,
user_id: Long,
sender_id: Long,
channel_id: Long,
channel_type: ChannelType
) -> None:
portal = await po.Portal.get_by_ktid(
channel_id,
kt_receiver=self.ktid,
kt_type=channel_type,
)
if portal.mxid:
sender = await pu.Puppet.get_by_ktid(sender_id)
user = await pu.Puppet.get_by_ktid(user_id)
await portal.backfill_lock.wait("channel kicked")
await portal.handle_kakaotalk_user_left(self, sender, user)
@async_time(METRIC_USER_JOIN)
async def on_user_join(
self,
user_id: Long,
channel_id: Long,
channel_type: ChannelType
) -> None:
portal = await po.Portal.get_by_ktid(
channel_id,
kt_receiver=self.ktid,
kt_type=channel_type,
)
if portal.mxid:
user = await pu.Puppet.get_by_ktid(user_id)
await portal.backfill_lock.wait("user join")
await portal.handle_kakaotalk_user_join(self, user)
@async_time(METRIC_USER_LEFT)
async def on_user_left(
self,
user_id: Long,
channel_id: Long,
channel_type: ChannelType
) -> None:
portal = await po.Portal.get_by_ktid(
channel_id,
kt_receiver=self.ktid,
kt_type=channel_type,
)
if portal.mxid:
user = await pu.Puppet.get_by_ktid(user_id)
await portal.backfill_lock.wait("user left")
await portal.handle_kakaotalk_user_left(self, user, user)
# endregion

View File

@ -104,7 +104,7 @@ class UserClient {
})
this.#talkClient.on("message_hidden", (hideLog, channel, feed) => {
this.log(`Message ${hideLog.logId} hid from channel ${channel.channelId} by user ${hideLog.sender.userId}`)
this.log(`Message ${feed.logId} hid from channel ${channel.channelId} by user ${hideLog.sender.userId}`)
return this.write("chat_deleted", {
chatId: feed.logId,
senderId: hideLog.sender.userId,
@ -125,7 +125,7 @@ class UserClient {
})
this.#talkClient.on("profile_changed", (channel, lastInfo, user) => {
this.log(`Profile of ${user.userId} changed (channel: ${channel})`)
this.log(`Profile of ${user.userId} changed (channel: ${channel ? channel.channelId : "None"})`)
return this.write("profile_changed", {
info: user,
/* TODO Is this ever a per-channel profile change?
@ -135,6 +135,49 @@ class UserClient {
})
})
this.#talkClient.on("channel_join", channel => {
this.log(`Joined channel ${channel.channelId}`)
return this.write("channel_join", {
channelInfo: channel.info,
})
})
this.#talkClient.on("channel_left", channel => {
this.log(`Left channel ${channel.channelId}`)
return this.write("channel_left", {
channelId: channel.channelId,
channelType: channel.info.type,
})
})
this.#talkClient.on("channel_kicked", (kickedLog, channel, feed) => {
this.log(`User ${feed.member.userId} kicked from channel ${channel.channelId} by user ${kickedLog.sender.userId}`)
return this.write("channel_kicked", {
userId: feed.member.userId,
senderId: kickedLog.sender.userId,
channelId: channel.channelId,
channelType: channel.info.type,
})
})
this.#talkClient.on("user_join", (joinLog, channel, user, feed) => {
this.log(`User ${user.userId} joined channel ${channel.channelId}`)
return this.write("user_join", {
userId: user.userId,
channelId: channel.channelId,
channelType: channel.info.type,
})
})
this.#talkClient.on("user_left", (leftLog, channel, user, feed) => {
this.log(`User ${user.userId} left channel ${channel.channelId}`)
return this.write("user_left", {
userId: user.userId,
channelId: channel.channelId,
channelType: channel.info.type,
})
})
this.#talkClient.on("disconnected", (reason) => {
this.log(`Disconnected (reason=${reason})`)
this.disconnect()