Incoming read receipts for MRU chats

TODO: poll other chats for read receipts
This commit is contained in:
Andrew Ferrazzutti 2021-04-20 20:01:50 -04:00
parent d30402a98f
commit c8d1d38d21
12 changed files with 351 additions and 13 deletions

View File

@ -69,6 +69,8 @@
* Message edits
* Formatted messages
* Presence
* Timestamped read receipts
* Read receipts between users other than yourself
### Missing from LINE on Chrome
* Message redaction (delete/unsend)

View File

@ -6,11 +6,12 @@ from .puppet import Puppet
from .portal import Portal
from .message import Message
from .media import Media
from .receipt_reaction import ReceiptReaction
def init(db: Database) -> None:
for table in (User, Puppet, Portal, Message, Media):
for table in (User, Puppet, Portal, Message, Media, ReceiptReaction):
table.db = db
__all__ = ["upgrade_table", "User", "Puppet", "Portal", "Message", "Media"]
__all__ = ["upgrade_table", "User", "Puppet", "Portal", "Message", "Media", "ReceiptReaction"]

View File

@ -0,0 +1,66 @@
# matrix-puppeteer-line - A very hacky Matrix-LINE bridge based on running LINE's Chrome extension in Puppeteer
# Copyright (C) 2020-2021 Tulir Asokan, Andrew Ferrazzutti
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Optional, ClassVar, TYPE_CHECKING, ClassVar
from attr import dataclass
from mautrix.types import RoomID, EventID
from mautrix.util.async_db import Database
fake_db = Database("") if TYPE_CHECKING else None
@dataclass
class ReceiptReaction:
db: ClassVar[Database] = fake_db
mxid: EventID
mx_room: RoomID
relates_to: EventID
num_read: int
async def insert(self) -> None:
q = "INSERT INTO receipt_reaction (mxid, mx_room, relates_to, num_read) VALUES ($1, $2, $3, $4)"
await self.db.execute(q, self.mxid, self.mx_room, self.relates_to, self.num_read)
async def update(self) -> None:
q = ("UPDATE receipt_reaction SET relates_to=$3, num_read=$4 "
"WHERE mxid=$1 AND mx_room=$2")
await self.db.execute(q, self.mxid, self.mx_room, self.relates_to, self.num_read)
async def delete(self) -> None:
q = "DELETE FROM receipt_reaction WHERE mxid=$1 AND mx_room=$2"
await self.db.execute(q, self.mxid, self.mx_room)
@classmethod
async def delete_all(cls, room_id: RoomID) -> None:
await cls.db.execute("DELETE FROM message WHERE mx_room=$1", room_id)
@classmethod
async def get_by_mxid(cls, mxid: EventID, mx_room: RoomID) -> Optional['ReceiptReaction']:
row = await cls.db.fetchrow("SELECT mxid, mx_room, relates_to, num_read "
"FROM receipt_reaction WHERE mxid=$1 AND mx_room=$2", mxid, mx_room)
if not row:
return None
return cls(**row)
@classmethod
async def get_by_relation(cls, mxid: EventID, mx_room: RoomID) -> Optional['ReceiptReaction']:
row = await cls.db.fetchrow("SELECT mxid, mx_room, relates_to, num_read "
"FROM receipt_reaction WHERE relates_to=$1 AND mx_room=$2", mxid, mx_room)
if not row:
return None
return cls(**row)

View File

