From a7ffe6761a15e99869ea7657ac086679701bd08f Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Fri, 28 Aug 2020 19:34:13 +0300 Subject: [PATCH] Fix getting ID after sending and missed message backfilling --- mautrix_amp/portal.py | 26 +++++++++++++++----------- mautrix_amp/rpc/client.py | 8 ++++---- mautrix_amp/user.py | 6 +++++- puppet/src/contentscript.js | 35 ++++++++++++++++++++++++++++++++++- puppet/src/puppet.js | 15 ++++++++++----- 5 files changed, 68 insertions(+), 22 deletions(-) diff --git a/mautrix_amp/portal.py b/mautrix_amp/portal.py index d015da6..0d61c08 100644 --- a/mautrix_amp/portal.py +++ b/mautrix_amp/portal.py @@ -125,11 +125,9 @@ class Portal(DBPortal, BasePortal): # mime_type = message.info.mimetype or magic.from_buffer(data, mime=True) # TODO media return - await sender.client.send(self.chat_id, text) - message_id = 0 - # TODO figure out the message ID and store it - # msg = DBMessage(mxid=event_id, mx_room=self.mxid, mid=message_id) - # await msg.insert() + message_id = await sender.client.send(self.chat_id, text) + msg = DBMessage(mxid=event_id, mx_room=self.mxid, mid=message_id, chat_id=self.chat_id) + await msg.insert() await self._send_delivery_receipt(event_id) self.log.debug(f"Handled Matrix message {event_id} -> {message_id}") @@ -155,7 +153,10 @@ class Portal(DBPortal, BasePortal): self.log.warning(f"Ignoring message {evt.id}: group chats aren't supported yet") return - # TODO deduplication of outgoing messages + if await DBMessage.get_by_mid(evt.id): + self.log.debug(f"Ignoring duplicate message {evt.id}") + return + event_id = None if evt.image: content = await self._handle_remote_photo(source, intent, evt) @@ -238,8 +239,8 @@ class Portal(DBPortal, BasePortal): if user_id == self.az.bot_mxid: continue mid = p.Puppet.get_id_from_mxid(user_id) - print(mid) if mid and mid not in current_members: + print(mid) await self.main_intent.kick_user(self.mxid, user_id, reason="User had left this chat") @@ -298,12 +299,15 @@ class Portal(DBPortal, BasePortal): except Exception: self.log.warning("Failed to update bridge info", exc_info=True) + async def update_matrix_room(self, source: 'u.User', info: ChatInfo) -> Optional[RoomID]: + try: + await self._update_matrix_room(source, info) + except Exception: + self.log.exception("Failed to update portal") + async def create_matrix_room(self, source: 'u.User', info: ChatInfo) -> Optional[RoomID]: if self.mxid: - try: - await self._update_matrix_room(source, info) - except Exception: - self.log.exception("Failed to update portal") + await self.update_matrix_room(source, info) return self.mxid async with self._create_room_lock: return await self._create_matrix_room(source, info) diff --git a/mautrix_amp/rpc/client.py b/mautrix_amp/rpc/client.py index 89d020f..11a8cfa 100644 --- a/mautrix_amp/rpc/client.py +++ b/mautrix_amp/rpc/client.py @@ -49,16 +49,16 @@ class Client(RPCClient): resp = await self.request("get_messages", chat_id=chat_id) return [Message.deserialize(data) for data in resp] - async def send(self, chat_id: int, text: str) -> None: - await self.request("send", chat_id=chat_id, text=text) + async def send(self, chat_id: int, text: str) -> int: + resp = await self.request("send", chat_id=chat_id, text=text) + return resp["id"] async def set_last_message_ids(self, msg_ids: Dict[int, int]) -> None: await self.request("set_last_message_ids", msg_ids=msg_ids) async def on_message(self, func: Callable[[Message], Awaitable[None]]) -> None: async def wrapper(data: Dict[str, Any]) -> None: - print("Received", data) - await func(Message.deserialize(data)) + await func(Message.deserialize(data["message"])) self.add_event_handler("message", wrapper) diff --git a/mautrix_amp/user.py b/mautrix_amp/user.py index 991c8f2..cdf74c1 100644 --- a/mautrix_amp/user.py +++ b/mautrix_amp/user.py @@ -105,7 +105,11 @@ class User(DBUser, BaseUser): portal = await po.Portal.get_by_chat_id(chat.id, create=True) if portal.mxid or index < limit: chat = await self.client.get_chat(chat.id) - await portal.create_matrix_room(self, chat) + if portal.mxid: + await portal.update_matrix_room(self, chat) + await portal.backfill(self) + else: + await portal.create_matrix_room(self, chat) async def stop(self) -> None: if self.client: diff --git a/puppet/src/contentscript.js b/puppet/src/contentscript.js index 4fa3814..c81612a 100644 --- a/puppet/src/contentscript.js +++ b/puppet/src/contentscript.js @@ -32,6 +32,11 @@ window.__mautrixReceiveChanges = function (changes) {} * @return {Promise} */ window.__mautrixReceiveQR = function (url) {} +/** + * @param {number} id - The ID of the message that was sent + * @return {Promise} + */ +window.__mautrixReceiveMessageID = function(id) {} class MautrixController { constructor() { @@ -112,6 +117,32 @@ class MautrixController { return messageData } + waitForMessage(elem) { + return new Promise(resolve => { + let msgID = null + const observer = new MutationObserver(changes => { + for (const change of changes) { + if (change.type === "attributes" && change.attributeName === "msg-id") { + msgID = +elem.getAttribute("msg-id") + window.__mautrixReceiveMessageID(msgID) + } else if (change.type === "childList" + && change.target.nodeName.toLowerCase() === "mws-relative-timestamp" + && change.addedNodes.length > 0 + && change.addedNodes[0] instanceof Text) { + resolve(msgID) + observer.disconnect() + return + } + } + }) + observer.observe(elem, { attributes: true, attributeFilter: ["msg-id"] }) + observer.observe(elem.querySelector("mws-message-status"), { + childList: true, + subtree: true, + }) + }) + } + /** * Parse a message list in the given element. The element should probably be the .content div * inside a mws-message-list element. @@ -125,7 +156,9 @@ class MautrixController { for (const child of element.children) { switch (child.tagName.toLowerCase()) { case "mws-message-wrapper": - messages.push(this._parseMessage(messageDate, child)) + if (!child.getAttribute("msg-id").startsWith("tmp_")) { + messages.push(this._parseMessage(messageDate, child)) + } break case "mws-tombstone-message-wrapper": messageDate = await this._parseDate( diff --git a/puppet/src/puppet.js b/puppet/src/puppet.js index d5f8719..033b624 100644 --- a/puppet/src/puppet.js +++ b/puppet/src/puppet.js @@ -43,6 +43,7 @@ export default class MessagesPuppeteer { this.id = id this.profilePath = profilePath this.updatedChats = new Set() + this.sentMessageIDs = new Set() this.mostRecentMessages = new Map() this.taskQueue = new TaskQueue(this.id) this.client = client @@ -83,6 +84,8 @@ export default class MessagesPuppeteer { await this.page.addScriptTag({ path: "./src/contentscript.js", type: "module" }) this.log("Exposing functions") await this.page.exposeFunction("__mautrixReceiveQR", this._receiveQRChange.bind(this)) + await this.page.exposeFunction("__mautrixReceiveMessageID", + id => this.sentMessageIDs.add(id)) await this.page.exposeFunction("__mautrixReceiveChanges", this._receiveChatListChanges.bind(this)) await this.page.exposeFunction("__chronoParseDate", chrono.parseDate) @@ -195,9 +198,10 @@ export default class MessagesPuppeteer { * * @param {number} chatID - The ID of the chat to send a message to. * @param {string} text - The text to send. + * @return {Promise<{id: number}>} - The ID of the sent message. */ async sendMessage(chatID, text) { - await this.taskQueue.push(() => this._sendMessageUnsafe(chatID, text)) + return { id: await this.taskQueue.push(() => this._sendMessageUnsafe(chatID, text)) } } /** @@ -273,6 +277,10 @@ export default class MessagesPuppeteer { await this.page.focus("mws-message-compose .input-box textarea") await this.page.keyboard.type(text) await this.page.click(".compose-container > mws-message-send-button > button") + const id = await this.page.$eval("mws-message-wrapper.outgoing[msg-id^='tmp_']", + elem => window.__mautrixController.waitForMessage(elem)) + this.log("Successfully sent message", id, "to", chatID) + return id } async _getMessagesUnsafe(id, minID = 0) { @@ -281,10 +289,7 @@ export default class MessagesPuppeteer { await this.page.waitFor("mws-message-wrapper") const messages = await this.page.$eval("mws-messages-list .content", element => window.__mautrixController.parseMessageList(element)) - if (minID) { - return messages.filter(message => message.id > minID) - } - return messages + return messages.filter(msg => msg.id > minID && !this.sentMessageIDs.has(msg.id)) } async _processChatListChangeUnsafe(id) {