I am new to multithreaded programming. My workflow is to call the 'process_data' and 'save_processed_data' methods for each item of raw data. The 'save_processed_data' method saves the processed data to a database, and I need to guarantee that only one thread can call it at a time.
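```ruby
# Stand-ins for the real work: each sleeps for a random whole number of seconds.
def process_data
  sleep(rand(5))   # 0 to 4 seconds
end

def save_processed_data
  sleep(rand(2))   # 0 or 1 second
end
```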
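The requirement that only one thread may be inside 'save_processed_data' at a time is what `Mutex#synchronize` guarantees. The sketch below checks this by counting how many threads are inside a simulated save at once. The names `save_lock`, `counter_lock`, `inside` and `max_inside` are illustrative, and the 0.01 s sleep is an assumed stand-in for the database write.

```ruby
save_lock    = Mutex.new # the lock that serializes the "save"
counter_lock = Mutex.new # separate lock used only for the measurement
inside       = 0         # threads currently inside the critical section
max_inside   = 0         # highest value `inside` ever reached

save = lambda do
  save_lock.synchronize do
    counter_lock.synchronize do
      inside += 1
      max_inside = [max_inside, inside].max
    end
    sleep(0.01) # hypothetical stand-in for the database write
    counter_lock.synchronize { inside -= 1 }
  end
end

# 8 threads, each saving 5 times.
threads = Array.new(8) { Thread.new { 5.times { save.call } } }
threads.each(&:join)
puts "max threads inside save: #{max_inside}" # => max threads inside save: 1
```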
I tried using a mutex inside the threads, but it caused a serious performance problem:
```ruby
require 'benchmark'
require 'parallel' # gem install parallel

# No locking: saves from different threads may overlap.
def test_async(list)
  Parallel.each(list, in_threads: 4) do |group|
    Parallel.each(group, in_threads: 10) do |e|
      process_data
      save_processed_data
      # puts e
    end
  end
end

# One shared mutex: only one thread at a time runs save_processed_data.
def test_mutex_sync(list)
  mutex = Mutex.new
  Parallel.each(list, in_threads: 4) do |group|
    Parallel.each(group, in_threads: 10) do |e|
      process_data
      mutex.synchronize do
        save_processed_data
        # puts e
      end
    end
  end
end

raw_data_list = (1..10).map { |e| [*1..20] }
report1 = Benchmark.measure { test_async(raw_data_list) }
report2 = Benchmark.measure { test_mutex_sync(raw_data_list) }
puts "test_async: #{report1}"
puts "test_mutex_sync: #{report2}"
```
Output:

```
test_async: 0.010000 0.020000 0.030000 ( 21.026267)
test_mutex_sync: 0.010000 0.010000 0.020000 (107.274150)
```
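A rough estimate helps explain the mutex timing. Because the mutex serializes every 'save_processed_data' call, the saves alone take about the sum of their durations, however many threads there are. With the stubs above, `rand(2)` returns 0 or 1 with equal probability, so one save sleeps 0.5 s on average. The sketch assumes exactly those stub durations:

```ruby
items = 10 * 20                    # raw_data_list holds 10 groups of 20 items
avg_save_seconds = (0 + 1) / 2.0   # expected value of sleep(rand(2))
serial_save_seconds = items * avg_save_seconds
puts serial_save_seconds           # => 100.0
```

The result is close to the measured 107 s. A producer-consumer layout can stop processing threads from waiting on the lock, but with a single writer doing one save at a time, the total cannot drop much below this sum.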
I know that the producer-consumer pattern can solve this, so I copied the sample below from the internet.
```ruby
require 'thread'

class MakerThread < Thread
  @@id = 0
  @@mutex = Mutex.new

  attr_reader :name

  def initialize(name, table)
    @name = name
    @table = table
    super {
      while true do
        # puts @name + " wants to put "
        cake = "[Cake No. #{MakerThread.nextId} by #{self.name}]"
        @table.put(cake)
        puts @name + " puts " + cake
        sleep rand(3)
      end
    }
  end

  # Increment and return the counter inside the lock. Reading @@id after
  # leaving synchronize could return a value another thread already bumped.
  def MakerThread.nextId
    @@mutex.synchronize {
      @@id = @@id + 1
    }
  end
end

class EaterThread < Thread
  def initialize(name, table)
    @name = name
    @table = table
    super {
      while true do
        # puts @name + " wants to take "
        cake = @table.take
        puts @name + " takes " + cake
        sleep rand(3)
      end
    }
  end
end

# Bounded buffer: put blocks while full, take blocks while empty.
class Table
  def initialize(size)
    raise "size should be > 0 " if size < 1
    @limit_size = size
    @buffer = Array.new
    @mutex = Mutex.new
    @cv_full = ConditionVariable.new
    @cv_empty = ConditionVariable.new
  end

  def put(cake)
    @mutex.synchronize {
      while @buffer.size >= @limit_size do
        puts "waiting : table is full"
        @cv_full.wait(@mutex)
      end
      @buffer.push(cake)
      @cv_empty.broadcast
    }
  end

  def take
    @mutex.synchronize {
      while @buffer.empty? do
        puts "waiting : table is empty"
        @cv_empty.wait(@mutex)
      end
      cake = @buffer.shift
      @cv_full.broadcast
      cake
    }
  end
end

table = Table.new(20)
threads = [
  (1..3).map { |i| MakerThread.new("MakerThread #{i}", table) },
  (1..3).map { |i| EaterThread.new("EaterThread #{i}", table) },
].flatten
threads.each { |t| t.join } # never returns: every thread loops forever
```
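The 'Table' class in the sample is a hand-rolled bounded buffer. Ruby's standard library already provides one in `SizedQueue`: `push` blocks while the queue is full and `pop` blocks while it is empty. The following is a minimal sketch of the same maker/eater idea. To make it terminate, it assumes each maker produces a fixed number of cakes. The count of 10 and the capacity of 20 are arbitrary choices.

```ruby
table = SizedQueue.new(20) # replaces Table.new(20)
CAKES_PER_MAKER = 10       # arbitrary, so the demo finishes

makers = (1..3).map do |i|
  Thread.new do
    CAKES_PER_MAKER.times do |n|
      table.push("[Cake No. #{n + 1} by MakerThread #{i}]") # blocks while full
    end
  end
end

eaten = Queue.new # thread-safe record of what the eaters took
eaters = (1..3).map do |i|
  Thread.new do
    # 3 makers * 10 cakes / 3 eaters = 10 cakes per eater.
    CAKES_PER_MAKER.times { eaten << "EaterThread #{i} takes #{table.pop}" } # pop blocks while empty
  end
end

(makers + eaters).each(&:join)
puts eaten.size # => 30
```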
I have some problems with the sample code:

- I have no idea how to use the 'raw_data_list' variable with it.
- How do I kill all the worker threads when all jobs are finished?
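The following sketch addresses both points. It rests on several assumptions that do not come from the question:

- The stubs take an argument: a hypothetical `process_data(item)` that returns the value to save, and `save_processed_data(result)`.
- A fixed pool of 4 worker threads replaces the nested `Parallel.each`.
- A single saver thread is the only caller of `save_processed_data`, so no mutex is needed around it.

`raw_data_list` is flattened into a job queue. Shutdown uses `Queue#close` (Ruby 2.3+). Once a closed queue is drained, `pop` returns `nil`, so every thread leaves its loop and `join` returns without killing anything.

```ruby
def process_data(item)
  sleep(rand * 0.01) # shortened stand-in for the real processing
  item * 2           # hypothetical processed result
end

SAVED = [] # stand-in for the database

def save_processed_data(result)
  sleep(rand * 0.001) # shortened stand-in for the database write
  SAVED << result     # only the saver thread ever runs this
end

raw_data_list = (1..10).map { [*1..20] }

jobs    = Queue.new # raw items waiting to be processed
results = Queue.new # processed items waiting to be saved

# Producer: enqueue every item, then close so workers know nothing more is coming.
raw_data_list.flatten.each { |item| jobs << item }
jobs.close

# Workers: pop returns nil once jobs is closed and empty.
# (This assumes no job or result is nil or false.)
workers = Array.new(4) do
  Thread.new do
    while (item = jobs.pop)
      results << process_data(item)
    end
  end
end

# Single consumer: the only thread that calls save_processed_data.
saver = Thread.new do
  while (result = results.pop)
    save_processed_data(result)
  end
end

workers.each(&:join) # all processing finished
results.close        # saver's pop returns nil after the queue drains
saver.join           # all saves finished; every thread exited on its own

puts SAVED.size # => 200
```

The saver is the only caller, so saves are serialized by construction. If saves dominate the run time, the saver becomes the bottleneck and adding workers will not help. `Thread#kill` also exists, but it stops a thread at an arbitrary point, possibly in the middle of a write. Letting each thread finish its loop is usually the safer way to stop.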