Improve WebSocketServer Error Handling (#6230)
* Improve WebSocketServer Error Handling Closes: https://github.com/parse-community/parse-server/issues/6173 Prevents an unhandled server rejection. Includes an example for LiveQuery test and closing the proper connections. Improve live query monitoring * fix tests
This commit is contained in:
35
spec/ParseLiveQuery.spec.js
Normal file
35
spec/ParseLiveQuery.spec.js
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
'use strict';
|
||||||
|
|
||||||
|
describe('ParseLiveQuery', function() {
|
||||||
|
it('can subscribe to query', async done => {
|
||||||
|
await reconfigureServer({
|
||||||
|
liveQuery: {
|
||||||
|
classNames: ['TestObject'],
|
||||||
|
},
|
||||||
|
startLiveQueryServer: true,
|
||||||
|
verbose: false,
|
||||||
|
silent: true,
|
||||||
|
});
|
||||||
|
const object = new TestObject();
|
||||||
|
await object.save();
|
||||||
|
|
||||||
|
const query = new Parse.Query(TestObject);
|
||||||
|
query.equalTo('objectId', object.id);
|
||||||
|
const subscription = await query.subscribe();
|
||||||
|
subscription.on('update', async object => {
|
||||||
|
expect(object.get('foo')).toBe('bar');
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
object.set({ foo: 'bar' });
|
||||||
|
await object.save();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async function(done) {
|
||||||
|
const client = await Parse.CoreManager.getLiveQueryController().getDefaultLiveQueryClient();
|
||||||
|
client.close();
|
||||||
|
// Wait for live query client to disconnect
|
||||||
|
setTimeout(() => {
|
||||||
|
done();
|
||||||
|
}, 1000);
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -39,6 +39,45 @@ describe('ParseWebSocketServer', function() {
|
|||||||
}, 10);
|
}, 10);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('can handle error event', async () => {
|
||||||
|
jasmine.restoreLibrary('ws', 'Server');
|
||||||
|
const WebSocketServer = require('ws').Server;
|
||||||
|
let wssError;
|
||||||
|
class WSSAdapter {
|
||||||
|
constructor(options) {
|
||||||
|
this.options = options;
|
||||||
|
}
|
||||||
|
onListen() {}
|
||||||
|
onConnection() {}
|
||||||
|
onError() {}
|
||||||
|
start() {
|
||||||
|
const wss = new WebSocketServer({ server: this.options.server });
|
||||||
|
wss.on('listening', this.onListen);
|
||||||
|
wss.on('connection', this.onConnection);
|
||||||
|
wss.on('error', error => {
|
||||||
|
wssError = error;
|
||||||
|
this.onError(error);
|
||||||
|
});
|
||||||
|
this.wss = wss;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const server = await reconfigureServer({
|
||||||
|
liveQuery: {
|
||||||
|
classNames: ['TestObject'],
|
||||||
|
},
|
||||||
|
liveQueryServerOptions: {
|
||||||
|
wssAdapter: WSSAdapter,
|
||||||
|
},
|
||||||
|
startLiveQueryServer: true,
|
||||||
|
verbose: false,
|
||||||
|
silent: true,
|
||||||
|
});
|
||||||
|
const wssAdapter = server.liveQueryServer.parseWebSocketServer.server;
|
||||||
|
wssAdapter.wss.emit('error', 'Invalid Packet');
|
||||||
|
expect(wssError).toBe('Invalid Packet');
|
||||||
|
});
|
||||||
|
|
||||||
afterEach(function() {
|
afterEach(function() {
|
||||||
jasmine.restoreLibrary('ws', 'Server');
|
jasmine.restoreLibrary('ws', 'Server');
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -13,10 +13,12 @@ export class WSAdapter extends WSSAdapter {
|
|||||||
|
|
||||||
onListen() {}
|
onListen() {}
|
||||||
onConnection(ws) {}
|
onConnection(ws) {}
|
||||||
|
onError(error) {}
|
||||||
start() {
|
start() {
|
||||||
const wss = new WebSocketServer({ server: this.options.server });
|
const wss = new WebSocketServer({ server: this.options.server });
|
||||||
wss.on('listening', this.onListen);
|
wss.on('listening', this.onListen);
|
||||||
wss.on('connection', this.onConnection);
|
wss.on('connection', this.onConnection);
|
||||||
|
wss.on('error', this.onError);
|
||||||
}
|
}
|
||||||
close() {}
|
close() {}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,6 +4,7 @@
|
|||||||
// Adapter classes must implement the following functions:
|
// Adapter classes must implement the following functions:
|
||||||
// * onListen()
|
// * onListen()
|
||||||
// * onConnection(ws)
|
// * onConnection(ws)
|
||||||
|
// * onError(error)
|
||||||
// * start()
|
// * start()
|
||||||
// * close()
|
// * close()
|
||||||
//
|
//
|
||||||
@@ -22,6 +23,7 @@ export class WSSAdapter {
|
|||||||
constructor(options) {
|
constructor(options) {
|
||||||
this.onListen = () => {};
|
this.onListen = () => {};
|
||||||
this.onConnection = () => {};
|
this.onConnection = () => {};
|
||||||
|
this.onError = () => {};
|
||||||
}
|
}
|
||||||
|
|
||||||
// /**
|
// /**
|
||||||
@@ -36,6 +38,13 @@ export class WSSAdapter {
|
|||||||
// */
|
// */
|
||||||
// onConnection(ws) {}
|
// onConnection(ws) {}
|
||||||
|
|
||||||
|
// /**
|
||||||
|
// * Emitted when error event is called.
|
||||||
|
// *
|
||||||
|
// * @param {Error} error - WebSocketServer error
|
||||||
|
// */
|
||||||
|
// onError(error) {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize Connection.
|
* Initialize Connection.
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -392,6 +392,8 @@ class ParseLiveQueryServer {
|
|||||||
event: 'ws_disconnect',
|
event: 'ws_disconnect',
|
||||||
clients: this.clients.size,
|
clients: this.clients.size,
|
||||||
subscriptions: this.subscriptions.size,
|
subscriptions: this.subscriptions.size,
|
||||||
|
useMasterKey: client.hasMasterKey,
|
||||||
|
installationId: client.installationId,
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -23,6 +23,9 @@ export class ParseWebSocketServer {
|
|||||||
}
|
}
|
||||||
}, config.websocketTimeout || 10 * 1000);
|
}, config.websocketTimeout || 10 * 1000);
|
||||||
};
|
};
|
||||||
|
wss.onError = error => {
|
||||||
|
logger.error(error);
|
||||||
|
};
|
||||||
wss.start();
|
wss.start();
|
||||||
this.server = wss;
|
this.server = wss;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user