Julia Tasks 101

Tasks are how Julia handles parallelism & concurrency. Tasks are defined at the program level and Julia’s scheduler maps them to hardware/​OS threads.

Tasks have many names in other languages: “symmetric coroutines, lightweight threads, cooperative multitasking, or one-shot continuations”. They’re particularly similar to the coroutines used by Go and Cilk.

There’s already a lot written on the details of tasks, so instead I’m going to focus on how to use them.

Uses

Use Threads.@spawn % of the Time

The most common, recommended way to create Tasks is with Threads.@spawn. You can use it with arbitrary Julia expressions to create and immediately schedule a Task that can be run on any thread – this gives us parallelism and concurrency.

using Base.Threads

@time begin           
    task1 = @spawn (println(threadid()); sleep(1))    
    task2 = @spawn (println(threadid()); sleep(1))
    wait.([task1, task2])
end

> 2
  3
  1.014021 seconds (15.98 k allocations: 1.108 MiB, 1.79% compilation time)

Tasks are small, so we can create a lot of them:

@time @sync for i in 1:10_000
    Threads.@spawn sleep(1)
end
>  1.046200 seconds (125.11 k allocations: 30.180 MiB, 13.47% compilation time)

(@sync is a convenience macro that will wait for all the created tasks to finish.)

@spawn gives the Julia scheduler the most freedom. It’s allowed to run the Task on any thread, use multiple threads, pause a Task, move a Task from one thread to another, and so on. The scheduler has a lot of information at runtime, so it can typically make pretty good decisions.

Consider this function hash_lots. It spends ms sleeping and then about ms working. If we run it once, it takes about ms:

function hash_lots(x)
    sleep(.075)
    for i in 1:9_860_000
        x = hash(x)
    end
    return x
end

@btime hash_lots(5)
>    150.162 ms (6 allocations: 144 bytes)

(@btime is a benchmarking macro that runs a function many times to get a more accurate estimate of runtime.) We get a similar time if we run copies in parallel, where is the number of threads.

@btime begin 
    @sync for i in 1:nthreads()
        @spawn hash_lots(i)
    end
end
>    151.340 ms (189 allocations: 12.56 KiB)

Now let’s say we run copies of this. How long would you expect it to take?

With no switching, each thread would process four copies of the function sequentially, and it would take about ~ms. With perfect switching, Julia would start the sleep in each Task almost immediately, taking ms across all Tasks. Then the hashing would take ms of CPU time, and there are CPUs, for another ms. So our best possible time would be ms.

@btime begin 
    @sync for i in 1:(4*n)
        @spawn hash_lots(i)
    end
end
>    376.614 ms (720 allocations: 49.30 KiB)

ms is pretty good. We get almost all of the maximum possible benefit while using default settings. Most importantly, we didn’t annotate the line sleep(.075) in any way! We wrote hash_lots as normal, synchronous code – we only needed to wrap it in a Task at the top level, and the scheduler took care of the rest. You can do this with any code, as long as its thread-safe.

Micromanaging with @task and Channels

Ok, but what if your code isn’t thread-safe? Or what if it’s 99% thread-safe, but there’s one part where your simultaneous Tasks write to a dict? We can control execution with the @task macro.

t = @spawn println("Hola!") is equivalent to the following:

t = @task println("Hola!") 
t.sticky = false #defaults to true for historical reasons
schedule(t)

Going back to the dict example – if your worker Tasks write to the dict directly, you’ll get a segfault. Instead, have one Task that writes directly to the dict, one element at a time, and use a Channel (a threadsafe queue) to communicate between your Tasks:

using Base.Threads

channel = Channel{Task}(Inf)
dict = Dict()

function make_worker(channel::Channel{Task}, dict::Dict)
	worker_task = @spawn begin
    	while true
        	k, v = do_stuff()
        	update_task = @task dict[k] = v
        	put!(channel, update_task)
        end
    end
    return worker_task
end

function make_consumer(channel::Channel{Task}) 
	consumer_task = @spawn begin
    	while true
        	t = take!(channel)
        	schedule(t)
        	wait(t)
    	end
    return consumer_task
end

workers = [make_worker(channel, dict) for i in 1:8]
consumer = make_consumer(channel)

Each worker will run the bulk of its work concurrently. Then instead of updating the dict directly, they create Tasks that get sent to the Channel. There’s only one consumer reading from the Channel and updating the dict, and that consumer calls wait after each task, so it will only run one update at a time, sequentially.

Warnings

Recursively Spawned Tasks

Julia has a relatively simple mark-sweep Garbage Collector. It’s fast but can get confused in some cases, like recursively spawned tasks – it often isn’t able to free memory until the entire stack is cleared.

So if you have a case that uses a lot of RAM, avoid having Tasks create other Tasks.

Distributed.@spawn

The Distributed package also has a @spawn macro, which is deprecated and shouldn’t be used. So if you’re using both packages, make sure to explicitly call Threads.@spawn

Deprecated macro @async

“for new code there is no reason to use @async”—vchuravy

@async is an earlier macro you might see in some old code. It’s similar to @spawn, but the spawned Task is “sticky”, meaning it will only run on the same hardware thread as the code that calls it. In other words, @async gives concurrency without parallelism.

Stickiness makes tasks a lot less composable, because a sticky Task will also limit its parent. A very low-level @async can lead to surprising bad performance across an application. Furthermore, there’s no performance gain from disabling parallelism – the overhead of @async and @spawn is the same.

Threads.@threads

@threads has a similar issue – it only creates as many Tasks as there are threads, so it doesn’t work well with Task switching. This might be what you want in some cases (e.g. to conserve memory), but most of the time you should use @spawn

@btime begin
    Threads.@threads for i in 1:(4*n)
        sleep(.1)
    end
end
>    415.664 ms (212 allocations: 9.17 KiB)

Glossary

These terms are a bit scattered across the Julia documentation, so here’s a list:

  • Task(my_function) create a Task from a callable function with no arguments.

  • @task create a Task from an arbitrary Julia expression

  • schedule(task::Task) schedule task to be run

  • task.sticky if true, task can only be run on the same hardware thread where schedule was called. If false, it can be assigned to any thread on the scheduler. The current recommendation is to use non-sticky tasks almost all the time, but tasks are sticky by default for historical reasons.

  • Threads.@spawn create and immediately schedule a non-sticky Task. This gives the Julia scheduler freedom to run the Task in the way that it thinks is optimal

  • @async (deprecated) create and immediately schedule a sticky task

  • Threads.@threads (deprecated) run a for loop in parallel

  • wait(task::Task) waits for a task to complete

  • task.result once the task is done, contains the output. Contains nothing otherwise

  • fetch(task::Task) : wait for the task, then return its result value

  • @sync use this before an expression that creates multiple tasks, and it will wait until all those tasks are done.

  • Channel “a waitable first-in first-out queue which can have multiple tasks reading from and writing to it”. Channels are a robust way of communicating between tasks. If you’re familiar with Go, you use Tasks and Channels in Julia the way you use Goroutines and Channels in Go.

  • put!(channel::Channel, value) append value to channel, blocking if it’s full

  • take!(channel::Channel) return the next available value from channel, blocking if it’s empty

No comments.