DZone Snippets is a public source code repository. Easily build up your personal collection of code snippets, categorize them with tags / keywords, and share them with the world
Concurrent Map
A simplified Map/Reduce that runs in a single process using multiple threads.
It is mainly useful when the generator and the map steps are I/O-bound (e.g. reading many files).
Note: the 'reduce' step is not parallelized.
require 'thread'
require 'timeout'
######################## Parallel Map engine ######################################
# Simplified in-process Map/Reduce: a generator produces work items, +nbThread+
# mapper threads process them concurrently, and the mapped results are put back
# in generation order and optionally handed to a reducer.
#
# Options (read in #initialize):
#   :nbThread  - number of mapper threads (default 4)
#   :generator - required; called as generator.call(queue) and must push every
#                work item onto +queue+ with <<
#   :mapper    - required; called as mapper.call(out, item) and appends zero or
#                more results to the array +out+
#   :reducer   - optional; called once with the ordered array of all results;
#                its return value becomes the final result
#   :debug     - when truthy, prints the number of generated items
#
# #initialize raises RuntimeError when :generator or :mapper is missing.
# Only the map step is parallel; the reduce step runs in a single thread.
class ParalleMap
  def initialize(options)
    @nbThread = options[:nbThread] || 4
    @generator = options[:generator] || raise("missing generator proc")
    @mapper = options[:mapper] || raise("missing mapper proc")
    @reducer = options[:reducer]
    @debug = options[:debug] || false
    @query = Queue.new
    @result = Queue.new
    @lthread = Array.new(@nbThread) { Thread.new { mapping(@query, @result) } }
    @th = Thread.new { generating; do_synthese }
  end

  # Runs the generator in its own thread and forwards each item to the mappers
  # as [sequence_number, item].
  # One :eend stop message per mapper is always sent (even when the generator
  # raises), so no mapper thread is left blocked on the queue.
  def generating
    mbx = Queue.new
    producer = Thread.new do
      begin
        @generator.call(mbx)
      ensure
        # Terminate the pipe even on error; otherwise the loop below would
        # wait on mbx.pop forever.
        mbx << :eend
      end
    end
    no = 0
    loop do
      mess = mbx.pop
      break if mess == :eend
      @query << [no, mess]
      no += 1
    end
    producer.join # re-raises the generator's exception, if any
    puts "nb file #{no}" if @debug
  ensure
    @nbThread.times { @query << :eend } # stop message for each mapper
  end

  # Waits for every mapper, orders the collected [no, nom, value] triples by
  # item number and then emission counter, keeps only the values, and applies
  # the reducer when one was given.
  def do_synthese
    @lthread.each(&:join)
    res = []
    res << @result.pop until @result.empty?
    res = res.sort_by { |no, nom, _| [no, nom] }.map { |_, _, value| value }
    res = @reducer.call(res) if @reducer
    @resultats = res
  end

  # Blocks until generation, mapping and reduction have finished and returns
  # the final result. An exception raised by the generator or the reducer is
  # re-raised here.
  def get_result
    @th.join
    @resultats
  end

  # Mapper thread loop: takes [no, item] pairs from +queue+ until :eend,
  # runs the mapper on each item and pushes [no, nom, value] onto +result+.
  # +nom+ is a per-thread running counter; it only has to grow within one item
  # so that the item's outputs keep their emission order.
  # Mapper errors (StandardError) are reported and that item's output is dropped.
  def mapping(queue, result)
    nom = 0
    loop do
      mess = queue.pop
      return if mess == :eend
      no, item = mess
      begin
        out = []
        @mapper.call(out, item)
        out.each { |elem| result << [no, nom, elem]; nom += 1 }
      rescue => e
        puts "ERROR in mapper on %s : %s" % [item.inspect, e.to_s]
      end
    end
  end
end
And here is an example use: a recursive file grep.
############################# invoke block for each file whose name matches the filter
# Recursively walks +root+ and calls +blk+ with the path of every
# non-directory entry whose lower-cased basename matches the File.fnmatch
# pattern +filter+.
#
# Hidden entries (names starting with ".") are skipped, as the former
# "#{root}/*" glob did. Directory contents are read with Dir.entries instead of
# Dir.glob so that glob metacharacters ([ ] { } * ?) in directory names are not
# interpreted as a pattern. Missing or unreadable directories are skipped
# silently, matching the old glob behavior. Entries are visited in sorted order.
def rfind(root, filter, &blk)
  begin
    names = Dir.entries(root)
  rescue SystemCallError
    return
  end
  names.sort.each do |bn|
    next if bn.start_with?(".") # also covers "." and ".."
    en = File.join(root, bn)
    if File.directory?(en)
      rfind(en, filter, &blk)
    elsif File.fnmatch(filter, bn.downcase)
      blk.call(en)
    end
  end
