# First experience with distributed computation on arrays

In [None]:
using Distributed

addprocs(4)

In [None]:
@everywhere function maxnum_serial(a,s,e)
                if s==e
                  a[s]
         else
                   mid = div((s+e),2)
                   low = maxnum_serial(a,s,mid)
                   high = maxnum_serial(a,mid+1,e)
                   low >high ? low : high
                end
       end

In [None]:
@everywhere function maxnum_parallel(a,s,e)
                if (e-s)<=10000000
                  maxnum_serial(a,s,e)
               else
                   mid = div((s+e),2)
                   low_remote = @spawn maxnum_parallel(a,s,mid)
                   high = maxnum_parallel(a,mid+1,e)
                   low = fetch(low_remote)
                   low > high ? low : high
                end
       end

In [None]:
n=1024;
a=rand(n);

In [None]:
maxnum_serial(a,1,n)

In [None]:
n = 20000000
a=rand(n);

In [None]:
@time maxnum_serial(a,1,n)

As we can see, the parallel version runs slower than its serial counterpart.

In [None]:
@time maxnum_parallel(a,1,n)

Indeed, the amount of work (number of comparisons) is in the same order of magnitude of data transfer (number of integers to move from one processor than another). But the latter costs much more clock-cycles.

In [None]:
@everywhere function minimum_maximum_serial(a,s,e)
                if s==e
                  [a[s], a[s]]
         else
                   mid = div((s+e),2)
                   X = minimum_maximum_serial(a,s,mid)
                   Y = minimum_maximum_serial(a,mid+1,e)
                   [min(X[1],Y[1]), max(X[2],Y[2])]
                end
       end

In [None]:
@everywhere function minimum_maximum_parallel(a,s,e)
                if (e-s)<=10000000
                  minimum_maximum_serial(a,s,e)
               else
                   mid = div((s+e),2)
                   R = @spawn minimum_maximum_parallel(a,s,mid)
                   Y = minimum_maximum_parallel(a,mid+1,e)
                   X = fetch(R)
                   [min(X[1],Y[1]), max(X[2],Y[2])]
                end
       end

In [None]:
n = 20000000
a=rand(n);

In [None]:
@time minimum_maximum_serial(a,1,n)

As we can see below, the parallel version is faster now than its serial counterpart. Should we be satidified with this result?

In [None]:
@time minimum_maximum_parallel(a,1,n)

No, since computing serially the minimum alone is about 10 times faster

# First experience with parallel mergesort

We start with an in-place versio of mergesort

In [None]:
function mergesort(data, istart, iend)
                      if(istart < iend)
                              mid = (istart + iend) >>>1
                              mergesort(data, istart, mid)
                              mergesort(data, mid+1, iend)
                                 merge(data, istart, mid, iend)
                      end
              end

In [None]:
function merge( data, istart, mid, iend)
                      n = iend - istart + 1
                      temp = zeros(n)
                      s = istart
                      m = mid+1
                      for tem = 1:n
                              if s <= mid && (m > iend || data[s] <= data[m])
                                      temp[tem] = data[s]
                                      s=s+1
                              else
                                      temp[tem] = data[m]
                                      m=m+1
                              end
                      end
                      data[istart:iend] = temp[1:n]
              end

In [None]:
n = 1000000
A = [rem(rand(Int32),10) for i =1:n];
@time mergesort(A, 1, n);

Towards parallel code and pretending that we do not know about SharedArrays, we write an uut-of-place serial merge sort

In [None]:
function mergesort(data, istart, iend)
                      if(istart < iend)
                              mid = div((istart + iend),2)
                              a = mergesort(data, istart, mid)
                              b = mergesort(data,mid+1, iend)
                              c = merge(a, b, istart, mid, iend)
                      else
                          [data[istart]]
                      end
              end              

