Dask Cluster on Azure Example

NOTE: DASK-CLOUDPROVIDER IS IN A STATE OF FLUX, AND SO THE CODE BELOW DOESN’T ACTUALLY WORK AT THE MOMENT… CHECK BACK IN A FEW WEEKS? - Nick, Dec 2020.

In this lesson, we’ll be using a dask cluster to replicate the exercise we did in the Big Data section, where we loaded global temperature data to measure global warming at a number of locations. You can get the data we’re using for this exercise here.

I’m also assuming you already have an Azure account and have already read through Azure Storage and Setting Up a Cluster on Azure.

Want to use Amazon AWS? You can do that too! The package we use to launch our cluster below also supports AWS. We won’t go through that here, but you can find info on it here.

If you want to follow along, just decompress the ghcnd_daily.tar.gz file and upload the resulting .csv into a Blob container using the web interface. Note that this will take a little while; we’ll talk about more efficient methods of upload in our next tutorial.
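
If you’d rather do the decompression in Python than with a command-line tool, a minimal sketch (assuming ghcnd_daily.tar.gz is sitting in your working directory) is:

import tarfile

# Extract the csv from the gzipped tar archive into the current directory
with tarfile.open("ghcnd_daily.tar.gz", "r:gz") as tar:
    tar.extractall(path=".")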

Starting a Dask Cluster

[1]:
%load_ext lab_black
from dask_cloudprovider.azure import AzureVMCluster

In the next step we’ll spin up a new cluster. Before we do so, though, it’s worth covering a few things:

  • This will spin up a new compute cluster that’s completely independent of any compute you currently have running.

  • This cluster also has a time-out setting, so if nothing happens in the cluster for the specified time (here, 7200 seconds), the cluster will shut down so you aren’t paying for anything.

  • vm_size dictates the size of each node in your new cluster. There are a couple of ways to see the range of VM specifications available. You can find a general summary here, but you may also find it helpful to navigate to your workspace in your browser, click Compute, click the Compute Clusters tab, hit + New, then look at the “Virtual machine size” dropdown to see more options and the exact names assigned to each configuration.

  • To protect people from running up insane bills they can’t actually pay, Azure has some default limits on the number of CPUs you can have running at any time. You can see this quota by going to the Compute Clusters tab using the steps above and clicking View Quota. (Note that, ironically, you can’t see this number if you don’t have any clusters up. So quickly create a cluster with the + New button, and then you’ll get the button you want.) This will tell you the total number of cores you can be running at any one time across all of Azure. That means if you have a single VM up and running, it counts against the quota you might want to allocate to a cluster!

    • For free student accounts, this quota is quite low: 6 CPUs. So the cluster we’ll create below is quite small. If your cluster allocation is above the quota, the code we run below will just hang, or you may get a time-out message. Sadly, you don’t get an informative error message. We’ll discuss below how to deal with this problem.

    • For a standard free account (the kind you get with credit card verification), the quota is 4 CPUs, so you have to reduce the number of nodes in the code below to 2 (2 nodes x 2 CPUs per node = 4).

    • If you have a paid account, you also have a quota, though likely a much higher one (my Duke account has a 300 CPU quota). If you’re running into quota limits on a paid account, you can ask that they be increased.

  • These clusters are scalable, so n_workers is just your initial node count. It often makes sense to start small, then scale up once you’re sure you’re done debugging, either with the widget that appears below or in code (see the sketch after this list).
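
A minimal sketch of scaling in code, assuming the cluster object created in the next cell and that your quota allows the extra workers:

# Resize the cluster to 6 workers (each worker is one VM of the size chosen below)
cluster.scale(6)

# Or hand control to dask, letting it add and remove workers based on the workload
cluster.adapt(minimum=1, maximum=6)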

The next cell should take between 5 and 15 minutes to run!

[2]:
# Start cluster
cluster = AzureVMCluster(
    resource_group="nce8rg",
    vnet="nce8vn",
    security_group="nce8nsg",
    n_workers=3,
    location="eastus",
    vm_size="Standard_DS11_v2",
)
Creating scheduler instance
Assigned public IP
Network interface ready
Creating VM
Created VM dask-9283e987-scheduler
Waiting for scheduler to run
Scheduler is running
Creating worker instance
/Users/Nick/miniconda3/lib/python3.7/contextlib.py:119: UserWarning: Creating your cluster is taking a surprisingly long time. This is likely due to pending resources. Hang tight!
  next(self.gen)
Network interface ready
Creating VM
Created VM dask-9283e987-worker-f3bb5298
Creating worker instance
Network interface ready
Creating VM
Created VM dask-9283e987-worker-10038e07
Creating worker instance
Network interface ready
Creating VM
Created VM dask-9283e987-worker-ec20074c

Note that we’re using the “Standard_DS11_v2” VM because it only has 2 cores, but it does have high-speed storage connections (the “S” in “DS11” indicates support for premium storage). And for data science, we almost always want fast storage access.

Using Your Cluster

There are two ways to use your cluster: You can click on the link above to open a connection to JupyterLab running on one of the computers in your cluster, or connect from here with this command:

[3]:
from dask.distributed import Client

c = Client(cluster)
/Users/Nick/miniconda3/lib/python3.7/site-packages/distributed/client.py:1129: VersionMismatchWarning: Mismatched versions found

+---------+---------------+---------------+---------------+
| Package | client        | scheduler     | workers       |
+---------+---------------+---------------+---------------+
| lz4     | 3.1.0         | 3.1.1         | 3.1.1         |
| numpy   | 1.19.4        | 1.18.1        | 1.18.1        |
| python  | 3.7.8.final.0 | 3.8.0.final.0 | 3.8.0.final.0 |
+---------+---------------+---------------+---------------+
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))

And you’re off to the races! One note though – if you decide to work from your own computer, you may get a warning about version differences between dask in the cloud and dask on your own computer. I initially got:

/Users/Nick/miniconda3/lib/python3.7/site-packages/distributed/client.py:1130: VersionMismatchWarning: Mismatched versions found

+---------+---------------+---------------+---------------+
| Package | client        | scheduler     | workers       |
+---------+---------------+---------------+---------------+
| lz4     | None          | 3.1.0         | 3.1.0         |
| numpy   | 1.19.1        | 1.19.2        | 1.19.2        |
| python  | 3.7.8.final.0 | 3.6.9.final.0 | 3.6.9.final.0 |
+---------+---------------+---------------+---------------+
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))

The numpy and python issues don’t seem likely to cause big problems (though if you have a mismatch and run into problems down the road, consider changing your Python version!), but the fact that the scheduler and workers have a package I don’t (lz4) is a problem, so I installed it before moving forward. (lz4 is a compression algorithm used to send data back and forth, so not having it is a big problem.)
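
If you hit the same lz4 mismatch, a minimal sketch of the fix (assuming the client environment is the one this notebook is running in) is:

import subprocess
import sys

# Install lz4 into the same environment the dask client is using
subprocess.run([sys.executable, "-m", "pip", "install", "lz4"], check=True)

# Re-check the package versions reported by the client, scheduler, and workers
c.get_versions()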

Accessing Storage

First, we’ll connect to the container where I put the climate data. As you saw in the last exercise, you can upload data using the Azure web interface, and I would do that if you want to do these exercises. However, there are more efficient tools we’ll cover in the next lesson.

As discussed previously, dask can access Azure storage without the help of any other libraries – you just need to be able to pass it your Storage Account name and Access Key, which you can find by going to your Azure Portal, then your Storage Account, and then clicking on “Access Keys” on the left menu. The syntax is:

import dask.dataframe as dd

storage_options={'account_name': ACCOUNT_NAME, 'account_key': ACCOUNT_KEY}

ddf = dd.read_csv('az://{CONTAINER}/{FOLDER}/*.csv', storage_options=storage_options)
ddf = dd.read_parquet('az://{CONTAINER}/folder.parquet', storage_options=storage_options)

But since I don’t want you to see all my secret codes, I’m gonna load my information from a file. You can do this, but you can also put them in your code if your code isn’t public!

[4]:
# Load my storage account name and access key from a json file
import json

with open("/users/nick/azure_secrets/azure_sa_name_and_key.json") as f:
    storage_options = json.load(f)
[5]:
import dask.dataframe as dd

# dask thinks the flag columns are floats, when really they're objects.
flag_dict = {}
for i in range(1, 32):
    for flag in ["q", "m"]:
        flag_dict.update({f"{flag}flag{i}": "object"})

temps = dd.read_csv(
    "az://globaltemps/ghcnd_daily.csv",
    storage_options=storage_options,
    dtype=flag_dict,
)
[6]:
temps.head(10)
---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
<ipython-input-6-e8b1118eea72> in <module>
----> 1 temps.head(10)

~/miniconda3/lib/python3.7/site-packages/dask/dataframe/core.py in head(self, n, npartitions, compute)
   1004             Whether to compute the result, default is True.
   1005         """
-> 1006         return self._head(n=n, npartitions=npartitions, compute=compute, safe=True)
   1007
   1008     def _head(self, n, npartitions, compute, safe):

~/miniconda3/lib/python3.7/site-packages/dask/dataframe/core.py in _head(self, n, npartitions, compute, safe)
   1037
   1038         if compute:
-> 1039             result = result.compute()
   1040         return result
   1041

~/miniconda3/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    165         dask.base.compute
    166         """
--> 167         (result,) = compute(self, traverse=False, **kwargs)
    168         return result
    169

~/miniconda3/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    450         postcomputes.append(x.__dask_postcompute__())
    451
--> 452     results = schedule(dsk, keys, **kwargs)
    453     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    454

~/miniconda3/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2723                     should_rejoin = False
   2724             try:
-> 2725                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2726             finally:
   2727                 for f in futures.values():

~/miniconda3/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1990                 direct=direct,
   1991                 local_worker=local_worker,
-> 1992                 asynchronous=asynchronous,
   1993             )
   1994

~/miniconda3/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    831         else:
    832             return sync(
--> 833                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    834             )
    835

~/miniconda3/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    338     if error[0]:
    339         typ, exc, tb = error[0]
--> 340         raise exc.with_traceback(tb)
    341     else:
    342         return result[0]

~/miniconda3/lib/python3.7/site-packages/distributed/utils.py in f()
    322             if callback_timeout is not None:
    323                 future = asyncio.wait_for(future, callback_timeout)
--> 324             result[0] = yield future
    325         except Exception as exc:
    326             error[0] = sys.exc_info()

~/miniconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
    760
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/miniconda3/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1849                             exc = CancelledError(key)
   1850                         else:
-> 1851                             raise exception.with_traceback(traceback)
   1852                         raise exc
   1853                     if errors == "skip":

KilledWorker: ("('read-csv-head-1-10-read-csv-3a29deea2d090d453bbd7546f1100890', 0)", <Worker 'tcp://10.0.0.11:44995', name: dask-9283e987-worker-10038e07, memory: 0, processing: 1>)

You will see that I added some code to explicitly set the types of some columns (the flags). As you may recall from our other lesson, dask isn’t great at type inference, and will otherwise assume those are float columns instead of objects. Without that code, I get this error:

/azureml-envs/azureml_c6bd17107f471892400d23146f291775/lib/python3.6/site-packages/dask/dataframe/io/csv.py in coerce_dtypes()

ValueError: Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.

+---------+--------+----------+
| Column  | Found  | Expected |
+---------+--------+----------+
| qflag1  | object | float64  |
| qflag10 | object | float64  |
| qflag11 | object | float64  |
| qflag12 | object | float64  |
| qflag13 | object | float64  |
| qflag14 | object | float64  |
| qflag15 | object | float64  |
| qflag16 | object | float64  |
| qflag17 | object | float64  |
| qflag18 | object | float64  |
| qflag19 | object | float64  |
| qflag2  | object | float64  |
| qflag20 | object | float64  |

Let’s start by asking dask to go through this entire dataset and pull out just the data for the station near my home in Colorado; then we’ll calculate the average daily max-temp for each month:

[ ]:
temps = temps[temps["id"] == "USC00050848"]

Now as you may recall, dask hasn’t actually run the code above – it’s just making a plan and waiting till I run .compute() to actually execute, so if I check on temps, I’ll see:

[ ]:
temps

Now since we’ll mostly be working with the data from this one station, we can cache this subset dataframe with c.persist(temps) so dask won’t have to re-load the original data the next time we run .compute(). More on caching here, as well as other dask best practices. Make sure not to just run c.persist(temps), but rather temps = c.persist(temps).

Finally, one other note: this dataset is actually pretty small for cloud computing, and since most of what we’re doing is reading in data and filtering it, it involves a lot of moving data around. As a result, it’d be much faster on a single really large VM (which doesn’t need to use network connections to pass data around). But I wanted an example I could run relatively quickly and easily. :)

[ ]:
temps = c.persist(temps)

OK, now we have our subset, and we can calculate what we want. Note that in reality, the data for this one station is actually easily small enough to put on my own computer, so I could have just run data_on_own_comp = temps.compute() above and moved the final dataset to my personal computer. Indeed, as you may recall from our previous lesson on parallelism, if you don’t have to use distributed computing, you probably don’t want to! Even the authors of dask are quick to remind users: “For data that fits into RAM, Pandas can often be faster and easier to use than Dask DataFrame. While ‘Big Data’ tools can be exciting, they are almost always worse than normal data tools while those remain appropriate.”

But let’s carry on with this on the cluster for practice.

[ ]:
temps["value31"].value_counts().compute()

Now the really cool thing: if we use the dashboard link (cluster.dashboard_link) we got when we created our dask cluster, we can see our cluster working (note that the exact number of processes will vary depending on how you specified your cluster above; this screenshot comes from a 16-core cluster):

azure_dask_scalingup
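
If the widget link isn’t handy, you can also grab the dashboard URL straight from the cluster object we created above:

# Print the URL of the dask dashboard for this cluster
print(cluster.dashboard_link)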

In this dataset, missing observations are recorded as -9999, so let’s replace those with actual missing values (np.nan), calculate averages across all the days in each month, and also convert from 1/10ths of a degree Centigrade to Centigrade units:

[ ]:
import numpy as np
import re

for i in range(1, 32):
    temps[f"value{i}"] = temps[f"value{i}"].replace(-9999, np.nan)

value_columns = [i for i in temps.columns if re.match("value.*", i)]
temps["avg"] = temps[value_columns].mean(axis="columns")
temps["avg_C"] = temps["avg"] / 10

Now finally we can collect our results on our personal computer, create a “time” variable that uses years and months for plotting, and plot our temperatures!

[ ]:
temps["time"] = temps["year"] + (temps["month"] - 1) / 12
temps = temps[["time", "avg_C"]]
df = temps.compute()
[ ]:
from plotnine import *

(
    ggplot(df, aes(x="time", y="avg_C"))
    + geom_line()
    + geom_smooth(method="lowess", color="red")
    + ylab("Monthly Average of Daily Highs (C)")
    + xlab("Year")
    + ggtitle("Temperatures in Boulder, CO")
)

And that, my friends, is global warming.

Done with your cluster?

You have two options: you can shut your cluster down manually with the .close() method on your cluster object, or you can just let the time-out we discussed above shut it down for you after a period of inactivity.

[ ]:
cluster.close()

(For some reason I get an error when I run this, but I can confirm on the Azure site that my cluster has been deleted.)

Extensions and Fun Resources

  • dask-ml: As a reminder, if you now want to do some machine learning, you can use dask-ml on this system, which does the same thing for scikit-learn that regular dask does for pandas.

  • Parallelize your own code with delayed: Just a reminder that while we’ve been using dask to emulate pandas in a distributed setting, it’s also a framework you can use for distributing your own code! Check out the delayed method (https://docs.dask.org/en/latest/delayed.html) to see how dask can manage your distributed workload; there’s a short sketch at the end of this list.

  • Curious how dask compares to other tools for distributed computing? Here’s a conceptual comparison to Spark, and here’s a case study comparison of performance. Comparisons will usually depend a lot on the specifics of the work being done, but at least in this case, dask was a little faster than Spark. In the interview noted below, they also cite an example of dask beating Spark by 40x in a project they worked on. And here’s one report of a 2000x speed up doing random forests moving from Spark to dask. As they say, mileage may vary, but I think it’s safe to say you aren’t giving anything up performance wise by using dask and its familiar syntax instead of Spark.

  • Interested in using dask in your company and want help? There’s a great new company called coiled, created by the founders of dask, that provides enterprise support for dask (No, I have no affiliation with them; I just think companies that offer paid support to help businesses move from closed source to open source software are a great way to help make open source software better). You can also hear a fun interview with the founders about both dask and coiled here.

  • The folks from coiled have also compiled a great collection of videos and tutorials about dask and Python at scale here.

  • Working with GPUs? There’s a project called dask-cudf (part of the RAPIDS project) that offers for GPUs the kind of parallelization we get from dask for CPUs. The project is young but growing quickly. My guess, though, is that those libraries will become the infrastructure for updates to tools like dask-ml rather than something most applied people need to play with. But putting it here as an FYI!
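
Here’s that promised sketch of delayed, run locally for illustration; the function names are made up for the example, and if a distributed Client is active, the final .compute() would be scheduled across your cluster’s workers instead of your own machine:

from dask import delayed

@delayed
def slow_square(x):
    # Stand-in for an expensive computation
    return x ** 2

@delayed
def add(a, b):
    return a + b

# Nothing has run yet: dask has just built a task graph of the three calls...
total = add(slow_square(3), slow_square(4))

# ...which executes when we ask for the result
print(total.compute())  # 25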