diff --git a/matrix_puppeteer_line/db/message.py b/matrix_puppeteer_line/db/message.py index 8b27284..c6dc24b 100644 --- a/matrix_puppeteer_line/db/message.py +++ b/matrix_puppeteer_line/db/message.py @@ -29,20 +29,17 @@ class Message: mxid: EventID mx_room: RoomID - mid: int + mid: Optional[int] chat_id: str async def insert(self) -> None: q = "INSERT INTO message (mxid, mx_room, mid, chat_id) VALUES ($1, $2, $3, $4)" await self.db.execute(q, self.mxid, self.mx_room, self.mid, self.chat_id) - async def delete(self) -> None: - q = "DELETE FROM message WHERE mid=$1" - await self.db.execute(q, self.mid) - - @classmethod - async def delete_all(cls, room_id: RoomID) -> None: - await cls.db.execute("DELETE FROM message WHERE mx_room=$1", room_id) + async def update(self) -> None: + q = ("UPDATE message SET mid=$3, chat_id=$4 " + "WHERE mxid=$1 AND mx_room=$2") + await self.db.execute(q, self.mxid, self.mx_room, self.mid, self.chat_id) @classmethod async def get_max_mid(cls, room_id: RoomID) -> int: @@ -57,6 +54,11 @@ class Message: data[row["chat_id"]] = row["max_mid"] return data + @classmethod + async def get_num_noid_msgs(cls, room_id: RoomID) -> int: + return await cls.db.fetchval("SELECT COUNT(*) FROM message " + "WHERE mid IS NULL AND mx_room=$1", room_id) + @classmethod async def get_by_mxid(cls, mxid: EventID, mx_room: RoomID) -> Optional['Message']: row = await cls.db.fetchrow("SELECT mxid, mx_room, mid, chat_id " @@ -72,3 +74,17 @@ class Message: if not row: return None return cls(**row) + + @classmethod + async def get_next_noid_msg(cls, room_id: RoomID) -> Optional['Message']: + row = await cls.db.fetchrow("SELECT mxid, mx_room, mid, chat_id FROM message " + "WHERE mid IS NULL AND mx_room=$1", room_id) + if not row: + return None + return cls(**row) + + @classmethod + async def delete_all_noid_msgs(cls, room_id: RoomID) -> None: + status = await cls.db.execute("DELETE FROM message " + "WHERE mid IS NULL AND mx_room=$1", room_id) + return int(status.removeprefix("DELETE ")) diff --git a/matrix_puppeteer_line/db/upgrade.py b/matrix_puppeteer_line/db/upgrade.py index f3e96f9..9fce3a9 100644 --- a/matrix_puppeteer_line/db/upgrade.py +++ b/matrix_puppeteer_line/db/upgrade.py @@ -128,4 +128,11 @@ async def upgrade_strangers(conn: Connection) -> None: FOREIGN KEY (fake_mid) REFERENCES puppet (mid) ON DELETE CASCADE - )""") \ No newline at end of file + )""") + + +@upgrade_table.register(description="Track messages that lack an ID") +async def upgrade_noid_msgs(conn: Connection) -> None: + await conn.execute("ALTER TABLE message DROP CONSTRAINT IF EXISTS message_pkey") + await conn.execute("ALTER TABLE message ALTER COLUMN mid DROP NOT NULL") + await conn.execute("ALTER TABLE message ADD UNIQUE (mid)") \ No newline at end of file diff --git a/matrix_puppeteer_line/portal.py b/matrix_puppeteer_line/portal.py index 5d42ef3..043e83e 100644 --- a/matrix_puppeteer_line/portal.py +++ b/matrix_puppeteer_line/portal.py @@ -124,6 +124,11 @@ class Portal(DBPortal, BasePortal): except Exception: self.log.exception("Failed to send delivery receipt for %s", event_id) + async def _cleanup_noid_msgs(self) -> None: + num_noid_msgs = await DBMessage.delete_all_noid_msgs(self.mxid) + if num_noid_msgs > 0: + self.log.warn(f"Found {num_noid_msgs} messages in chat {self.chat_id} with no ID that could not be matched with a real ID") + async def handle_matrix_message(self, sender: 'u.User', message: MessageEventContent, event_id: EventID) -> None: if not sender.client: @@ -163,6 +168,8 @@ class Portal(DBPortal, BasePortal): self.log.warning(f"Failed to upload media {event_id} to chat {self.chat_id}: {e}") message_id = -1 remove(file_path) + + await self._cleanup_noid_msgs() msg = None if message_id != -1: try: @@ -211,6 +218,8 @@ class Portal(DBPortal, BasePortal): self.log.debug(f"Ignoring duplicate message {evt.id}") return + is_preseen = evt.id != None and await DBMessage.get_num_noid_msgs(self.mxid) > 0 + if evt.is_outgoing: if source.intent: sender = None @@ -230,12 +239,21 @@ class Portal(DBPortal, BasePortal): if sender: await sender.update_info(evt.sender, source.client) else: - self.log.warning(f"Could not find ID of LINE user who sent event {evt.id}") + self.log.warning(f"Could not find ID of LINE user who sent event {evt.id or 'with no ID'}") sender = await p.Puppet.get_by_profile(evt.sender, source.client) intent = sender.intent await intent.ensure_joined(self.mxid) - if evt.image and evt.image.url: + if is_preseen: + msg = await DBMessage.get_next_noid_msg(self.mxid) + if msg: + self.log.debug(f"Found ID {evt.id} of preseen message in chat {self.mxid} {msg.mxid}") + msg.mid = evt.id + event_id = msg.mxid + else: + self.log.error(f"Could not find an existing event for a message with no ID in chat {self.mxid}") + return + elif evt.image and evt.image.url: if not evt.image.is_sticker or self.config["bridge.receive_stickers"]: media_info = await self._handle_remote_media( source, intent, evt.image.url, @@ -335,13 +353,18 @@ class Portal(DBPortal, BasePortal): if evt.is_outgoing and evt.receipt_count: await self._handle_receipt(event_id, evt.receipt_count) - msg = DBMessage(mxid=event_id, mx_room=self.mxid, mid=evt.id, chat_id=self.chat_id) - try: - await msg.insert() - await self._send_delivery_receipt(event_id) - self.log.debug(f"Handled remote message {evt.id} -> {event_id}") - except UniqueViolationError as e: - self.log.debug(f"Failed to handle remote message {evt.id} -> {event_id}: {e}") + + if not is_preseen: + msg = DBMessage(mxid=event_id, mx_room=self.mxid, mid=evt.id, chat_id=self.chat_id) + try: + await msg.insert() + #await self._send_delivery_receipt(event_id) + self.log.debug(f"Handled remote message {evt.id or 'with no ID'} -> {event_id}") + except UniqueViolationError as e: + self.log.debug(f"Failed to handle remote message {evt.id or 'with no ID'} -> {event_id}: {e}") + else: + await msg.update() + self.log.debug(f"Handled preseen remote message {evt.id} -> {event_id}") async def handle_remote_receipt(self, receipt: Receipt) -> None: msg = await DBMessage.get_by_mid(receipt.id) @@ -553,6 +576,7 @@ class Portal(DBPortal, BasePortal): if not messages: self.log.debug("Didn't get any entries from server") + await self._cleanup_noid_msgs() return self.log.debug("Got %d messages from server", len(messages)) @@ -560,6 +584,7 @@ class Portal(DBPortal, BasePortal): for evt in messages: await self.handle_remote_message(source, evt) self.log.info("Backfilled %d messages through %s", len(messages), source.mxid) + await self._cleanup_noid_msgs() @property def bridge_info_state_key(self) -> str: @@ -713,9 +738,6 @@ class Portal(DBPortal, BasePortal): self._main_intent = self.az.intent async def delete(self) -> None: - if self.mxid: - # TODO Handle this with db foreign keys instead - await DBMessage.delete_all(self.mxid) self.by_chat_id.pop(self.chat_id, None) self.by_mxid.pop(self.mxid, None) await super().delete() diff --git a/matrix_puppeteer_line/rpc/types.py b/matrix_puppeteer_line/rpc/types.py index af27117..e6f32d1 100644 --- a/matrix_puppeteer_line/rpc/types.py +++ b/matrix_puppeteer_line/rpc/types.py @@ -60,7 +60,7 @@ class MessageImage(SerializableAttrs['MessageImage']): @dataclass class Message(SerializableAttrs['Message']): - id: int + id: Optional[int] chat_id: int is_outgoing: bool sender: Optional[Participant] diff --git a/puppet/src/client.js b/puppet/src/client.js index 636d695..ddce619 100644 --- a/puppet/src/client.js +++ b/puppet/src/client.js @@ -99,7 +99,7 @@ export default class Client { } sendMessage(message) { - this.log(`Sending message ${message.id} to client`) + this.log(`Sending message ${message.id || "with no ID"} to client`) return this._write({ id: --this.notificationID, command: "message", diff --git a/puppet/src/contentscript.js b/puppet/src/contentscript.js index 226a7c4..06e77b0 100644 --- a/puppet/src/contentscript.js +++ b/puppet/src/contentscript.js @@ -23,7 +23,7 @@ */ window.__chronoParseDate = function (text, ref, option) {} /** - * @param {string[]} changes - The hrefs of the chats that changed. + * @param {ChatListInfo[]} changes - The chats that changed. * @return {Promise} */ window.__mautrixReceiveChanges = function (changes) {} @@ -609,6 +609,8 @@ class MautrixController { * May be prefixed by sender name. * @property {string} lastMsgDate - An imprecise date for the most recent message * (e.g. "7:16 PM", "Thu" or "Aug 4") + * @property {number} notificationCount - The number of unread messages in the chat, + * signified by the number in its notification badge. */ getChatListItemID(element) { @@ -624,13 +626,17 @@ class MautrixController { } getChatListItemLastMsg(element) { - return element.querySelector(".mdCMN04Desc").innerText + return element.querySelector(".mdCMN04Desc").innerHTML } getChatListItemLastMsgDate(element) { return element.querySelector("time").innerText } + getChatListItemNotificationCount(element) { + return Number.parseInt(element.querySelector(".MdIcoBadge01:not(.MdNonDisp)")?.innerText) || 0 + } + /** * Parse a conversation list item element. * @@ -645,6 +651,7 @@ class MautrixController { icon: this.getChatListItemIcon(element), lastMsg: this.getChatListItemLastMsg(element), lastMsgDate: this.getChatListItemLastMsgDate(element), + notificationCount: this.getChatListItemNotificationCount(element), } } @@ -682,7 +689,7 @@ class MautrixController { */ _observeChatListMutations(mutations) { // TODO Observe *added/removed* chats, not just new messages - const changedChatIDs = new Set() + const changedChats = new Set() for (const change of mutations) { if (change.target.id == "_chat_list_body") { // TODO @@ -701,7 +708,7 @@ class MautrixController { const chat = this.parseChatListItem(node) if (chat) { console.log("Added chat list item:", chat) - changedChatIDs.add(chat.id) + changedChats.add(chat) } else { console.debug("Could not parse added node as a chat list item:", node) } @@ -709,9 +716,9 @@ class MautrixController { } // change.removedNodes tells you which chats that had notifications are now read. } - if (changedChatIDs.size > 0) { - console.debug("Dispatching chat list mutations:", changedChatIDs) - window.__mautrixReceiveChanges(Array.from(changedChatIDs)).then( + if (changedChats.size > 0) { + console.debug("Dispatching chat list mutations:", changedChats) + window.__mautrixReceiveChanges(Array.from(changedChats)).then( () => console.debug("Chat list mutations dispatched"), err => console.error("Error dispatching chat list mutations:", err)) } diff --git a/puppet/src/puppet.js b/puppet/src/puppet.js index 44a3f03..f449bbe 100644 --- a/puppet/src/puppet.js +++ b/puppet/src/puppet.js @@ -46,6 +46,7 @@ export default class MessagesPuppeteer { this.updatedChats = new Set() this.sentMessageIDs = new Set() this.mostRecentMessages = new Map() + this.numChatNotifications = new Map() this.taskQueue = new TaskQueue(this.id) this.client = client } @@ -551,7 +552,8 @@ export default class MessagesPuppeteer { // Always present, just made visible via classes async _sendMessageUnsafe(chatID, text) { - await this._switchChat(chatID) + // Sync all messages in this chat first + this._receiveMessages(chatID, await this._getMessagesUnsafe(chatID), true) // TODO Initiate the promise in the content script await this.page.evaluate( () => window.__mautrixController.promiseOwnMessage(5000, "time")) @@ -565,7 +567,7 @@ export default class MessagesPuppeteer { } async _sendFileUnsafe(chatID, filePath) { - await this._switchChat(chatID) + this._receiveMessages(chatID, await this._getMessagesUnsafe(chatID), true) await this.page.evaluate( () => window.__mautrixController.promiseOwnMessage( 10000, // Use longer timeout for file uploads @@ -604,9 +606,11 @@ export default class MessagesPuppeteer { } } - async _receiveMessages(chatID, messages) { + _receiveMessages(chatID, messages, skipProcessing = false) { if (this.client) { - messages = await this._processMessages(chatID, messages) + if (!skipProcessing) { + messages = this._processMessages(chatID, messages) + } for (const message of messages) { this.client.sendMessage(message).catch(err => this.error("Failed to send message", message.id, "to client:", err)) @@ -626,11 +630,13 @@ export default class MessagesPuppeteer { // TODO Handle unloaded messages. Maybe scroll up // TODO This will mark the chat as "read"! await this._switchChat(chatID) - const messages = await this.page.evaluate( + // TODO Is it better to reset the notification count in _switchChat instead of here? + this.numChatNotifications.set(chatID, 0) + let messages = await this.page.evaluate( mostRecentMessage => window.__mautrixController.parseMessageList(mostRecentMessage), this.mostRecentMessages.get(chatID)) // Doing this before restoring the observer since it updates minID - const filteredMessages = await this._processMessages(chatID, messages) + messages = this._processMessages(chatID, messages) if (hadMsgListObserver) { this.log("Restoring msg list observer") @@ -641,10 +647,10 @@ export default class MessagesPuppeteer { this.log("Not restoring msg list observer, as there never was one") } - return filteredMessages + return messages } - async _processMessages(chatID, messages) { + _processMessages(chatID, messages) { // TODO Probably don't need minID filtering if Puppeteer context handles it now const minID = this.mostRecentMessages.get(chatID) || 0 const filteredMessages = messages.filter(msg => msg.id > minID && !this.sentMessageIDs.has(msg.id)) @@ -655,7 +661,6 @@ export default class MessagesPuppeteer { this.mostRecentMessages.set(chatID, newLastID) const range = newFirstID === newLastID ? newFirstID : `${newFirstID}-${newLastID}` this.log(`Loaded ${messages.length} messages in ${chatID}, got ${filteredMessages.length} newer than ${minID} (${range})`) - for (const message of filteredMessages) { message.chat_id = chatID } @@ -665,19 +670,59 @@ export default class MessagesPuppeteer { } } - async _processChatListChangeUnsafe(chatID) { + async _processChatListChangeUnsafe(chatListInfo) { + const chatID = chatListInfo.id this.updatedChats.delete(chatID) this.log("Processing change to", chatID) - const messages = await this._getMessagesUnsafe(chatID) - if (messages.length === 0) { - this.log("No new messages found in", chatID) + // TODO Also process name/icon changes + + const prevNumNotifications = this.numChatNotifications.get(chatID) || 0 + const diffNumNotifications = chatListInfo.notificationCount - prevNumNotifications + + if (chatListInfo.notificationCount == 0 && diffNumNotifications < 0) { + // Message was read from another LINE client, so there's no new info to bridge. + // But if the diff == 0, it's an own message sent from LINE, and must bridge it! + this.numChatNotifications.set(chatID, 0) return } + const mustSync = + // Can only use previews for DMs, because sender can't be found otherwise! + chatListInfo.id.charAt(0) != 'u' + || diffNumNotifications > 1 + // Sync when lastMsg is a canned message for a non-previewable message type. + || chatListInfo.lastMsg.endsWith(" sent a photo.") + || chatListInfo.lastMsg.endsWith(" sent a sticker.") + || chatListInfo.lastMsg.endsWith(" sent a location.") + // TODO More? + // TODO With MSC2409, only sync if >1 new messages arrived, + // or if message is unpreviewable. + // Otherwise, send a dummy notice & sync when its read. + + let messages + if (!mustSync) { + messages = [{ + chat_id: chatListInfo.id, + id: null, // because sidebar messages have no ID + timestamp: null, // because this message was sent right now + is_outgoing: chatListInfo.notificationCount == 0, + sender: null, // because only DM messages are handled + html: chatListInfo.lastMsg, + }] + this.numChatNotifications.set(chatID, chatListInfo.notificationCount) + } else { + messages = await this._getMessagesUnsafe(chatListInfo.id) + this.numChatNotifications.set(chatID, 0) + if (messages.length === 0) { + this.log("No new messages found in", chatListInfo.id) + return + } + } + if (this.client) { for (const message of messages) { await this.client.sendMessage(message).catch(err => - this.error("Failed to send message", message.id, "to client:", err)) + this.error("Failed to send message", message.id || "with no ID", "to client:", err)) } } else { this.log("No client connected, not sending messages") @@ -685,10 +730,10 @@ export default class MessagesPuppeteer { } _receiveChatListChanges(changes) { - this.log("Received chat list changes:", changes) + this.log(`Received chat list changes: ${changes.map(item => item.id)}`) for (const item of changes) { - if (!this.updatedChats.has(item)) { - this.updatedChats.add(item) + if (!this.updatedChats.has(item.id)) { + this.updatedChats.add(item.id) this.taskQueue.push(() => this._processChatListChangeUnsafe(item)) .catch(err => this.error("Error handling chat list changes:", err)) }