feat: Upgrade Redis 3 to 4 (#8293)

BREAKING CHANGE: This release upgrades to Redis 4; if you are using the Redis cache adapter with Parse Server then this is a breaking change as the Redis client options have changed; see the [Redis migration guide](https://github.com/redis/node-redis/blob/redis%404.0.0/docs/v3-to-v4.md) for more details (#8293)
This commit is contained in:
dblythy
2022-11-11 11:16:50 +11:00
committed by GitHub
parent 9af9115f9d
commit 7d622f06a4
7 changed files with 445 additions and 400 deletions

648
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -49,7 +49,7 @@
"pg-monitor": "1.5.0", "pg-monitor": "1.5.0",
"pg-promise": "10.12.1", "pg-promise": "10.12.1",
"pluralize": "8.0.0", "pluralize": "8.0.0",
"redis": "3.1.2", "redis": "4.0.6",
"semver": "7.3.8", "semver": "7.3.8",
"subscriptions-transport-ws": "0.11.0", "subscriptions-transport-ws": "0.11.0",
"tv4": "1.3.0", "tv4": "1.3.0",

View File

@@ -677,4 +677,33 @@ describe('DefinedSchemas', () => {
expect(testSchema.classLevelPermissions.create).toEqual({ requiresAuthentication: true }); expect(testSchema.classLevelPermissions.create).toEqual({ requiresAuthentication: true });
expect(logger.error).toHaveBeenCalledTimes(0); expect(logger.error).toHaveBeenCalledTimes(0);
}); });
it('should not affect cacheAdapter', async () => {
const server = await reconfigureServer();
const logger = require('../lib/logger').logger;
spyOn(logger, 'error').and.callThrough();
const migrationOptions = {
definitions: [
{
className: 'Test',
fields: { aField: { type: 'String' } },
indexes: { aField: { aField: 1 } },
classLevelPermissions: {
create: { requiresAuthentication: true },
},
},
],
};
const cacheAdapter = {
get: () => Promise.resolve(null),
put: () => {},
del: () => {},
clear: () => {},
connect: jasmine.createSpy('clear'),
};
server.config.cacheAdapter = cacheAdapter;
await new DefinedSchemas(migrationOptions, server.config).execute();
expect(cacheAdapter.connect).not.toHaveBeenCalled();
});
}); });

View File

