 +
 +~~CLOSETOC~~
 +
 +~~TOC 1-4 wide~~
 +
 +# Parallel Processing
 +
 +## Basic Background
 +
 +Julia distinguishes between
 +
 +1. parallel tasks run on a single core (asynchronous, pseudo-parallel);
 +
 +2. parallel tasks run on different threads (sharing the same address space, more lightweight); and
 +
 +3. parallel tasks run on different processes (possibly on other computers, heavyweight).
 +
 +Embarrassingly parallel problems (like Monte-Carlo simulations) are fairly easy to implement.  Other aspects await an overhaul of Julia's parallel capabilities.
 +
 +In the chapter on [[parallel-timing|Timing]], there are timing comparisons among different methods, all for the same embarrassingly parallel task.  The rough conclusion is:
 +
 +* Parallelism works better with fewer long-lasting function calls than with many short function calls.
 +
 +* With many short function calls, all process-based parallelism is slow.  The overhead of building and tearing down processes kills it.  Only thread-based parallelism should be considered, and only if each thread uses very little memory.
 +
 +* Threads are great when the program and data fit into the CPU cache.  The number of function calls and the length of each function invocation matter little.
 +
 +* `pmap` can be good in similar circumstances (when appropriately tuned), but will still only be half as fast as threads.  With many short function calls, `pmap` can be *thousands* of times slower than a plain non-parallel `map`.
 +
 +
 +
 +
 +## Pseudo-Parallelism on a Single Core
 +
 +This is often accomplished via a "Parallel For-Do Loop."
 +
 +```juliarepl
 +julia> using Distributed, SharedArrays
 +
 +julia> function paralleloop( f::Function, NC::Int=4 )
 +           z = SharedArray{Float64}( NC )  ## collects results from the async tasks (a plain Vector would also work here)
 +           @sync for i=1:NC                ## when the sync block ends, wait for all internal asyncs
 +               @async z[i]= f( i )         ## queue a task
 +           end##@sync for##
 +           convert(Vector{Float64}, z)     ## copy the results into a plain Vector
 +       end;##function##
 +
 +julia> using Random; Random.seed!(0);
 +
 +julia> addprocs(3);  ## the addprocs is useless here, because everything will run on #1
 +
 +julia> function sleepingworker(i::Int)::Int
 +           sleep(i/8)
 +           println("Worker $i on core #$(myid())")
 +           i^2
 +       end;##function##
 +
 +julia> println( paralleloop( sleepingworker ) )
 +Worker 1 on core #1
 +Worker 2 on core #1
 +Worker 3 on core #1
 +Worker 4 on core #1
 +[1.0, 4.0, 9.0, 16.0]
 +
 +```
 +
 +* QUESTION FIXME is it ever possible to send `@async` tasks to different threads?
 +
 +* Here is a great explanation of [@sync and @async](https://stackoverflow.com/questions/37287020/how-and-when-to-use-async-and-sync-in-julia).  FIXME to myself: adopt some of the explanation here.
 +
 +
 +
 +
 +
 +## Parallelism on Threads
 +
 +* Julia allows parallel threads to run simultaneously on many CPU cores, but only on the same computer.
 +
 +* From the OS perspective, threads allow a single (Julia) OS process to employ multiple CPU cores without having to copy *everything* over to full heavy-weight operating-system processes.  (Thus, there is also no need to use `@everywhere` [see below].)
 +
 +* In a sense, whereas heavy-weight processes (ab-)use the multitasking of the operating system, threads let the Julia user coordinate the multitasking itself, with lower overhead and fewer contortions (no building and tearing down of operating-system processes).  However, please realize that operating systems are pretty well optimized for process management.  Administering process communication oneself is not necessarily better.
 +
 +* Threads tend to be efficient when tasks fit into the first-tier cache of the processor.  In that case, they may suffer almost no performance hit, with an 8-thread program being nearly 8 times faster than a 1-thread program.  Thread performance can deteriorate badly when program and data vastly exceed the cache.
 +
 +Postscripts:​
 +
 +* PS: Threads can suffer from a [closure bug](https://github.com/JuliaLang/julia/issues/15276), which is most easily circumvented by strongly typing all function inputs and outputs.  Because I recommend strong typing on all functions that are designed for particular types, this is just another reason to double up on this advice.
 +
 +* PS: Threads do not exist in R.
 +
 +* PS: A Julia thread is not a thread in the Intel hyper-threading sense, where it means a second lightweight execution stream on a shared core, competing for resources that the other thread on the same core is not currently using.
 +
 +
 +QUESTION FIXME Presumably threads work only on a local computer system, where they can float off to other cores, but not across different computers.  Correct?
 +
 +
 +* WARNING With worker processes, `using` makes code available to all processes, while `include` only makes it available to the current process.  (Threads share one address space, so this distinction does not arise for them.)
 +
 +
 +### Setting the Number of Threads
 +
 +The number of threads cannot be changed by a running Julia program.  The number of threads must be set at the operating-system level before Julia starts:
 +
 +```bash
 +$ export JULIA_NUM_THREADS=4  ## on your shell under unix
 +
 +$ julia
 +```
 +
 +* If `JULIA_NUM_THREADS` is greater than the maximum number of hardware cores (e.g., 8 for a 4-core Intel i7-7700K), Julia winsorizes the number of threads to that maximum.
 +
 +QUESTION FIXME how is `JULIA_NUM_THREADS` set under Windows?
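 +
 +A sketch of the presumable equivalent under Windows (set the variable in the same shell session before starting `julia`; `cmd.exe` and PowerShell syntax, respectively):
 +
 +```text
 +rem cmd.exe
 +set JULIA_NUM_THREADS=4
 +julia
 +
 +# PowerShell
 +$env:JULIA_NUM_THREADS = 4
 +julia
 +```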
 +
 +
 +### Querying the Number of Available Threads
 +
 +```julianoeval
 +julia> using Distributed
 +
 +julia> Threads.nthreads()  ## the number of available threads
 +4
 +```
 +
 +
 +### Running a User Function on Multiple Threads
 +
 +```julianoeval
 +julia> using Distributed
 +
 +julia> @assert( Threads.nthreads() > 1, "why are you doing this?" )
 +
 +julia> timingfun( me::Int, tot::Int )::Float64= me/tot ## should be longer, but just to illustrate
 +
 +julia> function threads_sum( NUM::Int )
 +           a= Vector{Float64}( undef, NUM )
 +           Threads.@threads for i=1:NUM; a[i]= timingfun(i, NUM); end#for
 +           sum(a)
 +       end#function##
 +
 +julia> threads_sum( 2000 )
 +1000.5
 +
 +```
 +
 +### Finding the Id of the Current Thread
 +
 +```julianoeval
 +julia> Base.Threads.threadid()
 +```
 +
 +
 +### Threads Communication
 +
 +Threads share the same address space.  Thus, they can all modify shared global variables.
 +
 +Need examples.
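 +
 +A minimal sketch of what such an example could look like (the helper `threaded_counts` is ours): each thread writes into its own slot of a shared array, and a shared counter is updated with atomic operations to avoid races.
 +
 +```julia
 +function threaded_counts(N::Int)
 +    hits  = zeros(Int, Threads.nthreads())   ## one slot per thread: no locking needed
 +    total = Threads.Atomic{Int}(0)           ## one counter shared by all threads
 +    Threads.@threads for i=1:N
 +        hits[Threads.threadid()] += 1        ## safe: each thread owns its own slot
 +        Threads.atomic_add!(total, 1)        ## safe: atomic read-modify-write
 +    end#for
 +    (hits, total[])
 +end#function##
 +
 +threaded_counts(1_000)   ## total[] is always 1000; the split in hits depends on scheduling
 +```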
 +
 +QUESTION Please confirm: threads always share the same address space on the one computer.
 +
 +
 +
 +
 +## Parallelism on Full Processes
 +
 +### Setting and Querying the Number of Processes
 +
 +* Julia always uses a master-worker architecture, where each worker receives its own operating-system process.  Unlike R (which just copies everything, useful or not), Julia is more careful and lets the user decide what gets copied from the master to the workers, thereby conserving memory.  The key facility is `@everywhere`.
 +
 +* There are two ways to obtain more processes.  The first is `addprocs()`.  The second is Julia's `-p` command-line argument.  For example, `$ julia -p 2` gives three processes: one master and two workers.
 +
 +* When there is more than one process, process #1 is the master process.  Processes above #1 are considered `workers`.  The master process usually only coordinates the activity of the worker processes, so there is usually no speed benefit to using only two processes.  (See the sketch after this list.)
 +
 +* Although it is possible to change the number of processes dynamically, this often becomes as painful as it is useless.  It requires delicate dancing with `@everywhere`.  Such dynamic allocation is recommended only if you absolutely need it and you know what you are doing.  Ergo, either invoke Julia with `-p` or make `addprocs()` an early or first statement in your Julia program.
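 +
 +A minimal sketch of the master/worker split (the process ids are what a fresh session would typically assign):
 +
 +```julia
 +using Distributed
 +
 +addprocs(2)   ## two workers join the master
 +procs()       ## -> [1, 2, 3]; process #1 is the master
 +workers()     ## -> [2, 3]; only these receive work from pmap and friends
 +myid()        ## -> 1 when evaluated on the master
 +```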
 +
 +
 +```text
 +bash> julia -p 3
 +...
 +
 +julia> nprocs()
 +4
 +
 +```
 +
 +You can also use `addprocs(countof)` to achieve the same result of adding processes (and `rmprocs(id)` to remove the process with that id [not a count]), but be warned:  any previous `@everywhere` will not apply to the newly added processes.  This can make for baffling errors.  You are usually better off starting Julia with `-p`.
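 +
 +A sketch of the kind of baffling error meant here (the function name `f` is ours):
 +
 +```julia
 +using Distributed
 +
 +addprocs(2)
 +@everywhere f(x) = x^2   ## defined on the master and on workers 2 and 3
 +addprocs(1)              ## worker 4 joins, but knows nothing about f
 +pmap(f, 1:8)             ## typically fails with an UndefVarError on worker 4
 +@everywhere f(x) = x^2   ## re-running @everywhere after addprocs repairs the damage
 +pmap(f, 1:8)             ## now works: [1, 4, 9, ..., 64]
 +```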
 +
 +
 +### Querying the Number of Processes
 +
 +```julianoeval
 +julia> nprocs()
 +4
 +
 +```
 +
 +* While running, you should also be able to watch the (number of) processes in your operating system's process manager (e.g., `htop` or `top`).
 +
 +
 +### Sending Objects From the Master to the Worker Processes
 +
 +* Think of `@everywhere` as a copier of objects from the master to all *currently existing* workers.  (If you add processes later on, the earlier `@everywhere` will not have had any effect on them, and the new processes will complain about not knowing anything about the object.)
 +
 +* You can stick `@everywhere` even in front of things like `include(...)` and `using`.
 +
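 +A minimal sketch of typical `@everywhere` usage (the file name `helpers.jl` is hypothetical):
 +
 +```julia
 +using Distributed
 +addprocs(2)                           ## workers must exist before @everywhere runs
 +
 +@everywhere using Statistics          ## load a package on the master and on all current workers
 +## @everywhere include("helpers.jl")  ## would run a (hypothetical) source file on every process
 +@everywhere const SCALE = 100.0       ## define a constant on every process
 +
 +pmap(i -> SCALE * mean(rand(3)), 1:4) ## the closure can now use SCALE and mean on any worker
 +```
 +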
 +
 +QUESTION FIXME how can the programmer pass an already existing compiled function from the master to the worker processes?
 +
 +
 +
 +### Running a User Function on Multiple CPU Cores
 +
 +#### pmap
 +
 +```julianoeval
 +julia> using Distributed, Random; Random.seed!( 0 );
 +
 +julia> addprocs(3);                   ## give me power.  must come before @everywhere
 +
 +julia> @everywhere function sleepingworker(i::Int)::Int  ## @everywhere is required to send it to existing workers
 +           sleep(rand())
 +           println("Worker $i on core #$(myid())")
 +           i^2
 +       end;#function##
 +
 +julia> println( pmap( sleepingworker, 1:4 ) )  ## pmap is just one variety
 + From worker 3: Worker 3 on core #3
 + From worker 3: Worker 4 on core #3
 + From worker 4: Worker 3 on core #4
 + From worker 2: Worker 1 on core #2
 +[1, 4, 9, 16]
 +
 +```
 +
 +* The function must be declared with `@everywhere` so that it is copied from the master to the three worker processes.  Otherwise, there will be many incomprehensible errors.
 +
 +* `pmap()` is only one of a number of ways to spread work across CPU cores.  These ways will be illustrated in the next section.  `pmap()` is very convenient, but can also be fairly slow.  It works best when each function call takes a lot of CPU time.
 +
 +* The `From worker N:` prefix is automatically generated by Julia when a particular worker process prints output.
 +
 +
 +#### pmap Executor
 +
 +* PS: With the default `batch_size`, the performance can be atrocious:
 +
 +```text
 +function pmap_sum_nb(numprl::Int)
 +    r= pmap( i->timingfun( i, numprl ), 1:numprl )
 +    sum(r)
 +end#function##
 +```
 +
 +With a better-tuned `batch_size` parameter, `pmap()` is typically better.
 +
 +```text
 +function pmap_sum_b(numprl::Int)
 +    r= pmap( i->timingfun( i, numprl ), 1:numprl, batch_size=ceil(Int, numprl/nworkers()) )
 +    sum(r)
 +end#function##
 +```
 +
 +WARNING The similarly named `asyncmap()` function is scheduled on a single OS thread, and is therefore not really parallel.
 +
 +
 +
 +
 +
 +#### Sharedarray Parallel Executor
 +
 +```julia
 +using Distributed, SharedArrays
 +
 +function sharedarray_parallel_sum( numprl::Int )
 +    sa= SharedArray{Float64}( undef, numprl )
 +    @sync @distributed for i=1:numprl; sa[i]= timingfun( i, numprl ); end#for
 +    sum(sa)
 +end#function##
 +```
 +
 +`@distributed` can also sum up the results itself, which is sometimes called "reducing."  This is highly advantageous in terms of memory resources.
 +
 +```julia
 +function distributed_mapreduce(numprl::Int)
 +    @distributed (+) for i=1:numprl
 +        timingfun( i, numprl )
 +    end#for
 +end#function##
 +```
 +
 +* `SharedArray` consumes only about 200 bytes more storage than a plain Array.
 +
 +
 +
 +
 +
 +### Finding the Id of the Current Process
 +
 +```julianoeval
 +julia> myid()
 +
 +```
 +
 +
 +
 +
 +### Dynamic Process Communication (IPC)
 +
 +QUESTION FIXME Is master-to-worker dynamic IPC fairly useless, because the master's only role is to dole out work to the workers?
 +
 +#### Worker-Worker Dynamic IPC
 +
 +QUESTION FIXME show dynamic worker-to-worker interprocess communication (IPC).  Example should have one worker waiting for a specific string from the other (ignore wrong ones), answer, and return "ok".  Or they alternate on a task.
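 +
 +A sketch of one way to do this with a `RemoteChannel` (assuming a fresh session, so the two workers get ids 2 and 3; the protocol itself is ours): worker 3 ignores messages until it sees the string `"go"`, then answers `"ok"`.
 +
 +```julianoeval
 +julia> using Distributed; addprocs(2);
 +
 +julia> const chan = RemoteChannel(() -> Channel{String}(8), 2);  ## a String channel hosted on worker 2
 +
 +julia> replier = @spawnat 3 begin       ## worker 3: wait for the specific string "go"
 +           answer = ""
 +           while true
 +               msg = take!(chan)
 +               if msg == "go"; answer = "ok"; break; end#if
 +           end#while
 +           answer
 +       end;
 +
 +julia> @spawnat 2 begin                 ## worker 2: send some noise, then the real message
 +           put!(chan, "noise")
 +           put!(chan, "go")
 +       end;
 +
 +julia> fetch(replier)
 +"ok"
 +```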
 +
 +
 +
 +
 +
 +
 +
 +### Running Julia Worker Processes On Remote Computers
 +
 +There are official [docs](https://docs.julialang.org/en/stable/manual/getting-started).  Assume that you are on localhost and want to start worker processes on remote computers.  Of course, everything that we warned about above (the overhead of copying) is now likely to play a role, only ten times worse.  It had better be the case that each function that you send to the remote workers takes at least a few seconds to complete, or you are wasting your time.
 +
 +* Julia needs to be installed in the same location on the remote computer.
 +
 +* Set up password-less ssh to each remote machine where you want to requisition workers.
 +
 +* Check that this all works.  This is quoting-escape hell:
 +
 +```bash
 +$ ssh myhostip "/Applications/Julia-0.6.app/Contents/Resources/julia/bin/julia -e \"versioninfo()\""
 +Commit d386e40c17 (2017-12-13 18:08 UTC)
 +Platform Info:
 +  OS: macOS (x86_64-apple-darwin14.5.0)
 +  CPU: Intel(R) Core(TM) i7-7700K CPU @ 4.20GHz
 +  WORD_SIZE: 64
 +  BLAS: libopenblas (USE64BITINT DYNAMIC_ARCH NO_AFFINITY Prescott)
 +  LAPACK: libopenblas64_
 +  LIBM: libopenlibm
 +  LLVM: libLLVM-3.9.1 (ORCJIT, broadwell)
 +
 +back to home
 +```
 +
 +* Now try this on your local julia (master):
 +
 +```julia
 +$ julia -O3 -q  ## do *not* start with -p.  It is not compatible with what's below
 +julia> addprocs( [ ("myhostip", 5) ] )  ## add 5 remote workers on myhostip (could give more tuples for other computers)
 +5-element Array{Int64,1}:
 + 2
 + 3
 + 4
 + 5
 + 6
 +
 +julia> addprocs( 3 ) ## add a few local workers, too
 +3-element Array{Int64,1}:
 + 7
 + 8
 + 9
 +
 +julia> @everywhere function givehostname(i::Int)
 +           (i, myid(), chomp(read(`hostname`, String)))
 +       end;#function##
 +
 +julia> pmap( i->​givehostname(i),​ 1:10 )
 +10-element Array{Tuple{Int64,Int64,SubString{String}},1}:
 + (1, 2, "imac17.local")
 + (2, 8, "ipro.lan")
 + (3, 7, "ipro.lan")
 + (4, 9, "ipro.lan")
 + (5, 3, "imac17.local")
 + (6, 5, "imac17.local")
 + (7, 4, "imac17.local")
 + (8, 6, "imac17.local")
 + (9, 7, "ipro.lan")
 + (10, 9, "ipro.lan")
 +
 +```
 +
 +
 +
 +### Mimicking R's mclapply (Multi-Core List Apply) With Processes
 +
 +Although it is possible to create an R-like `mclapply()` function, use this only if you are sure that your task is embarrassingly parallel (few invocations of a long-running function):
 +
 +
 +```juliarepl
 +julia> using Statistics: quantile, mean
 +
 +julia> using CategoricalArrays, Random;  Random.seed!( 0 );
 +
 +julia> function lapply(listobj::AbstractVector, indcateg::AbstractVector, func::Function, onumprlargs=nothing)
 +           map( elem->(elem, func( listobj[findall(elem .== indcateg)], onumprlargs )), levels(indcateg) )
 +       end;##function##
 +
 +julia> function lapply(listobj::AbstractVector, indcateg::AbstractVector, func::Function)
 +           map( elem->(elem, func( listobj[findall(elem .== indcateg)] )), levels(indcateg) )
 +       end;##function##
 +
 +julia> function mclapply(listobj::AbstractVector, indcateg::AbstractVector, func::Function, onumprlargs=nothing)
 +           pmap( elem->(elem, func( listobj[findall(elem .== indcateg)], onumprlargs )), levels(indcateg) )
 +       end;##function##
 +
 +julia> function mclapply(listobj::AbstractVector, indcateg::AbstractVector, func::Function)
 +           pmap( elem->(elem, func( listobj[findall(elem .== indcateg)] )), levels(indcateg) )
 +       end;##function##
 +
 +julia> c= rand('a':'d', 100);
 +
 +julia> lapply( 1:100, c, mean )
 +4-element Array{Tuple{Char,Float64},1}:
 + ('a', 51.416666666666664)
 + ('b', 53.75)
 + ('c', 49.95652173913044)
 + ('d', 47.48275862068966)
 +
 +julia> lapply( 1:100, c, quantile, [ 0.25, 0.5, 0.75 ] )
 +4-element Array{Tuple{Char,Array{Float64,1}},1}:
 + ('a', [29.75, 47.0, 79.5])
 + ('b', [26.75, 56.5, 77.0])
 + ('c', [25.0, 44.0, 79.5])
 + ('d', [25.0, 47.0, 67.0])
 +
 +julia> using Distributed
 +
 +julia> mclapply( 1:100, c, quantile, [ 0.25, 0.5, 0.75 ] )
 +4-element Array{Tuple{Char,Array{Float64,1}},1}:
 + ('a', [29.75, 47.0, 79.5])
 + ('b', [26.75, 56.5, 77.0])
 + ('c', [25.0, 44.0, 79.5])
 + ('d', [25.0, 47.0, 67.0])
 +
 +```
 +
 +
 +
 +
 +## Setting a Timeout
 +
 +With parallel processes, it is often necessary to set a timer so that the computation times out if one worker takes too long.
 +
 +* QUESTION FIXME how do I set a timeout?  Either refer to [[controlflow]] for how to set a timeout for an async process, or move the explanation from there to here.
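 +
 +One possible pattern is sketched below (the helper name `fetch_or_timeout` is ours; note that after a timeout the remote call keeps running on the worker, it is merely abandoned):
 +
 +```julia
 +using Distributed
 +addprocs(1)
 +
 +## run f() on worker pid, but give up waiting after `seconds`
 +function fetch_or_timeout(f::Function, pid::Int, seconds::Real)
 +    fut = remotecall(f, pid)                              ## returns a Future immediately
 +    status = timedwait(() -> isready(fut), Float64(seconds))
 +    status == :ok ? fetch(fut) : nothing                  ## nothing signals a timeout
 +end#function##
 +
 +fetch_or_timeout(() -> (sleep(0.2); 42), workers()[1], 5.0)    ## -> 42
 +fetch_or_timeout(() -> (sleep(60.0); 42), workers()[1], 1.0)   ## -> nothing (timed out)
 +```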
 +
 +
 +
 +
 +
 +
 +# Backmatter
 +
 +## Useful Packages on Julia Repository
 +
 +* [ParallelAccelerator](https://juliaobserver.com/packages/ParallelAccelerator): an improved compiler for parallel operations.
 +
 +* [DistributedArrays](https://github.com/JuliaParallel/DistributedArrays.jl): doles out parts of arrays to different (remote) workers to operate on.  See [this notebook](https://github.com/stevengj/18S096/blob/master/lectures/other/The%2BBig%2BPicture%2Bof%2BParallel%2BComputing.ipynb) for a DistributedArray example.
 +
 +
 +
 +## Notes
 +
 +* Special Thanks to Elrod.
 +
 +QUESTION FIXME Is there a difference between read-only and read-write memory treatment in parallel processing?  In some sense, all `const` variables may as well be visible to all processes.  Use the same `@everywhere`?
 +
 +## References
  