Add better shutdown handling
This commit is contained in:
parent
938068703b
commit
814c8cbdb3
|
@ -24,17 +24,25 @@ export default class PuppetAPI {
|
||||||
path = "/var/run/mautrix-amp/puppet.sock"
|
path = "/var/run/mautrix-amp/puppet.sock"
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
this.server = net.createServer(sock =>
|
this.server = net.createServer(this.acceptConnection)
|
||||||
new Client(this, sock, ++this.connIDSequence).start())
|
|
||||||
this.puppets = new Map()
|
this.puppets = new Map()
|
||||||
this.clients = new Map()
|
this.clients = new Map()
|
||||||
this.connIDSequence = 0
|
this.connIDSequence = 0
|
||||||
|
this.stopped = false
|
||||||
}
|
}
|
||||||
|
|
||||||
log(...text) {
|
log(...text) {
|
||||||
console.log("[API]", ...text)
|
console.log("[API]", ...text)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
acceptConnection = sock => {
|
||||||
|
if (this.stopped) {
|
||||||
|
sock.end()
|
||||||
|
} else {
|
||||||
|
new Client(this, sock, ++this.connIDSequence).start()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async start() {
|
async start() {
|
||||||
this.log("Starting server")
|
this.log("Starting server")
|
||||||
|
|
||||||
|
@ -52,15 +60,16 @@ export default class PuppetAPI {
|
||||||
}
|
}
|
||||||
|
|
||||||
async stop() {
|
async stop() {
|
||||||
|
this.stopped = true
|
||||||
|
for (const client of this.clients.values()) {
|
||||||
|
await client.stop("Server is shutting down")
|
||||||
|
}
|
||||||
this.log("Stopping server")
|
this.log("Stopping server")
|
||||||
await promisify(cb => this.server.close(cb))
|
await promisify(cb => this.server.close(cb))
|
||||||
try {
|
try {
|
||||||
await fs.promises.unlink(this.path)
|
await fs.promises.unlink(this.path)
|
||||||
} catch (err) {}
|
} catch (err) {}
|
||||||
this.log("Server stopped")
|
this.log("Server stopped")
|
||||||
for (const client of this.clients.values()) {
|
|
||||||
await client.stop()
|
|
||||||
}
|
|
||||||
for (const puppet of this.puppets.values()) {
|
for (const puppet of this.puppets.values()) {
|
||||||
await puppet.stop()
|
await puppet.stop()
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,6 +43,14 @@ export default class Client {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
error(...text) {
|
||||||
|
if (this.userID) {
|
||||||
|
console.error(`[API/${this.userID}/${this.connID}]`, ...text)
|
||||||
|
} else {
|
||||||
|
console.error(`[API/${this.connID}]`, ...text)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
start() {
|
start() {
|
||||||
this.log("Received connection", this.connID)
|
this.log("Received connection", this.connID)
|
||||||
emitLines(this.socket)
|
emitLines(this.socket)
|
||||||
|
@ -53,17 +61,23 @@ export default class Client {
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
if (!this.userID && !this.stopped) {
|
if (!this.userID && !this.stopped) {
|
||||||
this.log("Didn't receive register request within 3 seconds, terminating")
|
this.log("Didn't receive register request within 3 seconds, terminating")
|
||||||
this.stop()
|
this.stop("Register request timeout")
|
||||||
}
|
}
|
||||||
}, 3000)
|
}, 3000)
|
||||||
}
|
}
|
||||||
|
|
||||||
stop() {
|
async stop(error = null) {
|
||||||
if (this.stopped) {
|
if (this.stopped) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
this.stopped = true
|
this.stopped = true
|
||||||
return promisify(cb => this.socket.end(cb))
|
try {
|
||||||
|
await this._write({ id: --this.notificationID, command: "quit", error })
|
||||||
|
await promisify(cb => this.socket.end(cb))
|
||||||
|
} catch (err) {
|
||||||
|
this.error("Failed to end connection:", err)
|
||||||
|
this.socket.destroy(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
handleEnd = () => {
|
handleEnd = () => {
|
||||||
|
@ -133,7 +147,7 @@ export default class Client {
|
||||||
const oldClient = this.manager.clients.get(this.userID)
|
const oldClient = this.manager.clients.get(this.userID)
|
||||||
this.manager.clients.set(this.userID, this)
|
this.manager.clients.set(this.userID, this)
|
||||||
this.log("Terminating previous socket", oldClient.connID, "for", this.userID)
|
this.log("Terminating previous socket", oldClient.connID, "for", this.userID)
|
||||||
await oldClient.stop()
|
await oldClient.stop("Socket replaced by new connection")
|
||||||
} else {
|
} else {
|
||||||
this.manager.clients.set(this.userID, this)
|
this.manager.clients.set(this.userID, this)
|
||||||
}
|
}
|
||||||
|
@ -170,11 +184,11 @@ export default class Client {
|
||||||
if (!this.userID) {
|
if (!this.userID) {
|
||||||
if (req.command !== "register") {
|
if (req.command !== "register") {
|
||||||
this.log("First request wasn't a register request, terminating")
|
this.log("First request wasn't a register request, terminating")
|
||||||
await this.stop()
|
await this.stop("Invalid first request")
|
||||||
return
|
return
|
||||||
} else if (!req.user_id) {
|
} else if (!req.user_id) {
|
||||||
this.log("Register request didn't contain user ID, terminating")
|
this.log("Register request didn't contain user ID, terminating")
|
||||||
await this.stop()
|
await this.stop("Invalid register request")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
handler = this.handleRegister
|
handler = this.handleRegister
|
||||||
|
|
Loading…
Reference in New Issue