fix: LiveQuery server is not shut down properly when handleShutdown is called (#8491)
This commit is contained in:
@@ -2,6 +2,7 @@
|
|||||||
const Auth = require('../lib/Auth');
|
const Auth = require('../lib/Auth');
|
||||||
const UserController = require('../lib/Controllers/UserController').UserController;
|
const UserController = require('../lib/Controllers/UserController').UserController;
|
||||||
const Config = require('../lib/Config');
|
const Config = require('../lib/Config');
|
||||||
|
const ParseServer = require('../lib/index').ParseServer;
|
||||||
const triggers = require('../lib/triggers');
|
const triggers = require('../lib/triggers');
|
||||||
const validatorFail = () => {
|
const validatorFail = () => {
|
||||||
throw 'you are not authorized';
|
throw 'you are not authorized';
|
||||||
@@ -1214,6 +1215,40 @@ describe('ParseLiveQuery', function () {
|
|||||||
await object.save();
|
await object.save();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('does shutdown liveQuery server', async () => {
|
||||||
|
await reconfigureServer({ appId: 'test_app_id' });
|
||||||
|
const config = {
|
||||||
|
appId: 'hello_test',
|
||||||
|
masterKey: 'world',
|
||||||
|
port: 1345,
|
||||||
|
mountPath: '/1',
|
||||||
|
serverURL: 'http://localhost:1345/1',
|
||||||
|
liveQuery: {
|
||||||
|
classNames: ['Yolo'],
|
||||||
|
},
|
||||||
|
startLiveQueryServer: true,
|
||||||
|
};
|
||||||
|
if (process.env.PARSE_SERVER_TEST_DB === 'postgres') {
|
||||||
|
config.databaseAdapter = new databaseAdapter.constructor({
|
||||||
|
uri: databaseURI,
|
||||||
|
collectionPrefix: 'test_',
|
||||||
|
});
|
||||||
|
config.filesAdapter = defaultConfiguration.filesAdapter;
|
||||||
|
}
|
||||||
|
const server = await ParseServer.startApp(config);
|
||||||
|
const client = await Parse.CoreManager.getLiveQueryController().getDefaultLiveQueryClient();
|
||||||
|
client.serverURL = 'ws://localhost:1345/1';
|
||||||
|
const query = await new Parse.Query('Yolo').subscribe();
|
||||||
|
await Promise.all([
|
||||||
|
server.handleShutdown(),
|
||||||
|
new Promise(resolve => query.on('close', resolve)),
|
||||||
|
]);
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 100));
|
||||||
|
expect(server.liveQueryServer.server.address()).toBeNull();
|
||||||
|
expect(server.liveQueryServer.subscriber.isOpen).toBeFalse();
|
||||||
|
await new Promise(resolve => server.server.close(resolve));
|
||||||
|
});
|
||||||
|
|
||||||
it('prevent afterSave trigger if not exists', async () => {
|
it('prevent afterSave trigger if not exists', async () => {
|
||||||
await reconfigureServer({
|
await reconfigureServer({
|
||||||
liveQuery: {
|
liveQuery: {
|
||||||
|
|||||||
@@ -212,11 +212,12 @@ export class MongoStorageAdapter implements StorageAdapter {
|
|||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
|
|
||||||
handleShutdown() {
|
async handleShutdown() {
|
||||||
if (!this.client) {
|
if (!this.client) {
|
||||||
return Promise.resolve();
|
return;
|
||||||
}
|
}
|
||||||
return this.client.close(false);
|
await this.client.close(false);
|
||||||
|
delete this.connectionPromise;
|
||||||
}
|
}
|
||||||
|
|
||||||
_adaptiveCollection(name: string) {
|
_adaptiveCollection(name: string) {
|
||||||
|
|||||||
@@ -1194,7 +1194,9 @@ export class PostgresStorageAdapter implements StorageAdapter {
|
|||||||
const now = new Date().getTime();
|
const now = new Date().getTime();
|
||||||
const helpers = this._pgp.helpers;
|
const helpers = this._pgp.helpers;
|
||||||
debug('deleteAllClasses');
|
debug('deleteAllClasses');
|
||||||
|
if (this._client?.$pool.ended) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
await this._client
|
await this._client
|
||||||
.task('delete-all-classes', async t => {
|
.task('delete-all-classes', async t => {
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -93,6 +93,21 @@ class ParseLiveQueryServer {
|
|||||||
}
|
}
|
||||||
this._createSubscribers();
|
this._createSubscribers();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async shutdown() {
|
||||||
|
if (this.subscriber.isOpen) {
|
||||||
|
await Promise.all([
|
||||||
|
...[...this.clients.values()].map(client => client.parseWebSocket.ws.close()),
|
||||||
|
this.parseWebSocketServer.close(),
|
||||||
|
...Array.from(this.subscriber.subscriptions.keys()).map(key =>
|
||||||
|
this.subscriber.unsubscribe(key)
|
||||||
|
),
|
||||||
|
this.subscriber.close?.(),
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
this.subscriber.isOpen = false;
|
||||||
|
}
|
||||||
|
|
||||||
_createSubscribers() {
|
_createSubscribers() {
|
||||||
const messageRecieved = (channel, messageStr) => {
|
const messageRecieved = (channel, messageStr) => {
|
||||||
logger.verbose('Subscribe message %j', messageStr);
|
logger.verbose('Subscribe message %j', messageStr);
|
||||||
|
|||||||
@@ -168,6 +168,12 @@ class ParseServer {
|
|||||||
if (cacheAdapter && typeof cacheAdapter.handleShutdown === 'function') {
|
if (cacheAdapter && typeof cacheAdapter.handleShutdown === 'function') {
|
||||||
promises.push(cacheAdapter.handleShutdown());
|
promises.push(cacheAdapter.handleShutdown());
|
||||||
}
|
}
|
||||||
|
if (this.liveQueryServer?.server?.close) {
|
||||||
|
promises.push(new Promise(resolve => this.liveQueryServer.server.close(resolve)));
|
||||||
|
}
|
||||||
|
if (this.liveQueryServer) {
|
||||||
|
promises.push(this.liveQueryServer.shutdown());
|
||||||
|
}
|
||||||
return (promises.length > 0 ? Promise.all(promises) : Promise.resolve()).then(() => {
|
return (promises.length > 0 ? Promise.all(promises) : Promise.resolve()).then(() => {
|
||||||
if (this.config.serverCloseComplete) {
|
if (this.config.serverCloseComplete) {
|
||||||
this.config.serverCloseComplete();
|
this.config.serverCloseComplete();
|
||||||
|
|||||||
Reference in New Issue
Block a user