
Channels, RemoteChannels, and Tasks

WARNING This is a scratch note, not a chapter.

RemoteChannel

see [https://docs.julialang.org/en/stable/manual/parallel-computing/#Channels-and-RemoteChannels-1]

  • A Channel is local to a process. Worker 2 cannot directly refer to a Channel on worker 3 and vice-versa. A RemoteChannel, however, can put and take values across workers.
  • A RemoteChannel can be thought of as a handle to a Channel.
  • The process id, pid, associated with a RemoteChannel identifies the process where the backing store, i.e., the backing Channel exists.
  • Any process with a reference to a RemoteChannel can put and take items from the channel. Data is automatically sent to (or retrieved from) the process a RemoteChannel is associated with.
using Distributed
addprocs(2)
const channel = RemoteChannel(()->Channel{Int}(1))   # backing Channel lives on pid 1

@async pmap(1:20) do i             # the do-block runs on the workers via pmap,
    isready(channel) && println(i) # yet each worker can query the same handle
    sleep(3)
end
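
A minimal illustration of the bullets above (my own sketch, not from the docs page): the backing Channel lives on process 1, yet worker 2 can put! into it through the handle, and the master can take! the value back out.

snippet.julia
using Distributed
addprocs(2)

# the backing Channel is created on the process given as the second argument (pid 1 here);
# the RemoteChannel itself is just a handle that can be shipped to any worker
rc = RemoteChannel(() -> Channel{Int}(4), 1)

remotecall_wait(c -> put!(c, 100 + myid()), 2, rc)   # worker 2 puts through the handle
println(take!(rc))                                   # master takes it back out: prints 102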

RemoteChannel, Working From Docs

snippet.julia
using Distributed   # needed when running this as a standalone script
NJ = 12;

addprocs(4); # add worker processes


const jobs = RemoteChannel(()->Channel{Int}(32));
make_jobs(n::Int) = for i in 1:n
    put!(jobs, i)
    (i == 1) && print("\njobqueue filling: ")
    print(" $i/$n")
end#for
@async make_jobs(NJ);


const results= RemoteChannel(()->Channel{Tuple}(32));
@everywhere function do_work(jobs, results) # define work function everywhere
    while true
        job_id = take!(jobs)
        exec_time= rand()
        sleep(exec_time) # simulates elapsed time doing actual work
        println("[slept exectd job=$job_id in $(round(exec_time,2))s on cpu=$(myid())]")
        put!(results, (job_id, exec_time, myid()))
    end
end


using Random
Random.seed!(0)
for p in workers() # start tasks on the workers to process requests in parallel
    @async remote_do(do_work, p, jobs, results)
end

println("\n\nAll processes have been scheduled and have already been (slowly) running.")

sleep(NJ*0.1)  ## give the do_work some time to complete; not necessary.

println("\nSlept enough...some are done, and some are not.  Main Process now:\n")

while (NJ > 0)
    job_id, exec_time, where = take!(results)   ## because NJ = 12, we will continue until we have all 12!
    println("Main: Job $job_id finished in $(round(exec_time, digits=2)) secs on worker $where")
    global NJ = NJ - 1                          ## `global` needed when run as a script
end#while

Output

This works when I know exactly how many results I want to take; putting a @sync on the @async croaks. One way to let the workers terminate on their own, so that no result counting is needed, is sketched after the output below.

snippet.text
$ julia snippet.jl
 
jobqueue filling:  1/12 2/12 3/12 4/12 5/12 6/12 7/12 8/12 9/12 10/12 11/12 12/12
 
All processes have been scheduled and have already been (slowly) running.
	From worker 3:	[slept exectd job=1 in 0.54s on cpu=3]
	From worker 2:	[slept exectd job=4 in 0.68s on cpu=2]
	From worker 5:	[slept exectd job=2 in 0.78s on cpu=5]
	From worker 3:	[slept exectd job=5 in 0.2s on cpu=3]
	From worker 4:	[slept exectd job=3 in 0.95s on cpu=4]
 
Slept enough...some are done, and some are not.  Main Process now:
 
Main: Job 1 finished in 0.54 secs on worker 3
Main: Job 4 finished in 0.68 secs on worker 2
Main: Job 2 finished in 0.78 secs on worker 5
Main: Job 5 finished in 0.2 secs on worker 3
Main: Job 3 finished in 0.95 secs on worker 4
	From worker 3:	[slept exectd job=8 in 0.22s on cpu=3]
Main: Job 8 finished in 0.22 secs on worker 3
	From worker 3:	[slept exectd job=10 in 0.26s on cpu=3]
Main: Job 10 finished in 0.26 secs on worker 3
	From worker 2:	[slept exectd job=6 in 0.65s on cpu=2]
Main: Job 6 finished in 0.65 secs on worker 2
	From worker 3:	[slept exectd job=11 in 0.31s on cpu=3]
Main: Job 11 finished in 0.31 secs on worker 3
	From worker 5:	[slept exectd job=7 in 0.99s on cpu=5]
Main: Job 7 finished in 0.99 secs on worker 5
	From worker 4:	[slept exectd job=9 in 0.89s on cpu=4]
Main: Job 9 finished in 0.89 secs on worker 4
	From worker 2:	[slept exectd job=12 in 0.99s on cpu=2]
Main: Job 12 finished in 0.99 secs on worker 2
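
One way to keep the convenience of @sync without counting results (my own sketch, not from the manual): append one sentinel job per worker and let do_work break when it sees it, so every remote call eventually returns. The -1 sentinel and the switch from remote_do() to remotecall_wait() are my choices; closing the jobs channel and treating the resulting exception as the stop signal would work as well.

snippet.julia
using Distributed
addprocs(4)

const jobs    = RemoteChannel(() -> Channel{Int}(32))
const results = RemoteChannel(() -> Channel{Tuple}(32))

@everywhere function do_work(jobs, results)
    while true
        job_id = take!(jobs)
        job_id == -1 && break          # sentinel: no more work, leave the loop
        exec_time = rand()
        sleep(exec_time)               # simulates doing actual work
        put!(results, (job_id, exec_time, myid()))
    end
end

NJ = 12
for i in 1:NJ; put!(jobs, i); end          # the real jobs
for _ in workers(); put!(jobs, -1); end    # one sentinel per worker

@sync for p in workers()                   # @sync is fine now: every task returns
    @async remotecall_wait(do_work, p, jobs, results)
end

while isready(results)                     # drain everything that was produced
    job_id, exec_time, w = take!(results)
    println("Job $job_id ran in $(round(exec_time, digits=2))s on worker $w")
end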

RemoteChannel --- Not Working as Coordinator

snippet.julia
using Distributed
addprocs(4)   ## 4 worker processes (5 Julia processes in total)

# does not work as intended: the @everywhere below gives every process its own
# prodchnl (and its own producerfun), so nothing is shared.

@everywhere function producerfun(c::Channel)
    s=10
    while (true)
        s= s+100
        sleep(rand())
        put!(c, String("\tr=s *** cpu=$(myid())"))       ## visually distinctive
        for n=1:5
            sleep(rand()/100)
            put!(c, String("\tr=$s +C$n cpu=$(myid())"))
        end#for
    end#while
end;#function##

@everywhere const prodchnl= RemoteChannel( ()->Channel(producerfun) );

@everywhere ts(p)= for i=1:20000; println("P$p> ", take!(prodchnl)); sleep(rand()/100); end#for

@sync begin
    ## remote_do() sends the call to the given worker and returns immediately
    ## (fire-and-forget), so the work runs on workers 2-4 in parallel.
    t1= @async remote_do( ts, 2, 20 )  ## note: uses remote_do(), not Task()
    t2= @async remote_do( ts, 3, 30 )  ## 3 = worker pid, 30 = argument passed to ts
    t3= @async remote_do( ts, 4, 40 )
end

sleep(10)   ## remote_do() cannot be waited on, so just give the workers some time
println("done")

# bind(prodchnl,ti) cannot be used, because the first closed task would close
# the channel, and the remaining tasks could still be going.

close(prodchnl)

Output

Unfortunately, the output shows that each worker has its own producerfun and its own prodchnl (the @everywhere const runs the RemoteChannel constructor on every process), so the producers cannot coordinate: the r= counter restarts at 110 on every worker. A working single-producer setup is sketched after the output below.

snippet.text
	From worker 3:	P30> 	r=s *** cpu=3
	From worker 3:	P30> 	r=110 +C1 cpu=3
	From worker 3:	P30> 	r=110 +C2 cpu=3
	From worker 3:	P30> 	r=110 +C3 cpu=3
	From worker 3:	P30> 	r=110 +C4 cpu=3
	From worker 3:	P30> 	r=110 +C5 cpu=3
	From worker 2:	P20> 	r=s *** cpu=2
	From worker 2:	P20> 	r=110 +C1 cpu=2
	From worker 4:	P40> 	r=s *** cpu=4
	From worker 2:	P20> 	r=110 +C2 cpu=2
	From worker 4:	P40> 	r=110 +C1 cpu=4
	From worker 2:	P20> 	r=110 +C3 cpu=2
	From worker 4:	P40> 	r=110 +C2 cpu=4
	From worker 4:	P40> 	r=110 +C3 cpu=4
	From worker 2:	P20> 	r=110 +C4 cpu=2
	From worker 4:	P40> 	r=110 +C4 cpu=4
	From worker 4:	P40> 	r=110 +C5 cpu=4
	From worker 2:	P20> 	r=110 +C5 cpu=2
	From worker 3:	P30> 	r=s *** cpu=3
	From worker 3:	P30> 	r=210 +C1 cpu=3
	From worker 3:	P30> 	r=210 +C2 cpu=3
	From worker 3:	P30> 	r=210 +C3 cpu=3
	From worker 3:	P30> 	r=210 +C4 cpu=3
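
A sketch of what coordination would look like (my own reconstruction, under the assumption that a single shared producer is the goal): construct the RemoteChannel exactly once on the master, so there is one backing Channel and one producer task, and pass the handle to the workers as an argument instead of rebuilding it with @everywhere. The consume function and remotecall_wait() are my stand-ins for ts() and remote_do().

snippet.julia
using Distributed
addprocs(4)

function producerfun(c::Channel)   # runs only on the master now
    s = 10
    while true
        s = s + 100
        sleep(rand())
        put!(c, "\tr=$s *** cpu=$(myid())")
        for n = 1:5
            sleep(rand()/100)
            put!(c, "\tr=$s +C$n cpu=$(myid())")
        end
    end
end

## construct the RemoteChannel exactly once, on the master: a single backing
## Channel and a single producer task, both on pid 1.  The workers receive
## the handle as an argument instead of building their own copy.
const prodchnl = RemoteChannel(() -> Channel(producerfun))

@everywhere function consume(tag, chnl)
    for i = 1:20
        println("P$tag> ", take!(chnl))
        sleep(rand()/100)
    end
end

@sync for (p, tag) in zip(workers(), (20, 30, 40))   # three consumers on workers 2-4
    @async remotecall_wait(consume, p, tag, prodchnl)
end

close(prodchnl)   # the lone producer fails on its next put! and stops

With a single producer, the r= values form one increasing sequence shared by P20/P30/P40 instead of restarting at 110 on each worker, and the producer always reports cpu=1 because its Channel (and task) lives on the master.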

Tasks

As far as I understand, @async never moves a task off the current process: tasks are coroutines, so @async buys concurrency on one process, not parallelism across workers. WTH?

snippet.julia
function producer(c::Channel)
    while (true)
        s= rand(10000:99999)
        sleep(rand()/10)
        put!(c, -s)       ## visually distinctive
        for n=1:5
            sleep(rand()/1000)
            put!(c, s+n)
        end#for
    end#while
end;#function##

prodchnl= Channel(producer)


ts(p)= for i=1:20000; println("P$p on core$(myid())> ", take!(prodchnl)); sleep(rand()/1000); end#for

using Distributed
addprocs(7)   ## we want to operate on 8 CPU cores, but the workers are never used?!

@sync begin
    ## Tasks all run on one thread of the current process, so they interleave
    ## (concurrency) but never run in parallel on the added workers.
    t1= @async ts(1)   ## @async already creates and schedules a Task;
    t2= @async ts(2)   ## Task( ts(1) ) would call ts(1) eagerly and then wrap
    t3= @async ts(3)   ## its return value, which is not what was intended.
end

println("done")

# bind(prodchnl,ti) cannot be used, because the first closed task would close
# the channel, and the remaining tasks could still be going.

close(prodchnl)

Output

Unfortunately, everything runs on process 1 (the master) despite the @async; the seven added workers sit idle. A comparison of @async, Task()/schedule(), and @spawnat is sketched after the output below.

snippet.text
P1 on core1> -58109
P2 on core1> 58110
P3 on core1> 58111
P1 on core1> 58112
P2 on core1> 58113
P3 on core1> 58114
P1 on core1> -51110
P2 on core1> 51111
P3 on core1> 51112
P1 on core1> 51113
P2 on core1> 51114
P3 on core1> 51115
P1 on core1> -98195
P2 on core1> 98196
P3 on core1> 98197
P1 on core1> 98198
P2 on core1> 98199
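
For the record, a sketch (my own summary, not from the manual) of what @async actually does, and what it takes to leave process 1:

snippet.julia
using Distributed
addprocs(1)

@everywhere f() = println("running on process $(myid())")

## @async is essentially "create a Task on the *current* process and schedule it":
t = Task(f)              # created, not yet scheduled or started
schedule(t)              # hand it to the scheduler - it still runs on process 1
wait(t)                  # block until the task has finished

wait(@async f())         # same effect with one macro

## to actually leave process 1, a Distributed primitive is needed:
fut = @spawnat 2 f()     # returns a Future immediately
wait(fut)                # the println is relayed back as "From worker 2: ..."

So @async only buys concurrency on the current process; moving work to the added processes requires @spawnat, remotecall(), or remote_do() as in the earlier snippets.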

NEED TO RESOLVE

FIXME What is the relation between Channel() and Task(), so that they are both covered in the same manual chapter?

FIXME explain wait() in the Task/Process context: wait(c::Condition) blocks the current task until notify(c). Task states can be runnable, waiting, queued, done, or failed. Note also that wait(t::Task) no longer returns the task's result; use fetch(t) for that.
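
A starting point for that (my own minimal sketch, single process): one task parks in wait() on a Condition until another task calls notify(), which also delivers a value.

snippet.julia
cond = Condition()

waiter = @async begin
    val = wait(cond)          # blocks this task until notify(cond, ...)
    println("woke up with $val")
end

sleep(0.1)                    # let the waiter reach wait()
println(istaskdone(waiter))   # false: the task is parked on the Condition
notify(cond, 42)              # wake it up; wait(cond) returns the notified value
wait(waiter)                  # wait for the task itself to finish (fetch(waiter) would also return its result)
println(istaskdone(waiter))   # true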

FIXME Channel/Task/etc. questions. Show an example of using bind() and schedule() to coordinate multiple cores.
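
A single-process starting point for that FIXME (my own sketch; the multi-core half still needs the RemoteChannel machinery above): bind() ties a Channel's lifetime to a producer Task, and schedule() is what starts a hand-built Task.

snippet.julia
function produce_into(c)
    for i in 1:10
        put!(c, i)
    end
end

chnl = Channel{Int}(4)
producer = Task(() -> produce_into(chnl))

bind(chnl, producer)   # when the producer task finishes (or fails), chnl is closed
schedule(producer)     # a Task built by hand does not run until it is scheduled

for x in chnl          # iterating a Channel stops cleanly once it is closed and drained
    println("got $x")
end

Channel(produce_into) bundles these steps (create, bind, schedule) into one call, defaulting to an unbuffered channel.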
