Files
kami-parse-server/src/StatusHandler.js
Florent Vilmart 10ace495d8 Adds jobs endpoint protected by masterKey (#2560)
* Adds jobs endpoint protected by masterKey

* Adds connection timeout for 15 minutes in jobs

* Refactors pushStatusHandler into StatusHandler

* Adds reporting of _JobStatus

* Only accept strings as messages

* Adds test for masterKey basic auth

* Adds CloudCodeRouter for cloud_code endpoint of job status, enable Jobs feature on dashboard

* xit racing test
2016-08-30 07:19:21 -04:00

197 lines
5.0 KiB
JavaScript
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import { md5Hash, newObjectId } from './cryptoUtils';
import { logger } from './logger';
const PUSH_STATUS_COLLECTION = '_PushStatus';
const JOB_STATUS_COLLECTION = '_JobStatus';
export function flatten(array) {
return array.reduce((memo, element) => {
if (Array.isArray(element)) {
memo = memo.concat(flatten(element));
} else {
memo = memo.concat(element);
}
return memo;
}, []);
}
function statusHandler(className, database) {
let lastPromise = Promise.resolve();
function create(object) {
lastPromise = lastPromise.then(() => {
return database.create(className, object).then(() => {
return Promise.resolve(object);
});
});
return lastPromise;
}
function update(where, object) {
lastPromise = lastPromise.then(() => {
return database.update(className, where, object);
});
return lastPromise;
}
return Object.freeze({
create,
update
})
}
export function jobStatusHandler(config) {
let jobStatus;
let objectId = newObjectId();
let database = config.database;
let lastPromise = Promise.resolve();
let handler = statusHandler(JOB_STATUS_COLLECTION, database);
let setRunning = function(jobName, params) {
let now = new Date();
jobStatus = {
objectId,
jobName,
params,
status: 'running',
source: 'api',
createdAt: now,
// lockdown!
ACL: {}
}
return handler.create(jobStatus);
}
let setMessage = function(message) {
if (!message || typeof message !== 'string') {
return Promise.resolve();
}
return handler.update({ objectId }, { message });
}
let setSucceeded = function(message) {
return setFinalStatus('succeeded', message);
}
let setFailed = function(message) {
return setFinalStatus('failed', message);
}
let setFinalStatus = function(status, message = undefined) {
let finishedAt = new Date();
let update = { status, finishedAt };
if (message && typeof message === 'string') {
update.message = message;
}
return handler.update({ objectId }, update);
}
return Object.freeze({
setRunning,
setSucceeded,
setMessage,
setFailed
});
}
export function pushStatusHandler(config) {
let pushStatus;
let objectId = newObjectId();
let database = config.database;
let handler = statusHandler(PUSH_STATUS_COLLECTION, database);
let setInitial = function(body = {}, where, options = {source: 'rest'}) {
let now = new Date();
let data = body.data || {};
let payloadString = JSON.stringify(data);
let pushHash;
if (typeof data.alert === 'string') {
pushHash = md5Hash(data.alert);
} else if (typeof data.alert === 'object') {
pushHash = md5Hash(JSON.stringify(data.alert));
} else {
pushHash = 'd41d8cd98f00b204e9800998ecf8427e';
}
let object = {
objectId,
createdAt: now,
pushTime: now.toISOString(),
query: JSON.stringify(where),
payload: payloadString,
source: options.source,
title: options.title,
expiry: body.expiration_time,
status: "pending",
numSent: 0,
pushHash,
// lockdown!
ACL: {}
}
return handler.create(object).then(() => {
pushStatus = {
objectId
};
return Promise.resolve(pushStatus);
});
}
let setRunning = function(installations) {
logger.verbose('sending push to %d installations', installations.length);
return handler.update({status:"pending", objectId: objectId},
{status: "running", updatedAt: new Date() });
}
let complete = function(results) {
let update = {
status: 'succeeded',
updatedAt: new Date(),
numSent: 0,
numFailed: 0,
};
if (Array.isArray(results)) {
results = flatten(results);
results.reduce((memo, result) => {
// Cannot handle that
if (!result || !result.device || !result.device.deviceType) {
return memo;
}
let deviceType = result.device.deviceType;
if (result.transmitted)
{
memo.numSent++;
memo.sentPerType = memo.sentPerType || {};
memo.sentPerType[deviceType] = memo.sentPerType[deviceType] || 0;
memo.sentPerType[deviceType]++;
} else {
memo.numFailed++;
memo.failedPerType = memo.failedPerType || {};
memo.failedPerType[deviceType] = memo.failedPerType[deviceType] || 0;
memo.failedPerType[deviceType]++;
}
return memo;
}, update);
}
logger.verbose('sent push! %d success, %d failures', update.numSent, update.numFailed);
return handler.update({status:"running", objectId }, update);
}
let fail = function(err) {
let update = {
errorMessage: JSON.stringify(err),
status: 'failed',
updatedAt: new Date()
}
logger.info('warning: error while sending push', err);
return handler.update({ objectId }, update);
}
return Object.freeze({
objectId,
setInitial,
setRunning,
complete,
fail
})
}