Batch transaction (#5849)

* Batch transaction boilerplate

* Refactoring transaction boilerplate

* Independent sessions test

* Transactions - partial

* Missing only one test

* All tests passing for mongo db

* Tests on Travis

* Transactions on postgres

* Fix travis to restart mongodb

* Remove mongodb service and keep only mongodb runner

* MongoDB service back

* Initialize replicaset

* Remove mongodb runner again

* Again only with mongodb-runner and removing cache

* Trying with pretest and posttest

* WiredTiger

* Pretest and posttest again

* Removing inexistent scripts

* wiredTiger

* One more attempt

* Trying another way to run mongodb-runner

* Fixing tests

* Include batch transaction on direct access

* Add tests to direct access
This commit is contained in:
Antonio Davi Macedo Coelho de Castro
2019-07-31 02:41:07 -07:00
committed by GitHub
parent fe18fe0f61
commit 8b97c1380b
15 changed files with 931 additions and 106 deletions

View File

@@ -111,27 +111,30 @@ export default class MongoCollection {
.toArray();
}
insertOne(object) {
return this._mongoCollection.insertOne(object);
insertOne(object, session) {
return this._mongoCollection.insertOne(object, { session });
}
// Atomically updates data in the database for a single (first) object that matched the query
// If there is nothing that matches the query - does insert
// Postgres Note: `INSERT ... ON CONFLICT UPDATE` that is available since 9.5.
upsertOne(query, update) {
return this._mongoCollection.updateOne(query, update, { upsert: true });
upsertOne(query, update, session) {
return this._mongoCollection.updateOne(query, update, {
upsert: true,
session,
});
}
updateOne(query, update) {
return this._mongoCollection.updateOne(query, update);
}
updateMany(query, update) {
return this._mongoCollection.updateMany(query, update);
updateMany(query, update, session) {
return this._mongoCollection.updateMany(query, update, { session });
}
deleteMany(query) {
return this._mongoCollection.deleteMany(query);
deleteMany(query, session) {
return this._mongoCollection.deleteMany(query, { session });
}
_ensureSparseUniqueIndexInBackground(indexRequest) {

View File

@@ -472,7 +472,12 @@ export class MongoStorageAdapter implements StorageAdapter {
// TODO: As yet not particularly well specified. Creates an object. Maybe shouldn't even need the schema,
// and should infer from the type. Or maybe does need the schema for validations. Or maybe needs
// the schema only for the legacy mongo format. We'll figure that out later.
createObject(className: string, schema: SchemaType, object: any) {
createObject(
className: string,
schema: SchemaType,
object: any,
transactionalSession: ?any
) {
schema = convertParseSchemaToMongoSchema(schema);
const mongoObject = parseObjectToMongoObjectForCreate(
className,
@@ -480,7 +485,9 @@ export class MongoStorageAdapter implements StorageAdapter {
schema
);
return this._adaptiveCollection(className)
.then(collection => collection.insertOne(mongoObject))
.then(collection =>
collection.insertOne(mongoObject, transactionalSession)
)
.catch(error => {
if (error.code === 11000) {
// Duplicate value
@@ -510,13 +517,14 @@ export class MongoStorageAdapter implements StorageAdapter {
deleteObjectsByQuery(
className: string,
schema: SchemaType,
query: QueryType
query: QueryType,
transactionalSession: ?any
) {
schema = convertParseSchemaToMongoSchema(schema);
return this._adaptiveCollection(className)
.then(collection => {
const mongoWhere = transformWhere(className, query, schema);
return collection.deleteMany(mongoWhere);
return collection.deleteMany(mongoWhere, transactionalSession);
})
.catch(err => this.handleError(err))
.then(
@@ -543,13 +551,16 @@ export class MongoStorageAdapter implements StorageAdapter {
className: string,
schema: SchemaType,
query: QueryType,
update: any
update: any,
transactionalSession: ?any
) {
schema = convertParseSchemaToMongoSchema(schema);
const mongoUpdate = transformUpdate(className, update, schema);
const mongoWhere = transformWhere(className, query, schema);
return this._adaptiveCollection(className)
.then(collection => collection.updateMany(mongoWhere, mongoUpdate))
.then(collection =>
collection.updateMany(mongoWhere, mongoUpdate, transactionalSession)
)
.catch(err => this.handleError(err));
}
@@ -559,7 +570,8 @@ export class MongoStorageAdapter implements StorageAdapter {
className: string,
schema: SchemaType,
query: QueryType,
update: any
update: any,
transactionalSession: ?any
) {
schema = convertParseSchemaToMongoSchema(schema);
const mongoUpdate = transformUpdate(className, update, schema);
@@ -568,6 +580,7 @@ export class MongoStorageAdapter implements StorageAdapter {
.then(collection =>
collection._mongoCollection.findOneAndUpdate(mongoWhere, mongoUpdate, {
returnOriginal: false,
session: transactionalSession || undefined,
})
)
.then(result => mongoObjectToParseObject(className, result.value, schema))
@@ -588,13 +601,16 @@ export class MongoStorageAdapter implements StorageAdapter {
className: string,
schema: SchemaType,
query: QueryType,
update: any
update: any,
transactionalSession: ?any
) {
schema = convertParseSchemaToMongoSchema(schema);
const mongoUpdate = transformUpdate(className, update, schema);
const mongoWhere = transformWhere(className, query, schema);
return this._adaptiveCollection(className)
.then(collection => collection.upsertOne(mongoWhere, mongoUpdate))
.then(collection =>
collection.upsertOne(mongoWhere, mongoUpdate, transactionalSession)
)
.catch(err => this.handleError(err));
}
@@ -1059,6 +1075,24 @@ export class MongoStorageAdapter implements StorageAdapter {
})
.catch(err => this.handleError(err));
}
createTransactionalSession(): Promise<any> {
const transactionalSection = this.client.startSession();
transactionalSection.startTransaction();
return Promise.resolve(transactionalSection);
}
commitTransactionalSession(transactionalSection: any): Promise<void> {
return transactionalSection.commitTransaction().then(() => {
transactionalSection.endSession();
});
}
abortTransactionalSession(transactionalSection: any): Promise<void> {
return transactionalSection.abortTransaction().then(() => {
transactionalSection.endSession();
});
}
}
export default MongoStorageAdapter;

View File

@@ -1223,7 +1223,12 @@ export class PostgresStorageAdapter implements StorageAdapter {
}
// TODO: remove the mongo format dependency in the return value
createObject(className: string, schema: SchemaType, object: any) {
createObject(
className: string,
schema: SchemaType,
object: any,
transactionalSession: ?any
) {
debug('createObject', className, object);
let columnsArray = [];
const valuesArray = [];
@@ -1351,7 +1356,10 @@ export class PostgresStorageAdapter implements StorageAdapter {
const qs = `INSERT INTO $1:name (${columnsPattern}) VALUES (${valuesPattern})`;
const values = [className, ...columnsArray, ...valuesArray];
debug(qs, values);
return this._client
const promise = (transactionalSession
? transactionalSession.t
: this._client
)
.none(qs, values)
.then(() => ({ ops: [object] }))
.catch(error => {
@@ -1371,6 +1379,10 @@ export class PostgresStorageAdapter implements StorageAdapter {
}
throw error;
});
if (transactionalSession) {
transactionalSession.batch.push(promise);
}
return promise;
}
// Remove all objects that match the given Parse Query.
@@ -1379,7 +1391,8 @@ export class PostgresStorageAdapter implements StorageAdapter {
deleteObjectsByQuery(
className: string,
schema: SchemaType,
query: QueryType
query: QueryType,
transactionalSession: ?any
) {
debug('deleteObjectsByQuery', className, query);
const values = [className];
@@ -1391,7 +1404,10 @@ export class PostgresStorageAdapter implements StorageAdapter {
}
const qs = `WITH deleted AS (DELETE FROM $1:name WHERE ${where.pattern} RETURNING *) SELECT count(*) FROM deleted`;
debug(qs, values);
return this._client
const promise = (transactionalSession
? transactionalSession.t
: this._client
)
.one(qs, values, a => +a.count)
.then(count => {
if (count === 0) {
@@ -1409,18 +1425,27 @@ export class PostgresStorageAdapter implements StorageAdapter {
}
// ELSE: Don't delete anything if doesn't exist
});
if (transactionalSession) {
transactionalSession.batch.push(promise);
}
return promise;
}
// Return value not currently well specified.
findOneAndUpdate(
className: string,
schema: SchemaType,
query: QueryType,
update: any
update: any,
transactionalSession: ?any
): Promise<any> {
debug('findOneAndUpdate', className, query, update);
return this.updateObjectsByQuery(className, schema, query, update).then(
val => val[0]
);
return this.updateObjectsByQuery(
className,
schema,
query,
update,
transactionalSession
).then(val => val[0]);
}
// Apply the update to all objects that match the given Parse Query.
@@ -1428,7 +1453,8 @@ export class PostgresStorageAdapter implements StorageAdapter {
className: string,
schema: SchemaType,
query: QueryType,
update: any
update: any,
transactionalSession: ?any
): Promise<[any]> {
debug('updateObjectsByQuery', className, query, update);
const updatePatterns = [];
@@ -1685,7 +1711,14 @@ export class PostgresStorageAdapter implements StorageAdapter {
where.pattern.length > 0 ? `WHERE ${where.pattern}` : '';
const qs = `UPDATE $1:name SET ${updatePatterns.join()} ${whereClause} RETURNING *`;
debug('update: ', qs, values);
return this._client.any(qs, values);
const promise = (transactionalSession
? transactionalSession.t
: this._client
).any(qs, values);
if (transactionalSession) {
transactionalSession.batch.push(promise);
}
return promise;
}
// Hopefully, we can get rid of this. It's only used for config and hooks.
@@ -1693,16 +1726,28 @@ export class PostgresStorageAdapter implements StorageAdapter {
className: string,
schema: SchemaType,
query: QueryType,
update: any
update: any,
transactionalSession: ?any
) {
debug('upsertOneObject', { className, query, update });
const createValue = Object.assign({}, query, update);
return this.createObject(className, schema, createValue).catch(error => {
return this.createObject(
className,
schema,
createValue,
transactionalSession
).catch(error => {
// ignore duplicate value errors as it's upsert
if (error.code !== Parse.Error.DUPLICATE_VALUE) {
throw error;
}
return this.findOneAndUpdate(className, schema, query, update);
return this.findOneAndUpdate(
className,
schema,
query,
update,
transactionalSession
);
});
}
@@ -2323,6 +2368,37 @@ export class PostgresStorageAdapter implements StorageAdapter {
updateEstimatedCount(className: string) {
return this._client.none('ANALYZE $1:name', [className]);
}
createTransactionalSession(): Promise<any> {
return new Promise(resolve => {
const transactionalSession = {};
transactionalSession.result = this._client.tx(t => {
transactionalSession.t = t;
transactionalSession.promise = new Promise(resolve => {
transactionalSession.resolve = resolve;
});
transactionalSession.batch = [];
resolve(transactionalSession);
return transactionalSession.promise;
});
});
}
commitTransactionalSession(transactionalSession: any): Promise<void> {
transactionalSession.resolve(
transactionalSession.t.batch(transactionalSession.batch)
);
return transactionalSession.result;
}
abortTransactionalSession(transactionalSession: any): Promise<void> {
const result = transactionalSession.result.catch();
transactionalSession.batch.push(Promise.reject());
transactionalSession.resolve(
transactionalSession.t.batch(transactionalSession.batch)
);
return result;
}
}
function convertPolygonToSQL(polygon) {

View File

@@ -46,30 +46,35 @@ export interface StorageAdapter {
createObject(
className: string,
schema: SchemaType,
object: any
object: any,
transactionalSession: ?any
): Promise<any>;
deleteObjectsByQuery(
className: string,
schema: SchemaType,
query: QueryType
query: QueryType,
transactionalSession: ?any
): Promise<void>;
updateObjectsByQuery(
className: string,
schema: SchemaType,
query: QueryType,
update: any
update: any,
transactionalSession: ?any
): Promise<[any]>;
findOneAndUpdate(
className: string,
schema: SchemaType,
query: QueryType,
update: any
update: any,
transactionalSession: ?any
): Promise<any>;
upsertOneObject(
className: string,
schema: SchemaType,
query: QueryType,
update: any
update: any,
transactionalSession: ?any
): Promise<any>;
find(
className: string,
@@ -114,4 +119,7 @@ export interface StorageAdapter {
fields: any,
conn: ?any
): Promise<void>;
createTransactionalSession(): Promise<any>;
commitTransactionalSession(transactionalSession: any): Promise<void>;
abortTransactionalSession(transactionalSession: any): Promise<void>;
}