From 0c3feaaa1751964c0db89f25674935c3354b1538 Mon Sep 17 00:00:00 2001 From: Corey Date: Sun, 2 Jan 2022 13:25:53 -0500 Subject: [PATCH] feat: add Idempotency to Postgres (#7750) --- README.md | 21 +++++++- spec/Idempotency.spec.js | 35 +++++++++++-- spec/PostgresStorageAdapter.spec.js | 11 ++++ .../Postgres/PostgresStorageAdapter.js | 52 +++++++++++++++++-- src/Controllers/DatabaseController.js | 39 +++++++++----- src/Controllers/index.js | 4 +- src/middlewares.js | 3 +- 7 files changed, 139 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index 86d1247b..7b507e9c 100644 --- a/README.md +++ b/README.md @@ -525,9 +525,26 @@ let api = new ParseServer({ | `idempotencyOptions.paths` | yes | `Array` | `[]` | `.*` (all paths, includes the examples below),
`functions/.*` (all functions),
`jobs/.*` (all jobs),
`classes/.*` (all classes),
`functions/.*` (all functions),
`users` (user creation / update),
`installations` (installation creation / update) | PARSE_SERVER_EXPERIMENTAL_IDEMPOTENCY_PATHS | An array of path patterns that have to match the request path for request deduplication to be enabled. The mount path must not be included, for example to match the request path `/parse/functions/myFunction` specify the path pattern `functions/myFunction`. A trailing slash of the request path is ignored, for example the path pattern `functions/myFunction` matches both `/parse/functions/myFunction` and `/parse/functions/myFunction/`. | | `idempotencyOptions.ttl` | yes | `Integer` | `300` | `60` (60 seconds) | PARSE_SERVER_EXPERIMENTAL_IDEMPOTENCY_TTL | The duration in seconds after which a request record is discarded from the database. Duplicate requests due to network issues can be expected to arrive within milliseconds up to several seconds. This value must be greater than `0`. | -### Notes +### Postgres -- This feature is currently only available for MongoDB and not for Postgres. +To use this feature in Postgres, you will need to create a cron job using [pgAdmin](https://www.pgadmin.org/docs/pgadmin4/development/pgagent_jobs.html) or similar to call the Postgres function `idempotency_delete_expired_records()` that deletes expired idempotency records. You can find an example script below. Make sure the script has the same privileges to log into Postgres as Parse Server. + +```bash +#!/bin/bash + +set -e +psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL + SELECT idempotency_delete_expired_records(); +EOSQL + +exec "$@" +``` + +Assuming the script above is named, `parse_idempotency_delete_expired_records.sh`, a cron job that runs the script every 2 minutes may look like: + +```bash +2 * * * * /root/parse_idempotency_delete_expired_records.sh >/dev/null 2>&1 +``` ## Localization diff --git a/spec/Idempotency.spec.js b/spec/Idempotency.spec.js index c2ef8665..85fa8a65 100644 --- a/spec/Idempotency.spec.js +++ b/spec/Idempotency.spec.js @@ -6,11 +6,14 @@ const rest = require('../lib/rest'); const auth = require('../lib/Auth'); const uuid = require('uuid'); -describe_only_db('mongo')('Idempotency', () => { +describe('Idempotency', () => { // Parameters /** Enable TTL expiration simulated by removing entry instead of waiting for MongoDB TTL monitor which runs only every 60s, so it can take up to 119s until entry removal - ain't nobody got time for that */ const SIMULATE_TTL = true; + const ttl = 2; + const maxTimeOut = 4000; + // Helpers async function deleteRequestEntry(reqId) { const config = Config.get(Parse.applicationId); @@ -38,9 +41,10 @@ describe_only_db('mongo')('Idempotency', () => { } await setup({ paths: ['functions/.*', 'jobs/.*', 'classes/.*', 'users', 'installations'], - ttl: 30, + ttl: ttl, }); }); + // Tests it('should enforce idempotency for cloud code function', async () => { let counter = 0; @@ -56,7 +60,7 @@ describe_only_db('mongo')('Idempotency', () => { 'X-Parse-Request-Id': 'abc-123', }, }; - expect(Config.get(Parse.applicationId).idempotencyOptions.ttl).toBe(30); + expect(Config.get(Parse.applicationId).idempotencyOptions.ttl).toBe(ttl); await request(params); await request(params).then(fail, e => { expect(e.status).toEqual(400); @@ -83,12 +87,35 @@ describe_only_db('mongo')('Idempotency', () => { if (SIMULATE_TTL) { await deleteRequestEntry('abc-123'); } else { - await new Promise(resolve => setTimeout(resolve, 130000)); + await new Promise(resolve => setTimeout(resolve, maxTimeOut)); } await expectAsync(request(params)).toBeResolved(); expect(counter).toBe(2); }); + it_only_db('postgres')('should delete request entry when postgress ttl function is called', async () => { + const client = Config.get(Parse.applicationId).database.adapter._client; + let counter = 0; + Parse.Cloud.define('myFunction', () => { + counter++; + }); + const params = { + method: 'POST', + url: 'http://localhost:8378/1/functions/myFunction', + headers: { + 'X-Parse-Application-Id': Parse.applicationId, + 'X-Parse-Master-Key': Parse.masterKey, + 'X-Parse-Request-Id': 'abc-123', + }, + }; + await expectAsync(request(params)).toBeResolved(); + await expectAsync(request(params)).toBeRejected(); + await new Promise(resolve => setTimeout(resolve, maxTimeOut)); + await client.one('SELECT idempotency_delete_expired_records()'); + await expectAsync(request(params)).toBeResolved(); + expect(counter).toBe(2); + }); + it('should enforce idempotency for cloud code jobs', async () => { let counter = 0; Parse.Cloud.job('myJob', () => { diff --git a/spec/PostgresStorageAdapter.spec.js b/spec/PostgresStorageAdapter.spec.js index 769aad74..5673d6bc 100644 --- a/spec/PostgresStorageAdapter.spec.js +++ b/spec/PostgresStorageAdapter.spec.js @@ -558,6 +558,17 @@ describe_only_db('postgres')('PostgresStorageAdapter', () => { await new Promise(resolve => setTimeout(resolve, 2000)); expect(adapter._onchange).toHaveBeenCalled(); }); + + it('Idempotency class should have function', async () => { + await reconfigureServer(); + const adapter = Config.get('test').database.adapter; + const client = adapter._client; + const qs = "SELECT format('%I.%I(%s)', ns.nspname, p.proname, oidvectortypes(p.proargtypes)) FROM pg_proc p INNER JOIN pg_namespace ns ON (p.pronamespace = ns.oid) WHERE p.proname = 'idempotency_delete_expired_records'"; + const foundFunction = await client.one(qs); + expect(foundFunction.format).toBe("public.idempotency_delete_expired_records()"); + await adapter.deleteIdempotencyFunction(); + await client.none(qs); + }); }); describe_only_db('postgres')('PostgresStorageAdapter shutdown', () => { diff --git a/src/Adapters/Storage/Postgres/PostgresStorageAdapter.js b/src/Adapters/Storage/Postgres/PostgresStorageAdapter.js index f789952e..7477270a 100644 --- a/src/Adapters/Storage/Postgres/PostgresStorageAdapter.js +++ b/src/Adapters/Storage/Postgres/PostgresStorageAdapter.js @@ -2440,9 +2440,55 @@ export class PostgresStorageAdapter implements StorageAdapter { ? fieldNames.map((fieldName, index) => `lower($${index + 3}:name) varchar_pattern_ops`) : fieldNames.map((fieldName, index) => `$${index + 3}:name`); const qs = `CREATE INDEX IF NOT EXISTS $1:name ON $2:name (${constraintPatterns.join()})`; - await conn.none(qs, [indexNameOptions.name, className, ...fieldNames]).catch(error => { - throw error; - }); + const setIdempotencyFunction = options.setIdempotencyFunction !== undefined ? options.setIdempotencyFunction : false; + if (setIdempotencyFunction) { + await this.ensureIdempotencyFunctionExists(options); + } + await conn.none(qs, [indexNameOptions.name, className, ...fieldNames]) + .catch(error => { + if ( + error.code === PostgresDuplicateRelationError && + error.message.includes(indexNameOptions.name) + ) { + // Index already exists. Ignore error. + } else if ( + error.code === PostgresUniqueIndexViolationError && + error.message.includes(indexNameOptions.name) + ) { + // Cast the error into the proper parse error + throw new Parse.Error( + Parse.Error.DUPLICATE_VALUE, + 'A duplicate value for a field with unique values was provided' + ); + } else { + throw error; + } + }); + } + + async deleteIdempotencyFunction( + options?: Object = {} + ): Promise { + const conn = options.conn !== undefined ? options.conn : this._client; + const qs = 'DROP FUNCTION IF EXISTS idempotency_delete_expired_records()'; + return conn + .none(qs) + .catch(error => { + throw error; + }); + } + + async ensureIdempotencyFunctionExists( + options?: Object = {} + ): Promise { + const conn = options.conn !== undefined ? options.conn : this._client; + const ttlOptions = options.ttl !== undefined ? `${options.ttl} seconds` : '60 seconds'; + const qs = 'CREATE OR REPLACE FUNCTION idempotency_delete_expired_records() RETURNS void LANGUAGE plpgsql AS $$ BEGIN DELETE FROM "_Idempotency" WHERE expire < NOW() - INTERVAL $1; END; $$;'; + return conn + .none(qs, [ttlOptions]) + .catch(error => { + throw error; + }); } } diff --git a/src/Controllers/DatabaseController.js b/src/Controllers/DatabaseController.js index 42273b69..2c313f83 100644 --- a/src/Controllers/DatabaseController.js +++ b/src/Controllers/DatabaseController.js @@ -14,6 +14,7 @@ import logger from '../logger'; import * as SchemaController from './SchemaController'; import { StorageAdapter } from '../Adapters/Storage/StorageAdapter'; import MongoStorageAdapter from '../Adapters/Storage/Mongo/MongoStorageAdapter'; +import PostgresStorageAdapter from '../Adapters/Storage/Postgres/PostgresStorageAdapter'; import SchemaCache from '../Adapters/Cache/SchemaCache'; import type { LoadSchemaOptions } from './types'; import type { QueryOptions, FullQueryOptions } from '../Adapters/Storage/StorageAdapter'; @@ -394,12 +395,14 @@ const relationSchema = { class DatabaseController { adapter: StorageAdapter; + idempotencyOptions: any; schemaCache: any; schemaPromise: ?Promise; _transactionalSession: ?any; - constructor(adapter: StorageAdapter) { + constructor(adapter: StorageAdapter, idempotencyOptions?: Object = {}) { this.adapter = adapter; + this.idempotencyOptions = idempotencyOptions; // We don't want a mutable this.schema, because then you could have // one request that uses different schemas for different parts of // it. Instead, use loadSchema to get a schema. @@ -1713,9 +1716,7 @@ class DatabaseController { }; await this.loadSchema().then(schema => schema.enforceClassExists('_User')); await this.loadSchema().then(schema => schema.enforceClassExists('_Role')); - if (this.adapter instanceof MongoStorageAdapter) { - await this.loadSchema().then(schema => schema.enforceClassExists('_Idempotency')); - } + await this.loadSchema().then(schema => schema.enforceClassExists('_Idempotency')); await this.adapter.ensureUniqueness('_User', requiredUserFields, ['username']).catch(error => { logger.warn('Unable to ensure uniqueness for usernames: ', error); @@ -1751,18 +1752,28 @@ class DatabaseController { logger.warn('Unable to ensure uniqueness for role name: ', error); throw error; }); - if (this.adapter instanceof MongoStorageAdapter) { - await this.adapter - .ensureUniqueness('_Idempotency', requiredIdempotencyFields, ['reqId']) - .catch(error => { - logger.warn('Unable to ensure uniqueness for idempotency request ID: ', error); - throw error; - }); - await this.adapter - .ensureIndex('_Idempotency', requiredIdempotencyFields, ['expire'], 'ttl', false, { + await this.adapter + .ensureUniqueness('_Idempotency', requiredIdempotencyFields, ['reqId']) + .catch(error => { + logger.warn('Unable to ensure uniqueness for idempotency request ID: ', error); + throw error; + }); + + const isMongoAdapter = this.adapter instanceof MongoStorageAdapter; + const isPostgresAdapter = this.adapter instanceof PostgresStorageAdapter; + if (isMongoAdapter || isPostgresAdapter) { + let options = {}; + if (isMongoAdapter) { + options = { ttl: 0, - }) + }; + } else if (isPostgresAdapter) { + options = this.idempotencyOptions; + options.setIdempotencyFunction = true; + } + await this.adapter + .ensureIndex('_Idempotency', requiredIdempotencyFields, ['expire'], 'ttl', false, options) .catch(error => { logger.warn('Unable to create TTL index for idempotency expire date: ', error); throw error; diff --git a/src/Controllers/index.js b/src/Controllers/index.js index 67f90a8e..71ab5ef4 100644 --- a/src/Controllers/index.js +++ b/src/Controllers/index.js @@ -143,7 +143,7 @@ export function getLiveQueryController(options: ParseServerOptions): LiveQueryCo } export function getDatabaseController(options: ParseServerOptions): DatabaseController { - const { databaseURI, collectionPrefix, databaseOptions } = options; + const { databaseURI, collectionPrefix, databaseOptions, idempotencyOptions } = options; let { databaseAdapter } = options; if ( (databaseOptions || @@ -157,7 +157,7 @@ export function getDatabaseController(options: ParseServerOptions): DatabaseCont } else { databaseAdapter = loadAdapter(databaseAdapter); } - return new DatabaseController(databaseAdapter); + return new DatabaseController(databaseAdapter, idempotencyOptions); } export function getHooksController( diff --git a/src/middlewares.js b/src/middlewares.js index 88de1072..e749d050 100644 --- a/src/middlewares.js +++ b/src/middlewares.js @@ -6,6 +6,7 @@ import ClientSDK from './ClientSDK'; import defaultLogger from './logger'; import rest from './rest'; import MongoStorageAdapter from './Adapters/Storage/Mongo/MongoStorageAdapter'; +import PostgresStorageAdapter from './Adapters/Storage/Postgres/PostgresStorageAdapter'; export const DEFAULT_ALLOWED_HEADERS = 'X-Parse-Master-Key, X-Parse-REST-API-Key, X-Parse-Javascript-Key, X-Parse-Application-Id, X-Parse-Client-Version, X-Parse-Session-Token, X-Requested-With, X-Parse-Revocable-Session, X-Parse-Request-Id, Content-Type, Pragma, Cache-Control'; @@ -431,7 +432,7 @@ export function promiseEnforceMasterKeyAccess(request) { */ export function promiseEnsureIdempotency(req) { // Enable feature only for MongoDB - if (!(req.config.database.adapter instanceof MongoStorageAdapter)) { + if (!((req.config.database.adapter instanceof MongoStorageAdapter) || (req.config.database.adapter instanceof PostgresStorageAdapter))) { return Promise.resolve(); } // Get parameters