Merge pull request #909 from ParsePlatform/nlutsenko.databaseController

Move DatabaseController and Schema fully to adaptive mongo collection.
This commit is contained in:
Nikita Lutsenko
2016-03-08 01:10:55 -08:00
5 changed files with 121 additions and 119 deletions

View File

@@ -54,7 +54,11 @@ export default class MongoCollection {
return this._mongoCollection.findAndModify(query, [], update, { new: true }).then(document => { return this._mongoCollection.findAndModify(query, [], update, { new: true }).then(document => {
// Value is the object where mongo returns multiple fields. // Value is the object where mongo returns multiple fields.
return document.value; return document.value;
}) });
}
insertOne(object) {
return this._mongoCollection.insertOne(object);
} }
// Atomically updates data in the database for a single (first) object that matched the query // Atomically updates data in the database for a single (first) object that matched the query
@@ -64,6 +68,10 @@ export default class MongoCollection {
return this._mongoCollection.update(query, update, { upsert: true }); return this._mongoCollection.update(query, update, { upsert: true });
} }
updateOne(query, update) {
return this._mongoCollection.updateOne(query, update);
}
updateMany(query, update) { updateMany(query, update) {
return this._mongoCollection.updateMany(query, update); return this._mongoCollection.updateMany(query, update);
} }
@@ -83,8 +91,8 @@ export default class MongoCollection {
return this._mongoCollection.deleteOne(query); return this._mongoCollection.deleteOne(query);
} }
remove(query) { deleteMany(query) {
return this._mongoCollection.remove(query); return this._mongoCollection.deleteMany(query);
} }
drop() { drop() {

View File

@@ -28,16 +28,6 @@ DatabaseController.prototype.connect = function() {
return this.adapter.connect(); return this.adapter.connect();
}; };
// Returns a promise for a Mongo collection.
// Generally just for internal use.
DatabaseController.prototype.collection = function(className) {
if (!Schema.classNameIsValid(className)) {
throw new Parse.Error(Parse.Error.INVALID_CLASS_NAME,
'invalid className: ' + className);
}
return this.adapter.collection(this.collectionPrefix + className);
};
DatabaseController.prototype.adaptiveCollection = function(className) { DatabaseController.prototype.adaptiveCollection = function(className) {
return this.adapter.adaptiveCollection(this.collectionPrefix + className); return this.adapter.adaptiveCollection(this.collectionPrefix + className);
}; };
@@ -54,15 +44,23 @@ function returnsTrue() {
return true; return true;
} }
DatabaseController.prototype.validateClassName = function(className) {
if (!Schema.classNameIsValid(className)) {
const error = new Parse.Error(Parse.Error.INVALID_CLASS_NAME, 'invalid className: ' + className);
return Promise.reject(error);
}
return Promise.resolve();
};
// Returns a promise for a schema object. // Returns a promise for a schema object.
// If we are provided a acceptor, then we run it on the schema. // If we are provided a acceptor, then we run it on the schema.
// If the schema isn't accepted, we reload it at most once. // If the schema isn't accepted, we reload it at most once.
DatabaseController.prototype.loadSchema = function(acceptor = returnsTrue) { DatabaseController.prototype.loadSchema = function(acceptor = returnsTrue) {
if (!this.schemaPromise) { if (!this.schemaPromise) {
this.schemaPromise = this.collection('_SCHEMA').then((coll) => { this.schemaPromise = this.adaptiveCollection('_SCHEMA').then(collection => {
delete this.schemaPromise; delete this.schemaPromise;
return Schema.load(coll); return Schema.load(collection);
}); });
return this.schemaPromise; return this.schemaPromise;
} }
@@ -71,9 +69,9 @@ DatabaseController.prototype.loadSchema = function(acceptor = returnsTrue) {
if (acceptor(schema)) { if (acceptor(schema)) {
return schema; return schema;
} }
this.schemaPromise = this.collection('_SCHEMA').then((coll) => { this.schemaPromise = this.adaptiveCollection('_SCHEMA').then(collection => {
delete this.schemaPromise; delete this.schemaPromise;
return Schema.load(coll); return Schema.load(collection);
}); });
return this.schemaPromise; return this.schemaPromise;
}); });
@@ -230,30 +228,28 @@ DatabaseController.prototype.handleRelationUpdates = function(className,
// Adds a relation. // Adds a relation.
// Returns a promise that resolves successfully iff the add was successful. // Returns a promise that resolves successfully iff the add was successful.
DatabaseController.prototype.addRelation = function(key, fromClassName, DatabaseController.prototype.addRelation = function(key, fromClassName, fromId, toId) {
fromId, toId) { let doc = {
var doc = {
relatedId: toId, relatedId: toId,
owningId: fromId owningId : fromId
}; };
var className = '_Join:' + key + ':' + fromClassName; let className = `_Join:${key}:${fromClassName}`;
return this.collection(className).then((coll) => { return this.adaptiveCollection(className).then((coll) => {
return coll.update(doc, doc, {upsert: true}); return coll.upsertOne(doc, doc);
}); });
}; };
// Removes a relation. // Removes a relation.
// Returns a promise that resolves successfully iff the remove was // Returns a promise that resolves successfully iff the remove was
// successful. // successful.
DatabaseController.prototype.removeRelation = function(key, fromClassName, DatabaseController.prototype.removeRelation = function(key, fromClassName, fromId, toId) {
fromId, toId) {
var doc = { var doc = {
relatedId: toId, relatedId: toId,
owningId: fromId owningId: fromId
}; };
var className = '_Join:' + key + ':' + fromClassName; let className = `_Join:${key}:${fromClassName}`;
return this.collection(className).then((coll) => { return this.adaptiveCollection(className).then(coll => {
return coll.remove(doc); return coll.deleteOne(doc);
}); });
}; };
@@ -269,40 +265,36 @@ DatabaseController.prototype.destroy = function(className, query, options = {})
var aclGroup = options.acl || []; var aclGroup = options.acl || [];
var schema; var schema;
return this.loadSchema().then((s) => { return this.loadSchema()
schema = s; .then(s => {
if (!isMaster) { schema = s;
return schema.validatePermission(className, aclGroup, 'delete'); if (!isMaster) {
} return schema.validatePermission(className, aclGroup, 'delete');
return Promise.resolve();
}).then(() => {
return this.collection(className);
}).then((coll) => {
var mongoWhere = transform.transformWhere(schema, className, query);
if (options.acl) {
var writePerms = [
{_wperm: {'$exists': false}}
];
for (var entry of options.acl) {
writePerms.push({_wperm: {'$in': [entry]}});
} }
mongoWhere = {'$and': [mongoWhere, {'$or': writePerms}]}; return Promise.resolve();
} })
.then(() => this.adaptiveCollection(className))
.then(collection => {
let mongoWhere = transform.transformWhere(schema, className, query);
return coll.remove(mongoWhere); if (options.acl) {
}).then((resp) => { var writePerms = [
//Check _Session to avoid changing password failed without any session. { _wperm: { '$exists': false } }
if (resp.result.n === 0 && className !== "_Session") { ];
return Promise.reject( for (var entry of options.acl) {
new Parse.Error(Parse.Error.OBJECT_NOT_FOUND, writePerms.push({ _wperm: { '$in': [entry] } });
'Object not found.')); }
mongoWhere = { '$and': [mongoWhere, { '$or': writePerms }] };
} }
}, (error) => { return collection.deleteMany(mongoWhere);
throw error; })
}); .then(resp => {
//Check _Session to avoid changing password failed without any session.
// TODO: @nlutsenko Stop relying on `result.n`
if (resp.result.n === 0 && className !== "_Session") {
throw new Parse.Error(Parse.Error.OBJECT_NOT_FOUND, 'Object not found.');
}
});
}; };
// Inserts an object into the database. // Inserts an object into the database.
@@ -312,21 +304,21 @@ DatabaseController.prototype.create = function(className, object, options) {
var isMaster = !('acl' in options); var isMaster = !('acl' in options);
var aclGroup = options.acl || []; var aclGroup = options.acl || [];
return this.loadSchema().then((s) => { return this.validateClassName(className)
schema = s; .then(() => this.loadSchema())
if (!isMaster) { .then(s => {
return schema.validatePermission(className, aclGroup, 'create'); schema = s;
} if (!isMaster) {
return Promise.resolve(); return schema.validatePermission(className, aclGroup, 'create');
}).then(() => { }
return Promise.resolve();
return this.handleRelationUpdates(className, null, object); })
}).then(() => { .then(() => this.handleRelationUpdates(className, null, object))
return this.collection(className); .then(() => this.adaptiveCollection(className))
}).then((coll) => { .then(coll => {
var mongoObject = transform.transformCreate(schema, className, object); var mongoObject = transform.transformCreate(schema, className, object);
return coll.insert([mongoObject]); return coll.insertOne(mongoObject);
}); });
}; };
// Runs a mongo query on the database. // Runs a mongo query on the database.
@@ -386,14 +378,14 @@ DatabaseController.prototype.owningIds = function(className, key, relatedIds) {
// equal-to-pointer constraints on relation fields. // equal-to-pointer constraints on relation fields.
// Returns a promise that resolves when query is mutated // Returns a promise that resolves when query is mutated
DatabaseController.prototype.reduceInRelation = function(className, query, schema) { DatabaseController.prototype.reduceInRelation = function(className, query, schema) {
// Search for an in-relation or equal-to-relation // Search for an in-relation or equal-to-relation
// Make it sequential for now, not sure of paralleization side effects // Make it sequential for now, not sure of paralleization side effects
if (query['$or']) { if (query['$or']) {
let ors = query['$or']; let ors = query['$or'];
return Promise.all(ors.map((aQuery, index) => { return Promise.all(ors.map((aQuery, index) => {
return this.reduceInRelation(className, aQuery, schema).then((aQuery) => { return this.reduceInRelation(className, aQuery, schema).then((aQuery) => {
query['$or'][index] = aQuery; query['$or'][index] = aQuery;
}) })
})); }));
} }
@@ -413,14 +405,14 @@ DatabaseController.prototype.reduceInRelation = function(className, query, schem
relatedIds = [query[key].objectId]; relatedIds = [query[key].objectId];
} }
return this.owningIds(className, key, relatedIds).then((ids) => { return this.owningIds(className, key, relatedIds).then((ids) => {
delete query[key]; delete query[key];
this.addInObjectIdsIds(ids, query); this.addInObjectIdsIds(ids, query);
return Promise.resolve(query); return Promise.resolve(query);
}); });
} }
return Promise.resolve(query); return Promise.resolve(query);
}) })
return Promise.all(promises).then(() => { return Promise.all(promises).then(() => {
return Promise.resolve(query); return Promise.resolve(query);
}) })
@@ -429,13 +421,13 @@ DatabaseController.prototype.reduceInRelation = function(className, query, schem
// Modifies query so that it no longer has $relatedTo // Modifies query so that it no longer has $relatedTo
// Returns a promise that resolves when query is mutated // Returns a promise that resolves when query is mutated
DatabaseController.prototype.reduceRelationKeys = function(className, query) { DatabaseController.prototype.reduceRelationKeys = function(className, query) {
if (query['$or']) { if (query['$or']) {
return Promise.all(query['$or'].map((aQuery) => { return Promise.all(query['$or'].map((aQuery) => {
return this.reduceRelationKeys(className, aQuery); return this.reduceRelationKeys(className, aQuery);
})); }));
} }
var relatedTo = query['$relatedTo']; var relatedTo = query['$relatedTo'];
if (relatedTo) { if (relatedTo) {
return this.relatedIds( return this.relatedIds(

View File

@@ -71,7 +71,7 @@ export class HooksController {
_removeHooks(query) { _removeHooks(query) {
return this.getCollection().then(collection => { return this.getCollection().then(collection => {
return collection.remove(query); return collection.deleteMany(query);
}).then(() => { }).then(() => {
return {}; return {};
}); });

View File

@@ -18,7 +18,7 @@ function getAllSchemas(req) {
return req.config.database.adaptiveCollection('_SCHEMA') return req.config.database.adaptiveCollection('_SCHEMA')
.then(collection => collection.find({})) .then(collection => collection.find({}))
.then(schemas => schemas.map(Schema.mongoSchemaToSchemaAPIResponse)) .then(schemas => schemas.map(Schema.mongoSchemaToSchemaAPIResponse))
.then(schemas => ({ response: { results: schemas }})); .then(schemas => ({ response: { results: schemas } }));
} }
function getOneSchema(req) { function getOneSchema(req) {
@@ -65,7 +65,7 @@ function modifySchema(req) {
throw new Parse.Error(Parse.Error.INVALID_CLASS_NAME, `Class ${req.params.className} does not exist.`); throw new Parse.Error(Parse.Error.INVALID_CLASS_NAME, `Class ${req.params.className} does not exist.`);
} }
let existingFields = Object.assign(schema.data[className], {_id: className}); let existingFields = Object.assign(schema.data[className], { _id: className });
Object.keys(submittedFields).forEach(name => { Object.keys(submittedFields).forEach(name => {
let field = submittedFields[name]; let field = submittedFields[name];
if (existingFields[name] && field.__op !== 'Delete') { if (existingFields[name] && field.__op !== 'Delete') {
@@ -83,24 +83,27 @@ function modifySchema(req) {
} }
// Finally we have checked to make sure the request is valid and we can start deleting fields. // Finally we have checked to make sure the request is valid and we can start deleting fields.
// Do all deletions first, then a single save to _SCHEMA collection to handle all additions. // Do all deletions first, then add fields to avoid duplicate geopoint error.
let deletionPromises = []; let deletePromises = [];
Object.keys(submittedFields).forEach(submittedFieldName => { let insertedFields = [];
if (submittedFields[submittedFieldName].__op === 'Delete') { Object.keys(submittedFields).forEach(fieldName => {
let promise = schema.deleteField(submittedFieldName, className, req.config.database); if (submittedFields[fieldName].__op === 'Delete') {
deletionPromises.push(promise); const promise = schema.deleteField(fieldName, className, req.config.database);
deletePromises.push(promise);
} else {
insertedFields.push(fieldName);
} }
}); });
return Promise.all(deletePromises) // Delete Everything
return Promise.all(deletionPromises) .then(() => schema.reloadData()) // Reload our Schema, so we have all the new values
.then(() => new Promise((resolve, reject) => { .then(() => {
schema.collection.update({_id: className}, mongoObject.result, {w: 1}, (err, docs) => { let promises = insertedFields.map(fieldName => {
if (err) { const mongoType = mongoObject.result[fieldName];
reject(err); return schema.validateField(className, fieldName, mongoType);
} });
resolve({ response: Schema.mongoSchemaToSchemaAPIResponse(mongoObject.result)}); return Promise.all(promises);
}) })
})); .then(() => ({ response: Schema.mongoSchemaToSchemaAPIResponse(mongoObject.result) }));
}); });
} }
@@ -140,7 +143,7 @@ function deleteSchema(req) {
// We've dropped the collection now, so delete the item from _SCHEMA // We've dropped the collection now, so delete the item from _SCHEMA
// and clear the _Join collections // and clear the _Join collections
return req.config.database.adaptiveCollection('_SCHEMA') return req.config.database.adaptiveCollection('_SCHEMA')
.then(coll => coll.findOneAndDelete({_id: req.params.className})) .then(coll => coll.findOneAndDelete({ _id: req.params.className }))
.then(document => { .then(document => {
if (document === null) { if (document === null) {
//tried to delete non-existent class //tried to delete non-existent class

View File

@@ -168,12 +168,12 @@ function schemaAPITypeToMongoFieldType(type) {
// '_metadata' is ignored for now // '_metadata' is ignored for now
// Everything else is expected to be a userspace field. // Everything else is expected to be a userspace field.
class Schema { class Schema {
collection; _collection;
data; data;
perms; perms;
constructor(collection) { constructor(collection) {
this.collection = collection; this._collection = collection;
// this.data[className][fieldName] tells you the type of that field // this.data[className][fieldName] tells you the type of that field
this.data = {}; this.data = {};
@@ -184,8 +184,8 @@ class Schema {
reloadData() { reloadData() {
this.data = {}; this.data = {};
this.perms = {}; this.perms = {};
return this.collection.find({}, {}).toArray().then(mongoSchema => { return this._collection.find({}).then(results => {
for (let obj of mongoSchema) { for (let obj of results) {
let className = null; let className = null;
let classData = {}; let classData = {};
let permsData = null; let permsData = null;
@@ -231,7 +231,7 @@ class Schema {
return Promise.reject(mongoObject); return Promise.reject(mongoObject);
} }
return this.collection.insertOne(mongoObject.result) return this._collection.insertOne(mongoObject.result)
.then(result => result.ops[0]) .then(result => result.ops[0])
.catch(error => { .catch(error => {
if (error.code === 11000) { //Mongo's duplicate key error if (error.code === 11000) { //Mongo's duplicate key error
@@ -268,7 +268,7 @@ class Schema {
'schema is frozen, cannot add: ' + className); 'schema is frozen, cannot add: ' + className);
} }
// We don't have this class. Update the schema // We don't have this class. Update the schema
return this.collection.insert([{_id: className}]).then(() => { return this._collection.insertOne({ _id: className }).then(() => {
// The schema update succeeded. Reload the schema // The schema update succeeded. Reload the schema
return this.reloadData(); return this.reloadData();
}, () => { }, () => {
@@ -280,10 +280,9 @@ class Schema {
}).then(() => { }).then(() => {
// Ensure that the schema now validates // Ensure that the schema now validates
return this.validateClassName(className, true); return this.validateClassName(className, true);
}, (error) => { }, () => {
// The schema still doesn't validate. Give up // The schema still doesn't validate. Give up
throw new Parse.Error(Parse.Error.INVALID_JSON, throw new Parse.Error(Parse.Error.INVALID_JSON, 'schema class name does not revalidate');
'schema class name does not revalidate');
}); });
} }
@@ -296,7 +295,7 @@ class Schema {
} }
}; };
update = {'$set': update}; update = {'$set': update};
return this.collection.findAndModify(query, {}, update, {}).then(() => { return this._collection.updateOne(query, update).then(() => {
// The update succeeded. Reload the schema // The update succeeded. Reload the schema
return this.reloadData(); return this.reloadData();
}); });
@@ -354,12 +353,12 @@ class Schema {
// We don't have this field. Update the schema. // We don't have this field. Update the schema.
// Note that we use the $exists guard and $set to avoid race // Note that we use the $exists guard and $set to avoid race
// conditions in the database. This is important! // conditions in the database. This is important!
var query = {_id: className}; var query = { _id: className };
query[key] = {'$exists': false}; query[key] = { '$exists': false };
var update = {}; var update = {};
update[key] = type; update[key] = type;
update = {'$set': update}; update = {'$set': update};
return this.collection.findAndModify(query, {}, update, {}).then(() => { return this._collection.upsertOne(query, update).then(() => {
// The update succeeded. Reload the schema // The update succeeded. Reload the schema
return this.reloadData(); return this.reloadData();
}, () => { }, () => {
@@ -422,14 +421,14 @@ class Schema {
// for non-relations, remove all the data. // for non-relations, remove all the data.
// This is necessary to ensure that the data is still gone if they add the same field. // This is necessary to ensure that the data is still gone if they add the same field.
return database.collection(className) return database.adaptiveCollection(className)
.then(collection => { .then(collection => {
var mongoFieldName = this.data[className][fieldName].startsWith('*') ? '_p_' + fieldName : fieldName; let mongoFieldName = this.data[className][fieldName].startsWith('*') ? `_p_${fieldName}` : fieldName;
return collection.update({}, { "$unset": { [mongoFieldName] : null } }, { multi: true }); return collection.updateMany({}, { "$unset": { [mongoFieldName]: null } });
}); });
}) })
// Save the _SCHEMA object // Save the _SCHEMA object
.then(() => this.collection.update({ _id: className }, { $unset: {[fieldName]: null }})); .then(() => this._collection.updateOne({ _id: className }, { $unset: { [fieldName]: null } }));
}); });
} }