Rework message syncing and sending

This commit is contained in:
Andrew Ferrazzutti 2021-04-27 02:59:16 -04:00
parent 7f937d34e2
commit e13f59a8f3
5 changed files with 260 additions and 170 deletions

View File

@ -273,9 +273,12 @@ class Portal(DBPortal, BasePortal):
if evt.is_outgoing and evt.receipt_count:
await self._handle_receipt(event_id, evt.receipt_count)
msg = DBMessage(mxid=event_id, mx_room=self.mxid, mid=evt.id, chat_id=self.chat_id)
await msg.insert()
await self._send_delivery_receipt(event_id)
self.log.debug(f"Handled remote message {evt.id} -> {event_id}")
try:
await msg.insert()
await self._send_delivery_receipt(event_id)
self.log.debug(f"Handled remote message {evt.id} -> {event_id}")
except UniqueViolationError as e:
self.log.debug(f"Failed to handle remote message {evt.id} -> {event_id}: {e}")
async def handle_remote_receipt(self, receipt: Receipt) -> None:
msg = await DBMessage.get_by_mid(receipt.id)

View File

@ -5,5 +5,6 @@
},
"profile_dir": "./profiles",
"url": "chrome-extension://<extension-uuid>/index.html",
"extension_dir": "./extension_files"
"extension_dir": "./extension_files",
"devtools": false
}

View File

