feat: add Idempotency to Postgres (#7750)
This commit is contained in:
@@ -2440,9 +2440,55 @@ export class PostgresStorageAdapter implements StorageAdapter {
|
||||
? fieldNames.map((fieldName, index) => `lower($${index + 3}:name) varchar_pattern_ops`)
|
||||
: fieldNames.map((fieldName, index) => `$${index + 3}:name`);
|
||||
const qs = `CREATE INDEX IF NOT EXISTS $1:name ON $2:name (${constraintPatterns.join()})`;
|
||||
await conn.none(qs, [indexNameOptions.name, className, ...fieldNames]).catch(error => {
|
||||
throw error;
|
||||
});
|
||||
const setIdempotencyFunction = options.setIdempotencyFunction !== undefined ? options.setIdempotencyFunction : false;
|
||||
if (setIdempotencyFunction) {
|
||||
await this.ensureIdempotencyFunctionExists(options);
|
||||
}
|
||||
await conn.none(qs, [indexNameOptions.name, className, ...fieldNames])
|
||||
.catch(error => {
|
||||
if (
|
||||
error.code === PostgresDuplicateRelationError &&
|
||||
error.message.includes(indexNameOptions.name)
|
||||
) {
|
||||
// Index already exists. Ignore error.
|
||||
} else if (
|
||||
error.code === PostgresUniqueIndexViolationError &&
|
||||
error.message.includes(indexNameOptions.name)
|
||||
) {
|
||||
// Cast the error into the proper parse error
|
||||
throw new Parse.Error(
|
||||
Parse.Error.DUPLICATE_VALUE,
|
||||
'A duplicate value for a field with unique values was provided'
|
||||
);
|
||||
} else {
|
||||
throw error;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async deleteIdempotencyFunction(
|
||||
options?: Object = {}
|
||||
): Promise<any> {
|
||||
const conn = options.conn !== undefined ? options.conn : this._client;
|
||||
const qs = 'DROP FUNCTION IF EXISTS idempotency_delete_expired_records()';
|
||||
return conn
|
||||
.none(qs)
|
||||
.catch(error => {
|
||||
throw error;
|
||||
});
|
||||
}
|
||||
|
||||
async ensureIdempotencyFunctionExists(
|
||||
options?: Object = {}
|
||||
): Promise<any> {
|
||||
const conn = options.conn !== undefined ? options.conn : this._client;
|
||||
const ttlOptions = options.ttl !== undefined ? `${options.ttl} seconds` : '60 seconds';
|
||||
const qs = 'CREATE OR REPLACE FUNCTION idempotency_delete_expired_records() RETURNS void LANGUAGE plpgsql AS $$ BEGIN DELETE FROM "_Idempotency" WHERE expire < NOW() - INTERVAL $1; END; $$;';
|
||||
return conn
|
||||
.none(qs, [ttlOptions])
|
||||
.catch(error => {
|
||||
throw error;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@ import logger from '../logger';
|
||||
import * as SchemaController from './SchemaController';
|
||||
import { StorageAdapter } from '../Adapters/Storage/StorageAdapter';
|
||||
import MongoStorageAdapter from '../Adapters/Storage/Mongo/MongoStorageAdapter';
|
||||
import PostgresStorageAdapter from '../Adapters/Storage/Postgres/PostgresStorageAdapter';
|
||||
import SchemaCache from '../Adapters/Cache/SchemaCache';
|
||||
import type { LoadSchemaOptions } from './types';
|
||||
import type { QueryOptions, FullQueryOptions } from '../Adapters/Storage/StorageAdapter';
|
||||
@@ -394,12 +395,14 @@ const relationSchema = {
|
||||
|
||||
class DatabaseController {
|
||||
adapter: StorageAdapter;
|
||||
idempotencyOptions: any;
|
||||
schemaCache: any;
|
||||
schemaPromise: ?Promise<SchemaController.SchemaController>;
|
||||
_transactionalSession: ?any;
|
||||
|
||||
constructor(adapter: StorageAdapter) {
|
||||
constructor(adapter: StorageAdapter, idempotencyOptions?: Object = {}) {
|
||||
this.adapter = adapter;
|
||||
this.idempotencyOptions = idempotencyOptions;
|
||||
// We don't want a mutable this.schema, because then you could have
|
||||
// one request that uses different schemas for different parts of
|
||||
// it. Instead, use loadSchema to get a schema.
|
||||
@@ -1713,9 +1716,7 @@ class DatabaseController {
|
||||
};
|
||||
await this.loadSchema().then(schema => schema.enforceClassExists('_User'));
|
||||
await this.loadSchema().then(schema => schema.enforceClassExists('_Role'));
|
||||
if (this.adapter instanceof MongoStorageAdapter) {
|
||||
await this.loadSchema().then(schema => schema.enforceClassExists('_Idempotency'));
|
||||
}
|
||||
await this.loadSchema().then(schema => schema.enforceClassExists('_Idempotency'));
|
||||
|
||||
await this.adapter.ensureUniqueness('_User', requiredUserFields, ['username']).catch(error => {
|
||||
logger.warn('Unable to ensure uniqueness for usernames: ', error);
|
||||
@@ -1751,18 +1752,28 @@ class DatabaseController {
|
||||
logger.warn('Unable to ensure uniqueness for role name: ', error);
|
||||
throw error;
|
||||
});
|
||||
if (this.adapter instanceof MongoStorageAdapter) {
|
||||
await this.adapter
|
||||
.ensureUniqueness('_Idempotency', requiredIdempotencyFields, ['reqId'])
|
||||
.catch(error => {
|
||||
logger.warn('Unable to ensure uniqueness for idempotency request ID: ', error);
|
||||
throw error;
|
||||
});
|
||||
|
||||
await this.adapter
|
||||
.ensureIndex('_Idempotency', requiredIdempotencyFields, ['expire'], 'ttl', false, {
|
||||
await this.adapter
|
||||
.ensureUniqueness('_Idempotency', requiredIdempotencyFields, ['reqId'])
|
||||
.catch(error => {
|
||||
logger.warn('Unable to ensure uniqueness for idempotency request ID: ', error);
|
||||
throw error;
|
||||
});
|
||||
|
||||
const isMongoAdapter = this.adapter instanceof MongoStorageAdapter;
|
||||
const isPostgresAdapter = this.adapter instanceof PostgresStorageAdapter;
|
||||
if (isMongoAdapter || isPostgresAdapter) {
|
||||
let options = {};
|
||||
if (isMongoAdapter) {
|
||||
options = {
|
||||
ttl: 0,
|
||||
})
|
||||
};
|
||||
} else if (isPostgresAdapter) {
|
||||
options = this.idempotencyOptions;
|
||||
options.setIdempotencyFunction = true;
|
||||
}
|
||||
await this.adapter
|
||||
.ensureIndex('_Idempotency', requiredIdempotencyFields, ['expire'], 'ttl', false, options)
|
||||
.catch(error => {
|
||||
logger.warn('Unable to create TTL index for idempotency expire date: ', error);
|
||||
throw error;
|
||||
|
||||
@@ -143,7 +143,7 @@ export function getLiveQueryController(options: ParseServerOptions): LiveQueryCo
|
||||
}
|
||||
|
||||
export function getDatabaseController(options: ParseServerOptions): DatabaseController {
|
||||
const { databaseURI, collectionPrefix, databaseOptions } = options;
|
||||
const { databaseURI, collectionPrefix, databaseOptions, idempotencyOptions } = options;
|
||||
let { databaseAdapter } = options;
|
||||
if (
|
||||
(databaseOptions ||
|
||||
@@ -157,7 +157,7 @@ export function getDatabaseController(options: ParseServerOptions): DatabaseCont
|
||||
} else {
|
||||
databaseAdapter = loadAdapter(databaseAdapter);
|
||||
}
|
||||
return new DatabaseController(databaseAdapter);
|
||||
return new DatabaseController(databaseAdapter, idempotencyOptions);
|
||||
}
|
||||
|
||||
export function getHooksController(
|
||||
|
||||
@@ -6,6 +6,7 @@ import ClientSDK from './ClientSDK';
|
||||
import defaultLogger from './logger';
|
||||
import rest from './rest';
|
||||
import MongoStorageAdapter from './Adapters/Storage/Mongo/MongoStorageAdapter';
|
||||
import PostgresStorageAdapter from './Adapters/Storage/Postgres/PostgresStorageAdapter';
|
||||
|
||||
export const DEFAULT_ALLOWED_HEADERS =
|
||||
'X-Parse-Master-Key, X-Parse-REST-API-Key, X-Parse-Javascript-Key, X-Parse-Application-Id, X-Parse-Client-Version, X-Parse-Session-Token, X-Requested-With, X-Parse-Revocable-Session, X-Parse-Request-Id, Content-Type, Pragma, Cache-Control';
|
||||
@@ -431,7 +432,7 @@ export function promiseEnforceMasterKeyAccess(request) {
|
||||
*/
|
||||
export function promiseEnsureIdempotency(req) {
|
||||
// Enable feature only for MongoDB
|
||||
if (!(req.config.database.adapter instanceof MongoStorageAdapter)) {
|
||||
if (!((req.config.database.adapter instanceof MongoStorageAdapter) || (req.config.database.adapter instanceof PostgresStorageAdapter))) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
// Get parameters
|
||||
|
||||
Reference in New Issue
Block a user