Fix for _PushStatus Stuck 'running' when Count is Off (#4319)
* Fix for _PushStatus stuck 'running' if count is off * use 'count' for batches * push worker test fix
This commit is contained in:
committed by
Florent Vilmart
parent
842343a1a9
commit
c1a7347143
@@ -203,4 +203,261 @@ describe('Parse.Push', () => {
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
const successfulAny = function(body, installations) {
|
||||
const promises = installations.map((device) => {
|
||||
return Promise.resolve({
|
||||
transmitted: true,
|
||||
device: device,
|
||||
})
|
||||
});
|
||||
|
||||
return Promise.all(promises);
|
||||
};
|
||||
|
||||
const provideInstallations = function(num) {
|
||||
if(!num) {
|
||||
num = 2;
|
||||
}
|
||||
|
||||
const installations = [];
|
||||
while(installations.length !== num) {
|
||||
// add Android installations
|
||||
const installation = new Parse.Object("_Installation");
|
||||
installation.set("installationId", "installation_" + installations.length);
|
||||
installation.set("deviceToken","device_token_" + installations.length);
|
||||
installation.set("deviceType", "android");
|
||||
installations.push(installation);
|
||||
}
|
||||
|
||||
return installations;
|
||||
};
|
||||
|
||||
const losingAdapter = {
|
||||
send: function(body, installations) {
|
||||
// simulate having lost an installation before this was called
|
||||
// thus invalidating our 'count' in _PushStatus
|
||||
installations.pop();
|
||||
|
||||
return successfulAny(body, installations);
|
||||
},
|
||||
getValidPushTypes: function() {
|
||||
return ["android"];
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Verifies that _PushStatus cannot get stuck in a 'running' state
|
||||
* Simulates a simple push where 1 installation is removed between _PushStatus
|
||||
* count being set and the pushes being sent
|
||||
*/
|
||||
it('does not get stuck with _PushStatus \'running\' on 1 installation lost', (done) => {
|
||||
reconfigureServer({
|
||||
push: {adapter: losingAdapter}
|
||||
}).then(() => {
|
||||
return Parse.Object.saveAll(provideInstallations());
|
||||
}).then(() => {
|
||||
return Parse.Push.send(
|
||||
{
|
||||
data: {alert: "We fixed our status!"},
|
||||
where: {deviceType: 'android'}
|
||||
},
|
||||
{ useMasterKey: true }
|
||||
);
|
||||
}).then(() => {
|
||||
// it is enqueued so it can take time
|
||||
return new Promise((resolve) => {
|
||||
setTimeout(() => {
|
||||
resolve();
|
||||
}, 1000);
|
||||
});
|
||||
}).then(() => {
|
||||
// query for push status
|
||||
const query = new Parse.Query('_PushStatus');
|
||||
return query.find({useMasterKey: true});
|
||||
}).then((results) => {
|
||||
// verify status is NOT broken
|
||||
expect(results.length).toBe(1);
|
||||
const result = results[0];
|
||||
expect(result.get('status')).toEqual('succeeded');
|
||||
expect(result.get('numSent')).toEqual(1);
|
||||
expect(result.get('count')).toEqual(undefined);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* Verifies that _PushStatus cannot get stuck in a 'running' state
|
||||
* Simulates a simple push where 1 installation is added between _PushStatus
|
||||
* count being set and the pushes being sent
|
||||
*/
|
||||
it('does not get stuck with _PushStatus \'running\' on 1 installation added', (done) => {
|
||||
const installations = provideInstallations();
|
||||
|
||||
// add 1 iOS installation which we will omit & add later on
|
||||
const iOSInstallation = new Parse.Object("_Installation");
|
||||
iOSInstallation.set("installationId", "installation_" + installations.length);
|
||||
iOSInstallation.set("deviceToken","device_token_" + installations.length);
|
||||
iOSInstallation.set("deviceType", "ios");
|
||||
installations.push(iOSInstallation);
|
||||
|
||||
reconfigureServer({
|
||||
push: {
|
||||
adapter: {
|
||||
send: function(body, installations) {
|
||||
// simulate having added an installation before this was called
|
||||
// thus invalidating our 'count' in _PushStatus
|
||||
installations.push(iOSInstallation);
|
||||
|
||||
return successfulAny(body, installations);
|
||||
},
|
||||
getValidPushTypes: function() {
|
||||
return ["android"];
|
||||
}
|
||||
}
|
||||
}
|
||||
}).then(() => {
|
||||
return Parse.Object.saveAll(installations);
|
||||
}).then(() => {
|
||||
return Parse.Push.send(
|
||||
{
|
||||
data: {alert: "We fixed our status!"},
|
||||
where: {deviceType: {'$ne' : 'random'}}
|
||||
},
|
||||
{ useMasterKey: true }
|
||||
);
|
||||
}).then(() => {
|
||||
// it is enqueued so it can take time
|
||||
return new Promise((resolve) => {
|
||||
setTimeout(() => {
|
||||
resolve();
|
||||
}, 1000);
|
||||
});
|
||||
}).then(() => {
|
||||
// query for push status
|
||||
const query = new Parse.Query('_PushStatus');
|
||||
return query.find({useMasterKey: true});
|
||||
}).then((results) => {
|
||||
// verify status is NOT broken
|
||||
expect(results.length).toBe(1);
|
||||
const result = results[0];
|
||||
expect(result.get('status')).toEqual('succeeded');
|
||||
expect(result.get('numSent')).toEqual(3);
|
||||
expect(result.get('count')).toEqual(undefined);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* Verifies that _PushStatus cannot get stuck in a 'running' state
|
||||
* Simulates an extended push, where some installations may be removed,
|
||||
* resulting in a non-zero count
|
||||
*/
|
||||
it('does not get stuck with _PushStatus \'running\' on many installations removed', (done) => {
|
||||
const devices = 1000;
|
||||
const installations = provideInstallations(devices);
|
||||
|
||||
reconfigureServer({
|
||||
push: {adapter: losingAdapter}
|
||||
}).then(() => {
|
||||
return Parse.Object.saveAll(installations);
|
||||
}).then(() => {
|
||||
return Parse.Push.send(
|
||||
{
|
||||
data: {alert: "We fixed our status!"},
|
||||
where: {deviceType: 'android'}
|
||||
},
|
||||
{ useMasterKey: true }
|
||||
);
|
||||
}).then(() => {
|
||||
// it is enqueued so it can take time
|
||||
return new Promise((resolve) => {
|
||||
setTimeout(() => {
|
||||
resolve();
|
||||
}, 1000);
|
||||
});
|
||||
}).then(() => {
|
||||
// query for push status
|
||||
const query = new Parse.Query('_PushStatus');
|
||||
return query.find({useMasterKey: true});
|
||||
}).then((results) => {
|
||||
// verify status is NOT broken
|
||||
expect(results.length).toBe(1);
|
||||
const result = results[0];
|
||||
expect(result.get('status')).toEqual('succeeded');
|
||||
// expect # less than # of batches used, assuming each batch is 100 pushes
|
||||
expect(result.get('numSent')).toEqual(devices - (devices / 100));
|
||||
expect(result.get('count')).toEqual(undefined);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* Verifies that _PushStatus cannot get stuck in a 'running' state
|
||||
* Simulates an extended push, where some installations may be added,
|
||||
* resulting in a non-zero count
|
||||
*/
|
||||
it('does not get stuck with _PushStatus \'running\' on many installations added', (done) => {
|
||||
const devices = 1000;
|
||||
const installations = provideInstallations(devices);
|
||||
|
||||
// add 1 iOS installation which we will omit & add later on
|
||||
const iOSInstallations = [];
|
||||
|
||||
while(iOSInstallations.length !== (devices / 100)) {
|
||||
const iOSInstallation = new Parse.Object("_Installation");
|
||||
iOSInstallation.set("installationId", "installation_" + installations.length);
|
||||
iOSInstallation.set("deviceToken", "device_token_" + installations.length);
|
||||
iOSInstallation.set("deviceType", "ios");
|
||||
installations.push(iOSInstallation);
|
||||
iOSInstallations.push(iOSInstallation);
|
||||
}
|
||||
|
||||
reconfigureServer({
|
||||
push: {
|
||||
adapter: {
|
||||
send: function(body, installations) {
|
||||
// simulate having added an installation before this was called
|
||||
// thus invalidating our 'count' in _PushStatus
|
||||
installations.push(iOSInstallations.pop());
|
||||
|
||||
return successfulAny(body, installations);
|
||||
},
|
||||
getValidPushTypes: function() {
|
||||
return ["android"];
|
||||
}
|
||||
}
|
||||
}
|
||||
}).then(() => {
|
||||
return Parse.Object.saveAll(installations);
|
||||
}).then(() => {
|
||||
return Parse.Push.send(
|
||||
{
|
||||
data: {alert: "We fixed our status!"},
|
||||
where: {deviceType: {'$ne' : 'random'}}
|
||||
},
|
||||
{ useMasterKey: true }
|
||||
);
|
||||
}).then(() => {
|
||||
// it is enqueued so it can take time
|
||||
return new Promise((resolve) => {
|
||||
setTimeout(() => {
|
||||
resolve();
|
||||
}, 1000);
|
||||
});
|
||||
}).then(() => {
|
||||
// query for push status
|
||||
const query = new Parse.Query('_PushStatus');
|
||||
return query.find({useMasterKey: true});
|
||||
}).then((results) => {
|
||||
// verify status is NOT broken
|
||||
expect(results.length).toBe(1);
|
||||
const result = results[0];
|
||||
expect(result.get('status')).toEqual('succeeded');
|
||||
// expect # less than # of batches used, assuming each batch is 100 pushes
|
||||
expect(result.get('numSent')).toEqual(devices + (devices / 100));
|
||||
expect(result.get('count')).toEqual(undefined);
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -276,7 +276,7 @@ describe('PushWorker', () => {
|
||||
'failedPerType.ios': { __op: 'Increment', amount: 1 },
|
||||
[`sentPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 },
|
||||
[`failedPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 },
|
||||
count: { __op: 'Increment', amount: -2 },
|
||||
count: { __op: 'Increment', amount: -1 }
|
||||
});
|
||||
const query = new Parse.Query('_PushStatus');
|
||||
return query.get(handler.objectId, { useMasterKey: true });
|
||||
@@ -354,7 +354,7 @@ describe('PushWorker', () => {
|
||||
'failedPerType.ios': { __op: 'Increment', amount: 1 },
|
||||
[`sentPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 },
|
||||
[`failedPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 },
|
||||
count: { __op: 'Increment', amount: -2 },
|
||||
count: { __op: 'Increment', amount: -1 }
|
||||
});
|
||||
done();
|
||||
});
|
||||
|
||||
@@ -88,7 +88,7 @@ const defaultColumns = Object.freeze({
|
||||
"failedPerType": {type:'Object'},
|
||||
"sentPerUTCOffset": {type:'Object'},
|
||||
"failedPerUTCOffset": {type:'Object'},
|
||||
"count": {type:'Number'}
|
||||
"count": {type:'Number'} // tracks # of batches queued and pending
|
||||
},
|
||||
_JobStatus: {
|
||||
"jobName": {type: 'String'},
|
||||
|
||||
@@ -40,7 +40,7 @@ export class PushQueue {
|
||||
if (!results || count == 0) {
|
||||
return Promise.reject({error: 'PushController: no results in query'})
|
||||
}
|
||||
pushStatus.setRunning(count);
|
||||
pushStatus.setRunning(Math.ceil(count / limit));
|
||||
let skip = 0;
|
||||
while (skip < count) {
|
||||
const query = { where,
|
||||
|
||||
@@ -190,10 +190,18 @@ export function pushStatusHandler(config, existingObjectId) {
|
||||
});
|
||||
}
|
||||
|
||||
const setRunning = function(count) {
|
||||
logger.verbose(`_PushStatus ${objectId}: sending push to %d installations`, count);
|
||||
return handler.update({status:"pending", objectId: objectId},
|
||||
{status: "running", count });
|
||||
const setRunning = function(batches) {
|
||||
logger.verbose(`_PushStatus ${objectId}: sending push to installations with %d batches`, batches);
|
||||
return handler.update(
|
||||
{
|
||||
status:"pending",
|
||||
objectId: objectId
|
||||
},
|
||||
{
|
||||
status: "running",
|
||||
count: batches
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
const trackSent = function(results, UTCOffset, cleanupInstallations = process.env.PARSE_SERVER_CLEANUP_INVALID_INSTALLATIONS) {
|
||||
@@ -235,7 +243,6 @@ export function pushStatusHandler(config, existingObjectId) {
|
||||
}
|
||||
return memo;
|
||||
}, update);
|
||||
incrementOp(update, 'count', -results.length);
|
||||
}
|
||||
|
||||
logger.verbose(`_PushStatus ${objectId}: sent push! %d success, %d failures`, update.numSent, update.numFailed);
|
||||
@@ -259,6 +266,9 @@ export function pushStatusHandler(config, existingObjectId) {
|
||||
});
|
||||
}
|
||||
|
||||
// indicate this batch is complete
|
||||
incrementOp(update, 'count', -1);
|
||||
|
||||
return handler.update({ objectId }, update).then((res) => {
|
||||
if (res && res.count === 0) {
|
||||
return this.complete();
|
||||
|
||||
Reference in New Issue
Block a user