fix: Parse Server doesn't shutdown gracefully (#9634)
This commit is contained in:
@@ -97,14 +97,22 @@ class ParseLiveQueryServer {
|
||||
if (this.subscriber.isOpen) {
|
||||
await Promise.all([
|
||||
...[...this.clients.values()].map(client => client.parseWebSocket.ws.close()),
|
||||
this.parseWebSocketServer.close(),
|
||||
...Array.from(this.subscriber.subscriptions.keys()).map(key =>
|
||||
this.parseWebSocketServer.close?.(),
|
||||
...Array.from(this.subscriber.subscriptions?.keys() || []).map(key =>
|
||||
this.subscriber.unsubscribe(key)
|
||||
),
|
||||
this.subscriber.close?.(),
|
||||
]);
|
||||
}
|
||||
this.subscriber.isOpen = false;
|
||||
if (typeof this.subscriber.quit === 'function') {
|
||||
try {
|
||||
await this.subscriber.quit();
|
||||
} catch (err) {
|
||||
logger.error('PubSubAdapter error on shutdown', { error: err });
|
||||
}
|
||||
} else {
|
||||
this.subscriber.isOpen = false;
|
||||
}
|
||||
}
|
||||
|
||||
_createSubscribers() {
|
||||
|
||||
@@ -45,10 +45,14 @@ import CheckRunner from './Security/CheckRunner';
|
||||
import Deprecator from './Deprecator/Deprecator';
|
||||
import { DefinedSchemas } from './SchemaMigrations/DefinedSchemas';
|
||||
import OptionsDefinitions from './Options/Definitions';
|
||||
import { resolvingPromise, Connections } from './TestUtils';
|
||||
|
||||
// Mutate the Parse object to add the Cloud Code handlers
|
||||
addParseCloud();
|
||||
|
||||
// Track connections to destroy them on shutdown
|
||||
const connections = new Connections();
|
||||
|
||||
// ParseServer works like a constructor of an express app.
|
||||
// https://parseplatform.org/parse-server/api/master/ParseServerOptions.html
|
||||
class ParseServer {
|
||||
@@ -214,8 +218,39 @@ class ParseServer {
|
||||
return this._app;
|
||||
}
|
||||
|
||||
handleShutdown() {
|
||||
/**
|
||||
* Stops the parse server, cancels any ongoing requests and closes all connections.
|
||||
*
|
||||
* Currently, express doesn't shut down immediately after receiving SIGINT/SIGTERM
|
||||
* if it has client connections that haven't timed out.
|
||||
* (This is a known issue with node - https://github.com/nodejs/node/issues/2642)
|
||||
*
|
||||
* @returns {Promise<void>} a promise that resolves when the server is stopped
|
||||
*/
|
||||
async handleShutdown() {
|
||||
const serverClosePromise = resolvingPromise();
|
||||
const liveQueryServerClosePromise = resolvingPromise();
|
||||
const promises = [];
|
||||
this.server.close((error) => {
|
||||
/* istanbul ignore next */
|
||||
if (error) {
|
||||
// eslint-disable-next-line no-console
|
||||
console.error('Error while closing parse server', error);
|
||||
}
|
||||
serverClosePromise.resolve();
|
||||
});
|
||||
if (this.liveQueryServer?.server?.close && this.liveQueryServer.server !== this.server) {
|
||||
this.liveQueryServer.server.close((error) => {
|
||||
/* istanbul ignore next */
|
||||
if (error) {
|
||||
// eslint-disable-next-line no-console
|
||||
console.error('Error while closing live query server', error);
|
||||
}
|
||||
liveQueryServerClosePromise.resolve();
|
||||
});
|
||||
} else {
|
||||
liveQueryServerClosePromise.resolve();
|
||||
}
|
||||
const { adapter: databaseAdapter } = this.config.databaseController;
|
||||
if (databaseAdapter && typeof databaseAdapter.handleShutdown === 'function') {
|
||||
promises.push(databaseAdapter.handleShutdown());
|
||||
@@ -228,17 +263,15 @@ class ParseServer {
|
||||
if (cacheAdapter && typeof cacheAdapter.handleShutdown === 'function') {
|
||||
promises.push(cacheAdapter.handleShutdown());
|
||||
}
|
||||
if (this.liveQueryServer?.server?.close) {
|
||||
promises.push(new Promise(resolve => this.liveQueryServer.server.close(resolve)));
|
||||
}
|
||||
if (this.liveQueryServer) {
|
||||
promises.push(this.liveQueryServer.shutdown());
|
||||
}
|
||||
return (promises.length > 0 ? Promise.all(promises) : Promise.resolve()).then(() => {
|
||||
if (this.config.serverCloseComplete) {
|
||||
this.config.serverCloseComplete();
|
||||
}
|
||||
});
|
||||
await Promise.all(promises);
|
||||
connections.destroyAll();
|
||||
await Promise.all([serverClosePromise, liveQueryServerClosePromise]);
|
||||
if (this.config.serverCloseComplete) {
|
||||
this.config.serverCloseComplete();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -419,6 +452,7 @@ class ParseServer {
|
||||
});
|
||||
});
|
||||
this.server = server;
|
||||
connections.track(server);
|
||||
|
||||
if (options.startLiveQueryServer || options.liveQueryServerOptions) {
|
||||
this.liveQueryServer = await ParseServer.createLiveQueryServer(
|
||||
@@ -426,6 +460,9 @@ class ParseServer {
|
||||
options.liveQueryServerOptions,
|
||||
options
|
||||
);
|
||||
if (this.liveQueryServer.server !== this.server) {
|
||||
connections.track(this.liveQueryServer.server);
|
||||
}
|
||||
}
|
||||
if (options.trustProxy) {
|
||||
app.set('trust proxy', options.trustProxy);
|
||||
@@ -600,32 +637,8 @@ function injectDefaults(options: ParseServerOptions) {
|
||||
// Those can't be tested as it requires a subprocess
|
||||
/* istanbul ignore next */
|
||||
function configureListeners(parseServer) {
|
||||
const server = parseServer.server;
|
||||
const sockets = {};
|
||||
/* Currently, express doesn't shut down immediately after receiving SIGINT/SIGTERM if it has client connections that haven't timed out. (This is a known issue with node - https://github.com/nodejs/node/issues/2642)
|
||||
This function, along with `destroyAliveConnections()`, intend to fix this behavior such that parse server will close all open connections and initiate the shutdown process as soon as it receives a SIGINT/SIGTERM signal. */
|
||||
server.on('connection', socket => {
|
||||
const socketId = socket.remoteAddress + ':' + socket.remotePort;
|
||||
sockets[socketId] = socket;
|
||||
socket.on('close', () => {
|
||||
delete sockets[socketId];
|
||||
});
|
||||
});
|
||||
|
||||
const destroyAliveConnections = function () {
|
||||
for (const socketId in sockets) {
|
||||
try {
|
||||
sockets[socketId].destroy();
|
||||
} catch (e) {
|
||||
/* */
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const handleShutdown = function () {
|
||||
process.stdout.write('Termination signal received. Shutting down.');
|
||||
destroyAliveConnections();
|
||||
server.close();
|
||||
parseServer.handleShutdown();
|
||||
};
|
||||
process.on('SIGTERM', handleShutdown);
|
||||
|
||||
@@ -42,3 +42,42 @@ export function resolvingPromise() {
|
||||
export function sleep(ms) {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
export function getConnectionsCount(server) {
|
||||
return new Promise((resolve, reject) => {
|
||||
server.getConnections((err, count) => {
|
||||
/* istanbul ignore next */
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve(count);
|
||||
}
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
export class Connections {
|
||||
constructor() {
|
||||
this.sockets = new Set();
|
||||
}
|
||||
|
||||
track(server) {
|
||||
server.on('connection', socket => {
|
||||
this.sockets.add(socket);
|
||||
socket.on('close', () => {
|
||||
this.sockets.delete(socket);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
destroyAll() {
|
||||
for (const socket of this.sockets.values()) {
|
||||
socket.destroy();
|
||||
}
|
||||
this.sockets.clear();
|
||||
}
|
||||
|
||||
count() {
|
||||
return this.sockets.size;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user