@ -81,16 +81,20 @@ const ChatTypeEnum = Object.freeze({
})
class MautrixController {
constructor(ownID) {
constructor() {
this.chatListObserver = null
this.msgListObserver = null
this.receiptObserver = null
this.qrChangeObserver = null
this.qrAppearObserver = null
this.emailAppearObserver = null
this.pinAppearObserver = null
this.expiryObserver = null
this.ownID = null
this.ownMsgPromise = Promise.resolve(-1)
this._promiseOwnMsgReset()
}
setOwnID(ownID) {
@ -253,14 +257,14 @@ class MautrixController {
) {
const img = messageElement.querySelector(".mdRGT07MsgImg > img")
if (img) {
let resolve
let imgResolve
// TODO Should reject on "#_chat_message_image_failure"
let observer = new MutationObserver((changes) => {
let observer = new MutationObserver(changes => {
for (const change of changes) {
if (this._isLoadedImageURL(change.target.src) && observer) {
observer.disconnect()
observer = null
resolve(change.target.src)
imgResolve(change.target.src)
return
}
}
@ -274,8 +278,8 @@ class MautrixController {
messageData.image_url = img.src
observer.disconnect()
} else {
messageData.image_url = await new Promise((realResolve, reject) => {
resolve = realResolve
messageData.image_url = await new Promise(resolve => {
imgResolve = resolve
setTimeout(() => {
if (observer) {
observer.disconnect()
@ -302,97 +306,42 @@ class MautrixController {
}
/**
* Create and store a promise that resolves when a message written
* by the user finishes getting sent.
* Accepts selectors for elements that become visible once the message
* has succeeded or failed to be sent.
*
* @param {int} timeoutLimitMillis - The maximum amount of time to wait for the message to be sent.
* @param {str} successSelector - The selector for the element that indicates the message was sent.
* @param {str} failureSelector - The selector for the element that indicates the message failed to be sent.
*/
promiseOwnMessage(timeoutLimitMillis, successSelector, failureSelector=null) {
let observer
let msgID = -1
let resolve
let reject
this.promiseOwnMsgSuccessSelector = successSelector
this.promiseOwnMsgFailureSelector = failureSelector
const resolveMessage = () => {
observer.disconnect()
observer = null
window.__mautrixReceiveMessageID(msgID)
resolve(msgID)
}
const rejectMessage = failureElement => {
observer.disconnect()
observer = null
reject(failureElement)
}
const changeCallback = (changes) => {
for (const change of changes) {
for (const addedNode of change.addedNodes) {
if (addedNode.classList.contains("mdRGT07Own")) {
const successElement = addedNode.querySelector(successSelector)
if (successElement) {
console.log("Found success element")
console.log(successElement)
msgID = +addedNode.getAttribute("data-local-id")
if (successElement.classList.contains("MdNonDisp")) {
console.log("Invisible success, wait")
observer.disconnect()
observer = new MutationObserver(getVisibleCallback(true))
observer.observe(successElement, { attributes: true, attributeFilter: ["class"] })
} else {
console.log("Already visible success")
resolveMessage()
}
return
} else if (failureSelector) {
const failureElement = addedNode.querySelector(failureSelector)
if (failureElement) {
console.log("Found failure element")
console.log(failureElement)
if (failureElement.classList.contains("MdNonDisp")) {
console.log("Invisible failure, wait")
observer.disconnect()
observer = new MutationObserver(getVisibleCallback(false))
observer.observe(successElement, { attributes: true, attributeFilter: ["class"] })
} else {
console.log("Already visible failure")
rejectMessage(failureElement)
}
return
}
}
}
}
}
}
const getVisibleCallback = isSuccess => {
return changes => {
for (const change of changes) {
if (!change.target.classList.contains("MdNonDisp")) {
console.log(`Waited for visible ${isSuccess ? "success" : "failure"}`)
console.log(change.target)
isSuccess ? resolveMessage() : rejectMessage(change.target)
return
}
}
}
}
observer = new MutationObserver(changeCallback)
observer.observe(
document.querySelector("#_chat_room_msg_list"),
{ childList: true })
return new Promise((realResolve, realReject) => {
resolve = realResolve
reject = realReject
this.ownMsgPromise = new Promise((resolve, reject) => {
this.promiseOwnMsgResolve = resolve
this.promiseOwnMsgReject = reject
setTimeout(() => {
if (observer) {
console.log("Timeout!")
observer.disconnect()
this._promiseOwnMsgReset()
reject()
}
}, timeoutLimitMillis)
})
}
/**
* Wait for a user-sent message to finish getting sent.
*
* @return {Promise<int>} - The ID of the sent message.
*/
async waitForOwnMessage() {
return await this.ownMsgPromise
}
async _tryParseMessages(msgList, chatType) {
const messages = []
let refDate = null
@ -403,6 +352,8 @@ class MautrixController {
// TODO :not(.MdNonDisp) to exclude not-yet-posted messages,
// but that is unlikely to be a problem here.
// Also, offscreen times may have .MdNonDisp on them
// TODO Explicitly look for the most recent date element,
// as it might not have been one of the new items in msgList
const timeElement = child.querySelector("time")
if (timeElement) {
const messageDate = await this._tryParseDate(timeElement.innerText, refDate)
@ -604,14 +555,14 @@ class MautrixController {
}
*/
} else if (change.target.tagName == "LI") {
if (!change.target.classList.contains("ExSelected")) {
if (change.target.classList.contains("ExSelected")) {
console.log("Not using chat list mutation response for currently-active chat")
continue
}
for (const node of change.addedNodes) {
const chat = this.parseChatListItem(node)
if (chat) {
console.debug("Changed chat list item:", chat)
console.log("Changed chat list item:", chat)
changedChatIDs.add(chat.id)
} else {
console.debug("Could not parse node as a chat list item:", node)
@ -633,7 +584,12 @@ class MautrixController {
*/
addChatListObserver() {
this.removeChatListObserver()
this.chatListObserver = new MutationObserver(mutations => {
this.chatListObserver = new MutationObserver(async (mutations) => {
// Wait for pending sent messages to be resolved before responding to mutations
try {
await this.ownMsgPromise
} catch (e) {}
try {
this._observeChatListMutations(mutations)
} catch (err) {
@ -739,15 +695,31 @@ class MautrixController {
const chatID = this.getCurrentChatID()
const chatType = this.getChatType(chatID)
this.msgListObserver = new MutationObserver(async (changes) => {
let orderedPromises = [Promise.resolve()]
this.msgListObserver = new MutationObserver(changes => {
let msgList = []
for (const change of changes) {
const msgList = Array.from(change.addedNodes).filter(
child => child.tagName == "DIV" && child.hasAttribute("data-local-id"))
msgList.sort((a,b) => a.getAttribute("data-local-id") - b.getAttribute("data-local-id"))
window.__mautrixReceiveMessages(chatID, await this._tryParseMessages(msgList, chatType))
change.addedNodes.forEach(child => {
if (child.tagName == "DIV" && child.hasAttribute("data-local-id")) {
msgList.push(child)
}
})
}
if (msgList.length == 0) {
return
}
msgList.sort((a,b) => a.getAttribute("data-local-id") - b.getAttribute("data-local-id"))
if (!this._observeOwnMessage(msgList)) {
let prevPromise = orderedPromises.shift()
orderedPromises.push(new Promise(resolve => prevPromise
.then(() => this._tryParseMessages(msgList, chatType))
.then(msgs => window.__mautrixReceiveMessages(chatID, msgs))
.then(() => resolve())
))
}
})
this.msgListObserver.observe(chat_room_msg_list,
this.msgListObserver.observe(
chat_room_msg_list,
{ childList: true })
console.debug("Started msg list observer")
@ -773,6 +745,106 @@ class MautrixController {
console.debug("Started receipt observer")
}
_observeOwnMessage(msgList) {
if (!this.promiseOwnMsgSuccessSelector && !this.promiseOwnMsgFailureSelector) {
// Not waiting for a pending sent message
return false
}
if (this.visibleSuccessObserver || this.visibleFailureObserver) {
// Already found a element that we're waiting on becoming visible
return true
}
for (const ownMsg of msgList.filter(msg => msg.classList.contains("mdRGT07Own"))) {
const successElement =
this.promiseOwnMsgSuccessSelector &&
ownMsg.querySelector(this.promiseOwnMsgSuccessSelector)
if (successElement) {
if (successElement.classList.contains("MdNonDisp")) {
console.log("Invisible success, wait")
console.log(successElement)
const msgID = +ownMsg.getAttribute("data-local-id")
this.visibleSuccessObserver = new MutationObserver(
this._getOwnVisibleCallback(msgID))
this.visibleSuccessObserver.observe(
successElement,
{ attributes: true, attributeFilter: ["class"] })
} else {
console.debug("Already visible success, must not be it")
console.debug(successElement)
}
}
const failureElement =
this.promiseOwnMsgFailureSelector &&
ownMsg.querySelector(this.promiseOwnMsgFailureSelector)
if (failureElement) {
if (failureElement.classList.contains("MdNonDisp")) {
console.log("Invisible failure, wait")
console.log(failureElement)
this.visibleFailureObserver = new MutationObserver(
this._getOwnVisibleCallback())
this.visibleFailureObserver.observe(
failureElement,
{ attributes: true, attributeFilter: ["class"] })
} else {
console.debug("Already visible failure, must not be it")
console.log(failureElement)
}
}
if (this.visibleSuccessObserver || this.visibleFailureObserver) {
return true
}
}
return false
}
_getOwnVisibleCallback(msgID=null) {
const isSuccess = !!msgID
return changes => {
for (const change of changes) {
if (!change.target.classList.contains("MdNonDisp")) {
console.log(`Waited for visible ${isSuccess ? "success" : "failure"}`)
console.log(change.target)
isSuccess ? this._resolveOwnMessage(msgID) : this._rejectOwnMessage(change.target)
return
}
}
}
}
_resolveOwnMessage(msgID) {
const resolve = this.promiseOwnMsgResolve
this._promiseOwnMsgReset()
window.__mautrixReceiveMessageID(msgID).then(
() => resolve(msgID))
}
_rejectOwnMessage(failureElement) {
const reject = this.promiseOwnMsgReject
this._promiseOwnMsgReset()
reject(failureElement)
}
_promiseOwnMsgReset() {
this.promiseOwnMsgSuccessSelector = null
this.promiseOwnMsgFailureSelector = null
this.promiseOwnMsgResolve = null
this.promiseOwnMsgReject = null
if (this.visibleSuccessObserver) {
this.visibleSuccessObserver.disconnect()
}
this.visibleSuccessObserver = null
if (this.visibleFailureObserver) {
this.visibleFailureObserver.disconnect()
}
this.visibleFailureObserver = null
}
removeMsgListObserver() {
let result = false
if (this.msgListObserver !== null) {

View File

@ -36,7 +36,7 @@ MessagesPuppeteer.noSandbox = args["--no-sandbox"]
console.log("[Main] Reading config from", configPath)
const config = JSON.parse(fs.readFileSync(configPath).toString())
MessagesPuppeteer.profileDir = config.profile_dir || MessagesPuppeteer.profileDir
MessagesPuppeteer.disableDebug = !!config.disable_debug
MessagesPuppeteer.devtools = config.devtools || false
MessagesPuppeteer.url = config.url
MessagesPuppeteer.extensionDir = config.extension_dir || MessagesPuppeteer.extensionDir

View File

@ -25,7 +25,7 @@ import { sleep } from "./util.js"
export default class MessagesPuppeteer {
static profileDir = "./profiles"
static executablePath = undefined
static disableDebug = false
static devtools = false
static noSandbox = false
static viewport = { width: 960, height: 880 }
static url = undefined
@ -65,17 +65,22 @@ export default class MessagesPuppeteer {
async start() {
this.log("Launching browser")
const extensionArgs = [
let extensionArgs = [
`--disable-extensions-except=${MessagesPuppeteer.extensionDir}`,
`--load-extension=${MessagesPuppeteer.extensionDir}`
]
if (MessagesPuppeteer.noSandbox) {
extensionArgs = extensionArgs.concat(`--no-sandbox`)
}
this.browser = await puppeteer.launch({
executablePath: MessagesPuppeteer.executablePath,
userDataDir: this.profilePath,
args: MessagesPuppeteer.noSandbox ? extensionArgs.concat("--no-sandbox") : extensionArgs,
args: extensionArgs,
headless: false, // Needed to load extensions
defaultViewport: MessagesPuppeteer.viewport,
devtools: MessagesPuppeteer.devtools,
timeout: 0,
})
this.log("Opening new tab")
const pages = await this.browser.pages()
@ -337,8 +342,8 @@ export default class MessagesPuppeteer {
* @return {Promise<[ChatListInfo]>} - List of chat IDs in order of most recent message.
*/
async getRecentChats() {
return await this.page.evaluate(
() => window.__mautrixController.parseChatList())
return await this.taskQueue.push(() =>
this.page.evaluate(() => window.__mautrixController.parseChatList()))
}
/**
@ -368,21 +373,6 @@ export default class MessagesPuppeteer {
return { id: await this.taskQueue.push(() => this._sendMessageUnsafe(chatID, text)) }
}
_filterMessages(chatID, messages) {
const minID = this.mostRecentMessages.get(chatID) || 0
const filtered_messages = messages.filter(msg => msg.id > minID && !this.sentMessageIDs.has(msg.id))
let range = 0
if (filtered_messages.length > 0) {
const newFirstID = filtered_messages[0].id
const newLastID = filtered_messages[filtered_messages.length - 1].id
this.mostRecentMessages.set(chatID, newLastID)
range = newFirstID === newLastID ? newFirstID : `${newFirstID}-${newLastID}`
}
this.log(`Loaded ${messages.length} messages in ${chatID}: got ${range} newer than ${minID}`)
return filtered_messages
}
/**
* Get messages in a chat.
*
@ -390,7 +380,7 @@ export default class MessagesPuppeteer {
* @return {Promise<[MessageData]>} - The messages visible in the chat.
*/
async getMessages(chatID) {
return this.taskQueue.push(async () => {
return await this.taskQueue.push(async () => {
const messages = await this._getMessagesUnsafe(chatID)
if (messages.length > 0) {
for (const message of messages) {
@ -420,41 +410,6 @@ export default class MessagesPuppeteer {
return { id: await this.taskQueue.push(() => this._sendFileUnsafe(chatID, filePath)) }
}
async _sendFileUnsafe(chatID, filePath) {
await this._switchChat(chatID)
const promise = this.page.evaluate(
() => window.__mautrixController.promiseOwnMessage(
10000, // Use longer timeout for file uploads
"#_chat_message_success_menu",
"#_chat_message_fail_menu"))
try {
this.log(`About to ask for file chooser in ${chatID}`)
const [fileChooser] = await Promise.all([
this.page.waitForFileChooser(),
this.page.click("#_chat_room_plus_btn")
])
this.log(`About to upload ${filePath}`)
await fileChooser.accept([filePath])
} catch (e) {
this.log(`Failed to upload file to ${chatID}`)
return -1
}
// TODO Commonize with text message sending
try {
this.log("Waiting for file to be sent")
const id = await promise
this.log(`Successfully sent file in message ${id} to ${chatID}`)
return id
} catch (e) {
this.error(`Error sending file to ${chatID}`)
// TODO Figure out why e is undefined...
//this.error(e)
return -1
}
}
async startObserving() {
this.log("Adding observers")
await this.page.evaluate(
@ -521,7 +476,7 @@ export default class MessagesPuppeteer {
}
// TODO Commonize
async getParticipantList() {
async _getParticipantList() {
await this._showParticipantList()
return await this.page.$("#_chat_detail_area > .mdRGT02Info ul.mdRGT13Ul")
}
@ -532,9 +487,9 @@ export default class MessagesPuppeteer {
if (!participantList) {
this.log("Participant list hidden, so clicking chat header to show it")
await this.page.click("#_chat_header_area > .mdRGT04Link")
participantList = await this.page.waitForSelector(selector)
// Use no timeout since the browser itself is using this
await this.page.waitForSelector(selector, {timeout: 0})
}
//return participantList
}
async _getChatInfoUnsafe(chatID) {
@ -561,7 +516,7 @@ export default class MessagesPuppeteer {
this.log("Found multi-user chat, so clicking chat header to get participants")
// TODO This will mark the chat as "read"!
await this._switchChat(chatID)
const participantList = await this.getParticipantList()
const participantList = await this._getParticipantList()
// TODO Is a group not actually created until a message is sent(?)
// If so, maybe don't create a portal until there is a message.
participants = await participantList.evaluate(
@ -591,7 +546,7 @@ export default class MessagesPuppeteer {
async _sendMessageUnsafe(chatID, text) {
await this._switchChat(chatID)
const promise = this.page.evaluate(
await this.page.evaluate(
() => window.__mautrixController.promiseOwnMessage(5000, "time"))
const input = await this.page.$("#_chat_room_input")
@ -599,14 +554,45 @@ export default class MessagesPuppeteer {
await input.type(text)
await input.press("Enter")
return await this._waitForSentMessage(chatID)
}
async _sendFileUnsafe(chatID, filePath) {
await this._switchChat(chatID)
await this.page.evaluate(
() => window.__mautrixController.promiseOwnMessage(
10000, // Use longer timeout for file uploads
"#_chat_message_success_menu",
"#_chat_message_fail_menu"))
try {
this.log(`About to ask for file chooser in ${chatID}`)
const [fileChooser] = await Promise.all([
this.page.waitForFileChooser(),
this.page.click("#_chat_room_plus_btn")
])
this.log(`About to upload ${filePath}`)
await fileChooser.accept([filePath])
} catch (e) {
this.log(`Failed to upload file to ${chatID}`)
return -1
}
return await this._waitForSentMessage(chatID)
}
async _waitForSentMessage(chatID) {
try {
this.log("Waiting for message to be sent")
const id = await promise
const id = await this.page.evaluate(
() => window.__mautrixController.waitForOwnMessage())
this.log(`Successfully sent message ${id} to ${chatID}`)
return id
} catch (e) {
// TODO Catch if something other than a timeout
this.error(`Timed out sending message to ${chatID}`)
// TODO Figure out why e is undefined...
//this.error(e)
return -1
}
}
@ -636,6 +622,20 @@ export default class MessagesPuppeteer {
return this._filterMessages(chatID, messages)
}
_filterMessages(chatID, messages) {
const minID = this.mostRecentMessages.get(chatID) || 0
const filtered_messages = messages.filter(msg => msg.id > minID && !this.sentMessageIDs.has(msg.id))
if (filtered_messages.length > 0) {
const newFirstID = filtered_messages[0].id
const newLastID = filtered_messages[filtered_messages.length - 1].id
this.mostRecentMessages.set(chatID, newLastID)
const range = newFirstID === newLastID ? newFirstID : `${newFirstID}-${newLastID}`
this.log(`Loaded ${messages.length} messages in ${chatID}, got ${filtered_messages.length} newer than ${minID} (${range})`)
}
return filtered_messages
}
async _processChatListChangeUnsafe(chatID) {
this.updatedChats.delete(chatID)
this.log("Processing change to", chatID)
@ -669,16 +669,29 @@ export default class MessagesPuppeteer {
_receiveReceiptDirectLatest(chat_id, receipt_id) {
this.log(`Received read receipt ${receipt_id} for chat ${chat_id}`)
this.taskQueue.push(() => this.client.sendReceipt({chat_id: chat_id, id: receipt_id}))
.catch(err => this.error("Error handling read receipt changes:", err))
if (this.client) {
this.client.sendReceipt({chat_id: chat_id, id: receipt_id})
.catch(err => this.error("Error handling read receipt:", err))
} else {
this.log("No client connected, not sending receipts")
}
}
_receiveReceiptMulti(chat_id, receipts) {
async _receiveReceiptMulti(chat_id, receipts) {
// Use async to ensure that receipts are sent in order
this.log(`Received bulk read receipts for chat ${chat_id}:`, receipts)
for (const receipt of receipts) {
receipt.chat_id = chat_id
this.taskQueue.push(() => this.client.sendReceipt(receipt))
.catch(err => this.error("Error handling read receipt changes:", err))
if (this.client) {
this.client.sendReceipt()
for (const receipt of receipts) {
receipt.chat_id = chat_id
try {
await this.client.sendReceipt(receipt)
} catch(err) {
this.error("Error handling read receipt:", err)
}
}
} else {
this.log("No client connected, not sending receipts")
}
}
@ -730,6 +743,7 @@ export default class MessagesPuppeteer {
async _receiveExpiry(button) {
this.log("Something expired, clicking OK button to continue")
await this.page.click(button)
this.page.click(button).catch(err =>
this.error("Failed to dismiss expiry dialog:", err))
}
}