Sync, store, and poll for incoming read receipts

This commit is contained in:
Andrew Ferrazzutti 2021-06-30 03:04:25 -04:00
parent c124b4c49e
commit 8c2c0126c9
11 changed files with 549 additions and 91 deletions

View File

@ -7,12 +7,13 @@ from .stranger import Stranger
from .portal import Portal from .portal import Portal
from .message import Message from .message import Message
from .media import Media from .media import Media
from .receipt import Receipt
from .receipt_reaction import ReceiptReaction from .receipt_reaction import ReceiptReaction
def init(db: Database) -> None: def init(db: Database) -> None:
for table in (User, Puppet, Stranger, Portal, Message, Media, ReceiptReaction): for table in (User, Puppet, Stranger, Portal, Message, Media, Receipt, ReceiptReaction):
table.db = db table.db = db
__all__ = ["upgrade_table", "User", "Puppet", "Stranger", "Portal", "Message", "Media", "ReceiptReaction"] __all__ = ["upgrade_table", "User", "Puppet", "Stranger", "Portal", "Message", "Media", "Receipt", "ReceiptReaction"]

View File

@ -13,7 +13,7 @@
# #
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Optional, ClassVar, Dict, TYPE_CHECKING from typing import Optional, ClassVar, Dict, List, TYPE_CHECKING
from attr import dataclass from attr import dataclass
@ -31,10 +31,11 @@ class Message:
mx_room: RoomID mx_room: RoomID
mid: Optional[int] mid: Optional[int]
chat_id: str chat_id: str
is_outgoing: bool
async def insert(self) -> None: async def insert(self) -> None:
q = "INSERT INTO message (mxid, mx_room, mid, chat_id) VALUES ($1, $2, $3, $4)" q = "INSERT INTO message (mxid, mx_room, mid, chat_id, is_outgoing) VALUES ($1, $2, $3, $4, $5)"
await self.db.execute(q, self.mxid, self.mx_room, self.mid, self.chat_id) await self.db.execute(q, self.mxid, self.mx_room, self.mid, self.chat_id, self.is_outgoing)
async def update_ids(self, new_mxid: EventID, new_mid: int) -> None: async def update_ids(self, new_mxid: EventID, new_mid: int) -> None:
q = ("UPDATE message SET mxid=$1, mid=$2 " q = ("UPDATE message SET mxid=$1, mid=$2 "
@ -55,6 +56,15 @@ class Message:
data[row["chat_id"]] = row["max_mid"] data[row["chat_id"]] = row["max_mid"]
return data return data
@classmethod
async def get_max_outgoing_mids(cls) -> Dict[str, int]:
rows = await cls.db.fetch("SELECT chat_id, MAX(mid) AS max_mid "
"FROM message WHERE is_outgoing GROUP BY chat_id")
data = {}
for row in rows:
data[row["chat_id"]] = row["max_mid"]
return data
@classmethod @classmethod
async def get_num_noid_msgs(cls, room_id: RoomID) -> int: async def get_num_noid_msgs(cls, room_id: RoomID) -> int:
return await cls.db.fetchval("SELECT COUNT(*) FROM message " return await cls.db.fetchval("SELECT COUNT(*) FROM message "
@ -74,7 +84,7 @@ class Message:
@classmethod @classmethod
async def get_by_mxid(cls, mxid: EventID, mx_room: RoomID) -> Optional['Message']: async def get_by_mxid(cls, mxid: EventID, mx_room: RoomID) -> Optional['Message']:
row = await cls.db.fetchrow("SELECT mxid, mx_room, mid, chat_id " row = await cls.db.fetchrow("SELECT mxid, mx_room, mid, chat_id, is_outgoing "
"FROM message WHERE mxid=$1 AND mx_room=$2", mxid, mx_room) "FROM message WHERE mxid=$1 AND mx_room=$2", mxid, mx_room)
if not row: if not row:
return None return None
@ -82,15 +92,22 @@ class Message:
@classmethod @classmethod
async def get_by_mid(cls, mid: int) -> Optional['Message']: async def get_by_mid(cls, mid: int) -> Optional['Message']:
row = await cls.db.fetchrow("SELECT mxid, mx_room, mid, chat_id FROM message WHERE mid=$1", row = await cls.db.fetchrow("SELECT mxid, mx_room, mid, chat_id, is_outgoing FROM message WHERE mid=$1",
mid) mid)
if not row: if not row:
return None return None
return cls(**row) return cls(**row)
@classmethod
async def get_all_since(cls, chat_id: str, min_mid: int, max_mid: int) -> List['Message']:
rows = await cls.db.fetch("SELECT mxid, mx_room, mid, chat_id, is_outgoing FROM message "
"WHERE chat_id=$1 AND $2<mid AND mid<=$3",
chat_id, min_mid, max_mid)
return [cls(**row) for row in rows]
@classmethod @classmethod
async def get_next_noid_msg(cls, room_id: RoomID) -> Optional['Message']: async def get_next_noid_msg(cls, room_id: RoomID) -> Optional['Message']:
row = await cls.db.fetchrow("SELECT mxid, mx_room, mid, chat_id FROM message " row = await cls.db.fetchrow("SELECT mxid, mx_room, mid, chat_id, is_outgoing FROM message "
"WHERE mid IS NULL AND mx_room=$1", room_id) "WHERE mid IS NULL AND mx_room=$1", room_id)
if not row: if not row:
return None return None

View File

@ -0,0 +1,72 @@
# matrix-puppeteer-line - A very hacky Matrix-LINE bridge based on running LINE's Chrome extension in Puppeteer
# Copyright (C) 2020-2021 Tulir Asokan, Andrew Ferrazzutti
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import List, ClassVar, Dict, Optional, TYPE_CHECKING
from attr import dataclass
from mautrix.util.async_db import Database
fake_db = Database("") if TYPE_CHECKING else None
@dataclass
class Receipt:
db: ClassVar[Database] = fake_db
mid: int
chat_id: str
num_read: int
async def insert_or_update(self) -> None:
q = ("INSERT INTO receipt (mid, chat_id, num_read) "
"VALUES ($1, $2, $3) "
"ON CONFLICT (chat_id, num_read) "
"DO UPDATE SET mid=EXCLUDED.mid, num_read=EXCLUDED.num_read")
await self.db.execute(q, self.mid, self.chat_id, self.num_read)
# Delete lower counts for earlier messages
# TODO Consider using a CHECK for this instead
q = ("DELETE FROM receipt "
"WHERE chat_id=$1 AND mid<$2 AND num_read<$3")
await self.db.execute(q, self.chat_id, self.mid, self.num_read)
@classmethod
async def get_max_mid(cls, chat_id: str, num_read: int) -> Optional[int]:
q = ("SELECT mid FROM receipt "
"WHERE chat_id=$1 AND num_read=$2")
return await cls.db.fetchval(q, chat_id, num_read)
@classmethod
async def get_max_mid_per_num_read(cls, chat_id: str) -> Dict[int, int]:
rows = await cls.db.fetch("SELECT chat_id, mid, num_read FROM receipt WHERE chat_id=$1", chat_id)
data = {}
for row in rows:
data[row["num_read"]] = row["mid"]
return data
@classmethod
async def get_max_mids_per_num_read(cls) -> Dict[str, Dict[int, int]]:
rows = await cls.db.fetch("SELECT chat_id, mid, num_read FROM receipt")
data = {}
for row in rows:
chat_id = row["chat_id"]
if chat_id not in data:
inner_data = {}
data[chat_id] = inner_data
else:
inner_data = data[chat_id]
inner_data[row["num_read"]] = row["mid"]
return data

View File

@ -135,4 +135,31 @@ async def upgrade_strangers(conn: Connection) -> None:
async def upgrade_noid_msgs(conn: Connection) -> None: async def upgrade_noid_msgs(conn: Connection) -> None:
await conn.execute("ALTER TABLE message DROP CONSTRAINT IF EXISTS message_pkey") await conn.execute("ALTER TABLE message DROP CONSTRAINT IF EXISTS message_pkey")
await conn.execute("ALTER TABLE message ALTER COLUMN mid DROP NOT NULL") await conn.execute("ALTER TABLE message ALTER COLUMN mid DROP NOT NULL")
await conn.execute("ALTER TABLE message ADD UNIQUE (mid)")
table_name = "message"
constraint_name = f"{table_name}_mid_key"
q = ( "SELECT EXISTS(SELECT FROM information_schema.constraint_table_usage "
f"WHERE table_name='{table_name}' AND constraint_name='{constraint_name}')")
has_constraint = await conn.fetchval(q)
if not has_constraint:
await conn.execute(f"ALTER TABLE {table_name} ADD UNIQUE (mid)")
@upgrade_table.register(description="Track LINE read receipts")
async def upgrade_latest_read_receipts(conn: Connection) -> None:
await conn.execute("ALTER TABLE message DROP CONSTRAINT IF EXISTS message_mid_key")
await conn.execute("ALTER TABLE message ADD UNIQUE (mid, chat_id)")
await conn.execute("ALTER TABLE message "
"ADD COLUMN IF NOT EXISTS "
"is_outgoing BOOLEAN NOT NULL DEFAULT false")
await conn.execute("""CREATE TABLE IF NOT EXISTS receipt (
mid INTEGER NOT NULL,
chat_id TEXT NOT NULL,
num_read INTEGER NOT NULL DEFAULT 1,
PRIMARY KEY (chat_id, num_read),
FOREIGN KEY (mid, chat_id)
REFERENCES message (mid, chat_id)
ON DELETE CASCADE
)""")

View File

@ -32,7 +32,7 @@ from mautrix.types import (EventID, MessageEventContent, RoomID, EventType, Mess
from mautrix.errors import IntentError from mautrix.errors import IntentError
from mautrix.util.simple_lock import SimpleLock from mautrix.util.simple_lock import SimpleLock
from .db import Portal as DBPortal, Message as DBMessage, ReceiptReaction as DBReceiptReaction, Media as DBMedia from .db import Portal as DBPortal, Message as DBMessage, Receipt as DBReceipt, ReceiptReaction as DBReceiptReaction, Media as DBMedia
from .config import Config from .config import Config
from .rpc import ChatInfo, Participant, Message, Receipt, Client, PathImage from .rpc import ChatInfo, Participant, Message, Receipt, Client, PathImage
from .rpc.types import RPCError from .rpc.types import RPCError
@ -173,7 +173,7 @@ class Portal(DBPortal, BasePortal):
msg = None msg = None
if message_id != -1: if message_id != -1:
try: try:
msg = DBMessage(mxid=event_id, mx_room=self.mxid, mid=message_id, chat_id=self.chat_id) msg = DBMessage(mxid=event_id, mx_room=self.mxid, mid=message_id, chat_id=self.chat_id, is_outgoing=True)
await msg.insert() await msg.insert()
await self._send_delivery_receipt(event_id) await self._send_delivery_receipt(event_id)
self.log.debug(f"Handled Matrix message {event_id} -> {message_id}") self.log.debug(f"Handled Matrix message {event_id} -> {message_id}")
@ -211,7 +211,7 @@ class Portal(DBPortal, BasePortal):
intent = None intent = None
return intent return intent
async def handle_remote_message(self, source: 'u.User', evt: Message) -> None: async def handle_remote_message(self, source: 'u.User', evt: Message, handle_receipt: bool = True) -> None:
if await DBMessage.get_by_mid(evt.id): if await DBMessage.get_by_mid(evt.id):
self.log.debug(f"Ignoring duplicate message {evt.id}") self.log.debug(f"Ignoring duplicate message {evt.id}")
return return
@ -371,11 +371,8 @@ class Portal(DBPortal, BasePortal):
content.set_edit(prev_event_id) content.set_edit(prev_event_id)
event_id = await self._send_message(intent, content, timestamp=evt.timestamp) event_id = await self._send_message(intent, content, timestamp=evt.timestamp)
if evt.is_outgoing and evt.receipt_count:
await self._handle_receipt(event_id, evt.receipt_count)
if not msg: if not msg:
msg = DBMessage(mxid=event_id, mx_room=self.mxid, mid=evt.id, chat_id=self.chat_id) msg = DBMessage(mxid=event_id, mx_room=self.mxid, mid=evt.id, chat_id=self.chat_id, is_outgoing=evt.is_outgoing)
try: try:
await msg.insert() await msg.insert()
#await self._send_delivery_receipt(event_id) #await self._send_delivery_receipt(event_id)
@ -386,32 +383,56 @@ class Portal(DBPortal, BasePortal):
await msg.update_ids(new_mxid=event_id, new_mid=evt.id) await msg.update_ids(new_mxid=event_id, new_mid=evt.id)
self.log.debug(f"Handled preseen remote message {evt.id} -> {event_id}") self.log.debug(f"Handled preseen remote message {evt.id} -> {event_id}")
if handle_receipt and evt.is_outgoing and evt.receipt_count:
await self._handle_receipt(event_id, evt.id, evt.receipt_count)
async def handle_remote_receipt(self, receipt: Receipt) -> None: async def handle_remote_receipt(self, receipt: Receipt) -> None:
msg = await DBMessage.get_by_mid(receipt.id) msg = await DBMessage.get_by_mid(receipt.id)
if msg: if msg:
await self._handle_receipt(msg.mxid, receipt.count) await self._handle_receipt(msg.mxid, receipt.id, receipt.count)
else: else:
self.log.debug(f"Could not find message for read receipt {receipt.id}") self.log.debug(f"Could not find message for read receipt {receipt.id}")
async def _handle_receipt(self, event_id: EventID, receipt_count: int) -> None: async def _handle_receipt(self, event_id: EventID, receipt_id: int, receipt_count: int) -> None:
if self.is_direct: if self.is_direct:
await self.main_intent.send_receipt(self.mxid, event_id) await self.main_intent.send_receipt(self.mxid, event_id)
else: else:
reaction = await DBReceiptReaction.get_by_relation(event_id, self.mxid) # Update receipts not only for this message, but also for
# all messages before it with an equivalent "read by" count.
prev_receipt_id = await DBReceipt.get_max_mid(self.chat_id, receipt_count) or 0
messages = await DBMessage.get_all_since(self.chat_id, prev_receipt_id, receipt_id)
# Remove reactions for outdated "read by" counts.
for message in messages:
reaction = await DBReceiptReaction.get_by_relation(message.mxid, self.mxid)
if reaction: if reaction:
await self.main_intent.redact(self.mxid, reaction.mxid) await self.main_intent.redact(self.mxid, reaction.mxid)
await reaction.delete() await reaction.delete()
# If there are as many receipts as there are chat participants, then everyone # If there are as many receipts as there are chat participants, then everyone
# must have read the message, so send real read receipts from each puppet. # must have read the message, so send real read receipts from each puppet.
# TODO Not just -1 if there are multiple _OWN_ puppets... # TODO Not just -1 if there are multiple _OWN_ puppets...
if receipt_count == len(self._last_participant_update) - 1: is_fully_read = receipt_count >= len(self._last_participant_update) - 1
if is_fully_read:
for mid in filter(lambda mid: not p.Puppet.is_mid_for_own_puppet(mid), self._last_participant_update): for mid in filter(lambda mid: not p.Puppet.is_mid_for_own_puppet(mid), self._last_participant_update):
intent = (await p.Puppet.get_by_mid(mid)).intent intent = (await p.Puppet.get_by_mid(mid)).intent
await intent.send_receipt(self.mxid, event_id) await intent.send_receipt(self.mxid, event_id)
else: else:
# TODO messages list should exclude non-outgoing messages,
# but include them just to get rid of potential stale reactions
for message in (msg for msg in messages if msg.is_outgoing):
# TODO Translatable string for "Read by" # TODO Translatable string for "Read by"
reaction_mxid = await self.main_intent.react(self.mxid, event_id, f"(Read by {receipt_count})") try:
await DBReceiptReaction(reaction_mxid, self.mxid, event_id, receipt_count).insert() reaction_mxid = await self.main_intent.react(self.mxid, message.mxid, f"(Read by {receipt_count})")
await DBReceiptReaction(reaction_mxid, self.mxid, message.mxid, receipt_count).insert()
except Exception as e:
self.log.warning(f"Failed to send read receipt reaction for message {message.mxid} in {self.chat_id}: {e}")
try:
await DBReceipt(mid=receipt_id, chat_id=self.chat_id, num_read=receipt_count).insert_or_update()
self.log.debug(f"Handled read receipt for message {receipt_id} read by {receipt_count}")
except Exception as e:
self.log.debug(f"Failed to handle read receipt for message {receipt_id} read by {receipt_count}: {e}")
async def _handle_remote_media(self, source: 'u.User', intent: IntentAPI, async def _handle_remote_media(self, source: 'u.User', intent: IntentAPI,
media_url: str, media_id: Optional[str] = None, media_url: str, media_id: Optional[str] = None,
@ -581,32 +602,55 @@ class Portal(DBPortal, BasePortal):
await self.main_intent.kick_user(self.mxid, user_id, await self.main_intent.kick_user(self.mxid, user_id,
reason="Kicking own puppet") reason="Kicking own puppet")
async def backfill(self, source: 'u.User') -> None: async def backfill(self, source: 'u.User', info: ChatInfo) -> None:
try: try:
with self.backfill_lock: with self.backfill_lock:
await self._backfill(source) await self._backfill(source, info)
except Exception: except Exception:
self.log.exception("Failed to backfill portal") self.log.exception("Failed to backfill portal")
async def _backfill(self, source: 'u.User') -> None: async def _backfill(self, source: 'u.User', info: ChatInfo) -> None:
self.log.debug("Backfilling history through %s", source.mxid) self.log.debug("Backfilling history through %s", source.mxid)
events = await source.client.get_messages(self.chat_id)
max_mid = await DBMessage.get_max_mid(self.mxid) or 0 max_mid = await DBMessage.get_max_mid(self.mxid) or 0
messages = [msg for msg in await source.client.get_messages(self.chat_id) messages = [msg for msg in events.messages
if msg.id > max_mid] if msg.id > max_mid]
if not messages: if not messages:
self.log.debug("Didn't get any entries from server") self.log.debug("Didn't get any messages from server")
await self._cleanup_noid_msgs() else:
return
self.log.debug("Got %d messages from server", len(messages)) self.log.debug("Got %d messages from server", len(messages))
async with NotificationDisabler(self.mxid, source): async with NotificationDisabler(self.mxid, source):
for evt in messages: for evt in messages:
await self.handle_remote_message(source, evt) await self.handle_remote_message(source, evt, handle_receipt=self.is_direct)
self.log.info("Backfilled %d messages through %s", len(messages), source.mxid) self.log.info("Backfilled %d messages through %s", len(messages), source.mxid)
await self._cleanup_noid_msgs() await self._cleanup_noid_msgs()
if not self.is_direct:
# Update participants before sending any receipts
# TODO Joins and leaves are (usually) shown after all, so track them properly.
# In the meantime, just check the participants list after backfilling.
await self._update_participants(info.participants)
for evt in messages:
if evt.is_outgoing and evt.receipt_count:
await self.handle_remote_message(source, evt, handle_receipt=False)
max_mid_per_num_read = await DBReceipt.get_max_mid_per_num_read(self.chat_id)
receipts = [rct for rct in events.receipts
if rct.id > max_mid_per_num_read.get(rct.count, 0)]
if not receipts:
self.log.debug("Didn't get any receipts from server")
else:
self.log.debug("Got %d receipts from server", len(receipts))
for rct in receipts:
await self.handle_remote_receipt(rct)
self.log.info("Backfilled %d receipts through %s", len(receipts), source.mxid)
@property @property
def bridge_info_state_key(self) -> str: def bridge_info_state_key(self) -> str:
return f"net.miscworks.line://line/{self.chat_id}" return f"net.miscworks.line://line/{self.chat_id}"
@ -661,8 +705,7 @@ class Portal(DBPortal, BasePortal):
await puppet.intent.ensure_joined(self.mxid) await puppet.intent.ensure_joined(self.mxid)
await self.update_info(info, source.client) await self.update_info(info, source.client)
await self.backfill(source) await self.backfill(source, info)
await self._update_participants(info.participants)
async def _create_matrix_room(self, source: 'u.User', info: ChatInfo) -> Optional[RoomID]: async def _create_matrix_room(self, source: 'u.User', info: ChatInfo) -> Optional[RoomID]:
if self.mxid: if self.mxid:
@ -741,11 +784,7 @@ class Portal(DBPortal, BasePortal):
await self.update() await self.update()
self.log.debug(f"Matrix room created: {self.mxid}") self.log.debug(f"Matrix room created: {self.mxid}")
self.by_mxid[self.mxid] = self self.by_mxid[self.mxid] = self
await self.backfill(source) await self.backfill(source, info)
if not self.is_direct:
# TODO Joins and leaves are (usually) shown after all, so track them properly.
# In the meantime, just check the participants list after backfilling.
await self._update_participants(info.participants)
return self.mxid return self.mxid

View File

@ -19,7 +19,7 @@ from base64 import b64decode
import asyncio import asyncio
from .rpc import RPCClient from .rpc import RPCClient
from .types import ChatListInfo, ChatInfo, ImageData, Message, Participant, Receipt, StartStatus from .types import ChatEvents, ChatListInfo, ChatInfo, ImageData, Message, Participant, Receipt, StartStatus
class LoginCommand(TypedDict): class LoginCommand(TypedDict):
@ -51,9 +51,8 @@ class Client(RPCClient):
async def get_chat(self, chat_id: str, force_view: bool = False) -> ChatInfo: async def get_chat(self, chat_id: str, force_view: bool = False) -> ChatInfo:
return ChatInfo.deserialize(await self.request("get_chat", chat_id=chat_id, force_view=force_view)) return ChatInfo.deserialize(await self.request("get_chat", chat_id=chat_id, force_view=force_view))
async def get_messages(self, chat_id: str) -> List[Message]: async def get_messages(self, chat_id: str) -> ChatEvents:
resp = await self.request("get_messages", chat_id=chat_id) return ChatEvents.deserialize(await self.request("get_messages", chat_id=chat_id))
return [Message.deserialize(data) for data in resp]
async def read_image(self, image_url: str) -> ImageData: async def read_image(self, image_url: str) -> ImageData:
resp = await self.request("read_image", image_url=image_url) resp = await self.request("read_image", image_url=image_url)
@ -86,8 +85,8 @@ class Client(RPCClient):
resp = await self.request("send_file", chat_id=chat_id, file_path=file_path) resp = await self.request("send_file", chat_id=chat_id, file_path=file_path)
return resp["id"] return resp["id"]
async def set_last_message_ids(self, msg_ids: Dict[str, int]) -> None: async def set_last_message_ids(self, msg_ids: Dict[str, int], own_msg_ids: Dict[str, int], rct_ids: Dict[str, Dict[int, int]]) -> None:
await self.request("set_last_message_ids", msg_ids=msg_ids) await self.request("set_last_message_ids", msg_ids=msg_ids, own_msg_ids=own_msg_ids, rct_ids=rct_ids)
async def on_message(self, func: Callable[[Message], Awaitable[None]]) -> None: async def on_message(self, func: Callable[[Message], Awaitable[None]]) -> None:
async def wrapper(data: Dict[str, Any]) -> None: async def wrapper(data: Dict[str, Any]) -> None:

View File

@ -77,6 +77,12 @@ class Receipt(SerializableAttrs['Receipt']):
count: int = 1 count: int = 1
@dataclass
class ChatEvents(SerializableAttrs['ChatEvents']):
messages: List[Message]
receipts: List[Receipt]
@dataclass @dataclass
class ImageData: class ImageData:
mime: str mime: str

View File

@ -22,7 +22,7 @@ from mautrix.types import UserID, RoomID
from mautrix.appservice import AppService, IntentAPI from mautrix.appservice import AppService, IntentAPI
from mautrix.util.opt_prometheus import Gauge from mautrix.util.opt_prometheus import Gauge
from .db import User as DBUser, Portal as DBPortal, Message as DBMessage from .db import User as DBUser, Portal as DBPortal, Message as DBMessage, Receipt as DBReceipt
from .config import Config from .config import Config
from .rpc import Client, Message, Receipt from .rpc import Client, Message, Receipt
from . import puppet as pu, portal as po from . import puppet as pu, portal as po
@ -135,7 +135,10 @@ class User(DBUser, BaseUser):
self._connection_check_task = self.loop.create_task(self._check_connection_loop()) self._connection_check_task = self.loop.create_task(self._check_connection_loop())
await self.client.pause() await self.client.pause()
await self.sync_own_profile() await self.sync_own_profile()
await self.client.set_last_message_ids(await DBMessage.get_max_mids()) await self.client.set_last_message_ids(
await DBMessage.get_max_mids(),
await DBMessage.get_max_outgoing_mids(),
await DBReceipt.get_max_mids_per_num_read())
limit = self.config["bridge.initial_conversation_sync"] limit = self.config["bridge.initial_conversation_sync"]
self.log.info("Syncing chats") self.log.info("Syncing chats")
await self.send_bridge_notice("Synchronizing chats...") await self.send_bridge_notice("Synchronizing chats...")
@ -186,7 +189,10 @@ class User(DBUser, BaseUser):
self.log.trace("Received message %s", evt) self.log.trace("Received message %s", evt)
portal = await po.Portal.get_by_chat_id(evt.chat_id, create=True) portal = await po.Portal.get_by_chat_id(evt.chat_id, create=True)
if not portal.mxid: if not portal.mxid:
await self.client.set_last_message_ids(await DBMessage.get_max_mids()) await self.client.set_last_message_ids(
await DBMessage.get_max_mids(),
await DBMessage.get_max_outgoing_mids(),
await DBReceipt.get_max_mids_per_num_read())
chat_info = await self.client.get_chat(evt.chat_id) chat_info = await self.client.get_chat(evt.chat_id)
await portal.create_matrix_room(self, chat_info) await portal.create_matrix_room(self, chat_info)
await portal.handle_remote_message(self, evt) await portal.handle_remote_message(self, evt)

View File

@ -257,7 +257,7 @@ export default class Client {
cancel_login: () => this.puppet.cancelLogin(), cancel_login: () => this.puppet.cancelLogin(),
send: req => this.puppet.sendMessage(req.chat_id, req.text), send: req => this.puppet.sendMessage(req.chat_id, req.text),
send_file: req => this.puppet.sendFile(req.chat_id, req.file_path), send_file: req => this.puppet.sendFile(req.chat_id, req.file_path),
set_last_message_ids: req => this.puppet.setLastMessageIDs(req.msg_ids), set_last_message_ids: req => this.puppet.setLastMessageIDs(req.msg_ids, req.own_msg_ids, req.rct_ids),
pause: () => this.puppet.stopObserving(), pause: () => this.puppet.stopObserving(),
resume: () => this.puppet.startObserving(), resume: () => this.puppet.startObserving(),
get_own_profile: () => this.puppet.getOwnProfile(), get_own_profile: () => this.puppet.getOwnProfile(),

View File

@ -64,11 +64,6 @@ window.__mautrixReceivePIN = function (pin) {}
* @return {Promise<void>} * @return {Promise<void>}
*/ */
window.__mautrixExpiry = function (button) {} window.__mautrixExpiry = function (button) {}
/**
* @param {number} id - The ID of the message that was sent
* @return {Promise<void>}
*/
window.__mautrixReceiveMessageID = function(id) {}
/** /**
* @return {void} * @return {void}
*/ */
@ -489,6 +484,13 @@ class MautrixController {
return this.ownMsgPromise ? await this.ownMsgPromise : -1 return this.ownMsgPromise ? await this.ownMsgPromise : -1
} }
/**
* @typedef ChatEvents
* @type {object}
* @property {MessageData[]} messages - All synced messages, which include receipts for them (if any).
* @property {ReceiptData[]} receipts - All synced receipts for messages already present.
*/
/** /**
* Parse the message list of whatever the currently-viewed chat is. * Parse the message list of whatever the currently-viewed chat is.
* *
@ -521,6 +523,61 @@ class MautrixController {
.map(value => value.value) .map(value => value.value)
} }
/**
* Parse receipts of whatever the currently-viewed chat is.
* Should only be used for already-processed messages that
* get skipped by parseMessageList.
*
* @param {?Object} rctIDs - The minimum receipt ID to consider for each "read by" count.
* It's an Object because Puppeteer can't send a Map.
* @return {ReceiptData[]} - A list of receipts.
*/
parseReceiptList(rctIDs = {}) {
console.debug(`rctIDs for full refresh: ${rctIDs}`)
const isDirect = this.getChatType(this.getCurrentChatID()) == ChatTypeEnum.DIRECT
const numOthers = isDirect ? 1 : document.querySelector(SEL_PARTICIPANTS_LIST).childElementCount - 1
const idGetter = e => +e.closest("[data-local-id]").getAttribute("data-local-id")
const receipts =
Array.from(document.querySelectorAll("#_chat_room_msg_list .mdRGT07Read:not(.MdNonDisp)"))
.map(isDirect
? e => {
return {
id: idGetter(e),
count: 1
}
}
: e => {
return {
id: idGetter(e),
count: this._getReceiptCount(e)
}
}
// Using two lambdas to not branch on isDirect for every element
)
const newReceipts = []
const prevFullyReadID = rctIDs[`${numOthers}`] || 0
let minCountToFind = 1
for (let i = receipts.length-1; i >= 0; i--) {
const receipt = receipts[i]
if (receipt.count >= minCountToFind && receipt.id > (rctIDs[`${receipt.count}`] || 0)) {
newReceipts.push(receipt)
if (receipt.count < numOthers) {
minCountToFind = receipt.count+1
} else {
break
}
} else if (receipt.id <= prevFullyReadID) {
break
}
}
return newReceipts
}
/** /**
* @typedef PathImage * @typedef PathImage
* @type object * @type object
@ -611,6 +668,16 @@ class MautrixController {
* signified by the number in its notification badge. * signified by the number in its notification badge.
*/ */
/**
* @typedef ChatListInfoForCycle
* @type object
* @property {number} id - The ID of the chat.
* @property {number} notificationCount - The number of unread messages in the chat,
* signified by the number in its notification badge.
* @property {number} numParticipants - The number of participants in the chat,
* signified by a count next to the chat title.
*/
getChatListItemID(element) { getChatListItemID(element) {
return element.getAttribute("data-chatid") return element.getAttribute("data-chatid")
} }
@ -635,6 +702,12 @@ class MautrixController {
return Number.parseInt(element.querySelector(".MdIcoBadge01:not(.MdNonDisp)")?.innerText) || 0 return Number.parseInt(element.querySelector(".MdIcoBadge01:not(.MdNonDisp)")?.innerText) || 0
} }
getChatListItemOtherParticipantCount(element) {
const countElement = element.querySelector(".mdCMN04Count:not(.MdNonDisp)")
const match = countElement?.innerText.match(/\d+/)
return match ? match[0] - 1 : 1
}
/** /**
* Parse a conversation list item element. * Parse a conversation list item element.
* *
@ -664,6 +737,32 @@ class MautrixController {
child => this.parseChatListItem(child.firstElementChild)) child => this.parseChatListItem(child.firstElementChild))
} }
/**
* Parse a conversation list item element for cycling.
*
* @param {Element} element - The element to parse.
* @return {ChatListInfoForCycle} - The info in the element.
*/
parseChatListItemForCycle(element) {
return {
id: this.getChatListItemID(element),
notificationCount: this.getChatListItemNotificationCount(element),
otherParticipantCount: this.getChatListItemOtherParticipantCount(element),
}
}
/**
* Parse the list of recent/saved chats, but for properties
* relevant to knowing which chat to cycle onto for read receipts.
*
* @return {ChatListInfoForCycle[]} - The list of chats with relevant properties.
*/
parseChatListForCycle() {
const chatList = document.querySelector("#_chat_list_body")
return Array.from(chatList.children).map(
child => this.parseChatListItemForCycle(child.firstElementChild))
}
/** /**
* Download an image at a given URL and return it as a data URL. * Download an image at a given URL and return it as a data URL.
* *
@ -800,7 +899,14 @@ class MautrixController {
} }
/** /**
* @param {[MutationRecord]} mutations - The mutation records that occurred * @typedef ReceiptData
* @type {object}
* @property {number} id - The ID of the read message.
* @property {?number} count - The number of users who have read the message.
*/
/**
* @param {MutationRecord[]} mutations - The mutation records that occurred
* @param {string} chatID - The ID of the chat being observed. * @param {string} chatID - The ID of the chat being observed.
* @private * @private
*/ */
@ -1182,8 +1288,7 @@ class MautrixController {
const resolve = this.promiseOwnMsgResolve const resolve = this.promiseOwnMsgResolve
this._promiseOwnMsgReset() this._promiseOwnMsgReset()
window.__mautrixReceiveMessageID(msgID).then( resolve(msgID)
() => resolve(msgID))
} }
_rejectOwnMessage(failureElement = null) { _rejectOwnMessage(failureElement = null) {

View File

@ -46,9 +46,11 @@ export default class MessagesPuppeteer {
this.sendPlaceholders = sendPlaceholders this.sendPlaceholders = sendPlaceholders
this.profilePath = profilePath this.profilePath = profilePath
this.updatedChats = new Set() this.updatedChats = new Set()
this.sentMessageIDs = new Set()
this.mostRecentMessages = new Map() this.mostRecentMessages = new Map()
this.mostRecentOwnMessages = new Map()
this.mostRecentReceipts = new Map()
this.numChatNotifications = new Map() this.numChatNotifications = new Map()
this.cycleTimerID = null
this.taskQueue = new TaskQueue(this.id) this.taskQueue = new TaskQueue(this.id)
this.client = client this.client = client
} }
@ -105,8 +107,6 @@ export default class MessagesPuppeteer {
await this.page.exposeFunction("__mautrixReceiveQR", this._receiveQRChange.bind(this)) await this.page.exposeFunction("__mautrixReceiveQR", this._receiveQRChange.bind(this))
await this.page.exposeFunction("__mautrixSendEmailCredentials", this._sendEmailCredentials.bind(this)) await this.page.exposeFunction("__mautrixSendEmailCredentials", this._sendEmailCredentials.bind(this))
await this.page.exposeFunction("__mautrixReceivePIN", this._receivePIN.bind(this)) await this.page.exposeFunction("__mautrixReceivePIN", this._receivePIN.bind(this))
await this.page.exposeFunction("__mautrixReceiveMessageID",
id => this.sentMessageIDs.add(id))
await this.page.exposeFunction("__mautrixReceiveChanges", await this.page.exposeFunction("__mautrixReceiveChanges",
this._receiveChatListChanges.bind(this)) this._receiveChatListChanges.bind(this))
await this.page.exposeFunction("__mautrixReceiveMessages", await this.page.exposeFunction("__mautrixReceiveMessages",
@ -424,19 +424,40 @@ export default class MessagesPuppeteer {
* Get messages in a chat. * Get messages in a chat.
* *
* @param {string} chatID The ID of the chat whose messages to get. * @param {string} chatID The ID of the chat whose messages to get.
* @return {Promise<[MessageData]>} - The messages visible in the chat. * @return {Promise<ChatEvents>} - New messages and receipts synced fron the chat.
*/ */
async getMessages(chatID) { async getMessages(chatID) {
return await this.taskQueue.push(() => this._getMessagesUnsafe(chatID)) return await this.taskQueue.push(() => this._getMessagesUnsafe(chatID))
} }
setLastMessageIDs(ids) { setLastMessageIDs(msgIDs, ownMsgIDs, rctIDs) {
this.mostRecentMessages.clear() this.mostRecentMessages.clear()
for (const [chatID, messageID] of Object.entries(ids)) { for (const [chatID, messageID] of Object.entries(msgIDs)) {
this.mostRecentMessages.set(chatID, messageID) this.mostRecentMessages.set(chatID, messageID)
} }
this.log("Updated most recent message ID map:") this.log("Updated most recent message ID map:")
this.log(JSON.stringify(this.mostRecentMessages)) this.log(JSON.stringify(msgIDs))
for (const [chatID, messageID] of Object.entries(ownMsgIDs)) {
this.mostRecentOwnMessages.set(chatID, messageID)
}
this.log("Updated most recent own message ID map:")
this.log(JSON.stringify(ownMsgIDs))
this.mostRecentReceipts.clear()
for (const [chatID, receipts] of Object.entries(rctIDs)) {
const receiptMap = this._getReceiptMap(chatID)
for (const [count, receiptID] of Object.entries(receipts)) {
receiptMap.set(+count, receiptID)
}
}
this.log("Updated most recent receipt ID map")
for (const [chatID, receiptMap] of this.mostRecentReceipts) {
this.log(`${chatID}:`)
for (const [count, receiptID] of receiptMap) {
this.log(`Read by ${count}: ${receiptID}`)
}
}
} }
async readImage(imageUrl) { async readImage(imageUrl) {
@ -450,9 +471,69 @@ export default class MessagesPuppeteer {
return { id: await this.taskQueue.push(() => this._sendFileUnsafe(chatID, filePath)) } return { id: await this.taskQueue.push(() => this._sendFileUnsafe(chatID, filePath)) }
} }
_cycleTimerStart() {
// TODO Config for cycle delay
this.cycleTimerID = setTimeout(
() => this.taskQueue.push(() => this._cycleChatUnsafe()),
5000)
}
async _cycleChatUnsafe() {
const currentChatID = await this.page.evaluate(() => window.__mautrixController.getCurrentChatID())
const chatList = await this.page.evaluate(() => window.__mautrixController.parseChatListForCycle())
// Add 1 to start at the chat after the currently-viewed one
const offset = 1 + Math.max(chatList.findIndex(item => item.id == currentChatID), 0)
// Visit next chat for which:
// - there are no unread notifications
// - the most recently-sent own message is not fully read
let chatIDToSync
for (let i = 0, n = chatList.length; i < n; i++) {
const chatListItem = chatList[(i+offset) % n]
if (chatListItem.notificationCount > 0) {
// Chat has unread notifications, so don't view it
continue
}
if (chatListItem.otherParticipantCount == 0) {
// Chat has no other participants (must be a non-DM with only you), so nothing to sync
continue
}
const mostRecentOwnMsgID = this.mostRecentOwnMessages.get(chatListItem.id)
if (mostRecentOwnMsgID == undefined) {
// Chat doesn't have any own messages, so no need to view it
continue
}
const receiptMap = this._getReceiptMap(chatListItem.id)
const mostRecentFullyReadMsgID = receiptMap.get(chatListItem.otherParticipantCount)
if (mostRecentFullyReadMsgID == mostRecentOwnMsgID) {
// Latest own message is fully-read, nothing to see here, move along
continue
}
chatIDToSync = chatListItem.id
break
}
if (!chatIDToSync) {
// TODO Confirm if this actually works...!
this.log(`Found no chats in need of read receipt updates, so force-viewing ${currentChatID} just to keep LINE alive`)
await this._switchChat(currentChatID, true)
} else {
this.log(`Viewing chat ${chatIDToSync} to check for new read receipts`)
await this._syncChat(chatIDToSync)
}
this._cycleTimerStart()
}
async startObserving() { async startObserving() {
// TODO Highly consider syncing anything that was missed since stopObserving...
const chatID = await this.page.evaluate(() => window.__mautrixController.getCurrentChatID()) const chatID = await this.page.evaluate(() => window.__mautrixController.getCurrentChatID())
this.log(`Adding observers for ${chatID || "empty chat"}`) this.log(`Adding observers for ${chatID || "empty chat"}, and global timers`)
await this.page.evaluate( await this.page.evaluate(
() => window.__mautrixController.addChatListObserver()) () => window.__mautrixController.addChatListObserver())
if (chatID) { if (chatID) {
@ -460,14 +541,23 @@ export default class MessagesPuppeteer {
(mostRecentMessage) => window.__mautrixController.addMsgListObserver(mostRecentMessage), (mostRecentMessage) => window.__mautrixController.addMsgListObserver(mostRecentMessage),
this.mostRecentMessages.get(chatID)) this.mostRecentMessages.get(chatID))
} }
if (this.cycleTimerID == null) {
this._cycleTimerStart()
}
} }
async stopObserving() { async stopObserving() {
this.log("Removing observers") this.log("Removing observers and timers")
await this.page.evaluate( await this.page.evaluate(
() => window.__mautrixController.removeChatListObserver()) () => window.__mautrixController.removeChatListObserver())
await this.page.evaluate( await this.page.evaluate(
() => window.__mautrixController.removeMsgListObserver()) () => window.__mautrixController.removeMsgListObserver())
if (this.cycleTimerID != null) {
clearTimeout(this.cycleTimerID)
this.cycleTimerID = null
}
} }
async getOwnProfile() { async getOwnProfile() {
@ -651,7 +741,7 @@ export default class MessagesPuppeteer {
async _sendMessageUnsafe(chatID, text) { async _sendMessageUnsafe(chatID, text) {
// Sync all messages in this chat first // Sync all messages in this chat first
this._receiveMessages(chatID, await this._getMessagesUnsafe(chatID), true) await this._syncChat(chatID)
// TODO Initiate the promise in the content script // TODO Initiate the promise in the content script
await this.page.evaluate( await this.page.evaluate(
() => window.__mautrixController.promiseOwnMessage(5000, "time")) () => window.__mautrixController.promiseOwnMessage(5000, "time"))
@ -670,7 +760,7 @@ export default class MessagesPuppeteer {
} }
async _sendFileUnsafe(chatID, filePath) { async _sendFileUnsafe(chatID, filePath) {
this._receiveMessages(chatID, await this._getMessagesUnsafe(chatID), true) await this._syncChat(chatID)
await this.page.evaluate( await this.page.evaluate(
() => window.__mautrixController.promiseOwnMessage( () => window.__mautrixController.promiseOwnMessage(
10000, // Use longer timeout for file uploads 10000, // Use longer timeout for file uploads
@ -701,6 +791,8 @@ export default class MessagesPuppeteer {
const id = await this.page.evaluate( const id = await this.page.evaluate(
() => window.__mautrixController.waitForOwnMessage()) () => window.__mautrixController.waitForOwnMessage())
this.log(`Successfully sent message ${id} to ${chatID}`) this.log(`Successfully sent message ${id} to ${chatID}`)
this.mostRecentMessages.set(chatID, id)
this.mostRecentOwnMessages.set(chatID, id)
return id return id
} catch (e) { } catch (e) {
// TODO Catch if something other than a timeout // TODO Catch if something other than a timeout
@ -712,10 +804,10 @@ export default class MessagesPuppeteer {
} }
_receiveMessages(chatID, messages, skipProcessing = false) { _receiveMessages(chatID, messages, skipProcessing = false) {
if (this.client) {
if (!skipProcessing) { if (!skipProcessing) {
messages = this._processMessages(chatID, messages) messages = this._processMessages(chatID, messages)
} }
if (this.client) {
for (const message of messages) { for (const message of messages) {
this.client.sendMessage(message).catch(err => this.client.sendMessage(message).catch(err =>
this.error("Failed to send message", message.id, "to client:", err)) this.error("Failed to send message", message.id, "to client:", err))
@ -737,12 +829,45 @@ export default class MessagesPuppeteer {
await this._switchChat(chatID) await this._switchChat(chatID)
// TODO Is it better to reset the notification count in _switchChat instead of here? // TODO Is it better to reset the notification count in _switchChat instead of here?
this.numChatNotifications.set(chatID, 0) this.numChatNotifications.set(chatID, 0)
let messages = await this.page.evaluate( let messages = await this.page.evaluate(
mostRecentMessage => window.__mautrixController.parseMessageList(mostRecentMessage), mostRecentMessage => window.__mautrixController.parseMessageList(mostRecentMessage),
this.mostRecentMessages.get(chatID)) this.mostRecentMessages.get(chatID))
// Doing this before restoring the observer since it updates minID // Doing this before restoring the observer since it updates minID
messages = this._processMessages(chatID, messages) messages = this._processMessages(chatID, messages)
const receiptMap = this._getReceiptMap(chatID)
// Sync receipts seen from newly-synced messages
// TODO When user leaves, clear the read-by count for the old number of other participants
let minCountToFind = 1
for (let i = messages.length-1; i >= 0; i--) {
const message = messages[i]
if (!message.is_outgoing) {
continue
}
const count = message.receipt_count
if (count >= minCountToFind && message.id > (receiptMap.get(count) || 0)) {
minCountToFind = count+1
receiptMap.set(count, message.id)
}
// TODO Early exit when count == num other participants
}
// Sync receipts from previously-seen messages
const receipts = await this.page.evaluate(
mostRecentReceipts => window.__mautrixController.parseReceiptList(mostRecentReceipts),
Object.fromEntries(receiptMap))
for (const receipt of receipts) {
receiptMap.set(receipt.count, receipt.id)
receipt.chat_id = chatID
}
this._trimReceiptMap(receiptMap)
if (hadMsgListObserver) { if (hadMsgListObserver) {
this.log("Restoring msg list observer") this.log("Restoring msg list observer")
await this.page.evaluate( await this.page.evaluate(
@ -752,13 +877,16 @@ export default class MessagesPuppeteer {
this.log("Not restoring msg list observer, as there never was one") this.log("Not restoring msg list observer, as there never was one")
} }
return messages return {
messages: messages,
receipts: receipts
}
} }
_processMessages(chatID, messages) { _processMessages(chatID, messages) {
// TODO Probably don't need minID filtering if Puppeteer context handles it now // TODO Probably don't need minID filtering if Puppeteer context handles it now
const minID = this.mostRecentMessages.get(chatID) || 0 const minID = this.mostRecentMessages.get(chatID) || 0
const filteredMessages = messages.filter(msg => msg.id > minID && !this.sentMessageIDs.has(msg.id)) const filteredMessages = messages.filter(msg => msg.id > minID)
if (filteredMessages.length > 0) { if (filteredMessages.length > 0) {
const newFirstID = filteredMessages[0].id const newFirstID = filteredMessages[0].id
@ -769,12 +897,40 @@ export default class MessagesPuppeteer {
for (const message of filteredMessages) { for (const message of filteredMessages) {
message.chat_id = chatID message.chat_id = chatID
} }
for (let i = filteredMessages.length - 1; i >= 0; i--) {
const message = filteredMessages[i]
if (message.is_outgoing) {
this.mostRecentOwnMessages.set(chatID, message.id)
break
}
}
return filteredMessages return filteredMessages
} else { } else {
return [] return []
} }
} }
_getReceiptMap(chatID) {
if (!this.mostRecentReceipts.has(chatID)) {
const newMap = new Map()
this.mostRecentReceipts.set(chatID, newMap)
return newMap
} else {
return this.mostRecentReceipts.get(chatID)
}
}
_trimReceiptMap(receiptMap) {
// Delete lower counts for earlier messages
let prevCount = null
for (const count of Array.from(receiptMap.keys()).sort()) {
if (prevCount != null && receiptMap.get(prevCount) < receiptMap.get(count)) {
receiptMap.delete(count)
}
prevCount = count
}
}
async _processChatListChangeUnsafe(chatListInfo) { async _processChatListChangeUnsafe(chatListInfo) {
const chatID = chatListInfo.id const chatID = chatListInfo.id
this.updatedChats.delete(chatID) this.updatedChats.delete(chatID)
@ -818,22 +974,26 @@ export default class MessagesPuppeteer {
html: chatListInfo.lastMsg, html: chatListInfo.lastMsg,
}] }]
this.numChatNotifications.set(chatID, chatListInfo.notificationCount) this.numChatNotifications.set(chatID, chatListInfo.notificationCount)
this._receiveMessages(chatID, messages, true)
} else { } else {
messages = await this._getMessagesUnsafe(chatListInfo.id)
this.numChatNotifications.set(chatID, 0) this.numChatNotifications.set(chatID, 0)
if (messages.length === 0) { await this._syncChat(chatListInfo.id)
this.log("No new messages found in", chatListInfo.id)
return
} }
} }
if (this.client) { async _syncChat(chatID) {
for (const message of messages) { const {messages, receipts} = await this._getMessagesUnsafe(chatID)
await this.client.sendMessage(message).catch(err =>
this.error("Failed to send message", message.id || "with no ID", "to client:", err)) if (messages.length == 0) {
} this.log("No new messages found in", chatID)
} else { } else {
this.log("No client connected, not sending messages") this._receiveMessages(chatID, messages, true)
}
if (receipts.length == 0) {
this.log("No new receipts found in", chatID)
} else {
this._receiveReceiptMulti(chatID, receipts, true)
} }
} }
@ -849,7 +1009,15 @@ export default class MessagesPuppeteer {
} }
_receiveReceiptDirectLatest(chat_id, receipt_id) { _receiveReceiptDirectLatest(chat_id, receipt_id) {
this.log(`Received read receipt ${receipt_id} for chat ${chat_id}`) const receiptMap = this._getReceiptMap(chat_id)
const prevReceiptID = (receiptMap.get(1) || 0)
if (receipt_id <= prevReceiptID) {
this.log(`Received OUTDATED read receipt ${receipt_id} (older than ${prevReceiptID}) for chat ${chat_id}`)
return
}
receiptMap.set(1, receipt_id)
this.log(`Received read receipt ${receipt_id} (since ${prevReceiptID}) for chat ${chat_id}`)
if (this.client) { if (this.client) {
this.client.sendReceipt({chat_id: chat_id, id: receipt_id}) this.client.sendReceipt({chat_id: chat_id, id: receipt_id})
.catch(err => this.error("Error handling read receipt:", err)) .catch(err => this.error("Error handling read receipt:", err))
@ -858,8 +1026,26 @@ export default class MessagesPuppeteer {
} }
} }
async _receiveReceiptMulti(chat_id, receipts) { async _receiveReceiptMulti(chat_id, receipts, skipProcessing = false) {
// Use async to ensure that receipts are sent in order // Use async to ensure that receipts are sent in order
if (!skipProcessing) {
const receiptMap = this._getReceiptMap(chat_id)
receipts.filter(receipt => {
if (receipt.id > (receiptMap.get(receipt.count) || 0)) {
receiptMap.set(receipt.count, receipt.id)
return true
} else {
return false
}
})
if (receipts.length == 0) {
this.log(`Received ALL OUTDATED bulk read receipts for chat ${chat_id}:`, receipts)
return
}
this._trimReceiptMap(receiptMap)
}
this.log(`Received bulk read receipts for chat ${chat_id}:`, receipts) this.log(`Received bulk read receipts for chat ${chat_id}:`, receipts)
if (this.client) { if (this.client) {
for (const receipt of receipts) { for (const receipt of receipts) {