# Channels, RemoteChannels, and Tasks

WARNING: This is a scratch note, not a chapter.

## RemoteChannel

See the Julia manual: https://docs.julialang.org/en/stable/manual/parallel-computing/#Channels-and-RemoteChannels-1

* A Channel is local to a process. Worker 2 cannot directly refer to a Channel on worker 3, and vice versa. A RemoteChannel, however, can put and take values across workers.

* A RemoteChannel can be thought of as a handle to a Channel.

* The process id, `pid`, associated with a RemoteChannel identifies the process where the backing store, i.e., the backing Channel, exists.

* Any process with a reference to a RemoteChannel can put items into and take items from the channel. Data is automatically sent to (or retrieved from) the process the RemoteChannel is associated with.
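
A minimal sketch of the `pid` point, assuming a fresh session in which `addprocs(2)` returns the ids of two new workers: the backing Channel is placed on the first new worker via the second constructor argument, process 1 puts a value, and the second worker takes it. `rc.where` is an internal field of `RemoteChannel`, read here only to show where the store lives.

```julia
using Distributed
ws = addprocs(2)   # pids of the two new workers

# Second argument = pid of the process that holds the backing Channel.
rc = RemoteChannel(() -> Channel{Int}(10), ws[1])
println(rc.where)   # internal field: the pid holding the store, i.e. ws[1]

put!(rc, 7)                              # sent from process 1 to ws[1]
v = remotecall_fetch(take!, ws[2], rc)   # ws[2] takes it; the request is routed to ws[1]
println(v)          # 7
```

Neither worker owns a Channel object of its own here; both only hold the handle, which is what lets unrelated processes share one queue.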
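
```julia
using Distributed
addprocs(2)

# The backing Channel{Int}(1) is created on process 1 (the default pid);
# workers that receive `channel` get a handle to that same store.
const channel = RemoteChannel(() -> Channel{Int}(1))

# Each pmap iteration runs on a worker; isready() queries the store on process 1.
# Nothing is put! into `channel` here, so isready() stays false and nothing prints.
t = @async pmap(1:20) do i
    isready(channel) && println(i)
    sleep(3)
end
wait(t)   # keep a script from exiting before pmap is done
```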

## RemoteChannel, Working From Docs

```julia
using Distributed, Random

NJ = 12   # number of jobs

addprocs(4)   # add worker processes

const jobs = RemoteChannel(() -> Channel{Int}(32))

function make_jobs(n::Int)
    for i in 1:n
        put!(jobs, i)
        i == 1 && print("\njobqueue filling: ")
        print(" $i/$n")
    end
end
@async make_jobs(NJ)

const results = RemoteChannel(() -> Channel{Tuple}(32))

@everywhere function do_work(jobs, results)   # define the work function on every process
    while true
        job_id = take!(jobs)
        exec_time = rand()
        sleep(exec_time)   # simulates elapsed time doing actual work
        println("[slept exectd job=$job_id in $(round(exec_time; digits=2))s on cpu=$(myid())]")
        put!(results, (job_id, exec_time, myid()))
    end
end

Random.seed!(0)   # seeds process 1 only; each worker keeps its own RNG state
for p in workers()   # start tasks on the workers to process requests in parallel
    remote_do(do_work, p, jobs, results)   # fire-and-forget; returns immediately
end

println("\n\nAll processes have been scheduled and have already been (slowly) running.")

sleep(NJ * 0.1)   # give do_work some time to complete; not necessary

println("\nSlept enough...some are done, and some are not.  Main Process now:\n")

# Take exactly NJ results (do_work never returns). A `for` loop avoids reassigning
# the global NJ inside a top-level `while`, which fails in a script under Julia 1.x scoping.
for _ in 1:NJ
    job_id, exec_time, pid = take!(results)
    println("Main: Job $job_id finished in $(round(exec_time; digits=2)) secs on worker $pid")
end
```

#### Output

This works when I know exactly how many results I want to take: `do_work` loops forever, so the main process has to stop on its own after `NJ` results. A `@sync` around the `@async` fails.

```text
$ julia snippet.jl

jobqueue filling:  1/12 2/12 3/12 4/12 5/12 6/12 7/12 8/12 9/12 10/12 11/12 12/12

All processes have been scheduled and have already been (slowly) running.
 From worker 3: [slept exectd job=1 in 0.54s on cpu=3]
 From worker 2: [slept exectd job=4 in 0.68s on cpu=2]
 From worker 5: [slept exectd job=2 in 0.78s on cpu=5]
 From worker 3: [slept exectd job=5 in 0.2s on cpu=3]
 From worker 4: [slept exectd job=3 in 0.95s on cpu=4]

Slept enough...some are done, and some are not.  Main Process now:

Main: Job 1 finished in 0.54 secs on worker 3
Main: Job 4 finished in 0.68 secs on worker 2
Main: Job 2 finished in 0.78 secs on worker 5
Main: Job 5 finished in 0.2 secs on worker 3
Main: Job 3 finished in 0.95 secs on worker 4
 From worker 3: [slept exectd job=8 in 0.22s on cpu=3]
Main: Job 8 finished in 0.22 secs on worker 3
 From worker 3: [slept exectd job=10 in 0.26s on cpu=3]
Main: Job 10 finished in 0.26 secs on worker 3
 From worker 2: [slept exectd job=6 in 0.65s on cpu=2]
Main: Job 6 finished in 0.65 secs on worker 2
 From worker 3: [slept exectd job=11 in 0.31s on cpu=3]
Main: Job 11 finished in 0.31 secs on worker 3
 From worker 5: [slept exectd job=7 in 0.99s on cpu=5]
Main: Job 7 finished in 0.99 secs on worker 5
 From worker 4: [slept exectd job=9 in 0.89s on cpu=4]
Main: Job 9 finished in 0.89 secs on worker 4
 From worker 2: [slept exectd job=12 in 0.99s on cpu=2]
Main: Job 12 finished in 0.99 secs on worker 2
```
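
One way to avoid having to know the result count up front is to let the workers stop and return. A minimal sketch, assuming a fresh session: `do_work_until_stop` is a hypothetical variant of `do_work` that treats a negative job id as a stop signal (a sentinel, which is my swapped-in technique, not something from the docs), and it is started with `remotecall` instead of `remote_do` so that process 1 gets a `Future` it can wait on.

```julia
using Distributed
ws = addprocs(2)

const jobs    = RemoteChannel(() -> Channel{Int}(32))
const results = RemoteChannel(() -> Channel{Tuple{Int,Int}}(32))

# Hypothetical variant of do_work: a negative job id means "stop".
@everywhere function do_work_until_stop(jobs, results)
    while true
        job_id = take!(jobs)
        job_id < 0 && break
        sleep(rand() / 10)                # stand-in for real work
        put!(results, (job_id, myid()))
    end
end

njobs = 12
for i in 1:njobs
    put!(jobs, i)
end
for _ in ws
    put!(jobs, -1)   # one stop signal per worker, queued after every real job
end

# remotecall returns a Future, so process 1 can wait for each worker to return.
futures = [remotecall(do_work_until_stop, p, jobs, results) for p in ws]
foreach(fetch, futures)   # fetch also rethrows an error raised on a worker

# Every worker has returned, so all results are already in the channel.
collected = Tuple{Int,Int}[]
while isready(results)
    push!(collected, take!(results))
end
```

The sentinels sit behind all real jobs in the FIFO queue, so every job is taken before any worker sees its stop signal; the channel capacity (32) is larger than the 14 queued items, so filling it before starting the workers does not block.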

## RemoteChannel: Not Working as Coordinator

```julia
using Distributed
addprocs(4)   ## adds 4 worker processes (pids 2-5 in a fresh session)

## Does not work: each process ends up with its own producerfun.

@everywhere function producerfun(c::Channel)
    s = 10
    while true
        s = s + 100
        sleep(rand())
        put!(c, "\tr=s *** cpu=$(myid())")       ## visually distinctive
        for n = 1:5
            sleep(rand() / 100)
            put!(c, "\tr=$s +C$n cpu=$(myid())")
        end
    end
end

## @everywhere runs this RemoteChannel constructor on *every* process. The default
## pid is myid(), so each process builds its own backing Channel(producerfun).
@everywhere const prodchnl = RemoteChannel(() -> Channel(producerfun))

@everywhere ts(p) = for i = 1:20000; println("P$p> ", take!(prodchnl)); sleep(rand() / 100); end

@sync begin
    ## @async tasks interleave on process 1 (concurrency, not parallelism).
    ## The parallelism comes from remote_do(), which runs ts on a worker and
    ## returns immediately, so @sync does not wait for ts to finish.
    t1 = @async remote_do(ts, 2, 20)   ## note: uses remote_do(), not Task()
    t2 = @async remote_do(ts, 3, 30)   ## 3 = worker pid; 30 = argument p passed to ts
    t3 = @async remote_do(ts, 4, 40)
end

sleep(10)   ## keep the main process alive while the workers print
println("done")

## bind(prodchnl, ti) cannot be used, because the first task to finish would close
## the channel while the remaining tasks could still be running.

close(prodchnl)
```

#### Output

Unfortunately, the output shows that each worker has its own `producerfun` (every worker restarts the count at `r=110`), which means that the workers cannot coordinate:

```text
 From worker 3: P30> r=s *** cpu=3
 From worker 3: P30> r=110 +C1 cpu=3
 From worker 3: P30> r=110 +C2 cpu=3
 From worker 3: P30> r=110 +C3 cpu=3
 From worker 3: P30> r=110 +C4 cpu=3
 From worker 3: P30> r=110 +C5 cpu=3
 From worker 2: P20> r=s *** cpu=2
 From worker 2: P20> r=110 +C1 cpu=2
 From worker 4: P40> r=s *** cpu=4
 From worker 2: P20> r=110 +C2 cpu=2
 From worker 4: P40> r=110 +C1 cpu=4
 From worker 2: P20> r=110 +C3 cpu=2
 From worker 4: P40> r=110 +C2 cpu=4
 From worker 4: P40> r=110 +C3 cpu=4
 From worker 2: P20> r=110 +C4 cpu=2
 From worker 4: P40> r=110 +C4 cpu=4
 From worker 4: P40> r=110 +C5 cpu=4
 From worker 2: P20> r=110 +C5 cpu=2
 From worker 3: P30> r=s *** cpu=3
 From worker 3: P30> r=210 +C1 cpu=3
 From worker 3: P30> r=210 +C2 cpu=3
 From worker 3: P30> r=210 +C3 cpu=3
 From worker 3: P30> r=210 +C4 cpu=3
```
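
A minimal sketch of a single shared producer, assuming a fresh session with three new workers. The RemoteChannel is created once, on process 1 only (no `@everywhere`), and its handle is passed to the workers as an argument. `counter_producer`, `sharedchnl`, and `consume` are hypothetical names, and the fixed item count (30, split 10 per worker) is chosen only so the sketch terminates.

```julia
using Distributed
ws = addprocs(3)

# Runs only on process 1: feeds 1..30 into the channel, then returns.
function counter_producer(c::Channel)
    for s in 1:30
        put!(c, s)
    end
end

# One backing Channel and one producer task, both on process 1.
# Channel(f) binds the task, so the channel closes when f returns.
const sharedchnl = RemoteChannel(() -> Channel(counter_producer))

@everywhere function consume(chnl, n)
    got = Int[]
    for _ in 1:n
        push!(got, take!(chnl))
    end
    return got
end

# Pass the handle as an argument; each worker takes 10 of the 30 items.
futures = [remotecall(consume, p, sharedchnl, 10) for p in ws]
parts = fetch.(futures)
println(sort(vcat(parts...)) == collect(1:30))   # true: every item taken exactly once
```

Because there is only one backing store, each item goes to exactly one worker, which is the coordination the `@everywhere` version lacks.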

## Tasks

As far as I understand, `@async` tasks never run in parallel: they all stay on the process (and thread) that created them, no matter how many workers `addprocs` adds. WTH?

 +
 +```julia
 +function producer(c::​Channel)
 +    while (true)
 +        s= rand(10000:​99999)
 +        sleep(rand()/​10)
 +        put!(c, -s)       ## visually distinctive
 +        for n=1:5
 +            sleep(rand()/​1000)
 +            put!(c, s+n)
 +        end#for
 +    end#while
 +end;#​function##​
 +
 +prodchnl= Channel(producer)
 +
 +
 +ts(p)= for i=1:20000; println("​P$p on core$(myid())>​ ", take!(prodchnl));​ sleep(rand()/​1000);​ end#for
 +
 +addprocs(7) ​  ## we want to operate on 8 CPU cores, but it never does?!
 +
 +@sync begin
 +    ## Tasks only work on one thread! ​ Thus, they execute sequentially.
 +    ## The @async should but does not enable true parallel scheduling.
 +    t1= @async Task( ts(1) )
 +    t2= @async Task( ts(2) )
 +    t3= @async Task( ts(3) )
 +end
 +
 +println("​done"​)
 +
 +## bind(prodchnl,​ti) cannot be used, because the first closed task would close
 +## the channel, and the remaining tasks could still be going.
 +
 +close(prodchnl)
 +```

#### Output

Unfortunately, the output always shows core1, despite the `@async`: `myid()` returns 1 because every task runs on the main process.

```text
P1 on core1> -58109
P2 on core1> 58110
P3 on core1> 58111
P1 on core1> 58112
P2 on core1> 58113
P3 on core1> 58114
P1 on core1> -51110
P2 on core1> 51111
P3 on core1> 51112
P1 on core1> 51113
P2 on core1> 51114
P3 on core1> 51115
P1 on core1> -98195
P2 on core1> 98196
P3 on core1> 98197
P1 on core1> 98198
P2 on core1> 98199
```
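
A minimal sketch contrasting the two, assuming a fresh session and a recent Julia 1.x (for `fetch` on a `Task`): `@async` creates a task on the calling process, while `@spawnat` sends the expression to a specific worker and returns a `Future`.

```julia
using Distributed
ws = addprocs(2)

# @async: tasks on the calling process; they interleave, they do not run in parallel.
local_ids = fetch.([@async myid() for _ in 1:3])
println(local_ids)          # [1, 1, 1] when run from the main process

# @spawnat: the expression runs on the given worker process.
remote_ids = [fetch(@spawnat p myid()) for p in ws]
println(remote_ids == ws)   # true
```

To get the three `ts` loops above onto different cores, the work has to be shipped with `@spawnat`, `remotecall`, `remote_do`, or `pmap`, and the channel has to be a RemoteChannel so the workers can reach it.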

### NEED TO RESOLVE

FIXME What is the relation between `Channel()` and `Task()`, so that they are both covered in the same manual chapter?

FIXME Explain `wait()` in the Task/process context, where it waits on a `Condition()`; also `notify`. Task states can be runnable, waiting, queued, done, or failed. Also, `wait` is deprecated for `fetch`.
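
A minimal single-process sketch of `wait`/`notify` on a `Condition`, assuming a recent Julia 1.x: `notify` only wakes tasks that are already waiting, so the `yield()` lets the waiter reach `wait(cond)` first, and the value passed to `notify` becomes the return value of `wait`.

```julia
cond = Condition()

waiter = @async begin
    v = wait(cond)   # the task blocks here until notify is called
    v * 2
end

yield()              # let `waiter` run until it blocks in wait(cond)
notify(cond, 21)     # wakes the waiter; wait(cond) returns 21
result = fetch(waiter)
println(result)      # 42
```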

FIXME Channel/Task/etc. questions. Show an example of using `bind()` and `schedule()` to coordinate multiple cores.
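
A partial answer, as a single-process sketch only (it does not spread work over multiple cores): `Task` creates a task without starting it, `bind` ties the channel's lifetime to that task, and `schedule` starts it. This is roughly what `Channel(f)` does for you, which may be part of why Channels and Tasks share a manual chapter.

```julia
c = Channel{Int}(4)

# Create the producer task without starting it.
producer = Task(() -> foreach(i -> put!(c, i), 1:5))

bind(c, producer)       # c is closed automatically when `producer` finishes
schedule(producer)      # add it to the scheduler's run queue

received = collect(c)   # takes until c is closed and empty
println(received)       # [1, 2, 3, 4, 5]
println(isopen(c))      # false
```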

channels.txt · Last modified: 2018/09/16 17:14 (external edit)