feat: Upgrade Redis 3 to 4 for LiveQuery (#8333)

This commit is contained in:
Daniel
2022-11-27 03:45:30 +11:00
committed by GitHub
parent 40dd82ff19
commit b2761fb378
10 changed files with 136 additions and 40 deletions

View File

@@ -432,7 +432,7 @@ describe('ParseGraphQLServer', () => {
const expressApp = express(); const expressApp = express();
httpServer = http.createServer(expressApp); httpServer = http.createServer(expressApp);
expressApp.use('/parse', parseServer.app); expressApp.use('/parse', parseServer.app);
parseLiveQueryServer = ParseServer.createLiveQueryServer(httpServer, { parseLiveQueryServer = await ParseServer.createLiveQueryServer(httpServer, {
port: 1338, port: 1338,
}); });
parseGraphQLServer.applyGraphQL(expressApp); parseGraphQLServer.applyGraphQL(expressApp);

View File

@@ -0,0 +1,56 @@
if (process.env.PARSE_SERVER_TEST_CACHE === 'redis') {
describe('ParseLiveQuery redis', () => {
afterEach(async () => {
const client = await Parse.CoreManager.getLiveQueryController().getDefaultLiveQueryClient();
client.close();
});
it('can connect', async () => {
await reconfigureServer({
startLiveQueryServer: true,
liveQuery: {
classNames: ['TestObject'],
redisURL: 'redis://localhost:6379',
},
liveQueryServerOptions: {
redisURL: 'redis://localhost:6379',
},
});
const subscription = await new Parse.Query('TestObject').subscribe();
const [object] = await Promise.all([
new Parse.Object('TestObject').save(),
new Promise(resolve =>
subscription.on('create', () => {
resolve();
})
),
]);
await Promise.all([
new Promise(resolve =>
subscription.on('delete', () => {
resolve();
})
),
object.destroy(),
]);
});
it('can call connect twice', async () => {
const server = await reconfigureServer({
startLiveQueryServer: true,
liveQuery: {
classNames: ['TestObject'],
redisURL: 'redis://localhost:6379',
},
liveQueryServerOptions: {
redisURL: 'redis://localhost:6379',
},
});
expect(server.config.liveQueryController.liveQueryPublisher.parsePublisher.isOpen).toBeTrue();
await server.config.liveQueryController.connect();
expect(server.config.liveQueryController.liveQueryPublisher.parsePublisher.isOpen).toBeTrue();
expect(server.liveQueryServer.subscriber.isOpen).toBe(true);
await server.liveQueryServer.connect();
expect(server.liveQueryServer.subscriber.isOpen).toBe(true);
});
});
}

View File

@@ -94,29 +94,29 @@ describe('ParseLiveQueryServer', function () {
expect(parseLiveQueryServer.subscriptions.size).toBe(0); expect(parseLiveQueryServer.subscriptions.size).toBe(0);
}); });
it('can be initialized from ParseServer', function () { it('can be initialized from ParseServer', async () => {
const httpServer = {}; const httpServer = {};
const parseLiveQueryServer = ParseServer.createLiveQueryServer(httpServer, {}); const parseLiveQueryServer = await ParseServer.createLiveQueryServer(httpServer, {});
expect(parseLiveQueryServer.clientId).toBeUndefined(); expect(parseLiveQueryServer.clientId).toBeUndefined();
expect(parseLiveQueryServer.clients.size).toBe(0); expect(parseLiveQueryServer.clients.size).toBe(0);
expect(parseLiveQueryServer.subscriptions.size).toBe(0); expect(parseLiveQueryServer.subscriptions.size).toBe(0);
}); });
it('can be initialized from ParseServer without httpServer', function (done) { it('can be initialized from ParseServer without httpServer', async () => {
const parseLiveQueryServer = ParseServer.createLiveQueryServer(undefined, { const parseLiveQueryServer = await ParseServer.createLiveQueryServer(undefined, {
port: 22345, port: 22345,
}); });
expect(parseLiveQueryServer.clientId).toBeUndefined(); expect(parseLiveQueryServer.clientId).toBeUndefined();
expect(parseLiveQueryServer.clients.size).toBe(0); expect(parseLiveQueryServer.clients.size).toBe(0);
expect(parseLiveQueryServer.subscriptions.size).toBe(0); expect(parseLiveQueryServer.subscriptions.size).toBe(0);
parseLiveQueryServer.server.close(done); await new Promise(resolve => parseLiveQueryServer.server.close(resolve));
}); });
describe_only_db('mongo')('initialization', () => { describe_only_db('mongo')('initialization', () => {
it('can be initialized through ParseServer without liveQueryServerOptions', function (done) { it('can be initialized through ParseServer without liveQueryServerOptions', async function (done) {
const parseServer = ParseServer.start({ const parseServer = await ParseServer.start({
appId: 'hello', appId: 'hello',
masterKey: 'world', masterKey: 'world',
port: 22345, port: 22345,
@@ -137,8 +137,8 @@ describe('ParseLiveQueryServer', function () {
}); });
}); });
it('can be initialized through ParseServer with liveQueryServerOptions', function (done) { it('can be initialized through ParseServer with liveQueryServerOptions', async function (done) {
const parseServer = ParseServer.start({ const parseServer = await ParseServer.start({
appId: 'hello', appId: 'hello',
masterKey: 'world', masterKey: 'world',
port: 22346, port: 22346,

View File

@@ -15,7 +15,8 @@ describe('RedisPubSub', function () {
}); });
const redis = require('redis'); const redis = require('redis');
expect(redis.createClient).toHaveBeenCalledWith('redisAddress', { expect(redis.createClient).toHaveBeenCalledWith({
url: 'redisAddress',
socket_keepalive: true, socket_keepalive: true,
no_ready_check: true, no_ready_check: true,
}); });
@@ -28,7 +29,8 @@ describe('RedisPubSub', function () {
}); });
const redis = require('redis'); const redis = require('redis');
expect(redis.createClient).toHaveBeenCalledWith('redisAddress', { expect(redis.createClient).toHaveBeenCalledWith({
url: 'redisAddress',
socket_keepalive: true, socket_keepalive: true,
no_ready_check: true, no_ready_check: true,
}); });

View File

@@ -173,17 +173,19 @@ const reconfigureServer = (changedConfiguration = {}) => {
port, port,
}); });
cache.clear(); cache.clear();
parseServer = ParseServer.start(newConfiguration); ParseServer.start(newConfiguration).then(_parseServer => {
parseServer.expressApp.use('/1', err => { parseServer = _parseServer;
console.error(err); parseServer.expressApp.use('/1', err => {
fail('should not call next'); console.error(err);
}); fail('should not call next');
server = parseServer.server; });
server.on('connection', connection => { server = parseServer.server;
const key = `${connection.remoteAddress}:${connection.remotePort}`; server.on('connection', connection => {
openConnections[key] = connection; const key = `${connection.remoteAddress}:${connection.remotePort}`;
connection.on('close', () => { openConnections[key] = connection;
delete openConnections[key]; connection.on('close', () => {
delete openConnections[key];
});
}); });
}); });
} catch (error) { } catch (error) {

View File

@@ -2,12 +2,12 @@ import { createClient } from 'redis';
function createPublisher({ redisURL, redisOptions = {} }): any { function createPublisher({ redisURL, redisOptions = {} }): any {
redisOptions.no_ready_check = true; redisOptions.no_ready_check = true;
return createClient(redisURL, redisOptions); return createClient({ url: redisURL, ...redisOptions });
} }
function createSubscriber({ redisURL, redisOptions = {} }): any { function createSubscriber({ redisURL, redisOptions = {} }): any {
redisOptions.no_ready_check = true; redisOptions.no_ready_check = true;
return createClient(redisURL, redisOptions); return createClient({ url: redisURL, ...redisOptions });
} }
const RedisPubSub = { const RedisPubSub = {

View File

@@ -21,6 +21,10 @@ export class LiveQueryController {
this.liveQueryPublisher = new ParseCloudCodePublisher(config); this.liveQueryPublisher = new ParseCloudCodePublisher(config);
} }
connect() {
return this.liveQueryPublisher.connect();
}
onAfterSave( onAfterSave(
className: string, className: string,
currentObject: any, currentObject: any,

View File

@@ -11,6 +11,15 @@ class ParseCloudCodePublisher {
this.parsePublisher = ParsePubSub.createPublisher(config); this.parsePublisher = ParsePubSub.createPublisher(config);
} }
async connect() {
if (typeof this.parsePublisher.connect === 'function') {
if (this.parsePublisher.isOpen) {
return;
}
return Promise.resolve(this.parsePublisher.connect());
}
}
onCloudCodeAfterSave(request: any): void { onCloudCodeAfterSave(request: any): void {
this._onCloudCodeMessage(Parse.applicationId + 'afterSave', request); this._onCloudCodeMessage(Parse.applicationId + 'afterSave', request);
} }

View File

@@ -73,15 +73,25 @@ class ParseLiveQueryServer {
parseWebsocket => this._onConnect(parseWebsocket), parseWebsocket => this._onConnect(parseWebsocket),
config config
); );
// Initialize subscriber
this.subscriber = ParsePubSub.createSubscriber(config); this.subscriber = ParsePubSub.createSubscriber(config);
this.subscriber.subscribe(Parse.applicationId + 'afterSave'); if (!this.subscriber.connect) {
this.subscriber.subscribe(Parse.applicationId + 'afterDelete'); this.connect();
this.subscriber.subscribe(Parse.applicationId + 'clearCache'); }
// Register message handler for subscriber. When publisher get messages, it will publish message }
// to the subscribers and the handler will be called.
this.subscriber.on('message', (channel, messageStr) => { async connect() {
if (this.subscriber.isOpen) {
return;
}
if (typeof this.subscriber.connect === 'function') {
await Promise.resolve(this.subscriber.connect());
} else {
this.subscriber.isOpen = true;
}
this._createSubscribers();
}
_createSubscribers() {
const messageRecieved = (channel, messageStr) => {
logger.verbose('Subscribe message %j', messageStr); logger.verbose('Subscribe message %j', messageStr);
let message; let message;
try { try {
@@ -102,7 +112,12 @@ class ParseLiveQueryServer {
} else { } else {
logger.error('Get message %s from unknown channel %j', message, channel); logger.error('Get message %s from unknown channel %j', message, channel);
} }
}); };
this.subscriber.on('message', (channel, messageStr) => messageRecieved(channel, messageStr));
for (const field of ['afterSave', 'afterDelete', 'clearCache']) {
const channel = `${Parse.applicationId}${field}`;
this.subscriber.subscribe(channel, messageStr => messageRecieved(channel, messageStr));
}
} }
// Message is the JSON object from publisher. Message.currentParseObject is the ParseObject JSON after changes. // Message is the JSON object from publisher. Message.currentParseObject is the ParseObject JSON after changes.

View File

@@ -77,7 +77,12 @@ class ParseServer {
const allControllers = controllers.getControllers(options); const allControllers = controllers.getControllers(options);
const { loggerController, databaseController, hooksController } = allControllers; const {
loggerController,
databaseController,
hooksController,
liveQueryController,
} = allControllers;
this.config = Config.put(Object.assign({}, options, allControllers)); this.config = Config.put(Object.assign({}, options, allControllers));
logging.setLogger(loggerController); logging.setLogger(loggerController);
@@ -98,6 +103,7 @@ class ParseServer {
) { ) {
startupPromises.push(options.cacheAdapter.connect()); startupPromises.push(options.cacheAdapter.connect());
} }
startupPromises.push(liveQueryController.connect());
await Promise.all(startupPromises); await Promise.all(startupPromises);
if (serverStartComplete) { if (serverStartComplete) {
serverStartComplete(); serverStartComplete();
@@ -263,7 +269,7 @@ class ParseServer {
* @param {Function} callback called when the server has started * @param {Function} callback called when the server has started
* @returns {ParseServer} the parse server instance * @returns {ParseServer} the parse server instance
*/ */
start(options: ParseServerOptions, callback: ?() => void) { async start(options: ParseServerOptions, callback: ?() => void) {
const app = express(); const app = express();
if (options.middleware) { if (options.middleware) {
let middleware; let middleware;
@@ -307,7 +313,7 @@ class ParseServer {
this.server = server; this.server = server;
if (options.startLiveQueryServer || options.liveQueryServerOptions) { if (options.startLiveQueryServer || options.liveQueryServerOptions) {
this.liveQueryServer = ParseServer.createLiveQueryServer( this.liveQueryServer = await ParseServer.createLiveQueryServer(
server, server,
options.liveQueryServerOptions, options.liveQueryServerOptions,
options options
@@ -338,9 +344,9 @@ class ParseServer {
* @param {Server} httpServer an optional http server to pass * @param {Server} httpServer an optional http server to pass
* @param {LiveQueryServerOptions} config options for the liveQueryServer * @param {LiveQueryServerOptions} config options for the liveQueryServer
* @param {ParseServerOptions} options options for the ParseServer * @param {ParseServerOptions} options options for the ParseServer
* @returns {ParseLiveQueryServer} the live query server instance * @returns {Promise<ParseLiveQueryServer>} the live query server instance
*/ */
static createLiveQueryServer( static async createLiveQueryServer(
httpServer, httpServer,
config: LiveQueryServerOptions, config: LiveQueryServerOptions,
options: ParseServerOptions options: ParseServerOptions
@@ -350,7 +356,9 @@ class ParseServer {
httpServer = require('http').createServer(app); httpServer = require('http').createServer(app);
httpServer.listen(config.port); httpServer.listen(config.port);
} }
return new ParseLiveQueryServer(httpServer, config, options); const server = new ParseLiveQueryServer(httpServer, config, options);
await server.connect();
return server;
} }
static verifyServerUrl(callback) { static verifyServerUrl(callback) {