diff --git a/spec/RedisCacheAdapter.spec.js b/spec/RedisCacheAdapter.spec.js index aa759cce..1b2dbcc9 100644 --- a/spec/RedisCacheAdapter.spec.js +++ b/spec/RedisCacheAdapter.spec.js @@ -45,6 +45,48 @@ describe_only(() => { .then(done); }); + it('should not store value for ttl=0', done => { + const cache = new RedisCacheAdapter(null, 5); + + cache + .put(KEY, VALUE, 0) + .then(() => cache.get(KEY)) + .then(value => expect(value).toEqual(null)) + .then(done); + }); + + it('should not expire when ttl=Infinity', done => { + const cache = new RedisCacheAdapter(null, 1); + + cache + .put(KEY, VALUE, Infinity) + .then(() => cache.get(KEY)) + .then(value => expect(value).toEqual(VALUE)) + .then(wait.bind(null, 1)) + .then(() => cache.get(KEY)) + .then(value => expect(value).toEqual(VALUE)) + .then(done); + }); + + it('should fallback to default ttl', done => { + const cache = new RedisCacheAdapter(null, 1); + let promise = Promise.resolve(); + + [-100, null, undefined, 'not number', true].forEach(ttl => { + promise = promise.then(() => + cache + .put(KEY, VALUE, ttl) + .then(() => cache.get(KEY)) + .then(value => expect(value).toEqual(VALUE)) + .then(wait.bind(null, 2)) + .then(() => cache.get(KEY)) + .then(value => expect(value).toEqual(null)) + ); + }); + + promise.then(done); + }); + it('should find un-expired records', done => { const cache = new RedisCacheAdapter(null, 5); @@ -58,3 +100,66 @@ describe_only(() => { .then(done); }); }); + +describe_only(() => { + return process.env.PARSE_SERVER_TEST_CACHE === 'redis'; +})('RedisCacheAdapter/KeyPromiseQueue', function() { + const KEY1 = 'key1'; + const KEY2 = 'key2'; + const VALUE = 'hello'; + + // number of chained ops on a single key + function getQueueCountForKey(cache, key) { + return cache.queue.queue[key][0]; + } + + // total number of queued keys + function getQueueCount(cache) { + return Object.keys(cache.queue.queue).length; + } + + it('it should clear completed operations from queue', done => { + const cache = new RedisCacheAdapter({ ttl: NaN }); + + // execute a bunch of operations in sequence + let promise = Promise.resolve(); + for (let index = 1; index < 100; index++) { + promise = promise.then(() => { + const key = `${index}`; + return cache + .put(key, VALUE) + .then(() => expect(getQueueCount(cache)).toEqual(0)) + .then(() => cache.get(key)) + .then(() => expect(getQueueCount(cache)).toEqual(0)) + .then(() => cache.clear()) + .then(() => expect(getQueueCount(cache)).toEqual(0)); + }); + } + + // at the end the queue should be empty + promise.then(() => expect(getQueueCount(cache)).toEqual(0)).then(done); + }); + + it('it should count per key chained operations correctly', done => { + const cache = new RedisCacheAdapter({ ttl: NaN }); + + let key1Promise = Promise.resolve(); + let key2Promise = Promise.resolve(); + for (let index = 1; index < 100; index++) { + key1Promise = cache.put(KEY1, VALUE); + key2Promise = cache.put(KEY2, VALUE); + // per key chain should be equal to index, which is the + // total number of operations on that key + expect(getQueueCountForKey(cache, KEY1)).toEqual(index); + expect(getQueueCountForKey(cache, KEY2)).toEqual(index); + // the total keys counts should be equal to the different keys + // we have currently being processed. + expect(getQueueCount(cache)).toEqual(2); + } + + // at the end the queue should be empty + Promise.all([key1Promise, key2Promise]) + .then(() => expect(getQueueCount(cache)).toEqual(0)) + .then(done); + }); +}); diff --git a/src/Adapters/Cache/RedisCacheAdapter.js b/src/Adapters/Cache/RedisCacheAdapter.js deleted file mode 100644 index bc8bf894..00000000 --- a/src/Adapters/Cache/RedisCacheAdapter.js +++ /dev/null @@ -1,83 +0,0 @@ -import redis from 'redis'; -import logger from '../../logger'; - -const DEFAULT_REDIS_TTL = 30 * 1000; // 30 seconds in milliseconds - -function debug() { - logger.debug.apply(logger, ['RedisCacheAdapter', ...arguments]); -} - -export class RedisCacheAdapter { - constructor(redisCtx, ttl = DEFAULT_REDIS_TTL) { - this.client = redis.createClient(redisCtx); - this.p = Promise.resolve(); - this.ttl = ttl; - } - - get(key) { - debug('get', key); - this.p = this.p.then(() => { - return new Promise(resolve => { - this.client.get(key, function(err, res) { - debug('-> get', key, res); - if (!res) { - return resolve(null); - } - resolve(JSON.parse(res)); - }); - }); - }); - return this.p; - } - - put(key, value, ttl = this.ttl) { - value = JSON.stringify(value); - debug('put', key, value, ttl); - if (ttl === 0) { - return this.p; // ttl of zero is a logical no-op, but redis cannot set expire time of zero - } - if (ttl < 0 || isNaN(ttl)) { - ttl = DEFAULT_REDIS_TTL; - } - this.p = this.p.then(() => { - return new Promise(resolve => { - if (ttl === Infinity) { - this.client.set(key, value, function() { - resolve(); - }); - } else { - this.client.psetex(key, ttl, value, function() { - resolve(); - }); - } - }); - }); - return this.p; - } - - del(key) { - debug('del', key); - this.p = this.p.then(() => { - return new Promise(resolve => { - this.client.del(key, function() { - resolve(); - }); - }); - }); - return this.p; - } - - clear() { - debug('clear'); - this.p = this.p.then(() => { - return new Promise(resolve => { - this.client.flushdb(function() { - resolve(); - }); - }); - }); - return this.p; - } -} - -export default RedisCacheAdapter; diff --git a/src/Adapters/Cache/RedisCacheAdapter/KeyPromiseQueue.js b/src/Adapters/Cache/RedisCacheAdapter/KeyPromiseQueue.js new file mode 100644 index 00000000..64458f34 --- /dev/null +++ b/src/Adapters/Cache/RedisCacheAdapter/KeyPromiseQueue.js @@ -0,0 +1,43 @@ +// KeyPromiseQueue is a simple promise queue +// used to queue operations per key basis. +// Once the tail promise in the key-queue fulfills, +// the chain on that key will be cleared. +export class KeyPromiseQueue { + constructor() { + this.queue = {}; + } + + enqueue(key, operation) { + const tuple = this.beforeOp(key); + const toAwait = tuple[1]; + const nextOperation = toAwait.then(operation); + const wrappedOperation = nextOperation.then(result => { + this.afterOp(key); + return result; + }); + tuple[1] = wrappedOperation; + return wrappedOperation; + } + + beforeOp(key) { + let tuple = this.queue[key]; + if (!tuple) { + tuple = [0, Promise.resolve()]; + this.queue[key] = tuple; + } + tuple[0]++; + return tuple; + } + + afterOp(key) { + const tuple = this.queue[key]; + if (!tuple) { + return; + } + tuple[0]--; + if (tuple[0] <= 0) { + delete this.queue[key]; + return; + } + } +} diff --git a/src/Adapters/Cache/RedisCacheAdapter/index.js b/src/Adapters/Cache/RedisCacheAdapter/index.js new file mode 100644 index 00000000..7c771a23 --- /dev/null +++ b/src/Adapters/Cache/RedisCacheAdapter/index.js @@ -0,0 +1,101 @@ +import redis from 'redis'; +import logger from '../../../logger'; +import { KeyPromiseQueue } from './KeyPromiseQueue'; + +const DEFAULT_REDIS_TTL = 30 * 1000; // 30 seconds in milliseconds +const FLUSH_DB_KEY = '__flush_db__'; + +function debug() { + logger.debug.apply(logger, ['RedisCacheAdapter', ...arguments]); +} + +const isValidTTL = ttl => typeof ttl === 'number' && ttl > 0; + +export class RedisCacheAdapter { + constructor(redisCtx, ttl = DEFAULT_REDIS_TTL) { + this.ttl = isValidTTL(ttl) ? ttl : DEFAULT_REDIS_TTL; + this.client = redis.createClient(redisCtx); + this.queue = new KeyPromiseQueue(); + } + + get(key) { + debug('get', key); + return this.queue.enqueue( + key, + () => + new Promise(resolve => { + this.client.get(key, function(err, res) { + debug('-> get', key, res); + if (!res) { + return resolve(null); + } + resolve(JSON.parse(res)); + }); + }) + ); + } + + put(key, value, ttl = this.ttl) { + value = JSON.stringify(value); + debug('put', key, value, ttl); + + if (ttl === 0) { + // ttl of zero is a logical no-op, but redis cannot set expire time of zero + return this.queue.enqueue(key, () => Promise.resolve()); + } + + if (ttl === Infinity) { + return this.queue.enqueue( + key, + () => + new Promise(resolve => { + this.client.set(key, value, function() { + resolve(); + }); + }) + ); + } + + if (!isValidTTL(ttl)) { + ttl = this.ttl; + } + + return this.queue.enqueue( + key, + () => + new Promise(resolve => { + this.client.psetex(key, ttl, value, function() { + resolve(); + }); + }) + ); + } + + del(key) { + debug('del', key); + return this.queue.enqueue( + key, + () => + new Promise(resolve => { + this.client.del(key, function() { + resolve(); + }); + }) + ); + } + + clear() { + debug('clear'); + return this.queue.enqueue( + FLUSH_DB_KEY, + () => + new Promise(resolve => { + this.client.flushdb(function() { + resolve(); + }); + }) + ); + } +} + +export default RedisCacheAdapter;