Live Query basic monitoring (#4168)
* Adds uuid based client identification to prevent overflows * no-super * Adds cloud code monitoring * fixes test * nit
This commit is contained in:
@@ -39,6 +39,7 @@
|
|||||||
"request": "2.81.0",
|
"request": "2.81.0",
|
||||||
"semver": "5.4.0",
|
"semver": "5.4.0",
|
||||||
"tv4": "1.3.0",
|
"tv4": "1.3.0",
|
||||||
|
"uuid": "^3.1.0",
|
||||||
"winston": "2.3.1",
|
"winston": "2.3.1",
|
||||||
"winston-daily-rotate-file": "1.5.0",
|
"winston-daily-rotate-file": "1.5.0",
|
||||||
"ws": "3.2.0"
|
"ws": "3.2.0"
|
||||||
|
|||||||
@@ -81,7 +81,7 @@ describe('ParseLiveQueryServer', function() {
|
|||||||
var httpServer = {};
|
var httpServer = {};
|
||||||
var parseLiveQueryServer = new ParseLiveQueryServer(10, 10, httpServer);
|
var parseLiveQueryServer = new ParseLiveQueryServer(10, 10, httpServer);
|
||||||
|
|
||||||
expect(parseLiveQueryServer.clientId).toBe(0);
|
expect(parseLiveQueryServer.clientId).toBeUndefined();
|
||||||
expect(parseLiveQueryServer.clients.size).toBe(0);
|
expect(parseLiveQueryServer.clients.size).toBe(0);
|
||||||
expect(parseLiveQueryServer.subscriptions.size).toBe(0);
|
expect(parseLiveQueryServer.subscriptions.size).toBe(0);
|
||||||
});
|
});
|
||||||
@@ -94,9 +94,11 @@ describe('ParseLiveQueryServer', function() {
|
|||||||
parseLiveQueryServer._validateKeys = jasmine.createSpy('validateKeys').and.returnValue(true);
|
parseLiveQueryServer._validateKeys = jasmine.createSpy('validateKeys').and.returnValue(true);
|
||||||
parseLiveQueryServer._handleConnect(parseWebSocket);
|
parseLiveQueryServer._handleConnect(parseWebSocket);
|
||||||
|
|
||||||
expect(parseLiveQueryServer.clientId).toBe(1);
|
const clientKeys = parseLiveQueryServer.clients.keys();
|
||||||
expect(parseWebSocket.clientId).toBe(0);
|
expect(parseLiveQueryServer.clients.size).toBe(1);
|
||||||
var client = parseLiveQueryServer.clients.get(0);
|
const firstKey = clientKeys.next().value;
|
||||||
|
expect(parseWebSocket.clientId).toBe(firstKey);
|
||||||
|
var client = parseLiveQueryServer.clients.get(firstKey);
|
||||||
expect(client).not.toBeNull();
|
expect(client).not.toBeNull();
|
||||||
// Make sure we send connect response to the client
|
// Make sure we send connect response to the client
|
||||||
expect(client.pushConnect).toHaveBeenCalled();
|
expect(client.pushConnect).toHaveBeenCalled();
|
||||||
@@ -394,6 +396,28 @@ describe('ParseLiveQueryServer', function() {
|
|||||||
parseWebSocket.emit('disconnect');
|
parseWebSocket.emit('disconnect');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('can forward event to cloud code', function() {
|
||||||
|
const cloudCodeHandler = {
|
||||||
|
handler: () => {}
|
||||||
|
}
|
||||||
|
const spy = spyOn(cloudCodeHandler, 'handler').and.callThrough();
|
||||||
|
Parse.Cloud.onLiveQueryEvent(cloudCodeHandler.handler);
|
||||||
|
var parseLiveQueryServer = new ParseLiveQueryServer(10, 10, {});
|
||||||
|
var EventEmitter = require('events');
|
||||||
|
var parseWebSocket = new EventEmitter();
|
||||||
|
parseWebSocket.clientId = 1;
|
||||||
|
// Register message handlers for the parseWebSocket
|
||||||
|
parseLiveQueryServer._onConnect(parseWebSocket);
|
||||||
|
|
||||||
|
// Make sure we do not crash
|
||||||
|
// Trigger disconnect event
|
||||||
|
parseWebSocket.emit('disconnect');
|
||||||
|
expect(spy).toHaveBeenCalled();
|
||||||
|
// call for ws_connect, another for ws_disconnect
|
||||||
|
expect(spy.calls.count()).toBe(2);
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
// TODO: Test server can set disconnect command message handler for a parseWebSocket
|
// TODO: Test server can set disconnect command message handler for a parseWebSocket
|
||||||
|
|
||||||
it('has no subscription and can handle object delete command', function() {
|
it('has no subscription and can handle object delete command', function() {
|
||||||
|
|||||||
@@ -9,10 +9,11 @@ import { matchesQuery, queryHash } from './QueryTools';
|
|||||||
import { ParsePubSub } from './ParsePubSub';
|
import { ParsePubSub } from './ParsePubSub';
|
||||||
import { SessionTokenCache } from './SessionTokenCache';
|
import { SessionTokenCache } from './SessionTokenCache';
|
||||||
import _ from 'lodash';
|
import _ from 'lodash';
|
||||||
|
import uuid from 'uuid';
|
||||||
|
import { runLiveQueryEventHandlers } from '../triggers';
|
||||||
|
|
||||||
class ParseLiveQueryServer {
|
class ParseLiveQueryServer {
|
||||||
clientId: number;
|
clients: Map;
|
||||||
clients: Object;
|
|
||||||
// className -> (queryHash -> subscription)
|
// className -> (queryHash -> subscription)
|
||||||
subscriptions: Object;
|
subscriptions: Object;
|
||||||
parseWebSocketServer: Object;
|
parseWebSocketServer: Object;
|
||||||
@@ -21,7 +22,6 @@ class ParseLiveQueryServer {
|
|||||||
subscriber: Object;
|
subscriber: Object;
|
||||||
|
|
||||||
constructor(server: any, config: any) {
|
constructor(server: any, config: any) {
|
||||||
this.clientId = 0;
|
|
||||||
this.clients = new Map();
|
this.clients = new Map();
|
||||||
this.subscriptions = new Map();
|
this.subscriptions = new Map();
|
||||||
|
|
||||||
@@ -269,10 +269,16 @@ class ParseLiveQueryServer {
|
|||||||
});
|
});
|
||||||
|
|
||||||
parseWebsocket.on('disconnect', () => {
|
parseWebsocket.on('disconnect', () => {
|
||||||
logger.info('Client disconnect: %d', parseWebsocket.clientId);
|
logger.info(`Client disconnect: ${parseWebsocket.clientId}`);
|
||||||
const clientId = parseWebsocket.clientId;
|
const clientId = parseWebsocket.clientId;
|
||||||
if (!this.clients.has(clientId)) {
|
if (!this.clients.has(clientId)) {
|
||||||
logger.error('Can not find client %d on disconnect', clientId);
|
runLiveQueryEventHandlers({
|
||||||
|
event: 'ws_disconnect_error',
|
||||||
|
clients: this.clients.size,
|
||||||
|
subscriptions: this.subscriptions.size,
|
||||||
|
error: `Unable to find client ${clientId}`
|
||||||
|
});
|
||||||
|
logger.error(`Can not find client ${clientId} on disconnect`);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -298,6 +304,17 @@ class ParseLiveQueryServer {
|
|||||||
|
|
||||||
logger.verbose('Current clients %d', this.clients.size);
|
logger.verbose('Current clients %d', this.clients.size);
|
||||||
logger.verbose('Current subscriptions %d', this.subscriptions.size);
|
logger.verbose('Current subscriptions %d', this.subscriptions.size);
|
||||||
|
runLiveQueryEventHandlers({
|
||||||
|
event: 'ws_disconnect',
|
||||||
|
clients: this.clients.size,
|
||||||
|
subscriptions: this.subscriptions.size
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
runLiveQueryEventHandlers({
|
||||||
|
event: 'ws_connect',
|
||||||
|
clients: this.clients.size,
|
||||||
|
subscriptions: this.subscriptions.size
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -404,12 +421,17 @@ class ParseLiveQueryServer {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
const hasMasterKey = this._hasMasterKey(request, this.keyPairs);
|
const hasMasterKey = this._hasMasterKey(request, this.keyPairs);
|
||||||
const client = new Client(this.clientId, parseWebsocket, hasMasterKey);
|
const clientId = uuid();
|
||||||
parseWebsocket.clientId = this.clientId;
|
const client = new Client(clientId, parseWebsocket, hasMasterKey);
|
||||||
this.clientId += 1;
|
parseWebsocket.clientId = clientId;
|
||||||
this.clients.set(parseWebsocket.clientId, client);
|
this.clients.set(parseWebsocket.clientId, client);
|
||||||
logger.info('Create new client: %d', parseWebsocket.clientId);
|
logger.info(`Create new client: ${parseWebsocket.clientId}`);
|
||||||
client.pushConnect();
|
client.pushConnect();
|
||||||
|
runLiveQueryEventHandlers({
|
||||||
|
event: 'connect',
|
||||||
|
clients: this.clients.size,
|
||||||
|
subscriptions: this.subscriptions.size
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
_hasMasterKey(request: any, validKeyPairs: any): boolean {
|
_hasMasterKey(request: any, validKeyPairs: any): boolean {
|
||||||
@@ -481,8 +503,13 @@ class ParseLiveQueryServer {
|
|||||||
|
|
||||||
client.pushSubscribe(request.requestId);
|
client.pushSubscribe(request.requestId);
|
||||||
|
|
||||||
logger.verbose('Create client %d new subscription: %d', parseWebsocket.clientId, request.requestId);
|
logger.verbose(`Create client ${parseWebsocket.clientId} new subscription: ${request.requestId}`);
|
||||||
logger.verbose('Current client number: %d', this.clients.size);
|
logger.verbose('Current client number: %d', this.clients.size);
|
||||||
|
runLiveQueryEventHandlers({
|
||||||
|
event: 'subscribe',
|
||||||
|
clients: this.clients.size,
|
||||||
|
subscriptions: this.subscriptions.size
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
_handleUpdateSubscription(parseWebsocket: any, request: any): any {
|
_handleUpdateSubscription(parseWebsocket: any, request: any): any {
|
||||||
@@ -529,6 +556,11 @@ class ParseLiveQueryServer {
|
|||||||
if (classSubscriptions.size === 0) {
|
if (classSubscriptions.size === 0) {
|
||||||
this.subscriptions.delete(className);
|
this.subscriptions.delete(className);
|
||||||
}
|
}
|
||||||
|
runLiveQueryEventHandlers({
|
||||||
|
event: 'unsubscribe',
|
||||||
|
clients: this.clients.size,
|
||||||
|
subscriptions: this.subscriptions.size
|
||||||
|
});
|
||||||
|
|
||||||
if (!notifyClient) {
|
if (!notifyClient) {
|
||||||
return;
|
return;
|
||||||
@@ -536,7 +568,7 @@ class ParseLiveQueryServer {
|
|||||||
|
|
||||||
client.pushUnsubscribe(request.requestId);
|
client.pushUnsubscribe(request.requestId);
|
||||||
|
|
||||||
logger.verbose('Delete client: %d | subscription: %d', parseWebsocket.clientId, request.requestId);
|
logger.verbose(`Delete client: ${parseWebsocket.clientId} | subscription: ${request.requestId}`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -55,6 +55,10 @@ ParseCloud.afterFind = function(parseClass, handler) {
|
|||||||
triggers.addTrigger(triggers.Types.afterFind, className, handler, Parse.applicationId);
|
triggers.addTrigger(triggers.Types.afterFind, className, handler, Parse.applicationId);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
ParseCloud.onLiveQueryEvent = function(handler) {
|
||||||
|
triggers.addLiveQueryEventHandler(handler, Parse.applicationId);
|
||||||
|
};
|
||||||
|
|
||||||
ParseCloud._removeAllHooks = () => {
|
ParseCloud._removeAllHooks = () => {
|
||||||
triggers._unregisterAll();
|
triggers._unregisterAll();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ const baseStore = function() {
|
|||||||
const Validators = {};
|
const Validators = {};
|
||||||
const Functions = {};
|
const Functions = {};
|
||||||
const Jobs = {};
|
const Jobs = {};
|
||||||
|
const LiveQuery = [];
|
||||||
const Triggers = Object.keys(Types).reduce(function(base, key){
|
const Triggers = Object.keys(Types).reduce(function(base, key){
|
||||||
base[key] = {};
|
base[key] = {};
|
||||||
return base;
|
return base;
|
||||||
@@ -24,7 +25,8 @@ const baseStore = function() {
|
|||||||
Functions,
|
Functions,
|
||||||
Jobs,
|
Jobs,
|
||||||
Validators,
|
Validators,
|
||||||
Triggers
|
Triggers,
|
||||||
|
LiveQuery,
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -49,6 +51,12 @@ export function addTrigger(type, className, handler, applicationId) {
|
|||||||
_triggerStore[applicationId].Triggers[type][className] = handler;
|
_triggerStore[applicationId].Triggers[type][className] = handler;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function addLiveQueryEventHandler(handler, applicationId) {
|
||||||
|
applicationId = applicationId || Parse.applicationId;
|
||||||
|
_triggerStore[applicationId] = _triggerStore[applicationId] || baseStore();
|
||||||
|
_triggerStore[applicationId].LiveQuery.push(handler);
|
||||||
|
}
|
||||||
|
|
||||||
export function removeFunction(functionName, applicationId) {
|
export function removeFunction(functionName, applicationId) {
|
||||||
applicationId = applicationId || Parse.applicationId;
|
applicationId = applicationId || Parse.applicationId;
|
||||||
delete _triggerStore[applicationId].Functions[functionName]
|
delete _triggerStore[applicationId].Functions[functionName]
|
||||||
@@ -411,3 +419,8 @@ export function inflate(data, restObject) {
|
|||||||
}
|
}
|
||||||
return Parse.Object.fromJSON(copy);
|
return Parse.Object.fromJSON(copy);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function runLiveQueryEventHandlers(data, applicationId = Parse.applicationId) {
|
||||||
|
if (!_triggerStore || !_triggerStore[applicationId] || !_triggerStore[applicationId].LiveQuery) { return; }
|
||||||
|
_triggerStore[applicationId].LiveQuery.forEach((handler) => handler(data));
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user