@ -68,3 +68,39 @@ async def upgrade_media(conn: Connection) -> None:
media_id TEXT PRIMARY KEY,
mxc TEXT NOT NULL
)""")
@upgrade_table.register(description="Helpful table constraints")
async def upgrade_table_constraints(conn: Connection) -> None:
constraint_name = "portal_mxid_key"
q = ( "SELECT EXISTS(SELECT FROM information_schema.constraint_table_usage "
f"WHERE table_name='portal' AND constraint_name='{constraint_name}')")
has_unique_mxid = await conn.fetchval(q)
if not has_unique_mxid:
await conn.execute(f"ALTER TABLE portal ADD CONSTRAINT {constraint_name} UNIQUE(mxid)")
constraint_name = "message_chat_id_fkey"
q = ( "SELECT EXISTS(SELECT FROM information_schema.table_constraints "
f"WHERE table_name='message' AND constraint_name='{constraint_name}')")
has_fkey = await conn.fetchval(q)
if not has_fkey:
await conn.execute(
f"ALTER TABLE message ADD CONSTRAINT {constraint_name} "
"FOREIGN KEY (chat_id) "
"REFERENCES portal (chat_id) "
"ON DELETE CASCADE")
@upgrade_table.register(description="Read receipts for groups & rooms")
async def upgrade_read_receipts(conn: Connection) -> None:
await conn.execute("""CREATE TABLE IF NOT EXISTS receipt_reaction (
mxid TEXT NOT NULL,
mx_room TEXT NOT NULL,
relates_to TEXT NOT NULL,
num_read INTEGER NOT NULL,
PRIMARY KEY (mxid, mx_room),
FOREIGN KEY (mx_room)
REFERENCES portal (mxid)
ON DELETE CASCADE
)""")

View File

@ -33,9 +33,9 @@ from mautrix.errors import MatrixError
from mautrix.util.simple_lock import SimpleLock
from mautrix.util.network_retry import call_with_net_retry
from .db import Portal as DBPortal, Message as DBMessage, Media as DBMedia
from .db import Portal as DBPortal, Message as DBMessage, ReceiptReaction as DBReceiptReaction, Media as DBMedia
from .config import Config
from .rpc import ChatInfo, Participant, Message, Client, PathImage
from .rpc import ChatInfo, Participant, Message, Receipt, Client, PathImage
from . import user as u, puppet as p, matrix as m
if TYPE_CHECKING:
@ -77,7 +77,6 @@ class Portal(DBPortal, BasePortal):
self.backfill_lock = SimpleLock("Waiting for backfilling to finish before handling %s",
log=self.log)
self._main_intent = None
self._reaction_lock = asyncio.Lock()
self._last_participant_update = set()
@property
@ -271,11 +270,37 @@ class Portal(DBPortal, BasePortal):
body=msg_text, formatted_body=msg_html)
event_id = await self._send_message(intent, content, timestamp=evt.timestamp)
if event_id:
if evt.is_outgoing and evt.receipt_count:
await self._handle_receipt(event_id, evt.receipt_count)
msg = DBMessage(mxid=event_id, mx_room=self.mxid, mid=evt.id, chat_id=self.chat_id)
await msg.insert()
await self._send_delivery_receipt(event_id)
self.log.debug(f"Handled remote message {evt.id} -> {event_id}")
async def handle_remote_receipt(self, receipt: Receipt) -> None:
msg = await DBMessage.get_by_mid(receipt.id)
if msg:
await self._handle_receipt(msg.mxid, receipt.count)
else:
self.log.debug(f"Could not find message for read receipt {receipt.id}")
async def _handle_receipt(self, event_id: EventID, receipt_count: int) -> None:
if self.is_direct:
await self.main_intent.send_receipt(self.mxid, event_id)
else:
reaction = await DBReceiptReaction.get_by_relation(event_id, self.mxid)
if reaction:
await self.main_intent.redact(self.mxid, reaction.mxid)
await reaction.delete()
if receipt_count == len(self._last_participant_update) - 1:
for participant in self._last_participant_update:
puppet = await p.Puppet.get_by_mid(participant.id)
await puppet.intent.send_receipt(self.mxid, event_id)
else:
# TODO Translatable string for "Read by"
reaction_mxid = await self.main_intent.react(self.mxid, event_id, f"(Read by {receipt_count})")
await DBReceiptReaction(reaction_mxid, self.mxid, event_id, receipt_count).insert()
async def _handle_remote_photo(self, source: 'u.User', intent: IntentAPI, message: Message
) -> Optional[MediaMessageEventContent]:
resp = await source.client.read_image(message.image_url)
@ -530,6 +555,9 @@ class Portal(DBPortal, BasePortal):
"users": {
self.az.bot_mxid: 100,
self.main_intent.mxid: 100,
},
"events": {
str(EventType.REACTION): 1
}
}
})

View File

@ -1,2 +1,2 @@
from .client import Client
from .types import RPCError, PathImage, ChatListInfo, ChatInfo, Participant, Message, StartStatus
from .types import RPCError, PathImage, ChatListInfo, ChatInfo, Participant, Message, Receipt, StartStatus

View File

@ -19,7 +19,7 @@ from base64 import b64decode
import asyncio
from .rpc import RPCClient
from .types import ChatListInfo, ChatInfo, Message, ImageData, StartStatus
from .types import ChatListInfo, ChatInfo, Message, Receipt, ImageData, StartStatus
class LoginCommand(TypedDict):
@ -92,6 +92,12 @@ class Client(RPCClient):
self.add_event_handler("message", wrapper)
async def on_receipt(self, func: Callable[[Receipt], Awaitable[None]]) -> None:
async def wrapper(data: Dict[str, Any]) -> None:
await func(Receipt.deserialize(data["receipt"]))
self.add_event_handler("receipt", wrapper)
# TODO Type hint for sender
async def login(self, sender, **login_data) -> AsyncGenerator[Tuple[str, str], None]:
login_data["login_type"] = sender.command_status["login_type"]

View File

@ -60,6 +60,14 @@ class Message(SerializableAttrs['Message']):
timestamp: int = None
html: Optional[str] = None
image_url: Optional[str] = None
receipt_count: Optional[int] = None
@dataclass
class Receipt(SerializableAttrs['Receipt']):
id: int
chat_id: int
count: int = 1
@dataclass

View File

@ -24,7 +24,7 @@ from mautrix.util.opt_prometheus import Gauge
from .db import User as DBUser, Portal as DBPortal, Message as DBMessage
from .config import Config
from .rpc import Client, Message
from .rpc import Client, Message, Receipt
from . import puppet as pu, portal as po
if TYPE_CHECKING:
@ -94,6 +94,7 @@ class User(DBUser, BaseUser):
self.log.debug("Starting client")
state = await self.client.start()
await self.client.on_message(self.handle_message)
await self.client.on_receipt(self.handle_receipt)
if state.is_connected:
self._track_metric(METRIC_CONNECTED, True)
if state.is_logged_in:
@ -153,6 +154,14 @@ class User(DBUser, BaseUser):
await portal.create_matrix_room(self, chat_info)
await portal.handle_remote_message(self, puppet, evt)
async def handle_receipt(self, receipt: Receipt) -> None:
self.log.trace(f"Received receipt for chat {receipt.chat_id}")
portal = await po.Portal.get_by_chat_id(receipt.chat_id, create=True)
if not portal.mxid:
chat_info = await self.client.get_chat(receipt.chat_id)
await portal.create_matrix_room(self, chat_info)
await portal.handle_remote_receipt(receipt)
def _add_to_cache(self) -> None:
self.by_mxid[self.mxid] = self

View File

@ -108,6 +108,15 @@ export default class Client {
})
}
sendReceipt(receipt) {
this.log(`Sending read receipt (${receipt.count || "DM"}) of msg ${receipt.id} for chat ${receipt.chat_id}`)
return this._write({
id: --this.notificationID,
command: "receipt",
receipt
})
}
sendQRCode(url) {
this.log(`Sending QR ${url} to client`)
return this._write({

View File

@ -27,6 +27,18 @@ window.__chronoParseDate = function (text, ref, option) {}
* @return {Promise<void>}
*/
window.__mautrixReceiveChanges = function (changes) {}
/**
* @param {str} chatID - The ID of the chat whose receipts are being processed.
* @param {str} receipt_id - The ID of the most recently-read message for the current chat.
* @return {Promise<void>}
*/
window.__mautrixReceiveReceiptDirectLatest = function (chat_id, receipt_id) {}
/**
* @param {str} chatID - The ID of the chat whose receipts are being processed.
* @param {[Receipt]} receipts - All newly-seen receipts for the current chat.
* @return {Promise<void>}
*/
window.__mautrixReceiveReceiptMulti = function (chat_id, receipts) {}
/**
* @param {string} url - The URL for the QR code.
* @return {Promise<void>}
@ -65,6 +77,7 @@ const ChatTypeEnum = Object.freeze({
class MautrixController {
constructor(ownID) {
this.chatListObserver = null
this.msgListObserver = null
this.qrChangeObserver = null
this.qrAppearObserver = null
this.emailAppearObserver = null
@ -93,6 +106,11 @@ class MautrixController {
}
}
getCurrentChatId() {
const chatListElement = document.querySelector("#_chat_list_body > .ExSelected > .chatList")
return chatListElement ? this.getChatListItemId(chatListElement) : null
}
/**
* Parse a date string.
*
@ -148,6 +166,7 @@ class MautrixController {
* @property {?Participant} sender - Full data of the participant who sent the message, if needed and available.
* @property {?string} html - The HTML format of the message, if necessary.
* @property {?string} image_url - The URL to the image in the message, if it's an image-only message.
* @property {?int} receipt_count - The number of users who have read the message.
*/
_isLoadedImageURL(src) {
@ -167,12 +186,16 @@ class MautrixController {
const is_outgoing = element.classList.contains("mdRGT07Own")
let sender = {}
const receipt = element.querySelector(".mdRGT07Own .mdRGT07Read:not(.MdNonDisp)")
let receipt_count
// TODO Clean up participantsList access...
const participantsListSelector = "#_chat_detail_area > .mdRGT02Info ul.mdRGT13Ul"
// Don't need sender ID for direct chats, since the portal will have it already.
if (chatType == ChatTypeEnum.DIRECT) {
sender = null
receipt_count = is_outgoing ? (receipt ? 1 : 0) : null
} else if (!is_outgoing) {
sender.name = element.querySelector(".mdRGT07Body > .mdRGT07Ttl").innerText
// Room members are always friends (right?),
@ -187,6 +210,7 @@ class MautrixController {
sender.id = participantsList.querySelector(`img[alt='${senderName}'`).parentElement.parentElement.getAttribute("data-mid")
}
sender.avatar = this.getParticipantListItemAvatar(element)
receipt_count = null
} else {
// TODO Get own ID and store it somewhere appropriate.
// Unable to get own ID from a room chat...
@ -202,6 +226,8 @@ class MautrixController {
sender.name = this.getParticipantListItemName(participantsList.children[0])
sender.avatar = this.getParticipantListItemAvatar(participantsList.children[0])
sender.id = this.ownID
receipt_count = receipt ? this._getReceiptCount(receipt) : null
}
const messageData = {
@ -209,6 +235,7 @@ class MautrixController {
timestamp: date ? date.getTime() : null,
is_outgoing: is_outgoing,
sender: sender,
receipt_count: receipt_count
}
const messageElement = element.querySelector(".mdRGT07Body > .mdRGT07Msg")
if (messageElement.classList.contains("mdRGT07Text")) {
@ -255,6 +282,18 @@ class MautrixController {
return messageData
}
/**
* Find the number in the "Read #" receipt message.
* Don't look for "Read" specifically, to support multiple languages.
*
* @param {Element} receipt - The element containing the receipt message.
* @private
*/
_getReceiptCount(receipt) {
const match = receipt.innerText.match(/\d+/)
return Number.parseInt(match ? match[0] : 0) || null
}
promiseOwnMessage(timeoutLimitMillis, successSelector, failureSelector=null) {
let observer
@ -355,9 +394,9 @@ class MautrixController {
*/
async parseMessageList(chatId) {
if (!chatId) {
chatId = this.getChatListItemId(document.querySelector("#_chat_list_body > .ExSelected > div"))
chatId = this.getCurrentChatId()
}
const chatType = this.getChatType(chatId);
const chatType = this.getChatType(chatId)
const msgList = document.querySelector("#_chat_room_msg_list")
const messages = []
let refDate = null
@ -614,6 +653,109 @@ class MautrixController {
}
}
/**
* @param {[MutationRecord]} mutations - The mutation records that occurred
* @param {str} chat_id - The ID of the chat being observed.
* @private
*/
_observeReceiptsDirect(mutations, chat_id) {
let receipt_id
for (const change of mutations) {
if ( change.target.classList.contains("mdRGT07Read") &&
!change.target.classList.contains("MdNonDisp")) {
const msgElement = change.target.closest(".mdRGT07Own")
if (msgElement) {
let id = +msgElement.getAttribute("data-local-id")
if (!receipt_id || receipt_id < id) {
receipt_id = id
}
}
}
}
if (receipt_id) {
window.__mautrixReceiveReceiptDirectLatest(chat_id, receipt_id).then(
() => console.debug(`Receipt sent for message ${receipt_id}`),
err => console.error(`Error sending receipt for message ${receipt_id}:`, err))
}
}
/**
* @param {[MutationRecord]} mutations - The mutation records that occurred
* @param {str} chat_id - The ID of the chat being observed.
* @private
*/
_observeReceiptsMulti(mutations, chat_id) {
const receipts = []
for (const change of mutations) {
if ( change.target.classList.contains("mdRGT07Read") &&
!change.target.classList.contains("MdNonDisp")) {
const msgElement = change.target.closest(".mdRGT07Own")
if (msgElement) {
receipts.push({
id: +msgElement.getAttribute("data-local-id"),
count: this._getReceiptCount(msgElement),
})
}
}
}
if (receipts.length > 0) {
window.__mautrixReceiveReceiptMulti(chat_id, receipts).then(
() => console.debug(`Receipt sent for message ${receipt_id}`),
err => console.error(`Error sending receipt for message ${receipt_id}:`, err))
}
}
/**
* Add a mutation observer to the message list.
* Used for observing read receipts.
* TODO Should also use for observing messages of the currently-viewed chat.
*/
addMsgListObserver(forceCreate) {
const chat_room_msg_list = document.querySelector("#_chat_room_msg_list")
if (!chat_room_msg_list) {
console.debug("Could not start msg list observer: no msg list available!")
return
}
if (this.msgListObserver !== null) {
this.removeMsgListObserver()
} else if (!forceCreate) {
console.debug("No pre-existing msg list observer to replace")
return
}
const observeReadReceipts =
this.getChatType(this.getCurrentChatId()) == ChatTypeEnum.DIRECT ?
this._observeReceiptsDirect :
this._observeReceiptsMulti
const chat_id = this.getCurrentChatId()
this.msgListObserver = new MutationObserver(mutations => {
try {
observeReadReceipts(mutations, chat_id)
} catch (err) {
console.error("Error observing msg list mutations:", err)
}
})
this.msgListObserver.observe(
chat_room_msg_list,
{ subtree: true, attributes: true, attributeFilter: ["class"], characterData: true })
console.debug("Started msg list observer")
}
/**
* Disconnect the most recently added mutation observer.
*/
removeMsgListObserver() {
if (this.msgListObserver !== null) {
this.msgListObserver.disconnect()
this.msgListObserver = null
console.debug("Disconnected msg list observer")
}
}
addQRChangeObserver(element) {
if (this.qrChangeObserver !== null) {
this.removeQRChangeObserver()

View File

@ -97,6 +97,10 @@ export default class MessagesPuppeteer {
id => this.sentMessageIDs.add(id))
await this.page.exposeFunction("__mautrixReceiveChanges",
this._receiveChatListChanges.bind(this))
await this.page.exposeFunction("__mautrixReceiveReceiptDirectLatest",
this._receiveReceiptDirectLatest.bind(this))
await this.page.exposeFunction("__mautrixReceiveReceiptMulti",
this._receiveReceiptMulti.bind(this))
await this.page.exposeFunction("__mautrixShowParticipantsList", this._showParticipantList.bind(this))
await this.page.exposeFunction("__chronoParseDate", chrono.parseDate)
@ -441,14 +445,19 @@ export default class MessagesPuppeteer {
}
async startObserving() {
this.log("Adding chat list observer")
this.log("Adding observers")
await this.page.evaluate(
() => window.__mautrixController.addChatListObserver())
await this.page.evaluate(
() => window.__mautrixController.addMsgListObserver(true))
}
async stopObserving() {
this.log("Removing chat list observer")
await this.page.evaluate(() => window.__mautrixController.removeChatListObserver())
this.log("Removing observers")
await this.page.evaluate(
() => window.__mautrixController.removeChatListObserver())
await this.page.evaluate(
() => window.__mautrixController.removeMsgListObserver())
}
_listItemSelector(id) {
@ -485,6 +494,9 @@ export default class MessagesPuppeteer {
detailArea => detailArea.childElementCount == 0,
{},
await this.page.$("#_chat_detail_area"))
await this.page.evaluate(
() => window.__mautrixController.addMsgListObserver(false))
}
}
@ -632,6 +644,25 @@ export default class MessagesPuppeteer {
}
}
_receiveReceiptDirectLatest(chat_id, receipt_id) {
this.log(`Received read receipt ${receipt_id} for chat ${chat_id}`)
this.taskQueue.push(() => this.client.sendReceipt({chat_id: chat_id, id: receipt_id}))
.catch(err => this.error("Error handling read receipt changes:", err))
}
_receiveReceiptMulti(chat_id, receipts) {
this.log(`Received bulk read receipts for chat ${chat_id}:`, receipts)
this.taskQueue.push(() => this._receiveReceiptMulti(chat_id, receipts))
.catch(err => this.error("Error handling read receipt changes:", err))
}
async _receiveReceiptMultiUnsafe(chat_id, receipts) {
for (receipt of receipts) {
receipt.chat_id = chat_id
await this.client.sendReceipt(receipt)
}
}
async _sendEmailCredentials() {
this.log("Inputting login credentials")