Fix read preference for aggregate (#6585)
* added failing test cases * fixed test cases for aggregate query * added read preference option to aggregate router
This commit is contained in:
@@ -796,6 +796,86 @@ describe_only_db('mongo')('Read preference option', () => {
|
||||
});
|
||||
});
|
||||
|
||||
it('should change read preference for `aggregate` using `beforeFind`', async() => {
|
||||
// Save objects
|
||||
const obj0 = new Parse.Object('MyObject');
|
||||
obj0.set('boolKey', false);
|
||||
const obj1 = new Parse.Object('MyObject');
|
||||
obj1.set('boolKey', true);
|
||||
await Parse.Object.saveAll([obj0, obj1]);
|
||||
// Add trigger
|
||||
Parse.Cloud.beforeFind('MyObject', req => {
|
||||
req.readPreference = 'SECONDARY';
|
||||
});
|
||||
// Spy on DB adapter
|
||||
const databaseAdapter = Config.get(Parse.applicationId).database.adapter;
|
||||
spyOn(databaseAdapter.database.serverConfig, 'startSession').and.callThrough();
|
||||
// Query
|
||||
const query = new Parse.Query('MyObject');
|
||||
const results = await query.aggregate([{match:{boolKey: false}}]);
|
||||
// Validate
|
||||
expect(results.length).toBe(1);
|
||||
let readPreference = null;
|
||||
databaseAdapter.database.serverConfig.startSession.calls.all().forEach(call => {
|
||||
if (call.args[0].owner.ns.indexOf('MyObject') > -1) {
|
||||
readPreference = call.args[0].owner.operation.readPreference.mode;
|
||||
}
|
||||
});
|
||||
expect(readPreference).toEqual(ReadPreference.SECONDARY);
|
||||
});
|
||||
|
||||
it('should change read preference for `find` using query option', async() => {
|
||||
// Save objects
|
||||
const obj0 = new Parse.Object('MyObject');
|
||||
obj0.set('boolKey', false);
|
||||
const obj1 = new Parse.Object('MyObject');
|
||||
obj1.set('boolKey', true);
|
||||
await Parse.Object.saveAll([obj0, obj1]);
|
||||
// Spy on DB adapter
|
||||
const databaseAdapter = Config.get(Parse.applicationId).database.adapter;
|
||||
spyOn(databaseAdapter.database.serverConfig, 'cursor').and.callThrough();
|
||||
// Query
|
||||
const query = new Parse.Query('MyObject');
|
||||
query.equalTo('boolKey', false);
|
||||
query.readPreference('SECONDARY');
|
||||
const results = await query.find();
|
||||
// Validate
|
||||
expect(results.length).toBe(1);
|
||||
let myObjectReadPreference = null;
|
||||
databaseAdapter.database.serverConfig.cursor.calls.all().forEach(call => {
|
||||
if (call.args[0].ns.collection.indexOf('MyObject') >= 0) {
|
||||
myObjectReadPreference =
|
||||
call.args[0].options.readPreference.mode;
|
||||
}
|
||||
});
|
||||
expect(myObjectReadPreference).toEqual(ReadPreference.SECONDARY);
|
||||
});
|
||||
|
||||
it('should change read preference for `aggregate` using query option', async() => {
|
||||
// Save objects
|
||||
const obj0 = new Parse.Object('MyObject');
|
||||
obj0.set('boolKey', false);
|
||||
const obj1 = new Parse.Object('MyObject');
|
||||
obj1.set('boolKey', true);
|
||||
await Parse.Object.saveAll([obj0, obj1]);
|
||||
// Spy on DB adapter
|
||||
const databaseAdapter = Config.get(Parse.applicationId).database.adapter;
|
||||
spyOn(databaseAdapter.database.serverConfig, 'startSession').and.callThrough();
|
||||
// Query
|
||||
const query = new Parse.Query('MyObject');
|
||||
query.readPreference('SECONDARY');
|
||||
const results = await query.aggregate([{match:{boolKey: false}}]);
|
||||
// Validate
|
||||
expect(results.length).toBe(1);
|
||||
let readPreference = null;
|
||||
databaseAdapter.database.serverConfig.startSession.calls.all().forEach(call => {
|
||||
if (call.args[0].owner.ns.indexOf('MyObject') > -1) {
|
||||
readPreference = call.args[0].owner.operation.readPreference.mode;
|
||||
}
|
||||
});
|
||||
expect(readPreference).toEqual(ReadPreference.SECONDARY);
|
||||
});
|
||||
|
||||
it('should find includes in same replica of readPreference by default', done => {
|
||||
const databaseAdapter = Config.get(Parse.applicationId).database.adapter;
|
||||
|
||||
|
||||
@@ -54,6 +54,10 @@ export class AggregateRouter extends ClassesRouter {
|
||||
options.explain = body.explain;
|
||||
delete body.explain;
|
||||
}
|
||||
if (body.readPreference) {
|
||||
options.readPreference = body.readPreference;
|
||||
delete body.readPreference;
|
||||
}
|
||||
options.pipeline = AggregateRouter.getPipeline(body);
|
||||
if (typeof body.where === 'string') {
|
||||
body.where = JSON.parse(body.where);
|
||||
|
||||
Reference in New Issue
Block a user