The Basics of Single Node Parallel Computing

Chris Rackauckas
September 21st, 2020

Youtube Video Link

Moore's law was the observation that the number of transistors on a chip doubles roughly every two years, leading to exponentially more computing power over time. This held for a very long time.

However, sometime in the last decade, computer cores have stopped getting faster.

The technology that promises to keep Moore’s Law going after 2013 is known as extreme ultraviolet (EUV) lithography. It uses light to write a pattern into a chemical layer on top of a silicon wafer, which is then chemically etched into the silicon to make chip components. EUV lithography uses very high energy ultraviolet light rays that are closer to X-rays than visible light. That’s attractive because EUV light has a short wavelength—around 13 nanometers—which allows for making smaller details than the 193-nanometer ultraviolet light used in lithography today. But EUV has proved surprisingly difficult to perfect.

-MIT Technology Review

The answer to the “end of Moore's Law” is Parallel Computing. However, programs need to be specifically designed in order to adequately use parallelism. This lecture will describe at a very high level the forms of parallelism and when they are appropriate. We will then proceed to use shared-memory multithreading to parallelize the simulation of the discrete dynamical system.

Managing Threads

Concurrency vs Parallelism and Green Threads

There is a difference between concurrency and parallelism. In a nutshell:

  • Concurrency: Interruptibility

  • Parallelism: Independence

To start thinking about concurrency, we need to distinguish between a process and a thread. A process is a discrete running instance of a computer program. It has allocated memory for the program's code, its data, a heap, etc. Each process can have many compute threads. These threads are the units of execution that need to be done. Each thread (or task) has its own stack and a virtual CPU (virtual since it is not the true CPU: that would require the task to be on the CPU, which it might not be because the task can be temporarily halted). The kernel of the operating system then schedules the tasks, which runs them. In order to keep the computer running smoothly, context switching, i.e. changing the task that is actually running, happens all the time. This is independent of whether tasks are actually scheduled in parallel or not.

Each thread has its own stack associated with it.

This is an important distinction because many tasks may need to be run concurrently but without parallelism. A typical example is input/output (I/O). For example, in a game you may want to be updating the graphics, but the moment a user clicks you want to handle that event. You do not necessarily need to have these running in parallel, but you need the event handling task to be running concurrently with the processing of the game.

Data handling is the key area of scientific computing where green threads (concurrent non-parallel threads) show up. For data handling, one may need to send a signal that causes a message to start being passed. Alternative hardware takes over at that point. This alternative hardware is a processor specific to an I/O bus, like the controller for the SSD, modem, GPU, or Infiniband. It will be polled, then it will execute the command and give the result. There are two variants:

  • Non-Blocking vs Blocking: Whether the thread will periodically poll for whether that task is complete, or whether it should wait for the task to complete before doing anything else

  • Synchronous vs Asynchronous: Whether to execute the operation as initiated by the program or as a response to an event from the kernel.
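The blocking versus non-blocking distinction can be sketched with a single green thread. This is only an illustration under assumptions: poll_until_done and slow_io are hypothetical names, and sleep stands in both for a slow I/O request and for the useful work done between polls.

```julia
# Hypothetical helper: non-blocking style. Check the task, do something else
# for a moment, then check again.
function poll_until_done(task; interval = 0.01)
    polls = 0
    while !istaskdone(task)
        polls += 1
        sleep(interval)   # stand-in for useful work between polls
    end
    polls
end

# Stand-in for a slow I/O request, run as a green thread.
slow_io = @async begin
    sleep(0.1)
    42
end

polls = poll_until_done(slow_io)   # non-blocking: we keep control between checks
value = fetch(slow_io)             # blocking: waits until the result is available
```

Calling fetch directly, without the polling loop, would be the blocking variant: the caller does nothing else until the result is available.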

I/O operations cause a privileged context switch, allowing the task which is handling the I/O to directly be switched to in order to continue actions.

The Main Event Loop

Julia, along with other languages with a runtime (JavaScript, Go, etc.), is at its core a single process running an event loop. This event loop is the main thread, and the "Julia program" or "script" that one is running is actually run in a green thread that is controlled by the main event loop. The event loop takes over to look for other work whenever the program hits a yield point. More yield points allow for more aggressive task switching, but they also mean more switches to the event loop, which suspends the numerical task and thus makes it slower. Thus yield points should be kept out of the main numerical loop!

This is one area where languages can wildly differ in implementation. Languages structured for lots of I/O and input handling, like Javascript, have yield points at every line (it's an interpreted language and therefore the interpreter can always take control). In Julia, the yield points are minimized. The common yield points are allocations and I/O (println). This means that a tight non-allocating inner loop will not have any yield points and will be a thread that is not interruptible. While this is great for numerical performance, it is something to be aware of.

Side effect: if you run a long tight loop and wish to exit it, you may try Ctrl + C and see that it doesn't work. This is because interrupts are handled by the event loop. The event loop is never re-entered until after your tight numerical loop finishes, and therefore the interrupt has to wait. If you hit Ctrl + C multiple times, you will escalate the interruption until the OS takes over. It is then handled by the signal handling of the OS's event loop, which sends a higher level interrupt that Julia handles the moment the safety locks say it's okay (these locks occur during memory allocations to ensure that memory is not corrupted).
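The effect of yield points can be sketched by running a "ticker" green thread next to a numerical loop. The names tight_sum, cooperative_sum and count_ticks are hypothetical, and the sketch assumes that sin on a number neither allocates nor yields. The ticker only gets to run when the numerical loop hands control back.

```julia
# A tight, non-allocating loop: no yield points inside.
function tight_sum(n)
    s = 0.0
    for i in 1:n
        s += sin(i)
    end
    s
end

# The same loop with an explicit yield point every `every` iterations.
function cooperative_sum(n; every = 10_000)
    s = 0.0
    for i in 1:n
        s += sin(i)
        i % every == 0 && yield()
    end
    s
end

# Run `work(n)` while a ticker task counts how often it was given control.
function count_ticks(work, n)
    ticks = Ref(0)
    done = Ref(false)
    ticker = @async while !done[]
        ticks[] += 1
        yield()
    end
    result = work(n)
    done[] = true
    wait(ticker)
    result, ticks[]
end

tight_result, tight_ticks = count_ticks(tight_sum, 100_000)
coop_result, coop_ticks = count_ticks(cooperative_sum, 100_000)
```

One would expect tight_ticks to stay at or near zero, since the ticker is never switched to during the tight loop, while coop_ticks grows with the number of explicit yields. The price is the extra switching inside the numerical loop.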

Asynchronous Calling Example

This example will become more clear when we get to distributed computing, but for now think of remotecall_fetch as a way to run a command on a different computer. What we want to do is start all of the commands at once, and then wait for all the results before finishing the loop. We will use @async to make the call to remotecall_fetch non-blocking, i.e. it'll start the job and only poll infrequently to find out when the other machine has completed the job and returned the result. We then add @sync to the loop, which will only continue past the loop after all of the green threads have fetched the result. Otherwise, it's possible that a[idx] may not be filled yet, since the thread may not have fetched the result!

using Distributed  # provides nworkers, workers and remotecall_fetch
@time begin
    a = Vector{Any}(undef,nworkers())
    @sync for (idx, pid) in enumerate(workers())
        @async a[idx] = remotecall_fetch(sleep, pid, 2)
    end
end

The same can be done for writing to the disk. @async is a quick shorthand for spawning a green thread which will handle that I/O operation, and the main event loop will keep switching between them until they are all handled. @sync encodes that the program will not continue until all green threads are handled. This could be done more manually with Task and Channels, which will be something we touch on in the future.
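The same pattern can be tried without any remote workers. In this minimal sketch, sleep stands in for the remote call or disk write, and async_sleeps is a hypothetical name. Because each green thread yields while it sleeps, the waits overlap instead of adding up.

```julia
# Start n "I/O operations" at once and wait for all of them to finish.
function async_sleeps(n, dt)
    results = Vector{Int}(undef, n)
    elapsed = @elapsed @sync for i in 1:n
        @async begin
            sleep(dt)          # stand-in for a slow I/O operation
            results[i] = i     # filled in once the operation "returns"
        end
    end
    results, elapsed
end

results, elapsed = async_sleeps(4, 0.5)
```

Here elapsed should be close to a single sleep of 0.5 seconds rather than the 2 seconds a sequential loop would take, and results is guaranteed to be fully filled only because @sync waits for every green thread.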

Examples of the Differences

Synchronous = Thread will complete an action

Blocking = Thread will wait until action is completed

  • Asynchronous + Non-Blocking: I/O

  • Asynchronous + Blocking: Threaded atomics (demonstrated next lecture)

  • Synchronous + Blocking: Standard computing, @sync

  • Synchronous + Non-Blocking: Webservers where an I/O operation can be performed, but one never checks if the operation is completed.

Multithreading

If your threads are independent, then it may make sense to run them in parallel. This is the form of parallelism known as multithreading. To understand the data that is available in a multithreaded setup, let's look at the picture of threads again:

Each thread has its own call stack, but it's the process that holds the heap. This means that dynamically-sized heap allocated objects are shared between threads with no cost, a setup known as shared-memory computing.
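As a small illustration of the shared heap, the following sketch has every thread write into the same heap-allocated array (fill_squares! is a hypothetical name). No copying or message passing is needed, and because each iteration writes to a distinct slot there is no conflict between threads.

```julia
using Base.Threads

# Every thread sees the same `out`, since the array lives on the shared heap.
function fill_squares!(out)
    @threads for i in eachindex(out)
        out[i] = i^2   # distinct index per iteration, so no two threads collide
    end
    out
end

out = fill_squares!(zeros(Int, 100))
```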

Loop-Based Multithreading with @threads

Let's look back at our Lorenz dynamical system from before:

using StaticArrays, BenchmarkTools
function lorenz(u,p)
  α,σ,ρ,β = p
  @inbounds begin
    du1 = u[1] + α*(σ*(u[2]-u[1]))
    du2 = u[2] + α*(u[1]*(ρ-u[3]) - u[2])
    du3 = u[3] + α*(u[1]*u[2] - β*u[3])
  end
  @SVector [du1,du2,du3]
end
function solve_system_save!(u,f,u0,p,n)
  @inbounds u[1] = u0
  @inbounds for i in 1:length(u)-1
    u[i+1] = f(u[i],p)
  end
  u
end
p = (0.02,10.0,28.0,8/3)
u = Vector{typeof(@SVector([1.0,0.0,0.0]))}(undef,1000)
@btime solve_system_save!(u,lorenz,@SVector([1.0,0.0,0.0]),p,1000)
5.950 μs (0 allocations: 0 bytes)
1000-element Vector{SVector{3, Float64}}:
 [1.0, 0.0, 0.0]
 [0.8, 0.56, 0.0]
 [0.752, 0.9968000000000001, 0.008960000000000001]
 [0.80096, 1.3978492416000001, 0.023474005333333336]
 [0.92033784832, 1.8180538219817644, 0.04461448495326095]
 [1.099881043052353, 2.296260732619613, 0.07569952060880669]
 [1.339156980965805, 2.864603692722823, 0.12217448583728006]
 [1.6442463233172087, 3.5539673118971193, 0.19238159391549564]
 [2.026190521033191, 4.397339452147425, 0.2989931959555302]
 [2.5004203072560376, 5.431943011293093, 0.4612438424853632]
 ⋮
 [6.8089180814322185, 0.8987564841782779, 31.6759436385101]
 [5.6268857619814305, 0.3801973723631693, 30.108951163308078]
 [4.577548084057778, 0.13525687944525802, 28.545926978224173]
 [3.6890898431352737, 0.08257160199224252, 27.035860436772758]
 [2.9677861949066675, 0.15205611935372762, 25.600040161309696]
 [2.4046401797960795, 0.2914663505185634, 24.24373008707723]
 [1.9820054139405763, 0.46628657468365653, 22.964748583050085]
 [1.6788616460891923, 0.6565587545689172, 21.758445642263496]
 [1.4744010677851374, 0.8530017039412324, 20.62004063423844]

In order to use multithreading on this code, we need to take a look at the dependency graph and see what items can be calculated independently of each other. Notice that

σ*(u[2]-u[1])
ρ-u[3]
u[1]*u[2]
β*u[3]

are all independent operations, so in theory we could split those off to different threads, move up, etc.

Or we can have three threads:

u[1] + α*(σ*(u[2]-u[1]))
u[2] + α*(u[1]*(ρ-u[3]) - u[2])
u[3] + α*(u[1]*u[2] - β*u[3])

None of these depend on the output of the others, so these tasks can be run in parallel. We can do this by using Julia's Threads.@threads macro, which puts each of the computations of a loop in a different thread. The threaded loops do not allow you to return a value, so how do you build up the values for the @SVector?

...?

...?

...?

It's not possible! To understand why, let's look at the picture again:

There is a shared heap, but the stacks are thread local. This means that a value cannot be stack allocated in one thread and magically appear when re-entering the main thread: it needs to go on the heap somewhere. But if it needs to go onto the heap, then it makes sense for us to have preallocated its location. But if we want to preallocate du[1], du[2], and du[3], then it makes sense to use the fully non-allocating update form:

function lorenz!(du,u,p)
  α,σ,ρ,β = p
  @inbounds begin
    du[1] = u[1] + α*(σ*(u[2]-u[1]))
    du[2] = u[2] + α*(u[1]*(ρ-u[3]) - u[2])
    du[3] = u[3] + α*(u[1]*u[2] - β*u[3])
  end
end
function solve_system_save_iip!(u,f,u0,p,n)
  @inbounds u[1] = u0
  @inbounds for i in 1:length(u)-1
    f(u[i+1],u[i],p)
  end
  u
end
p = (0.02,10.0,28.0,8/3)
u = [Vector{Float64}(undef,3) for i in 1:1000]
@btime solve_system_save_iip!(u,lorenz!,[1.0,0.0,0.0],p,1000)
8.233 μs (1 allocation: 80 bytes)
1000-element Vector{Vector{Float64}}:
 [1.0, 0.0, 0.0]
 [0.8, 0.56, 0.0]
 [0.752, 0.9968000000000001, 0.008960000000000001]
 [0.80096, 1.3978492416000001, 0.023474005333333336]
 [0.92033784832, 1.8180538219817644, 0.04461448495326095]
 [1.099881043052353, 2.296260732619613, 0.07569952060880669]
 [1.339156980965805, 2.864603692722823, 0.12217448583728006]
 [1.6442463233172087, 3.5539673118971193, 0.19238159391549564]
 [2.026190521033191, 4.397339452147425, 0.2989931959555302]
 [2.5004203072560376, 5.431943011293093, 0.4612438424853632]
 ⋮
 [6.8089180814322185, 0.8987564841782779, 31.6759436385101]
 [5.6268857619814305, 0.3801973723631693, 30.108951163308078]
 [4.577548084057778, 0.13525687944525802, 28.545926978224173]
 [3.6890898431352737, 0.08257160199224252, 27.035860436772758]
 [2.9677861949066675, 0.15205611935372762, 25.600040161309696]
 [2.4046401797960795, 0.2914663505185634, 24.24373008707723]
 [1.9820054139405763, 0.46628657468365653, 22.964748583050085]
 [1.6788616460891923, 0.6565587545689172, 21.758445642263496]
 [1.4744010677851374, 0.8530017039412324, 20.62004063423844]

and now we multithread:

using Base.Threads
function lorenz_mt!(du,u,p)
  α,σ,ρ,β = p
  let du=du, u=u, p=p
    Threads.@threads for i in 1:3
      @inbounds begin
        if i == 1
          du[1] = u[1] + α*(σ*(u[2]-u[1]))
        elseif i == 2
          du[2] = u[2] + α*(u[1]*(ρ-u[3]) - u[2])
        else
          du[3] = u[3] + α*(u[1]*u[2] - β*u[3])
        end
        nothing
      end
    end
  end
  nothing
end
function solve_system_save_iip!(u,f,u0,p,n)
  @inbounds u[1] = u0
  @inbounds for i in 1:length(u)-1
    f(u[i+1],u[i],p)
  end
  u
end
p = (0.02,10.0,28.0,8/3)
u = [Vector{Float64}(undef,3) for i in 1:1000]
@btime solve_system_save_iip!(u,lorenz_mt!,[1.0,0.0,0.0],p,1000);
3.573 ms (5995 allocations: 577.62 KiB)

Parallelism doesn't always make things faster. There are two costs associated with this code. For one, we had to go to the slower heap+mutation version, so its implementation starting point is slower. But secondly, and more importantly, the cost of spinning up a new thread is non-negligible. In fact, here we can see that it even needs to make small allocations for the new context. From the timings above, the threaded version costs roughly 3.6 μs per step (3.573 ms over 999 steps), against roughly 8 ns per step for the single-threaded in-place version (8.233 μs over 999 steps). So what we've done is take almost-free calculations and make each of them pay the threading overhead by putting it in a different thread, instead of just having one thread with one call stack.

The moral of the story is that you need to make sure that there's enough work per thread in order to effectively accelerate a program with parallelism.
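To get a feel for the numbers on a given machine, one can time the two ingredients separately. This is a rough sketch, not a careful benchmark: spawn_overhead and work_cost are hypothetical helpers, the arithmetic update merely resembles one component of the Lorenz step, and the results will vary with hardware and Julia version.

```julia
using Base.Threads

# Average wall-clock time to spawn an empty task and wait for it.
function spawn_overhead(n)
    t = @elapsed for _ in 1:n
        wait(Threads.@spawn nothing)
    end
    t / n
end

# Average wall-clock time of one small arithmetic update, for comparison.
# The final x is returned so the loop cannot be discarded as unused.
function work_cost(n)
    x = 1.0
    t = @elapsed for _ in 1:n
        x = x + 0.02 * (10.0 * (0.5 - x))
    end
    t / n, x
end

overhead = spawn_overhead(1_000)
cost, _ = work_cost(1_000_000)
```

If overhead is much larger than cost, each task needs to carry many such updates before threading can pay off.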

Data-Parallel Problems

So not every setup is amenable to parallelism. Dynamical systems are notorious for being quite difficult to parallelize because the dependency of the future time step on the previous time step is clear, meaning that one cannot easily "parallelize through time" (though it is possible, which we will study later).

However, one common way that these systems are generally parallelized is in their inputs. The following questions allow for independent simulations:

  • What steady state does an input u0 go to for some list/region of initial conditions?

  • How does the solution vary when I use a different p?

The problem has a few descriptions. For one, it's called an embarrassingly parallel problem, since the serial solver can remain largely intact while we parallelize around it. To solve this, we can use the exact same solve_system_save_iip!, and just change how we are calling it. Secondly, this is called a data parallel problem, since it is parallelized by splitting up the input data (here, the possible u0 or ps) and acting on the pieces independently.

Multithreaded Parameter Searches

Now let's multithread our parameter search. Let's say we wanted to compute the mean of the values in the trajectory. For a single input pair, we can compute that like:

using Statistics
function compute_trajectory_mean(u0,p)
  u = Vector{typeof(@SVector([1.0,0.0,0.0]))}(undef,1000)
  solve_system_save!(u,lorenz,u0,p,1000);
  mean(u)
end
@btime compute_trajectory_mean(@SVector([1.0,0.0,0.0]),p)
7.550 μs (3 allocations: 23.52 KiB)
3-element SVector{3, Float64} with indices SOneTo(3):
 -0.31149962346484683
 -0.3097490174897651
 26.024603558583014

We can make this faster by preallocating the cache vector u. For example, we can globalize it:

u = Vector{typeof(@SVector([1.0,0.0,0.0]))}(undef,1000)
function compute_trajectory_mean2(u0,p)
  # u is automatically captured
  solve_system_save!(u,lorenz,u0,p,1000);
  mean(u)
end
@btime compute_trajectory_mean2(@SVector([1.0,0.0,0.0]),p)
7.000 μs (3 allocations: 112 bytes)
3-element SVector{3, Float64} with indices SOneTo(3):
 -0.31149962346484683
 -0.3097490174897651
 26.024603558583014

But this is still allocating? The issue with this code is that u is a global, and captured globals cannot be inferred because their type can change at any time. Thus what we can do instead is capture a constant:

const _u_cache = Vector{typeof(@SVector([1.0,0.0,0.0]))}(undef,1000)
function compute_trajectory_mean3(u0,p)
  # _u_cache is a const global, so its type can be inferred
  solve_system_save!(_u_cache,lorenz,u0,p,1000);
  mean(_u_cache)
end
@btime compute_trajectory_mean3(@SVector([1.0,0.0,0.0]),p)
6.960 μs (1 allocation: 32 bytes)
3-element SVector{3, Float64} with indices SOneTo(3):
 -0.31149962346484683
 -0.3097490174897651
 26.024603558583014

Now it's just allocating the output. The other way to do this is to use a closure which encapsulates the cache data:

function _compute_trajectory_mean4(u,u0,p)
  solve_system_save!(u,lorenz,u0,p,1000);
  mean(u)
end
compute_trajectory_mean4(u0,p) = _compute_trajectory_mean4(_u_cache,u0,p)
@btime compute_trajectory_mean4(@SVector([1.0,0.0,0.0]),p)
6.960 μs (1 allocation: 32 bytes)
3-element SVector{3, Float64} with indices SOneTo(3):
 -0.31149962346484683
 -0.3097490174897651
 26.024603558583014

This is the same, but a bit more explicit. Now let's create our parameter search function. Let's take a sample of parameters:

ps = [(0.02,10.0,28.0,8/3) .* (1.0,rand(3)...) for i in 1:1000]
1000-element Vector{NTuple{4, Float64}}:
 (0.02, 3.812945173293439, 27.55324373364244, 1.0257633488371298)
 (0.02, 6.790500179615313, 27.485783241111793, 0.5938489162088573)
 (0.02, 1.339477414335264, 6.29546690942283, 1.237911790637971)
 (0.02, 1.7975864688934862, 14.704041493424002, 1.1685002769450543)
 (0.02, 0.5272148312635649, 14.760466901338434, 2.48982576061833)
 (0.02, 4.734639821837439, 3.8541267835767883, 1.5106517080611956)
 (0.02, 4.650148161184944, 23.50449964565748, 0.937255759821389)
 (0.02, 1.8127696998336018, 22.18404057823202, 0.19432730201840914)
 (0.02, 7.0188261107458425, 19.20488995118546, 1.6085973195206995)
 (0.02, 9.57360848365602, 27.542244295002934, 0.11668285241675225)
 ⋮
 (0.02, 8.375059776717714, 27.195496855731374, 1.8527817484424878)
 (0.02, 7.884682439473162, 12.06684013414872, 0.16554268439580758)
 (0.02, 9.366109509148453, 12.966127532495282, 1.9214533064275305)
 (0.02, 9.246576645845364, 14.951778443811778, 0.5779728574009795)
 (0.02, 9.016173597430338, 7.458380467235672, 2.0287232218322013)
 (0.02, 0.9833872351340134, 1.198269122446359, 0.3999388708749878)
 (0.02, 0.4675993127695799, 24.66362357650897, 0.5731969736004816)
 (0.02, 1.2527162273688897, 21.053453513165337, 1.2009377935604744)
 (0.02, 8.915821908231509, 22.282069244426605, 2.327854317320825)

And let's get the mean of the trajectory for each of the parameters.

serial_out = map(p -> compute_trajectory_mean4(@SVector([1.0,0.0,0.0]),p),ps)
1000-element Vector{SVector{3, Float64}}:
 [-0.2733225197299042, -0.3576683631142484, 25.439341240037304]
 [0.1453909390679449, 0.15668819382181087, 27.209760105011576]
 [2.50943346493476, 2.5676661607656874, 5.064540833935935]
 [-3.222855036982875, -3.3687725243708, 12.758257283162084]
 [5.58216933104162, 6.042446951705745, 13.208162852299596]
 [2.0241027725084124, 2.0354686995816924, 2.7048338659833755]
 [0.6836477548131753, 0.7042552183449579, 21.59318942606505]
 [-0.6263184774568146, -0.784841294251967, 20.923731525616596]
 [-0.6415155861685428, -0.685771556639759, 16.615290915134867]
 [NaN, NaN, NaN]
 ⋮
 [-0.32597181094679356, -0.2960526973260711, 25.326405179952957]
 [0.16886344647510948, 0.17396350466608834, 10.75793488220407]
 [-0.6515988920503101, -0.6089497509409556, 10.181735444526261]
 [0.17596816524883094, 0.1867622786644403, 13.340440707996374]
 [3.4693965241606572, 3.4839344082292296, 6.150996493938196]
 [0.34458340806090465, 0.30774924883614385, 0.25770673902702546]
 [3.647594050943995, 3.934563706882653, 22.77182412833659]
 [4.63322511996309, 4.788008065860776, 18.571547942274233]
 [-1.008640144986007, -0.9768354618477326, 19.81359364809037]

Now let's do this with multithreading:

function tmap(f,ps)
  out = Vector{typeof(@SVector([1.0,0.0,0.0]))}(undef,1000)
  Threads.@threads for i in 1:1000
    # each loop part is using a different part of the data
    out[i] = f(ps[i])
  end
  out
end
threaded_out = tmap(p -> compute_trajectory_mean4(@SVector([1.0,0.0,0.0]),p),ps)
1000-element Vector{SVector{3, Float64}}:
 [-0.2733225197299042, -0.3576683631142484, 25.439341240037304]
 [0.1453909390679449, 0.15668819382181087, 27.209760105011576]
 [2.50943346493476, 2.5676661607656874, 5.064540833935935]
 [-3.222855036982875, -3.3687725243708, 12.758257283162084]
 [5.58216933104162, 6.042446951705745, 13.208162852299596]
 [2.0241027725084124, 2.0354686995816924, 2.7048338659833755]
 [0.6836477548131753, 0.7042552183449579, 21.59318942606505]
 [-0.6263184774568146, -0.784841294251967, 20.923731525616596]
 [-0.6415155861685428, -0.685771556639759, 16.615290915134867]
 [NaN, NaN, NaN]
 ⋮
 [-0.32597181094679356, -0.2960526973260711, 25.326405179952957]
 [0.16886344647510948, 0.17396350466608834, 10.75793488220407]
 [-0.6515988920503101, -0.6089497509409556, 10.181735444526261]
 [0.17596816524883094, 0.1867622786644403, 13.340440707996374]
 [3.4693965241606572, 3.4839344082292296, 6.150996493938196]
 [0.34458340806090465, 0.30774924883614385, 0.25770673902702546]
 [3.647594050943995, 3.934563706882653, 22.77182412833659]
 [4.63322511996309, 4.788008065860776, 18.571547942274233]
 [-1.008640144986007, -0.9768354618477326, 19.81359364809037]

Let's check the output:

serial_out - threaded_out
1000-element Vector{SVector{3, Float64}}:
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [NaN, NaN, NaN]
 ⋮
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]

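Oh no, when this is run with more than one thread we don't get the same answer! (The output shown here was generated with a single thread, so the problem described next did not trigger and the differences are zero.) What happened?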

The answer is the caching. Every single thread is using _u_cache as the cache, so while one thread is writing its trajectory into it another is reading out of it, and that thread computes its mean from values that belong to a different simulation!

To fix this, what we need is a different cache per thread (the heap itself is shared, so each thread needs its own preallocated array on it):

const _u_cache_threads = [Vector{typeof(@SVector([1.0,0.0,0.0]))}(undef,1000) for i in 1:Threads.nthreads()]
function compute_trajectory_mean5(u0,p)
  # each thread indexes into its own preallocated cache
  solve_system_save!(_u_cache_threads[Threads.threadid()],lorenz,u0,p,1000);
  mean(_u_cache_threads[Threads.threadid()])
end
@btime compute_trajectory_mean5(@SVector([1.0,0.0,0.0]),p)
6.940 μs (1 allocation: 32 bytes)
3-element SVector{3, Float64} with indices SOneTo(3):
 -0.31149962346484683
 -0.3097490174897651
 26.024603558583014
serial_out = map(p -> compute_trajectory_mean5(@SVector([1.0,0.0,0.0]),p),ps)
threaded_out = tmap(p -> compute_trajectory_mean5(@SVector([1.0,0.0,0.0]),p),ps)
serial_out - threaded_out
1000-element Vector{SVector{3, Float64}}:
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [NaN, NaN, NaN]
 ⋮
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
 [0.0, 0.0, 0.0]
@btime serial_out = map(p -> compute_trajectory_mean5(@SVector([1.0,0.0,0.0]),p),ps)
7.868 ms (3 allocations: 23.50 KiB)
1000-element Vector{SVector{3, Float64}}:
 [-0.2733225197299042, -0.3576683631142484, 25.439341240037304]
 [0.1453909390679449, 0.15668819382181087, 27.209760105011576]
 [2.50943346493476, 2.5676661607656874, 5.064540833935935]
 [-3.222855036982875, -3.3687725243708, 12.758257283162084]
 [5.58216933104162, 6.042446951705745, 13.208162852299596]
 [2.0241027725084124, 2.0354686995816924, 2.7048338659833755]
 [0.6836477548131753, 0.7042552183449579, 21.59318942606505]
 [-0.6263184774568146, -0.784841294251967, 20.923731525616596]
 [-0.6415155861685428, -0.685771556639759, 16.615290915134867]
 [NaN, NaN, NaN]
 ⋮
 [-0.32597181094679356, -0.2960526973260711, 25.326405179952957]
 [0.16886344647510948, 0.17396350466608834, 10.75793488220407]
 [-0.6515988920503101, -0.6089497509409556, 10.181735444526261]
 [0.17596816524883094, 0.1867622786644403, 13.340440707996374]
 [3.4693965241606572, 3.4839344082292296, 6.150996493938196]
 [0.34458340806090465, 0.30774924883614385, 0.25770673902702546]
 [3.647594050943995, 3.934563706882653, 22.77182412833659]
 [4.63322511996309, 4.788008065860776, 18.571547942274233]
 [-1.008640144986007, -0.9768354618477326, 19.81359364809037]
@btime threaded_out = tmap(p -> compute_trajectory_mean5(@SVector([1.0,0.0,0.0]),p),ps)
7.748 ms (8 allocations: 24.03 KiB)
1000-element Vector{SVector{3, Float64}}:
 [-0.2733225197299042, -0.3576683631142484, 25.439341240037304]
 [0.1453909390679449, 0.15668819382181087, 27.209760105011576]
 [2.50943346493476, 2.5676661607656874, 5.064540833935935]
 [-3.222855036982875, -3.3687725243708, 12.758257283162084]
 [5.58216933104162, 6.042446951705745, 13.208162852299596]
 [2.0241027725084124, 2.0354686995816924, 2.7048338659833755]
 [0.6836477548131753, 0.7042552183449579, 21.59318942606505]
 [-0.6263184774568146, -0.784841294251967, 20.923731525616596]
 [-0.6415155861685428, -0.685771556639759, 16.615290915134867]
 [NaN, NaN, NaN]
 ⋮
 [-0.32597181094679356, -0.2960526973260711, 25.326405179952957]
 [0.16886344647510948, 0.17396350466608834, 10.75793488220407]
 [-0.6515988920503101, -0.6089497509409556, 10.181735444526261]
 [0.17596816524883094, 0.1867622786644403, 13.340440707996374]
 [3.4693965241606572, 3.4839344082292296, 6.150996493938196]
 [0.34458340806090465, 0.30774924883614385, 0.25770673902702546]
 [3.647594050943995, 3.934563706882653, 22.77182412833659]
 [4.63322511996309, 4.788008065860776, 18.571547942274233]
 [-1.008640144986007, -0.9768354618477326, 19.81359364809037]
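An alternative way to give each worker its own cache, which does not rely on Threads.threadid(), is to split the inputs into chunks and let each chunk's task allocate a private buffer. This is a sketch under assumptions: chunked_map and buffered_mean! are hypothetical names, the toy workload only stands in for compute_trajectory_mean5, and the inputs are assumed to be non-empty. The motivation, stated here as an assumption about newer Julia versions, is that a task may be moved between threads while it runs, which makes indexing a cache by thread id fragile.

```julia
using Base.Threads

# Split the indices into contiguous chunks, one task per chunk, and give each
# task its own scratch buffer created by `make_buffer`.
function chunked_map(f!, make_buffer, inputs; nchunks = nthreads())
    n = length(inputs)
    chunks = collect(Iterators.partition(1:n, cld(n, nchunks)))
    tasks = map(chunks) do idxs
        Threads.@spawn begin
            buf = make_buffer()                 # private to this task
            [f!(buf, inputs[i]) for i in idxs]  # results for this chunk, in order
        end
    end
    reduce(vcat, fetch.(tasks))                 # stitch the chunks back together
end

# Toy workload: fill the buffer, then reduce it (mimics "simulate, then average").
function buffered_mean!(buf, x)
    for k in eachindex(buf)
        buf[k] = x * k
    end
    sum(buf) / length(buf)
end

xs = collect(1.0:100.0)
serial = [buffered_mean!(zeros(10), x) for x in xs]
threaded = chunked_map(buffered_mean!, () -> zeros(10), xs)
```

Since no buffer is ever shared between tasks, serial and threaded agree regardless of how many threads are available or how tasks are scheduled.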

Hierarchical Task-Based Multithreading and Dynamic Scheduling

The major change in Julia v1.3 is that Julia's Tasks, which are traditionally its green threads interface, are now the basis of its multithreading infrastructure. This means that independent tasks can be run in parallel, and there is a new interface for multithreading that works by spawning tasks.

This implementation follows Go's goroutines and the classic multithreading interface of Cilk. There is a Julia-level scheduler that handles the multithreading to put different tasks on different vCPU threads. A benefit of this is hierarchical multithreading. Since Julia's tasks can spawn tasks, a task can create tasks which create tasks, and so on. In Julia (/Go/Cilk), this is then seen as a single pool of tasks which it can schedule, and thus it will still make sure only N are running at a time (as opposed to the naive implementation, where the total number of running threads multiplies with each level of nesting). This is essential for numerical performance because running multiple compute threads on a single CPU thread requires constant context switching between the threads, which will slow down the computations.
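Hierarchical spawning can be sketched with a recursive sum in which every level spawns a task for one half of the range, using the Threads.@spawn macro. The function psum and its cutoff keyword are hypothetical names chosen for this illustration. All spawned tasks end up in the same scheduler pool, however deep the recursion goes.

```julia
using Base.Threads

# Recursively split the index range; below `cutoff` elements, sum serially.
function psum(xs, lo = firstindex(xs), hi = lastindex(xs); cutoff = 1_000)
    if hi - lo < cutoff
        s = zero(eltype(xs))
        for i in lo:hi
            s += xs[i]
        end
        return s
    end
    mid = (lo + hi) >>> 1
    left = Threads.@spawn psum(xs, lo, mid; cutoff = cutoff)  # child task, may spawn more
    right = psum(xs, mid + 1, hi; cutoff = cutoff)            # current task keeps working
    fetch(left) + right
end

total = psum(collect(1:10_000))
```

The cutoff keeps the amount of work per task large enough that the cost of spawning does not dominate.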

To directly use the task-based interface, simply use Threads.@spawn to spawn new tasks. For example:

function tmap2(f,ps)
  tasks = [Threads.@spawn f(ps[i]) for i in 1:1000]
  out = [fetch(t) for t in tasks]
end
threaded_out = tmap2(p -> compute_trajectory_mean5(@SVector([1.0,0.0,0.0]),p),ps)
1000-element Vector{SVector{3, Float64}}:
 [-0.2733225197299042, -0.3576683631142484, 25.439341240037304]
 [0.1453909390679449, 0.15668819382181087, 27.209760105011576]
 [2.50943346493476, 2.5676661607656874, 5.064540833935935]
 [-3.222855036982875, -3.3687725243708, 12.758257283162084]
 [5.58216933104162, 6.042446951705745, 13.208162852299596]
 [2.0241027725084124, 2.0354686995816924, 2.7048338659833755]
 [0.6836477548131753, 0.7042552183449579, 21.59318942606505]
 [-0.6263184774568146, -0.784841294251967, 20.923731525616596]
 [-0.6415155861685428, -0.685771556639759, 16.615290915134867]
 [NaN, NaN, NaN]
 ⋮
 [-0.32597181094679356, -0.2960526973260711, 25.326405179952957]
 [0.16886344647510948, 0.17396350466608834, 10.75793488220407]
 [-0.6515988920503101, -0.6089497509409556, 10.181735444526261]
 [0.17596816524883094, 0.1867622786644403, 13.340440707996374]
 [3.4693965241606572, 3.4839344082292296, 6.150996493938196]
 [0.34458340806090465, 0.30774924883614385, 0.25770673902702546]
 [3.647594050943995, 3.934563706882653, 22.77182412833659]
 [4.63322511996309, 4.788008065860776, 18.571547942274233]
 [-1.008640144986007, -0.9768354618477326, 19.81359364809037]

However, if we check the timing we see:

@btime tmap2(p -> compute_trajectory_mean5(@SVector([1.0,0.0,0.0]),p),ps)
8.391 ms (6005 allocations: 531.45 KiB)
1000-element Vector{SVector{3, Float64}}:
 [-0.2733225197299042, -0.3576683631142484, 25.439341240037304]
 [0.1453909390679449, 0.15668819382181087, 27.209760105011576]
 [2.50943346493476, 2.5676661607656874, 5.064540833935935]
 [-3.222855036982875, -3.3687725243708, 12.758257283162084]
 [5.58216933104162, 6.042446951705745, 13.208162852299596]
 [2.0241027725084124, 2.0354686995816924, 2.7048338659833755]
 [0.6836477548131753, 0.7042552183449579, 21.59318942606505]
 [-0.6263184774568146, -0.784841294251967, 20.923731525616596]
 [-0.6415155861685428, -0.685771556639759, 16.615290915134867]
 [NaN, NaN, NaN]
 ⋮
 [-0.32597181094679356, -0.2960526973260711, 25.326405179952957]
 [0.16886344647510948, 0.17396350466608834, 10.75793488220407]
 [-0.6515988920503101, -0.6089497509409556, 10.181735444526261]
 [0.17596816524883094, 0.1867622786644403, 13.340440707996374]
 [3.4693965241606572, 3.4839344082292296, 6.150996493938196]
 [0.34458340806090465, 0.30774924883614385, 0.25770673902702546]
 [3.647594050943995, 3.934563706882653, 22.77182412833659]
 [4.63322511996309, 4.788008065860776, 18.571547942274233]
 [-1.008640144986007, -0.9768354618477326, 19.81359364809037]

Threads.@threads is built on the same multithreading infrastructure, so why is this so much slower? The reason is that Threads.@threads employs static scheduling while Threads.@spawn uses dynamic scheduling. Dynamic scheduling is the model of allowing the runtime to determine the ordering and scheduling of processes, i.e. what tasks will run where and when. Julia's task-based multithreading system has a thread scheduler which will automatically do this for you in the background, but because this is done at runtime it will have overhead. Static scheduling is the model of pre-determining where and when tasks will run, instead of allowing this to be determined at runtime. Threads.@threads is "quasi-static" in the sense that it cuts the loop so that it spawns only as many tasks as there are threads, essentially assigning one thread to each even chunk of the input data.

Does this lack of runtime overhead mean that static scheduling is "better"? No, it simply has trade-offs. Static scheduling assumes that the runtime of each block is the same. For this specific case, where there is a fixed number of loop iterations for the dynamical systems, we know that every compute_trajectory_mean5 costs exactly the same, and thus this will be more efficient. However, there are many cases where this might not be efficient. For example:

function sleepmap_static()
  out = Vector{Int}(undef,24)
  Threads.@threads for i in 1:24
    sleep(i/10)
    out[i] = i
  end
  out
end
isleep(i) = (sleep(i/10);i)
function sleepmap_spawn()
  tasks = [Threads.@spawn(isleep(i)) for i in 1:24]
  out = [fetch(t) for t in tasks]
end

@btime sleepmap_static()
@btime sleepmap_spawn()
30.058 s (128 allocations: 4.45 KiB)
  2.401 s (244 allocations: 14.77 KiB)
24-element Vector{Int64}:
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
  ⋮
 16
 17
 18
 19
 20
 21
 22
 23
 24

This occurs because of how the static scheduling chunked the calculation. On my computer:

Threads.nthreads()
1

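With 6 threads, Threads.@threads creates 6 tasks of 4 consecutive iterations each. (The output shown here comes from a session with a single thread, in which all 24 sleeps run back to back, hence the roughly 30 seconds.) The first task takes: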

sum(i/10 for i in 1:4)
1.0

1 second. Each subsequent group takes longer than the one before it, and the last takes:

sum(i/10 for i in 21:24)
9.0

9 seconds, which sets the runtime of the static version on 6 threads: by unevenly distributing the work, we only run as fast as the slowest thread. Dynamic scheduling, by contrast, lets a new task start as soon as a thread becomes free. Here sleep yields to the scheduler, so all 24 spawned tasks wait concurrently and the total is roughly the longest single sleep, 2.4 seconds. Thus whether dynamic or static scheduling is beneficial depends on the problem and on the implementation of the static schedule.
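The chunk arithmetic can be checked directly. This sketch assumes 6 threads and an even split of 1:24 into contiguous chunks; the variable names are illustrative, and Julia's internal splitting may differ when the length does not divide evenly.

```julia
nchunks = 6                                        # assumed thread count
chunks = collect(Iterators.partition(1:24, cld(24, nchunks)))

# Total sleep time in seconds for each chunk when its iterations run one after another
chunk_cost = [sum(chunk) / 10 for chunk in chunks]

static_estimate  = maximum(chunk_cost)             # the slowest chunk bounds the chunked loop
dynamic_estimate = maximum(i / 10 for i in 1:24)   # all sleeps overlap: longest single sleep
serial_estimate  = sum(chunk_cost)                 # every iteration on a single thread
```

These come out to 9.0, 2.4, and roughly 30 seconds. The last two line up with the 2.401 s and 30.058 s timings printed above.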

Possible Project

Note that this task-based scheduling can extend to external library calls as well. FFTW.jl recently gained support for this. A possible final project would be to make a similar change to OpenBLAS.

A Teaser for Alternative Parallelism Models

Simplest Parallel Code

A = rand(10000,10000)
B = rand(10000,10000)
A*B
10000×10000 Matrix{Float64}:
 2506.64  2505.69  2513.32  2480.37  …  2460.17  2486.95  2488.21  2479.0
 2487.34  2493.1   2493.25  2491.17     2463.11  2499.08  2479.07  2486.71
 2488.34  2520.18  2496.28  2513.02     2483.58  2498.39  2491.75  2486.11
 2508.35  2496.45  2501.15  2517.47     2483.62  2486.93  2494.7   2500.92
 2503.92  2501.93  2499.76  2499.07     2476.02  2492.62  2471.76  2485.64
 2507.19  2491.48  2499.27  2514.32  …  2465.84  2507.66  2476.52  2484.37
 2500.05  2515.31  2512.71  2522.24     2497.53  2516.0   2491.4   2514.89
 2492.51  2517.63  2511.07  2516.79     2489.11  2508.19  2496.27  2470.27
 2496.61  2512.56  2493.78  2498.99     2483.74  2502.98  2469.3   2503.57
 2515.22  2523.53  2515.79  2514.8      2481.72  2524.63  2503.11  2497.66
    ⋮                                ⋱                             
 2498.42  2512.14  2504.41  2515.64     2475.14  2503.41  2495.64  2484.49
 2504.98  2534.8   2517.77  2542.91     2495.88  2531.84  2520.7   2507.66
 2498.01  2508.28  2490.17  2513.5      2479.58  2511.16  2498.47  2502.65
 2475.0   2506.38  2499.31  2509.21     2477.2   2499.69  2479.3   2494.68
 2501.43  2511.39  2501.86  2512.22  …  2474.26  2491.48  2473.11  2495.57
 2490.54  2513.47  2489.4   2514.12     2479.64  2505.52  2480.21  2499.59
 2513.86  2503.88  2499.66  2520.05     2497.88  2509.8   2489.56  2492.43
 2503.67  2511.06  2496.57  2509.35     2468.36  2508.44  2476.73  2467.24
 2527.59  2542.52  2527.69  2529.07     2496.43  2524.05  2508.59  2505.97

If you are using a computer that has N cores, then this will use N cores. Try it and look at your resource usage!
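One way to see this for yourself is to change the number of BLAS threads and time the multiplication again. A minimal sketch, assuming Julia 1.6 or later (where BLAS.get_num_threads is available) and using smaller matrices so that it runs quickly; the timings depend entirely on your machine.

```julia
using LinearAlgebra

A = rand(1000, 1000)
B = rand(1000, 1000)
A * B                                   # warm-up so that compilation is not timed

default_threads = BLAS.get_num_threads()

BLAS.set_num_threads(1)
t_single = @elapsed A * B               # multiplication restricted to one BLAS thread

BLAS.set_num_threads(default_threads)   # restore the original setting
t_default = @elapsed A * B              # multiplication with the default thread count

println("1 BLAS thread: ", t_single, " s; ",
        default_threads, " BLAS threads: ", t_default, " s")
```

Note that the BLAS thread count is separate from Threads.nthreads(): the matrix multiplication is parallelized inside the BLAS library, not by Julia's task scheduler.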

Array-Based Parallelism

The simplest form of parallelism is array-based parallelism. The idea is that you use some construction of an array whose operations are already designed to be parallel under the hood. In Julia, some examples of this are:

  • DistributedArrays (Distributed Computing)

  • Elemental

  • MPIArrays

  • CuArrays (GPUs)

This is not a Julia-specific idea either.

BLAS and Standard Libraries

The basic linear algebra calls are all handled by a set of libraries which follow the same interface known as BLAS (Basic Linear Algebra Subprograms). It's divided into 3 levels:

  • BLAS1: Vector-vector operations (O(n))

  • BLAS2: Matrix-vector operations (O(n^2))

  • BLAS3: Matrix-matrix operations (O(n^3))
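As a rough illustration, Julia's LinearAlgebra standard library exposes thin wrappers for one routine from each level. The sizes and variable names below are arbitrary; in everyday code you would normally write a*x + y, A*x, and A*B and let Julia dispatch to BLAS for you.

```julia
using LinearAlgebra

n = 4
a = 2.0
x = rand(n); y = rand(n)
A = rand(n, n); B = rand(n, n)

# BLAS1, O(n): axpy! overwrites its last argument with a*x + y and returns it
y1 = BLAS.axpy!(a, x, copy(y))

# BLAS2, O(n^2): gemv computes the matrix-vector product A*x ('N' means no transpose)
y2 = BLAS.gemv('N', A, x)

# BLAS3, O(n^3): gemm computes the matrix-matrix product A*B
C = BLAS.gemm('N', 'N', A, B)
```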

BLAS implementations such as OpenBLAS and Intel MKL are highly optimized, so essentially every numerical language and library uses one of the same underlying BLAS implementations. Extensions to these, known as LAPACK, include operations like factorizations and are included in these standard libraries. These are all multithreaded. This level is worth targeting because the operation count is high enough that parallelism can be made efficient even when targeting only this level: a matrix multiplication can take on the order of seconds, minutes, hours, or even days, and these are all highly parallel operations. This means you can get away with a lot just by parallelizing at this level, which happens to be a bottleneck for a lot of scientific computing codes.
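For example, a factorization in Julia goes through LAPACK for dense Float64 matrices. A minimal sketch with an arbitrary small matrix, shifted on the diagonal so that it is diagonally dominant and therefore well-conditioned:

```julia
using LinearAlgebra

A = rand(4, 4) + 4I     # diagonal shift: an assumption to keep the example well-conditioned
b = rand(4)

F = lu(A)               # LU factorization with partial pivoting
x = F \ b               # reuse the factorization to solve A*x = b

# The factors satisfy L*U = A with its rows permuted by F.p
residual = norm(F.L * F.U - A[F.p, :])
```

On large matrices the same call runs multithreaded inside the BLAS/LAPACK library without any change to the Julia code.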

This is also commonly the level at which GPU computing occurs in machine learning libraries for reasons which we will explain later.

MPI

Well, this is a big topic and we'll address this one later!

Conclusion

The easiest forms of parallelism are:

  • Embarrassingly parallel

  • Array-level parallelism (built into linear algebra)

Exploit these when possible.