{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Distributed Computing with dask\n", "\n", "In this portion of the course, we'll explore distributed computing with a Python library called `dask`. \n", "\n", "Dask is a library designed to help facilitate (a) the manipulation of very large datasets, and (b) the distribution of computation across lots of cores or physical computers. It is very similar to Apache Spark in the functionality it provides, but it is tightly integrated into `numpy` and `pandas`, making it *much* easier to learn than spark for users of those libraries.\n", "\n", "## What can dask do for me?\n", "\n", "To get a sense of why dask is so nice, let's begin with a demonstration. Suppose I have a pretty big dataset (say, [an 100GB CSV of all drug shipments in the United States from 2006 to 2014](https://www.dropbox.com/s/oiv3k3sfwiviup5/all_prescriptions.csv?dl=0)). This data is too large for me to load into ram on my laptop directly, so if I were to work with it on my own, I'd do so by [chunking the data by hand](big_data_strategies.ipynb). But using dask, I can do the following:" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "# Standard setups\n", "import pandas as pd\n", "import os\n", "\n", "os.chdir(\"/users/nce8/dropbox/MIDS_Data_Prep/arcos\")\n" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "I have 10 logical cores.\n" ] }, { "data": { "text/html": [ "
\n", "
\n", "
\n", "

Client

\n", "

Client-56aae5aa-772b-11ed-96cb-f623161a15eb

\n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "
Connection method: Cluster objectCluster type: distributed.LocalCluster
\n", " Dashboard: http://127.0.0.1:8787/status\n", "
\n", "\n", " \n", "\n", " \n", "
\n", "

Cluster Info

\n", "
\n", "
\n", "
\n", "
\n", "

LocalCluster

\n", "

1317ab53

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "\n", " \n", "
\n", " Dashboard: http://127.0.0.1:8787/status\n", " \n", " Workers: 5\n", "
\n", " Total threads: 10\n", " \n", " Total memory: 64.00 GiB\n", "
Status: runningUsing processes: True
\n", "\n", "
\n", " \n", "

Scheduler Info

\n", "
\n", "\n", "
\n", "
\n", "
\n", "
\n", "

Scheduler

\n", "

Scheduler-96024e4c-d7f6-4863-b65c-f11741173297

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
\n", " Comm: tcp://127.0.0.1:61437\n", " \n", " Workers: 5\n", "
\n", " Dashboard: http://127.0.0.1:8787/status\n", " \n", " Total threads: 10\n", "
\n", " Started: Just now\n", " \n", " Total memory: 64.00 GiB\n", "
\n", "
\n", "
\n", "\n", "
\n", " \n", "

Workers

\n", "
\n", "\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: 0

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", "\n", "
\n", " Comm: tcp://127.0.0.1:61455\n", " \n", " Total threads: 2\n", "
\n", " Dashboard: http://127.0.0.1:61459/status\n", " \n", " Memory: 12.80 GiB\n", "
\n", " Nanny: tcp://127.0.0.1:61442\n", "
\n", " Local directory: /var/folders/fs/h_8_rwsn5hvg9mhp0txgc_s9v6191b/T/dask-worker-space/worker-wo_zxjlh\n", "
\n", "
\n", "
\n", "
\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: 1

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", "\n", "
\n", " Comm: tcp://127.0.0.1:61456\n", " \n", " Total threads: 2\n", "
\n", " Dashboard: http://127.0.0.1:61461/status\n", " \n", " Memory: 12.80 GiB\n", "
\n", " Nanny: tcp://127.0.0.1:61443\n", "
\n", " Local directory: /var/folders/fs/h_8_rwsn5hvg9mhp0txgc_s9v6191b/T/dask-worker-space/worker-zxewnrbp\n", "
\n", "
\n", "
\n", "
\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: 2

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", "\n", "
\n", " Comm: tcp://127.0.0.1:61467\n", " \n", " Total threads: 2\n", "
\n", " Dashboard: http://127.0.0.1:61468/status\n", " \n", " Memory: 12.80 GiB\n", "
\n", " Nanny: tcp://127.0.0.1:61441\n", "
\n", " Local directory: /var/folders/fs/h_8_rwsn5hvg9mhp0txgc_s9v6191b/T/dask-worker-space/worker-3prrdzz6\n", "
\n", "
\n", "
\n", "
\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: 3

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", "\n", "
\n", " Comm: tcp://127.0.0.1:61458\n", " \n", " Total threads: 2\n", "
\n", " Dashboard: http://127.0.0.1:61463/status\n", " \n", " Memory: 12.80 GiB\n", "
\n", " Nanny: tcp://127.0.0.1:61444\n", "
\n", " Local directory: /var/folders/fs/h_8_rwsn5hvg9mhp0txgc_s9v6191b/T/dask-worker-space/worker-c5ctbfi4\n", "
\n", "
\n", "
\n", "
\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: 4

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", "\n", "
\n", " Comm: tcp://127.0.0.1:61457\n", " \n", " Total threads: 2\n", "
\n", " Dashboard: http://127.0.0.1:61464/status\n", " \n", " Memory: 12.80 GiB\n", "
\n", " Nanny: tcp://127.0.0.1:61440\n", "
\n", " Local directory: /var/folders/fs/h_8_rwsn5hvg9mhp0txgc_s9v6191b/T/dask-worker-space/worker-94o6ttrs\n", "
\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "
\n", "\n", "
\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "
" ], "text/plain": [ "" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Here we start up a dask cluster.\n", "# This creates a set of \"workers\".\n", "# In this case, these workers are all on\n", "# my computer, but if we wanted to connect\n", "# to a cluster, we'd just past an IP address\n", "# to `Client()`.\n", "\n", "# You can see how many cores your computer\n", "# has using `os.cpu_count()` in the `os` library.\n", "\n", "# Note you may get some warnings about\n", "# Python wanting to accept incoming\n", "# connections from your firewall.\n", "# You need to approve those\n", "# so workers can talk to one another.\n", "\n", "import os\n", "\n", "print(f\"I have {os.cpu_count()} logical cores.\")\n", "\n", "from dask.distributed import Client\n", "\n", "client = Client()\n", "client\n" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "# ARCOS Data on all drug shipments in US from 2006 to 2014\n", "# I was unable to get this data\n", "# into my repo, but you can download it here:\n", "# https://www.dropbox.com/s/oiv3k3sfwiviup5/all_prescriptions.csv?dl=0\n", "\n", "# Note that while pandas can read compressed files like this .tsv.gz,\n", "# file, dask cannot. So if you want to do this at home,\n", "# you have to decompress the data.\n", "\n", "import dask.dataframe as dd\n", "\n", "df = dd.read_csv(\n", " \"all_prescriptions.csv\",\n", " dtype={\n", " \"ACTION_INDICATOR\": \"object\",\n", " \"ORDER_FORM_NO\": \"object\",\n", " \"REPORTER_ADDRESS2\": \"object\",\n", " \"REPORTER_ADDL_CO_INFO\": \"object\",\n", " \"BUYER_ADDL_CO_INFO\": \"object\",\n", " \"BUYER_ADDRESS2\": \"object\",\n", " \"NDC_NO\": \"object\",\n", " \"UNIT\": \"object\",\n", " \"STRENGTH\": \"float64\",\n", " \"BUYER_ZIP\": \"object\",\n", " \"DRUG_CODE\": \"object\",\n", " \"MME_Conversion_Factor\": \"object\",\n", " \"QUANTITY\": \"object\",\n", " \"TRANSACTION_DATE\": \"float64\",\n", " \"TRANSACTION_ID\": \"float64\",\n", " \"dos_str\": \"object\",\n", " },\n", ")\n" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "ename": "ValueError", "evalue": "Metadata inference failed in `mul`.\n\nOriginal error is below:\n------------------------\nTypeError(\"can't multiply sequence by non-int of type 'float'\")\n\nTraceback:\n---------\n File \"/Users/nce8/opt/miniconda3/envs/dask/lib/python3.10/site-packages/dask/dataframe/utils.py\", line 195, in raise_on_meta_error\n yield\n File \"/Users/nce8/opt/miniconda3/envs/dask/lib/python3.10/site-packages/dask/dataframe/core.py\", line 6239, in elemwise\n meta = partial_by_order(*parts, function=op, other=other)\n File \"/Users/nce8/opt/miniconda3/envs/dask/lib/python3.10/site-packages/dask/utils.py\", line 1325, in partial_by_order\n return function(*args2, **kwargs)\n File \"/Users/nce8/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/ops/common.py\", line 72, in new_method\n return method(self, other)\n File \"/Users/nce8/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/arraylike.py\", line 118, in __mul__\n return self._arith_method(other, operator.mul)\n File \"/Users/nce8/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/series.py\", line 6259, in _arith_method\n return base.IndexOpsMixin._arith_method(self, other, op)\n File \"/Users/nce8/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/base.py\", line 1325, in _arith_method\n result = ops.arithmetic_op(lvalues, rvalues, op)\n File 
\"/Users/nce8/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/ops/array_ops.py\", line 226, in arithmetic_op\n res_values = _na_arithmetic_op(left, right, op) # type: ignore[arg-type]\n File \"/Users/nce8/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/ops/array_ops.py\", line 172, in _na_arithmetic_op\n result = _masked_arith_op(left, right, op)\n File \"/Users/nce8/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/ops/array_ops.py\", line 110, in _masked_arith_op\n result[mask] = op(xrav[mask], yrav[mask])\n", "output_type": "error", "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mTypeError\u001b[0m Traceback (most recent call last)", "File \u001b[0;32m~/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/ops/array_ops.py:165\u001b[0m, in \u001b[0;36m_na_arithmetic_op\u001b[0;34m(left, right, op, is_cmp)\u001b[0m\n\u001b[1;32m 164\u001b[0m \u001b[39mtry\u001b[39;00m:\n\u001b[0;32m--> 165\u001b[0m result \u001b[39m=\u001b[39m func(left, right)\n\u001b[1;32m 166\u001b[0m \u001b[39mexcept\u001b[39;00m \u001b[39mTypeError\u001b[39;00m:\n", "File \u001b[0;32m~/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/computation/expressions.py:241\u001b[0m, in \u001b[0;36mevaluate\u001b[0;34m(op, a, b, use_numexpr)\u001b[0m\n\u001b[1;32m 239\u001b[0m \u001b[39mif\u001b[39;00m use_numexpr:\n\u001b[1;32m 240\u001b[0m \u001b[39m# error: \"None\" not callable\u001b[39;00m\n\u001b[0;32m--> 241\u001b[0m \u001b[39mreturn\u001b[39;00m _evaluate(op, op_str, a, b) \u001b[39m# type: ignore[misc]\u001b[39;00m\n\u001b[1;32m 242\u001b[0m \u001b[39mreturn\u001b[39;00m _evaluate_standard(op, op_str, a, b)\n", "File \u001b[0;32m~/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/computation/expressions.py:70\u001b[0m, in \u001b[0;36m_evaluate_standard\u001b[0;34m(op, op_str, a, b)\u001b[0m\n\u001b[1;32m 69\u001b[0m _store_test_result(\u001b[39mFalse\u001b[39;00m)\n\u001b[0;32m---> 70\u001b[0m \u001b[39mreturn\u001b[39;00m op(a, b)\n", "\u001b[0;31mTypeError\u001b[0m: can't multiply sequence by non-int of type 'float'", "\nDuring handling of the above exception, another exception occurred:\n", "\u001b[0;31mTypeError\u001b[0m Traceback (most recent call last)", "File \u001b[0;32m~/opt/miniconda3/envs/dask/lib/python3.10/site-packages/dask/dataframe/utils.py:195\u001b[0m, in \u001b[0;36mraise_on_meta_error\u001b[0;34m(funcname, udf)\u001b[0m\n\u001b[1;32m 194\u001b[0m \u001b[39mtry\u001b[39;00m:\n\u001b[0;32m--> 195\u001b[0m \u001b[39myield\u001b[39;00m\n\u001b[1;32m 196\u001b[0m \u001b[39mexcept\u001b[39;00m \u001b[39mException\u001b[39;00m \u001b[39mas\u001b[39;00m e:\n", "File \u001b[0;32m~/opt/miniconda3/envs/dask/lib/python3.10/site-packages/dask/dataframe/core.py:6239\u001b[0m, in \u001b[0;36melemwise\u001b[0;34m(op, meta, out, transform_divisions, *args, **kwargs)\u001b[0m\n\u001b[1;32m 6238\u001b[0m \u001b[39mwith\u001b[39;00m raise_on_meta_error(funcname(op)):\n\u001b[0;32m-> 6239\u001b[0m meta \u001b[39m=\u001b[39m partial_by_order(\u001b[39m*\u001b[39;49mparts, function\u001b[39m=\u001b[39;49mop, other\u001b[39m=\u001b[39;49mother)\n\u001b[1;32m 6241\u001b[0m result \u001b[39m=\u001b[39m new_dd_object(graph, _name, meta, divisions)\n", "File \u001b[0;32m~/opt/miniconda3/envs/dask/lib/python3.10/site-packages/dask/utils.py:1325\u001b[0m, in \u001b[0;36mpartial_by_order\u001b[0;34m(*args, **kwargs)\u001b[0m\n\u001b[1;32m 1324\u001b[0m 
args2\u001b[39m.\u001b[39minsert(i, arg)\n\u001b[0;32m-> 1325\u001b[0m \u001b[39mreturn\u001b[39;00m function(\u001b[39m*\u001b[39;49margs2, \u001b[39m*\u001b[39;49m\u001b[39m*\u001b[39;49mkwargs)\n", "File \u001b[0;32m~/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/ops/common.py:72\u001b[0m, in \u001b[0;36m_unpack_zerodim_and_defer..new_method\u001b[0;34m(self, other)\u001b[0m\n\u001b[1;32m 70\u001b[0m other \u001b[39m=\u001b[39m item_from_zerodim(other)\n\u001b[0;32m---> 72\u001b[0m \u001b[39mreturn\u001b[39;00m method(\u001b[39mself\u001b[39;49m, other)\n", "File \u001b[0;32m~/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/arraylike.py:118\u001b[0m, in \u001b[0;36mOpsMixin.__mul__\u001b[0;34m(self, other)\u001b[0m\n\u001b[1;32m 116\u001b[0m \u001b[39m@unpack_zerodim_and_defer\u001b[39m(\u001b[39m\"\u001b[39m\u001b[39m__mul__\u001b[39m\u001b[39m\"\u001b[39m)\n\u001b[1;32m 117\u001b[0m \u001b[39mdef\u001b[39;00m \u001b[39m__mul__\u001b[39m(\u001b[39mself\u001b[39m, other):\n\u001b[0;32m--> 118\u001b[0m \u001b[39mreturn\u001b[39;00m \u001b[39mself\u001b[39;49m\u001b[39m.\u001b[39;49m_arith_method(other, operator\u001b[39m.\u001b[39;49mmul)\n", "File \u001b[0;32m~/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/series.py:6259\u001b[0m, in \u001b[0;36mSeries._arith_method\u001b[0;34m(self, other, op)\u001b[0m\n\u001b[1;32m 6258\u001b[0m \u001b[39mself\u001b[39m, other \u001b[39m=\u001b[39m ops\u001b[39m.\u001b[39malign_method_SERIES(\u001b[39mself\u001b[39m, other)\n\u001b[0;32m-> 6259\u001b[0m \u001b[39mreturn\u001b[39;00m base\u001b[39m.\u001b[39;49mIndexOpsMixin\u001b[39m.\u001b[39;49m_arith_method(\u001b[39mself\u001b[39;49m, other, op)\n", "File \u001b[0;32m~/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/base.py:1325\u001b[0m, in \u001b[0;36mIndexOpsMixin._arith_method\u001b[0;34m(self, other, op)\u001b[0m\n\u001b[1;32m 1324\u001b[0m \u001b[39mwith\u001b[39;00m np\u001b[39m.\u001b[39merrstate(\u001b[39mall\u001b[39m\u001b[39m=\u001b[39m\u001b[39m\"\u001b[39m\u001b[39mignore\u001b[39m\u001b[39m\"\u001b[39m):\n\u001b[0;32m-> 1325\u001b[0m result \u001b[39m=\u001b[39m ops\u001b[39m.\u001b[39;49marithmetic_op(lvalues, rvalues, op)\n\u001b[1;32m 1327\u001b[0m \u001b[39mreturn\u001b[39;00m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_construct_result(result, name\u001b[39m=\u001b[39mres_name)\n", "File \u001b[0;32m~/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/ops/array_ops.py:226\u001b[0m, in \u001b[0;36marithmetic_op\u001b[0;34m(left, right, op)\u001b[0m\n\u001b[1;32m 224\u001b[0m \u001b[39m# error: Argument 1 to \"_na_arithmetic_op\" has incompatible type\u001b[39;00m\n\u001b[1;32m 225\u001b[0m \u001b[39m# \"Union[ExtensionArray, ndarray[Any, Any]]\"; expected \"ndarray[Any, Any]\"\u001b[39;00m\n\u001b[0;32m--> 226\u001b[0m res_values \u001b[39m=\u001b[39m _na_arithmetic_op(left, right, op) \u001b[39m# type: ignore[arg-type]\u001b[39;00m\n\u001b[1;32m 228\u001b[0m \u001b[39mreturn\u001b[39;00m res_values\n", "File \u001b[0;32m~/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/ops/array_ops.py:172\u001b[0m, in \u001b[0;36m_na_arithmetic_op\u001b[0;34m(left, right, op, is_cmp)\u001b[0m\n\u001b[1;32m 167\u001b[0m \u001b[39mif\u001b[39;00m \u001b[39mnot\u001b[39;00m is_cmp \u001b[39mand\u001b[39;00m (is_object_dtype(left\u001b[39m.\u001b[39mdtype) \u001b[39mor\u001b[39;00m is_object_dtype(right)):\n\u001b[1;32m 168\u001b[0m \u001b[39m# For object dtype, fallback to a masked 
operation (only operating\u001b[39;00m\n\u001b[1;32m 169\u001b[0m \u001b[39m# on the non-missing values)\u001b[39;00m\n\u001b[1;32m 170\u001b[0m \u001b[39m# Don't do this for comparisons, as that will handle complex numbers\u001b[39;00m\n\u001b[1;32m 171\u001b[0m \u001b[39m# incorrectly, see GH#32047\u001b[39;00m\n\u001b[0;32m--> 172\u001b[0m result \u001b[39m=\u001b[39m _masked_arith_op(left, right, op)\n\u001b[1;32m 173\u001b[0m \u001b[39melse\u001b[39;00m:\n", "File \u001b[0;32m~/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/ops/array_ops.py:110\u001b[0m, in \u001b[0;36m_masked_arith_op\u001b[0;34m(x, y, op)\u001b[0m\n\u001b[1;32m 109\u001b[0m \u001b[39mif\u001b[39;00m mask\u001b[39m.\u001b[39many():\n\u001b[0;32m--> 110\u001b[0m result[mask] \u001b[39m=\u001b[39m op(xrav[mask], yrav[mask])\n\u001b[1;32m 112\u001b[0m \u001b[39melse\u001b[39;00m:\n", "\u001b[0;31mTypeError\u001b[0m: can't multiply sequence by non-int of type 'float'", "\nThe above exception was the direct cause of the following exception:\n", "\u001b[0;31mValueError\u001b[0m Traceback (most recent call last)", "Cell \u001b[0;32mIn [4], line 6\u001b[0m\n\u001b[1;32m 3\u001b[0m df[\u001b[39m\"\u001b[39m\u001b[39myear\u001b[39m\u001b[39m\"\u001b[39m] \u001b[39m=\u001b[39m df\u001b[39m.\u001b[39mdate\u001b[39m.\u001b[39mdt\u001b[39m.\u001b[39myear\n\u001b[1;32m 5\u001b[0m \u001b[39m# Make an estimate of total morphine equivalent shipments\u001b[39;00m\n\u001b[0;32m----> 6\u001b[0m df[\u001b[39m\"\u001b[39m\u001b[39mmorphine_equivalent_g\u001b[39m\u001b[39m\"\u001b[39m] \u001b[39m=\u001b[39m (df[\u001b[39m\"\u001b[39;49m\u001b[39mCALC_BASE_WT_IN_GM\u001b[39;49m\u001b[39m\"\u001b[39;49m]) \u001b[39m*\u001b[39;49m df[\u001b[39m\"\u001b[39;49m\u001b[39mMME_Conversion_Factor\u001b[39;49m\u001b[39m\"\u001b[39;49m]\n\u001b[1;32m 8\u001b[0m \u001b[39m# Drop extra vars\u001b[39;00m\n\u001b[1;32m 9\u001b[0m df \u001b[39m=\u001b[39m df[[\u001b[39m\"\u001b[39m\u001b[39myear\u001b[39m\u001b[39m\"\u001b[39m, \u001b[39m\"\u001b[39m\u001b[39mmorphine_equivalent_g\u001b[39m\u001b[39m\"\u001b[39m, \u001b[39m\"\u001b[39m\u001b[39mBUYER_STATE\u001b[39m\u001b[39m\"\u001b[39m, \u001b[39m\"\u001b[39m\u001b[39mBUYER_COUNTY\u001b[39m\u001b[39m\"\u001b[39m]]\n", "File \u001b[0;32m~/opt/miniconda3/envs/dask/lib/python3.10/site-packages/dask/dataframe/core.py:1784\u001b[0m, in \u001b[0;36m_Frame._get_binary_operator..\u001b[0;34m(self, other)\u001b[0m\n\u001b[1;32m 1782\u001b[0m \u001b[39mreturn\u001b[39;00m \u001b[39mlambda\u001b[39;00m \u001b[39mself\u001b[39m, other: elemwise(op, other, \u001b[39mself\u001b[39m)\n\u001b[1;32m 1783\u001b[0m \u001b[39melse\u001b[39;00m:\n\u001b[0;32m-> 1784\u001b[0m \u001b[39mreturn\u001b[39;00m \u001b[39mlambda\u001b[39;00m \u001b[39mself\u001b[39m, other: elemwise(op, \u001b[39mself\u001b[39;49m, other)\n", "File \u001b[0;32m~/opt/miniconda3/envs/dask/lib/python3.10/site-packages/dask/dataframe/core.py:6238\u001b[0m, in \u001b[0;36melemwise\u001b[0;34m(op, meta, out, transform_divisions, *args, **kwargs)\u001b[0m\n\u001b[1;32m 6229\u001b[0m \u001b[39m# For broadcastable series, use no rows.\u001b[39;00m\n\u001b[1;32m 6230\u001b[0m parts \u001b[39m=\u001b[39m [\n\u001b[1;32m 6231\u001b[0m d\u001b[39m.\u001b[39m_meta\n\u001b[1;32m 6232\u001b[0m \u001b[39mif\u001b[39;00m _is_broadcastable(d)\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 6236\u001b[0m \u001b[39mfor\u001b[39;00m d \u001b[39min\u001b[39;00m dasks\n\u001b[1;32m 6237\u001b[0m ]\n\u001b[0;32m-> 6238\u001b[0m \u001b[39mwith\u001b[39;00m 
raise_on_meta_error(funcname(op)):\n\u001b[1;32m 6239\u001b[0m meta \u001b[39m=\u001b[39m partial_by_order(\u001b[39m*\u001b[39mparts, function\u001b[39m=\u001b[39mop, other\u001b[39m=\u001b[39mother)\n\u001b[1;32m 6241\u001b[0m result \u001b[39m=\u001b[39m new_dd_object(graph, _name, meta, divisions)\n", "File \u001b[0;32m~/opt/miniconda3/envs/dask/lib/python3.10/contextlib.py:153\u001b[0m, in \u001b[0;36m_GeneratorContextManager.__exit__\u001b[0;34m(self, typ, value, traceback)\u001b[0m\n\u001b[1;32m 151\u001b[0m value \u001b[39m=\u001b[39m typ()\n\u001b[1;32m 152\u001b[0m \u001b[39mtry\u001b[39;00m:\n\u001b[0;32m--> 153\u001b[0m \u001b[39mself\u001b[39;49m\u001b[39m.\u001b[39;49mgen\u001b[39m.\u001b[39;49mthrow(typ, value, traceback)\n\u001b[1;32m 154\u001b[0m \u001b[39mexcept\u001b[39;00m \u001b[39mStopIteration\u001b[39;00m \u001b[39mas\u001b[39;00m exc:\n\u001b[1;32m 155\u001b[0m \u001b[39m# Suppress StopIteration *unless* it's the same exception that\u001b[39;00m\n\u001b[1;32m 156\u001b[0m \u001b[39m# was passed to throw(). This prevents a StopIteration\u001b[39;00m\n\u001b[1;32m 157\u001b[0m \u001b[39m# raised inside the \"with\" statement from being suppressed.\u001b[39;00m\n\u001b[1;32m 158\u001b[0m \u001b[39mreturn\u001b[39;00m exc \u001b[39mis\u001b[39;00m \u001b[39mnot\u001b[39;00m value\n", "File \u001b[0;32m~/opt/miniconda3/envs/dask/lib/python3.10/site-packages/dask/dataframe/utils.py:216\u001b[0m, in \u001b[0;36mraise_on_meta_error\u001b[0;34m(funcname, udf)\u001b[0m\n\u001b[1;32m 207\u001b[0m msg \u001b[39m+\u001b[39m\u001b[39m=\u001b[39m (\n\u001b[1;32m 208\u001b[0m \u001b[39m\"\u001b[39m\u001b[39mOriginal error is below:\u001b[39m\u001b[39m\\n\u001b[39;00m\u001b[39m\"\u001b[39m\n\u001b[1;32m 209\u001b[0m \u001b[39m\"\u001b[39m\u001b[39m------------------------\u001b[39m\u001b[39m\\n\u001b[39;00m\u001b[39m\"\u001b[39m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 213\u001b[0m \u001b[39m\"\u001b[39m\u001b[39m{2}\u001b[39;00m\u001b[39m\"\u001b[39m\n\u001b[1;32m 214\u001b[0m )\n\u001b[1;32m 215\u001b[0m msg \u001b[39m=\u001b[39m msg\u001b[39m.\u001b[39mformat(\u001b[39mf\u001b[39m\u001b[39m\"\u001b[39m\u001b[39m in `\u001b[39m\u001b[39m{\u001b[39;00mfuncname\u001b[39m}\u001b[39;00m\u001b[39m`\u001b[39m\u001b[39m\"\u001b[39m \u001b[39mif\u001b[39;00m funcname \u001b[39melse\u001b[39;00m \u001b[39m\"\u001b[39m\u001b[39m\"\u001b[39m, \u001b[39mrepr\u001b[39m(e), tb)\n\u001b[0;32m--> 216\u001b[0m \u001b[39mraise\u001b[39;00m \u001b[39mValueError\u001b[39;00m(msg) \u001b[39mfrom\u001b[39;00m \u001b[39me\u001b[39;00m\n", "\u001b[0;31mValueError\u001b[0m: Metadata inference failed in `mul`.\n\nOriginal error is below:\n------------------------\nTypeError(\"can't multiply sequence by non-int of type 'float'\")\n\nTraceback:\n---------\n File \"/Users/nce8/opt/miniconda3/envs/dask/lib/python3.10/site-packages/dask/dataframe/utils.py\", line 195, in raise_on_meta_error\n yield\n File \"/Users/nce8/opt/miniconda3/envs/dask/lib/python3.10/site-packages/dask/dataframe/core.py\", line 6239, in elemwise\n meta = partial_by_order(*parts, function=op, other=other)\n File \"/Users/nce8/opt/miniconda3/envs/dask/lib/python3.10/site-packages/dask/utils.py\", line 1325, in partial_by_order\n return function(*args2, **kwargs)\n File \"/Users/nce8/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/ops/common.py\", line 72, in new_method\n return method(self, other)\n File \"/Users/nce8/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/arraylike.py\", line 118, in 
__mul__\n return self._arith_method(other, operator.mul)\n File \"/Users/nce8/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/series.py\", line 6259, in _arith_method\n return base.IndexOpsMixin._arith_method(self, other, op)\n File \"/Users/nce8/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/base.py\", line 1325, in _arith_method\n result = ops.arithmetic_op(lvalues, rvalues, op)\n File \"/Users/nce8/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/ops/array_ops.py\", line 226, in arithmetic_op\n res_values = _na_arithmetic_op(left, right, op) # type: ignore[arg-type]\n File \"/Users/nce8/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/ops/array_ops.py\", line 172, in _na_arithmetic_op\n result = _masked_arith_op(left, right, op)\n File \"/Users/nce8/opt/miniconda3/envs/dask/lib/python3.10/site-packages/pandas/core/ops/array_ops.py\", line 110, in _masked_arith_op\n result[mask] = op(xrav[mask], yrav[mask])\n" ] } ], "source": [ "# Extract year\n", "df[\"date\"] = dd.to_datetime(df.TRANSACTION_DATE, format=\"%m%d%Y\")\n", "df[\"year\"] = df.date.dt.year\n", "\n", "# Make an estimate of total morphine equivalent shipments\n", "df[\"morphine_equivalent_g\"] = (df[\"CALC_BASE_WT_IN_GM\"]) * df[\"MME_Conversion_Factor\"]\n", "\n", "# Drop extra vars\n", "df = df[[\"year\", \"morphine_equivalent_g\", \"BUYER_STATE\", \"BUYER_COUNTY\"]]\n", "\n", "# Collapse to total shipments to each county in each year.\n", "collapsed = df.groupby(\n", " [\"year\", \"BUYER_STATE\", \"BUYER_COUNTY\"]\n", ").morphine_equivalent_g.sum()\n", "collapsed\n" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "%%time\n", "final = collapsed.compute()\n", "final.sample(10)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Voila! I get back my thinned-out dataset with the new variable I wanted, all while never using more than about 2GB of RAM at any point.\n", "\n", "So let's discuss what just happened.\n", "\n", "1. First, when I ran `Client()`, dask started up a set of 10 new python processes on my computer (called \"workers\") it could call on for help. These are fully independent agents that *could* be running on different machines in a true cluster (though in this case, they're all on my laptop).\n", "2. It then collected all the instructions I gave it for reading in the file, generating a new column, deleting extra columns, and grouping. **But it didn't actually execute any code**. That's why when I typed `collapsed`, what I got back was a dataframe with structure but no data—dask had figured out roughly what the result of my commands would look like, but hadn't actually computed the result.\n", "3. Then, when I ran `collapsed.compute()`, dask came up with a set of assignments it could give to its workers to get me what I wanted and it started to actually compute the table I requested. 
In particular:\n", " - each worker loaded a chunk of the data, did the observation-level manipulations I wanted, and dropped extra columns.\n", " - then to execute the groupby, it then collapsed the data in each chunk, passed those collapsed chunks to a common worker who then re-collapsed them further, then gathered them again under (potentially) another worker and collapsed further until all the data had been collapsed.\n", " - Finally, after all the data had been collapsed, the data was passed back to me (the client session) as the variable `final`.\n", "\n", "And it did it all in under 5 minutes!\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## \"Um... why didn't you tell us about this before?\" I hear you say...\n", "\n", "\"This sure seems like this is easier than [chunking by hand](big_data_strategies)!!!!\"\n", "\n", "The answer is that these distributed computing tools are often very fragile and opaque, and it's easy to get in trouble using them, and hard to figure out why if you don't understand the underlying principles that govern their operation. \n", "\n", "This example was carefully chosen. I used only manipulations that dask is really good at, and I have the experience to implement them in a way that works. But do what I did above in slightly different ways, and chaos would ensure. Ok, not chaos. But your computer would probably crash. \n", "\n", "The biggest challenge in using a tool like dask or spark (they operate on the same principles -- more on that below) is that to prevent crashes, you have to understand how and why dask is chunking your data. And the best way to do that is to get experience chunking by hand. \n", "\n", "For example, one of the key features of distributing computing is that they don't run code as soon as you type it. Instead, they just keep track of what you want and wait until you run `.compute()` to actually execute your code. Why? Because if dask had run each command as I entered it and passed the result back to me, it would have crashed my computer. \n", "\n", "Recall that the whole reason we're using dask is precisely because we knew that loading the whole dataset at once would take up more memory than I have available. It's only because we only loaded a few chunks of the data at a time, thinned out those chunks by dropping variables, and then collapsed by county-state-year that we ended up with a dataset that was small enough that it would fit in memory.\n", "\n", "In other words, this only worked because I knew that the steps after the initial load would reduce the size of the dataset enough that when I ran `.compute()`, the result I got back would be small enough to fit in memory. Had I just run:\n", "\n", "```python\n", "df = dd.read_csv(\n", " \"arcos_all_washpost_2006_2012.tsv\",\n", " sep=\"\\t\",\n", " dtype={\n", " \"ACTION_INDICATOR\": \"object\",\n", " \"ORDER_FORM_NO\": \"object\",\n", " \"REPORTER_ADDRESS2\": \"object\",\n", " \"REPORTER_ADDL_CO_INFO\": \"object\",\n", " \"BUYER_ADDL_CO_INFO\": \"object\",\n", " \"BUYER_ADDRESS2\": \"object\",\n", " \"NDC_NO\": \"object\",\n", " \"UNIT\": \"object\",\n", " },\n", ")\n", "df.compute()\n", "```\n", "\n", "dask would have tried to hand the full dataset back to me, and my computer would have crashed, just as it would have had I tried to read the full dataset with `pandas`. \n", "\n", "So understanding the *intuition* of chunking is basically a prerequisite to using these tools effectively. 
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Vocabulary\n", "\n", "Now that we've seen this example, let's formally introduce some distributed computing vocabulary:\n", "\n", "- **Lazy Evaluation** (also sometimes called \"delayed execution\"): The practice of not executing code as soon as you type it, but rather accumulating a set of requests, then executing them when instructed. This allows distributing computing systems to optimize how they move around data and get things done, and ensures that you don't end up with the system trying to shove a massive dataset into ram that's too small. \n", "- **Client**: The central process (here, Python process) where code is being entered.\n", "- **Workers**: Other processes that are assigned work, and which eventually pass results back to the client. \n", "- **Scheduler**: The part of the system that manages the assignment of tasks to different workers.\n", "- **map-reduce**: The name for the process of distributing sub-problems to workers (`map`), then processing them in a way that allows the results to be recombined (`reduce`). `map`s and `reduce`s are kind of the building blocks of distributed systems, though when working with dask you won't usually have to manage the assignment or recombination of tasks yourself (they're happening behind the scenes). " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Let's Learn More dask!\n", "\n", "OK, now that we have the basic idea of dask in hand, please watch the following set of seven short videos on dask, then come back here and we can talk some more!\n", "\n", "[Dask Tutorial from Coiled—7 videos, about 1 hour in total](https://www.youtube.com/watch?v=z18qjLu-Mw4&list=PLeDTMczuyDQ8S73cdc0PrnTO80kfzpgz2)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## There is no magic here\n", "\n", "\n", "As noted in that video, one of the very nice things about dask (and one of the reasons it's been able to offer so much functionality so quickly) is that it really is just an extension of `numpy` and `pandas`. There is no magic here. \n", "\n", "For example, suppose we wanted to find the largest number of pills in a single shipment in our ARCOS dataset, but we don't have the memory to load the whole dataset into memory at once. How would we get that number? Probably by doing something like:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "3115000.0" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import pandas as pd\n", "import numpy as np\n", "import os\n", "\n", "os.chdir(\"/users/nce8/dropbox/MIDS_Data_Prep/arcos\")\n", "\n", "# Create an interator of the data so\n", "# it doesn't all load at once\n", "df = pd.read_csv(\n", " \"arcos_all_washpost_2006_2012.tsv\",\n", " delimiter=\"\\t\",\n", " iterator=True,\n", " chunksize=100_000,\n", " usecols=[\"DOSAGE_UNIT\", \"Measure\"],\n", ")\n", "\n", "\n", "# Find the max from each chunk\n", "max_candidates = list()\n", "for chunk in df:\n", " # Subset for pill shipments\n", " chunk = chunk[chunk[\"Measure\"] == \"TAB\"]\n", "\n", " # Find largest in this chunk\n", " max_candidates.append(chunk[\"DOSAGE_UNIT\"].max())\n", "\n", "# Now gather those candidates together and\n", "# find the maximum of all the chunk maximums.\n", "np.max(max_candidates)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Wow... that is a LOT of pills!" 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now suppose we asked dask to do the same thing: " ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "3115000.0" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df = dd.read_csv(\n", " \"arcos_all_washpost_2006_2012.tsv\",\n", " sep=\"\\t\",\n", " dtype={\n", " \"ACTION_INDICATOR\": \"object\",\n", " \"ORDER_FORM_NO\": \"object\",\n", " \"REPORTER_ADDRESS2\": \"object\",\n", " \"REPORTER_ADDL_CO_INFO\": \"object\",\n", " \"BUYER_ADDL_CO_INFO\": \"object\",\n", " \"BUYER_ADDRESS2\": \"object\",\n", " \"NDC_NO\": \"object\",\n", " \"UNIT\": \"object\",\n", " },\n", ")\n", "\n", "df = df[df[\"Measure\"] == \"TAB\"]\n", "max_shipment = df[\"DOSAGE_UNIT\"].max()\n", "max_shipment.compute()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "What dask is actually doing is exactly what you just did! It reads in chunks of the dataset, calculates `morphine_equivalent_g` for each chunk, then calculates the maximum value for that chunk. Then it gathers all those maximium values, and finds the maximum of all those chunk maximums. The only difference is that it loads and evaluations chunks in parallel, and it's parallel workers then have to pass their maximum value candidates to a central node for the final evaluation. \n", "\n", "Moreover, when I said that's exactly what dask did, I don't just mean that you and dask are doing the same thing *in principle*—dask is built on pandas, so it really is calling `pd.read_csv`, and the `pandas` `.max()` method, just like you. \n", "\n", "Dask's developers were smart, and didn't want to reinvent the wheel—they just created a package full or recipes for using numpy/pandas to (a) divide tasks into smaller pieces it can distribute to different workers (map), and to (b) recombine the results of sub-problems to give you a single answer (reduce). \n", "\n", "But those recipes are just made of the pandas and numpy code you know and love. Or at least tolerate begrudingly. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Lazy Evaluation\n", "\n", "You've already seen this above, but I think this is THE concept that is central to understanding not just dask, but **any** distributed computing platform (e.g. Spark, Hadoop, etc.), so I really want to drive home the importance of this concept.\n", "\n", "The fact that you can give dask a handful of commands *and then* tell it to execute them is at the absolute core of what makes it effective. The more commands you can give dask up front, the more it can optimize how it distributes that work—and how it moves data between different computers. Moving data between processes and between computers is *by far* the slowest part of distributed computing, so the more dask can minimize those transfers, the faster it will be. \n", "\n", "Moreover, lazy evaluation is how you protect yourself from crashing your computer. 
Consider the following code:\n", "\n", "```python\n", "df = dd.read_csv(\n", " \"arcos_all_washpost_2006_2012.tsv\",\n", " sep=\"\\t\",\n", " dtype={\n", " \"ACTION_INDICATOR\": \"object\",\n", " \"ORDER_FORM_NO\": \"object\",\n", " \"REPORTER_ADDRESS2\": \"object\",\n", " \"REPORTER_ADDL_CO_INFO\": \"object\",\n", " \"BUYER_ADDL_CO_INFO\": \"object\",\n", " \"BUYER_ADDRESS2\": \"object\",\n", " \"NDC_NO\": \"object\",\n", " \"UNIT\": \"object\",\n", " },\n", ")\n", "\n", "df = df[df['Measure'] == \"TAB\"] \n", "df.compute()\n", "```\n", "\n", "If `dd.read_csv()` executed immediately and tried to load all the data and pass it back to you before subsetting, it'd crash your computer! After all, if you could load it all at once, we probably wouldn't be using dask, would we? It's only because dask knows you want to load each chunk then subset it before collecting all those pieces that this code works.\n", "\n", "### Visualizing Task Graphs\n", "\n", "You can actually see how your computer is developing a plan with the `.dask` method, which you can run on any dask object before you've run `.compute()`. Let's see this in practice with our code from the top of this notebook (You get an HTML object you can manipulate and explore):" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "
\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
\n", "
\n", "

HighLevelGraph

\n", "

\n", " HighLevelGraph with 14 layers and 16238 keys from all layers.\n", "

\n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", " \n", "

Layer1: read-csv

\n", "
\n", "

\n", " read-csv-98dc9420c2f021fb4e0bb3d80c72065d\n", "

\n", "\n", " \n", " \n", " \n", " \n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
layer_typeDataFrameIOLayer
is_materializedFalse
number of outputs1249
npartitions1249
columns['REPORTER_DEA_NO', 'REPORTER_BUS_ACT', 'REPORTER_NAME', 'REPORTER_ADDL_CO_INFO', 'REPORTER_ADDRESS1', 'REPORTER_ADDRESS2', 'REPORTER_CITY', 'REPORTER_STATE', 'REPORTER_ZIP', 'REPORTER_COUNTY', 'BUYER_DEA_NO', 'BUYER_BUS_ACT', 'BUYER_NAME', 'BUYER_ADDL_CO_INFO', 'BUYER_ADDRESS1', 'BUYER_ADDRESS2', 'BUYER_CITY', 'BUYER_STATE', 'BUYER_ZIP', 'BUYER_COUNTY', 'TRANSACTION_CODE', 'DRUG_CODE', 'NDC_NO', 'DRUG_NAME', 'QUANTITY', 'UNIT', 'ACTION_INDICATOR', 'ORDER_FORM_NO', 'CORRECTION_NO', 'STRENGTH', 'TRANSACTION_DATE', 'CALC_BASE_WT_IN_GM', 'DOSAGE_UNIT', 'TRANSACTION_ID', 'Product_Name', 'Ingredient_Name', 'Measure', 'MME_Conversion_Factor', 'Combined_Labeler_Name', 'Revised_Company_Name', 'Reporter_family', 'dos_str']
typedask.dataframe.core.DataFrame
dataframe_typepandas.core.frame.DataFrame
series_dtypes{'REPORTER_DEA_NO': dtype('O'), 'REPORTER_BUS_ACT': dtype('O'), 'REPORTER_NAME': dtype('O'), 'REPORTER_ADDL_CO_INFO': dtype('O'), 'REPORTER_ADDRESS1': dtype('O'), 'REPORTER_ADDRESS2': dtype('O'), 'REPORTER_CITY': dtype('O'), 'REPORTER_STATE': dtype('O'), 'REPORTER_ZIP': dtype('int64'), 'REPORTER_COUNTY': dtype('O'), 'BUYER_DEA_NO': dtype('O'), 'BUYER_BUS_ACT': dtype('O'), 'BUYER_NAME': dtype('O'), 'BUYER_ADDL_CO_INFO': dtype('O'), 'BUYER_ADDRESS1': dtype('O'), 'BUYER_ADDRESS2': dtype('O'), 'BUYER_CITY': dtype('O'), 'BUYER_STATE': dtype('O'), 'BUYER_ZIP': dtype('int64'), 'BUYER_COUNTY': dtype('O'), 'TRANSACTION_CODE': dtype('O'), 'DRUG_CODE': dtype('int64'), 'NDC_NO': dtype('O'), 'DRUG_NAME': dtype('O'), 'QUANTITY': dtype('float64'), 'UNIT': dtype('O'), 'ACTION_INDICATOR': dtype('O'), 'ORDER_FORM_NO': dtype('O'), 'CORRECTION_NO': dtype('float64'), 'STRENGTH': dtype('float64'), 'TRANSACTION_DATE': dtype('int64'), 'CALC_BASE_WT_IN_GM': dtype('float64'), 'DOSAGE_UNIT': dtype('float64'), 'TRANSACTION_ID': dtype('int64'), 'Product_Name': dtype('O'), 'Ingredient_Name': dtype('O'), 'Measure': dtype('O'), 'MME_Conversion_Factor': dtype('float64'), 'Combined_Labeler_Name': dtype('O'), 'Revised_Company_Name': dtype('O'), 'Reporter_family': dtype('O'), 'dos_str': dtype('float64')}
\n", "
\n", " \n", "
\n", "\n", "
\n", "
\n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", " \n", "

Layer2: getitem

\n", "
\n", "

\n", " getitem-b4fc17c18d2b87711188593a28c0412d\n", "

\n", "\n", " \n", " \n", " \n", " \n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
layer_typeBlockwise
is_materializedFalse
number of outputs1249
depends on read-csv-98dc9420c2f021fb4e0bb3d80c72065d
\n", "
\n", " \n", "
\n", "\n", "
\n", "
\n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", " \n", "

Layer3: to_datetime

\n", "
\n", "

\n", " to_datetime-52eca15982ab857959b3001545bb57fa\n", "

\n", "\n", " \n", " \n", " \n", " \n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
layer_typeBlockwise
is_materializedFalse
number of outputs1249
depends on getitem-b4fc17c18d2b87711188593a28c0412d
\n", "
\n", " \n", "
\n", "\n", "
\n", "
\n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", " \n", "

Layer4: assign

\n", "
\n", "

\n", " assign-df1f62c87a9d21a7c2e1959a33e7735a\n", "

\n", "\n", " \n", " \n", " \n", " \n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
layer_typeBlockwise
is_materializedFalse
number of outputs1249
npartitions1249
columns['REPORTER_DEA_NO', 'REPORTER_BUS_ACT', 'REPORTER_NAME', 'REPORTER_ADDL_CO_INFO', 'REPORTER_ADDRESS1', 'REPORTER_ADDRESS2', 'REPORTER_CITY', 'REPORTER_STATE', 'REPORTER_ZIP', 'REPORTER_COUNTY', 'BUYER_DEA_NO', 'BUYER_BUS_ACT', 'BUYER_NAME', 'BUYER_ADDL_CO_INFO', 'BUYER_ADDRESS1', 'BUYER_ADDRESS2', 'BUYER_CITY', 'BUYER_STATE', 'BUYER_ZIP', 'BUYER_COUNTY', 'TRANSACTION_CODE', 'DRUG_CODE', 'NDC_NO', 'DRUG_NAME', 'QUANTITY', 'UNIT', 'ACTION_INDICATOR', 'ORDER_FORM_NO', 'CORRECTION_NO', 'STRENGTH', 'TRANSACTION_DATE', 'CALC_BASE_WT_IN_GM', 'DOSAGE_UNIT', 'TRANSACTION_ID', 'Product_Name', 'Ingredient_Name', 'Measure', 'MME_Conversion_Factor', 'Combined_Labeler_Name', 'Revised_Company_Name', 'Reporter_family', 'dos_str', 'date']
typedask.dataframe.core.DataFrame
dataframe_typepandas.core.frame.DataFrame
series_dtypes{'REPORTER_DEA_NO': dtype('O'), 'REPORTER_BUS_ACT': dtype('O'), 'REPORTER_NAME': dtype('O'), 'REPORTER_ADDL_CO_INFO': dtype('O'), 'REPORTER_ADDRESS1': dtype('O'), 'REPORTER_ADDRESS2': dtype('O'), 'REPORTER_CITY': dtype('O'), 'REPORTER_STATE': dtype('O'), 'REPORTER_ZIP': dtype('int64'), 'REPORTER_COUNTY': dtype('O'), 'BUYER_DEA_NO': dtype('O'), 'BUYER_BUS_ACT': dtype('O'), 'BUYER_NAME': dtype('O'), 'BUYER_ADDL_CO_INFO': dtype('O'), 'BUYER_ADDRESS1': dtype('O'), 'BUYER_ADDRESS2': dtype('O'), 'BUYER_CITY': dtype('O'), 'BUYER_STATE': dtype('O'), 'BUYER_ZIP': dtype('int64'), 'BUYER_COUNTY': dtype('O'), 'TRANSACTION_CODE': dtype('O'), 'DRUG_CODE': dtype('int64'), 'NDC_NO': dtype('O'), 'DRUG_NAME': dtype('O'), 'QUANTITY': dtype('float64'), 'UNIT': dtype('O'), 'ACTION_INDICATOR': dtype('O'), 'ORDER_FORM_NO': dtype('O'), 'CORRECTION_NO': dtype('float64'), 'STRENGTH': dtype('float64'), 'TRANSACTION_DATE': dtype('int64'), 'CALC_BASE_WT_IN_GM': dtype('float64'), 'DOSAGE_UNIT': dtype('float64'), 'TRANSACTION_ID': dtype('int64'), 'Product_Name': dtype('O'), 'Ingredient_Name': dtype('O'), 'Measure': dtype('O'), 'MME_Conversion_Factor': dtype('float64'), 'Combined_Labeler_Name': dtype('O'), 'Revised_Company_Name': dtype('O'), 'Reporter_family': dtype('O'), 'dos_str': dtype('float64'), 'date': dtype('<M8[ns]')}
depends on read-csv-98dc9420c2f021fb4e0bb3d80c72065d
to_datetime-52eca15982ab857959b3001545bb57fa
\n", "
\n", " \n", "
\n", "\n", "
\n", "
\n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", " \n", "

Layer5: getitem

\n", "
\n", "

\n", " getitem-06023399ad03ddb0261ddd8d1b79d5e3\n", "

\n", "\n", " \n", " \n", " \n", " \n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
layer_typeBlockwise
is_materializedFalse
number of outputs1249
depends on assign-df1f62c87a9d21a7c2e1959a33e7735a
\n", "
\n", " \n", "
\n", "\n", "
\n", "
\n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", " \n", "

Layer6: dt-year

\n", "
\n", "

\n", " dt-year-a9efa4ff5a9d6612f62c70f567177170\n", "

\n", "\n", " \n", " \n", " \n", " \n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
layer_typeBlockwise
is_materializedFalse
number of outputs1249
depends on getitem-06023399ad03ddb0261ddd8d1b79d5e3
\n", "
\n", " \n", "
\n", "\n", "
\n", "
\n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", " \n", "

Layer7: assign

\n", "
\n", "

\n", " assign-f485e25f94f5664c192e73df6d42002d\n", "

\n", "\n", " \n", " \n", " \n", " \n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
layer_typeBlockwise
is_materializedFalse
number of outputs1249
npartitions1249
columns['REPORTER_DEA_NO', 'REPORTER_BUS_ACT', 'REPORTER_NAME', 'REPORTER_ADDL_CO_INFO', 'REPORTER_ADDRESS1', 'REPORTER_ADDRESS2', 'REPORTER_CITY', 'REPORTER_STATE', 'REPORTER_ZIP', 'REPORTER_COUNTY', 'BUYER_DEA_NO', 'BUYER_BUS_ACT', 'BUYER_NAME', 'BUYER_ADDL_CO_INFO', 'BUYER_ADDRESS1', 'BUYER_ADDRESS2', 'BUYER_CITY', 'BUYER_STATE', 'BUYER_ZIP', 'BUYER_COUNTY', 'TRANSACTION_CODE', 'DRUG_CODE', 'NDC_NO', 'DRUG_NAME', 'QUANTITY', 'UNIT', 'ACTION_INDICATOR', 'ORDER_FORM_NO', 'CORRECTION_NO', 'STRENGTH', 'TRANSACTION_DATE', 'CALC_BASE_WT_IN_GM', 'DOSAGE_UNIT', 'TRANSACTION_ID', 'Product_Name', 'Ingredient_Name', 'Measure', 'MME_Conversion_Factor', 'Combined_Labeler_Name', 'Revised_Company_Name', 'Reporter_family', 'dos_str', 'date', 'year']
typedask.dataframe.core.DataFrame
dataframe_typepandas.core.frame.DataFrame
series_dtypes{'REPORTER_DEA_NO': dtype('O'), 'REPORTER_BUS_ACT': dtype('O'), 'REPORTER_NAME': dtype('O'), 'REPORTER_ADDL_CO_INFO': dtype('O'), 'REPORTER_ADDRESS1': dtype('O'), 'REPORTER_ADDRESS2': dtype('O'), 'REPORTER_CITY': dtype('O'), 'REPORTER_STATE': dtype('O'), 'REPORTER_ZIP': dtype('int64'), 'REPORTER_COUNTY': dtype('O'), 'BUYER_DEA_NO': dtype('O'), 'BUYER_BUS_ACT': dtype('O'), 'BUYER_NAME': dtype('O'), 'BUYER_ADDL_CO_INFO': dtype('O'), 'BUYER_ADDRESS1': dtype('O'), 'BUYER_ADDRESS2': dtype('O'), 'BUYER_CITY': dtype('O'), 'BUYER_STATE': dtype('O'), 'BUYER_ZIP': dtype('int64'), 'BUYER_COUNTY': dtype('O'), 'TRANSACTION_CODE': dtype('O'), 'DRUG_CODE': dtype('int64'), 'NDC_NO': dtype('O'), 'DRUG_NAME': dtype('O'), 'QUANTITY': dtype('float64'), 'UNIT': dtype('O'), 'ACTION_INDICATOR': dtype('O'), 'ORDER_FORM_NO': dtype('O'), 'CORRECTION_NO': dtype('float64'), 'STRENGTH': dtype('float64'), 'TRANSACTION_DATE': dtype('int64'), 'CALC_BASE_WT_IN_GM': dtype('float64'), 'DOSAGE_UNIT': dtype('float64'), 'TRANSACTION_ID': dtype('int64'), 'Product_Name': dtype('O'), 'Ingredient_Name': dtype('O'), 'Measure': dtype('O'), 'MME_Conversion_Factor': dtype('float64'), 'Combined_Labeler_Name': dtype('O'), 'Revised_Company_Name': dtype('O'), 'Reporter_family': dtype('O'), 'dos_str': dtype('float64'), 'date': dtype('<M8[ns]'), 'year': dtype('int64')}
depends on assign-df1f62c87a9d21a7c2e1959a33e7735a
dt-year-a9efa4ff5a9d6612f62c70f567177170
\n", "
\n", " \n", "
\n", "\n", "
\n", "
\n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", " \n", "

Layer8: getitem

\n", "
\n", "

\n", " getitem-65b082a7179bbd179d42b1f248add400\n", "

\n", "\n", " \n", " \n", " \n", " \n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
layer_typeBlockwise
is_materializedFalse
number of outputs1249
depends on assign-f485e25f94f5664c192e73df6d42002d
\n", "
\n", " \n", "
\n", "\n", "
\n", "
\n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", " \n", "

Layer9: getitem

\n", "
\n", "

\n", " getitem-9b67ae1a2bd7fb1817a63efa1994d7b4\n", "

\n", "\n", " \n", " \n", " \n", " \n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
layer_typeBlockwise
is_materializedFalse
number of outputs1249
depends on assign-f485e25f94f5664c192e73df6d42002d
\n", "
\n", " \n", "
\n", "\n", "
\n", "
\n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", " \n", "

Layer10: mul

\n", "
\n", "

\n", " mul-9ab4fa814864c801e2cba6374dc6a7ac\n", "

\n", "\n", " \n", " \n", " \n", " \n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
layer_typeBlockwise
is_materializedFalse
number of outputs1249
depends on getitem-9b67ae1a2bd7fb1817a63efa1994d7b4
getitem-65b082a7179bbd179d42b1f248add400
\n", "
\n", " \n", "
\n", "\n", "
\n", "
\n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", " \n", "

Layer11: assign

\n", "
\n", "

\n", " assign-499e0f8d8a2531f815b06b72dcbc9adc\n", "

\n", "\n", " \n", " \n", " \n", " \n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
layer_typeBlockwise
is_materializedFalse
number of outputs1249
npartitions1249
columns['REPORTER_DEA_NO', 'REPORTER_BUS_ACT', 'REPORTER_NAME', 'REPORTER_ADDL_CO_INFO', 'REPORTER_ADDRESS1', 'REPORTER_ADDRESS2', 'REPORTER_CITY', 'REPORTER_STATE', 'REPORTER_ZIP', 'REPORTER_COUNTY', 'BUYER_DEA_NO', 'BUYER_BUS_ACT', 'BUYER_NAME', 'BUYER_ADDL_CO_INFO', 'BUYER_ADDRESS1', 'BUYER_ADDRESS2', 'BUYER_CITY', 'BUYER_STATE', 'BUYER_ZIP', 'BUYER_COUNTY', 'TRANSACTION_CODE', 'DRUG_CODE', 'NDC_NO', 'DRUG_NAME', 'QUANTITY', 'UNIT', 'ACTION_INDICATOR', 'ORDER_FORM_NO', 'CORRECTION_NO', 'STRENGTH', 'TRANSACTION_DATE', 'CALC_BASE_WT_IN_GM', 'DOSAGE_UNIT', 'TRANSACTION_ID', 'Product_Name', 'Ingredient_Name', 'Measure', 'MME_Conversion_Factor', 'Combined_Labeler_Name', 'Revised_Company_Name', 'Reporter_family', 'dos_str', 'date', 'year', 'morphine_equivalent_g']
typedask.dataframe.core.DataFrame
dataframe_typepandas.core.frame.DataFrame
series_dtypes{'REPORTER_DEA_NO': dtype('O'), 'REPORTER_BUS_ACT': dtype('O'), 'REPORTER_NAME': dtype('O'), 'REPORTER_ADDL_CO_INFO': dtype('O'), 'REPORTER_ADDRESS1': dtype('O'), 'REPORTER_ADDRESS2': dtype('O'), 'REPORTER_CITY': dtype('O'), 'REPORTER_STATE': dtype('O'), 'REPORTER_ZIP': dtype('int64'), 'REPORTER_COUNTY': dtype('O'), 'BUYER_DEA_NO': dtype('O'), 'BUYER_BUS_ACT': dtype('O'), 'BUYER_NAME': dtype('O'), 'BUYER_ADDL_CO_INFO': dtype('O'), 'BUYER_ADDRESS1': dtype('O'), 'BUYER_ADDRESS2': dtype('O'), 'BUYER_CITY': dtype('O'), 'BUYER_STATE': dtype('O'), 'BUYER_ZIP': dtype('int64'), 'BUYER_COUNTY': dtype('O'), 'TRANSACTION_CODE': dtype('O'), 'DRUG_CODE': dtype('int64'), 'NDC_NO': dtype('O'), 'DRUG_NAME': dtype('O'), 'QUANTITY': dtype('float64'), 'UNIT': dtype('O'), 'ACTION_INDICATOR': dtype('O'), 'ORDER_FORM_NO': dtype('O'), 'CORRECTION_NO': dtype('float64'), 'STRENGTH': dtype('float64'), 'TRANSACTION_DATE': dtype('int64'), 'CALC_BASE_WT_IN_GM': dtype('float64'), 'DOSAGE_UNIT': dtype('float64'), 'TRANSACTION_ID': dtype('int64'), 'Product_Name': dtype('O'), 'Ingredient_Name': dtype('O'), 'Measure': dtype('O'), 'MME_Conversion_Factor': dtype('float64'), 'Combined_Labeler_Name': dtype('O'), 'Revised_Company_Name': dtype('O'), 'Reporter_family': dtype('O'), 'dos_str': dtype('float64'), 'date': dtype('<M8[ns]'), 'year': dtype('int64'), 'morphine_equivalent_g': dtype('float64')}
depends on mul-9ab4fa814864c801e2cba6374dc6a7ac
assign-f485e25f94f5664c192e73df6d42002d
\n", "
\n", " \n", "
\n", "\n", "
\n", "
\n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", " \n", "

Layer12: getitem

\n", "
\n", "

\n", " getitem-9b6041b75a79c3e71d6d7acd7239fd91\n", "

\n", "\n", " \n", " \n", " \n", " \n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
layer_typeBlockwise
is_materializedFalse
number of outputs1249
npartitions1249
columns['year', 'morphine_equivalent_g', 'BUYER_STATE', 'BUYER_COUNTY']
typedask.dataframe.core.DataFrame
dataframe_typepandas.core.frame.DataFrame
series_dtypes{'year': dtype('int64'), 'morphine_equivalent_g': dtype('float64'), 'BUYER_STATE': dtype('O'), 'BUYER_COUNTY': dtype('O')}
depends on assign-499e0f8d8a2531f815b06b72dcbc9adc
\n", "
\n", " \n", "
\n", "\n", "
\n", "
\n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", " \n", "

Layer13: series-groupby-sum-chunk

\n", "
\n", "

\n", " series-groupby-sum-chunk-3fbadc183a38ae4572bc0a532c474cd5-d5a6d15d0b1463d80fbfbb777dc826f7\n", "

\n", "\n", " \n", " \n", " \n", " \n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
layer_typeBlockwise
is_materializedFalse
number of outputs1249
depends on getitem-9b6041b75a79c3e71d6d7acd7239fd91
\n", "
\n", " \n", "
\n", "\n", "
\n", "
\n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", "\n", "
\n", " \n", "

Layer14: series-groupby-sum-agg

\n", "
\n", "

\n", " series-groupby-sum-agg-3fbadc183a38ae4572bc0a532c474cd5\n", "

\n", "\n", " \n", " \n", " \n", " \n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
layer_typeDataFrameTreeReduction
is_materializedFalse
number of outputs1
depends on series-groupby-sum-chunk-3fbadc183a38ae4572bc0a532c474cd5-d5a6d15d0b1463d80fbfbb777dc826f7
\n", "
\n", " \n", "
\n", "\n", "
\n", "
\n", " \n", "
\n", "
\n", "
" ], "text/plain": [ "HighLevelGraph with 14 layers.\n", "\n", " 0. read-csv-98dc9420c2f021fb4e0bb3d80c72065d\n", " 1. getitem-b4fc17c18d2b87711188593a28c0412d\n", " 2. to_datetime-52eca15982ab857959b3001545bb57fa\n", " 3. assign-df1f62c87a9d21a7c2e1959a33e7735a\n", " 4. getitem-06023399ad03ddb0261ddd8d1b79d5e3\n", " 5. dt-year-a9efa4ff5a9d6612f62c70f567177170\n", " 6. assign-f485e25f94f5664c192e73df6d42002d\n", " 7. getitem-65b082a7179bbd179d42b1f248add400\n", " 8. getitem-9b67ae1a2bd7fb1817a63efa1994d7b4\n", " 9. mul-9ab4fa814864c801e2cba6374dc6a7ac\n", " 10. assign-499e0f8d8a2531f815b06b72dcbc9adc\n", " 11. getitem-9b6041b75a79c3e71d6d7acd7239fd91\n", " 12. series-groupby-sum-chunk-3fbadc183a38ae4572bc0a532c474cd5-d5a6d15d0b1463d80fbfbb777dc826f7\n", " 13. series-groupby-sum-agg-3fbadc183a38ae4572bc0a532c474cd5" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df = dd.read_csv(\n", " \"arcos_all_washpost_2006_2012.tsv\",\n", " sep=\"\\t\",\n", " dtype={\n", " \"ACTION_INDICATOR\": \"object\",\n", " \"ORDER_FORM_NO\": \"object\",\n", " \"REPORTER_ADDRESS2\": \"object\",\n", " \"REPORTER_ADDL_CO_INFO\": \"object\",\n", " \"BUYER_ADDL_CO_INFO\": \"object\",\n", " \"BUYER_ADDRESS2\": \"object\",\n", " \"NDC_NO\": \"object\",\n", " \"UNIT\": \"object\",\n", " },\n", ")\n", "\n", "# Extract year\n", "df[\"date\"] = dd.to_datetime(df.TRANSACTION_DATE, format=\"%m%d%Y\")\n", "df[\"year\"] = df.date.dt.year\n", "\n", "# Make an estimate of total morphine equivalent shipments\n", "df[\"morphine_equivalent_g\"] = (df[\"CALC_BASE_WT_IN_GM\"]) * df[\"MME_Conversion_Factor\"]\n", "\n", "# Drop extra vars\n", "df = df[[\"year\", \"morphine_equivalent_g\", \"BUYER_STATE\", \"BUYER_COUNTY\"]]\n", "\n", "# Collapse to total shipments to each county in each year.\n", "collapsed = df.groupby(\n", " [\"year\", \"BUYER_STATE\", \"BUYER_COUNTY\"]\n", ").morphine_equivalent_g.sum()\n", "\n", "collapsed.dask\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Or you can plot out a task graph. You can do this at a high level with `.dask.visualize()`, or in *excruciating* detail for a task this big with just `.visualize()`. Note `visualize()` does require you install additional libraries, so install `ipycytoscape` and `python-graphviz` with conda first. But these get so big I'm not gonna include one here—you can find examples [here if you'd like to see more.](https://docs.dask.org/en/stable/graphviz.html)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## dask versus Spark / PySpark" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Dask is something of a late-comer to the distributed computing game. Before its rise, the most popular platforms were Hadoop and Spark (spark is basically \"Hadoop v2.0\"). So... how does dask compare to these? Or more specifically, how does dask compare to Spark (since no one uses Hadoop anymore) and the various Python libraries for using Spark (e.g. PySpark and koalas)?\n", "\n", "In terms of functionality, dask and Spark basically do the same things. 
Both are tools for distributed computing across many computers, and conceptually they work in basically the same ways: \n", "\n",
    "- Both offer low-level tools (things like `map`/`filter` in pyspark, and `delayed` in dask) as well as higher-level data structures (Spark has Spark DataFrames, dask has dask dataframes; Spark has RDDs, dask has dask bags).\n",
    "- Both are fault tolerant (if one machine in your cluster dies, the system knows what that machine was doing and can reassign its work to another machine).\n",
    "- Both make use of delayed execution / lazy evaluation to allow the system to develop an efficient plan for completing a computation.\n",
    "- Both can be run on your own computer, or on a cluster of lots of computers.\n", "\n",
    "**The huge difference, though, is that dask lets you write code with the pandas syntax you already know.** Seriously. There are lots of little \"under the hood\" things, but from the perspective of an applied Data Scientist, that's the big one: it's just a version of pandas.\n", "\n",
    "Moreover, as we'll see in [later lessons](cloud_dask.ipynb), dask is also insanely easy to set up on a distributed cluster, making it not only easier to use than Spark for a pandas user, but generally also much easier to get up and running (at least in my experience).\n", "\n",
    "Spark, by contrast, is a stand-alone system. It's built to run on Java virtual machines, and Spark itself is written in a language called Scala. It is not a tool for pandas users per se, and so it has its own syntax for manipulating datasets that is distinct from that of `numpy` and `pandas`. \n", "\n",
    "**A Note on Koalas:** In the last year or so, there have been real inroads in implementing the pandas API on top of Spark—check out the [koalas](https://koalas.readthedocs.io/en/latest/getting_started/10min.html) project if you're at a place that uses Spark!\n", "\n",
    "As for which is more popular, as is so often the case with software, it depends on where you're working. In this class, I've decided to teach dask because you don't have to learn a new syntax, so you can instead just focus on learning how distributed computing works. But if you get a job someday that requires you to work with Spark, don't panic -- you'll find that all the concepts you've learned for using dask also apply to Spark—you'll just have to learn some new syntax.\n", "\n",
    "And as for performance, it probably depends on the workload, but I've yet to see anything that suggests dask compute times are slower than Spark compute times. Case studies are always tricky since performance depends on the exact work being done, but here's one [case study comparison of performance](https://arxiv.org/abs/1907.13030) that puts dask just a little ahead. And I've heard reports of dask beating Spark by 40x in another project. So personally, I think dask is a no-brainer if you aren't working at a company that is already using Spark: it will take **much** less of your time to set up and use, and it should at least run about even with Spark.\n", "\n",
    "The only exceptions are some specific use cases, like network analysis, where Spark has some libraries that dask does not. For more on these types of issues, check out this [conceptual comparison to Spark](https://docs.dask.org/en/latest/spark.html)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## What else can dask do?\n", "\n",
    "At this point, we've mostly emphasized what dask can do for wrangling dataframes. However, dask has a number of additional functionalities to be aware of:\n", "\n",
    "- Working with dask arrays: dask offers a parallelized, chunked version of numpy arrays (see the first sketch below).\n",
    "- Parallelizing arbitrary functions: you can write your own parallelized functions with `delayed` (see the second sketch below).\n",
    "- Distributed machine learning: dask has parallelized *some* machine learning methods in [dask-ml](https://dask-ml.readthedocs.io/en/latest/).\n",
    "- dask can be used with parallelization tools other than `distributed` (for example, it can do some parallelism through multi-threading). However... everyone seems to agree at this point that you should just use `distributed` all the time. " ] },
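{ "cell_type": "markdown", "metadata": {}, "source": [ "To make the first two of those bullets concrete, here are two minimal sketches (illustrative only and not run here; the array sizes, chunk sizes, and the `slow_square` function are made up for the example):\n", "\n",
    "```python\n",
    "import dask.array as da\n", "\n",
    "# A large array split into chunks; each chunk is an ordinary numpy array\n",
    "x = da.random.random((100_000, 1_000), chunks=(10_000, 1_000))\n", "\n",
    "# Looks like numpy, but this only builds a task graph; nothing runs until .compute()\n",
    "column_means = x.mean(axis=0).compute()\n",
    "```\n", "\n",
    "```python\n",
    "import dask\n", "\n",
    "@dask.delayed\n",
    "def slow_square(x):\n",
    "    return x ** 2\n", "\n",
    "# Nothing has run yet; these are just lazy task descriptions\n",
    "lazy_results = [slow_square(i) for i in range(10)]\n", "\n",
    "# Run all ten tasks in parallel on your workers\n",
    "results = dask.compute(*lazy_results)\n",
    "```\n" ] },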
{ "cell_type": "markdown", "metadata": {}, "source": [ "## What can't dask do?\n", "\n",
    "Because dask is basically a library full of little recipes for parallelizing common Python, numpy, and pandas functions, you will occasionally find that there are some things the authors of dask just haven't implemented (usually because some operations are *really* hard to parallelize). Here are guides to what you can expect to work and what is unlikely to work with dask:\n", "\n",
    "- [When using dask dataframes](https://docs.dask.org/en/latest/dataframe.html#scope)\n",
    "- [When using dask arrays](https://docs.dask.org/en/latest/array.html#scope)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## If you really want to get into dask...\n", "\n",
    "Then *after* doing the exercise below (which I think is actually the most accessible to those taking this class), here are some extensions and next steps:\n", "\n",
    "- `dask-ml`: As a reminder, if you now want to do some machine learning, you can [use dask-ml on this system](https://ml.dask.org/), which does the same thing for `scikit-learn` that regular dask does for `pandas` (a short sketch of what this looks like follows this list).\n",
    "- Check out [dask best practices here](https://docs.dask.org/en/latest/best-practices.html).\n",
    "- Here's a great blog post on the [history of dask](https://coiled.io/blog/history-dask/) by one of its creators.\n",
    "- Interested in using dask in your company and want help? There's a great new company created by the founders of dask to provide enterprise support for dask [called coiled](https://coiled.io/). (No, I have no affiliation with them; I just think companies that offer paid support to help businesses move from closed source software to open source are a great way to make open source software better.) You can also hear a fun interview with the founders about [both dask and coiled here](https://talkpython.fm/episodes/show/285/dask-as-a-platform-service-with-coiled).\n",
    "- The folks from coiled have also compiled a [great collection of videos and tutorials about dask and Python at scale here](https://coiled.io/videos/).\n",
    "- Curious how dask compares to other tools for distributed computing? Here's a [conceptual comparison to Spark](https://docs.dask.org/en/latest/spark.html), and here's a [case study comparison of performance](https://arxiv.org/abs/1907.13030). Comparisons will usually depend a lot on the specifics of the work being done, but at least in this case, dask was a little faster than Spark.\n",
    "- Working with GPUs? There's a project called [dask-cudf](https://docs.rapids.ai/api/cudf/stable/dask-cudf.html) (part of the [RAPIDS project](https://rapids.ai/index.html)) that offers the kind of parallelization we get from dask on CPUs, but for GPUs. The project is young but growing quickly. My guess, though, is that those libraries will become the infrastructure for updates to tools like `dask-ml` rather than something most applied people need to play with. But putting it here as an FYI!\n" ] },
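{ "cell_type": "markdown", "metadata": {}, "source": [ "To give a taste of `dask-ml`, here is a minimal sketch of what a fit looks like (illustrative only: it assumes `dask-ml` is installed, and the random data here is just a stand-in for a real dataset):\n", "\n",
    "```python\n",
    "import dask.array as da\n",
    "from dask_ml.linear_model import LogisticRegression\n", "\n",
    "# Fake data: a dask array chunked along the rows\n",
    "X = da.random.random((100_000, 10), chunks=(10_000, 10))\n",
    "y = (X[:, 0] > 0.5).astype(int)\n", "\n",
    "# Same interface as scikit-learn, but it can train directly on dask arrays\n",
    "model = LogisticRegression()\n",
    "model.fit(X, y)\n",
    "```\n" ] },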
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Exercises\n", "\n",
    "Exercises can be found [here](exercises/Exercise_dask.ipynb).\n", "\n",
    "If you want more, you can find tutorials written by the dask team [here](https://github.com/dask/dask-tutorial).\n", "\n",
    "(Note: all these tutorials are great, but as the focus of this class is on real-world tabular data, we're gonna focus on those exercises.)\n", "\n",
    "[When you're done, you can find a dask cheatsheet here for future reference!](cheatsheets.ipynb)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3.10.8 ('dask')", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.8" }, "vscode": { "interpreter": { "hash": "a7bf1894231b67e95faebcb917bbd88035b8aa99fb0bb538f7696b61105023d6" } } }, "nbformat": 4, "nbformat_minor": 4 }