Tuesday, August 8, 2017

How to use the producer-consumer pattern in Ruby

I am new to multithreaded programming. The workflow is that each item of raw data must go through the 'process_data' and 'save_processed_data' methods. The 'save_processed_data' method saves the processed data to a database, and I must make sure that only one thread can call it at a time.

def process_data
  sleep(rand(5))
end

def save_processed_data
  sleep(rand(2))
end

I tried protecting the save step with a mutex inside the threads. However, it caused a serious performance problem.

require 'parallel'
require 'benchmark'

def test_async(list)
  Parallel.each(list, in_threads: 4) do |group|
    Parallel.each(group, in_threads: 10) do |e|
      process_data
      save_processed_data
      # puts e
    end
  end
end

def test_mutex_sync(list)
  mutex = Mutex.new
  Parallel.each(list, in_threads: 4) do |group|
    Parallel.each(group, in_threads: 10) do |e|
      process_data
      mutex.synchronize do
        save_processed_data
        # puts e
      end
    end
  end
end

raw_data_list = (1..10).map { |e| [*1..20] }
report1 = Benchmark.measure { test_async(raw_data_list) }
report2 = Benchmark.measure { test_mutex_sync(raw_data_list) }
puts "test_async: #{report1}"
puts "test_mutex_sync: #{report2}"

Output:

test_async:        0.010000   0.020000   0.030000 ( 21.026267)
test_mutex_sync:   0.010000   0.010000   0.020000 (107.274150)
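A rough calculation suggests the slowdown comes from the constraint itself rather than from the mutex. 'rand(2)' returns 0 or 1 with equal probability, so each save sleeps 0.5 s on average; the list holds 10 groups of 20 items; and when saves are serialized their sleeps add up, whatever the number of threads. This is an estimate from the sample timings, not a measurement.

```latex
\mathbb{E}[s_i] = \tfrac{1}{2}\cdot 0 + \tfrac{1}{2}\cdot 1 = 0.5\ \text{s},
\qquad N = 10 \times 20 = 200,
\qquad T_{\text{mutex}} \;\ge\; \sum_{i=1}^{N} s_i,
\qquad \mathbb{E}\Big[\sum_{i=1}^{N} s_i\Big] = 200 \times 0.5 = 100\ \text{s}
```

That floor of about 100 s is close to the measured 107 s, and it applies to any design in which only one thread saves at a time, including a producer-consumer pipeline with a single saver.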

I know that the producer-consumer pattern can solve this, so I copied the sample below from the internet.

require 'thread'

class MakerThread < Thread
  @@id = 0
  @@mutex = Mutex.new

  attr_reader :name

  def initialize(name, table)
    @name = name
    @table = table
    super {
      while true do
        #puts @name + " wants to put "
        cake = "[Cake No. #{MakerThread.nextId} by #{self.name}]"
        @table.put(cake)
        puts @name + " puts "+ cake
        sleep rand(3)
      end
    }
  end

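  # Increment and read the counter while holding the lock; reading @@id
  # after leaving synchronize can return a value set by another thread.
  def MakerThread.nextId
    @@mutex.synchronize {
      @@id = @@id + 1
    }
  end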
end

class EaterThread < Thread
  def initialize(name, table)
    @name = name
    @table = table
    super {
      while true do
        #puts @name + " wants to take "
        cake = @table.take
        puts @name + " takes "+ cake
        sleep rand(3)
      end
    }
  end
end

class Table
  def initialize(size)
    raise "size should be > 0 " if size < 1

    @limit_size = size
    @buffer = Array.new
    @mutex = Mutex.new
    @cv_full  = ConditionVariable.new
    @cv_empty = ConditionVariable.new
  end

  def put(cake)
    @mutex.synchronize{
      while( @buffer.size >= @limit_size) do
        puts "waiting : table is full"
        @cv_full.wait(@mutex)
      end

      @buffer.push(cake)
      @cv_empty.broadcast
    }
  end

  def take
    @mutex.synchronize {
      while(@buffer.empty?) do
        puts "waiting : table is empty"
        @cv_empty.wait(@mutex)
      end

      cake = @buffer.shift
      @cv_full.broadcast
      cake
    }
  end
end

table = Table.new(20)
threads = [
  (1..3).map{ |i| MakerThread.new( "MakerThread #{i.to_s}", table ) },
  (1..3).map{ |i| EaterThread.new( "EaterThread #{i.to_s}", table ) },
  ].flatten
threads.each{ |t| t.join }
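In the sample, every thread runs a 'while true' loop and 'Table#take' waits forever on an empty table, so 'threads.each{ |t| t.join }' never returns. One possible way to let consumers stop is sketched below. It assumes a single producer that knows when it has finished, and it adds a 'close' method that is not part of the copied sample. After 'close', 'take' returns nil once the buffer is drained, and each consumer leaves its loop.

```ruby
# Sketch: the sample's Table plus a hypothetical #close so consumers can exit.
class ClosableTable
  def initialize(size)
    raise ArgumentError, "size should be > 0" if size < 1

    @limit_size = size
    @buffer = []
    @closed = false
    @mutex = Mutex.new
    @cv_full = ConditionVariable.new
    @cv_empty = ConditionVariable.new
  end

  def put(item)
    @mutex.synchronize do
      @cv_full.wait(@mutex) while @buffer.size >= @limit_size && !@closed
      raise "table is closed" if @closed

      @buffer.push(item)
      @cv_empty.broadcast
    end
  end

  # Returns nil only when the table is closed AND empty ("no more work").
  def take
    @mutex.synchronize do
      @cv_empty.wait(@mutex) while @buffer.empty? && !@closed
      item = @buffer.shift # nil if closed and drained
      @cv_full.broadcast
      item
    end
  end

  def close
    @mutex.synchronize do
      @closed = true
      @cv_empty.broadcast # wake waiting takers so they notice @closed
      @cv_full.broadcast  # wake waiting putters so they raise instead of hanging
    end
  end
end

table = ClosableTable.new(20)
eaten = Queue.new

# Three consumers loop until take returns nil.
eaters = (1..3).map do
  Thread.new do
    while (cake = table.take)
      eaten << cake
    end
  end
end

(1..50).each { |i| table.put("cake #{i}") } # the main thread is the only producer
table.close
eaters.each(&:join) # returns, because take eventually yields nil
puts "eaten: #{eaten.size}"
```

Items already on the table when 'close' is called are still handed out; only an empty, closed table makes 'take' return nil.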

I have two problems with the sample code:

  1. I don't know how to feed the 'raw_data_list' variable into the sample code.

  2. I don't know how to stop all worker threads once all jobs have finished.
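Both points can be handled with Ruby's built-in thread-safe 'Queue' and 'SizedQueue' in place of the hand-written table. The sketch below makes several assumptions. It needs Ruby 2.3 or later for 'Queue#close'. The 'process_data' and 'save_processed_data' versions here are hypothetical stand-ins that take an argument and sleep only briefly. The pool size of 10 workers is arbitrary. 'raw_data_list' is flattened onto a job queue, a pool of workers processes items concurrently, and a single saver thread performs every save, so no two saves overlap and no mutex is needed. Closing each queue once its producers are done makes 'pop' return nil, which ends the loops.

```ruby
# Hypothetical stand-ins for the question's methods (they take an argument here).
def process_data(item)
  sleep(rand * 0.01) # simulate processing work
  item * 2           # hypothetical "processed" value
end

SAVED = [] # stands in for the database

def save_processed_data(result)
  sleep(rand * 0.005) # simulate a database write
  SAVED << result
end

def run_pipeline(raw_data_list, workers: 10)
  jobs    = Queue.new          # raw items waiting to be processed
  results = SizedQueue.new(20) # processed items waiting to be saved (bounded)

  # Producers: a fixed pool of workers processing items concurrently.
  worker_threads = Array.new(workers) do
    Thread.new do
      # Queue#pop returns nil once the queue is closed and empty.
      while (item = jobs.pop)
        results.push(process_data(item))
      end
    end
  end

  # Consumer: exactly one thread saves, so saves never run concurrently.
  saver = Thread.new do
    while (result = results.pop)
      save_processed_data(result)
    end
  end

  raw_data_list.flatten.each { |item| jobs.push(item) }
  jobs.close                  # no more jobs: idle workers get nil and exit
  worker_threads.each(&:join) # wait until every item has been processed
  results.close               # no more results: the saver drains, then exits
  saver.join
end

raw_data_list = (1..10).map { [*1..20] }
run_pipeline(raw_data_list)
puts "saved #{SAVED.size} records"
```

Because saves still run one at a time, this cannot finish faster than the sum of all save times. What the pipeline adds is that processing overlaps with saving, and the program ends cleanly instead of looping forever.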
