Support for Aggregate Queries (#4207)
* Support for Aggregate Queries * improve pg and coverage * Mongo 3.4 aggregates and tests * replace _id with objectId * improve tests for objectId * project with group query * typo
This commit is contained in:
committed by
Florent Vilmart
parent
4e207d32a7
commit
7223add446
@@ -60,6 +60,14 @@ export default class MongoCollection {
|
||||
return countOperation;
|
||||
}
|
||||
|
||||
distinct(field, query) {
|
||||
return this._mongoCollection.distinct(field, query);
|
||||
}
|
||||
|
||||
aggregate(pipeline, { maxTimeMS, readPreference } = {}) {
|
||||
return this._mongoCollection.aggregate(pipeline, { maxTimeMS, readPreference }).toArray();
|
||||
}
|
||||
|
||||
insertOne(object) {
|
||||
return this._mongoCollection.insertOne(object);
|
||||
}
|
||||
|
||||
@@ -405,6 +405,27 @@ export class MongoStorageAdapter {
|
||||
}));
|
||||
}
|
||||
|
||||
distinct(className, schema, query, fieldName) {
|
||||
schema = convertParseSchemaToMongoSchema(schema);
|
||||
return this._adaptiveCollection(className)
|
||||
.then(collection => collection.distinct(fieldName, transformWhere(className, query, schema)));
|
||||
}
|
||||
|
||||
aggregate(className, pipeline, readPreference) {
|
||||
readPreference = this._parseReadPreference(readPreference);
|
||||
return this._adaptiveCollection(className)
|
||||
.then(collection => collection.aggregate(pipeline, { readPreference, maxTimeMS: this._maxTimeMS }))
|
||||
.then(results => {
|
||||
results.forEach(result => {
|
||||
if (result.hasOwnProperty('_id')) {
|
||||
result.objectId = result._id;
|
||||
delete result._id;
|
||||
}
|
||||
});
|
||||
return results;
|
||||
});
|
||||
}
|
||||
|
||||
_parseReadPreference(readPreference) {
|
||||
if (readPreference) {
|
||||
switch (readPreference) {
|
||||
|
||||
@@ -165,6 +165,10 @@ const transformDotField = (fieldName) => {
|
||||
return name;
|
||||
}
|
||||
|
||||
const transformAggregateField = (fieldName) => {
|
||||
return fieldName.substr(1);
|
||||
}
|
||||
|
||||
const validateKeys = (object) => {
|
||||
if (typeof object == 'object') {
|
||||
for (const key in object) {
|
||||
@@ -1366,6 +1370,140 @@ export class PostgresStorageAdapter {
|
||||
});
|
||||
}
|
||||
|
||||
distinct(className, schema, query, fieldName) {
|
||||
debug('distinct', className, query);
|
||||
let field = fieldName;
|
||||
let column = fieldName;
|
||||
if (fieldName.indexOf('.') >= 0) {
|
||||
field = transformDotFieldToComponents(fieldName).join('->');
|
||||
column = fieldName.split('.')[0];
|
||||
}
|
||||
const isArrayField = schema.fields
|
||||
&& schema.fields[fieldName]
|
||||
&& schema.fields[fieldName].type === 'Array';
|
||||
const values = [field, column, className];
|
||||
const where = buildWhereClause({ schema, query, index: 4 });
|
||||
values.push(...where.values);
|
||||
|
||||
const wherePattern = where.pattern.length > 0 ? `WHERE ${where.pattern}` : '';
|
||||
let qs = `SELECT DISTINCT ON ($1:raw) $2:raw FROM $3:name ${wherePattern}`;
|
||||
if (isArrayField) {
|
||||
qs = `SELECT distinct jsonb_array_elements($1:raw) as $2:raw FROM $3:name ${wherePattern}`;
|
||||
}
|
||||
debug(qs, values);
|
||||
return this._client.any(qs, values)
|
||||
.catch(() => [])
|
||||
.then((results) => {
|
||||
if (fieldName.indexOf('.') === -1) {
|
||||
return results.map(object => object[field]);
|
||||
}
|
||||
const child = fieldName.split('.')[1];
|
||||
return results.map(object => object[column][child]);
|
||||
});
|
||||
}
|
||||
|
||||
aggregate(className, pipeline) {
|
||||
debug('aggregate', className, pipeline);
|
||||
const values = [className];
|
||||
let columns = [];
|
||||
let countField = null;
|
||||
let wherePattern = '';
|
||||
let limitPattern = '';
|
||||
let skipPattern = '';
|
||||
let sortPattern = '';
|
||||
let groupPattern = '';
|
||||
for (let i = 0; i < pipeline.length; i += 1) {
|
||||
const stage = pipeline[i];
|
||||
if (stage.$group) {
|
||||
for (const field in stage.$group) {
|
||||
const value = stage.$group[field];
|
||||
if (value === null || value === undefined) {
|
||||
continue;
|
||||
}
|
||||
if (field === '_id') {
|
||||
columns.push(`${transformAggregateField(value)} AS "objectId"`);
|
||||
groupPattern = `GROUP BY ${transformAggregateField(value)}`;
|
||||
continue;
|
||||
}
|
||||
if (value.$sum) {
|
||||
if (typeof value.$sum === 'string') {
|
||||
columns.push(`SUM(${transformAggregateField(value.$sum)}) AS "${field}"`);
|
||||
} else {
|
||||
countField = field;
|
||||
columns.push(`COUNT(*) AS "${field}"`);
|
||||
}
|
||||
}
|
||||
if (value.$max) {
|
||||
columns.push(`MAX(${transformAggregateField(value.$max)}) AS "${field}"`);
|
||||
}
|
||||
if (value.$min) {
|
||||
columns.push(`MIN(${transformAggregateField(value.$min)}) AS "${field}"`);
|
||||
}
|
||||
if (value.$avg) {
|
||||
columns.push(`AVG(${transformAggregateField(value.$avg)}) AS "${field}"`);
|
||||
}
|
||||
}
|
||||
columns.join(',');
|
||||
} else {
|
||||
columns.push('*');
|
||||
}
|
||||
if (stage.$project) {
|
||||
if (columns.includes('*')) {
|
||||
columns = [];
|
||||
}
|
||||
for (const field in stage.$project) {
|
||||
const value = stage.$project[field];
|
||||
if ((value === 1 || value === true)) {
|
||||
columns.push(field);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (stage.$match) {
|
||||
const patterns = [];
|
||||
for (const field in stage.$match) {
|
||||
const value = stage.$match[field];
|
||||
Object.keys(ParseToPosgresComparator).forEach(cmp => {
|
||||
if (value[cmp]) {
|
||||
const pgComparator = ParseToPosgresComparator[cmp];
|
||||
patterns.push(`${field} ${pgComparator} ${value[cmp]}`);
|
||||
}
|
||||
});
|
||||
}
|
||||
wherePattern = patterns.length > 0 ? `WHERE ${patterns.join(' ')}` : '';
|
||||
}
|
||||
if (stage.$limit) {
|
||||
limitPattern = `LIMIT ${stage.$limit}`;
|
||||
}
|
||||
if (stage.$skip) {
|
||||
skipPattern = `OFFSET ${stage.$skip}`;
|
||||
}
|
||||
if (stage.$sort) {
|
||||
const sort = stage.$sort;
|
||||
const sorting = Object.keys(sort).map((key) => {
|
||||
if (sort[key] === 1) {
|
||||
return `"${key}" ASC`;
|
||||
}
|
||||
return `"${key}" DESC`;
|
||||
}).join(',');
|
||||
sortPattern = sort !== undefined && Object.keys(sort).length > 0 ? `ORDER BY ${sorting}` : '';
|
||||
}
|
||||
}
|
||||
|
||||
const qs = `SELECT ${columns} FROM $1:name ${wherePattern} ${sortPattern} ${limitPattern} ${skipPattern} ${groupPattern}`;
|
||||
debug(qs, values);
|
||||
return this._client.any(qs, values).then(results => {
|
||||
if (countField) {
|
||||
results[0][countField] = parseInt(results[0][countField], 10);
|
||||
}
|
||||
results.forEach(result => {
|
||||
if (!result.hasOwnProperty('objectId')) {
|
||||
result.objectId = null;
|
||||
}
|
||||
});
|
||||
return results;
|
||||
});
|
||||
}
|
||||
|
||||
performInitialization({ VolatileClassesSchemas }) {
|
||||
debug('performInitialization');
|
||||
const promises = VolatileClassesSchemas.map((schema) => {
|
||||
|
||||
@@ -785,6 +785,8 @@ DatabaseController.prototype.find = function(className, query, {
|
||||
count,
|
||||
keys,
|
||||
op,
|
||||
distinct,
|
||||
pipeline,
|
||||
readPreference
|
||||
} = {}) {
|
||||
const isMaster = acl === undefined;
|
||||
@@ -853,6 +855,18 @@ DatabaseController.prototype.find = function(className, query, {
|
||||
} else {
|
||||
return this.adapter.count(className, schema, query, readPreference);
|
||||
}
|
||||
} else if (distinct) {
|
||||
if (!classExists) {
|
||||
return [];
|
||||
} else {
|
||||
return this.adapter.distinct(className, schema, query, distinct);
|
||||
}
|
||||
} else if (pipeline) {
|
||||
if (!classExists) {
|
||||
return [];
|
||||
} else {
|
||||
return this.adapter.aggregate(className, pipeline, readPreference);
|
||||
}
|
||||
} else {
|
||||
if (!classExists) {
|
||||
return [];
|
||||
|
||||
@@ -34,6 +34,7 @@ import { SessionsRouter } from './Routers/SessionsRouter';
|
||||
import { UsersRouter } from './Routers/UsersRouter';
|
||||
import { PurgeRouter } from './Routers/PurgeRouter';
|
||||
import { AudiencesRouter } from './Routers/AudiencesRouter';
|
||||
import { AggregateRouter } from './Routers/AggregateRouter';
|
||||
|
||||
import { ParseServerRESTController } from './ParseServerRESTController';
|
||||
import * as controllers from './Controllers';
|
||||
@@ -197,7 +198,8 @@ class ParseServer {
|
||||
new PurgeRouter(),
|
||||
new HooksRouter(),
|
||||
new CloudCodeRouter(),
|
||||
new AudiencesRouter()
|
||||
new AudiencesRouter(),
|
||||
new AggregateRouter()
|
||||
];
|
||||
|
||||
const routes = routers.reduce((memo, router) => {
|
||||
|
||||
@@ -86,6 +86,8 @@ function RestQuery(config, auth, className, restWhere = {}, restOptions = {}, cl
|
||||
case 'count':
|
||||
this.doCount = true;
|
||||
break;
|
||||
case 'distinct':
|
||||
case 'pipeline':
|
||||
case 'skip':
|
||||
case 'limit':
|
||||
case 'readPreference':
|
||||
|
||||
77
src/Routers/AggregateRouter.js
Normal file
77
src/Routers/AggregateRouter.js
Normal file
@@ -0,0 +1,77 @@
|
||||
import ClassesRouter from './ClassesRouter';
|
||||
import rest from '../rest';
|
||||
import * as middleware from '../middlewares';
|
||||
import Parse from 'parse/node';
|
||||
|
||||
const ALLOWED_KEYS = [
|
||||
'where',
|
||||
'distinct',
|
||||
'project',
|
||||
'match',
|
||||
'redact',
|
||||
'limit',
|
||||
'skip',
|
||||
'unwind',
|
||||
'group',
|
||||
'sample',
|
||||
'sort',
|
||||
'geoNear',
|
||||
'lookup',
|
||||
'out',
|
||||
'indexStats',
|
||||
'facet',
|
||||
'bucket',
|
||||
'bucketAuto',
|
||||
'sortByCount',
|
||||
'addFields',
|
||||
'replaceRoot',
|
||||
'count',
|
||||
'graphLookup',
|
||||
];
|
||||
|
||||
export class AggregateRouter extends ClassesRouter {
|
||||
|
||||
handleFind(req) {
|
||||
const body = Object.assign(req.body, ClassesRouter.JSONFromQuery(req.query));
|
||||
const options = {};
|
||||
const pipeline = [];
|
||||
|
||||
for (const key in body) {
|
||||
if (ALLOWED_KEYS.indexOf(key) === -1) {
|
||||
throw new Parse.Error(Parse.Error.INVALID_QUERY, `Invalid parameter for query: ${key}`);
|
||||
}
|
||||
if (key === 'group') {
|
||||
if (body[key].hasOwnProperty('_id')) {
|
||||
throw new Parse.Error(
|
||||
Parse.Error.INVALID_QUERY,
|
||||
`Invalid parameter for query: group. Please use objectId instead of _id`
|
||||
);
|
||||
}
|
||||
if (!body[key].hasOwnProperty('objectId')) {
|
||||
throw new Parse.Error(
|
||||
Parse.Error.INVALID_QUERY,
|
||||
`Invalid parameter for query: group. objectId is required`
|
||||
);
|
||||
}
|
||||
body[key]._id = body[key].objectId;
|
||||
delete body[key].objectId;
|
||||
}
|
||||
pipeline.push({ [`$${key}`]: body[key] });
|
||||
}
|
||||
if (body.distinct) {
|
||||
options.distinct = String(body.distinct);
|
||||
}
|
||||
options.pipeline = pipeline;
|
||||
if (typeof body.where === 'string') {
|
||||
body.where = JSON.parse(body.where);
|
||||
}
|
||||
return rest.find(req.config, req.auth, this.className(req), body.where, options, req.info.clientSDK)
|
||||
.then((response) => { return { response }; });
|
||||
}
|
||||
|
||||
mountRoutes() {
|
||||
this.route('GET','/aggregate/:className', middleware.promiseEnforceMasterKeyAccess, req => { return this.handleFind(req); });
|
||||
}
|
||||
}
|
||||
|
||||
export default AggregateRouter;
|
||||
Reference in New Issue
Block a user