From 0ce6b833e37a64056ab7ef89331f19a36fa39701 Mon Sep 17 00:00:00 2001 From: Andrew Ferrazzutti Date: Wed, 21 Apr 2021 02:38:10 -0400 Subject: [PATCH] Attempt to fix ordering of incoming messages --- matrix_puppeteer_line/portal.py | 2 -- matrix_puppeteer_line/rpc/client.py | 6 ++++++ matrix_puppeteer_line/rpc/rpc.py | 18 ++++++++++++++++-- matrix_puppeteer_line/user.py | 2 ++ puppet/src/client.js | 3 +++ 5 files changed, 27 insertions(+), 4 deletions(-) diff --git a/matrix_puppeteer_line/portal.py b/matrix_puppeteer_line/portal.py index d112378..da318c5 100644 --- a/matrix_puppeteer_line/portal.py +++ b/matrix_puppeteer_line/portal.py @@ -411,8 +411,6 @@ class Portal(DBPortal, BasePortal): max_mid = await DBMessage.get_max_mid(self.mxid) or 0 messages = [msg for msg in await source.client.get_messages(self.chat_id) if msg.id > max_mid] - # TODO Confirm why sorting in Node isn't enough - messages.sort(key=lambda message: message.id) if not messages: self.log.debug("Didn't get any entries from server") diff --git a/matrix_puppeteer_line/rpc/client.py b/matrix_puppeteer_line/rpc/client.py index eab9833..b919cde 100644 --- a/matrix_puppeteer_line/rpc/client.py +++ b/matrix_puppeteer_line/rpc/client.py @@ -35,6 +35,12 @@ class Client(RPCClient): await self.request("stop") await self.disconnect() + async def pause(self) -> None: + await self.request("pause") + + async def resume(self) -> None: + await self.request("resume") + async def get_chats(self) -> List[ChatListInfo]: resp = await self.request("get_chats") return [ChatListInfo.deserialize(data) for data in resp] diff --git a/matrix_puppeteer_line/rpc/rpc.py b/matrix_puppeteer_line/rpc/rpc.py index 1cd287f..6114f75 100644 --- a/matrix_puppeteer_line/rpc/rpc.py +++ b/matrix_puppeteer_line/rpc/rpc.py @@ -13,7 +13,7 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from typing import Dict, Any, Callable, Awaitable, List, Optional +from typing import Dict, Any, Callable, Awaitable, List, Optional, Tuple import logging import asyncio import json @@ -50,6 +50,8 @@ class RPCClient: self._response_waiters = {} self._writer = None self._reader = None + self._command_queue: List[Tuple[int, str, Dict[str, Any]]] = [] + self._command_sem = asyncio.Semaphore(0) async def connect(self) -> None: if self._writer is not None: @@ -65,6 +67,7 @@ class RPCClient: self._reader = r self._writer = w self.loop.create_task(self._try_read_loop()) + self.loop.create_task(self._command_loop()) await self.request("register", user_id=self.user_id) async def disconnect(self) -> None: @@ -109,11 +112,16 @@ class RPCClient: try: req_id = req.pop("id") command = req.pop("command") + is_sequential = req.pop("is_sequential", False) except KeyError: self.log.debug(f"Got invalid request from server: {line}") return if req_id < 0: - self.loop.create_task(self._run_event_handler(req_id, command, req)) + if not is_sequential: + self.loop.create_task(self._run_event_handler(req_id, command, req)) + else: + self._command_queue.append((req_id, command, req)) + self._command_sem.release() return try: waiter = self._response_waiters[req_id] @@ -127,6 +135,12 @@ class RPCClient: else: self.log.warning(f"Unexpected response command to {req_id}: {command} {req}") + async def _command_loop(self) -> None: + while True: + await self._command_sem.acquire() + req_id, command, req = self._command_queue.pop(0) + await self._run_event_handler(req_id, command, req) + async def _try_read_loop(self) -> None: try: await self._read_loop() diff --git a/matrix_puppeteer_line/user.py b/matrix_puppeteer_line/user.py index 3bc157a..50fd3d9 100644 --- a/matrix_puppeteer_line/user.py +++ b/matrix_puppeteer_line/user.py @@ -116,6 +116,7 @@ class User(DBUser, BaseUser): self._connection_check_task = self.loop.create_task(self._check_connection_loop()) await self.client.set_last_message_ids(await DBMessage.get_max_mids()) self.log.info("Syncing chats") + await self.client.pause() chats = await self.client.get_chats() limit = self.config["bridge.initial_conversation_sync"] for index, chat in enumerate(chats): @@ -126,6 +127,7 @@ class User(DBUser, BaseUser): await portal.update_matrix_room(self, chat) else: await portal.create_matrix_room(self, chat) + await self.client.resume() async def stop(self) -> None: if self._connection_check_task: diff --git a/puppet/src/client.js b/puppet/src/client.js index ba2f3e2..b48aaaa 100644 --- a/puppet/src/client.js +++ b/puppet/src/client.js @@ -103,6 +103,7 @@ export default class Client { return this._write({ id: --this.notificationID, command: "message", + is_sequential: true, message, }) } @@ -230,6 +231,8 @@ export default class Client { send: req => this.puppet.sendMessage(req.chat_id, req.text), send_file: req => this.puppet.sendFile(req.chat_id, req.file_path), set_last_message_ids: req => this.puppet.setLastMessageIDs(req.msg_ids), + pause: () => this.puppet.stopObserving(), + resume: () => this.puppet.startObserving(), get_chats: () => this.puppet.getRecentChats(), get_chat: req => this.puppet.getChatInfo(req.chat_id), get_messages: req => this.puppet.getMessages(req.chat_id),