fix: Postrgres group aggregation (#6522)

* Postrgres group aggregation

* convert tabs to spaces

Co-authored-by: Diamond Lewis <findlewis@gmail.com>
This commit is contained in:
Siddharth Ramesh
2020-04-06 22:50:33 +05:30
committed by GitHub
parent bfdcba8d5c
commit cc5f14e11a
2 changed files with 185 additions and 157 deletions

View File

@@ -96,6 +96,7 @@
"docs": "jsdoc -c ./jsdoc-conf.json", "docs": "jsdoc -c ./jsdoc-conf.json",
"dev": "npm run build && node bin/dev", "dev": "npm run build && node bin/dev",
"lint": "flow && eslint --cache ./", "lint": "flow && eslint --cache ./",
"lint-fix": "eslint --fix --cache ./",
"build": "babel src/ -d lib/ --copy-files", "build": "babel src/ -d lib/ --copy-files",
"watch": "babel --watch src/ -d lib/ --copy-files", "watch": "babel --watch src/ -d lib/ --copy-files",
"pretest": "cross-env MONGODB_VERSION=${MONGODB_VERSION:=4.0.4} MONGODB_TOPOLOGY=${MONGODB_TOPOLOGY:=standalone} MONGODB_STORAGE_ENGINE=${MONGODB_STORAGE_ENGINE:=mmapv1} mongodb-runner start", "pretest": "cross-env MONGODB_VERSION=${MONGODB_VERSION:=4.0.4} MONGODB_TOPOLOGY=${MONGODB_TOPOLOGY:=standalone} MONGODB_STORAGE_ENGINE=${MONGODB_STORAGE_ENGINE:=mmapv1} mongodb-runner start",
@@ -133,7 +134,7 @@
"lint-staged": { "lint-staged": {
"{src,spec}/**/*.js": [ "{src,spec}/**/*.js": [
"prettier --write", "prettier --write",
"eslint --cache", "eslint --fix --cache",
"git add" "git add"
] ]
} }

View File

@@ -15,7 +15,7 @@ const PostgresUniqueIndexViolationError = '23505';
const PostgresTransactionAbortedError = '25P02'; const PostgresTransactionAbortedError = '25P02';
const logger = require('../../../logger'); const logger = require('../../../logger');
const debug = function(...args: any) { const debug = function (...args: any) {
args = ['PG: ' + arguments[0]].concat(args.slice(1, args.length)); args = ['PG: ' + arguments[0]].concat(args.slice(1, args.length));
const log = logger.getLogger(); const log = logger.getLogger();
log.debug.apply(log, args); log.debug.apply(log, args);
@@ -24,7 +24,7 @@ const debug = function(...args: any) {
import { StorageAdapter } from '../StorageAdapter'; import { StorageAdapter } from '../StorageAdapter';
import type { SchemaType, QueryType, QueryOptions } from '../StorageAdapter'; import type { SchemaType, QueryType, QueryOptions } from '../StorageAdapter';
const parseTypeToPostgresType = type => { const parseTypeToPostgresType = (type) => {
switch (type.type) { switch (type.type) {
case 'String': case 'String':
return 'text'; return 'text';
@@ -79,7 +79,7 @@ const mongoAggregateToPostgres = {
$year: 'YEAR', $year: 'YEAR',
}; };
const toPostgresValue = value => { const toPostgresValue = (value) => {
if (typeof value === 'object') { if (typeof value === 'object') {
if (value.__type === 'Date') { if (value.__type === 'Date') {
return value.iso; return value.iso;
@@ -91,7 +91,7 @@ const toPostgresValue = value => {
return value; return value;
}; };
const transformValue = value => { const transformValue = (value) => {
if (typeof value === 'object' && value.__type === 'Pointer') { if (typeof value === 'object' && value.__type === 'Pointer') {
return value.objectId; return value.objectId;
} }
@@ -121,7 +121,7 @@ const defaultCLPS = Object.freeze({
protectedFields: { '*': [] }, protectedFields: { '*': [] },
}); });
const toParseSchema = schema => { const toParseSchema = (schema) => {
if (schema.className === '_User') { if (schema.className === '_User') {
delete schema.fields._hashed_password; delete schema.fields._hashed_password;
} }
@@ -145,7 +145,7 @@ const toParseSchema = schema => {
}; };
}; };
const toPostgresSchema = schema => { const toPostgresSchema = (schema) => {
if (!schema) { if (!schema) {
return schema; return schema;
} }
@@ -159,8 +159,8 @@ const toPostgresSchema = schema => {
return schema; return schema;
}; };
const handleDotFields = object => { const handleDotFields = (object) => {
Object.keys(object).forEach(fieldName => { Object.keys(object).forEach((fieldName) => {
if (fieldName.indexOf('.') > -1) { if (fieldName.indexOf('.') > -1) {
const components = fieldName.split('.'); const components = fieldName.split('.');
const first = components.shift(); const first = components.shift();
@@ -186,7 +186,7 @@ const handleDotFields = object => {
return object; return object;
}; };
const transformDotFieldToComponents = fieldName => { const transformDotFieldToComponents = (fieldName) => {
return fieldName.split('.').map((cmpt, index) => { return fieldName.split('.').map((cmpt, index) => {
if (index === 0) { if (index === 0) {
return `"${cmpt}"`; return `"${cmpt}"`;
@@ -195,7 +195,7 @@ const transformDotFieldToComponents = fieldName => {
}); });
}; };
const transformDotField = fieldName => { const transformDotField = (fieldName) => {
if (fieldName.indexOf('.') === -1) { if (fieldName.indexOf('.') === -1) {
return `"${fieldName}"`; return `"${fieldName}"`;
} }
@@ -205,7 +205,7 @@ const transformDotField = fieldName => {
return name; return name;
}; };
const transformAggregateField = fieldName => { const transformAggregateField = (fieldName) => {
if (typeof fieldName !== 'string') { if (typeof fieldName !== 'string') {
return fieldName; return fieldName;
} }
@@ -218,7 +218,7 @@ const transformAggregateField = fieldName => {
return fieldName.substr(1); return fieldName.substr(1);
}; };
const validateKeys = object => { const validateKeys = (object) => {
if (typeof object == 'object') { if (typeof object == 'object') {
for (const key in object) { for (const key in object) {
if (typeof object[key] == 'object') { if (typeof object[key] == 'object') {
@@ -236,10 +236,10 @@ const validateKeys = object => {
}; };
// Returns the list of join tables on a schema // Returns the list of join tables on a schema
const joinTablesForSchema = schema => { const joinTablesForSchema = (schema) => {
const list = []; const list = [];
if (schema) { if (schema) {
Object.keys(schema.fields).forEach(field => { Object.keys(schema.fields).forEach((field) => {
if (schema.fields[field].type === 'Relation') { if (schema.fields[field].type === 'Relation') {
list.push(`_Join:${field}:${schema.className}`); list.push(`_Join:${field}:${schema.className}`);
} }
@@ -343,7 +343,7 @@ const buildWhereClause = ({
} else if (['$or', '$nor', '$and'].includes(fieldName)) { } else if (['$or', '$nor', '$and'].includes(fieldName)) {
const clauses = []; const clauses = [];
const clauseValues = []; const clauseValues = [];
fieldValue.forEach(subQuery => { fieldValue.forEach((subQuery) => {
const clause = buildWhereClause({ const clause = buildWhereClause({
schema, schema,
query: subQuery, query: subQuery,
@@ -378,8 +378,9 @@ const buildWhereClause = ({
// if not null, we need to manually exclude null // if not null, we need to manually exclude null
if (fieldValue.$ne.__type === 'GeoPoint') { if (fieldValue.$ne.__type === 'GeoPoint') {
patterns.push( patterns.push(
`($${index}:name <> POINT($${index + 1}, $${index + `($${index}:name <> POINT($${index + 1}, $${
2}) OR $${index}:name IS NULL)` index + 2
}) OR $${index}:name IS NULL)`
); );
} else { } else {
if (fieldName.indexOf('.') >= 0) { if (fieldName.indexOf('.') >= 0) {
@@ -489,13 +490,13 @@ const buildWhereClause = ({
}; };
if (fieldValue.$in) { if (fieldValue.$in) {
createConstraint( createConstraint(
_.flatMap(fieldValue.$in, elt => elt), _.flatMap(fieldValue.$in, (elt) => elt),
false false
); );
} }
if (fieldValue.$nin) { if (fieldValue.$nin) {
createConstraint( createConstraint(
_.flatMap(fieldValue.$nin, elt => elt), _.flatMap(fieldValue.$nin, (elt) => elt),
true true
); );
} }
@@ -609,8 +610,9 @@ const buildWhereClause = ({
); );
} }
patterns.push( patterns.push(
`to_tsvector($${index}, $${index + 1}:name) @@ to_tsquery($${index + `to_tsvector($${index}, $${index + 1}:name) @@ to_tsquery($${
2}, $${index + 3})` index + 2
}, $${index + 3})`
); );
values.push(language, fieldName, language, search.$term); values.push(language, fieldName, language, search.$term);
index += 4; index += 4;
@@ -621,12 +623,14 @@ const buildWhereClause = ({
const distance = fieldValue.$maxDistance; const distance = fieldValue.$maxDistance;
const distanceInKM = distance * 6371 * 1000; const distanceInKM = distance * 6371 * 1000;
patterns.push( patterns.push(
`ST_DistanceSphere($${index}:name::geometry, POINT($${index + `ST_DistanceSphere($${index}:name::geometry, POINT($${index + 1}, $${
1}, $${index + 2})::geometry) <= $${index + 3}` index + 2
})::geometry) <= $${index + 3}`
); );
sorts.push( sorts.push(
`ST_DistanceSphere($${index}:name::geometry, POINT($${index + `ST_DistanceSphere($${index}:name::geometry, POINT($${index + 1}, $${
1}, $${index + 2})::geometry) ASC` index + 2
})::geometry) ASC`
); );
values.push(fieldName, point.longitude, point.latitude, distanceInKM); values.push(fieldName, point.longitude, point.latitude, distanceInKM);
index += 4; index += 4;
@@ -673,8 +677,9 @@ const buildWhereClause = ({
} }
const distanceInKM = distance * 6371 * 1000; const distanceInKM = distance * 6371 * 1000;
patterns.push( patterns.push(
`ST_DistanceSphere($${index}:name::geometry, POINT($${index + `ST_DistanceSphere($${index}:name::geometry, POINT($${index + 1}, $${
1}, $${index + 2})::geometry) <= $${index + 3}` index + 2
})::geometry) <= $${index + 3}`
); );
values.push(fieldName, point.longitude, point.latitude, distanceInKM); values.push(fieldName, point.longitude, point.latitude, distanceInKM);
index += 4; index += 4;
@@ -706,7 +711,7 @@ const buildWhereClause = ({
); );
} }
points = points points = points
.map(point => { .map((point) => {
if (point instanceof Array && point.length === 2) { if (point instanceof Array && point.length === 2) {
Parse.GeoPoint._validate(point[1], point[0]); Parse.GeoPoint._validate(point[1], point[0]);
return `(${point[0]}, ${point[1]})`; return `(${point[0]}, ${point[1]})`;
@@ -794,7 +799,7 @@ const buildWhereClause = ({
index += 2; index += 2;
} }
Object.keys(ParseToPosgresComparator).forEach(cmp => { Object.keys(ParseToPosgresComparator).forEach((cmp) => {
if (fieldValue[cmp] || fieldValue[cmp] === 0) { if (fieldValue[cmp] || fieldValue[cmp] === 0) {
const pgComparator = ParseToPosgresComparator[cmp]; const pgComparator = ParseToPosgresComparator[cmp];
const postgresValue = toPostgresValue(fieldValue[cmp]); const postgresValue = toPostgresValue(fieldValue[cmp]);
@@ -853,10 +858,10 @@ export class PostgresStorageAdapter implements StorageAdapter {
} }
//Note that analyze=true will run the query, executing INSERTS, DELETES, etc. //Note that analyze=true will run the query, executing INSERTS, DELETES, etc.
createExplainableQuery (query: string, analyze:boolean = false) { createExplainableQuery(query: string, analyze: boolean = false) {
if (analyze){ if (analyze) {
return 'EXPLAIN (ANALYZE, FORMAT JSON) ' + query; return 'EXPLAIN (ANALYZE, FORMAT JSON) ' + query;
}else{ } else {
return 'EXPLAIN (FORMAT JSON) ' + query; return 'EXPLAIN (FORMAT JSON) ' + query;
} }
} }
@@ -874,7 +879,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
.none( .none(
'CREATE TABLE IF NOT EXISTS "_SCHEMA" ( "className" varChar(120), "schema" jsonb, "isParseClass" bool, PRIMARY KEY ("className") )' 'CREATE TABLE IF NOT EXISTS "_SCHEMA" ( "className" varChar(120), "schema" jsonb, "isParseClass" bool, PRIMARY KEY ("className") )'
) )
.catch(error => { .catch((error) => {
if ( if (
error.code === PostgresDuplicateRelationError || error.code === PostgresDuplicateRelationError ||
error.code === PostgresUniqueIndexViolationError || error.code === PostgresUniqueIndexViolationError ||
@@ -891,13 +896,13 @@ export class PostgresStorageAdapter implements StorageAdapter {
return this._client.one( return this._client.one(
'SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = $1)', 'SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = $1)',
[name], [name],
a => a.exists (a) => a.exists
); );
} }
async setClassLevelPermissions(className: string, CLPs: any) { async setClassLevelPermissions(className: string, CLPs: any) {
const self = this; const self = this;
await this._client.task('set-class-level-permissions', async t => { await this._client.task('set-class-level-permissions', async (t) => {
await self._ensureSchemaCollectionExists(t); await self._ensureSchemaCollectionExists(t);
const values = [ const values = [
className, className,
@@ -929,7 +934,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
} }
const deletedIndexes = []; const deletedIndexes = [];
const insertedIndexes = []; const insertedIndexes = [];
Object.keys(submittedIndexes).forEach(name => { Object.keys(submittedIndexes).forEach((name) => {
const field = submittedIndexes[name]; const field = submittedIndexes[name];
if (existingIndexes[name] && field.__op !== 'Delete') { if (existingIndexes[name] && field.__op !== 'Delete') {
throw new Parse.Error( throw new Parse.Error(
@@ -947,7 +952,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
deletedIndexes.push(name); deletedIndexes.push(name);
delete existingIndexes[name]; delete existingIndexes[name];
} else { } else {
Object.keys(field).forEach(key => { Object.keys(field).forEach((key) => {
if (!Object.prototype.hasOwnProperty.call(fields, key)) { if (!Object.prototype.hasOwnProperty.call(fields, key)) {
throw new Parse.Error( throw new Parse.Error(
Parse.Error.INVALID_QUERY, Parse.Error.INVALID_QUERY,
@@ -962,7 +967,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
}); });
} }
}); });
await conn.tx('set-indexes-with-schema-format', async t => { await conn.tx('set-indexes-with-schema-format', async (t) => {
if (insertedIndexes.length > 0) { if (insertedIndexes.length > 0) {
await self.createIndexes(className, insertedIndexes, t); await self.createIndexes(className, insertedIndexes, t);
} }
@@ -980,7 +985,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
async createClass(className: string, schema: SchemaType, conn: ?any) { async createClass(className: string, schema: SchemaType, conn: ?any) {
conn = conn || this._client; conn = conn || this._client;
return conn return conn
.tx('create-class', async t => { .tx('create-class', async (t) => {
const q1 = this.createTable(className, schema, t); const q1 = this.createTable(className, schema, t);
const q2 = t.none( const q2 = t.none(
'INSERT INTO "_SCHEMA" ("className", "schema", "isParseClass") VALUES ($<className>, $<schema>, true)', 'INSERT INTO "_SCHEMA" ("className", "schema", "isParseClass") VALUES ($<className>, $<schema>, true)',
@@ -1000,7 +1005,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
.then(() => { .then(() => {
return toParseSchema(schema); return toParseSchema(schema);
}) })
.catch(err => { .catch((err) => {
if (err.data[0].result.code === PostgresTransactionAbortedError) { if (err.data[0].result.code === PostgresTransactionAbortedError) {
err = err.data[1].result; err = err.data[1].result;
} }
@@ -1037,7 +1042,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
} }
let index = 2; let index = 2;
const relations = []; const relations = [];
Object.keys(fields).forEach(fieldName => { Object.keys(fields).forEach((fieldName) => {
const parseType = fields[fieldName]; const parseType = fields[fieldName];
// Skip when it's a relation // Skip when it's a relation
// We'll create the tables later // We'll create the tables later
@@ -1060,7 +1065,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
const values = [className, ...valuesArray]; const values = [className, ...valuesArray];
debug(qs, values); debug(qs, values);
return conn.task('create-table', async t => { return conn.task('create-table', async (t) => {
try { try {
await self._ensureSchemaCollectionExists(t); await self._ensureSchemaCollectionExists(t);
await t.none(qs, values); await t.none(qs, values);
@@ -1070,9 +1075,9 @@ export class PostgresStorageAdapter implements StorageAdapter {
} }
// ELSE: Table already exists, must have been created by a different request. Ignore the error. // ELSE: Table already exists, must have been created by a different request. Ignore the error.
} }
await t.tx('create-table-tx', tx => { await t.tx('create-table-tx', (tx) => {
return tx.batch( return tx.batch(
relations.map(fieldName => { relations.map((fieldName) => {
return tx.none( return tx.none(
'CREATE TABLE IF NOT EXISTS $<joinTable:name> ("relatedId" varChar(120), "owningId" varChar(120), PRIMARY KEY("relatedId", "owningId") )', 'CREATE TABLE IF NOT EXISTS $<joinTable:name> ("relatedId" varChar(120), "owningId" varChar(120), PRIMARY KEY("relatedId", "owningId") )',
{ joinTable: `_Join:${fieldName}:${className}` } { joinTable: `_Join:${fieldName}:${className}` }
@@ -1088,15 +1093,15 @@ export class PostgresStorageAdapter implements StorageAdapter {
conn = conn || this._client; conn = conn || this._client;
const self = this; const self = this;
await conn.tx('schema-upgrade', async t => { await conn.tx('schema-upgrade', async (t) => {
const columns = await t.map( const columns = await t.map(
'SELECT column_name FROM information_schema.columns WHERE table_name = $<className>', 'SELECT column_name FROM information_schema.columns WHERE table_name = $<className>',
{ className }, { className },
a => a.column_name (a) => a.column_name
); );
const newColumns = Object.keys(schema.fields) const newColumns = Object.keys(schema.fields)
.filter(item => columns.indexOf(item) === -1) .filter((item) => columns.indexOf(item) === -1)
.map(fieldName => .map((fieldName) =>
self.addFieldIfNotExists( self.addFieldIfNotExists(
className, className,
fieldName, fieldName,
@@ -1119,7 +1124,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
debug('addFieldIfNotExists', { className, fieldName, type }); debug('addFieldIfNotExists', { className, fieldName, type });
conn = conn || this._client; conn = conn || this._client;
const self = this; const self = this;
await conn.tx('add-field-if-not-exists', async t => { await conn.tx('add-field-if-not-exists', async (t) => {
if (type.type !== 'Relation') { if (type.type !== 'Relation') {
try { try {
await t.none( await t.none(
@@ -1178,7 +1183,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
}, },
]; ];
return this._client return this._client
.tx(t => t.none(this._pgp.helpers.concat(operations))) .tx((t) => t.none(this._pgp.helpers.concat(operations)))
.then(() => className.indexOf('_Join:') != 0); // resolves with false when _Join table .then(() => className.indexOf('_Join:') != 0); // resolves with false when _Join table
} }
@@ -1189,7 +1194,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
debug('deleteAllClasses'); debug('deleteAllClasses');
await this._client await this._client
.task('delete-all-classes', async t => { .task('delete-all-classes', async (t) => {
try { try {
const results = await t.any('SELECT * FROM "_SCHEMA"'); const results = await t.any('SELECT * FROM "_SCHEMA"');
const joins = results.reduce((list: Array<string>, schema: any) => { const joins = results.reduce((list: Array<string>, schema: any) => {
@@ -1204,14 +1209,14 @@ export class PostgresStorageAdapter implements StorageAdapter {
'_GlobalConfig', '_GlobalConfig',
'_GraphQLConfig', '_GraphQLConfig',
'_Audience', '_Audience',
...results.map(result => result.className), ...results.map((result) => result.className),
...joins, ...joins,
]; ];
const queries = classes.map(className => ({ const queries = classes.map((className) => ({
query: 'DROP TABLE IF EXISTS $<className:name>', query: 'DROP TABLE IF EXISTS $<className:name>',
values: { className }, values: { className },
})); }));
await t.tx(tx => tx.none(helpers.concat(queries))); await t.tx((tx) => tx.none(helpers.concat(queries)));
} catch (error) { } catch (error) {
if (error.code !== PostgresRelationDoesNotExistError) { if (error.code !== PostgresRelationDoesNotExistError) {
throw error; throw error;
@@ -1259,7 +1264,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
}) })
.join(', DROP COLUMN'); .join(', DROP COLUMN');
await this._client.tx('delete-fields', async t => { await this._client.tx('delete-fields', async (t) => {
await t.none( await t.none(
'UPDATE "_SCHEMA" SET "schema" = $<schema> WHERE "className" = $<className>', 'UPDATE "_SCHEMA" SET "schema" = $<schema> WHERE "className" = $<className>',
{ schema, className } { schema, className }
@@ -1275,9 +1280,9 @@ export class PostgresStorageAdapter implements StorageAdapter {
// rejection reason are TBD. // rejection reason are TBD.
async getAllClasses() { async getAllClasses() {
const self = this; const self = this;
return this._client.task('get-all-classes', async t => { return this._client.task('get-all-classes', async (t) => {
await self._ensureSchemaCollectionExists(t); await self._ensureSchemaCollectionExists(t);
return await t.map('SELECT * FROM "_SCHEMA"', null, row => return await t.map('SELECT * FROM "_SCHEMA"', null, (row) =>
toParseSchema({ className: row.className, ...row.schema }) toParseSchema({ className: row.className, ...row.schema })
); );
}); });
@@ -1292,7 +1297,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
.any('SELECT * FROM "_SCHEMA" WHERE "className" = $<className>', { .any('SELECT * FROM "_SCHEMA" WHERE "className" = $<className>', {
className, className,
}) })
.then(result => { .then((result) => {
if (result.length !== 1) { if (result.length !== 1) {
throw undefined; throw undefined;
} }
@@ -1318,7 +1323,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
validateKeys(object); validateKeys(object);
Object.keys(object).forEach(fieldName => { Object.keys(object).forEach((fieldName) => {
if (object[fieldName] === null) { if (object[fieldName] === null) {
return; return;
} }
@@ -1420,7 +1425,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
} }
return `$${index + 2 + columnsArray.length}${termination}`; return `$${index + 2 + columnsArray.length}${termination}`;
}); });
const geoPointsInjects = Object.keys(geoPoints).map(key => { const geoPointsInjects = Object.keys(geoPoints).map((key) => {
const value = geoPoints[key]; const value = geoPoints[key];
valuesArray.push(value.longitude, value.latitude); valuesArray.push(value.longitude, value.latitude);
const l = valuesArray.length + columnsArray.length; const l = valuesArray.length + columnsArray.length;
@@ -1441,7 +1446,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
) )
.none(qs, values) .none(qs, values)
.then(() => ({ ops: [object] })) .then(() => ({ ops: [object] }))
.catch(error => { .catch((error) => {
if (error.code === PostgresUniqueIndexViolationError) { if (error.code === PostgresUniqueIndexViolationError) {
const err = new Parse.Error( const err = new Parse.Error(
Parse.Error.DUPLICATE_VALUE, Parse.Error.DUPLICATE_VALUE,
@@ -1492,8 +1497,8 @@ export class PostgresStorageAdapter implements StorageAdapter {
? transactionalSession.t ? transactionalSession.t
: this._client : this._client
) )
.one(qs, values, a => +a.count) .one(qs, values, (a) => +a.count)
.then(count => { .then((count) => {
if (count === 0) { if (count === 0) {
throw new Parse.Error( throw new Parse.Error(
Parse.Error.OBJECT_NOT_FOUND, Parse.Error.OBJECT_NOT_FOUND,
@@ -1503,7 +1508,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
return count; return count;
} }
}) })
.catch(error => { .catch((error) => {
if (error.code !== PostgresRelationDoesNotExistError) { if (error.code !== PostgresRelationDoesNotExistError) {
throw error; throw error;
} }
@@ -1529,7 +1534,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
query, query,
update, update,
transactionalSession transactionalSession
).then(val => val[0]); ).then((val) => val[0]);
} }
// Apply the update to all objects that match the given Parse Query. // Apply the update to all objects that match the given Parse Query.
@@ -1550,7 +1555,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
// Set flag for dot notation fields // Set flag for dot notation fields
const dotNotationOptions = {}; const dotNotationOptions = {};
Object.keys(update).forEach(fieldName => { Object.keys(update).forEach((fieldName) => {
if (fieldName.indexOf('.') > -1) { if (fieldName.indexOf('.') > -1) {
const components = fieldName.split('.'); const components = fieldName.split('.');
const first = components.shift(); const first = components.shift();
@@ -1622,8 +1627,9 @@ export class PostgresStorageAdapter implements StorageAdapter {
index += 2; index += 2;
} else if (fieldValue.__op === 'Add') { } else if (fieldValue.__op === 'Add') {
updatePatterns.push( updatePatterns.push(
`$${index}:name = array_add(COALESCE($${index}:name, '[]'::jsonb), $${index + `$${index}:name = array_add(COALESCE($${index}:name, '[]'::jsonb), $${
1}::jsonb)` index + 1
}::jsonb)`
); );
values.push(fieldName, JSON.stringify(fieldValue.objects)); values.push(fieldName, JSON.stringify(fieldValue.objects));
index += 2; index += 2;
@@ -1633,15 +1639,17 @@ export class PostgresStorageAdapter implements StorageAdapter {
index += 2; index += 2;
} else if (fieldValue.__op === 'Remove') { } else if (fieldValue.__op === 'Remove') {
updatePatterns.push( updatePatterns.push(
`$${index}:name = array_remove(COALESCE($${index}:name, '[]'::jsonb), $${index + `$${index}:name = array_remove(COALESCE($${index}:name, '[]'::jsonb), $${
1}::jsonb)` index + 1
}::jsonb)`
); );
values.push(fieldName, JSON.stringify(fieldValue.objects)); values.push(fieldName, JSON.stringify(fieldValue.objects));
index += 2; index += 2;
} else if (fieldValue.__op === 'AddUnique') { } else if (fieldValue.__op === 'AddUnique') {
updatePatterns.push( updatePatterns.push(
`$${index}:name = array_add_unique(COALESCE($${index}:name, '[]'::jsonb), $${index + `$${index}:name = array_add_unique(COALESCE($${index}:name, '[]'::jsonb), $${
1}::jsonb)` index + 1
}::jsonb)`
); );
values.push(fieldName, JSON.stringify(fieldValue.objects)); values.push(fieldName, JSON.stringify(fieldValue.objects));
index += 2; index += 2;
@@ -1698,7 +1706,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
) { ) {
// Gather keys to increment // Gather keys to increment
const keysToIncrement = Object.keys(originalUpdate) const keysToIncrement = Object.keys(originalUpdate)
.filter(k => { .filter((k) => {
// choose top level fields that have a delete operation set // choose top level fields that have a delete operation set
// Note that Object.keys is iterating over the **original** update object // Note that Object.keys is iterating over the **original** update object
// and that some of the keys of the original update could be null or undefined: // and that some of the keys of the original update could be null or undefined:
@@ -1711,26 +1719,26 @@ export class PostgresStorageAdapter implements StorageAdapter {
k.split('.')[0] === fieldName k.split('.')[0] === fieldName
); );
}) })
.map(k => k.split('.')[1]); .map((k) => k.split('.')[1]);
let incrementPatterns = ''; let incrementPatterns = '';
if (keysToIncrement.length > 0) { if (keysToIncrement.length > 0) {
incrementPatterns = incrementPatterns =
' || ' + ' || ' +
keysToIncrement keysToIncrement
.map(c => { .map((c) => {
const amount = fieldValue[c].amount; const amount = fieldValue[c].amount;
return `CONCAT('{"${c}":', COALESCE($${index}:name->>'${c}','0')::int + ${amount}, '}')::jsonb`; return `CONCAT('{"${c}":', COALESCE($${index}:name->>'${c}','0')::int + ${amount}, '}')::jsonb`;
}) })
.join(' || '); .join(' || ');
// Strip the keys // Strip the keys
keysToIncrement.forEach(key => { keysToIncrement.forEach((key) => {
delete fieldValue[key]; delete fieldValue[key];
}); });
} }
const keysToDelete: Array<string> = Object.keys(originalUpdate) const keysToDelete: Array<string> = Object.keys(originalUpdate)
.filter(k => { .filter((k) => {
// choose top level fields that have a delete operation set. // choose top level fields that have a delete operation set.
const value = originalUpdate[k]; const value = originalUpdate[k];
return ( return (
@@ -1740,7 +1748,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
k.split('.')[0] === fieldName k.split('.')[0] === fieldName
); );
}) })
.map(k => k.split('.')[1]); .map((k) => k.split('.')[1]);
const deletePatterns = keysToDelete.reduce( const deletePatterns = keysToDelete.reduce(
(p: string, c: string, i: number) => { (p: string, c: string, i: number) => {
@@ -1756,9 +1764,9 @@ export class PostgresStorageAdapter implements StorageAdapter {
updateObject = `COALESCE($${index}:name, '{}'::jsonb)`; updateObject = `COALESCE($${index}:name, '{}'::jsonb)`;
} }
updatePatterns.push( updatePatterns.push(
`$${index}:name = (${updateObject} ${deletePatterns} ${incrementPatterns} || $${index + `$${index}:name = (${updateObject} ${deletePatterns} ${incrementPatterns} || $${
1 + index + 1 + keysToDelete.length
keysToDelete.length}::jsonb )` }::jsonb )`
); );
values.push(fieldName, ...keysToDelete, JSON.stringify(fieldValue)); values.push(fieldName, ...keysToDelete, JSON.stringify(fieldValue));
index += 2 + keysToDelete.length; index += 2 + keysToDelete.length;
@@ -1825,7 +1833,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
schema, schema,
createValue, createValue,
transactionalSession transactionalSession
).catch(error => { ).catch((error) => {
// ignore duplicate value errors as it's upsert // ignore duplicate value errors as it's upsert
if (error.code !== Parse.Error.DUPLICATE_VALUE) { if (error.code !== Parse.Error.DUPLICATE_VALUE) {
throw error; throw error;
@@ -1880,7 +1888,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
if (sort) { if (sort) {
const sortCopy: any = sort; const sortCopy: any = sort;
const sorting = Object.keys(sort) const sorting = Object.keys(sort)
.map(key => { .map((key) => {
const transformKey = transformDotFieldToComponents(key).join('->'); const transformKey = transformDotFieldToComponents(key).join('->');
// Using $idx pattern gives: non-integer constant in ORDER BY // Using $idx pattern gives: non-integer constant in ORDER BY
if (sortCopy[key] === 1) { if (sortCopy[key] === 1) {
@@ -1923,31 +1931,33 @@ export class PostgresStorageAdapter implements StorageAdapter {
} }
const originalQuery = `SELECT ${columns} FROM $1:name ${wherePattern} ${sortPattern} ${limitPattern} ${skipPattern}`; const originalQuery = `SELECT ${columns} FROM $1:name ${wherePattern} ${sortPattern} ${limitPattern} ${skipPattern}`;
const qs = explain ? this.createExplainableQuery(originalQuery) : originalQuery; const qs = explain
? this.createExplainableQuery(originalQuery)
: originalQuery;
debug(qs, values); debug(qs, values);
return this._client return this._client
.any(qs, values) .any(qs, values)
.catch(error => { .catch((error) => {
// Query on non existing table, don't crash // Query on non existing table, don't crash
if (error.code !== PostgresRelationDoesNotExistError) { if (error.code !== PostgresRelationDoesNotExistError) {
throw error; throw error;
} }
return []; return [];
}) })
.then(results => { .then((results) => {
if (explain){ if (explain) {
return results; return results;
} }
return results.map(object => return results.map((object) =>
this.postgresObjectToParseObject(className, object, schema) this.postgresObjectToParseObject(className, object, schema)
);} );
); });
} }
// Converts from a postgres-format object to a REST-format object. // Converts from a postgres-format object to a REST-format object.
// Does not strip out anything based on a lack of authentication. // Does not strip out anything based on a lack of authentication.
postgresObjectToParseObject(className: string, object: any, schema: any) { postgresObjectToParseObject(className: string, object: any, schema: any) {
Object.keys(schema.fields).forEach(fieldName => { Object.keys(schema.fields).forEach((fieldName) => {
if (schema.fields[fieldName].type === 'Pointer' && object[fieldName]) { if (schema.fields[fieldName].type === 'Pointer' && object[fieldName]) {
object[fieldName] = { object[fieldName] = {
objectId: object[fieldName], objectId: object[fieldName],
@@ -1971,7 +1981,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
if (object[fieldName] && schema.fields[fieldName].type === 'Polygon') { if (object[fieldName] && schema.fields[fieldName].type === 'Polygon') {
let coords = object[fieldName]; let coords = object[fieldName];
coords = coords.substr(2, coords.length - 4).split('),('); coords = coords.substr(2, coords.length - 4).split('),(');
coords = coords.map(point => { coords = coords.map((point) => {
return [ return [
parseFloat(point.split(',')[1]), parseFloat(point.split(',')[1]),
parseFloat(point.split(',')[0]), parseFloat(point.split(',')[0]),
@@ -2061,7 +2071,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
const qs = `ALTER TABLE $1:name ADD CONSTRAINT $2:name UNIQUE (${constraintPatterns.join()})`; const qs = `ALTER TABLE $1:name ADD CONSTRAINT $2:name UNIQUE (${constraintPatterns.join()})`;
return this._client return this._client
.none(qs, [className, constraintName, ...fieldNames]) .none(qs, [className, constraintName, ...fieldNames])
.catch(error => { .catch((error) => {
if ( if (
error.code === PostgresDuplicateRelationError && error.code === PostgresDuplicateRelationError &&
error.message.includes(constraintName) error.message.includes(constraintName)
@@ -2112,14 +2122,14 @@ export class PostgresStorageAdapter implements StorageAdapter {
} }
return this._client return this._client
.one(qs, values, a => { .one(qs, values, (a) => {
if (a.approximate_row_count != null) { if (a.approximate_row_count != null) {
return +a.approximate_row_count; return +a.approximate_row_count;
} else { } else {
return +a.count; return +a.count;
} }
}) })
.catch(error => { .catch((error) => {
if (error.code !== PostgresRelationDoesNotExistError) { if (error.code !== PostgresRelationDoesNotExistError) {
throw error; throw error;
} }
@@ -2168,16 +2178,16 @@ export class PostgresStorageAdapter implements StorageAdapter {
debug(qs, values); debug(qs, values);
return this._client return this._client
.any(qs, values) .any(qs, values)
.catch(error => { .catch((error) => {
if (error.code === PostgresMissingColumnError) { if (error.code === PostgresMissingColumnError) {
return []; return [];
} }
throw error; throw error;
}) })
.then(results => { .then((results) => {
if (!isNested) { if (!isNested) {
results = results.filter(object => object[field] !== null); results = results.filter((object) => object[field] !== null);
return results.map(object => { return results.map((object) => {
if (!isPointerField) { if (!isPointerField) {
return object[field]; return object[field];
} }
@@ -2189,10 +2199,10 @@ export class PostgresStorageAdapter implements StorageAdapter {
}); });
} }
const child = fieldName.split('.')[1]; const child = fieldName.split('.')[1];
return results.map(object => object[column][child]); return results.map((object) => object[column][child]);
}) })
.then(results => .then((results) =>
results.map(object => results.map((object) =>
this.postgresObjectToParseObject(className, object, schema) this.postgresObjectToParseObject(className, object, schema)
) )
); );
@@ -2204,7 +2214,8 @@ export class PostgresStorageAdapter implements StorageAdapter {
pipeline: any, pipeline: any,
readPreference: ?string, readPreference: ?string,
hint: ?mixed, hint: ?mixed,
explain?: boolean) { explain?: boolean
) {
debug('aggregate', className, pipeline, readPreference, hint, explain); debug('aggregate', className, pipeline, readPreference, hint, explain);
const values = [className]; const values = [className];
let index: number = 2; let index: number = 2;
@@ -2257,8 +2268,9 @@ export class PostgresStorageAdapter implements StorageAdapter {
columns.push( columns.push(
`EXTRACT(${ `EXTRACT(${
mongoAggregateToPostgres[operation] mongoAggregateToPostgres[operation]
} FROM $${index}:name AT TIME ZONE 'UTC') AS $${index + } FROM $${index}:name AT TIME ZONE 'UTC') AS $${
1}:name` index + 1
}:name`
); );
values.push(source, alias); values.push(source, alias);
index += 2; index += 2;
@@ -2327,7 +2339,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
if (stage.$match.$or) { if (stage.$match.$or) {
const collapse = {}; const collapse = {};
stage.$match.$or.forEach(element => { stage.$match.$or.forEach((element) => {
for (const key in element) { for (const key in element) {
collapse[key] = element[key]; collapse[key] = element[key];
} }
@@ -2337,7 +2349,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
for (const field in stage.$match) { for (const field in stage.$match) {
const value = stage.$match[field]; const value = stage.$match[field];
const matchPatterns = []; const matchPatterns = [];
Object.keys(ParseToPosgresComparator).forEach(cmp => { Object.keys(ParseToPosgresComparator).forEach((cmp) => {
if (value[cmp]) { if (value[cmp]) {
const pgComparator = ParseToPosgresComparator[cmp]; const pgComparator = ParseToPosgresComparator[cmp];
matchPatterns.push( matchPatterns.push(
@@ -2377,7 +2389,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
const sort = stage.$sort; const sort = stage.$sort;
const keys = Object.keys(sort); const keys = Object.keys(sort);
const sorting = keys const sorting = keys
.map(key => { .map((key) => {
const transformer = sort[key] === 1 ? 'ASC' : 'DESC'; const transformer = sort[key] === 1 ? 'ASC' : 'DESC';
const order = `$${index}:name ${transformer}`; const order = `$${index}:name ${transformer}`;
index += 1; index += 1;
@@ -2389,43 +2401,54 @@ export class PostgresStorageAdapter implements StorageAdapter {
sort !== undefined && sorting.length > 0 ? `ORDER BY ${sorting}` : ''; sort !== undefined && sorting.length > 0 ? `ORDER BY ${sorting}` : '';
} }
} }
const originalQuery = `SELECT ${columns.join()} FROM $1:name ${wherePattern} ${sortPattern} ${limitPattern} ${skipPattern} ${groupPattern}`;
const qs = explain ? this.createExplainableQuery(originalQuery) : originalQuery; if (groupPattern) {
debug(qs, values); columns.forEach((e, i, a) => {
return this._client if (e && e.trim() === '*') {
.any(qs, values) a[i] = '';
.then(a => {
if (explain){
return a;
} }
const results = a.map(object =>
this.postgresObjectToParseObject(className, object, schema)
);
results.forEach(result => {
if (!Object.prototype.hasOwnProperty.call(result, 'objectId')) {
result.objectId = null;
}
if (groupValues) {
result.objectId = {};
for (const key in groupValues) {
result.objectId[key] = result[key];
delete result[key];
}
}
if (countField) {
result[countField] = parseInt(result[countField], 10);
}
});
return results;
}); });
}
const originalQuery = `SELECT ${columns
.filter(Boolean)
.join()} FROM $1:name ${wherePattern} ${skipPattern} ${groupPattern} ${sortPattern} ${limitPattern}`;
const qs = explain
? this.createExplainableQuery(originalQuery)
: originalQuery;
debug(qs, values);
return this._client.any(qs, values).then((a) => {
if (explain) {
return a;
}
const results = a.map((object) =>
this.postgresObjectToParseObject(className, object, schema)
);
results.forEach((result) => {
if (!Object.prototype.hasOwnProperty.call(result, 'objectId')) {
result.objectId = null;
}
if (groupValues) {
result.objectId = {};
for (const key in groupValues) {
result.objectId[key] = result[key];
delete result[key];
}
}
if (countField) {
result[countField] = parseInt(result[countField], 10);
}
});
return results;
});
} }
async performInitialization({ VolatileClassesSchemas }: any) { async performInitialization({ VolatileClassesSchemas }: any) {
// TODO: This method needs to be rewritten to make proper use of connections (@vitaly-t) // TODO: This method needs to be rewritten to make proper use of connections (@vitaly-t)
debug('performInitialization'); debug('performInitialization');
const promises = VolatileClassesSchemas.map(schema => { const promises = VolatileClassesSchemas.map((schema) => {
return this.createTable(schema.className, schema) return this.createTable(schema.className, schema)
.catch(err => { .catch((err) => {
if ( if (
err.code === PostgresDuplicateRelationError || err.code === PostgresDuplicateRelationError ||
err.code === Parse.Error.INVALID_CLASS_NAME err.code === Parse.Error.INVALID_CLASS_NAME
@@ -2438,7 +2461,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
}); });
return Promise.all(promises) return Promise.all(promises)
.then(() => { .then(() => {
return this._client.tx('perform-initialization', t => { return this._client.tx('perform-initialization', (t) => {
return t.batch([ return t.batch([
t.none(sql.misc.jsonObjectSetKeys), t.none(sql.misc.jsonObjectSetKeys),
t.none(sql.array.add), t.none(sql.array.add),
@@ -2450,10 +2473,10 @@ export class PostgresStorageAdapter implements StorageAdapter {
]); ]);
}); });
}) })
.then(data => { .then((data) => {
debug(`initializationDone in ${data.duration}`); debug(`initializationDone in ${data.duration}`);
}) })
.catch(error => { .catch((error) => {
/* eslint-disable no-console */ /* eslint-disable no-console */
console.error(error); console.error(error);
}); });
@@ -2464,9 +2487,9 @@ export class PostgresStorageAdapter implements StorageAdapter {
indexes: any, indexes: any,
conn: ?any conn: ?any
): Promise<void> { ): Promise<void> {
return (conn || this._client).tx(t => return (conn || this._client).tx((t) =>
t.batch( t.batch(
indexes.map(i => { indexes.map((i) => {
return t.none('CREATE INDEX $1:name ON $2:name ($3:name)', [ return t.none('CREATE INDEX $1:name ON $2:name ($3:name)', [
i.name, i.name,
className, className,
@@ -2493,11 +2516,11 @@ export class PostgresStorageAdapter implements StorageAdapter {
} }
async dropIndexes(className: string, indexes: any, conn: any): Promise<void> { async dropIndexes(className: string, indexes: any, conn: any): Promise<void> {
const queries = indexes.map(i => ({ const queries = indexes.map((i) => ({
query: 'DROP INDEX $1:name', query: 'DROP INDEX $1:name',
values: i, values: i,
})); }));
await (conn || this._client).tx(t => await (conn || this._client).tx((t) =>
t.none(this._pgp.helpers.concat(queries)) t.none(this._pgp.helpers.concat(queries))
); );
} }
@@ -2517,11 +2540,11 @@ export class PostgresStorageAdapter implements StorageAdapter {
} }
async createTransactionalSession(): Promise<any> { async createTransactionalSession(): Promise<any> {
return new Promise(resolve => { return new Promise((resolve) => {
const transactionalSession = {}; const transactionalSession = {};
transactionalSession.result = this._client.tx(t => { transactionalSession.result = this._client.tx((t) => {
transactionalSession.t = t; transactionalSession.t = t;
transactionalSession.promise = new Promise(resolve => { transactionalSession.promise = new Promise((resolve) => {
transactionalSession.resolve = resolve; transactionalSession.resolve = resolve;
}); });
transactionalSession.batch = []; transactionalSession.batch = [];
@@ -2555,15 +2578,19 @@ export class PostgresStorageAdapter implements StorageAdapter {
caseInsensitive: boolean = false, caseInsensitive: boolean = false,
conn: ?any = null conn: ?any = null
): Promise<any> { ): Promise<any> {
conn = conn != null ? conn : this._client; conn = conn != null ? conn : this._client;
const defaultIndexName = `parse_default_${fieldNames.sort().join('_')}`; const defaultIndexName = `parse_default_${fieldNames.sort().join('_')}`;
const indexNameOptions: Object = indexName != null ? { name: indexName } : { name: defaultIndexName }; const indexNameOptions: Object =
const constraintPatterns = caseInsensitive ? fieldNames.map((fieldName, index) => `lower($${index + 3}:name) varchar_pattern_ops`) : indexName != null ? { name: indexName } : { name: defaultIndexName };
fieldNames.map((fieldName, index) => `$${index + 3}:name`); const constraintPatterns = caseInsensitive
? fieldNames.map(
(fieldName, index) => `lower($${index + 3}:name) varchar_pattern_ops`
)
: fieldNames.map((fieldName, index) => `$${index + 3}:name`);
const qs = `CREATE INDEX $1:name ON $2:name (${constraintPatterns.join()})`; const qs = `CREATE INDEX $1:name ON $2:name (${constraintPatterns.join()})`;
await conn.none(qs, [indexNameOptions.name, className, ...fieldNames]) await conn
.catch(error => { .none(qs, [indexNameOptions.name, className, ...fieldNames])
.catch((error) => {
if ( if (
error.code === PostgresDuplicateRelationError && error.code === PostgresDuplicateRelationError &&
error.message.includes(indexNameOptions.name) error.message.includes(indexNameOptions.name)
@@ -2616,7 +2643,7 @@ function convertPolygonToSQL(polygon) {
); );
} }
const points = polygon const points = polygon
.map(point => { .map((point) => {
Parse.GeoPoint._validate(parseFloat(point[1]), parseFloat(point[0])); Parse.GeoPoint._validate(parseFloat(point[1]), parseFloat(point[0]));
return `(${point[1]}, ${point[0]})`; return `(${point[1]}, ${point[0]})`;
}) })
@@ -2685,7 +2712,7 @@ function isAllValuesRegexOrNone(values) {
} }
function isAnyValueRegexStartsWith(values) { function isAnyValueRegexStartsWith(values) {
return values.some(function(value) { return values.some(function (value) {
return isStartsWithRegex(value.$regex); return isStartsWithRegex(value.$regex);
}); });
} }
@@ -2693,7 +2720,7 @@ function isAnyValueRegexStartsWith(values) {
function createLiteralRegex(remaining) { function createLiteralRegex(remaining) {
return remaining return remaining
.split('') .split('')
.map(c => { .map((c) => {
const regex = RegExp('[0-9 ]|\\p{L}', 'u'); // Support all unicode letter chars const regex = RegExp('[0-9 ]|\\p{L}', 'u'); // Support all unicode letter chars
if (c.match(regex) !== null) { if (c.match(regex) !== null) {
// don't escape alphanumeric characters // don't escape alphanumeric characters