@@ -19,23 +19,22 @@ describe_only(() => {
beforeEach(async () => { beforeEach(async () => {
cache = new RedisCacheAdapter(null, 100); cache = new RedisCacheAdapter(null, 100);
await cache.connect();
await cache.clear(); await cache.clear();
}); });
it('should get/set/clear', done => { it('should get/set/clear', async () => {
const cacheNaN = new RedisCacheAdapter({ const cacheNaN = new RedisCacheAdapter({
ttl: NaN, ttl: NaN,
}); });
await cacheNaN.connect();
cacheNaN await cacheNaN.put(KEY, VALUE);
.put(KEY, VALUE) let value = await cacheNaN.get(KEY);
.then(() => cacheNaN.get(KEY)) expect(value).toEqual(VALUE);
.then(value => expect(value).toEqual(VALUE)) await cacheNaN.clear();
.then(() => cacheNaN.clear()) value = await cacheNaN.get(KEY);
.then(() => cacheNaN.get(KEY)) expect(value).toEqual(null);
.then(value => expect(value).toEqual(null)) await cacheNaN.clear();
.then(() => cacheNaN.clear())
.then(done);
}); });
it('should expire after ttl', done => { it('should expire after ttl', done => {
@@ -100,7 +99,7 @@ describe_only(() => {
it('handleShutdown, close connection', async () => { it('handleShutdown, close connection', async () => {
await cache.handleShutdown(); await cache.handleShutdown();
setTimeout(() => { setTimeout(() => {
expect(cache.client.connected).toBe(false); expect(cache.client.isOpen).toBe(false);
}, 0); }, 0);
}); });
}); });
@@ -122,8 +121,9 @@ describe_only(() => {
return Object.keys(cache.queue.queue).length; return Object.keys(cache.queue.queue).length;
} }
it('it should clear completed operations from queue', done => { it('it should clear completed operations from queue', async done => {
const cache = new RedisCacheAdapter({ ttl: NaN }); const cache = new RedisCacheAdapter({ ttl: NaN });
await cache.connect();
// execute a bunch of operations in sequence // execute a bunch of operations in sequence
let promise = Promise.resolve(); let promise = Promise.resolve();
@@ -144,8 +144,9 @@ describe_only(() => {
promise.then(() => expect(getQueueCount(cache)).toEqual(0)).then(done); promise.then(() => expect(getQueueCount(cache)).toEqual(0)).then(done);
}); });
it('it should count per key chained operations correctly', done => { it('it should count per key chained operations correctly', async done => {
const cache = new RedisCacheAdapter({ ttl: NaN }); const cache = new RedisCacheAdapter({ ttl: NaN });
await cache.connect();
let key1Promise = Promise.resolve(); let key1Promise = Promise.resolve();
let key2Promise = Promise.resolve(); let key2Promise = Promise.resolve();

View File

@@ -1,4 +1,4 @@
import redis from 'redis'; import { createClient } from 'redis';
import logger from '../../logger'; import logger from '../../logger';
import { KeyPromiseQueue } from '../../KeyPromiseQueue'; import { KeyPromiseQueue } from '../../KeyPromiseQueue';
@@ -15,114 +15,76 @@ const isValidTTL = ttl => typeof ttl === 'number' && ttl > 0;
export class RedisCacheAdapter { export class RedisCacheAdapter {
constructor(redisCtx, ttl = DEFAULT_REDIS_TTL) { constructor(redisCtx, ttl = DEFAULT_REDIS_TTL) {
this.ttl = isValidTTL(ttl) ? ttl : DEFAULT_REDIS_TTL; this.ttl = isValidTTL(ttl) ? ttl : DEFAULT_REDIS_TTL;
this.client = redis.createClient(redisCtx); this.client = createClient(redisCtx);
this.queue = new KeyPromiseQueue(); this.queue = new KeyPromiseQueue();
} }
handleShutdown() { async connect() {
if (!this.client) { if (this.client.isOpen) {
return Promise.resolve(); return;
} }
return new Promise(resolve => { return this.client.connect();
this.client.quit(err => {
if (err) {
logger.error('RedisCacheAdapter error on shutdown', { error: err });
}
resolve();
});
});
} }
get(key) { async handleShutdown() {
if (!this.client) {
return;
}
try {
await this.client.quit();
} catch (err) {
logger.error('RedisCacheAdapter error on shutdown', { error: err });
}
}
async get(key) {
debug('get', { key }); debug('get', { key });
return this.queue.enqueue( try {
key, await this.queue.enqueue(key);
() => const res = await this.client.get(key);
new Promise(resolve => { if (!res) {
this.client.get(key, function (err, res) { return null;
debug('-> get', { key, res }); }
if (!res) { return JSON.parse(res);
return resolve(null); } catch (err) {
} logger.error('RedisCacheAdapter error on get', { error: err });
resolve(JSON.parse(res)); }
});
})
);
} }
put(key, value, ttl = this.ttl) { async put(key, value, ttl = this.ttl) {
value = JSON.stringify(value); value = JSON.stringify(value);
debug('put', { key, value, ttl }); debug('put', { key, value, ttl });
await this.queue.enqueue(key);
if (ttl === 0) { if (ttl === 0) {
// ttl of zero is a logical no-op, but redis cannot set expire time of zero // ttl of zero is a logical no-op, but redis cannot set expire time of zero
return this.queue.enqueue(key, () => Promise.resolve()); return;
} }
if (ttl === Infinity) { if (ttl === Infinity) {
return this.queue.enqueue( return this.client.set(key, value);
key,
() =>
new Promise(resolve => {
this.client.set(key, value, function () {
resolve();
});
})
);
} }
if (!isValidTTL(ttl)) { if (!isValidTTL(ttl)) {
ttl = this.ttl; ttl = this.ttl;
} }
return this.client.set(key, value, { PX: ttl });
return this.queue.enqueue(
key,
() =>
new Promise(resolve => {
this.client.psetex(key, ttl, value, function () {
resolve();
});
})
);
} }
del(key) { async del(key) {
debug('del', { key }); debug('del', { key });
return this.queue.enqueue( await this.queue.enqueue(key);
key, return this.client.del(key);
() =>
new Promise(resolve => {
this.client.del(key, function () {
resolve();
});
})
);
} }
clear() { async clear() {
debug('clear'); debug('clear');
return this.queue.enqueue( await this.queue.enqueue(FLUSH_DB_KEY);
FLUSH_DB_KEY, return this.client.sendCommand(['FLUSHDB']);
() =>
new Promise(resolve => {
this.client.flushdb(function () {
resolve();
});
})
);
} }
// Used for testing // Used for testing
async getAllKeys() { getAllKeys() {
return new Promise((resolve, reject) => { return this.client.keys('*');
this.client.keys('*', (err, keys) => {
if (err) {
reject(err);
} else {
resolve(keys);
}
});
});
} }
} }

View File

@@ -1,13 +1,13 @@
import redis from 'redis'; import { createClient } from 'redis';
function createPublisher({ redisURL, redisOptions = {} }): any { function createPublisher({ redisURL, redisOptions = {} }): any {
redisOptions.no_ready_check = true; redisOptions.no_ready_check = true;
return redis.createClient(redisURL, redisOptions); return createClient(redisURL, redisOptions);
} }
function createSubscriber({ redisURL, redisOptions = {} }): any { function createSubscriber({ redisURL, redisOptions = {} }): any {
redisOptions.no_ready_check = true; redisOptions.no_ready_check = true;
return redis.createClient(redisURL, redisOptions); return createClient(redisURL, redisOptions);
} }
const RedisPubSub = { const RedisPubSub = {

View File

@@ -87,9 +87,18 @@ class ParseServer {
.performInitialization() .performInitialization()
.then(() => hooksController.load()) .then(() => hooksController.load())
.then(async () => { .then(async () => {
const startupPromises = [];
if (schema) { if (schema) {
await new DefinedSchemas(schema, this.config).execute(); startupPromises.push(new DefinedSchemas(schema, this.config).execute());
} }
if (
options.cacheAdapter &&
options.cacheAdapter.connect &&
typeof options.cacheAdapter.connect === 'function'
) {
startupPromises.push(options.cacheAdapter.connect());
}
await Promise.all(startupPromises);
if (serverStartComplete) { if (serverStartComplete) {
serverStartComplete(); serverStartComplete();
} }