Attempt to fix ordering of incoming messages

This commit is contained in:
Andrew Ferrazzutti 2021-04-21 02:38:10 -04:00
parent 3126543321
commit 0ce6b833e3
5 changed files with 27 additions and 4 deletions

View File

@ -411,8 +411,6 @@ class Portal(DBPortal, BasePortal):
max_mid = await DBMessage.get_max_mid(self.mxid) or 0 max_mid = await DBMessage.get_max_mid(self.mxid) or 0
messages = [msg for msg in await source.client.get_messages(self.chat_id) messages = [msg for msg in await source.client.get_messages(self.chat_id)
if msg.id > max_mid] if msg.id > max_mid]
# TODO Confirm why sorting in Node isn't enough
messages.sort(key=lambda message: message.id)
if not messages: if not messages:
self.log.debug("Didn't get any entries from server") self.log.debug("Didn't get any entries from server")

View File

@ -35,6 +35,12 @@ class Client(RPCClient):
await self.request("stop") await self.request("stop")
await self.disconnect() await self.disconnect()
async def pause(self) -> None:
await self.request("pause")
async def resume(self) -> None:
await self.request("resume")
async def get_chats(self) -> List[ChatListInfo]: async def get_chats(self) -> List[ChatListInfo]:
resp = await self.request("get_chats") resp = await self.request("get_chats")
return [ChatListInfo.deserialize(data) for data in resp] return [ChatListInfo.deserialize(data) for data in resp]

View File

@ -13,7 +13,7 @@
# #
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Dict, Any, Callable, Awaitable, List, Optional from typing import Dict, Any, Callable, Awaitable, List, Optional, Tuple
import logging import logging
import asyncio import asyncio
import json import json
@ -50,6 +50,8 @@ class RPCClient:
self._response_waiters = {} self._response_waiters = {}
self._writer = None self._writer = None
self._reader = None self._reader = None
self._command_queue: List[Tuple[int, str, Dict[str, Any]]] = []
self._command_sem = asyncio.Semaphore(0)
async def connect(self) -> None: async def connect(self) -> None:
if self._writer is not None: if self._writer is not None:
@ -65,6 +67,7 @@ class RPCClient:
self._reader = r self._reader = r
self._writer = w self._writer = w
self.loop.create_task(self._try_read_loop()) self.loop.create_task(self._try_read_loop())
self.loop.create_task(self._command_loop())
await self.request("register", user_id=self.user_id) await self.request("register", user_id=self.user_id)
async def disconnect(self) -> None: async def disconnect(self) -> None:
@ -109,11 +112,16 @@ class RPCClient:
try: try:
req_id = req.pop("id") req_id = req.pop("id")
command = req.pop("command") command = req.pop("command")
is_sequential = req.pop("is_sequential", False)
except KeyError: except KeyError:
self.log.debug(f"Got invalid request from server: {line}") self.log.debug(f"Got invalid request from server: {line}")
return return
if req_id < 0: if req_id < 0:
self.loop.create_task(self._run_event_handler(req_id, command, req)) if not is_sequential:
self.loop.create_task(self._run_event_handler(req_id, command, req))
else:
self._command_queue.append((req_id, command, req))
self._command_sem.release()
return return
try: try:
waiter = self._response_waiters[req_id] waiter = self._response_waiters[req_id]
@ -127,6 +135,12 @@ class RPCClient:
else: else:
self.log.warning(f"Unexpected response command to {req_id}: {command} {req}") self.log.warning(f"Unexpected response command to {req_id}: {command} {req}")
async def _command_loop(self) -> None:
while True:
await self._command_sem.acquire()
req_id, command, req = self._command_queue.pop(0)
await self._run_event_handler(req_id, command, req)
async def _try_read_loop(self) -> None: async def _try_read_loop(self) -> None:
try: try:
await self._read_loop() await self._read_loop()

View File

@ -116,6 +116,7 @@ class User(DBUser, BaseUser):
self._connection_check_task = self.loop.create_task(self._check_connection_loop()) self._connection_check_task = self.loop.create_task(self._check_connection_loop())
await self.client.set_last_message_ids(await DBMessage.get_max_mids()) await self.client.set_last_message_ids(await DBMessage.get_max_mids())
self.log.info("Syncing chats") self.log.info("Syncing chats")
await self.client.pause()
chats = await self.client.get_chats() chats = await self.client.get_chats()
limit = self.config["bridge.initial_conversation_sync"] limit = self.config["bridge.initial_conversation_sync"]
for index, chat in enumerate(chats): for index, chat in enumerate(chats):
@ -126,6 +127,7 @@ class User(DBUser, BaseUser):
await portal.update_matrix_room(self, chat) await portal.update_matrix_room(self, chat)
else: else:
await portal.create_matrix_room(self, chat) await portal.create_matrix_room(self, chat)
await self.client.resume()
async def stop(self) -> None: async def stop(self) -> None:
if self._connection_check_task: if self._connection_check_task:

View File

@ -103,6 +103,7 @@ export default class Client {
return this._write({ return this._write({
id: --this.notificationID, id: --this.notificationID,
command: "message", command: "message",
is_sequential: true,
message, message,
}) })
} }
@ -230,6 +231,8 @@ export default class Client {
send: req => this.puppet.sendMessage(req.chat_id, req.text), send: req => this.puppet.sendMessage(req.chat_id, req.text),
send_file: req => this.puppet.sendFile(req.chat_id, req.file_path), send_file: req => this.puppet.sendFile(req.chat_id, req.file_path),
set_last_message_ids: req => this.puppet.setLastMessageIDs(req.msg_ids), set_last_message_ids: req => this.puppet.setLastMessageIDs(req.msg_ids),
pause: () => this.puppet.stopObserving(),
resume: () => this.puppet.startObserving(),
get_chats: () => this.puppet.getRecentChats(), get_chats: () => this.puppet.getRecentChats(),
get_chat: req => this.puppet.getChatInfo(req.chat_id), get_chat: req => this.puppet.getChatInfo(req.chat_id),
get_messages: req => this.puppet.getMessages(req.chat_id), get_messages: req => this.puppet.getMessages(req.chat_id),