# ASYNCHRONOUS PROGRAMMING: TASKS

A Julia `Task` is a symmetric coroutine. The `@task` macro creates a task from an expression without running it:

In [1]:
t = @task begin; sleep(5); println("done"); end

Task (runnable) @0x00007fe8dc6eb6b0

The task `t` has been created but not started; it runs only once we schedule it. Even after waiting 10 seconds, it still has not started:

In [2]:
sleep(10)

In [3]:
istaskstarted(t)

false

`schedule` adds a Task to the scheduler's queue. This causes the task to run constantly when the system is otherwise idle, unless the task performs a blocking operation such as `wait`. Note that `schedule` returns immediately.

In [4]:
schedule(t);

In [5]:
istaskdone(t)

done


true

Once the task has run, it is no longer runnable, so scheduling it again raises an error:

In [6]:
schedule(t);

LoadError: schedule: Task not runnable
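The two claims above (that `schedule` returns immediately and that a finished task cannot be rescheduled) can be checked in a small sketch. The half-second sleep and the variable names are arbitrary choices for illustration.

```julia
# A task that takes about half a second to finish.
t = @task sleep(0.5)

t0 = time()
schedule(t)                  # only queues the task; does not wait for it
elapsed = time() - t0
@assert elapsed < 0.5        # schedule returned before the task could finish

wait(t)                      # now block until the task is done
@assert istaskdone(t)

# Scheduling a finished task again raises an error.
err = try
    schedule(t)
    nothing
catch e
    e
end
@assert err isa ErrorException
```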

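The `wait` function blocks the calling task until the given task finishes. Below, `wait(s)` returns after about one second, while `t` is still sleeping. Output printed by a task appears under whichever cell happens to be running at that moment, which is why stray "done" lines show up below cells that print nothing themselves.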

In [12]:
t = @task begin; sleep(2); println("done"); end

done


Task (runnable) @0x00007fe8f4478d00

In [13]:
s = @task begin; sleep(1); println("really done"); end

Task (runnable) @0x00007fe8f4478e70

In [14]:
schedule(t); schedule(s); wait(s);

really done
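A task can also return a value. In this sketch (the `:slow`/`:fast` symbols and sleep times are arbitrary), `wait` only blocks, while `fetch` blocks and then returns the task's result:

```julia
t = @task begin
    sleep(0.2)
    :slow                       # value returned by the task
end
s = @task begin
    sleep(0.1)
    :fast
end
@assert !istaskstarted(t)       # @task creates the task but does not start it

schedule(t); schedule(s)
wait(s)                         # blocks until s has finished
@assert istaskdone(s)

# fetch waits like wait, but also returns the task's result
@assert fetch(t) === :slow
@assert fetch(s) === :fast
```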


It is common to want to create a task and schedule it right away, so the macro `@async` is provided for that purpose:

In [15]:
t = @async begin
    for i = 1:5
        println("done")
    end
end;

done
done
done
done
done
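`@async` is often paired with `@sync`, which waits for every `@async` task started lexically inside it. A minimal sketch follows; the staggered sleep times are an arbitrary way to make completion order differ from start order, and pushing to a shared vector is safe here because `@async` tasks run on the same thread as their parent and switch only at blocking points.

```julia
# @async creates a task and schedules it immediately
t = @async 1 + 1
@assert fetch(t) == 2

# @sync waits for every @async task started inside it
results = Int[]
@sync for i in 1:5
    @async begin
        sleep(0.01 * (6 - i))   # later tasks sleep less, so they tend to finish first
        push!(results, i)
    end
end
@assert sort(results) == [1, 2, 3, 4, 5]
```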


# ASYNCHRONOUS PROGRAMMING: CHANNELS

A Channel is a waitable first-in first-out queue which can have multiple tasks reading from and writing to it.
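As a sketch of several tasks writing to the same channel (the buffer size 10 and the ids 1 to 3 are arbitrary choices), three `@async` tasks each put one value, and the reader drains the channel after it is closed:

```julia
c = Channel{Int}(10)            # buffered channel with room for 10 items

# Three tasks write into the same channel concurrently.
@sync for id in 1:3
    @async put!(c, id)
end
close(c)                        # no more writers; remaining items can still be read

@assert sort(collect(c)) == [1, 2, 3]
```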

In [16]:
function producer(c::Channel)
    put!(c, "start")
    for n = 1:4
        put!(c, 2n)
    end
    put!(c, "stop")
end;

done


The `Channel` constructor accepts a one-argument function, wraps it in a task, and binds that task to the newly constructed channel.
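Roughly, `Channel(producer)` behaves like the manual construction below. This is a simplified sketch, not the exact code in Base: it creates an unbuffered channel, runs the producer in its own task, and uses `bind` so the channel is closed when that task ends.

```julia
function producer(c::Channel)
    put!(c, "start")
    for n = 1:4
        put!(c, 2n)
    end
    put!(c, "stop")
end

# Approximately what Channel(producer) does (a simplification, not Base's exact code):
c = Channel{Any}(0)           # unbuffered channel: each put! waits for a matching take!
ptask = @async producer(c)    # run the producer in its own task
bind(c, ptask)                # close c automatically when the task terminates

@assert collect(c) == Any["start", 2, 4, 6, 8, "stop"]
```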

In [17]:
chnl = Channel(producer);

One can then take! values repeatedly from the channel object:

In [18]:
take!(chnl)

"start"

In [19]:
take!(chnl)

2

In [20]:
take!(chnl)

4

In [21]:
take!(chnl)

6

In [22]:
take!(chnl)

8

In [23]:
take!(chnl)

"stop"

In [24]:
take!(chnl)

LoadError: InvalidStateException("Channel is closed.", :closed)

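The channel object is closed automatically when the task bound to it terminates, which is why the last `take!` above throws. Iterating over a channel with a `for` loop stops cleanly once the channel is closed and empty: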

In [25]:
for x in Channel(producer)
    println(x)
end

start
2
4
6
8
stop
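Closing does not discard data that is already buffered. In this sketch (capacity 3 and the values 1 and 2 are arbitrary), items put before `close` can still be taken, iteration drains the rest, and only a `take!` on a closed, empty channel throws:

```julia
c = Channel{Int}(3)         # buffered channel with capacity 3
put!(c, 1)
put!(c, 2)
close(c)                    # further put! calls would throw

@assert take!(c) == 1       # items already buffered can still be taken
@assert collect(c) == [2]   # iteration drains the rest, then stops

err = try
    take!(c)                # closed and empty
    nothing
catch e
    e
end
@assert err isa InvalidStateException
```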


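A channel can be visualized as a pipe with a write end and a read end. Below, a producer writes into `c1`, and a consumer reads from `c1`, squares each value, and writes the result into `c2`.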

In [26]:
function producer(c::Channel)
    for i = 0:4
        put!(c, i + 1)
    end
end

producer (generic function with 1 method)

In [27]:
c1 = Channel(producer)

Channel{Any}(0) (1 item available)

In [28]:
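function consumer(c::Channel)
    # Read from the global channel c1 until the producer finishes and c1 is closed;
    # looping on isready(c1) could stop early if c1 is momentarily empty.
    for data in c1
        result = data^2
        put!(c, result)
    end
end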

consumer (generic function with 1 method)

In [29]:
c2 = Channel(consumer)

Channel{Any}(0) (1 item available)

In [30]:
isready(c2)

true

In [31]:
take!(c2)

1

In [32]:
isready(c2)

true

In [33]:
take!(c2)

4

In [34]:
isready(c2)

true

In [35]:
take!(c2)

9

In [36]:
isready(c2)

true

In [37]:
take!(c2)

16

In [38]:
isready(c2)

true

In [39]:
take!(c2)

25

In [40]:
isready(c2)

false
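The same two-stage pipe can be written more compactly with typed channels and `do`-block syntax, which passes the block as the one-argument function to the `Channel` constructor. A sketch, with `src` and `squares` as illustrative names:

```julia
# Stage 1: emit the numbers 1 to 5.
src = Channel{Int}() do ch
    for i in 1:5
        put!(ch, i)
    end
end

# Stage 2: read from src until it is closed and emit squares.
squares = Channel{Int}() do ch
    for x in src
        put!(ch, x^2)
    end
end

@assert collect(squares) == [1, 4, 9, 16, 25]
```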

# DISTRIBUTED COMPUTING: remote calls and remote references

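The first argument to `remotecall` is the function to call. The second argument is the id of the process that will do the work, and the remaining arguments are passed to the function being called. `remotecall` returns a `Future` immediately; `fetch` retrieves the result once it is available. Similarly, `@spawnat p expr` evaluates `expr` on process `p` and returns a `Future`.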
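A self-contained sketch of the `remotecall`/`fetch` pattern. It assumes that starting two local workers with `addprocs(2)` is acceptable when none exist yet; the choice of `+` and `myid` as remote functions is purely illustrative.

```julia
using Distributed
if nprocs() == 1
    addprocs(2)                  # assumption: two local workers are enough for the demo
end

w = first(workers())             # id of one worker process
f = remotecall(+, w, 1, 2)       # returns a Future right away; 1 + 2 runs on worker w
@assert fetch(f) == 3            # fetch blocks until the result is available

# remotecall_fetch combines the call and the fetch in one step
@assert remotecall_fetch(myid, w) == w
```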

In [41]:
using Distributed
addprocs(4)

4-element Vector{Int64}:
 2
 3
 4
 5

In [42]:
r = remotecall(rand, 2, 2, 2)

Future(2, 1, 6, ReentrantLock(nothing, Base.GenericCondition{Base.Threads.SpinLock}(Base.InvasiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), 0), nothing)

In [43]:
s = @spawnat 2 1 .+ fetch(r)

Future(2, 1, 7, ReentrantLock(nothing, Base.GenericCondition{Base.Threads.SpinLock}(Base.InvasiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), 0), nothing)

In [44]:
fetch(s)

2×2 Matrix{Float64}:
 1.58779  1.24771
 1.35839  1.29615

In [45]:
remotecall_fetch(r-> fetch(r)[1, 1], 2, r)

0.5877850258023015

To make things easier, the symbol `:any` can be passed to `@spawnat`, which then picks the process on which to do the operation:

In [46]:
r = @spawnat :any rand(2,2)

Future(2, 1, 10, ReentrantLock(nothing, Base.GenericCondition{Base.Threads.SpinLock}(Base.InvasiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), 0), nothing)

In [47]:
s = @spawnat :any 1 .+ fetch(r)

Future(3, 1, 11, ReentrantLock(nothing, Base.GenericCondition{Base.Threads.SpinLock}(Base.InvasiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), 0), nothing)

In [48]:
fetch(s)

2×2 Matrix{Float64}:
 1.6149  1.37805
 1.6079  1.08615
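To see which process `:any` actually chose, the remote expression can simply report `myid()`. A sketch, again assuming two local workers started on demand:

```julia
using Distributed
if nprocs() == 1
    addprocs(2)
end

# :any lets Distributed pick a worker; myid() reports which one ran the expression.
r = @spawnat :any myid()
@assert fetch(r) in workers()

# A specific worker can still be requested by id.
w = last(workers())
@assert fetch(@spawnat w myid()) == w
```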

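Code must be available on every process that runs it. A function defined only on the master process, as `rand2` is below, cannot be called on a worker: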

In [49]:
function rand2(dims...)
    return 2 * rand(dims...)
end

rand2 (generic function with 1 method)

In [50]:
rand2(2,2)

2×2 Matrix{Float64}:
 1.96929   1.54537
 0.474394  1.13347

In [51]:
fetch(@spawnat :any rand2(2,2))

LoadError: On worker 4:
UndefVarError: #rand2 not defined
Stacktrace:
  [1] deserialize_datatype
    @ /usr/local/share/julia/julia-1.7.1/julia-1.7.1/share/julia/stdlib/v1.7/Serialization/src/Serialization.jl:1332
  [2] handle_deserialize
    @ /usr/local/share/julia/julia-1.7.1/julia-1.7.1/share/julia/stdlib/v1.7/Serialization/src/Serialization.jl:854
  [3] deserialize
    @ /usr/local/share/julia/julia-1.7.1/julia-1.7.1/share/julia/stdlib/v1.7/Serialization/src/Serialization.jl:801
  [4] handle_deserialize
    @ /usr/local/share/julia/julia-1.7.1/julia-1.7.1/share/julia/stdlib/v1.7/Serialization/src/Serialization.jl:861
  [5] deserialize
    @ /usr/local/share/julia/julia-1.7.1/julia-1.7.1/share/julia/stdlib/v1.7/Serialization/src/Serialization.jl:801 [inlined]
  [6] deserialize_global_from_main
    @ /usr/local/share/julia/julia-1.7.1/julia-1.7.1/share/julia/stdlib/v1.7/Distributed/src/clusterserialize.jl:160
  [7] #3
    @ /usr/local/share/julia/julia-1.7.1/julia-1.7.1/share/julia/stdlib/v1.7/Distributed/src/clusterserialize.jl:72 [inlined]
  [8] foreach
    @ ./abstractarray.jl:2694
  [9] deserialize
    @ /usr/local/share/julia/julia-1.7.1/julia-1.7.1/share/julia/stdlib/v1.7/Distributed/src/clusterserialize.jl:72
 [10] handle_deserialize
    @ /usr/local/share/julia/julia-1.7.1/julia-1.7.1/share/julia/stdlib/v1.7/Serialization/src/Serialization.jl:947
 [11] deserialize
    @ /usr/local/share/julia/julia-1.7.1/julia-1.7.1/share/julia/stdlib/v1.7/Serialization/src/Serialization.jl:801
 [12] handle_deserialize
    @ /usr/local/share/julia/julia-1.7.1/julia-1.7.1/share/julia/stdlib/v1.7/Serialization/src/Serialization.jl:858
 [13] deserialize
    @ /usr/local/share/julia/julia-1.7.1/julia-1.7.1/share/julia/stdlib/v1.7/Serialization/src/Serialization.jl:801
 [14] handle_deserialize
    @ /usr/local/share/julia/julia-1.7.1/julia-1.7.1/share/julia/stdlib/v1.7/Serialization/src/Serialization.jl:861
 [15] deserialize
    @ /usr/local/share/julia/julia-1.7.1/julia-1.7.1/share/julia/stdlib/v1.7/Serialization/src/Serialization.jl:801 [inlined]
 [16] deserialize_msg
    @ /usr/local/share/julia/julia-1.7.1/julia-1.7.1/share/julia/stdlib/v1.7/Distributed/src/messages.jl:87
 [17] #invokelatest#2
    @ ./essentials.jl:716 [inlined]
 [18] invokelatest
    @ ./essentials.jl:714 [inlined]
 [19] message_handler_loop
    @ /usr/local/share/julia/julia-1.7.1/julia-1.7.1/share/julia/stdlib/v1.7/Distributed/src/process_messages.jl:169
 [20] process_tcp_streams
    @ /usr/local/share/julia/julia-1.7.1/julia-1.7.1/share/julia/stdlib/v1.7/Distributed/src/process_messages.jl:126
 [21] #99
    @ ./task.jl:423

In [52]:
@everywhere function rand2(dims...)
    return 2 * rand(dims...)
end


In [53]:
fetch(@spawnat :any rand2(2,2))

2×2 Matrix{Float64}:
 1.22335  0.889621
 1.99648  0.112878
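The fix generalizes: `@everywhere` evaluates a definition on the master and on every current worker, after which any of them can run it. A sketch, where `triple` is a hypothetical helper and two local workers are assumed:

```julia
using Distributed
if nprocs() == 1
    addprocs(2)
end

# @everywhere evaluates the definition on the master and on every worker.
@everywhere triple(x) = 3x

# Every worker can now run triple.
@assert all(remotecall_fetch(triple, w, 5) == 15 for w in workers())
```

Note that workers added after the `@everywhere` call do not receive the definition; it has to be evaluated again on them.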

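Sending messages and moving data constitute most of the overhead in a distributed program. In the first version below, the matrix `A` is constructed locally and then sent to another process, where it is squared.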

In [54]:
A = rand(1000,1000);

In [55]:
Bref = @spawnat :any A^2;

In [56]:
fetch(Bref);

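Alternatively, the random matrix can be both constructed and squared on another process, so no input matrix has to be sent: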

In [57]:
Bref = @spawnat :any rand(1000,1000)^2;

In [58]:
fetch(Bref);
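The amount of data involved is easy to quantify. A 1000×1000 matrix of `Float64` holds 8,000,000 bytes, and the squared result is the same size again when fetched. A sketch (assuming two local workers) that keeps all the work on the worker and returns only a single number:

```julia
using Distributed
if nprocs() == 1
    addprocs(2)
end

A = rand(1000, 1000)
# 1000 * 1000 values of 8 bytes each: 8_000_000 bytes that would have to be
# serialized and sent if A were used in a remote expression.
@assert sizeof(A) == 1000 * 1000 * sizeof(Float64)

# Constructing the matrix on the worker avoids sending it, and returning only
# the sum means a single Float64 travels back instead of another 8 MB matrix.
s = fetch(@spawnat :any sum(rand(1000, 1000)^2))
@assert s isa Float64
```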

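Fortunately, many useful parallel computations do not require data movement. For example, each process can flip its own coins independently, so only the final counts need to be sent back: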

In [59]:
@everywhere function count_heads(n)
    c::Int = 0
    for i = 1:n
        c += rand(Bool)
    end
    c
end

In [60]:
a = @spawnat :any count_heads(100000000)

Future(4, 1, 37, ReentrantLock(nothing, Base.GenericCondition{Base.Threads.SpinLock}(Base.InvasiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), 0), nothing)

In [61]:
b = @spawnat :any count_heads(100000000)

Future(5, 1, 38, ReentrantLock(nothing, Base.GenericCondition{Base.Threads.SpinLock}(Base.InvasiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), 0), nothing)

In [62]:
fetch(a)+fetch(b)

99996791
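The two hand-written spawns above generalize to one call per worker. In this sketch `n = 1_000_000` is an arbitrary size and two local workers are assumed; passing `n` as an argument to `remotecall` avoids relying on a captured global.

```julia
using Distributed
if nprocs() == 1
    addprocs(2)
end

@everywhere function count_heads(n)
    c = 0
    for _ in 1:n
        c += rand(Bool)   # true/false is added as 1/0
    end
    c
end

n = 1_000_000
# One independent call per worker; only an integer comes back from each.
futures = [remotecall(count_heads, w, n) for w in workers()]
total = sum(fetch.(futures))
@assert 0 <= total <= n * nworkers()
```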

The construction below assigns loop iterations to multiple processes and combines their results with a specified reduction, in this case (+). The result of each iteration is the value of the last expression inside the loop, and the whole parallel loop expression evaluates to the final answer.

In [63]:
nheads = @distributed (+) for i = 1:200000000
    Int(rand(Bool))
end

100003765
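A deterministic version of the same pattern makes the reduction easy to check. Each worker sums the squares over its share of `1:100`, and the partial sums are combined with `(+)`; the expected value follows from the formula n(n+1)(2n+1)/6 = 100 · 101 · 201 / 6 = 338350. Two local workers are assumed.

```julia
using Distributed
if nprocs() == 1
    addprocs(2)
end

# Each worker reduces its chunk of 1:100; the partial sums are then added.
total = @distributed (+) for i in 1:100
    i^2
end
@assert total == 338350   # = 100 * 101 * 201 / 6
```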

Note that although parallel for loops look like serial for loops, their behavior is dramatically different. In particular, the iterations do not happen in a specified order, and writes to variables or arrays will not be globally visible since iterations run on different processes. Any variables used inside the parallel loop will be copied and broadcast to each process.

In [64]:
a = zeros(100000)

100000-element Vector{Float64}:
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 ⋮
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0

In [65]:
@distributed for i = 1:100000
    a[i] = i
end

Task (runnable) @0x00007fe947c7a400

In [66]:
a[1]

0.0

In [67]:
a[2]

0.0

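Fortunately, Julia's SharedArrays standard library solves the above issue: a `SharedArray` is mapped into the memory of every participating process on the same host, so writes made by the workers are visible to the caller. Note that `@distributed` without a reduction returns immediately (the `Task` shown below); prefixing it with `@sync` waits for all iterations to finish before the array is read.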

In [68]:
using SharedArrays

In [69]:
a = SharedArray{Float64}(10)
@distributed for i = 1:10
    a[i] = i
end

Task (runnable) @0x00007fe8dda63de0

In [70]:
a[1]

1.0

In [71]:
a[2]

2.0
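A self-contained variant of the cell above that waits for the loop before reading the array. It assumes two local workers; loading SharedArrays with `@everywhere` ensures the workers can deserialize the shared array.

```julia
using Distributed
if nprocs() == 1
    addprocs(2)
end
@everywhere using SharedArrays   # the workers also need the SharedArrays module

a = SharedArray{Float64}(10)      # memory shared by all local processes
# @sync makes the caller wait until every worker has finished its iterations.
@sync @distributed for i in 1:10
    a[i] = i
end
@assert all(a[i] == i for i in 1:10)
```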

The Fibonacci example in parallel with Julia:

In [72]:
@everywhere function fib(n)
    if n < 2
        return n
    else
        return fib(n - 1) + fib(n - 2)
    end
end

In [73]:
z = @spawnat :any fib(10)

Future(2, 1, 78, ReentrantLock(nothing, Base.GenericCondition{Base.Threads.SpinLock}(Base.InvasiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), 0), nothing)

In [74]:
fetch(z)

55

In [75]:
@time [fib(i) for i=1:45];

 13.373942 seconds (50.42 k allocations: 2.670 MiB, 0.18% compilation time)


In [76]:
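@everywhere function fib_parallel(n)
    if n < 40
        # below the cutoff, spawning is not worth the overhead: compute serially
        return fib(n)
    else
        x = @spawnat :any fib_parallel(n - 1)  # run one branch on some worker
        y = fib_parallel(n - 2)                # compute the other branch here
        return fetch(x) + y
    end
end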

In [77]:
@time [fib_parallel(i) for i=1:42];

  3.984146 seconds (269.85 k allocations: 14.776 MiB, 2.20% compilation time)


In [78]:
@time [fib_parallel(i) for i=1:45];

  7.859972 seconds (337.10 k allocations: 17.784 MiB, 0.11% gc time, 2.32% compilation time)


In [79]:
@time [fib(45) for i=1:4]

 20.776470 seconds (46.68 k allocations: 2.465 MiB, 0.11% compilation time)


4-element Vector{Int64}:
 1134903170
 1134903170
 1134903170
 1134903170

In [80]:
@time [fib_parallel(45) for i=1:4]

  6.993115 seconds (54.86 k allocations: 2.991 MiB, 0.47% compilation time)


4-element Vector{Int64}:
 1134903170
 1134903170
 1134903170
 1134903170
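When the work consists of independent calls, as in the last two cells, `pmap` from Distributed is an alternative way to spread them over the workers. A small sketch with `fib(25)` (so it finishes quickly) and two local workers assumed; this is a swapped-in technique, not the approach used above:

```julia
using Distributed
if nprocs() == 1
    addprocs(2)
end

@everywhere fib(n) = n < 2 ? n : fib(n - 1) + fib(n - 2)

# pmap distributes independent calls over the available workers and
# returns the results in input order.
results = pmap(fib, fill(25, 4))
@assert results == fill(75025, 4)
```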