Importing Opioid Shipment Data on Azure

Yup, it’s time to work with everyone’s favorite data: The DEA ARCOS opioid drug shipment database! But today we’re gonna work with this data not on our personal computers, but instead on a dask cluster running on Microsoft Azure.

For these exercises, you’ll need an Azure account with available credits. The easiest way to do this is to set up a new account (either by signing up as a student, or just using a new gmail account to sign up as a new regular user). You can find instructions for doing so here.

As with our last exercise using dask, to strike a balance between keeping the data big enough to be interesting and small enough that you don’t have to wait around too long, we’ll be working with a thinned version that has only January and February of 2011 and 2012. The workflow you develop will be the same workflow you’d use for the full dataset, but this way we can move quickly, even on the limited resources you’re allowed to use on a free account.

Before you start, make sure to review the package setup instructions for dask here!

Loading Data to Azure

(1) Download the thinned ARCOS data from this link. It should be about 250MB zipped, 2.31 GB unzipped.

(2) Let’s upload this data to Azure by creating a new Storage Account and Blob Container (you may have already done this while reading about Azure in today’s readings, but let’s do it again for practice!). If you can’t remember how to do it, you can review instructions here.

If you have an existing Resource Group, you can use that, but if not remember you’ll have to create one of those too!

(3) Once you have a Blob Container, unzip your data and then start the upload process through the web browser interface (we’ll talk about other tools for managing data tomorrow).

Note you must unzip your files first! Annoying, I know. There are ways to upload zipped files and unzip them later, but they’re surprisingly complicated, and dask won’t read a zipped file.

Setting Up Dask on AzureML

(4) While that’s happening, let’s get ready to start a dask cluster. First, let’s create a new Workspace! Again, you probably did this already, but let’s do it again anyway. :) Practice makes perfect. Again, directions are here if you’ve forgotten.

(5) Now, following the example from the readings, create a new notebook and write your code to start a dask cluster and get it running!

Since you’re probably using a free student account, use vm_size="Standard_DS11_v2", and initial_node_count=3. This will keep us to 6 total CPUs (this is not a lot, but it’s what we can do for free!). If you’re using a normal free account, your quota is 4 CPUs, so set your initial_node_count=2.

Note we’re using DS11 because that model has 2 CPUs per node (so we can stay under the 6 CPU quota for free student accounts) and has fast storage (the “S” in “DS11” indicates support for premium, fast storage, and for data science we almost always want fast storage access).
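Put together, the startup code might look something like the sketch below, using the AzureMLCluster class from dask_cloudprovider (as in the readings). It’s wrapped in a function here so nothing runs until you call it from inside a session where your Workspace config is available, and the exact imports may vary with your dask_cloudprovider version:

```python
def start_cluster(node_count=3):
    # These imports only resolve in an environment with the
    # azureml-sdk and dask_cloudprovider packages installed.
    from azureml.core import Workspace
    from dask_cloudprovider import AzureMLCluster
    from dask.distributed import Client

    ws = Workspace.from_config()  # reads the config.json for your Workspace

    # Standard_DS11_v2 has 2 CPUs per node: 3 nodes = 6 CPUs (student
    # quota); pass node_count=2 on a regular free account (4 CPU quota).
    amlcluster = AzureMLCluster(
        ws,
        vm_size="Standard_DS11_v2",
        initial_node_count=node_count,
    )
    return Client(amlcluster)
```

In a notebook you’d call this once, in its own cell, and keep the returned Client around.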

A Note on Quotas

Remember that for a free account, Azure imposes a quota of only 6 CPUs for students and 4 CPUs for normal free accounts to use across all your activities on Azure.

Since we’re requesting a cluster with 4 or 6 CPUs (depending on what kind of account you have), if you have a VM running anywhere on Azure (even a Stopped VM counts against quota – you have to delete the VM), your AzureMLCluster command will just sit there processing forever until you eventually get a timeout error (with no mention of quotas).

To check the status of your cluster startup, log in to Azure in your browser and navigate to your Machine Learning Workspace. Remember to navigate to the new version by clicking “Launch Studio” if you end up in the old interface. Then click on Compute in the left menu (towards the bottom) and click on the Compute Clusters tab. There you should see the cluster your Python commands are trying to create. You can check your quota status by clicking the View Quota button, though this button only appears if you have at least one cluster trying to start up (which is a little annoying):


Also, if you’re hitting your quota limit, you may see a red circle next to the cluster that’s starting up. If you click on it, you’ll see:


If you’re having continued issues, make sure you don’t have any VMs up in your current workspace, and that you haven’t left any VMs open in other workspaces you may have created!

Actually Analyzing Data

OK, is your dask cluster up and running? And has your data upload finished? Then it’s time to start analyzing some data!

Starting from the code you wrote for our last exercise (where you used dask to load the ARCOS data on your own computer), let’s write some code to analyze the ARCOS data you’ve uploaded to the cloud.

(6) Our goal today is going to be to find the pharmaceutical company that has shipped the most opioids (CALC_BASE_WT_IN_GM * MME_Conversion_Factor) in the US. So write some code to identify this company. A few reminders:

  • Remember to work on your analysis code BELOW the cell where you created your Client instance (e.g. where you ran c = Client(amlcluster)). You don’t want to keep re-running that.

  • Remember that to read the file, you’ll need to pass your Azure Storage Account secrets to the read_csv function.

  • The same tricks we’ve been practicing before still apply here: start by only reading in the first couple thousand lines for debugging before you start using dask on the full data!

  • Just because we’re on a cluster doesn’t mean we have unlimited resources! Each of the computers we’re using is relatively small, so we still want to do things like only load the columns we need.

Since dask is the only way we can talk to the cluster, we need to ask dask to give us just the first few rows as a dataframe. dask doesn’t support the nrows keyword, but you can get the same effect using .head(n=rows_to_get). So rather than running:

import dask.dataframe as dd
rows_to_get = 1000
df = dd.read_csv(file, nrows=rows_to_get)

You can just run:

import dask.dataframe as dd
rows_to_get = 1000
df = dd.read_csv(file).head(n=rows_to_get)

and df will be a pandas DataFrame. Note that if you try and use the nrows keyword, dask will kindly remind you to use .head() instead, so if you forget, don’t worry about it.

(7) Now let’s run our full dataset on dask, calculating the total shipments by reporting company. Remember:

  • Start by spinning up a cluster

  • Dask won’t read compressed files, so you have to unzip your ARCOS data.

  • Start your cluster in a cell all by itself since you don’t want to keep re-running the “start a cluster” code.

  • Don’t load columns you don’t need!
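The core aggregation for questions 6 and 7 is a multiply-then-groupby. Here’s the pattern on a tiny made-up pandas DataFrame (the column names follow the ARCOS file, but the companies and numbers are invented); in dask the code is the same, except you finish with .compute():

```python
import pandas as pd

# Toy stand-in for the ARCOS data: real column names, made-up values
df = pd.DataFrame({
    "REPORTER_NAME": ["Company A", "Company A", "Company B"],
    "CALC_BASE_WT_IN_GM": [1.0, 2.0, 5.0],
    "MME_Conversion_Factor": [1.5, 1.5, 1.0],
})

# Total morphine-equivalent grams shipped by each reporting company
df["mme"] = df["CALC_BASE_WT_IN_GM"] * df["MME_Conversion_Factor"]
totals = df.groupby("REPORTER_NAME")["mme"].sum()

biggest_shipper = totals.idxmax()  # company with the largest total
```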

As you run your code, make sure to click on the Dashboard link below where you created your cluster:


Among other things, the bar across the bottom should give you a sense of how long your task will take:


(8) Now let’s calculate, for each state, which company shipped the most pills.

Note you will quickly find that you can’t sort in dask – sorting in parallel is really tricky! So you’ll have to work around that. Do the aggregations you need on the big dataset first, then .compute() the result so you get it back as a regular pandas DataFrame, and finish up from there.
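For example, one workaround is to do the groupby-sum in dask, .compute() it down to pandas, and only sort then. A sketch of the post-compute step on made-up numbers (column names assumed to match the ARCOS file):

```python
import pandas as pd

# Pretend this is the already-.compute()-ed result of a dask groupby:
# total shipments by state and reporting company (values invented).
totals = pd.DataFrame({
    "BUYER_STATE": ["NC", "NC", "VA", "VA"],
    "REPORTER_NAME": ["Company A", "Company B", "Company A", "Company B"],
    "total_mme": [10.0, 4.0, 1.0, 7.0],
})

# Now that it's a regular pandas DataFrame, sorting is cheap:
# sort the whole frame, then keep each state's largest row.
top_per_state = (
    totals.sort_values("total_mme")
    .groupby("BUYER_STATE")
    .last()
)
```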

Does this seem like a situation where a single company is responsible for the opioid epidemic?

(9) Now go ahead and try and re-do the chunking you did by hand for your project using dask – calculate, for each year, the total opioids sent to each county in the US (note that I’ve included year as its own variable so you don’t have to parse TRANSACTION_DATE).

(10) In several of the preceding analyses, we loaded nearly the same data before doing slightly different data wrangling manipulations. In these situations, it is often best not to run these manipulations separately, but instead to run the common component of the analyses (here, loading the data) once, then cache that intermediate result using the .persist() method (e.g. df = client.persist(df)).

Take the code you’ve already written and use persist to generate answers to questions 7, 8, and 9 efficiently. If you have time, you can then re-run each of your earlier answers and time them, then compare that total runtime to how long your new code (using persist) takes.