Add KeyPromiseQueue to Push and Job StatusHandlers (#7267)

* Add KeyPromiseQueue to Push and Job StatusHandlers

* Update CHANGELOG.md

* Update CHANGELOG.md
This commit is contained in:
Diamond Lewis
2021-03-15 18:51:46 -05:00
committed by GitHub
parent f7d2e09de6
commit 32fc45d2d2
4 changed files with 17 additions and 28 deletions

View File

@@ -1,6 +1,6 @@
import redis from 'redis';
import logger from '../../../logger';
import { KeyPromiseQueue } from './KeyPromiseQueue';
import logger from '../../logger';
import { KeyPromiseQueue } from '../../KeyPromiseQueue';
const DEFAULT_REDIS_TTL = 30 * 1000; // 30 seconds in milliseconds
const FLUSH_DB_KEY = '__flush_db__';

View File

@@ -1,43 +0,0 @@
// KeyPromiseQueue is a simple promise queue
// used to queue operations per key basis.
// Once the tail promise in the key-queue fulfills,
// the chain on that key will be cleared.
export class KeyPromiseQueue {
constructor() {
this.queue = {};
}
enqueue(key, operation) {
const tuple = this.beforeOp(key);
const toAwait = tuple[1];
const nextOperation = toAwait.then(operation);
const wrappedOperation = nextOperation.then(result => {
this.afterOp(key);
return result;
});
tuple[1] = wrappedOperation;
return wrappedOperation;
}
beforeOp(key) {
let tuple = this.queue[key];
if (!tuple) {
tuple = [0, Promise.resolve()];
this.queue[key] = tuple;
}
tuple[0]++;
return tuple;
}
afterOp(key) {
const tuple = this.queue[key];
if (!tuple) {
return;
}
tuple[0]--;
if (tuple[0] <= 0) {
delete this.queue[key];
return;
}
}
}