feat: Upgrade Redis 3 to 4 (#8293)
BREAKING CHANGE: This release upgrades to Redis 4; if you are using the Redis cache adapter with Parse Server then this is a breaking change as the Redis client options have changed; see the [Redis migration guide](https://github.com/redis/node-redis/blob/redis%404.0.0/docs/v3-to-v4.md) for more details (#8293)
This commit is contained in:
648
package-lock.json
generated
648
package-lock.json
generated
File diff suppressed because it is too large
Load Diff
@@ -49,7 +49,7 @@
|
|||||||
"pg-monitor": "1.5.0",
|
"pg-monitor": "1.5.0",
|
||||||
"pg-promise": "10.12.1",
|
"pg-promise": "10.12.1",
|
||||||
"pluralize": "8.0.0",
|
"pluralize": "8.0.0",
|
||||||
"redis": "3.1.2",
|
"redis": "4.0.6",
|
||||||
"semver": "7.3.8",
|
"semver": "7.3.8",
|
||||||
"subscriptions-transport-ws": "0.11.0",
|
"subscriptions-transport-ws": "0.11.0",
|
||||||
"tv4": "1.3.0",
|
"tv4": "1.3.0",
|
||||||
|
|||||||
@@ -677,4 +677,33 @@ describe('DefinedSchemas', () => {
|
|||||||
expect(testSchema.classLevelPermissions.create).toEqual({ requiresAuthentication: true });
|
expect(testSchema.classLevelPermissions.create).toEqual({ requiresAuthentication: true });
|
||||||
expect(logger.error).toHaveBeenCalledTimes(0);
|
expect(logger.error).toHaveBeenCalledTimes(0);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should not affect cacheAdapter', async () => {
|
||||||
|
const server = await reconfigureServer();
|
||||||
|
const logger = require('../lib/logger').logger;
|
||||||
|
spyOn(logger, 'error').and.callThrough();
|
||||||
|
const migrationOptions = {
|
||||||
|
definitions: [
|
||||||
|
{
|
||||||
|
className: 'Test',
|
||||||
|
fields: { aField: { type: 'String' } },
|
||||||
|
indexes: { aField: { aField: 1 } },
|
||||||
|
classLevelPermissions: {
|
||||||
|
create: { requiresAuthentication: true },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
],
|
||||||
|
};
|
||||||
|
|
||||||
|
const cacheAdapter = {
|
||||||
|
get: () => Promise.resolve(null),
|
||||||
|
put: () => {},
|
||||||
|
del: () => {},
|
||||||
|
clear: () => {},
|
||||||
|
connect: jasmine.createSpy('clear'),
|
||||||
|
};
|
||||||
|
server.config.cacheAdapter = cacheAdapter;
|
||||||
|
await new DefinedSchemas(migrationOptions, server.config).execute();
|
||||||
|
expect(cacheAdapter.connect).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -19,23 +19,22 @@ describe_only(() => {
|
|||||||
|
|
||||||
beforeEach(async () => {
|
beforeEach(async () => {
|
||||||
cache = new RedisCacheAdapter(null, 100);
|
cache = new RedisCacheAdapter(null, 100);
|
||||||
|
await cache.connect();
|
||||||
await cache.clear();
|
await cache.clear();
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should get/set/clear', done => {
|
it('should get/set/clear', async () => {
|
||||||
const cacheNaN = new RedisCacheAdapter({
|
const cacheNaN = new RedisCacheAdapter({
|
||||||
ttl: NaN,
|
ttl: NaN,
|
||||||
});
|
});
|
||||||
|
await cacheNaN.connect();
|
||||||
cacheNaN
|
await cacheNaN.put(KEY, VALUE);
|
||||||
.put(KEY, VALUE)
|
let value = await cacheNaN.get(KEY);
|
||||||
.then(() => cacheNaN.get(KEY))
|
expect(value).toEqual(VALUE);
|
||||||
.then(value => expect(value).toEqual(VALUE))
|
await cacheNaN.clear();
|
||||||
.then(() => cacheNaN.clear())
|
value = await cacheNaN.get(KEY);
|
||||||
.then(() => cacheNaN.get(KEY))
|
expect(value).toEqual(null);
|
||||||
.then(value => expect(value).toEqual(null))
|
await cacheNaN.clear();
|
||||||
.then(() => cacheNaN.clear())
|
|
||||||
.then(done);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should expire after ttl', done => {
|
it('should expire after ttl', done => {
|
||||||
@@ -100,7 +99,7 @@ describe_only(() => {
|
|||||||
it('handleShutdown, close connection', async () => {
|
it('handleShutdown, close connection', async () => {
|
||||||
await cache.handleShutdown();
|
await cache.handleShutdown();
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
expect(cache.client.connected).toBe(false);
|
expect(cache.client.isOpen).toBe(false);
|
||||||
}, 0);
|
}, 0);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
@@ -122,8 +121,9 @@ describe_only(() => {
|
|||||||
return Object.keys(cache.queue.queue).length;
|
return Object.keys(cache.queue.queue).length;
|
||||||
}
|
}
|
||||||
|
|
||||||
it('it should clear completed operations from queue', done => {
|
it('it should clear completed operations from queue', async done => {
|
||||||
const cache = new RedisCacheAdapter({ ttl: NaN });
|
const cache = new RedisCacheAdapter({ ttl: NaN });
|
||||||
|
await cache.connect();
|
||||||
|
|
||||||
// execute a bunch of operations in sequence
|
// execute a bunch of operations in sequence
|
||||||
let promise = Promise.resolve();
|
let promise = Promise.resolve();
|
||||||
@@ -144,8 +144,9 @@ describe_only(() => {
|
|||||||
promise.then(() => expect(getQueueCount(cache)).toEqual(0)).then(done);
|
promise.then(() => expect(getQueueCount(cache)).toEqual(0)).then(done);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('it should count per key chained operations correctly', done => {
|
it('it should count per key chained operations correctly', async done => {
|
||||||
const cache = new RedisCacheAdapter({ ttl: NaN });
|
const cache = new RedisCacheAdapter({ ttl: NaN });
|
||||||
|
await cache.connect();
|
||||||
|
|
||||||
let key1Promise = Promise.resolve();
|
let key1Promise = Promise.resolve();
|
||||||
let key2Promise = Promise.resolve();
|
let key2Promise = Promise.resolve();
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import redis from 'redis';
|
import { createClient } from 'redis';
|
||||||
import logger from '../../logger';
|
import logger from '../../logger';
|
||||||
import { KeyPromiseQueue } from '../../KeyPromiseQueue';
|
import { KeyPromiseQueue } from '../../KeyPromiseQueue';
|
||||||
|
|
||||||
@@ -15,114 +15,76 @@ const isValidTTL = ttl => typeof ttl === 'number' && ttl > 0;
|
|||||||
export class RedisCacheAdapter {
|
export class RedisCacheAdapter {
|
||||||
constructor(redisCtx, ttl = DEFAULT_REDIS_TTL) {
|
constructor(redisCtx, ttl = DEFAULT_REDIS_TTL) {
|
||||||
this.ttl = isValidTTL(ttl) ? ttl : DEFAULT_REDIS_TTL;
|
this.ttl = isValidTTL(ttl) ? ttl : DEFAULT_REDIS_TTL;
|
||||||
this.client = redis.createClient(redisCtx);
|
this.client = createClient(redisCtx);
|
||||||
this.queue = new KeyPromiseQueue();
|
this.queue = new KeyPromiseQueue();
|
||||||
}
|
}
|
||||||
|
|
||||||
handleShutdown() {
|
async connect() {
|
||||||
if (!this.client) {
|
if (this.client.isOpen) {
|
||||||
return Promise.resolve();
|
return;
|
||||||
}
|
}
|
||||||
return new Promise(resolve => {
|
return this.client.connect();
|
||||||
this.client.quit(err => {
|
|
||||||
if (err) {
|
|
||||||
logger.error('RedisCacheAdapter error on shutdown', { error: err });
|
|
||||||
}
|
|
||||||
resolve();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
get(key) {
|
async handleShutdown() {
|
||||||
|
if (!this.client) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
await this.client.quit();
|
||||||
|
} catch (err) {
|
||||||
|
logger.error('RedisCacheAdapter error on shutdown', { error: err });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async get(key) {
|
||||||
debug('get', { key });
|
debug('get', { key });
|
||||||
return this.queue.enqueue(
|
try {
|
||||||
key,
|
await this.queue.enqueue(key);
|
||||||
() =>
|
const res = await this.client.get(key);
|
||||||
new Promise(resolve => {
|
if (!res) {
|
||||||
this.client.get(key, function (err, res) {
|
return null;
|
||||||
debug('-> get', { key, res });
|
}
|
||||||
if (!res) {
|
return JSON.parse(res);
|
||||||
return resolve(null);
|
} catch (err) {
|
||||||
}
|
logger.error('RedisCacheAdapter error on get', { error: err });
|
||||||
resolve(JSON.parse(res));
|
}
|
||||||
});
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
put(key, value, ttl = this.ttl) {
|
async put(key, value, ttl = this.ttl) {
|
||||||
value = JSON.stringify(value);
|
value = JSON.stringify(value);
|
||||||
debug('put', { key, value, ttl });
|
debug('put', { key, value, ttl });
|
||||||
|
await this.queue.enqueue(key);
|
||||||
if (ttl === 0) {
|
if (ttl === 0) {
|
||||||
// ttl of zero is a logical no-op, but redis cannot set expire time of zero
|
// ttl of zero is a logical no-op, but redis cannot set expire time of zero
|
||||||
return this.queue.enqueue(key, () => Promise.resolve());
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ttl === Infinity) {
|
if (ttl === Infinity) {
|
||||||
return this.queue.enqueue(
|
return this.client.set(key, value);
|
||||||
key,
|
|
||||||
() =>
|
|
||||||
new Promise(resolve => {
|
|
||||||
this.client.set(key, value, function () {
|
|
||||||
resolve();
|
|
||||||
});
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!isValidTTL(ttl)) {
|
if (!isValidTTL(ttl)) {
|
||||||
ttl = this.ttl;
|
ttl = this.ttl;
|
||||||
}
|
}
|
||||||
|
return this.client.set(key, value, { PX: ttl });
|
||||||
return this.queue.enqueue(
|
|
||||||
key,
|
|
||||||
() =>
|
|
||||||
new Promise(resolve => {
|
|
||||||
this.client.psetex(key, ttl, value, function () {
|
|
||||||
resolve();
|
|
||||||
});
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
del(key) {
|
async del(key) {
|
||||||
debug('del', { key });
|
debug('del', { key });
|
||||||
return this.queue.enqueue(
|
await this.queue.enqueue(key);
|
||||||
key,
|
return this.client.del(key);
|
||||||
() =>
|
|
||||||
new Promise(resolve => {
|
|
||||||
this.client.del(key, function () {
|
|
||||||
resolve();
|
|
||||||
});
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
clear() {
|
async clear() {
|
||||||
debug('clear');
|
debug('clear');
|
||||||
return this.queue.enqueue(
|
await this.queue.enqueue(FLUSH_DB_KEY);
|
||||||
FLUSH_DB_KEY,
|
return this.client.sendCommand(['FLUSHDB']);
|
||||||
() =>
|
|
||||||
new Promise(resolve => {
|
|
||||||
this.client.flushdb(function () {
|
|
||||||
resolve();
|
|
||||||
});
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Used for testing
|
// Used for testing
|
||||||
async getAllKeys() {
|
getAllKeys() {
|
||||||
return new Promise((resolve, reject) => {
|
return this.client.keys('*');
|
||||||
this.client.keys('*', (err, keys) => {
|
|
||||||
if (err) {
|
|
||||||
reject(err);
|
|
||||||
} else {
|
|
||||||
resolve(keys);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,13 +1,13 @@
|
|||||||
import redis from 'redis';
|
import { createClient } from 'redis';
|
||||||
|
|
||||||
function createPublisher({ redisURL, redisOptions = {} }): any {
|
function createPublisher({ redisURL, redisOptions = {} }): any {
|
||||||
redisOptions.no_ready_check = true;
|
redisOptions.no_ready_check = true;
|
||||||
return redis.createClient(redisURL, redisOptions);
|
return createClient(redisURL, redisOptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
function createSubscriber({ redisURL, redisOptions = {} }): any {
|
function createSubscriber({ redisURL, redisOptions = {} }): any {
|
||||||
redisOptions.no_ready_check = true;
|
redisOptions.no_ready_check = true;
|
||||||
return redis.createClient(redisURL, redisOptions);
|
return createClient(redisURL, redisOptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
const RedisPubSub = {
|
const RedisPubSub = {
|
||||||
|
|||||||
@@ -87,9 +87,18 @@ class ParseServer {
|
|||||||
.performInitialization()
|
.performInitialization()
|
||||||
.then(() => hooksController.load())
|
.then(() => hooksController.load())
|
||||||
.then(async () => {
|
.then(async () => {
|
||||||
|
const startupPromises = [];
|
||||||
if (schema) {
|
if (schema) {
|
||||||
await new DefinedSchemas(schema, this.config).execute();
|
startupPromises.push(new DefinedSchemas(schema, this.config).execute());
|
||||||
}
|
}
|
||||||
|
if (
|
||||||
|
options.cacheAdapter &&
|
||||||
|
options.cacheAdapter.connect &&
|
||||||
|
typeof options.cacheAdapter.connect === 'function'
|
||||||
|
) {
|
||||||
|
startupPromises.push(options.cacheAdapter.connect());
|
||||||
|
}
|
||||||
|
await Promise.all(startupPromises);
|
||||||
if (serverStartComplete) {
|
if (serverStartComplete) {
|
||||||
serverStartComplete();
|
serverStartComplete();
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user