Distributed Computing with dask

In this portion of the course, we’ll explore distributed computing with a Python library called dask.

dask is a library designed to help facilitate (a) manipulation of very large datasets, and (b) distribution of computation across lots of cores or physical computers. It is very similar to Apache Spark in the functionality it provides, but it is tightly integrated into numpy and pandas, making it much easier to learn than spark for users of those libraries.

What can dask do for me?

To get a sense of why dask is so nice, let’s begin with a demonstration. Suppose I have a pretty big dataset (say, an 80GB CSV of all drug shipments in the United States from 2006 to 2012). This data is too large to load into RAM on my laptop directly, so if I were working with it on my own, I’d do so by chunking the data by hand. But using dask, I can do the following:

[1]:
# Standard setups

import pandas as pd
import os
os.chdir('/users/nick/github/practicaldatascience/example_data/dask_data')
[2]:
# Here we start up a dask cluster.
# This creates a set of "workers".
# In this case, these workers are all on
# my computer, but if we wanted to connect
# to a cluster, we'd just pass an IP address
# to `Client()` (the `n_workers` specifies
# the number of workers, so here I use 7)

from dask.distributed import Client
client = Client(n_workers=7)
client
[2]:

Client

Cluster

  • Workers: 7
  • Cores: 21
  • Memory: 34.36 GB
[3]:
# ARCOS Data on all drug shipments in US from 2006 to 2012
# I was unable to get this data
# into my repo, but you can download it here:
# https://d2ty8gaf6rmowa.cloudfront.net/dea-pain-pill-database/bulk/arcos_all_washpost.tsv.gz

# Note that while pandas can read compressed files like this .tsv.gz
# file, `dask` cannot. So if you want to do this at home,
# you have to decompress the data.

import dask.dataframe as dd

df = dd.read_csv('arcos_all_washpost.tsv', sep='\t',
                dtype={'ACTION_INDICATOR': 'object',
                       'ORDER_FORM_NO': 'object',
                       'REPORTER_ADDRESS2': 'object',
                       'REPORTER_ADDL_CO_INFO': 'object',
                       'NDC_NO': 'object',
                       'UNIT': 'object'})
[4]:
# Extract year
df['date'] = dd.to_datetime(df.TRANSACTION_DATE, format="%m%d%Y")
df['year'] = df.date.dt.year

# Make an estimate of total morphine equivalent shipments
df['morphine_equivalent_g'] = (df['CALC_BASE_WT_IN_GM']) * df['MME_Conversion_Factor']

# Drop extra vars
df = df[['year', 'morphine_equivalent_g', 'BUYER_STATE', 'BUYER_COUNTY']]

# Collapse to total shipments to each county in each year.
collapsed = df.groupby(['year', 'BUYER_STATE', 'BUYER_COUNTY']).morphine_equivalent_g.sum()
collapsed
[4]:
Dask Series Structure:
npartitions=1
    float64
        ...
Name: morphine_equivalent_g, dtype: float64
Dask Name: series-groupby-sum-agg, 18931 tasks
[5]:
%%time
final = collapsed.compute()
final.sample(10)
CPU times: user 36.4 s, sys: 4.24 s, total: 40.6 s
Wall time: 5min 24s
[5]:
year  BUYER_STATE  BUYER_COUNTY
2011  CA           YOLO              49404.964955
2008  TX           CROCKETT            268.007700
2009  MO           SAINT FRANCOIS    51986.416907
2012  NE           CASS               2784.156097
2006  TX           CARSON              247.579725
2009  GA           CHEROKEE          65421.975288
      VA           NORTHUMBERLAND     1248.428104
2011  OH           COSHOCTON          9384.518407
2010  IN           JACKSON           27157.612188
2007  AR           SHARP              7973.241897
Name: morphine_equivalent_g, dtype: float64

Voila! I get back my thinned-out dataset with the new variable I wanted, all while never using more than about 2GB of RAM at any point.

So let’s discuss what just happened.

  1. First, when I ran Client(), dask started up a set of 7 new Python processes on my computer (called “workers”) that it could call on for help. These are fully independent agents that could be running on different machines in a true cluster (though in this case, they’re all on my laptop).

  2. It then collected all the instructions I gave it for reading in the file, generating a new column, deleting extra columns, and grouping. But it didn’t actually execute any code. That’s why when I typed collapsed, what I got back was a dataframe with structure but no data – dask had figured out roughly what the result of my commands would look like, but hadn’t actually computed the result.

  3. Then, when I ran collapsed.compute(), dask came up with a set of assignments it could give to its workers to get me what I wanted, and it started to actually compute the table I requested. In particular:

    • each worker loaded a chunk of the data, did the observation-level manipulations I wanted, and dropped extra columns.

    • then, to execute the groupby, each worker collapsed the data in its own chunk, those partially collapsed chunks were passed to a common worker that re-collapsed them further, and this gathering and re-collapsing was repeated (potentially under different workers) until all the data had been collapsed. (A by-hand pandas sketch of this pattern appears just after this list.)

    • Finally, after all the data had been collapsed, the data was passed back to me (the client session) as the variable final.
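
To make that chunk-then-re-collapse pattern concrete, here is roughly what it would look like if you did it by hand with pandas. This is a sketch of the idea, not the code dask actually runs; the chunk size is arbitrary, and I read the date column as a string so the MMDDYYYY format parses cleanly.

import pandas as pd

# Read the big file in manageable chunks
chunks = pd.read_csv('arcos_all_washpost.tsv', sep='\t',
                     chunksize=1_000_000,
                     dtype={'TRANSACTION_DATE': str},
                     usecols=['TRANSACTION_DATE', 'CALC_BASE_WT_IN_GM',
                              'MME_Conversion_Factor',
                              'BUYER_STATE', 'BUYER_COUNTY'])

partial_sums = []
for chunk in chunks:
    # Observation-level manipulations, one chunk at a time
    chunk['year'] = pd.to_datetime(chunk['TRANSACTION_DATE'],
                                   format='%m%d%Y').dt.year
    chunk['morphine_equivalent_g'] = (chunk['CALC_BASE_WT_IN_GM']
                                      * chunk['MME_Conversion_Factor'])

    # Collapse this chunk on its own
    partial_sums.append(
        chunk.groupby(['year', 'BUYER_STATE', 'BUYER_COUNTY'])
        .morphine_equivalent_g.sum()
    )

# Re-collapse the per-chunk results into the final table
final = (pd.concat(partial_sums)
         .groupby(level=['year', 'BUYER_STATE', 'BUYER_COUNTY'])
         .sum())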

And it did it all in 6 minutes!

“Um… why didn’t you tell us about this before?” I hear you say…

“This sure seems like this is easier than chunking by hand!!!!”

The answer is that these distributed computing tools are often fragile and opaque. It’s easy to get into trouble using them, and hard to figure out why if you don’t understand the underlying principles that govern their operation.

This example was carefully chosen. I used only manipulations that dask is really good at, and I have the experience to implement them in a way that works. But do what I did above in slightly different ways, and chaos would ensue. OK, not chaos. But your computer would probably crash.

The biggest challenge in using a tool like dask or spark (they operate on the same principles – more on that below) is that to prevent crashes, you have to understand how and why dask is chunking your data. And the best way to do that is to get experience chunking by hand.

For example, one of the key features of these distributed computing tools is that they don’t run code as soon as you type it. Instead, they just keep track of what you want and wait until you run .compute() to actually execute your code. Why? Because if dask had run each command as I entered it and passed the result back to me, it would have crashed my computer.
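
As a tiny, self-contained illustration of this “record now, run later” behavior, here’s a sketch using dask.delayed (separate from the ARCOS example; the function and numbers are arbitrary):

import dask

@dask.delayed
def double(x):
    return 2 * x

# Nothing is computed here -- `lazy` is just a recorded task graph.
lazy = double(20) + double(22)

# The work only happens when we explicitly ask for the result.
lazy.compute()  # 84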

Recall that the whole reason we’re using dask is precisely because we knew that loading the whole dataset at once would take up more memory than my laptop has available. It’s only because we loaded a few chunks of the data at a time, thinned out those chunks by dropping variables, and then collapsed by county-state-year that we ended up with a dataset small enough to fit in memory.

In other words, this only worked because I knew that the steps after the initial load would reduce the size of the dataset enough that when I ran .compute(), the result I got back would be small enough to fit in memory. Had I just run:

df = dd.read_csv('arcos_all_washpost.tsv', sep='\t',
                dtype={'ACTION_INDICATOR': 'object',
                       'ORDER_FORM_NO': 'object',
                       'REPORTER_ADDRESS2': 'object',
                       'REPORTER_ADDL_CO_INFO': 'object',
                       'NDC_NO': 'object',
                       'UNIT': 'object'})
df.compute()

dask would have tried to hand the full dataset back to me, and my computer would have crashed, just as it would had I tried to read the full dataset with pandas.
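
If you just want to peek at a big dask dataframe, the safe pattern is to ask for a small piece rather than calling .compute() on everything. For example, dask’s .head() only computes a handful of rows from the first partition by default:

# Safe even on the full dataset: .head() only computes a few rows
# from the first partition rather than materializing everything.
df.head()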

So understanding the intuition of chunking is basically a prerequisite to using these tools effectively.

Vocabulary

Now that we’ve seen this example, let’s formally introduce some distributed computing vocabulary:

  • Delayed Execution (also sometimes called “lazy evaluation”): The practice of not executing code as soon as you type it, but rather accumulating a set of requests, then executing them when instructed. This allows distributed computing systems to optimize how they move data around and get things done, and ensures that you don’t end up with the system trying to shove a massive dataset into RAM that’s too small.

  • Client: The central process (here, Python process) where code is being entered.

  • Workers: Other processes that are assigned work, and which eventually pass results back to the client.

  • Scheduler: The part of the system that manages the assignment of tasks to different workers.

  • map-reduce: The name for the process of distributing sub-problems to workers (map), then processing them in a way that allows the results to be recombined (reduce). maps and reduces are kind of the building blocks of distributed systems, though when working with dask you won’t usually have to manage the assignment or recombination of tasks yourself (they’re happening behind the scenes). A toy example follows this list.
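
To make the map/reduce vocabulary concrete, here’s a toy sketch that uses the distributed client directly (the function, inputs, and worker count are arbitrary illustrations):

from dask.distributed import Client

# A fresh client just for this toy example
client = Client(n_workers=4)

# "map": send the same function out over many inputs;
# each call can run on a different worker.
futures = client.map(lambda x: x ** 2, range(100))

# "reduce": gather the partial results and combine them on the client.
total = sum(client.gather(futures))  # 328350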

Let’s Learn More dask!

OK, now that we have the basic idea of dask in hand, please watch the following (very well organized) talk on dask. Then come back here and we can talk some more!

Matthew Rocklin on Dask

There is no magic here

As noted in that video, one of the very nice things about dask (and one of the reasons it’s been able to offer so much functionality so quickly) is that it really is just an extension of numpy and pandas. There is no magic here.

For example, suppose we wanted to find the largest number of pills in a single shipment in our ARCOS dataset, but we can’t load the whole dataset into memory at once. How would we get that number? Probably by doing something like:

[6]:
import pandas as pd
import numpy as np
import os

os.chdir('/users/nick/github/practicaldatascience/example_data/dask_data')

# Create an iterator of the data so
# it doesn't all load at once
df = pd.read_csv('arcos_all_washpost.tsv',
                 delimiter='\t',
                 iterator=True,
                 chunksize=100000,
                 usecols=['DOSAGE_UNIT', 'Measure']
                )


# Find the max from each chunk
max_candidates = list()
for chunk in df:
    # Subset for pill shipments
    chunk = chunk[chunk['Measure'] == "TAB"]

    # Find largest in this chunk
    max_candidates.append(chunk['DOSAGE_UNIT'].max())

# Now gather those candidates together and
# find the maximum of all the chunk maximums.
np.max(max_candidates)
[6]:
3115000.0

Wow… that is a LOT of pills!

Now suppose we asked dask to do the same thing:

[7]:
df = dd.read_csv('arcos_all_washpost.tsv', sep='\t',
                dtype={'ACTION_INDICATOR': 'object',
                       'ORDER_FORM_NO': 'object',
                       'REPORTER_ADDRESS2': 'object',
                       'REPORTER_ADDL_CO_INFO': 'object',
                       'NDC_NO': 'object',
                       'UNIT': 'object'})

df = df[df['Measure'] == "TAB"]
max_shipment = df['DOSAGE_UNIT'].max()
max_shipment.compute()
[7]:
3115000.0

What dask is actually doing is exactly what you just did! It reads in chunks of the dataset, subsets each chunk to pill shipments, then calculates the maximum value for that chunk. Then it gathers all those maximum values and finds the maximum of all those chunk maximums. The only difference is that it loads and evaluates chunks in parallel, and its parallel workers then have to pass their maximum-value candidates to a central node for the final evaluation.

Moreover, when I said that’s exactly what dask did, I don’t just mean that you and dask are doing the same thing in principle – dask is built on pandas, so it really is calling pd.read_csv and the pandas .max() method, just like you.

Dask’s developers were smart, and didn’t want to reinvent the wheel – they just created a package full of recipes for using numpy/pandas to (a) divide tasks into smaller pieces it can distribute to different workers (map), and to (b) recombine the results of sub-problems to give you a single answer (reduce).

But those recipes are just made of the pandas and numpy code you know and love. Or at least tolerate begrudgingly.
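
One way to see how thin that layer is: each partition of a dask dataframe is just an ordinary pandas DataFrame, and map_partitions hands your own pandas code to every chunk. A sketch, reusing the filtered df from the cell above:

# Run plain pandas code on every chunk (the "map" step)...
per_chunk_max = df.map_partitions(lambda pdf: pdf['DOSAGE_UNIT'].max())

# ...then combine the per-chunk answers (the "reduce" step).
per_chunk_max.max().compute()  # should match the 3115000.0 above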

dask versus Spark

In terms of functionality, dask and spark basically do the same things. Both are tools for distributed computing across many computers, and conceptually they work in basically the same ways:

  • Both offer low-level tools (things like map/filter in pyspark, and delayed in dask) as well as higher-level abstractions (spark has spark dataframes, dask has dask dataframes; spark has RDDs, dask has dask arrays).

  • Both are fault tolerant (if one machine in your cluster dies, the system knows what that machine was doing and can reassign its work to another machine).

  • Both make use of delayed execution / lazy evaluation to allow the system to develop an efficient plan for completing a computation.

  • Both can run either on your own computer or on a cluster of many computers.

The differences instead come down to implementation details. dask, for example, is for Python users. It’s written in Python, and it’s a relatively simple library that just creates recipes for parallelizing numpy and pandas functions (and uses numpy and pandas to execute those recipes). And because it was designed for Python users, the syntax almost always matches that of numpy and pandas.

spark is a stand-alone system. It’s built to run on Java virtual machines, and spark itself is written in a language called Scala. It is not a tool for pandas users per se, and so it has its own syntax for manipulating datasets that is distinct from that of numpy and pandas.

As for which is more popular, as is so often the case with software, it depends on where you’re working. In this class, I’ve decided to teach dask because you don’t have to learn a new syntax, so you can instead just focus on learning how distributed computing works. But if you get a job someday that requires you to work with spark, don’t panic – you’ll find that all the concepts you’ve learned for using dask also apply to spark – you’ll just have to learn some new syntax.

What else can dask do?

At this point, we’ve mostly emphasized what dask can do in terms of the data wrangling of dataframes. However, dask has a number of additional functionalities to be aware of:

  • Working with dask arrays: dask has a parallelized version of numpy arrays (see the short sketch after this list)

  • Parallelizing arbitrary functions: you can write your own parallelized functions with delayed

  • Distributed machine learning: dask has parallelized SOME machine learning methods in dask-ml

  • dask can be used with tools for parallelization other than distributed (for example, it can do some parallelism through multi-threading). However, everyone seems to agree at this point that you should just use distributed all the time.
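
For instance, here’s a small sketch of the dask array interface (the array shape and chunk sizes are arbitrary):

import dask.array as da

# A 10,000 x 10,000 random array stored as 100 numpy chunks of
# 1,000 x 1,000 each; each chunk can live on a different worker.
x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))

# Same lazy behavior as dask dataframes: nothing runs until .compute().
result = (x + x.T).mean(axis=0)
result.compute()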

What can’t dask do?

Because dask is basically a library full of little recipes for parallelizing common Python, numpy, and pandas functions, you will occasionally find that there are some things the authors of dask just haven’t implemented (usually because some operations are really hard to parallelize). The dask documentation includes guides to what you can expect to work and what is unlikely to work.

If you really want to get into dask…

Then after doing the exercise below (which I think is actually the most accessible to those taking this class), I would suggest following along with this three-hour tutorial, which goes through dask from its most basic tools up through dask arrays and dask dataframes (with exercises!).

Then also be sure to check out dask best practices here.

Exercises

Exercises can be found here

If you want more, you can find tutorials written by the dask team here.

(Note: all these tutorials are great, but as the focus of this class is on real world tabular data, we’re gonna focus on those exercises).

When you’re done, you can find a dask cheatsheet here for future reference!