Dask abstractions: delays
Overview
Teaching: 60 min
Exercises: 30 min
Questions
What abstractions does Dask offer?
What programming patterns exist in the parallel universe?
Objectives
Understand the abstraction of delayed evaluation
Use the visualize method to create dependency graphs
Dask is one of the many tools available for parallelizing Python code conveniently. We've seen a basic example of dask.array in a previous episode. Now, we will focus on the delayed and bag sub-modules. Dask has a lot of other useful components, such as dataframe and futures, but we are not going to cover them in this lesson.
See an overview below:
Dask module | Abstraction | Keywords | Covered |
---|---|---|---|
dask.array | numpy | Numerical analysis | ✔️ |
dask.bag | itertools | Map-reduce, workflows | ✔️ |
dask.delayed | functions | Anything that doesn't fit the above | ✔️ |
dask.dataframe | pandas | Generic data analysis | ❌ |
dask.distributed (futures) | concurrent.futures | Control execution, low-level | ❌ |
Dask Delayed
A lot of the functionality in Dask is built on top of a framework of delayed evaluation. The concept of delayed evaluation is very important for understanding how Dask works, which is why we will go a bit deeper into dask.delayed.
from dask import delayed
The delayed decorator builds a dependency graph from function calls.
@delayed
def add(a, b):
    result = a + b
    print(f"{a} + {b} = {result}")
    return result
A delayed function stores the requested function call inside a promise. The function is not actually executed yet; instead, we are promised a value that can be computed later.
x_p = add(1, 2)
We can check that x_p is now a Delayed value.
type(x_p)
[out]: dask.delayed.Delayed
Note
It is often a good idea to suffix variables that you know are promises with _p. That way you keep track of promises versus immediate values.
Only when we evaluate the computation with compute do we get an output.
x_p.compute()
1 + 2 = 3
[out]: 3
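To see that the function body really is deferred, the minimal sketch below records every execution of a hypothetical function traced_add in a list. The list stays empty when the promise is created and only grows once compute runs. The names traced_add and calls are illustrative and not part of Dask.

```python
from dask import delayed

calls = []  # records every time the function body actually runs

@delayed
def traced_add(a, b):
    calls.append((a, b))
    return a + b

x_p = traced_add(1, 2)
calls_before = len(calls)  # 0: only a promise was created, nothing ran

result = x_p.compute()     # the function body runs now
calls_after = len(calls)   # 1

print(calls_before, result, calls_after)  # 0 3 1
```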
From Delayed values we can create larger workflows and visualize them.
x_p = add(1, 2)
y_p = add(x_p, 3)
z_p = add(x_p, y_p)
z_p.visualize(rankdir="LR")
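visualize needs the graphviz Python package and the Graphviz system binaries. If they are not installed, you can still inspect the task graph, because every Delayed object carries it. The sketch below assumes a recent Dask version in which __dask_graph__() returns a mapping from task keys to tasks. It uses a quiet version of add without the print.

```python
from dask import delayed

@delayed
def add(a, b):
    return a + b

x_p = add(1, 2)
y_p = add(x_p, 3)
z_p = add(x_p, y_p)

# The graph maps task keys to tasks; x_p is a single node shared by y_p and z_p.
graph = dict(z_p.__dask_graph__())
for key in graph:
    print(key)

print(z_p.compute())  # 9
```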
Challenge: run the workflow
Given this workflow:
x_p = add(1, 2)
y_p = add(x_p, 3)
z_p = add(x_p, -3)
Visualize and compute y_p and z_p. How often is x_p evaluated?
Now change the workflow:
x_p = add(1, 2)
y_p = add(x_p, 3)
z_p = add(x_p, y_p)
z_p.visualize(rankdir="LR")
We pass the yet uncomputed promise x_p to both y_p and z_p. How often do you expect x_p to be evaluated? Run the workflow to check your answer.
Solution
z_p.compute()
1 + 2 = 3
3 + 3 = 6
3 + 6 = 9
[out]: 9
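The computation of x_p (1 + 2) appears only once. Both y_p and z_p depend on the same node in the graph, so a single compute call evaluates it only once.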
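For the first workflow, the answer depends on how you compute. Two separate compute calls evaluate x_p once per call, because each call runs its own graph. dask.compute evaluates several promises in one pass over their merged graph, so the shared x_p runs only once. The sketch below counts evaluations with a hypothetical counting version of add.

```python
import dask
from dask import delayed

evaluations = []  # one (a, b) entry per execution of add

@delayed
def add(a, b):
    evaluations.append((a, b))
    return a + b

x_p = add(1, 2)
y_p = add(x_p, 3)
z_p = add(x_p, -3)

# Separate calls: x_p is recomputed for y_p and again for z_p.
y_sep = y_p.compute()
z_sep = z_p.compute()
separate_count = evaluations.count((1, 2))

evaluations.clear()

# One call: the merged graph contains x_p only once.
y_joint, z_joint = dask.compute(y_p, z_p)
joint_count = evaluations.count((1, 2))

print(separate_count, joint_count)  # 2 1
```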
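We can also make a promise by calling delayed directly on an ordinary, undecorated function, such as calc_pi (defined in the solution to the mean challenge below):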
N = 10**7
x_p = delayed(calc_pi)(N)
It is now possible to call the visualize or compute methods on x_p.
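Calling delayed on a function is what the @delayed decorator does through ordinary Python decorator syntax: decorating add is the same as writing add = delayed(add). This lets you wrap functions whose source you cannot or do not want to change, such as builtins or library functions. Below is a short sketch using only built-in functions and a lambda. The resulting promises can be passed on to further delayed calls.

```python
from dask import delayed

numbers = [3, 1, 4, 1, 5]

# Wrap existing functions at call time; nothing runs yet.
total_p = delayed(sum)(numbers)
count_p = delayed(len)(numbers)

# Promises can be passed as arguments to another delayed call.
mean_p = delayed(lambda total, count: total / count)(total_p, count_p)

print(mean_p.compute())  # 2.8
```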
Variadic arguments
In Python you can define functions that take an arbitrary number of arguments:
def add(*args):
    return sum(args)
add(1, 2, 3, 4)  # => 10
You can use argument unpacking to pass a sequence of arguments:
numbers = [1, 2, 3, 4]
add(*numbers)  # => 10
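The same unpacking works with any iterable, including a generator expression. This is the pattern used with gather below: the generator produces the arguments and * spreads them over the call.

```python
def add(*args):
    """Sum an arbitrary number of positional arguments."""
    return sum(args)

# Squares of 0..3 are unpacked into four separate arguments: add(0, 1, 4, 9)
squares_total = add(*(n * n for n in range(4)))
print(squares_total)  # 14
```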
We can build new primitives from the ground up.
@delayed
def gather(*args):
    return list(args)
Challenge: understand gather
Can you describe what the gather function does in terms of lists and promises?
Solution
It turns a list of promises into a promise of a list.
We can visualize what gather does with this small example.
x_p = gather(*(add(n, n) for n in range(10)))  # Shorthand for gather(add(0, 0), add(1, 1), ...)
x_p.visualize()
Computing the result:
x_p.compute()
[out]: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
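dask.compute can also evaluate many promises at once, but it returns the values right away as a tuple. gather instead keeps the collection as a single promise that can be passed to further delayed functions before anything runs. The sketch below compares both. It repeats the gather definition from this episode and uses a quiet version of add without the print.

```python
import dask
from dask import delayed

@delayed
def add(a, b):
    return a + b

@delayed
def gather(*args):
    return list(args)

promises = [add(n, n) for n in range(10)]

# dask.compute: evaluates immediately and returns a tuple of values.
values = dask.compute(*promises)

# gather: a promise of a list, which can feed another delayed call.
total_p = delayed(sum)(gather(*promises))
total = total_p.compute()

print(values)  # (0, 2, 4, 6, 8, 10, 12, 14, 16, 18)
print(total)   # 90
```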
Challenge: design a mean function and calculate pi
Write a delayed function that computes the mean of its arguments. Use it to estimate pi several times and return the mean of the results.
>>> mean(1, 2, 3, 4).compute()
2.5
Make sure that the entire computation is contained in a single promise.
Solution
from dask import delayed
import random

@delayed
def mean(*args):
    return sum(args) / len(args)

def calc_pi(N):
    """Computes the value of pi using N random samples."""
    M = 0
    for i in range(N):
        # take a sample
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)
        if x*x + y*y < 1.:
            M += 1
    return 4 * M / N

N = 10**6
pi_p = mean(*(delayed(calc_pi)(N) for i in range(10)))
pi_p.compute()
You may not see a significant speedup. This is because dask.delayed uses threads by default, and our pure-Python implementation of calc_pi holds the GIL for its whole loop, so the threads take turns instead of running in parallel. With a version of calc_pi that releases the GIL, for example one compiled with Numba using nogil=True, you should see a more significant speedup.
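Whether threads help depends on whether a task releases the GIL while it works. time.sleep does release it, so sleeping tasks overlap on Dask's threaded scheduler, whereas a pure-Python loop like calc_pi does not. The minimal sketch below compares the threaded scheduler with the single-threaded "synchronous" scheduler. The task count, sleep length and num_workers=4 are arbitrary choices, and the timings will vary between machines.

```python
import time

import dask
from dask import delayed

@delayed
def wait(seconds):
    time.sleep(seconds)  # releases the GIL while sleeping
    return seconds

def timed_compute(scheduler, **kwargs):
    """Run four sleeping tasks with the given scheduler and return the wall time."""
    tasks = [wait(0.2) for _ in range(4)]
    start = time.perf_counter()
    dask.compute(*tasks, scheduler=scheduler, **kwargs)
    return time.perf_counter() - start

sync_time = timed_compute("synchronous")                # one after another: roughly 0.8 s
thread_time = timed_compute("threads", num_workers=4)   # overlapping: roughly 0.2 s

print(f"synchronous: {sync_time:.2f} s, threads: {thread_time:.2f} s")
```

Replacing time.sleep with a CPU-bound pure-Python loop would make the two timings similar, which is the situation described above for calc_pi.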
In practice you may not need to use @delayed functions very often, but they offer a great deal of flexibility. You can build complex computational workflows in this manner, sometimes replacing shell scripts, Makefiles and the like.
Key Points
Use abstractions to keep programs manageable