This lesson is being piloted (Beta version)

Dask abstractions: delays

Overview

Teaching: 60 min
Exercises: 30 min
Questions
  • What abstractions does Dask offer?

  • What programming patterns exist in the parallel universe?

Objectives
  • Understand the abstraction of delayed evaluation

  • Use the visualize method to create dependency graphs

Dask is one of the many tools available for parallelizing Python code in a comfortable way. We’ve seen a basic example of dask.array in a previous episode. Now, we will focus on the delayed and bag sub-modules. Dask has a lot of other useful components, such as dataframe and futures, but we are not going to cover them in this lesson.

See an overview below:

Dask module      Abstraction         Keywords                             Covered
dask.array       numpy               Numerical analysis                   ✔️
dask.bag         itertools           Map-reduce, workflows                ✔️
dask.delayed     functions           Anything that doesn’t fit the above  ✔️
dask.dataframe   pandas              Generic data analysis
dask.futures     concurrent.futures  Control execution, low-level

Dask Delayed

A lot of the functionality in Dask is built on top of a framework of delayed evaluation. The concept of delayed evaluation is central to understanding how Dask works, which is why we will go a bit deeper into dask.delayed.

from dask import delayed

The delayed decorator builds a dependency graph from function calls.

@delayed
def add(a, b):
    result = a + b
    print(f"{a} + {b} = {result}")
    return result

A delayed function stores the requested function call inside a promise. The function is not executed yet. Instead, we are promised a value that can be computed later.

x_p = add(1, 2)

We can check that x_p is now a Delayed value.

type(x_p)
[out]: dask.delayed.Delayed

Note

It is often a good idea to suffix variables that you know are promises with _p. That way you keep track of promises versus immediate values.

Only when we evaluate the computation do we get an output.

x_p.compute()
1 + 2 = 3
[out]: 3
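The laziness described above can be checked without relying on printed output. The following is a minimal sketch, assuming a hypothetical function double and a module-level list calls (neither is part of the lesson) that records each real execution:

```python
from dask import delayed

calls = []  # hypothetical log: one entry per real execution of double


@delayed
def double(x):
    calls.append(x)
    return 2 * x


d_p = double(21)          # builds a promise, runs nothing
ran_before = len(calls)   # the function body has not run yet
value = d_p.compute()     # now the task is actually executed
ran_after = len(calls)

print(ran_before, value, ran_after)  # prints: 0 42 1
```

Creating the promise leaves the log empty. Only the call to compute runs the function body.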

From Delayed values we can create larger workflows and visualize them.

x_p = add(1, 2)
y_p = add(x_p, 3)
z_p = add(x_p, y_p)
z_p.visualize(rankdir="LR")

Dask workflow graph

Challenge: run the workflow

Given this workflow:

x_p = add(1, 2)
y_p = add(x_p, 3)
z_p = add(x_p, -3)

Visualize and compute y_p and z_p. How often is x_p evaluated? Now change the workflow:

x_p = add(1, 2)
y_p = add(x_p, 3)
z_p = add(x_p, y_p)
z_p.visualize(rankdir="LR")

We pass the yet uncomputed promise x_p to both y_p and z_p. How often do you expect x_p to be evaluated? Run the workflow to check your answer.

Solution

z_p.compute()
1 + 2 = 3
3 + 3 = 6
3 + 6 = 9
[out]: 9

The computation of x_p (1 + 2) appears only once.
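For the first workflow of this challenge, where y_p and z_p both depend on x_p but not on each other, the answer depends on how the results are requested. The sketch below assumes a hypothetical logged_add function that records its arguments in a list instead of printing them. It compares two separate compute calls with a single call to dask.compute:

```python
import dask
from dask import delayed

calls = []  # hypothetical log of (a, b) pairs that were really added


@delayed
def logged_add(a, b):
    calls.append((a, b))
    return a + b


x_p = logged_add(1, 2)
y_p = logged_add(x_p, 3)
z_p = logged_add(x_p, -3)

# Separate compute() calls each evaluate their own graph, so x_p runs twice.
y_p.compute()
z_p.compute()
separate_runs = calls.count((1, 2))   # 2

calls.clear()

# dask.compute merges both graphs, so the shared x_p runs only once.
y, z = dask.compute(y_p, z_p)
shared_runs = calls.count((1, 2))     # 1

print(separate_runs, shared_runs, y, z)  # prints: 2 1 6 0
```

When several results share dependencies, requesting them together in one dask.compute call avoids repeating the shared work.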

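We can also make a promise by calling delayed directly on an existing function (calc_pi is the pi estimator listed in the final solution of this episode):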

N = 10**7
x_p = delayed(calc_pi)(N)

It is now possible to call visualize or compute methods on x_p.
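As a small self-contained illustration of the same pattern, the sketch below wraps a hypothetical function square (not part of the lesson) at call time instead of decorating it. The original function stays available for ordinary, immediate calls:

```python
from dask import delayed


def square(x):
    """A plain, undecorated function (hypothetical example)."""
    return x * x


sq_p = delayed(square)(7)       # wrap at call time: returns a promise
kind = type(sq_p).__name__      # "Delayed"
immediate = square(7)           # the plain function still runs right away
deferred = sq_p.compute()       # evaluate the promise

print(kind, immediate, deferred)  # prints: Delayed 49 49
```

This form is convenient for functions you did not write yourself or that should also remain usable without Dask.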

Variadic arguments

In Python you can define functions that take an arbitrary number of arguments:

def add(*args):
    return sum(args)

add(1, 2, 3, 4)   # => 10

You can use argument unpacking (the * operator) to pass a sequence of arguments:

numbers = [1, 2, 3, 4]
add(*numbers)   # => 10

We can build new primitives from the ground up.

@delayed
def gather(*args):
    return list(args)

Challenge: understand gather

Can you describe what the gather function does in terms of lists and promises?

Solution

It turns a list of promises into a promise of a list.

We can visualize what gather does by this small example.

x_p = gather(*(add(n, n) for n in range(10))) # Shorthand for gather(add(0, 0), add(1, 1), ...)
x_p.visualize()

a gather pattern

Computing the result:

x_p.compute()
[out]: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
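Writing gather by hand is a useful exercise, but Dask can also collect promises for us. The sketch below assumes a hypothetical delayed function double and shows two alternatives: dask.compute, which evaluates several promises at once and returns a tuple, and calling delayed on a list of promises, which gives a single promise of a list:

```python
import dask
from dask import delayed


@delayed
def double(n):
    """Hypothetical stand-in for add(n, n)."""
    return n + n


promises = [double(n) for n in range(5)]

# dask.compute accepts several promises and returns a tuple of results.
as_tuple = dask.compute(*promises)

# Wrapping the list itself in delayed yields one promise of a list.
list_p = delayed(promises)
as_list = list_p.compute()

print(as_tuple)  # prints: (0, 2, 4, 6, 8)
print(as_list)   # prints: [0, 2, 4, 6, 8]
```

The second form behaves like gather: list_p is a single promise that can be passed on to other delayed functions.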

Challenge: design a mean function and calculate pi

Write a delayed function that computes the mean of its arguments. Use it to estimate pi several times and return the mean of the results.

>>> mean(1, 2, 3, 4).compute()
2.5

Make sure that the entire computation is contained in a single promise.

Solution

from dask import delayed
import random

@delayed
def mean(*args):
    return sum(args) / len(args)

def calc_pi(N):
    """Computes the value of pi using N random samples."""
    M = 0
    for i in range(N):
        # take a sample
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)
        if x*x + y*y < 1.0:
            M += 1
    return 4 * M / N


N = 10**6
pi_p = mean(*(delayed(calc_pi)(N) for i in range(10)))
pi_p.compute()

You may not see a significant speedup. This is because dask.delayed uses threads by default, and our pure Python implementation of calc_pi does not release the GIL. With, for example, the numba version of calc_pi, you should see a more significant speedup.
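To illustrate the point about the GIL without depending on numba, the sketch below swaps in a NumPy-vectorized estimator. This is an assumption for illustration and not the lesson's numba version: calc_pi_numpy is a hypothetical name, and the idea is that NumPy does most of its work in compiled code and can release the GIL during many array operations, so threads have a better chance of overlapping. The actual speedup depends on your machine and is not guaranteed.

```python
import numpy as np
from dask import delayed


@delayed
def mean(*args):
    return sum(args) / len(args)


def calc_pi_numpy(N):
    """Estimate pi from N random points using vectorized NumPy operations."""
    rng = np.random.default_rng()   # a fresh generator per call, safe across threads
    x = rng.uniform(-1, 1, N)
    y = rng.uniform(-1, 1, N)
    M = np.count_nonzero(x * x + y * y < 1.0)  # points inside the unit circle
    return 4 * M / N


N = 10**5
pi_p = mean(*(delayed(calc_pi_numpy)(N) for _ in range(4)))

# "threads" is the default scheduler for delayed; it is spelled out here for clarity.
estimate = pi_p.compute(scheduler="threads")
print(estimate)
```

The scheduler keyword of compute selects how the graph is executed, so the same promise can be timed under different schedulers without changing the workflow.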

In practice you may not need to use @delayed functions very often, but they offer the ultimate flexibility. You can build complex computational workflows in this manner, sometimes replacing shell scripts, Makefiles, and the like.

Key Points

  • Use abstractions to keep programs manageable