I am reading Node.js Design Patterns and am trying to understand the following example of a producer-consumer pattern in implementing limited parallel execution (my questions in comments):
export class TaskQueuePC extends EventEmitter {
constructor(concurrency) {
super();
this.taskQueue = [];
this.consumerQueue = [];
for (let i = 0; i < concurrency; i++) {
this.consumer();
}
}
async consumer() {
while (true) {
try {
const task = await this.getNextTask();
await task();
} catch (err) {
console.error(err);
}
}
}
async getNextTask() {
return new Promise((resolve) => {
if (this.taskQueue.length !== 0) {
return resolve(this.taskQueue.shift());
}
this.consumerQueue.push(resolve);
});
}
runTask(task) {
// why are we returning a promise here?
return new Promise((resolve, reject) => {
// why are we wrapping our task here?
const taskWrapper = () => {
const taskPromise = task();
taskPromise.then(resolve, reject);
return taskPromise;
};
if (this.consumerQueue.length !== 0) {
const consumer = this.consumerQueue.shift();
consumer(taskWrapper);
} else {
this.taskQueue.push(taskWrapper);
}
});
}
}
- In the constructor, we create queues for both tasks and consumers, and then execute the
consumermethod up to the concurrency limit. - This pauses each
consumeronconst task = await getNextTask()which returns a pending promise. - Because there are no tasks yet in our task queue, the resolver for the promise is pushed to the consumer queue.
- When a task is added with
runTask, the consumer (the pending promise's resolver) is plucked off the queue and called with the task. This returns execution to theconsumermethod, which will run thetask(), eventually looping again to await another task or sit in the queue.
What I cannot grok is the purpose of the Promise and taskWrapper in the runTask method. It seems we would have the same behavior if both the Promise and taskWrapper were omitted:
runTask(task) {
if (this.consumerQueue.length !== 0) {
const consumer = this.consumerQueue.shift();
consumer(task);
} else {
this.taskQueue.push(task);
}
}
In fact, when I execute this version I get the same results. Am I missing something?
Aucun commentaire:
Enregistrer un commentaire