diff --git a/package.json b/package.json index e2e5daa6..efd522cd 100644 --- a/package.json +++ b/package.json @@ -39,6 +39,7 @@ "request": "2.81.0", "semver": "5.4.0", "tv4": "1.3.0", + "uuid": "^3.1.0", "winston": "2.3.1", "winston-daily-rotate-file": "1.5.0", "ws": "3.2.0" diff --git a/spec/ParseLiveQueryServer.spec.js b/spec/ParseLiveQueryServer.spec.js index 12d3a3f7..f4122aae 100644 --- a/spec/ParseLiveQueryServer.spec.js +++ b/spec/ParseLiveQueryServer.spec.js @@ -81,7 +81,7 @@ describe('ParseLiveQueryServer', function() { var httpServer = {}; var parseLiveQueryServer = new ParseLiveQueryServer(10, 10, httpServer); - expect(parseLiveQueryServer.clientId).toBe(0); + expect(parseLiveQueryServer.clientId).toBeUndefined(); expect(parseLiveQueryServer.clients.size).toBe(0); expect(parseLiveQueryServer.subscriptions.size).toBe(0); }); @@ -94,9 +94,11 @@ describe('ParseLiveQueryServer', function() { parseLiveQueryServer._validateKeys = jasmine.createSpy('validateKeys').and.returnValue(true); parseLiveQueryServer._handleConnect(parseWebSocket); - expect(parseLiveQueryServer.clientId).toBe(1); - expect(parseWebSocket.clientId).toBe(0); - var client = parseLiveQueryServer.clients.get(0); + const clientKeys = parseLiveQueryServer.clients.keys(); + expect(parseLiveQueryServer.clients.size).toBe(1); + const firstKey = clientKeys.next().value; + expect(parseWebSocket.clientId).toBe(firstKey); + var client = parseLiveQueryServer.clients.get(firstKey); expect(client).not.toBeNull(); // Make sure we send connect response to the client expect(client.pushConnect).toHaveBeenCalled(); @@ -394,6 +396,28 @@ describe('ParseLiveQueryServer', function() { parseWebSocket.emit('disconnect'); }); + it('can forward event to cloud code', function() { + const cloudCodeHandler = { + handler: () => {} + } + const spy = spyOn(cloudCodeHandler, 'handler').and.callThrough(); + Parse.Cloud.onLiveQueryEvent(cloudCodeHandler.handler); + var parseLiveQueryServer = new ParseLiveQueryServer(10, 10, {}); + var EventEmitter = require('events'); + var parseWebSocket = new EventEmitter(); + parseWebSocket.clientId = 1; + // Register message handlers for the parseWebSocket + parseLiveQueryServer._onConnect(parseWebSocket); + + // Make sure we do not crash + // Trigger disconnect event + parseWebSocket.emit('disconnect'); + expect(spy).toHaveBeenCalled(); + // call for ws_connect, another for ws_disconnect + expect(spy.calls.count()).toBe(2); + }); + + // TODO: Test server can set disconnect command message handler for a parseWebSocket it('has no subscription and can handle object delete command', function() { diff --git a/src/LiveQuery/ParseLiveQueryServer.js b/src/LiveQuery/ParseLiveQueryServer.js index 1bc6e079..59874ae6 100644 --- a/src/LiveQuery/ParseLiveQueryServer.js +++ b/src/LiveQuery/ParseLiveQueryServer.js @@ -9,10 +9,11 @@ import { matchesQuery, queryHash } from './QueryTools'; import { ParsePubSub } from './ParsePubSub'; import { SessionTokenCache } from './SessionTokenCache'; import _ from 'lodash'; +import uuid from 'uuid'; +import { runLiveQueryEventHandlers } from '../triggers'; class ParseLiveQueryServer { - clientId: number; - clients: Object; + clients: Map; // className -> (queryHash -> subscription) subscriptions: Object; parseWebSocketServer: Object; @@ -21,7 +22,6 @@ class ParseLiveQueryServer { subscriber: Object; constructor(server: any, config: any) { - this.clientId = 0; this.clients = new Map(); this.subscriptions = new Map(); @@ -269,10 +269,16 @@ class ParseLiveQueryServer { }); parseWebsocket.on('disconnect', () => { - logger.info('Client disconnect: %d', parseWebsocket.clientId); + logger.info(`Client disconnect: ${parseWebsocket.clientId}`); const clientId = parseWebsocket.clientId; if (!this.clients.has(clientId)) { - logger.error('Can not find client %d on disconnect', clientId); + runLiveQueryEventHandlers({ + event: 'ws_disconnect_error', + clients: this.clients.size, + subscriptions: this.subscriptions.size, + error: `Unable to find client ${clientId}` + }); + logger.error(`Can not find client ${clientId} on disconnect`); return; } @@ -298,6 +304,17 @@ class ParseLiveQueryServer { logger.verbose('Current clients %d', this.clients.size); logger.verbose('Current subscriptions %d', this.subscriptions.size); + runLiveQueryEventHandlers({ + event: 'ws_disconnect', + clients: this.clients.size, + subscriptions: this.subscriptions.size + }); + }); + + runLiveQueryEventHandlers({ + event: 'ws_connect', + clients: this.clients.size, + subscriptions: this.subscriptions.size }); } @@ -404,12 +421,17 @@ class ParseLiveQueryServer { return; } const hasMasterKey = this._hasMasterKey(request, this.keyPairs); - const client = new Client(this.clientId, parseWebsocket, hasMasterKey); - parseWebsocket.clientId = this.clientId; - this.clientId += 1; + const clientId = uuid(); + const client = new Client(clientId, parseWebsocket, hasMasterKey); + parseWebsocket.clientId = clientId; this.clients.set(parseWebsocket.clientId, client); - logger.info('Create new client: %d', parseWebsocket.clientId); + logger.info(`Create new client: ${parseWebsocket.clientId}`); client.pushConnect(); + runLiveQueryEventHandlers({ + event: 'connect', + clients: this.clients.size, + subscriptions: this.subscriptions.size + }); } _hasMasterKey(request: any, validKeyPairs: any): boolean { @@ -481,8 +503,13 @@ class ParseLiveQueryServer { client.pushSubscribe(request.requestId); - logger.verbose('Create client %d new subscription: %d', parseWebsocket.clientId, request.requestId); + logger.verbose(`Create client ${parseWebsocket.clientId} new subscription: ${request.requestId}`); logger.verbose('Current client number: %d', this.clients.size); + runLiveQueryEventHandlers({ + event: 'subscribe', + clients: this.clients.size, + subscriptions: this.subscriptions.size + }); } _handleUpdateSubscription(parseWebsocket: any, request: any): any { @@ -529,6 +556,11 @@ class ParseLiveQueryServer { if (classSubscriptions.size === 0) { this.subscriptions.delete(className); } + runLiveQueryEventHandlers({ + event: 'unsubscribe', + clients: this.clients.size, + subscriptions: this.subscriptions.size + }); if (!notifyClient) { return; @@ -536,7 +568,7 @@ class ParseLiveQueryServer { client.pushUnsubscribe(request.requestId); - logger.verbose('Delete client: %d | subscription: %d', parseWebsocket.clientId, request.requestId); + logger.verbose(`Delete client: ${parseWebsocket.clientId} | subscription: ${request.requestId}`); } } diff --git a/src/cloud-code/Parse.Cloud.js b/src/cloud-code/Parse.Cloud.js index bfcd742d..8cfb54ef 100644 --- a/src/cloud-code/Parse.Cloud.js +++ b/src/cloud-code/Parse.Cloud.js @@ -55,6 +55,10 @@ ParseCloud.afterFind = function(parseClass, handler) { triggers.addTrigger(triggers.Types.afterFind, className, handler, Parse.applicationId); }; +ParseCloud.onLiveQueryEvent = function(handler) { + triggers.addLiveQueryEventHandler(handler, Parse.applicationId); +}; + ParseCloud._removeAllHooks = () => { triggers._unregisterAll(); } diff --git a/src/triggers.js b/src/triggers.js index b202e613..c5df1e25 100644 --- a/src/triggers.js +++ b/src/triggers.js @@ -15,6 +15,7 @@ const baseStore = function() { const Validators = {}; const Functions = {}; const Jobs = {}; + const LiveQuery = []; const Triggers = Object.keys(Types).reduce(function(base, key){ base[key] = {}; return base; @@ -24,7 +25,8 @@ const baseStore = function() { Functions, Jobs, Validators, - Triggers + Triggers, + LiveQuery, }); }; @@ -49,6 +51,12 @@ export function addTrigger(type, className, handler, applicationId) { _triggerStore[applicationId].Triggers[type][className] = handler; } +export function addLiveQueryEventHandler(handler, applicationId) { + applicationId = applicationId || Parse.applicationId; + _triggerStore[applicationId] = _triggerStore[applicationId] || baseStore(); + _triggerStore[applicationId].LiveQuery.push(handler); +} + export function removeFunction(functionName, applicationId) { applicationId = applicationId || Parse.applicationId; delete _triggerStore[applicationId].Functions[functionName] @@ -411,3 +419,8 @@ export function inflate(data, restObject) { } return Parse.Object.fromJSON(copy); } + +export function runLiveQueryEventHandlers(data, applicationId = Parse.applicationId) { + if (!_triggerStore || !_triggerStore[applicationId] || !_triggerStore[applicationId].LiveQuery) { return; } + _triggerStore[applicationId].LiveQuery.forEach((handler) => handler(data)); +}