end
####################### Map : grep on one file #####################
# Map step of the grep example: appends to +out+ one "file:NNNNNNNNN:line"
# record for every line of +file+ matched by +matcher+. Line numbers are
# 1-based (the same convention as grep -n). They are zero-padded to 9 digits.
# Each line keeps its trailing newline. The file is read line by line rather
# than loaded whole. Returns +out+.
def selectLine(out, matcher, file)
  File.open(file, "r") do |f|
    f.each_line.with_index(1) do |line, nol|
      out << "%s:%09d:%s" % [file, nol, line] if matcher =~ line
    end
  end
  out
end
####################### Reduce : format each result as "file:line :: text"
# Reduce step of the grep example: rewrites each "file:NNNNNNNNN:text" record
# produced by selectLine as "file:N :: text", i.e. with the zero padding
# removed from the line number. The order of +l+ is preserved.
#
# A record is split at the first ":<digits>:" sequence. File names containing
# ":" therefore stay intact (e.g. a Windows drive letter such as "C:/src/a.rb").
# Strings without that shape are returned unchanged instead of raising.
def reduce(l)
  l.map do |s|
    m = /\A(.*?):(\d+):(.*)\z/m.match(s)
    next s unless m
    # to_i drops the zero padding but keeps a real 0 as "0" rather than "".
    "%s:%d :: %s" % [m[1], m[2].to_i, m[3]]
  end
end
####################################################################################
# M A I N #
####################################################################################
# Entry point: pgrep [-v] regexp path 'file-filter'
# Greps every file under +path+ whose lower-cased name matches +file-filter+
# (a bare extension such as "rb" is turned into "*.rb"). Matches are printed
# as "file:line :: text". With -v, the number of scanned files and the total
# duration are also printed.
# A proper boolean is used here: the previous `debug != nil` test was true even
# when -v was absent, so the "nb file" count was always printed.
debug = (ARGV[0] == "-v")
ARGV.shift if debug
abort("Usage : > pgrep regexp path 'file-filter'") if ARGV.length != 3
query = /#{ARGV[0]}/
path = ARGV[1]
ext = ARGV[2]
ext = '*.' + ext unless ext =~ /\*/
starting = Time.now.to_f
pm = ParalleMap.new(
  :nbThread => 5,
  :generator => proc { |res| rfind(path, ext.downcase) { |file| res << file } },
  :mapper => proc { |out, in_file_name| selectLine(out, query, in_file_name) },
  :reducer => proc { |rr| reduce(rr) },
  :debug => debug
)
result = pm.get_result
ending = Time.now.to_f
result.each { |s| puts s }
puts "\n Duration: #{ending - starting} secs" if debug
Benchmark (Core i7, 8 cores, 8 GB RAM (!), Windows 7, Ruby 1.9.2):
>ruby pgrep.rb -v TestHtml . '*.rb'
nb file 2805
./wiki/instiki/attic/vendor/plugins/HTML5lib/test/test_input_stream.rb:4 :: class TestHtml5Inputstream < Test::Unit::TestCase
Duration: 3.6732099056243896 secs
With MSYS find/grep:
>chrono gfind . -name '*.rb' -exec grep TestHtml {} /dev/null ;
gfind . -name '*.rb' -exec grep TestHtml {} /dev/null ;
./wiki/instiki/attic/vendor/plugins/HTML5lib/test/test_input_stream.rb:class TestHtml5Inputstream < Test::Unit::TestCase
Duree 41619.0 ms
With grep -r:
>chrono grep -r --include=*.rb TestHtml .
grep -r '--include=*.rb' TestHtml .
./wiki/instiki/attic/vendor/plugins/HTML5lib/test/test_input_stream.rb:class TestHtml5Inputstream < Test::Unit::TestCase
Duree 1639.0 ms
JRuby (1.5, Java 6):
>jruby pgrep.rb -v TestHtml . '*.rb'
nb file 2805
./wiki/instiki/attic/vendor/plugins/HTML5lib/test/test_input_stream.rb:4 :: class TestHtml5Inputstream < Test::Unit::TestCase
Duration: 0.977999925613403 secs
IronRuby (1.1.3, .NET 4.0):
>ir pgrep.rb -v TestHtml . rb
nb file 2805
./wiki/instiki/attic/vendor/plugins/HTML5lib/test/test_input_stream.rb:4 :: class TestHtml5Inputstream < Test::Unit::TestCase
Duration: 0.908051013946533 secs
(where chrono.rb is:
# chrono.rb: echoes the command given on the command line, runs it, and prints
# its wall-clock duration in milliseconds.
# Arguments containing "*" are wrapped in single quotes (in ARGV itself) before
# being echoed and executed.
# NOTE(review): system() with several arguments does not go through a shell,
# so on non-Windows platforms the added quotes reach the program literally —
# presumably meant for the MSYS/Windows setup used in the benchmark; confirm.
ARGV.map! do |arg|
  if arg =~ /\*/
    "'#{arg}'"
  else
    arg
  end
end
puts ARGV.join(" ")
started_at = Time.now
system(*ARGV)
elapsed_ms = (Time.now - started_at) * 1000
puts "Duree " + elapsed_ms.to_s + " ms"
)





