Live query CLP (#4387)

* Auth module refactoring in order to be reusable

* Ensure cache controller is properly forwarded from helpers

* Nits

* Adds support for static validation

* Adds support for CLP in Live query (no support for roles yet)

* Adds e2e test to validate liveQuery hooks is properly called

* Adds tests over LiveQueryController to ensure data is correctly transmitted

* nits

* Fixes for flow types

* Removes usage of Parse.Promise

* Use the Auth module for authentication and caches

* Cleaner implementation of getting auth

* Adds authCache that stores auth promises

* Proper testing of the caching

* nits
This commit is contained in:
Florent Vilmart
2018-10-17 17:53:49 -04:00
committed by GitHub
parent 17bd5c3adb
commit 7c81290252
12 changed files with 829 additions and 237 deletions

View File

@@ -7,10 +7,13 @@ import logger from '../logger';
import RequestSchema from './RequestSchema';
import { matchesQuery, queryHash } from './QueryTools';
import { ParsePubSub } from './ParsePubSub';
import { SessionTokenCache } from './SessionTokenCache';
import SchemaController from '../Controllers/SchemaController';
import _ from 'lodash';
import uuid from 'uuid';
import { runLiveQueryEventHandlers } from '../triggers';
import { getAuthForSessionToken, Auth } from '../Auth';
import { getCacheController } from '../Controllers';
import LRU from 'lru-cache';
class ParseLiveQueryServer {
clients: Map;
@@ -21,12 +24,13 @@ class ParseLiveQueryServer {
// The subscriber we use to get object update from publisher
subscriber: Object;
constructor(server: any, config: any) {
constructor(server: any, config: any = {}) {
this.server = server;
this.clients = new Map();
this.subscriptions = new Map();
config = config || {};
config.appId = config.appId || Parse.applicationId;
config.masterKey = config.masterKey || Parse.masterKey;
// Store keys, convert obj to map
const keyPairs = config.keyPairs || {};
@@ -38,14 +42,20 @@ class ParseLiveQueryServer {
// Initialize Parse
Parse.Object.disableSingleInstance();
const serverURL = config.serverURL || Parse.serverURL;
Parse.serverURL = serverURL;
const appId = config.appId || Parse.applicationId;
const javascriptKey = Parse.javaScriptKey;
const masterKey = config.masterKey || Parse.masterKey;
Parse.initialize(appId, javascriptKey, masterKey);
Parse.initialize(config.appId, Parse.javaScriptKey, config.masterKey);
// The cache controller is a proper cache controller
// with access to User and Roles
this.cacheController = getCacheController(config);
// This auth cache stores the promises for each auth resolution.
// The main benefit is to be able to reuse the same user / session token resolution.
this.authCache = new LRU({
max: 500, // 500 concurrent
maxAge: 60 * 60 * 1000, // 1h
});
// Initialize websocket server
this.parseWebSocketServer = new ParseWebSocketServer(
server,
@@ -81,9 +91,6 @@ class ParseLiveQueryServer {
);
}
});
// Initialize sessionToken cache
this.sessionTokenCache = new SessionTokenCache(config.cacheTimeout);
}
// Message is the JSON object from publisher. Message.currentParseObject is the ParseObject JSON after changes.
@@ -111,6 +118,7 @@ class ParseLiveQueryServer {
logger.verbose(Parse.applicationId + 'afterDelete is triggered');
const deletedParseObject = message.currentParseObject.toJSON();
const classLevelPermissions = message.classLevelPermissions;
const className = deletedParseObject.className;
logger.verbose(
'ClassName: %j | ObjectId: %s',
@@ -141,18 +149,28 @@ class ParseLiveQueryServer {
}
for (const requestId of requestIds) {
const acl = message.currentParseObject.getACL();
// Check ACL
this._matchesACL(acl, client, requestId).then(
isMatched => {
// Check CLP
const op = this._getCLPOperation(subscription.query);
this._matchesCLP(
classLevelPermissions,
message.currentParseObject,
client,
requestId,
op
)
.then(() => {
// Check ACL
return this._matchesACL(acl, client, requestId);
})
.then(isMatched => {
if (!isMatched) {
return null;
}
client.pushDelete(requestId, deletedParseObject);
},
error => {
})
.catch(error => {
logger.error('Matching ACL error : ', error);
}
);
});
}
}
}
@@ -167,6 +185,7 @@ class ParseLiveQueryServer {
if (message.originalParseObject) {
originalParseObject = message.originalParseObject.toJSON();
}
const classLevelPermissions = message.classLevelPermissions;
const currentParseObject = message.currentParseObject.toJSON();
const className = currentParseObject.className;
logger.verbose(
@@ -227,45 +246,55 @@ class ParseLiveQueryServer {
requestId
);
}
const op = this._getCLPOperation(subscription.query);
this._matchesCLP(
classLevelPermissions,
message.currentParseObject,
client,
requestId,
op
)
.then(() => {
return Promise.all([
originalACLCheckingPromise,
currentACLCheckingPromise,
]);
})
.then(
([isOriginalMatched, isCurrentMatched]) => {
logger.verbose(
'Original %j | Current %j | Match: %s, %s, %s, %s | Query: %s',
originalParseObject,
currentParseObject,
isOriginalSubscriptionMatched,
isCurrentSubscriptionMatched,
isOriginalMatched,
isCurrentMatched,
subscription.hash
);
Promise.all([
originalACLCheckingPromise,
currentACLCheckingPromise,
]).then(
([isOriginalMatched, isCurrentMatched]) => {
logger.verbose(
'Original %j | Current %j | Match: %s, %s, %s, %s | Query: %s',
originalParseObject,
currentParseObject,
isOriginalSubscriptionMatched,
isCurrentSubscriptionMatched,
isOriginalMatched,
isCurrentMatched,
subscription.hash
);
// Decide event type
let type;
if (isOriginalMatched && isCurrentMatched) {
type = 'Update';
} else if (isOriginalMatched && !isCurrentMatched) {
type = 'Leave';
} else if (!isOriginalMatched && isCurrentMatched) {
if (originalParseObject) {
type = 'Enter';
// Decide event type
let type;
if (isOriginalMatched && isCurrentMatched) {
type = 'Update';
} else if (isOriginalMatched && !isCurrentMatched) {
type = 'Leave';
} else if (!isOriginalMatched && isCurrentMatched) {
if (originalParseObject) {
type = 'Enter';
} else {
type = 'Create';
}
} else {
type = 'Create';
return null;
}
} else {
return null;
const functionName = 'push' + type;
client[functionName](requestId, currentParseObject);
},
error => {
logger.error('Matching ACL error : ', error);
}
const functionName = 'push' + type;
client[functionName](requestId, currentParseObject);
},
error => {
logger.error('Matching ACL error : ', error);
}
);
);
}
}
}
@@ -374,98 +403,149 @@ class ParseLiveQueryServer {
return matchesQuery(parseObject, subscription.query);
}
_matchesACL(acl: any, client: any, requestId: number): any {
getAuthForSessionToken(
sessionToken: ?string
): Promise<{ auth: ?Auth, userId: ?string }> {
if (!sessionToken) {
return Promise.resolve({});
}
const fromCache = this.authCache.get(sessionToken);
if (fromCache) {
return fromCache;
}
const authPromise = getAuthForSessionToken({
cacheController: this.cacheController,
sessionToken: sessionToken,
})
.then(auth => {
return { auth, userId: auth && auth.user && auth.user.id };
})
.catch(() => {
// If you can't continue, let's just wrap it up and delete it.
// Next time, one will try again
this.authCache.del(sessionToken);
return {};
});
this.authCache.set(sessionToken, authPromise);
return authPromise;
}
async _matchesCLP(
classLevelPermissions: ?any,
object: any,
client: any,
requestId: number,
op: string
): any {
// try to match on user first, less expensive than with roles
const subscriptionInfo = client.getSubscriptionInfo(requestId);
const aclGroup = ['*'];
let userId;
if (typeof subscriptionInfo !== 'undefined') {
const { userId } = await this.getAuthForSessionToken(
subscriptionInfo.sessionToken
);
if (userId) {
aclGroup.push(userId);
}
}
try {
await SchemaController.validatePermission(
classLevelPermissions,
object.className,
aclGroup,
op
);
return true;
} catch (e) {
logger.verbose(`Failed matching CLP for ${object.id} ${userId} ${e}`);
return false;
}
// TODO: handle roles permissions
// Object.keys(classLevelPermissions).forEach((key) => {
// const perm = classLevelPermissions[key];
// Object.keys(perm).forEach((key) => {
// if (key.indexOf('role'))
// });
// })
// // it's rejected here, check the roles
// var rolesQuery = new Parse.Query(Parse.Role);
// rolesQuery.equalTo("users", user);
// return rolesQuery.find({useMasterKey:true});
}
_getCLPOperation(query: any) {
return typeof query === 'object' &&
Object.keys(query).length == 1 &&
typeof query.objectId === 'string'
? 'get'
: 'find';
}
async _matchesACL(
acl: any,
client: any,
requestId: number
): Promise<boolean> {
// Return true directly if ACL isn't present, ACL is public read, or client has master key
if (!acl || acl.getPublicReadAccess() || client.hasMasterKey) {
return Promise.resolve(true);
return true;
}
// Check subscription sessionToken matches ACL first
const subscriptionInfo = client.getSubscriptionInfo(requestId);
if (typeof subscriptionInfo === 'undefined') {
return Promise.resolve(false);
return false;
}
const subscriptionSessionToken = subscriptionInfo.sessionToken;
return this.sessionTokenCache
.getUserId(subscriptionSessionToken)
.then(userId => {
return acl.getReadAccess(userId);
})
.then(isSubscriptionSessionTokenMatched => {
if (isSubscriptionSessionTokenMatched) {
return Promise.resolve(true);
// TODO: get auth there and de-duplicate code below to work with the same Auth obj.
const { auth, userId } = await this.getAuthForSessionToken(
subscriptionInfo.sessionToken
);
const isSubscriptionSessionTokenMatched = acl.getReadAccess(userId);
if (isSubscriptionSessionTokenMatched) {
return true;
}
// Check if the user has any roles that match the ACL
return Promise.resolve()
.then(async () => {
// Resolve false right away if the acl doesn't have any roles
const acl_has_roles = Object.keys(acl.permissionsById).some(key =>
key.startsWith('role:')
);
if (!acl_has_roles) {
return false;
}
// Check if the user has any roles that match the ACL
return new Promise((resolve, reject) => {
// Resolve false right away if the acl doesn't have any roles
const acl_has_roles = Object.keys(acl.permissionsById).some(key =>
key.startsWith('role:')
);
if (!acl_has_roles) {
return resolve(false);
const roleNames = await auth.getUserRoles();
// Finally, see if any of the user's roles allow them read access
for (const role of roleNames) {
// We use getReadAccess as `role` is in the form `role:roleName`
if (acl.getReadAccess(role)) {
return true;
}
this.sessionTokenCache
.getUserId(subscriptionSessionToken)
.then(userId => {
// Pass along a null if there is no user id
if (!userId) {
return Promise.resolve(null);
}
// Prepare a user object to query for roles
// To eliminate a query for the user, create one locally with the id
var user = new Parse.User();
user.id = userId;
return user;
})
.then(user => {
// Pass along an empty array (of roles) if no user
if (!user) {
return Promise.resolve([]);
}
// Then get the user's roles
var rolesQuery = new Parse.Query(Parse.Role);
rolesQuery.equalTo('users', user);
return rolesQuery.find({ useMasterKey: true });
})
.then(roles => {
// Finally, see if any of the user's roles allow them read access
for (const role of roles) {
if (acl.getRoleReadAccess(role)) {
return resolve(true);
}
}
resolve(false);
})
.catch(error => {
reject(error);
});
});
}
return false;
})
.then(isRoleMatched => {
.then(async isRoleMatched => {
if (isRoleMatched) {
return Promise.resolve(true);
}
// Check client sessionToken matches ACL
const clientSessionToken = client.sessionToken;
return this.sessionTokenCache
.getUserId(clientSessionToken)
.then(userId => {
return acl.getReadAccess(userId);
});
})
.then(
isMatched => {
return Promise.resolve(isMatched);
},
() => {
return Promise.resolve(false);
if (clientSessionToken) {
const { userId } = await this.getAuthForSessionToken(
clientSessionToken
);
return acl.getReadAccess(userId);
} else {
return isRoleMatched;
}
);
})
.catch(() => {
return false;
});
}
_handleConnect(parseWebsocket: any, request: any): any {