Before Connect + Before Subscribe help required (#6793)
* Before Connect + Before Subscribe #1 * Cleanup and Documentation * Add E2E tests * Bump parse to 2.15.0 Co-authored-by: Diamond Lewis <findlewis@gmail.com>
This commit is contained in:
@@ -62,15 +62,17 @@ class Client {
|
||||
parseWebSocket: any,
|
||||
code: number,
|
||||
error: string,
|
||||
reconnect: boolean = true
|
||||
reconnect: boolean = true,
|
||||
requestId: number | void = null
|
||||
): void {
|
||||
Client.pushResponse(
|
||||
parseWebSocket,
|
||||
JSON.stringify({
|
||||
op: 'error',
|
||||
error: error,
|
||||
code: code,
|
||||
reconnect: reconnect,
|
||||
error,
|
||||
code,
|
||||
reconnect,
|
||||
requestId,
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
@@ -10,7 +10,11 @@ import { ParsePubSub } from './ParsePubSub';
|
||||
import SchemaController from '../Controllers/SchemaController';
|
||||
import _ from 'lodash';
|
||||
import { v4 as uuidv4 } from 'uuid';
|
||||
import { runLiveQueryEventHandlers } from '../triggers';
|
||||
import {
|
||||
runLiveQueryEventHandlers,
|
||||
maybeRunConnectTrigger,
|
||||
maybeRunSubscribeTrigger,
|
||||
} from '../triggers';
|
||||
import { getAuthForSessionToken, Auth } from '../Auth';
|
||||
import { getCacheController } from '../Controllers';
|
||||
import LRU from 'lru-cache';
|
||||
@@ -574,7 +578,7 @@ class ParseLiveQueryServer {
|
||||
return false;
|
||||
}
|
||||
|
||||
_handleConnect(parseWebsocket: any, request: any): any {
|
||||
async _handleConnect(parseWebsocket: any, request: any): any {
|
||||
if (!this._validateKeys(request, this.keyPairs)) {
|
||||
Client.pushError(parseWebsocket, 4, 'Key in request is not valid');
|
||||
logger.error('Key in request is not valid');
|
||||
@@ -589,19 +593,34 @@ class ParseLiveQueryServer {
|
||||
request.sessionToken,
|
||||
request.installationId
|
||||
);
|
||||
parseWebsocket.clientId = clientId;
|
||||
this.clients.set(parseWebsocket.clientId, client);
|
||||
logger.info(`Create new client: ${parseWebsocket.clientId}`);
|
||||
client.pushConnect();
|
||||
runLiveQueryEventHandlers({
|
||||
client,
|
||||
event: 'connect',
|
||||
clients: this.clients.size,
|
||||
subscriptions: this.subscriptions.size,
|
||||
sessionToken: request.sessionToken,
|
||||
useMasterKey: client.hasMasterKey,
|
||||
installationId: request.installationId,
|
||||
});
|
||||
try {
|
||||
const req = {
|
||||
client,
|
||||
event: 'connect',
|
||||
clients: this.clients.size,
|
||||
subscriptions: this.subscriptions.size,
|
||||
sessionToken: request.sessionToken,
|
||||
useMasterKey: client.hasMasterKey,
|
||||
installationId: request.installationId,
|
||||
};
|
||||
await maybeRunConnectTrigger('beforeConnect', req);
|
||||
parseWebsocket.clientId = clientId;
|
||||
this.clients.set(parseWebsocket.clientId, client);
|
||||
logger.info(`Create new client: ${parseWebsocket.clientId}`);
|
||||
client.pushConnect();
|
||||
runLiveQueryEventHandlers(req);
|
||||
} catch (error) {
|
||||
Client.pushError(
|
||||
parseWebsocket,
|
||||
error.code || 101,
|
||||
error.message || error,
|
||||
false
|
||||
);
|
||||
logger.error(
|
||||
`Failed running beforeConnect for session ${request.sessionToken} with:\n Error: ` +
|
||||
JSON.stringify(error)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
_hasMasterKey(request: any, validKeyPairs: any): boolean {
|
||||
@@ -636,7 +655,7 @@ class ParseLiveQueryServer {
|
||||
return isValid;
|
||||
}
|
||||
|
||||
_handleSubscribe(parseWebsocket: any, request: any): any {
|
||||
async _handleSubscribe(parseWebsocket: any, request: any): any {
|
||||
// If we can not find this client, return error to client
|
||||
if (!Object.prototype.hasOwnProperty.call(parseWebsocket, 'clientId')) {
|
||||
Client.pushError(
|
||||
@@ -650,61 +669,77 @@ class ParseLiveQueryServer {
|
||||
return;
|
||||
}
|
||||
const client = this.clients.get(parseWebsocket.clientId);
|
||||
|
||||
// Get subscription from subscriptions, create one if necessary
|
||||
const subscriptionHash = queryHash(request.query);
|
||||
// Add className to subscriptions if necessary
|
||||
const className = request.query.className;
|
||||
if (!this.subscriptions.has(className)) {
|
||||
this.subscriptions.set(className, new Map());
|
||||
}
|
||||
const classSubscriptions = this.subscriptions.get(className);
|
||||
let subscription;
|
||||
if (classSubscriptions.has(subscriptionHash)) {
|
||||
subscription = classSubscriptions.get(subscriptionHash);
|
||||
} else {
|
||||
subscription = new Subscription(
|
||||
className,
|
||||
request.query.where,
|
||||
subscriptionHash
|
||||
try {
|
||||
await maybeRunSubscribeTrigger('beforeSubscribe', className, request);
|
||||
|
||||
// Get subscription from subscriptions, create one if necessary
|
||||
const subscriptionHash = queryHash(request.query);
|
||||
// Add className to subscriptions if necessary
|
||||
|
||||
if (!this.subscriptions.has(className)) {
|
||||
this.subscriptions.set(className, new Map());
|
||||
}
|
||||
const classSubscriptions = this.subscriptions.get(className);
|
||||
let subscription;
|
||||
if (classSubscriptions.has(subscriptionHash)) {
|
||||
subscription = classSubscriptions.get(subscriptionHash);
|
||||
} else {
|
||||
subscription = new Subscription(
|
||||
className,
|
||||
request.query.where,
|
||||
subscriptionHash
|
||||
);
|
||||
classSubscriptions.set(subscriptionHash, subscription);
|
||||
}
|
||||
|
||||
// Add subscriptionInfo to client
|
||||
const subscriptionInfo = {
|
||||
subscription: subscription,
|
||||
};
|
||||
// Add selected fields, sessionToken and installationId for this subscription if necessary
|
||||
if (request.query.fields) {
|
||||
subscriptionInfo.fields = request.query.fields;
|
||||
}
|
||||
if (request.sessionToken) {
|
||||
subscriptionInfo.sessionToken = request.sessionToken;
|
||||
}
|
||||
client.addSubscriptionInfo(request.requestId, subscriptionInfo);
|
||||
|
||||
// Add clientId to subscription
|
||||
subscription.addClientSubscription(
|
||||
parseWebsocket.clientId,
|
||||
request.requestId
|
||||
);
|
||||
|
||||
client.pushSubscribe(request.requestId);
|
||||
|
||||
logger.verbose(
|
||||
`Create client ${parseWebsocket.clientId} new subscription: ${request.requestId}`
|
||||
);
|
||||
logger.verbose('Current client number: %d', this.clients.size);
|
||||
runLiveQueryEventHandlers({
|
||||
client,
|
||||
event: 'subscribe',
|
||||
clients: this.clients.size,
|
||||
subscriptions: this.subscriptions.size,
|
||||
sessionToken: request.sessionToken,
|
||||
useMasterKey: client.hasMasterKey,
|
||||
installationId: client.installationId,
|
||||
});
|
||||
} catch (e) {
|
||||
Client.pushError(
|
||||
parseWebsocket,
|
||||
e.code || 101,
|
||||
e.message || e,
|
||||
false,
|
||||
request.requestId
|
||||
);
|
||||
logger.error(
|
||||
`Failed running beforeSubscribe on ${className} for session ${request.sessionToken} with:\n Error: ` +
|
||||
JSON.stringify(e)
|
||||
);
|
||||
classSubscriptions.set(subscriptionHash, subscription);
|
||||
}
|
||||
|
||||
// Add subscriptionInfo to client
|
||||
const subscriptionInfo = {
|
||||
subscription: subscription,
|
||||
};
|
||||
// Add selected fields, sessionToken and installationId for this subscription if necessary
|
||||
if (request.query.fields) {
|
||||
subscriptionInfo.fields = request.query.fields;
|
||||
}
|
||||
if (request.sessionToken) {
|
||||
subscriptionInfo.sessionToken = request.sessionToken;
|
||||
}
|
||||
client.addSubscriptionInfo(request.requestId, subscriptionInfo);
|
||||
|
||||
// Add clientId to subscription
|
||||
subscription.addClientSubscription(
|
||||
parseWebsocket.clientId,
|
||||
request.requestId
|
||||
);
|
||||
|
||||
client.pushSubscribe(request.requestId);
|
||||
|
||||
logger.verbose(
|
||||
`Create client ${parseWebsocket.clientId} new subscription: ${request.requestId}`
|
||||
);
|
||||
logger.verbose('Current client number: %d', this.clients.size);
|
||||
runLiveQueryEventHandlers({
|
||||
client,
|
||||
event: 'subscribe',
|
||||
clients: this.clients.size,
|
||||
subscriptions: this.subscriptions.size,
|
||||
sessionToken: request.sessionToken,
|
||||
useMasterKey: client.hasMasterKey,
|
||||
installationId: client.installationId,
|
||||
});
|
||||
}
|
||||
|
||||
_handleUpdateSubscription(parseWebsocket: any, request: any): any {
|
||||
|
||||
Reference in New Issue
Block a user