Add more postgres support (#2080)

* reload the right data

More passing postgres tests

Handle schema updates, and $in for non array columns

remove authdata from user and implement ensureUniqueness

Make some tests work, detect existing classes

Throw proper error for unique index violation

* fix findOneAndUpdate
This commit is contained in:
Drew
2016-06-16 15:39:05 -07:00
committed by Peter J. Shin
parent 1a75101146
commit 0ff35e18f0
8 changed files with 194 additions and 127 deletions

View File

@@ -2,6 +2,8 @@ const pgp = require('pg-promise')();
const PostgresRelationDoesNotExistError = '42P01';
const PostgresDuplicateRelationError = '42P07';
const PostgresDuplicateColumnError = '42701';
const PostgresUniqueIndexViolationError = '23505';
const parseTypeToPostgresType = type => {
switch (type.type) {
@@ -21,6 +23,62 @@ const parseTypeToPostgresType = type => {
}
};
const buildWhereClause = ({ schema, query, index }) => {
let patterns = [];
let values = [];
for (let fieldName in query) {
let fieldValue = query[fieldName];
if (typeof fieldValue === 'string') {
patterns.push(`$${index}:name = $${index + 1}`);
values.push(fieldName, fieldValue);
index += 2;
} else if (fieldValue.$ne) {
patterns.push(`$${index}:name <> $${index + 1}`);
values.push(fieldName, fieldValue.$ne);
index += 2;
} else if (fieldName === '$or') {
fieldValue.map(subQuery => buildWhereClause({ schema, query: subQuery, index })).forEach(result => {
patterns.push(result.pattern);
values.push(...result.values);
});
} else if (Array.isArray(fieldValue.$in) && schema.fields[fieldName].type === 'Array') {
let inPatterns = [];
let allowNull = false;
values.push(fieldName);
fieldValue.$in.forEach((listElem, listIndex) => {
if (listElem === null ) {
allowNull = true;
} else {
values.push(listElem);
inPatterns.push(`$${index + 1 + listIndex - (allowNull ? 1 : 0)}`);
}
});
if (allowNull) {
patterns.push(`($${index}:name IS NULL OR $${index}:name && ARRAY[${inPatterns.join(',')}])`);
} else {
patterns.push(`$${index}:name && ARRAY[${inPatterns.join(',')}]`);
}
index = index + 1 + inPatterns.length;
} else if (Array.isArray(fieldValue.$in) && schema.fields[fieldName].type === 'String') {
let inPatterns = [];
values.push(fieldName);
fieldValue.$in.forEach((listElem, listIndex) => {
values.push(listElem);
inPatterns.push(`$${index + 1 + listIndex}`);
});
patterns.push(`$${index}:name IN (${inPatterns.join(',')})`);
index = index + 1 + inPatterns.length;
} else if (fieldValue.__type === 'Pointer') {
patterns.push(`$${index}:name = $${index + 1}`);
values.push(fieldName, fieldValue.objectId);
index += 2;
} else {
throw new Parse.Error(Parse.Error.OPERATION_FORBIDDEN, `Postgres doesn't support this query type yet`);
}
}
return { pattern: patterns.join(' AND '), values };
}
export class PostgresStorageAdapter {
// Private
_collectionPrefix: string;
@@ -65,7 +123,15 @@ export class PostgresStorageAdapter {
valuesArray.push(parseTypeToPostgresType(parseType));
patternsArray.push(`$${index * 2 + 2}:name $${index * 2 + 3}:raw`);
});
return this._client.query(`CREATE TABLE $1:name (${patternsArray.join(',')})`, [className, ...valuesArray])
return this._ensureSchemaCollectionExists()
.then(() => this._client.query(`CREATE TABLE $1:name (${patternsArray.join(',')})`, [className, ...valuesArray]))
.catch(error => {
if (error.code === PostgresDuplicateRelationError) {
// Table already exists, must have been created by a different request. Ignore error.
} else {
throw error;
}
})
.then(() => this._client.query('INSERT INTO "_SCHEMA" ("className", "schema", "isParseClass") VALUES ($<className>, $<schema>, true)', { className, schema }))
}
@@ -75,11 +141,14 @@ export class PostgresStorageAdapter {
.catch(error => {
if (error.code === PostgresRelationDoesNotExistError) {
return this.createClass(className, { fields: { [fieldName]: type } })
} else if (error.code === PostgresDuplicateColumnError) {
// Column already exists, created by other request. Carry on to
// See if it's the right type.
} else {
throw error;
}
})
.then(() => this._client.query('SELECT "schema" FROM "_SCHEMA"', { className }))
.then(() => this._client.query('SELECT "schema" FROM "_SCHEMA" WHERE "className" = $<className>', { className }))
.then(result => {
if (fieldName in result[0].schema) {
throw "Attempted to add a field that already exists";
@@ -155,7 +224,7 @@ export class PostgresStorageAdapter {
return this._client.query('SELECT * FROM "_SCHEMA" WHERE "className"=$<className>', { className })
.then(result => {
if (result.length === 1) {
return result[0];
return result[0].schema;
} else {
throw undefined;
}
@@ -166,11 +235,6 @@ export class PostgresStorageAdapter {
createObject(className, schema, object) {
let columnsArray = [];
let valuesArray = [];
console.log('creating');
console.log(schema);
console.log(object);
console.log(className);
console.log(new Error().stack);
Object.keys(object).forEach(fieldName => {
columnsArray.push(fieldName);
switch (schema.fields[fieldName].type) {
@@ -186,9 +250,18 @@ export class PostgresStorageAdapter {
}
});
let columnsPattern = columnsArray.map((col, index) => `$${index + 2}:name`).join(',');
let valuesPattern = valuesArray.map((val, index) => `$${index + 2 + columnsArray.length}`).join(',');
return this._client.query(`INSERT INTO $1:name (${columnsPattern}) VALUES (${valuesPattern})`, [className, ...columnsArray, ...valuesArray])
let valuesPattern = valuesArray.map((val, index) => `$${index + 2 + columnsArray.length}${(['_rperm','_wperm'].includes(columnsArray[index])) ? '::text[]' : ''}`).join(',');
let qs = `INSERT INTO $1:name (${columnsPattern}) VALUES (${valuesPattern})`
let values = [className, ...columnsArray, ...valuesArray]
return this._client.query(qs, values)
.then(() => ({ ops: [object] }))
.catch(error => {
if (error.code === PostgresUniqueIndexViolationError) {
throw new Parse.Error(Parse.Error.DUPLICATE_VALUE, 'A duplicate value for a field with unique values was provided');
} else {
throw error;
}
})
}
// Remove all objects that match the given Parse Query.
@@ -214,8 +287,7 @@ export class PostgresStorageAdapter {
findOneAndUpdate(className, schema, query, update) {
let conditionPatterns = [];
let updatePatterns = [];
let values = []
values.push(className);
let values = [className]
let index = 2;
for (let fieldName in update) {
@@ -233,26 +305,10 @@ export class PostgresStorageAdapter {
}
}
for (let fieldName in query) {
let fieldValue = query[fieldName];
if (typeof fieldValue === 'string') {
conditionPatterns.push(`$${index}:name = $${index + 1}`);
values.push(fieldName, fieldValue);
index += 2;
} else if (Array.isArray(fieldValue.$in)) {
let inPatterns = [];
values.push(fieldName);
fieldValue.$in.forEach((listElem, listIndex) => {
values.push(listElem);
inPatterns.push(`$${index + 1 + listIndex}`);
});
conditionPatterns.push(`$${index}:name && ARRAY[${inPatterns.join(',')}]`);
index = index + 1 + inPatterns.length;
} else {
return Promise.reject(new Parse.Error(Parse.Error.OPERATION_FORBIDDEN, `Postgres doesn't support this type of request yet`));
}
}
let qs = `UPDATE $1:name SET ${updatePatterns.join(',')} WHERE ${conditionPatterns.join(' AND ')} RETURNING *`;
let where = buildWhereClause({ schema, index, query })
values.push(...where.values);
let qs = `UPDATE $1:name SET ${updatePatterns.join(',')} WHERE ${where.pattern} RETURNING *`;
return this._client.query(qs, values)
.then(val => {
return val[0];
@@ -264,42 +320,16 @@ export class PostgresStorageAdapter {
return Promise.reject('Not implented yet.')
}
// Executes a find. Accepts: className, query in Parse format, and { skip, limit, sort }.
find(className, schema, query, { skip, limit, sort }) {
let conditionPatterns = [];
let values = [];
values.push(className);
let index = 2;
let values = [className];
let where = buildWhereClause({ schema, query, index: 2 })
values.push(...where.values);
for (let fieldName in query) {
let fieldValue = query[fieldName];
if (typeof fieldValue === 'string') {
conditionPatterns.push(`$${index}:name = $${index + 1}`);
values.push(fieldName, fieldValue);
index += 2;
} else if (fieldValue.$ne) {
conditionPatterns.push(`$${index}:name <> $${index + 1}`);
values.push(fieldName, fieldValue.$ne)
index += 2;
} else if (Array.isArray(fieldValue.$in)) {
let inPatterns = [];
values.push(fieldName);
fieldValue.$in.forEach((listElem, listIndex) => {
values.push(listElem);
inPatterns.push(`$${index + 1 + listIndex}`);
});
conditionPatterns.push(`$${index}:name IN (${inPatterns.join(',')})`);
index = index + 1 + inPatterns.length;
} else if (fieldValue.__type === 'Pointer') {
conditionPatterns.push(`$${index}:name = $${index + 1}`);
values.push(fieldName, fieldValue.objectId);
index += 2;
} else {
return Promise.reject(new Parse.Error(Parse.Error.OPERATION_FORBIDDEN, "Postgres doesn't support this query type yet"));
}
const qs = `SELECT * FROM $1:name WHERE ${where.pattern} ${limit !== undefined ? `LIMIT $${values.length + 1}` : ''}`;
if (limit !== undefined) {
values.push(limit);
}
return this._client.query(`SELECT * FROM $1:name WHERE ${conditionPatterns.join(' AND ')}`, values)
return this._client.query(qs, values)
.then(results => results.map(object => {
Object.keys(schema.fields).filter(field => schema.fields[field].type === 'Pointer').forEach(fieldName => {
object[fieldName] = { objectId: object[fieldName], __type: 'Pointer', className: schema.fields[fieldName].targetClass };
@@ -331,7 +361,12 @@ export class PostgresStorageAdapter {
// Way of determining if a field is nullable. Undefined doesn't count against uniqueness,
// which is why we use sparse indexes.
ensureUniqueness(className, schema, fieldNames) {
return Promise.resolve('ensureUniqueness not implented yet.')
// Use the same name for every ensureUniqueness attempt, because postgres
// Will happily create the same index with multiple names.
const constraintName = `unique_${fieldNames.sort().join('_')}`;
const constraintPatterns = fieldNames.map((fieldName, index) => `$${index + 3}:name`);
const qs = `ALTER TABLE $1:name ADD CONSTRAINT $2:name UNIQUE (${constraintPatterns.join(',')})`;
return this._client.query(qs,[className, constraintName, ...fieldNames])
}
// Executs a count.