Improve single schema cache (#7214)

* Initial Commit

* fix flaky test

* temporary set ci timeout

* turn off ci check

* fix postgres tests

* fix tests

* node flaky test

* remove improvements

* Update SchemaPerformance.spec.js

* fix tests

* revert ci

* Create Singleton Object

* properly clear cache testing

* Cleanup

* remove fit

* try PushController.spec

* try push test rewrite

* try push enqueue time

* Increase test timeout

* remove pg server creation test

* xit push tests

* more xit

* remove skipped tests

* Fix conflicts

* reduce ci timeout

* fix push tests

* Revert "fix push tests"

This reverts commit 05aba62f1cbbca7d5d3e80b9444529f59407cb56.

* improve initialization

* fix flaky tests

* xit flaky test

* Update CHANGELOG.md

* enable debug logs

* Update LogsRouter.spec.js

* create initial indexes in series

* lint

* horizontal scaling documentation

* Update Changelog

* change horizontalScaling db option

* Add enableSchemaHooks option

* move enableSchemaHooks to databaseOptions
This commit is contained in:
Diamond Lewis
2021-03-16 16:05:36 -05:00
committed by GitHub
parent 32fc45d2d2
commit a02014f557
38 changed files with 673 additions and 937 deletions

View File

@@ -113,12 +113,15 @@ export class MongoStorageAdapter implements StorageAdapter {
_uri: string;
_collectionPrefix: string;
_mongoOptions: Object;
_onchange: any;
_stream: any;
// Public
connectionPromise: ?Promise<any>;
database: any;
client: MongoClient;
_maxTimeMS: ?number;
canSortOnJoinTables: boolean;
enableSchemaHooks: boolean;
constructor({ uri = defaults.DefaultMongoURI, collectionPrefix = '', mongoOptions = {} }: any) {
this._uri = uri;
@@ -126,13 +129,20 @@ export class MongoStorageAdapter implements StorageAdapter {
this._mongoOptions = mongoOptions;
this._mongoOptions.useNewUrlParser = true;
this._mongoOptions.useUnifiedTopology = true;
this._onchange = () => {};
// MaxTimeMS is not a global MongoDB client option, it is applied per operation.
this._maxTimeMS = mongoOptions.maxTimeMS;
this.canSortOnJoinTables = true;
this.enableSchemaHooks = !!mongoOptions.enableSchemaHooks;
delete mongoOptions.enableSchemaHooks;
delete mongoOptions.maxTimeMS;
}
watch(callback: () => void): void {
this._onchange = callback;
}
connect() {
if (this.connectionPromise) {
return this.connectionPromise;
@@ -198,7 +208,13 @@ export class MongoStorageAdapter implements StorageAdapter {
_schemaCollection(): Promise<MongoSchemaCollection> {
return this.connect()
.then(() => this._adaptiveCollection(MongoSchemaCollectionName))
.then(collection => new MongoSchemaCollection(collection));
.then(collection => {
if (!this._stream && this.enableSchemaHooks) {
this._stream = collection._mongoCollection.watch();
this._stream.on('change', () => this._onchange());
}
return new MongoSchemaCollection(collection);
});
}
classExists(name: string) {

View File

@@ -4,6 +4,8 @@ import { createClient } from './PostgresClient';
import Parse from 'parse/node';
// @flow-disable-next
import _ from 'lodash';
// @flow-disable-next
import { v4 as uuidv4 } from 'uuid';
import sql from './sql';
const PostgresRelationDoesNotExistError = '42P01';
@@ -794,20 +796,33 @@ const buildWhereClause = ({ schema, query, index, caseInsensitive }): WhereClaus
export class PostgresStorageAdapter implements StorageAdapter {
canSortOnJoinTables: boolean;
enableSchemaHooks: boolean;
// Private
_collectionPrefix: string;
_client: any;
_onchange: any;
_pgp: any;
_stream: any;
_uuid: any;
constructor({ uri, collectionPrefix = '', databaseOptions }: any) {
constructor({ uri, collectionPrefix = '', databaseOptions = {} }: any) {
this._collectionPrefix = collectionPrefix;
this.enableSchemaHooks = !!databaseOptions.enableSchemaHooks;
delete databaseOptions.enableSchemaHooks;
const { client, pgp } = createClient(uri, databaseOptions);
this._client = client;
this._onchange = () => {};
this._pgp = pgp;
this._uuid = uuidv4();
this.canSortOnJoinTables = false;
}
watch(callback: () => void): void {
this._onchange = callback;
}
//Note that analyze=true will run the query, executing INSERTS, DELETES, etc.
createExplainableQuery(query: string, analyze: boolean = false) {
if (analyze) {
@@ -818,12 +833,39 @@ export class PostgresStorageAdapter implements StorageAdapter {
}
handleShutdown() {
if (this._stream) {
this._stream.done();
delete this._stream;
}
if (!this._client) {
return;
}
this._client.$pool.end();
}
async _listenToSchema() {
if (!this._stream && this.enableSchemaHooks) {
this._stream = await this._client.connect({ direct: true });
this._stream.client.on('notification', data => {
const payload = JSON.parse(data.payload);
if (payload.senderId !== this._uuid) {
this._onchange();
}
});
await this._stream.none('LISTEN $1~', 'schema.change');
}
}
_notifySchemaChange() {
if (this._stream) {
this._stream
.none('NOTIFY $1~, $2', ['schema.change', { senderId: this._uuid }])
.catch(error => {
console.log('Failed to Notify:', error); // unlikely to ever happen
});
}
}
async _ensureSchemaCollectionExists(conn: any) {
conn = conn || this._client;
await conn
@@ -859,6 +901,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
values
);
});
this._notifySchemaChange();
}
async setIndexesWithSchemaFormat(
@@ -920,11 +963,12 @@ export class PostgresStorageAdapter implements StorageAdapter {
[className, 'schema', 'indexes', JSON.stringify(existingIndexes)]
);
});
this._notifySchemaChange();
}
async createClass(className: string, schema: SchemaType, conn: ?any) {
conn = conn || this._client;
return conn
const parseSchema = await conn
.tx('create-class', async t => {
await this.createTable(className, schema, t);
await t.none(
@@ -940,6 +984,8 @@ export class PostgresStorageAdapter implements StorageAdapter {
}
throw err;
});
this._notifySchemaChange();
return parseSchema;
}
// Just create a table, do not insert in schema
@@ -1073,6 +1119,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
);
}
});
this._notifySchemaChange();
}
// Drops a collection. Resolves with true if it was a Parse Schema (eg. _User, Custom, etc.)
@@ -1085,9 +1132,12 @@ export class PostgresStorageAdapter implements StorageAdapter {
values: [className],
},
];
return this._client
const response = await this._client
.tx(t => t.none(this._pgp.helpers.concat(operations)))
.then(() => className.indexOf('_Join:') != 0); // resolves with false when _Join table
this._notifySchemaChange();
return response;
}
// Delete all data known to this adapter. Used for testing.
@@ -1173,6 +1223,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
await t.none(`ALTER TABLE $1:name DROP COLUMN IF EXISTS ${columns}`, values);
}
});
this._notifySchemaChange();
}
// Return a promise for all schemas known to this adapter, in Parse format. In case the
@@ -2237,6 +2288,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
})
.then(() => this.schemaUpgrade(schema.className, schema));
});
promises.push(this._listenToSchema());
return Promise.all(promises)
.then(() => {
return this._client.tx('perform-initialization', async t => {

View File

@@ -111,6 +111,7 @@ export interface StorageAdapter {
explain?: boolean
): Promise<any>;
performInitialization(options: ?any): Promise<void>;
watch(callback: () => void): void;
// Indexing
createIndexes(className: string, indexes: any, conn: ?any): Promise<void>;