In [None]:
@everywhere function merge(a, b, istart, mid, iend)
                      n = iend - istart + 1
                      nb = iend - mid
                      na = mid - istart + 1
                      c = zeros(n)
                      s = 1
                      m = 1
                      for tem = 1:n
                              if s <= na && (m > nb || a[s] <= b[m])
                                      c[tem] = a[s]
                                      s= s+1
                              else
                                      c[tem] = b[m]
                                      m=m+1
                              end
                      end
                      c
              end


In [None]:
n = 1000000;
A = [rem(rand(Int32),10) for i =1:n];
@time mergesort(A, 1, n);

In [None]:
@everywhere function mergesort_serial(data, istart, iend)
                      if(istart < iend)
                              mid = div((istart + iend),2)
                              a = mergesort_serial(data, istart, mid)
                              b = mergesort_serial(data,mid+1, iend)
                              c = merge(a, b, istart, mid, iend)
                      else
                          [data[istart]]
                      end
              end

In [None]:
@everywhere function mergesort_parallel(data, istart, iend)
                      if(iend - istart <= 2500000)
                              mergesort_serial(data, istart, iend)
                       else
                              mid = div((istart + iend),2)
                              a = @spawn mergesort_parallel(data, istart, mid)
                              b = mergesort_parallel(data,mid+1, iend)
                              c = merge(fetch(a), b, istart, mid, iend)
                      end
              end

In [None]:
n = 10000000;
A = [rem(rand(Int32),10) for i =1:n];
@time mergesort_serial(A, 1, n);

In [None]:
@time mergesort_parallel(A, 1, n);

So we got some speedup! But not that great!
EXERCISE: Improve the performance of he above code
HINT: Use SharedArrays

# Matrix Matrix Multiplication

![Blocked matrix multiplication](4-29.png)


In [None]:
function dacmm(i0, i1, j0, j1, k0, k1, A, B, C, n, basecase)
           ## A, B, C are matrices
           ## We compute C = A * B

           if n > basecase
              n = div(n, 2)
              dacmm(i0, i1, j0, j1, k0, k1, A, B, C, n, basecase)
              dacmm(i0, i1, j0, j1+n, k0, k1+n, A, B, C, n, basecase)
              dacmm(i0+n, i1, j0, j1, k0+n, k1, A, B, C, n, basecase)
              dacmm(i0+n, i1, j0, j1+n, k0+n, k1+n, A, B, C, n, basecase)
              dacmm(i0, i1+n, j0+n, j1, k0, k1, A, B, C, n, basecase)
              dacmm(i0, i1+n, j0+n, j1+n, k0, k1+n, A, B, C, n, basecase)
              dacmm(i0+n, i1+n, j0+n, j1, k0+n, k1, A, B, C, n, basecase)
              dacmm(i0+n, i1+n, j0+n, j1+n, k0+n, k1+n, A, B, C, n, basecase)
           else
             for i= 1:n, j=1:n, k=1:n
                 C[i+k0,k1+j] = C[i+k0,k1+j] + A[i+i0,i1+k] * B[k+j0,j1+j]
             end
           end
       end

In [None]:
n=4
basecase = 2
A = [rem(rand(Int32),5) for i =1:n, j = 1:n]
B = [rem(rand(Int32),5) for i =1:n, j = 1:n]
C = zeros(Int32,n,n);
dacmm(0, 0, 0, 0, 0, 0, A, B, C, n, basecase)
A * B - C

In [None]:
function test_dacmm(n, basecase)
	 A = [rem(rand(Int32),5) for i =1:n, j = 1:n]
	 B = [rem(rand(Int32),5) for i =1:n, j = 1:n]
	 C = zeros(Int32,n,n);
	 @time dacmm(0, 0, 0, 0, 0, 0, A, B, C, n, basecase)
	 if n < 1024
	 return A * B - C
	 else
	 @assert A* B == C
	 end
end

In [None]:
test_dacmm(4, 2)

In [None]:
test_dacmm(16, 4)

In [None]:
test_dacmm(256, 16)

