samedi 14 janvier 2023

What is the purpose of this Promise wrapper in producer-consumer queue?

I am reading Node.js Design Patterns and am trying to understand the following example of a producer-consumer pattern in implementing limited parallel execution (my questions in comments):

export class TaskQueuePC extends EventEmitter {
  constructor(concurrency) {
    super();
    this.taskQueue = [];
    this.consumerQueue = [];

    for (let i = 0; i < concurrency; i++) {
      this.consumer();
    }
  }

  async consumer() {
    while (true) {
      try {
        const task = await this.getNextTask();
        await task();
      } catch (err) {
        console.error(err);
      }
    }
  }

  async getNextTask() {
    return new Promise((resolve) => {
      if (this.taskQueue.length !== 0) {
        return resolve(this.taskQueue.shift());
      }

      this.consumerQueue.push(resolve);
    });
  }

  runTask(task) {
    // why are we returning a promise here?
    return new Promise((resolve, reject) => {
      // why are we wrapping our task here?
      const taskWrapper = () => {
        const taskPromise = task();
        taskPromise.then(resolve, reject);
        return taskPromise;
      };

      if (this.consumerQueue.length !== 0) {
        const consumer = this.consumerQueue.shift();
        consumer(taskWrapper);
      } else {
        this.taskQueue.push(taskWrapper);
      }
    });
  }
}
  1. In the constructor, we create queues for both tasks and consumers, and then execute the consumer method up to the concurrency limit.
  2. This pauses each consumer on const task = await getNextTask() which returns a pending promise.
  3. Because there are no tasks yet in our task queue, the resolver for the promise is pushed to the consumer queue.
  4. When a task is added with runTask, the consumer (the pending promise's resolver) is plucked off the queue and called with the task. This returns execution to the consumer method, which will run the task(), eventually looping again to await another task or sit in the queue.

What I cannot grok is the purpose of the Promise and taskWrapper in the runTask method. It seems we would have the same behavior if both the Promise and taskWrapper were omitted:

  runTask(task) {
    if (this.consumerQueue.length !== 0) {
      const consumer = this.consumerQueue.shift();
      consumer(task);
    } else {
      this.taskQueue.push(task);
    }
  }

In fact, when I execute this version I get the same results. Am I missing something?

Aucun commentaire:

Enregistrer un commentaire