Push scalability (#3080)

* Update status through increment
* adds support for incrementing nested keys
* fix issue when having spaces in keys for ordering
* Refactors PushController to use worker
* Adds tests for custom push queue config
* Makes PushController adapter independant
* Better logging of _PushStatus in VERBOSE
This commit is contained in:
Florent Vilmart
2017-01-13 19:34:04 -05:00
committed by GitHub
parent 5f849ca662
commit deedf7b370
20 changed files with 588 additions and 211 deletions

View File

@@ -0,0 +1,65 @@
import events from 'events';
const emitter = new events.EventEmitter();
const subscriptions = new Map();
function unsubscribe(channel: string) {
if (!subscriptions.has(channel)) {
//console.log('No channel to unsub from');
return;
}
//console.log('unsub ', channel);
emitter.removeListener(channel, subscriptions.get(channel));
subscriptions.delete(channel);
}
class Publisher {
emitter: any;
constructor(emitter: any) {
this.emitter = emitter;
}
publish(channel: string, message: string): void {
this.emitter.emit(channel, message);
}
}
class Consumer extends events.EventEmitter {
emitter: any;
constructor(emitter: any) {
super();
this.emitter = emitter;
}
subscribe(channel: string): void {
unsubscribe(channel);
const handler = (message) => {
this.emit('message', channel, message);
}
subscriptions.set(channel, handler);
this.emitter.on(channel, handler);
}
unsubscribe(channel: string): void {
unsubscribe(channel);
}
}
function createPublisher(): any {
return new Publisher(emitter);
}
function createSubscriber(): any {
return new Consumer(emitter);
}
const EventEmitterMQ = {
createPublisher,
createSubscriber
}
export {
EventEmitterMQ
}

View File

@@ -1,3 +1,4 @@
// @flow
/*eslint no-unused-vars: "off"*/
// Push Adapter
//
@@ -11,13 +12,15 @@
// android push and APNS for ios push.
export class PushAdapter {
send(devices, installations, pushStatus) { }
send(body: any, installations: any[], pushStatus: any): ?Promise<*> {}
/**
* Get an array of valid push types.
* @returns {Array} An array of valid push types
*/
getValidPushTypes() {}
getValidPushTypes(): string[] {
return []
}
}
export default PushAdapter;

View File

@@ -926,16 +926,34 @@ export class PostgresStorageAdapter {
} else if (typeof fieldValue === 'object'
&& schema.fields[fieldName]
&& schema.fields[fieldName].type === 'Object') {
// Gather keys to increment
const keysToIncrement = Object.keys(originalUpdate).filter(k => {
// choose top level fields that have a delete operation set
return originalUpdate[k].__op === 'Increment' && k.split('.').length === 2 && k.split(".")[0] === fieldName;
}).map(k => k.split('.')[1]);
let incrementPatterns = '';
if (keysToIncrement.length > 0) {
incrementPatterns = ' || ' + keysToIncrement.map((c) => {
const amount = fieldValue[c].amount;
return `CONCAT('{"${c}":', COALESCE($${index}:name->>'${c}','0')::int + ${amount}, '}')::jsonb`;
}).join(' || ');
// Strip the keys
keysToIncrement.forEach((key) => {
delete fieldValue[key];
});
}
const keysToDelete = Object.keys(originalUpdate).filter(k => {
// choose top level fields that have a delete operation set
return originalUpdate[k].__op === 'Delete' && k.split('.').length === 2
return originalUpdate[k].__op === 'Delete' && k.split('.').length === 2 && k.split(".")[0] === fieldName;
}).map(k => k.split('.')[1]);
const deletePatterns = keysToDelete.reduce((p, c, i) => {
return p + ` - '$${index + 1 + i}:value'`;
}, '');
updatePatterns.push(`$${index}:name = ( COALESCE($${index}:name, '{}'::jsonb) ${deletePatterns} || $${index + 1 + keysToDelete.length}::jsonb )`);
updatePatterns.push(`$${index}:name = ( COALESCE($${index}:name, '{}'::jsonb) ${deletePatterns} ${incrementPatterns} || $${index + 1 + keysToDelete.length}::jsonb )`);
values.push(fieldName, ...keysToDelete, JSON.stringify(fieldValue));
index += 2 + keysToDelete.length;

View File

@@ -58,6 +58,9 @@ export class Config {
this.hooksController = cacheInfo.hooksController;
this.filesController = cacheInfo.filesController;
this.pushController = cacheInfo.pushController;
this.pushControllerQueue = cacheInfo.pushControllerQueue;
this.pushWorker = cacheInfo.pushWorker;
this.hasPushSupport = cacheInfo.hasPushSupport;
this.loggerController = cacheInfo.loggerController;
this.userController = cacheInfo.userController;
this.authDataManager = cacheInfo.authDataManager;

View File

@@ -38,11 +38,15 @@ export class AdaptableController {
}
validateAdapter(adapter) {
AdaptableController.validateAdapter(adapter, this);
}
static validateAdapter(adapter, self, ExpectedType) {
if (!adapter) {
throw new Error(this.constructor.name + " requires an adapter");
}
const Type = this.expectedAdapterType();
const Type = ExpectedType || self.expectedAdapterType();
// Allow skipping for testing
if (!Type) {
return;

View File

@@ -9,6 +9,8 @@ const DefaultHooksCollectionName = "_Hooks";
export class HooksController {
_applicationId:string;
_webhookKey:string;
database: any;
constructor(applicationId:string, databaseController, webhookKey) {
this._applicationId = applicationId;

View File

@@ -1,54 +1,17 @@
import { Parse } from 'parse/node';
import rest from '../rest';
import AdaptableController from './AdaptableController';
import { PushAdapter } from '../Adapters/Push/PushAdapter';
import deepcopy from 'deepcopy';
import RestQuery from '../RestQuery';
import RestWrite from '../RestWrite';
import { master } from '../Auth';
import { pushStatusHandler } from '../StatusHandler';
const UNSUPPORTED_BADGE_KEY = "unsupported";
export class PushController extends AdaptableController {
/**
* Check whether the deviceType parameter in qury condition is valid or not.
* @param {Object} where A query condition
* @param {Array} validPushTypes An array of valid push types(string)
*/
static validatePushType(where = {}, validPushTypes = []) {
var deviceTypeField = where.deviceType || {};
var deviceTypes = [];
if (typeof deviceTypeField === 'string') {
deviceTypes.push(deviceTypeField);
} else if (Array.isArray(deviceTypeField['$in'])) {
deviceTypes.concat(deviceTypeField['$in']);
}
for (var i = 0; i < deviceTypes.length; i++) {
var deviceType = deviceTypes[i];
if (validPushTypes.indexOf(deviceType) < 0) {
throw new Parse.Error(Parse.Error.PUSH_MISCONFIGURED,
deviceType + ' is not supported push type.');
}
}
}
get pushIsAvailable() {
return !!this.adapter;
}
export class PushController {
sendPush(body = {}, where = {}, config, auth, onPushStatusSaved = () => {}) {
var pushAdapter = this.adapter;
if (!this.pushIsAvailable) {
throw new Parse.Error(Parse.Error.PUSH_MISCONFIGURED,
'Push adapter is not available');
}
if (!this.options) {
if (!config.hasPushSupport) {
throw new Parse.Error(Parse.Error.PUSH_MISCONFIGURED,
'Missing push configuration');
}
PushController.validatePushType(where, pushAdapter.getValidPushTypes());
// Replace the expiration_time with a valid Unix epoch milliseconds time
body['expiration_time'] = PushController.getExpirationTime(body);
// TODO: If the req can pass the checking, we return immediately instead of waiting
@@ -86,15 +49,7 @@ export class PushController extends AdaptableController {
onPushStatusSaved(pushStatus.objectId);
return badgeUpdate();
}).then(() => {
return rest.find(config, auth, '_Installation', where);
}).then((response) => {
if (!response.results) {
return Promise.reject({error: 'PushController: no results in query'})
}
pushStatus.setRunning(response.results);
return this.sendToAdapter(body, response.results, pushStatus, config);
}).then((results) => {
return pushStatus.complete(results);
return config.pushControllerQueue.enqueue(body, where, config, auth, pushStatus);
}).catch((err) => {
return pushStatus.fail(err).then(() => {
throw err;
@@ -102,34 +57,6 @@ export class PushController extends AdaptableController {
});
}
sendToAdapter(body, installations, pushStatus) {
if (body.data && body.data.badge && typeof body.data.badge == 'string' && body.data.badge.toLowerCase() == "increment") {
// Collect the badges to reduce the # of calls
const badgeInstallationsMap = installations.reduce((map, installation) => {
let badge = installation.badge;
if (installation.deviceType != "ios") {
badge = UNSUPPORTED_BADGE_KEY;
}
map[badge + ''] = map[badge + ''] || [];
map[badge + ''].push(installation);
return map;
}, {});
// Map the on the badges count and return the send result
const promises = Object.keys(badgeInstallationsMap).map((badge) => {
const payload = deepcopy(body);
if (badge == UNSUPPORTED_BADGE_KEY) {
delete payload.data.badge;
} else {
payload.data.badge = parseInt(badge);
}
return this.adapter.send(payload, badgeInstallationsMap[badge], pushStatus.objectId);
});
return Promise.all(promises);
}
return this.adapter.send(body, installations, pushStatus.objectId);
}
/**
* Get expiration time from the request body.
* @param {Object} request A request object
@@ -157,10 +84,6 @@ export class PushController extends AdaptableController {
}
return expirationTime.valueOf();
}
expectedAdapterType() {
return PushAdapter;
}
}
export default PushController;

View File

@@ -86,6 +86,7 @@ const defaultColumns = Object.freeze({
"errorMessage": {type:'Object'},
"sentPerType": {type:'Object'},
"failedPerType":{type:'Object'},
"count": {type:'Number'}
},
_JobStatus: {
"jobName": {type: 'String'},

26
src/ParseMessageQueue.js Normal file
View File

@@ -0,0 +1,26 @@
import { loadAdapter } from './Adapters/AdapterLoader';
import {
EventEmitterMQ
} from './Adapters/MessageQueue/EventEmitterMQ';
const ParseMessageQueue = {};
ParseMessageQueue.createPublisher = function(config: any): any {
const adapter = loadAdapter(config.messageQueueAdapter, EventEmitterMQ, config);
if (typeof adapter.createPublisher !== 'function') {
throw 'pubSubAdapter should have createPublisher()';
}
return adapter.createPublisher(config);
}
ParseMessageQueue.createSubscriber = function(config: any): void {
const adapter = loadAdapter(config.messageQueueAdapter, EventEmitterMQ, config)
if (typeof adapter.createSubscriber !== 'function') {
throw 'messageQueueAdapter should have createSubscriber()';
}
return adapter.createSubscriber(config);
}
export {
ParseMessageQueue
}

View File

@@ -39,6 +39,8 @@ import { LogsRouter } from './Routers/LogsRouter';
import { ParseLiveQueryServer } from './LiveQuery/ParseLiveQueryServer';
import { PublicAPIRouter } from './Routers/PublicAPIRouter';
import { PushController } from './Controllers/PushController';
import { PushQueue } from './Push/PushQueue';
import { PushWorker } from './Push/PushWorker';
import { PushRouter } from './Routers/PushRouter';
import { CloudCodeRouter } from './Routers/CloudCodeRouter';
import { RolesRouter } from './Routers/RolesRouter';
@@ -168,11 +170,28 @@ class ParseServer {
});
const filesController = new FilesController(filesControllerAdapter, appId);
const pushOptions = Object.assign({}, push);
const pushQueueOptions = pushOptions.queueOptions || {};
if (pushOptions.queueOptions) {
delete pushOptions.queueOptions;
}
// Pass the push options too as it works with the default
const pushControllerAdapter = loadAdapter(push && push.adapter, ParsePushAdapter, push || {});
// We pass the options and the base class for the adapter,
const pushAdapter = loadAdapter(pushOptions && pushOptions.adapter, ParsePushAdapter, pushOptions);
// We pass the options and the base class for the adatper,
// Note that passing an instance would work too
const pushController = new PushController(pushControllerAdapter, appId, push);
const pushController = new PushController();
const hasPushSupport = pushAdapter && push;
const {
disablePushWorker
} = pushQueueOptions;
const pushControllerQueue = new PushQueue(pushQueueOptions);
let pushWorker;
if (!disablePushWorker) {
pushWorker = new PushWorker(pushAdapter, pushQueueOptions);
}
const emailControllerAdapter = loadAdapter(emailAdapter);
const userController = new UserController(emailControllerAdapter, appId, { verifyUserEmails });
@@ -237,7 +256,10 @@ class ParseServer {
databaseController,
schemaCacheTTL,
enableSingleSchemaCache,
userSensitiveFields
userSensitiveFields,
pushWorker,
pushControllerQueue,
hasPushSupport
});
Config.validate(AppCache.get(appId));

60
src/Push/PushQueue.js Normal file
View File

@@ -0,0 +1,60 @@
import { ParseMessageQueue } from '../ParseMessageQueue';
import rest from '../rest';
import { isPushIncrementing } from './utils';
const PUSH_CHANNEL = 'parse-server-push';
const DEFAULT_BATCH_SIZE = 100;
export class PushQueue {
parsePublisher: Object;
channel: String;
batchSize: Number;
// config object of the publisher, right now it only contains the redisURL,
// but we may extend it later.
constructor(config: any = {}) {
this.channel = config.channel || PUSH_CHANNEL;
this.batchSize = config.batchSize || DEFAULT_BATCH_SIZE;
this.parsePublisher = ParseMessageQueue.createPublisher(config);
}
static defaultPushChannel() {
return PUSH_CHANNEL;
}
enqueue(body, where, config, auth, pushStatus) {
const limit = this.batchSize;
// Order by badge (because the payload is badge dependant)
// and createdAt to fix the order
const order = isPushIncrementing(body) ? 'badge,createdAt' : 'createdAt';
return Promise.resolve().then(() => {
return rest.find(config,
auth,
'_Installation',
where,
{limit: 0, count: true});
}).then(({results, count}) => {
if (!results) {
return Promise.reject({error: 'PushController: no results in query'})
}
pushStatus.setRunning(count);
let skip = 0;
while (skip < count) {
const query = { where,
limit,
skip,
order };
const pushWorkItem = {
body,
query,
pushStatus: { objectId: pushStatus.objectId },
applicationId: config.applicationId
}
this.parsePublisher.publish(this.channel, JSON.stringify(pushWorkItem));
skip += limit;
}
});
}
}

95
src/Push/PushWorker.js Normal file
View File

@@ -0,0 +1,95 @@
// @flow
import deepcopy from 'deepcopy';
import AdaptableController from '../Controllers/AdaptableController';
import { master } from '../Auth';
import Config from '../Config';
import { PushAdapter } from '../Adapters/Push/PushAdapter';
import rest from '../rest';
import { pushStatusHandler } from '../StatusHandler';
import { isPushIncrementing } from './utils';
import { ParseMessageQueue } from '../ParseMessageQueue';
import { PushQueue } from './PushQueue';
const UNSUPPORTED_BADGE_KEY = "unsupported";
function groupByBadge(installations) {
return installations.reduce((map, installation) => {
let badge = installation.badge + '';
if (installation.deviceType != "ios") {
badge = UNSUPPORTED_BADGE_KEY;
}
map[badge] = map[badge] || [];
map[badge].push(installation);
return map;
}, {});
}
export class PushWorker {
subscriber: ?any;
adapter: any;
channel: string;
constructor(pushAdapter: PushAdapter, subscriberConfig: any = {}) {
AdaptableController.validateAdapter(pushAdapter, this, PushAdapter);
this.adapter = pushAdapter;
this.channel = subscriberConfig.channel || PushQueue.defaultPushChannel();
this.subscriber = ParseMessageQueue.createSubscriber(subscriberConfig);
if (this.subscriber) {
const subscriber = this.subscriber;
subscriber.subscribe(this.channel);
subscriber.on('message', (channel, messageStr) => {
const workItem = JSON.parse(messageStr);
this.run(workItem);
});
}
}
unsubscribe(): void {
if (this.subscriber) {
this.subscriber.unsubscribe(this.channel);
}
}
run({ body, query, pushStatus, applicationId }: any): Promise<*> {
const config = new Config(applicationId);
const auth = master(config);
const where = query.where;
delete query.where;
return rest.find(config, auth, '_Installation', where, query).then(({results}) => {
if (results.length == 0) {
return;
}
return this.sendToAdapter(body, results, pushStatus, config);
}, err => {
throw err;
});
}
sendToAdapter(body: any, installations: any[], pushStatus: any, config: Config): Promise<*> {
pushStatus = pushStatusHandler(config, pushStatus.objectId);
if (!isPushIncrementing(body)) {
return this.adapter.send(body, installations, pushStatus.objectId).then((results) => {
return pushStatus.trackSent(results);
});
}
// Collect the badges to reduce the # of calls
const badgeInstallationsMap = groupByBadge(installations);
// Map the on the badges count and return the send result
const promises = Object.keys(badgeInstallationsMap).map((badge) => {
const payload = deepcopy(body);
if (badge == UNSUPPORTED_BADGE_KEY) {
delete payload.data.badge;
} else {
payload.data.badge = parseInt(badge);
}
const installations = badgeInstallationsMap[badge];
return this.sendToAdapter(payload, installations, pushStatus, config);
});
return Promise.all(promises);
}
}
export default PushWorker;

30
src/Push/utils.js Normal file
View File

@@ -0,0 +1,30 @@
import Parse from 'parse/node';
export function isPushIncrementing(body) {
return body.data &&
body.data.badge &&
typeof body.data.badge == 'string' &&
body.data.badge.toLowerCase() == "increment"
}
/**
* Check whether the deviceType parameter in qury condition is valid or not.
* @param {Object} where A query condition
* @param {Array} validPushTypes An array of valid push types(string)
*/
export function validatePushType(where = {}, validPushTypes = []) {
var deviceTypeField = where.deviceType || {};
var deviceTypes = [];
if (typeof deviceTypeField === 'string') {
deviceTypes.push(deviceTypeField);
} else if (Array.isArray(deviceTypeField['$in'])) {
deviceTypes.concat(deviceTypeField['$in']);
}
for (var i = 0; i < deviceTypes.length; i++) {
var deviceType = deviceTypes[i];
if (validPushTypes.indexOf(deviceType) < 0) {
throw new Parse.Error(Parse.Error.PUSH_MISCONFIGURED,
deviceType + ' is not supported push type.');
}
}
}

View File

@@ -92,15 +92,15 @@ function RestQuery(config, auth, className, restWhere = {}, restOptions = {}, cl
break;
case 'order':
var fields = restOptions.order.split(',');
var sortMap = {};
for (var field of fields) {
this.findOptions.sort = fields.reduce((sortMap, field) => {
field = field.trim();
if (field[0] == '-') {
sortMap[field.slice(1)] = -1;
} else {
sortMap[field] = 1;
}
}
this.findOptions.sort = sortMap;
return sortMap;
}, {});
break;
case 'include': {
const paths = restOptions.include.split(',');

View File

@@ -29,9 +29,9 @@ export class FeaturesRouter extends PromiseRouter {
from: true,
},
push: {
immediatePush: req.config.pushController.pushIsAvailable,
immediatePush: req.config.hasPushSupport,
scheduledPush: false,
storedPushData: req.config.pushController.pushIsAvailable,
storedPushData: req.config.hasPushSupport,
pushAudiences: false,
},
schemas: {

View File

@@ -4,6 +4,15 @@ import { logger } from './logger';
const PUSH_STATUS_COLLECTION = '_PushStatus';
const JOB_STATUS_COLLECTION = '_JobStatus';
const incrementOp = function(object = {}, key, amount = 1) {
if (!object[key]) {
object[key] = {__op: 'Increment', amount: amount}
} else {
object[key].amount += amount;
}
return object[key];
}
export function flatten(array) {
var flattened = [];
for(var i = 0; i < array.length; i++) {
@@ -94,10 +103,9 @@ export function jobStatusHandler(config) {
});
}
export function pushStatusHandler(config) {
export function pushStatusHandler(config, objectId = newObjectId()) {
let pushStatus;
const objectId = newObjectId();
const database = config.database;
const handler = statusHandler(PUSH_STATUS_COLLECTION, database);
const setInitial = function(body = {}, where, options = {source: 'rest'}) {
@@ -136,18 +144,17 @@ export function pushStatusHandler(config) {
});
}
const setRunning = function(installations) {
logger.verbose('sending push to %d installations', installations.length);
const setRunning = function(count) {
logger.verbose(`_PushStatus ${objectId}: sending push to %d installations`, count);
return handler.update({status:"pending", objectId: objectId},
{status: "running", updatedAt: new Date() });
{status: "running", updatedAt: new Date(), count });
}
const complete = function(results) {
const trackSent = function(results) {
const update = {
status: 'succeeded',
updatedAt: new Date(),
numSent: 0,
numFailed: 0,
numFailed: 0
};
if (Array.isArray(results)) {
results = flatten(results);
@@ -157,23 +164,44 @@ export function pushStatusHandler(config) {
return memo;
}
const deviceType = result.device.deviceType;
if (result.transmitted)
{
const key = result.transmitted ? `sentPerType.${deviceType}` : `failedPerType.${deviceType}`;
memo[key] = incrementOp(memo, key);
if (result.transmitted) {
memo.numSent++;
memo.sentPerType = memo.sentPerType || {};
memo.sentPerType[deviceType] = memo.sentPerType[deviceType] || 0;
memo.sentPerType[deviceType]++;
} else {
memo.numFailed++;
memo.failedPerType = memo.failedPerType || {};
memo.failedPerType[deviceType] = memo.failedPerType[deviceType] || 0;
memo.failedPerType[deviceType]++;
}
return memo;
}, update);
incrementOp(update, 'count', -results.length);
}
logger.verbose('sent push! %d success, %d failures', update.numSent, update.numFailed);
return handler.update({status:"running", objectId }, update);
logger.verbose(`_PushStatus ${objectId}: sent push! %d success, %d failures`, update.numSent, update.numFailed);
['numSent', 'numFailed'].forEach((key) => {
if (update[key] > 0) {
update[key] = {
__op: 'Increment',
amount: update[key]
};
} else {
delete update[key];
}
});
return handler.update({ objectId }, update).then((res) => {
if (res && res.count === 0) {
return this.complete();
}
})
}
const complete = function() {
return handler.update({ objectId }, {
status: 'succeeded',
count: {__op: 'Delete'},
updatedAt: new Date()
});
}
const fail = function(err) {
@@ -182,7 +210,7 @@ export function pushStatusHandler(config) {
status: 'failed',
updatedAt: new Date()
}
logger.info('warning: error while sending push', err);
logger.warn(`_PushStatus ${objectId}: error while sending push`, err);
return handler.update({ objectId }, update);
}
@@ -190,6 +218,7 @@ export function pushStatusHandler(config) {
objectId,
setInitial,
setRunning,
trackSent,
complete,
fail
})

View File

@@ -7,6 +7,7 @@ import RedisCacheAdapter from './Adapters/Cache/RedisCacheAdapter'
import * as TestUtils from './TestUtils';
import { useExternal } from './deprecated';
import { getLogger } from './logger';
import { PushWorker } from './Push/PushWorker';
// Factory function
const _ParseServer = function(options) {
@@ -23,4 +24,4 @@ Object.defineProperty(module.exports, 'logger', {
});
export default ParseServer;
export { S3Adapter, GCSAdapter, FileSystemAdapter, InMemoryCacheAdapter, NullCacheAdapter, RedisCacheAdapter, TestUtils, _ParseServer as ParseServer };
export { S3Adapter, GCSAdapter, FileSystemAdapter, InMemoryCacheAdapter, NullCacheAdapter, RedisCacheAdapter, TestUtils, PushWorker, _ParseServer as ParseServer };