Handle shutdown for RedisCacheAdapter (#6658)
* Handle shutdown for RedisCacheAdapter * connected value need to be tested in setTimeout Co-authored-by: Promise Xu <promise@klido.me>
This commit is contained in:
@@ -9,17 +9,17 @@ and make sure a redis server is available on the default port
|
||||
*/
|
||||
describe_only(() => {
|
||||
return process.env.PARSE_SERVER_TEST_CACHE === 'redis';
|
||||
})('RedisCacheAdapter', function() {
|
||||
})('RedisCacheAdapter', function () {
|
||||
const KEY = 'hello';
|
||||
const VALUE = 'world';
|
||||
|
||||
function wait(sleep) {
|
||||
return new Promise(function(resolve) {
|
||||
return new Promise(function (resolve) {
|
||||
setTimeout(resolve, sleep);
|
||||
});
|
||||
}
|
||||
|
||||
it('should get/set/clear', done => {
|
||||
it('should get/set/clear', (done) => {
|
||||
const cache = new RedisCacheAdapter({
|
||||
ttl: NaN,
|
||||
});
|
||||
@@ -27,85 +27,94 @@ describe_only(() => {
|
||||
cache
|
||||
.put(KEY, VALUE)
|
||||
.then(() => cache.get(KEY))
|
||||
.then(value => expect(value).toEqual(VALUE))
|
||||
.then((value) => expect(value).toEqual(VALUE))
|
||||
.then(() => cache.clear())
|
||||
.then(() => cache.get(KEY))
|
||||
.then(value => expect(value).toEqual(null))
|
||||
.then((value) => expect(value).toEqual(null))
|
||||
.then(done);
|
||||
});
|
||||
|
||||
it('should expire after ttl', done => {
|
||||
it('should expire after ttl', (done) => {
|
||||
const cache = new RedisCacheAdapter(null, 50);
|
||||
|
||||
cache
|
||||
.put(KEY, VALUE)
|
||||
.then(() => cache.get(KEY))
|
||||
.then(value => expect(value).toEqual(VALUE))
|
||||
.then((value) => expect(value).toEqual(VALUE))
|
||||
.then(wait.bind(null, 52))
|
||||
.then(() => cache.get(KEY))
|
||||
.then(value => expect(value).toEqual(null))
|
||||
.then((value) => expect(value).toEqual(null))
|
||||
.then(done);
|
||||
});
|
||||
|
||||
it('should not store value for ttl=0', done => {
|
||||
it('should not store value for ttl=0', (done) => {
|
||||
const cache = new RedisCacheAdapter(null, 5);
|
||||
|
||||
cache
|
||||
.put(KEY, VALUE, 0)
|
||||
.then(() => cache.get(KEY))
|
||||
.then(value => expect(value).toEqual(null))
|
||||
.then((value) => expect(value).toEqual(null))
|
||||
.then(done);
|
||||
});
|
||||
|
||||
it('should not expire when ttl=Infinity', done => {
|
||||
it('should not expire when ttl=Infinity', (done) => {
|
||||
const cache = new RedisCacheAdapter(null, 1);
|
||||
|
||||
cache
|
||||
.put(KEY, VALUE, Infinity)
|
||||
.then(() => cache.get(KEY))
|
||||
.then(value => expect(value).toEqual(VALUE))
|
||||
.then((value) => expect(value).toEqual(VALUE))
|
||||
.then(wait.bind(null, 5))
|
||||
.then(() => cache.get(KEY))
|
||||
.then(value => expect(value).toEqual(VALUE))
|
||||
.then((value) => expect(value).toEqual(VALUE))
|
||||
.then(done);
|
||||
});
|
||||
|
||||
it('should fallback to default ttl', done => {
|
||||
it('should fallback to default ttl', (done) => {
|
||||
const cache = new RedisCacheAdapter(null, 1);
|
||||
let promise = Promise.resolve();
|
||||
|
||||
[-100, null, undefined, 'not number', true].forEach(ttl => {
|
||||
[-100, null, undefined, 'not number', true].forEach((ttl) => {
|
||||
promise = promise.then(() =>
|
||||
cache
|
||||
.put(KEY, VALUE, ttl)
|
||||
.then(() => cache.get(KEY))
|
||||
.then(value => expect(value).toEqual(VALUE))
|
||||
.then((value) => expect(value).toEqual(VALUE))
|
||||
.then(wait.bind(null, 5))
|
||||
.then(() => cache.get(KEY))
|
||||
.then(value => expect(value).toEqual(null))
|
||||
.then((value) => expect(value).toEqual(null))
|
||||
);
|
||||
});
|
||||
|
||||
promise.then(done);
|
||||
});
|
||||
|
||||
it('should find un-expired records', done => {
|
||||
it('should find un-expired records', (done) => {
|
||||
const cache = new RedisCacheAdapter(null, 5);
|
||||
|
||||
cache
|
||||
.put(KEY, VALUE)
|
||||
.then(() => cache.get(KEY))
|
||||
.then(value => expect(value).toEqual(VALUE))
|
||||
.then((value) => expect(value).toEqual(VALUE))
|
||||
.then(wait.bind(null, 1))
|
||||
.then(() => cache.get(KEY))
|
||||
.then(value => expect(value).not.toEqual(null))
|
||||
.then((value) => expect(value).not.toEqual(null))
|
||||
.then(done);
|
||||
});
|
||||
|
||||
it('handleShutdown, close connection', async () => {
|
||||
const cache = new RedisCacheAdapter(null, 5);
|
||||
|
||||
await cache.handleShutdown();
|
||||
setTimeout(() => {
|
||||
expect(cache.client.connected).toBe(false);
|
||||
}, 0);
|
||||
});
|
||||
});
|
||||
|
||||
describe_only(() => {
|
||||
return process.env.PARSE_SERVER_TEST_CACHE === 'redis';
|
||||
})('RedisCacheAdapter/KeyPromiseQueue', function() {
|
||||
})('RedisCacheAdapter/KeyPromiseQueue', function () {
|
||||
const KEY1 = 'key1';
|
||||
const KEY2 = 'key2';
|
||||
const VALUE = 'hello';
|
||||
@@ -120,7 +129,7 @@ describe_only(() => {
|
||||
return Object.keys(cache.queue.queue).length;
|
||||
}
|
||||
|
||||
it('it should clear completed operations from queue', done => {
|
||||
it('it should clear completed operations from queue', (done) => {
|
||||
const cache = new RedisCacheAdapter({ ttl: NaN });
|
||||
|
||||
// execute a bunch of operations in sequence
|
||||
@@ -142,7 +151,7 @@ describe_only(() => {
|
||||
promise.then(() => expect(getQueueCount(cache)).toEqual(0)).then(done);
|
||||
});
|
||||
|
||||
it('it should count per key chained operations correctly', done => {
|
||||
it('it should count per key chained operations correctly', (done) => {
|
||||
const cache = new RedisCacheAdapter({ ttl: NaN });
|
||||
|
||||
let key1Promise = Promise.resolve();
|
||||
@@ -168,7 +177,7 @@ describe_only(() => {
|
||||
|
||||
describe_only(() => {
|
||||
return process.env.PARSE_SERVER_TEST_CACHE === 'redis';
|
||||
})('Redis Performance', function() {
|
||||
})('Redis Performance', function () {
|
||||
let cacheAdapter;
|
||||
let getSpy;
|
||||
let putSpy;
|
||||
|
||||
@@ -9,7 +9,7 @@ function debug() {
|
||||
logger.debug.apply(logger, ['RedisCacheAdapter', ...arguments]);
|
||||
}
|
||||
|
||||
const isValidTTL = ttl => typeof ttl === 'number' && ttl > 0;
|
||||
const isValidTTL = (ttl) => typeof ttl === 'number' && ttl > 0;
|
||||
|
||||
export class RedisCacheAdapter {
|
||||
constructor(redisCtx, ttl = DEFAULT_REDIS_TTL) {
|
||||
@@ -18,13 +18,27 @@ export class RedisCacheAdapter {
|
||||
this.queue = new KeyPromiseQueue();
|
||||
}
|
||||
|
||||
handleShutdown() {
|
||||
if (!this.client) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
return new Promise((resolve) => {
|
||||
this.client.quit((err) => {
|
||||
if (err) {
|
||||
logger.error('RedisCacheAdapter error on shutdown', { error: err });
|
||||
}
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
get(key) {
|
||||
debug('get', key);
|
||||
return this.queue.enqueue(
|
||||
key,
|
||||
() =>
|
||||
new Promise(resolve => {
|
||||
this.client.get(key, function(err, res) {
|
||||
new Promise((resolve) => {
|
||||
this.client.get(key, function (err, res) {
|
||||
debug('-> get', key, res);
|
||||
if (!res) {
|
||||
return resolve(null);
|
||||
@@ -48,8 +62,8 @@ export class RedisCacheAdapter {
|
||||
return this.queue.enqueue(
|
||||
key,
|
||||
() =>
|
||||
new Promise(resolve => {
|
||||
this.client.set(key, value, function() {
|
||||
new Promise((resolve) => {
|
||||
this.client.set(key, value, function () {
|
||||
resolve();
|
||||
});
|
||||
})
|
||||
@@ -63,8 +77,8 @@ export class RedisCacheAdapter {
|
||||
return this.queue.enqueue(
|
||||
key,
|
||||
() =>
|
||||
new Promise(resolve => {
|
||||
this.client.psetex(key, ttl, value, function() {
|
||||
new Promise((resolve) => {
|
||||
this.client.psetex(key, ttl, value, function () {
|
||||
resolve();
|
||||
});
|
||||
})
|
||||
@@ -76,8 +90,8 @@ export class RedisCacheAdapter {
|
||||
return this.queue.enqueue(
|
||||
key,
|
||||
() =>
|
||||
new Promise(resolve => {
|
||||
this.client.del(key, function() {
|
||||
new Promise((resolve) => {
|
||||
this.client.del(key, function () {
|
||||
resolve();
|
||||
});
|
||||
})
|
||||
@@ -89,8 +103,8 @@ export class RedisCacheAdapter {
|
||||
return this.queue.enqueue(
|
||||
FLUSH_DB_KEY,
|
||||
() =>
|
||||
new Promise(resolve => {
|
||||
this.client.flushdb(function() {
|
||||
new Promise((resolve) => {
|
||||
this.client.flushdb(function () {
|
||||
resolve();
|
||||
});
|
||||
})
|
||||
|
||||
@@ -85,7 +85,7 @@ class ParseServer {
|
||||
serverStartComplete();
|
||||
}
|
||||
})
|
||||
.catch(error => {
|
||||
.catch((error) => {
|
||||
if (serverStartComplete) {
|
||||
serverStartComplete(error);
|
||||
} else {
|
||||
@@ -126,6 +126,10 @@ class ParseServer {
|
||||
if (fileAdapter && typeof fileAdapter.handleShutdown === 'function') {
|
||||
promises.push(fileAdapter.handleShutdown());
|
||||
}
|
||||
const { adapter: cacheAdapter } = this.config.cacheController;
|
||||
if (cacheAdapter && typeof cacheAdapter.handleShutdown === 'function') {
|
||||
promises.push(cacheAdapter.handleShutdown());
|
||||
}
|
||||
return (promises.length > 0
|
||||
? Promise.all(promises)
|
||||
: Promise.resolve()
|
||||
@@ -154,7 +158,7 @@ class ParseServer {
|
||||
})
|
||||
);
|
||||
|
||||
api.use('/health', function(req, res) {
|
||||
api.use('/health', function (req, res) {
|
||||
res.json({
|
||||
status: 'ok',
|
||||
});
|
||||
@@ -179,7 +183,7 @@ class ParseServer {
|
||||
if (!process.env.TESTING) {
|
||||
//This causes tests to spew some useless warnings, so disable in test
|
||||
/* istanbul ignore next */
|
||||
process.on('uncaughtException', err => {
|
||||
process.on('uncaughtException', (err) => {
|
||||
if (err.code === 'EADDRINUSE') {
|
||||
// user-friendly message for this common error
|
||||
process.stderr.write(
|
||||
@@ -192,7 +196,7 @@ class ParseServer {
|
||||
});
|
||||
// verify the server url after a 'mount' event is received
|
||||
/* istanbul ignore next */
|
||||
api.on('mount', function() {
|
||||
api.on('mount', function () {
|
||||
ParseServer.verifyServerUrl();
|
||||
});
|
||||
}
|
||||
@@ -334,8 +338,8 @@ class ParseServer {
|
||||
if (Parse.serverURL) {
|
||||
const request = require('./request');
|
||||
request({ url: Parse.serverURL.replace(/\/$/, '') + '/health' })
|
||||
.catch(response => response)
|
||||
.then(response => {
|
||||
.catch((response) => response)
|
||||
.then((response) => {
|
||||
const json = response.data || null;
|
||||
if (
|
||||
response.status !== 200 ||
|
||||
@@ -368,7 +372,7 @@ function addParseCloud() {
|
||||
}
|
||||
|
||||
function injectDefaults(options: ParseServerOptions) {
|
||||
Object.keys(defaults).forEach(key => {
|
||||
Object.keys(defaults).forEach((key) => {
|
||||
if (!Object.prototype.hasOwnProperty.call(options, key)) {
|
||||
options[key] = defaults[key];
|
||||
}
|
||||
@@ -424,12 +428,12 @@ function injectDefaults(options: ParseServerOptions) {
|
||||
}
|
||||
|
||||
// Merge protectedFields options with defaults.
|
||||
Object.keys(defaults.protectedFields).forEach(c => {
|
||||
Object.keys(defaults.protectedFields).forEach((c) => {
|
||||
const cur = options.protectedFields[c];
|
||||
if (!cur) {
|
||||
options.protectedFields[c] = defaults.protectedFields[c];
|
||||
} else {
|
||||
Object.keys(defaults.protectedFields[c]).forEach(r => {
|
||||
Object.keys(defaults.protectedFields[c]).forEach((r) => {
|
||||
const unq = new Set([
|
||||
...(options.protectedFields[c][r] || []),
|
||||
...defaults.protectedFields[c][r],
|
||||
@@ -453,7 +457,7 @@ function configureListeners(parseServer) {
|
||||
const sockets = {};
|
||||
/* Currently, express doesn't shut down immediately after receiving SIGINT/SIGTERM if it has client connections that haven't timed out. (This is a known issue with node - https://github.com/nodejs/node/issues/2642)
|
||||
This function, along with `destroyAliveConnections()`, intend to fix this behavior such that parse server will close all open connections and initiate the shutdown process as soon as it receives a SIGINT/SIGTERM signal. */
|
||||
server.on('connection', socket => {
|
||||
server.on('connection', (socket) => {
|
||||
const socketId = socket.remoteAddress + ':' + socket.remotePort;
|
||||
sockets[socketId] = socket;
|
||||
socket.on('close', () => {
|
||||
@@ -461,7 +465,7 @@ function configureListeners(parseServer) {
|
||||
});
|
||||
});
|
||||
|
||||
const destroyAliveConnections = function() {
|
||||
const destroyAliveConnections = function () {
|
||||
for (const socketId in sockets) {
|
||||
try {
|
||||
sockets[socketId].destroy();
|
||||
@@ -471,7 +475,7 @@ function configureListeners(parseServer) {
|
||||
}
|
||||
};
|
||||
|
||||
const handleShutdown = function() {
|
||||
const handleShutdown = function () {
|
||||
process.stdout.write('Termination signal received. Shutting down.');
|
||||
destroyAliveConnections();
|
||||
server.close();
|
||||
|
||||
Reference in New Issue
Block a user