import tv4 from 'tv4'; import Parse from 'parse/node'; import { Subscription } from './Subscription'; import { Client } from './Client'; import { ParseWebSocketServer } from './ParseWebSocketServer'; import PLog from './PLog'; import RequestSchema from './RequestSchema'; import { matchesQuery, queryHash } from './QueryTools'; import { ParsePubSub } from './ParsePubSub'; import { SessionTokenCache } from './SessionTokenCache'; class ParseLiveQueryServer { clientId: number; clients: Object; // className -> (queryHash -> subscription) subscriptions: Object; parseWebSocketServer: Object; keyPairs : any; // The subscriber we use to get object update from publisher subscriber: Object; constructor(server: any, config: any) { this.clientId = 0; this.clients = new Map(); this.subscriptions = new Map(); config = config || {}; // Set LogLevel PLog.level = config.logLevel || 'INFO'; // Store keys, convert obj to map let keyPairs = config.keyPairs || {}; this.keyPairs = new Map(); for (let key of Object.keys(keyPairs)) { this.keyPairs.set(key, keyPairs[key]); } PLog.verbose('Support key pairs', this.keyPairs); // Initialize Parse Parse.Object.disableSingleInstance(); Parse.User.enableUnsafeCurrentUser(); let serverURL = config.serverURL || Parse.serverURL; Parse.serverURL = serverURL; let appId = config.appId || Parse.applicationId; let javascriptKey = Parse.javaScriptKey; let masterKey = config.masterKey || Parse.masterKey; Parse.initialize(appId, javascriptKey, masterKey); // Initialize websocket server this.parseWebSocketServer = new ParseWebSocketServer( server, (parseWebsocket) => this._onConnect(parseWebsocket), config.websocketTimeout ); // Initialize subscriber this.subscriber = ParsePubSub.createSubscriber({ redisURL: config.redisURL }); this.subscriber.subscribe('afterSave'); this.subscriber.subscribe('afterDelete'); // Register message handler for subscriber. When publisher get messages, it will publish message // to the subscribers and the handler will be called. this.subscriber.on('message', (channel, messageStr) => { PLog.verbose('Subscribe messsage %j', messageStr); let message = JSON.parse(messageStr); this._inflateParseObject(message); if (channel === 'afterSave') { this._onAfterSave(message); } else if (channel === 'afterDelete') { this._onAfterDelete(message); } else { PLog.error('Get message %s from unknown channel %j', message, channel); } }); // Initialize sessionToken cache this.sessionTokenCache = new SessionTokenCache(config.cacheTimeout); } // Message is the JSON object from publisher. Message.currentParseObject is the ParseObject JSON after changes. // Message.originalParseObject is the original ParseObject JSON. _inflateParseObject(message: any): void { // Inflate merged object let currentParseObject = message.currentParseObject; let className = currentParseObject.className; let parseObject = new Parse.Object(className); parseObject._finishFetch(currentParseObject); message.currentParseObject = parseObject; // Inflate original object let originalParseObject = message.originalParseObject; if (originalParseObject) { className = originalParseObject.className; parseObject = new Parse.Object(className); parseObject._finishFetch(originalParseObject); message.originalParseObject = parseObject; } } // Message is the JSON object from publisher after inflated. Message.currentParseObject is the ParseObject after changes. // Message.originalParseObject is the original ParseObject. _onAfterDelete(message: any): void { PLog.verbose('afterDelete is triggered'); let deletedParseObject = message.currentParseObject.toJSON(); let className = deletedParseObject.className; PLog.verbose('ClassName: %j | ObjectId: %s', className, deletedParseObject.id); PLog.verbose('Current client number : %d', this.clients.size); let classSubscriptions = this.subscriptions.get(className); if (typeof classSubscriptions === 'undefined') { PLog.error('Can not find subscriptions under this class ' + className); return; } for (let subscription of classSubscriptions.values()) { let isSubscriptionMatched = this._matchesSubscription(deletedParseObject, subscription); if (!isSubscriptionMatched) { continue; } for (let [clientId, requestIds] of subscription.clientRequestIds.entries()) { let client = this.clients.get(clientId); if (typeof client === 'undefined') { continue; } for (let requestId of requestIds) { let acl = message.currentParseObject.getACL(); // Check ACL this._matchesACL(acl, client, requestId).then((isMatched) => { if (!isMatched) { return null; } client.pushDelete(requestId, deletedParseObject); }, (error) => { PLog.error('Matching ACL error : ', error); }); } } } } // Message is the JSON object from publisher after inflated. Message.currentParseObject is the ParseObject after changes. // Message.originalParseObject is the original ParseObject. _onAfterSave(message: any): void { PLog.verbose('afterSave is triggered'); let originalParseObject = null; if (message.originalParseObject) { originalParseObject = message.originalParseObject.toJSON(); } let currentParseObject = message.currentParseObject.toJSON(); let className = currentParseObject.className; PLog.verbose('ClassName: %s | ObjectId: %s', className, currentParseObject.id); PLog.verbose('Current client number : %d', this.clients.size); let classSubscriptions = this.subscriptions.get(className); if (typeof classSubscriptions === 'undefined') { PLog.error('Can not find subscriptions under this class ' + className); return; } for (let subscription of classSubscriptions.values()) { let isOriginalSubscriptionMatched = this._matchesSubscription(originalParseObject, subscription); let isCurrentSubscriptionMatched = this._matchesSubscription(currentParseObject, subscription); for (let [clientId, requestIds] of subscription.clientRequestIds.entries()) { let client = this.clients.get(clientId); if (typeof client === 'undefined') { continue; } for (let requestId of requestIds) { // Set orignal ParseObject ACL checking promise, if the object does not match // subscription, we do not need to check ACL let originalACLCheckingPromise; if (!isOriginalSubscriptionMatched) { originalACLCheckingPromise = Parse.Promise.as(false); } else { let originalACL; if (message.originalParseObject) { originalACL = message.originalParseObject.getACL(); } originalACLCheckingPromise = this._matchesACL(originalACL, client, requestId); } // Set current ParseObject ACL checking promise, if the object does not match // subscription, we do not need to check ACL let currentACLCheckingPromise; if (!isCurrentSubscriptionMatched) { currentACLCheckingPromise = Parse.Promise.as(false); } else { let currentACL = message.currentParseObject.getACL(); currentACLCheckingPromise = this._matchesACL(currentACL, client, requestId); } Parse.Promise.when( originalACLCheckingPromise, currentACLCheckingPromise ).then((isOriginalMatched, isCurrentMatched) => { PLog.verbose('Original %j | Current %j | Match: %s, %s, %s, %s | Query: %s', originalParseObject, currentParseObject, isOriginalSubscriptionMatched, isCurrentSubscriptionMatched, isOriginalMatched, isCurrentMatched, subscription.hash ); // Decide event type let type; if (isOriginalMatched && isCurrentMatched) { type = 'Update'; } else if (isOriginalMatched && !isCurrentMatched) { type = 'Leave'; } else if (!isOriginalMatched && isCurrentMatched) { if (originalParseObject) { type = 'Enter'; } else { type = 'Create'; } } else { return null; } let functionName = 'push' + type; client[functionName](requestId, currentParseObject); }, (error) => { PLog.error('Matching ACL error : ', error); }); } } } } _onConnect(parseWebsocket: any): void { parseWebsocket.on('message', (request) => { if (typeof request === 'string') { request = JSON.parse(request); } PLog.verbose('Request: %j', request); // Check whether this request is a valid request, return error directly if not if (!tv4.validate(request, RequestSchema['general']) || !tv4.validate(request, RequestSchema[request.op])) { Client.pushError(parseWebsocket, 1, tv4.error.message); PLog.error('Connect message error %s', tv4.error.message); return; } switch(request.op) { case 'connect': this._handleConnect(parseWebsocket, request); break; case 'subscribe': this._handleSubscribe(parseWebsocket, request); break; case 'unsubscribe': this._handleUnsubscribe(parseWebsocket, request); break; default: Client.pushError(parseWebsocket, 3, 'Get unknown operation'); PLog.error('Get unknown operation', request.op); } }); parseWebsocket.on('disconnect', () => { PLog.log('Client disconnect: %d', parseWebsocket.clientId); let clientId = parseWebsocket.clientId; if (!this.clients.has(clientId)) { PLog.error('Can not find client %d on disconnect', clientId); return; } // Delete client let client = this.clients.get(clientId); this.clients.delete(clientId); // Delete client from subscriptions for (let [requestId, subscriptionInfo] of client.subscriptionInfos.entries()) { let subscription = subscriptionInfo.subscription; subscription.deleteClientSubscription(clientId, requestId); // If there is no client which is subscribing this subscription, remove it from subscriptions let classSubscriptions = this.subscriptions.get(subscription.className); if (!subscription.hasSubscribingClient()) { classSubscriptions.delete(subscription.hash); } // If there is no subscriptions under this class, remove it from subscriptions if (classSubscriptions.size === 0) { this.subscriptions.delete(subscription.className); } } PLog.verbose('Current clients %d', this.clients.size); PLog.verbose('Current subscriptions %d', this.subscriptions.size); }); } _matchesSubscription(parseObject: any, subscription: any): boolean { // Object is undefined or null, not match if (!parseObject) { return false; } return matchesQuery(parseObject, subscription.query); } _matchesACL(acl: any, client: any, requestId: number): any { // If ACL is undefined or null, or ACL has public read access, return true directly if (!acl || acl.getPublicReadAccess()) { return Parse.Promise.as(true); } // Check subscription sessionToken matches ACL first let subscriptionInfo = client.getSubscriptionInfo(requestId); if (typeof subscriptionInfo === 'undefined') { return Parse.Promise.as(false); } let subscriptionSessionToken = subscriptionInfo.sessionToken; return this.sessionTokenCache.getUserId(subscriptionSessionToken).then((userId) => { return acl.getReadAccess(userId); }).then((isSubscriptionSessionTokenMatched) => { if (isSubscriptionSessionTokenMatched) { return Parse.Promise.as(true); } // Check client sessionToken matches ACL let clientSessionToken = client.sessionToken; return this.sessionTokenCache.getUserId(clientSessionToken).then((userId) => { return acl.getReadAccess(userId); }); }).then((isMatched) => { return Parse.Promise.as(isMatched); }, (error) => { return Parse.Promise.as(false); }); } _handleConnect(parseWebsocket: any, request: any): any { if (!this._validateKeys(request, this.keyPairs)) { Client.pushError(parseWebsocket, 4, 'Key in request is not valid'); PLog.error('Key in request is not valid'); return; } let client = new Client(this.clientId, parseWebsocket); parseWebsocket.clientId = this.clientId; this.clientId += 1; this.clients.set(parseWebsocket.clientId, client); PLog.log('Create new client: %d', parseWebsocket.clientId); client.pushConnect(); } _validateKeys(request: any, validKeyPairs: any): boolean { if (!validKeyPairs || validKeyPairs.size == 0) { return true; } let isValid = false; for (let [key, secret] of validKeyPairs) { if (!request[key] || request[key] !== secret) { continue; } isValid = true; break; } return isValid; } _handleSubscribe(parseWebsocket: any, request: any): any { // If we can not find this client, return error to client if (!parseWebsocket.hasOwnProperty('clientId')) { Client.pushError(parseWebsocket, 2, 'Can not find this client, make sure you connect to server before subscribing'); PLog.error('Can not find this client, make sure you connect to server before subscribing'); return; } let client = this.clients.get(parseWebsocket.clientId); // Get subscription from subscriptions, create one if necessary let subscriptionHash = queryHash(request.query); // Add className to subscriptions if necessary let className = request.query.className; if (!this.subscriptions.has(className)) { this.subscriptions.set(className, new Map()); } let classSubscriptions = this.subscriptions.get(className); let subscription; if (classSubscriptions.has(subscriptionHash)) { subscription = classSubscriptions.get(subscriptionHash); } else { subscription = new Subscription(className, request.query.where, subscriptionHash); classSubscriptions.set(subscriptionHash, subscription); } // Add subscriptionInfo to client let subscriptionInfo = { subscription: subscription }; // Add selected fields and sessionToken for this subscription if necessary if (request.query.fields) { subscriptionInfo.fields = request.query.fields; } if (request.sessionToken) { subscriptionInfo.sessionToken = request.sessionToken; } client.addSubscriptionInfo(request.requestId, subscriptionInfo); // Add clientId to subscription subscription.addClientSubscription(parseWebsocket.clientId, request.requestId); client.pushSubscribe(request.requestId); PLog.verbose('Create client %d new subscription: %d', parseWebsocket.clientId, request.requestId); PLog.verbose('Current client number: %d', this.clients.size); } _handleUnsubscribe(parseWebsocket: any, request: any): any { // If we can not find this client, return error to client if (!parseWebsocket.hasOwnProperty('clientId')) { Client.pushError(parseWebsocket, 2, 'Can not find this client, make sure you connect to server before unsubscribing'); PLog.error('Can not find this client, make sure you connect to server before unsubscribing'); return; } let requestId = request.requestId; let client = this.clients.get(parseWebsocket.clientId); if (typeof client === 'undefined') { Client.pushError(parseWebsocket, 2, 'Cannot find client with clientId ' + parseWebsocket.clientId + '. Make sure you connect to live query server before unsubscribing.'); PLog.error('Can not find this client ' + parseWebsocket.clientId); return; } let subscriptionInfo = client.getSubscriptionInfo(requestId); if (typeof subscriptionInfo === 'undefined') { Client.pushError(parseWebsocket, 2, 'Cannot find subscription with clientId ' + parseWebsocket.clientId + ' subscriptionId ' + requestId + '. Make sure you subscribe to live query server before unsubscribing.'); PLog.error('Can not find subscription with clientId ' + parseWebsocket.clientId + ' subscriptionId ' + requestId); return; } // Remove subscription from client client.deleteSubscriptionInfo(requestId); // Remove client from subscription let subscription = subscriptionInfo.subscription; let className = subscription.className; subscription.deleteClientSubscription(parseWebsocket.clientId, requestId); // If there is no client which is subscribing this subscription, remove it from subscriptions let classSubscriptions = this.subscriptions.get(className); if (!subscription.hasSubscribingClient()) { classSubscriptions.delete(subscription.hash); } // If there is no subscriptions under this class, remove it from subscriptions if (classSubscriptions.size === 0) { this.subscriptions.delete(className); } client.pushUnsubscribe(request.requestId); PLog.verbose('Delete client: %d | subscription: %d', parseWebsocket.clientId, request.requestId); } } ParseLiveQueryServer.setLogLevel = function(logLevel) { PLog.logLevel = logLevel; } export { ParseLiveQueryServer }