In [None]:
test_dacmm(1024, 2)

In [None]:
[test_dacmm(1024,2^i) for i=1:10]

In [None]:
@everywhere function dacmm_parallel(i0, i1, j0, j1, k0, k1, A, B, C, n, basecase)
   l = []
   if n > basecase && nprocs() > 3
    n = div(n,2)
    lrf = [
       @spawnat procs()[1] dacmm_parallel(i0, i1, j0, j1, k0, k1, A, B, C, n,basecase),
       @spawnat procs()[2] dacmm_parallel(i0, i1, j0, j1+n, k0, k1+n, A, B, C, n,basecase),
       @spawnat procs()[3] dacmm_parallel(i0+n, i1, j0, j1, k0+n, k1, A, B, C, n,basecase),
       @spawnat procs()[4] dacmm_parallel(i0+n, i1, j0, j1+n, k0+n, k1+n, A, B, C, n, basecase)
       ]
    ## pmap(fetch, lrf)
    l = [l..., map(fetch, lrf)]
    lrf = [
       @spawnat procs()[1] dacmm_parallel(i0, i1+n, j0+n, j1, k0, k1, A, B, C, n,basecase),
       @spawnat procs()[2] dacmm_parallel(i0, i1+n, j0+n, j1+n, k0, k1+n, A, B, C, n, basecase),
       @spawnat procs()[3] dacmm_parallel(i0+n, i1+n, j0+n, j1, k0+n, k1, A, B, C, n, basecase),
       @spawnat procs()[4] dacmm_parallel(i0+n, i1+n, j0+n, j1+n, k0+n, k1+n, A, B, C, n, basecase)
       ]
    ## pmap(fetch, lrf)
    l = [l..., map(fetch, lrf)]
   else
    for i= 1:n, j=1:n, k=1:n
        C[i+k0,k1+j] += A[i+i0,i1+k] * B[k+j0,j1+j]
    end
   end
   return l
end

In [None]:
using SharedArrays

@everywhere n = 4
a = SharedArray{Int32}(n,n)
for i = 1:n, j = 1:n a[i,j]=rem(rand(Int64),5) end
b = SharedArray{Int32}(n,n)
for i = 1:n, j = 1:n b[i,j]=rem(rand(Int64),5) end
c = SharedArray{Int32}(n,n)
dacmm_parallel(0,0,0,0,0,0,a,b,c,n,n)
c - a * b

In [None]:
c = SharedArray{Int32}(n,n)
dacmm_parallel(0,0,0,0,0,0,a,b,c,n,div(n,2))
c - a * b


In [None]:
function test_dacmm_parallel(n, basecase)
	 a = SharedArray{Int32}(n,n)
	 for i = 1:n, j = 1:n a[i,j]=rem(rand(Int32),5) end
	 b = SharedArray{Int32}(n,n)
	 for i = 1:n, j = 1:n b[i,j]=rem(rand(Int32),5) end
	 c = SharedArray{Int32}(n,n)
	 @time dacmm_parallel(0,0,0,0,0,0,a,b,c,n,basecase)
	 if n < 128
	    sleep(2)
	    return a * b - c
	 end
end

In [None]:
@everywhere n = 16
test_dacmm_parallel(n,n)
test_dacmm_parallel(n,div(n,2))
test_dacmm_parallel(n,div(n,4))


In [None]:
@everywhere n = 64
test_dacmm_parallel(n,n)
test_dacmm_parallel(n,div(n,2))
test_dacmm_parallel(n,div(n,4))
test_dacmm_parallel(n,div(n,8))

In [None]:
@everywhere n = 128
[test_dacmm_parallel(n,2^i) for i=2:6]

In [None]:
@everywhere n = 256
[test_dacmm_parallel(n,2^i) for i=3:7]

In [None]:
@everywhere n = 1024
[test_dacmm_parallel(n,2^i) for i=4:10]

In [None]:
[test_dacmm(n,2^i) for i=3:10]