I am reading Node.js Design Patterns and am trying to understand the following example of a producer-consumer pattern in implementing limited parallel execution (my questions in comments):
export class TaskQueuePC extends EventEmitter {
constructor(concurrency) {
super();
this.taskQueue = [];
this.consumerQueue = [];
for (let i = 0; i < concurrency; i++) {
this.consumer();
}
}
async consumer() {
while (true) {
try {
const task = await this.getNextTask();
await task();
} catch (err) {
console.error(err);
}
}
}
async getNextTask() {
return new Promise((resolve) => {
if (this.taskQueue.length !== 0) {
return resolve(this.taskQueue.shift());
}
this.consumerQueue.push(resolve);
});
}
runTask(task) {
// why are we returning a promise here?
return new Promise((resolve, reject) => {
// why are we wrapping our task here?
const taskWrapper = () => {
const taskPromise = task();
taskPromise.then(resolve, reject);
return taskPromise;
};
if (this.consumerQueue.length !== 0) {
const consumer = this.consumerQueue.shift();
consumer(taskWrapper);
} else {
this.taskQueue.push(taskWrapper);
}
});
}
}
- In the constructor, we create queues for both tasks and consumers, and then execute the
consumer
method up to the concurrency limit. - This pauses each
consumer
onconst task = await getNextTask()
which returns a pending promise. - Because there are no tasks yet in our task queue, the resolver for the promise is pushed to the consumer queue.
- When a task is added with
runTask
, the consumer (the pending promise's resolver) is plucked off the queue and called with the task. This returns execution to theconsumer
method, which will run thetask()
, eventually looping again to await another task or sit in the queue.
What I cannot grok is the purpose of the Promise and taskWrapper
in the runTask
method. It seems we would have the same behavior if both the Promise and taskWrapper
were omitted:
runTask(task) {
if (this.consumerQueue.length !== 0) {
const consumer = this.consumerQueue.shift();
consumer(task);
} else {
this.taskQueue.push(task);
}
}
In fact, when I execute this version I get the same results. Am I missing something?
Aucun commentaire:
Enregistrer un commentaire