@@ -1,5 +1,5 @@
|
||||
import { ParseMessageQueue } from '../ParseMessageQueue';
|
||||
import rest from '../rest';
|
||||
import { ParseMessageQueue } from '../ParseMessageQueue';
|
||||
import rest from '../rest';
|
||||
import { applyDeviceTokenExists } from './utils';
|
||||
import Parse from 'parse/node';
|
||||
|
||||
@@ -30,33 +30,39 @@ export class PushQueue {
|
||||
|
||||
// Order by objectId so no impact on the DB
|
||||
const order = 'objectId';
|
||||
return Promise.resolve().then(() => {
|
||||
return rest.find(config,
|
||||
auth,
|
||||
'_Installation',
|
||||
where,
|
||||
{limit: 0, count: true});
|
||||
}).then(({results, count}) => {
|
||||
if (!results || count == 0) {
|
||||
return pushStatus.complete();
|
||||
}
|
||||
pushStatus.setRunning(Math.ceil(count / limit));
|
||||
let skip = 0;
|
||||
while (skip < count) {
|
||||
const query = { where,
|
||||
limit,
|
||||
skip,
|
||||
order };
|
||||
|
||||
const pushWorkItem = {
|
||||
body,
|
||||
query,
|
||||
pushStatus: { objectId: pushStatus.objectId },
|
||||
applicationId: config.applicationId
|
||||
return Promise.resolve()
|
||||
.then(() => {
|
||||
return rest.find(config, auth, '_Installation', where, {
|
||||
limit: 0,
|
||||
count: true,
|
||||
});
|
||||
})
|
||||
.then(({ results, count }) => {
|
||||
if (!results || count == 0) {
|
||||
return pushStatus.complete();
|
||||
}
|
||||
this.parsePublisher.publish(this.channel, JSON.stringify(pushWorkItem));
|
||||
skip += limit;
|
||||
}
|
||||
});
|
||||
pushStatus.setRunning(Math.ceil(count / limit));
|
||||
let skip = 0;
|
||||
while (skip < count) {
|
||||
const query = {
|
||||
where,
|
||||
limit,
|
||||
skip,
|
||||
order,
|
||||
};
|
||||
|
||||
const pushWorkItem = {
|
||||
body,
|
||||
query,
|
||||
pushStatus: { objectId: pushStatus.objectId },
|
||||
applicationId: config.applicationId,
|
||||
};
|
||||
this.parsePublisher.publish(
|
||||
this.channel,
|
||||
JSON.stringify(pushWorkItem)
|
||||
);
|
||||
skip += limit;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,16 +1,16 @@
|
||||
// @flow
|
||||
// @flow-disable-next
|
||||
import deepcopy from 'deepcopy';
|
||||
import AdaptableController from '../Controllers/AdaptableController';
|
||||
import { master } from '../Auth';
|
||||
import Config from '../Config';
|
||||
import { PushAdapter } from '../Adapters/Push/PushAdapter';
|
||||
import rest from '../rest';
|
||||
import { pushStatusHandler } from '../StatusHandler';
|
||||
import * as utils from './utils';
|
||||
import { ParseMessageQueue } from '../ParseMessageQueue';
|
||||
import { PushQueue } from './PushQueue';
|
||||
import logger from '../logger';
|
||||
import deepcopy from 'deepcopy';
|
||||
import AdaptableController from '../Controllers/AdaptableController';
|
||||
import { master } from '../Auth';
|
||||
import Config from '../Config';
|
||||
import { PushAdapter } from '../Adapters/Push/PushAdapter';
|
||||
import rest from '../rest';
|
||||
import { pushStatusHandler } from '../StatusHandler';
|
||||
import * as utils from './utils';
|
||||
import { ParseMessageQueue } from '../ParseMessageQueue';
|
||||
import { PushQueue } from './PushQueue';
|
||||
import logger from '../logger';
|
||||
|
||||
function groupByBadge(installations) {
|
||||
return installations.reduce((map, installation) => {
|
||||
@@ -48,15 +48,23 @@ export class PushWorker {
|
||||
const where = utils.applyDeviceTokenExists(query.where);
|
||||
delete query.where;
|
||||
pushStatus = pushStatusHandler(config, pushStatus.objectId);
|
||||
return rest.find(config, auth, '_Installation', where, query).then(({results}) => {
|
||||
if (results.length == 0) {
|
||||
return;
|
||||
}
|
||||
return this.sendToAdapter(body, results, pushStatus, config, UTCOffset);
|
||||
});
|
||||
return rest
|
||||
.find(config, auth, '_Installation', where, query)
|
||||
.then(({ results }) => {
|
||||
if (results.length == 0) {
|
||||
return;
|
||||
}
|
||||
return this.sendToAdapter(body, results, pushStatus, config, UTCOffset);
|
||||
});
|
||||
}
|
||||
|
||||
sendToAdapter(body: any, installations: any[], pushStatus: any, config: Config, UTCOffset: ?any): Promise<*> {
|
||||
sendToAdapter(
|
||||
body: any,
|
||||
installations: any[],
|
||||
pushStatus: any,
|
||||
config: Config,
|
||||
UTCOffset: ?any
|
||||
): Promise<*> {
|
||||
// Check if we have locales in the push body
|
||||
const locales = utils.getLocalesFromPush(body);
|
||||
if (locales.length > 0) {
|
||||
@@ -64,31 +72,48 @@ export class PushWorker {
|
||||
const bodiesPerLocales = utils.bodiesPerLocales(body, locales);
|
||||
|
||||
// Group installations on the specified locales (en, fr, default etc...)
|
||||
const grouppedInstallations = utils.groupByLocaleIdentifier(installations, locales);
|
||||
const promises = Object.keys(grouppedInstallations).map((locale) => {
|
||||
const grouppedInstallations = utils.groupByLocaleIdentifier(
|
||||
installations,
|
||||
locales
|
||||
);
|
||||
const promises = Object.keys(grouppedInstallations).map(locale => {
|
||||
const installations = grouppedInstallations[locale];
|
||||
const body = bodiesPerLocales[locale];
|
||||
return this.sendToAdapter(body, installations, pushStatus, config, UTCOffset);
|
||||
return this.sendToAdapter(
|
||||
body,
|
||||
installations,
|
||||
pushStatus,
|
||||
config,
|
||||
UTCOffset
|
||||
);
|
||||
});
|
||||
return Promise.all(promises);
|
||||
}
|
||||
|
||||
if (!utils.isPushIncrementing(body)) {
|
||||
logger.verbose(`Sending push to ${installations.length}`);
|
||||
return this.adapter.send(body, installations, pushStatus.objectId).then((results) => {
|
||||
return pushStatus.trackSent(results, UTCOffset).then(() => results);
|
||||
});
|
||||
return this.adapter
|
||||
.send(body, installations, pushStatus.objectId)
|
||||
.then(results => {
|
||||
return pushStatus.trackSent(results, UTCOffset).then(() => results);
|
||||
});
|
||||
}
|
||||
|
||||
// Collect the badges to reduce the # of calls
|
||||
const badgeInstallationsMap = groupByBadge(installations);
|
||||
|
||||
// Map the on the badges count and return the send result
|
||||
const promises = Object.keys(badgeInstallationsMap).map((badge) => {
|
||||
const promises = Object.keys(badgeInstallationsMap).map(badge => {
|
||||
const payload = deepcopy(body);
|
||||
payload.data.badge = parseInt(badge);
|
||||
const installations = badgeInstallationsMap[badge];
|
||||
return this.sendToAdapter(payload, installations, pushStatus, config, UTCOffset);
|
||||
return this.sendToAdapter(
|
||||
payload,
|
||||
installations,
|
||||
pushStatus,
|
||||
config,
|
||||
UTCOffset
|
||||
);
|
||||
});
|
||||
return Promise.all(promises);
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import Parse from 'parse/node';
|
||||
import Parse from 'parse/node';
|
||||
import deepcopy from 'deepcopy';
|
||||
|
||||
export function isPushIncrementing(body) {
|
||||
@@ -7,12 +7,16 @@ export function isPushIncrementing(body) {
|
||||
}
|
||||
|
||||
const badge = body.data.badge;
|
||||
if (typeof badge == 'string' && badge.toLowerCase() == "increment") {
|
||||
if (typeof badge == 'string' && badge.toLowerCase() == 'increment') {
|
||||
return true;
|
||||
}
|
||||
|
||||
return typeof badge == 'object' && typeof badge.__op == 'string' &&
|
||||
badge.__op.toLowerCase() == "increment" && Number(badge.amount);
|
||||
return (
|
||||
typeof badge == 'object' &&
|
||||
typeof badge.__op == 'string' &&
|
||||
badge.__op.toLowerCase() == 'increment' &&
|
||||
Number(badge.amount)
|
||||
);
|
||||
}
|
||||
|
||||
const localizableKeys = ['alert', 'title'];
|
||||
@@ -22,14 +26,18 @@ export function getLocalesFromPush(body) {
|
||||
if (!data) {
|
||||
return [];
|
||||
}
|
||||
return [...new Set(Object.keys(data).reduce((memo, key) => {
|
||||
localizableKeys.forEach((localizableKey) => {
|
||||
if (key.indexOf(`${localizableKey}-`) == 0) {
|
||||
memo.push(key.slice(localizableKey.length + 1));
|
||||
}
|
||||
});
|
||||
return memo;
|
||||
}, []))];
|
||||
return [
|
||||
...new Set(
|
||||
Object.keys(data).reduce((memo, key) => {
|
||||
localizableKeys.forEach(localizableKey => {
|
||||
if (key.indexOf(`${localizableKey}-`) == 0) {
|
||||
memo.push(key.slice(localizableKey.length + 1));
|
||||
}
|
||||
});
|
||||
return memo;
|
||||
}, [])
|
||||
),
|
||||
];
|
||||
}
|
||||
|
||||
export function transformPushBodyForLocale(body, locale) {
|
||||
@@ -38,7 +46,7 @@ export function transformPushBodyForLocale(body, locale) {
|
||||
return body;
|
||||
}
|
||||
body = deepcopy(body);
|
||||
localizableKeys.forEach((key) => {
|
||||
localizableKeys.forEach(key => {
|
||||
const localeValue = body.data[`${key}-${locale}`];
|
||||
if (localeValue) {
|
||||
body.data[key] = localeValue;
|
||||
@@ -48,9 +56,11 @@ export function transformPushBodyForLocale(body, locale) {
|
||||
}
|
||||
|
||||
export function stripLocalesFromBody(body) {
|
||||
if (!body.data) { return body; }
|
||||
Object.keys(body.data).forEach((key) => {
|
||||
localizableKeys.forEach((localizableKey) => {
|
||||
if (!body.data) {
|
||||
return body;
|
||||
}
|
||||
Object.keys(body.data).forEach(key => {
|
||||
localizableKeys.forEach(localizableKey => {
|
||||
if (key.indexOf(`${localizableKey}-`) == 0) {
|
||||
delete body.data[key];
|
||||
}
|
||||
@@ -71,23 +81,29 @@ export function bodiesPerLocales(body, locales = []) {
|
||||
}
|
||||
|
||||
export function groupByLocaleIdentifier(installations, locales = []) {
|
||||
return installations.reduce((map, installation) => {
|
||||
let added = false;
|
||||
locales.forEach((locale) => {
|
||||
if (added) {
|
||||
return;
|
||||
return installations.reduce(
|
||||
(map, installation) => {
|
||||
let added = false;
|
||||
locales.forEach(locale => {
|
||||
if (added) {
|
||||
return;
|
||||
}
|
||||
if (
|
||||
installation.localeIdentifier &&
|
||||
installation.localeIdentifier.indexOf(locale) === 0
|
||||
) {
|
||||
added = true;
|
||||
map[locale] = map[locale] || [];
|
||||
map[locale].push(installation);
|
||||
}
|
||||
});
|
||||
if (!added) {
|
||||
map.default.push(installation);
|
||||
}
|
||||
if (installation.localeIdentifier && installation.localeIdentifier.indexOf(locale) === 0) {
|
||||
added = true;
|
||||
map[locale] = map[locale] || [];
|
||||
map[locale].push(installation);
|
||||
}
|
||||
});
|
||||
if (!added) {
|
||||
map.default.push(installation);
|
||||
}
|
||||
return map;
|
||||
}, {default: []});
|
||||
return map;
|
||||
},
|
||||
{ default: [] }
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -106,8 +122,10 @@ export function validatePushType(where = {}, validPushTypes = []) {
|
||||
for (var i = 0; i < deviceTypes.length; i++) {
|
||||
var deviceType = deviceTypes[i];
|
||||
if (validPushTypes.indexOf(deviceType) < 0) {
|
||||
throw new Parse.Error(Parse.Error.PUSH_MISCONFIGURED,
|
||||
deviceType + ' is not supported push type.');
|
||||
throw new Parse.Error(
|
||||
Parse.Error.PUSH_MISCONFIGURED,
|
||||
deviceType + ' is not supported push type.'
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -115,7 +133,7 @@ export function validatePushType(where = {}, validPushTypes = []) {
|
||||
export function applyDeviceTokenExists(where) {
|
||||
where = deepcopy(where);
|
||||
if (!where.hasOwnProperty('deviceToken')) {
|
||||
where['deviceToken'] = {'$exists': true};
|
||||
where['deviceToken'] = { $exists: true };
|
||||
}
|
||||
return where;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user