Bursting Data Science Workloads to GPUs on Google Cloud Platform with Dask Cloud Provider

Jacob Tomlinson

Jan 15, 2021

With the recent release of Dask Cloud Provider, we’ve added virtual machine support for a number of cloud platforms. In this blog post, we will discuss scaling RAPIDS from a local machine to a multi-node, multi-GPU cluster on Google Cloud with Dask Cloud Provider. We’ll use the same Docker image to run RAPIDS on our local machine and when we burst to Google Cloud Platform (GCP) for additional GPU access.

Consider the scenario where we are working on some data stored in Google Cloud Storage (GCS) and we want to analyze it using RAPIDS.

Let’s start our work on a laptop with an NVIDIA GPU. It runs Linux and has the NVIDIA drivers and Docker installed, so we can get started with RAPIDS quickly using the Docker image.

$ docker run --gpus all --rm -it -p 8888:8888 -p 8787:8787 -p 8786:8786 rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.8

Once we have our container running we will need additional packages for creating instances on GCP (dask-cloudprovider) and reading from Google Cloud Storage (gcsfs). In the running docker container execute the following:

$ pip install "dask-cloudprovider[gcp]" gcsfs

For this blog post, we will work with the NYC Taxi data, specifically, the mirror maintained on GCS by Anaconda.

Here we have ~30 GB of CSV data stored in GCS. We could download this data locally, but instead we are going to use Dask and RAPIDS cuDF to read it directly and lazily in partitions. Lazy loading here means that we construct a local representation of the data by reading only the metadata; the data itself is read just in time when we perform operations on it.
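
To make the idea of lazy, partitioned loading concrete, here is a toy sketch in plain Python. It is not how Dask or cuDF implement it: the LazyTextPartitions class and its attributes are hypothetical names, and unlike a real CSV reader it splits on raw byte offsets rather than line boundaries. It only shows that building the object touches metadata (the file size), while bytes are read when a partition is requested.

```python
import os
import tempfile


class LazyTextPartitions:
    """Toy stand-in for a lazily loaded, partitioned dataset (hypothetical)."""

    def __init__(self, path, blocksize):
        self.path = path
        self.blocksize = blocksize
        # Only metadata is touched here: the file size, not its contents.
        size = os.path.getsize(path)
        self.offsets = list(range(0, size, blocksize))
        self.reads = 0  # counts how many partitions were actually read

    @property
    def npartitions(self):
        return len(self.offsets)

    def read_partition(self, i):
        # The bytes are fetched only when a partition is requested.
        with open(self.path, "rb") as fh:
            fh.seek(self.offsets[i])
            self.reads += 1
            return fh.read(self.blocksize)


# Write a small throwaway file so the sketch is self-contained.
with tempfile.NamedTemporaryFile("wb", suffix=".csv", delete=False) as tmp:
    tmp.write(b"a,b\n" + b"1,2\n" * 100)

data = LazyTextPartitions(tmp.name, blocksize=64)
reads_before = data.reads        # nothing has been read yet
first = data.read_partition(0)   # reads exactly one partition
os.remove(tmp.name)
```

Looking at the first rows of a lazily loaded dataframe works in the same spirit: only the first partition needs to be fetched.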

To do this we must construct a Dask cluster and connect a client to it. Because we are using RAPIDS we need to construct a LocalCUDACluster from the dask_cuda package.

>>> from dask.distributed import Client
>>> from dask_cuda import LocalCUDACluster
>>> cluster = LocalCUDACluster()
>>> client = Client(cluster)

Now that we have a cluster we can use dask_cudf to construct our dataframe.

>>> import dask_cudf
>>> df = dask_cudf.read_csv("gcs://anaconda-public-data/nyc-taxi/csv/2014/yellow_*.csv")

This will take a few seconds to read in all the metadata from GCS. Then we can have a look at some data.

>>> df.head()

Here we can see the first five rows of the dataframe, but we’ve only read one partition from GCS in order to do so. By default dask_cudf reads in 256MB partitions, but this is configurable. We can see how many partitions our data has been split into.

>>> df.npartitions
110
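
As a rough way to reason about partition counts, the sketch below assumes each file is split independently into fixed-size blocks, which is my understanding of how Dask divides files into byte ranges. The file sizes are made up for illustration and are not the sizes of the taxi files, so the real count can differ.

```python
import math


def count_partitions(file_sizes, blocksize):
    """Estimate partitions when every file is cut into blocksize-byte chunks."""
    return sum(math.ceil(size / blocksize) for size in file_sizes)


MB = 2**20
example_sizes = [600 * MB, 100 * MB]  # hypothetical file sizes
n = count_partitions(example_sizes, 256 * MB)  # 3 blocks + 1 block
```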

Next, we should clean our data a little, as data is often messy and this dataset is no exception. The column names contain additional whitespace and some of the longitude and latitude values are missing, so let’s tidy that up and drop the missing values.

>>> df.columns = [s.strip() for s in list(df.columns)]
>>> df = df.dropna(subset=["pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude"])

As a small example, let’s use RAPIDS cuSpatial to calculate the direct distance between the pickup and dropoff locations using the haversine_distance function. First we write a function that calculates the distance for a single partition, which we will then map over all of our partitions.

>>> import cudf
>>> from cuspatial import haversine_distance
>>> def apply_haversine(partition):
...     # Great-circle distance between pickup and dropoff for each row
...     return cudf.Series(haversine_distance(
...         partition.pickup_longitude,
...         partition.pickup_latitude,
...         partition.dropoff_longitude,
...         partition.dropoff_latitude
...     ))

Now we can map this function over our partitions and calculate the mean distance of all the NYC taxi trips in 2014.

>>> df.map_partitions(apply_haversine).mean().compute()
34.30891087351521
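
For readers without a GPU to hand, the haversine formula itself can be sketched in plain Python for a single pair of points. This is a CPU illustration of the formula, not cuSpatial's implementation; the Earth radius constant is an assumption, and the result is in kilometres, which as far as I know matches the unit cuSpatial reports.

```python
import math

EARTH_RADIUS_KM = 6371.0  # assumption: mean Earth radius


def haversine_km(lon1, lat1, lon2, lat2):
    """Great-circle distance in kilometres between two lon/lat points in degrees."""
    lon1, lat1, lon2, lat2 = map(math.radians, (lon1, lat1, lon2, lat2))
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = (math.sin(dlat / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


# One degree of latitude along a meridian is roughly 111 km.
one_degree = haversine_km(0.0, 0.0, 0.0, 1.0)
```

The argument order (longitude first, then latitude) mirrors the order used in apply_haversine above.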

Success! We have read our data directly from GCS over the internet and used our local GPU to crunch some numbers. However, this operation is limited by two main factors: the speed of our internet connection and the performance of our GPU.

  1. Downloading and storing the data locally doesn’t scale, so to avoid that we have streamed the data from GCS into memory. However, this approach is limited by your internet connection.
  2. We are also using a low-end mobile GPU. This is an excellent way to explore and experiment with RAPIDS on your own hardware, but when we want to do some more complex calculations we will need something faster.
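
To get a feel for the first limitation, here is a back-of-the-envelope estimate of how long streaming the dataset would take. The ~30 GB figure comes from this post; the 100 Mbit/s link speed is a hypothetical value, and the calculation ignores protocol overhead and any throttling.

```python
def transfer_seconds(size_bytes, bits_per_second):
    """Ideal transfer time for size_bytes over a link of the given speed."""
    return size_bytes * 8 / bits_per_second


dataset_bytes = 30e9   # ~30 GB of CSV data, as described in the post
home_link_bps = 100e6  # hypothetical 100 Mbit/s home connection
minutes_at_home = transfer_seconds(dataset_bytes, home_link_bps) / 60
```

Under these assumptions a single pass over the data takes around 40 minutes before the GPU does any work, which is why moving the computation next to the data is attractive.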

But what if we could use top-of-the-line GPUs located within Google Cloud instead of our local ones? This would also allow our computation to happen closer to the data, so we wouldn’t need to worry about our local bandwidth.

Dask Cloud Provider is a package that allows you to construct Dask clusters on various cloud resources. For this example, we are going to use the GCPCluster class which will construct a Dask cluster running on Google Cloud VM Instances.

Construct a Dask Cluster running on GCP VM Instances

The VM instances we are going to use have NVIDIA A100 GPUs, which NVIDIA describes as up to 20x more performant than the previous generation. Each GPU has 40GB of memory, and they are connected via NVLink with 600GB/s of bandwidth.

>>> from dask_cloudprovider.gcp import GCPCluster

Configure Authentication and Project

In order for GCPCluster to interact with GCP, we need to configure our authentication and project. To authenticate we can use the gcloud CLI tool.

$ gcloud auth login

We also need to configure our project in our Dask settings. This can either be placed in our Dask config at ~/.config/dask/cloudprovider.yaml

cloudprovider:
  gcp:
    projectid: "my-project-id"

As an environment variable

$ export DASK_CLOUDPROVIDER__GCP__PROJECTID="my-project-id"

Or directly in our code

>>> import dask.config
>>> dask.config.set({"cloudprovider.gcp.projectid": "my-project-id"})
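
The three options above all set the same config key. The environment variable name follows Dask's convention: a DASK_ prefix, with double underscores standing in for nesting. The helper below is a simplified imitation of that convention for illustration only; env_to_config_key is a hypothetical name, not a Dask function, and Dask's own parsing also handles the values.

```python
def env_to_config_key(name, prefix="DASK_"):
    """Translate an environment variable name into a dotted config key."""
    if not name.startswith(prefix):
        raise ValueError(f"{name!r} does not start with {prefix!r}")
    # Drop the prefix, lowercase, and turn double underscores into dots.
    return name[len(prefix):].lower().replace("__", ".")


key = env_to_config_key("DASK_CLOUDPROVIDER__GCP__PROJECTID")
# key is "cloudprovider.gcp.projectid", the same key used in the YAML and in code
```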

Now that we have that set up we can create a cluster, but we specifically need to create a RAPIDS cluster.

cluster = GCPCluster(
    zone="us-central1-a",
    machine_type="a2-highgpu-2g",
    n_workers=1,
    docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.8",
    worker_class="dask_cuda.CUDAWorker",
    env_vars={"EXTRA_PIP_PACKAGES": "gcsfs"}
)
client = Client(cluster)

Here we specify the following options:

  • The zone is the GCP zone we want our instances to launch in.
  • The machine type is the type of VM we want. Here we have chosen an a2 instance that has two NVIDIA A100 GPUs.
  • We start with one worker but can scale this up later if we wish.
  • We choose the same Docker image as we are running locally. This is important to ensure the two Python environments are the same.
  • We specify that we want to use the dask_cuda.CUDAWorker class for our Dask workers, as that will correctly configure our remote GPUs (this was handled locally by LocalCUDACluster).
  • We set the EXTRA_PIP_PACKAGES environment variable to ensure all the workers have gcsfs installed.

Running this will launch our Dask cluster on GCP in around 15 minutes.

Once our cluster is up and running we can perform the same analysis work we did before.

>>> df = dask_cudf.read_csv("gcs://anaconda-public-data/nyc-taxi/csv/2014/yellow_*.csv")
>>> df.columns = [s.strip() for s in list(df.columns)]
>>> df = df.dropna(subset=["pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude"])
>>> df.map_partitions(apply_haversine).mean().compute()

This time all of the work will happen remotely in GCP, which means our data will be read faster and the computation will happen on top-of-the-line A100 GPUs.

Lastly, we need to remember to shut our cluster down.

client.close()
cluster.close()

This is great! We demonstrated how to scale our workload with more cloud resources but there are additional features to make this experience even better. We can use Context Managers for cleanup and Packer for faster launching.

It is easy to forget to close your cluster, and the last thing you want to do is leave some high-performance infrastructure running unnecessarily on your account. To solve this, all of the Dask cluster managers can also be used as context managers.

with GCPCluster(
    zone="us-central1-a",
    machine_type="a2-highgpu-2g",
    n_workers=1,
    docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.8",
    worker_class="dask_cuda.CUDAWorker",
    env_vars={"EXTRA_PIP_PACKAGES": "gcsfs"}
) as cluster:
    with Client(cluster) as client:
        df = dask_cudf.read_csv("gcs://anaconda-public-data/nyc-taxi/csv/2014/yellow_*.csv")
        df.columns = [s.strip() for s in list(df.columns)]
        df = df.dropna(subset=['pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude'])
        df.map_partitions(apply_haversine).mean().compute()

By using our cluster manager in a with statement, the cluster will be created and destroyed automatically in order to run this piece of work.
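
The value of the context manager is that cleanup runs even when the work inside the block fails. The sketch below shows this with a stand-in class: FakeCluster is hypothetical and launches nothing, it only models the enter/exit behaviour that the cluster managers expose.

```python
class FakeCluster:
    """Hypothetical stand-in for a cluster manager; it launches nothing."""

    def __init__(self):
        self.running = False

    def __enter__(self):
        self.running = True  # a real manager would create VMs here
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False  # do not swallow the exception

    def close(self):
        self.running = False  # a real manager would delete VMs here


cluster = FakeCluster()
try:
    with cluster:
        was_running = cluster.running
        raise RuntimeError("simulated failure mid-computation")
except RuntimeError:
    pass

# The cluster was shut down even though the block raised.
still_running = cluster.running
```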

Waiting 15 minutes to start our cluster at the beginning of our workload may be an acceptable trade-off. There are network performance benefits to being inside GCP, and having access to scalable GPUs means your workloads will be much faster than if you were running them locally.

However, we can improve on this.

When GCPCluster starts our instances it has to prepare the VM, install drivers and software and pull down a copy of the RAPIDS docker image and decompress it. But what if our nodes were ready to go immediately?

Using a tool called Packer we can create our own custom source image which already contains all of our dependencies. So let’s install Packer.

$ wget https://releases.hashicorp.com/packer/1.6.5/packer_1.6.5_linux_amd64.zip
$ unzip packer_1.6.5_linux_amd64.zip

Next, we need to create a packer config file. Packer creates a VM, runs setup scripts, and then creates a new image from that VM. So we need to first extract the provisioning script that GCPCluster uses to create our machines. We can do this with the get_cloud_init classmethod and pass in the same arguments we would use to create a cluster.

cloud_init = GCPCluster.get_cloud_init(
    zone="us-central1-a",
    machine_type="a2-highgpu-2g",
    n_workers=1,
    docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.8",
    worker_class="dask_cuda.CUDAWorker",
    env_vars={"EXTRA_PIP_PACKAGES": "gcsfs"}
)

Next, we need to construct our Packer config. Most of the options here will be familiar as we’ve used them already in Dask Cloud Provider.

packer_config = {
    "builders": [
        {
            "type": "googlecompute",
            "project_id": "<PROJECT ID>",
            "source_image": "ubuntu-minimal-1804-bionic-v20201014",
            "ssh_username": "packer",
            "zone": "us-central1-a",
            "on_host_maintenance": "TERMINATE",
            "disk_size": "50",
            "machine_type": "a2-highgpu-2g",
            "metadata": {"user-data": cloud_init},
        }
    ],
    "provisioners": [
        {
            "type": "shell",
            "inline": [
                "echo 'Waiting for cloud-init'; while [ ! -f /var/lib/cloud/instance/boot-finished ]; do sleep 1; done; echo 'Done'"
            ],
        }
    ],
}

Now that we have created our config we should write it out to a JSON file to use with Packer.

import json
with open("packer.json", "w") as fh:
    fh.write(json.dumps(packer_config))

We can then run the packer build.

$ packer build packer.json
googlecompute: output will be in this color.
==> googlecompute: Checking image does not exist...
==> googlecompute: Creating temporary rsa SSH key for instance...
==> googlecompute: Using image: ubuntu-minimal-1804-bionic-v20201014
==> googlecompute: Creating instance...
googlecompute: Loading zone: us-central1-a
googlecompute: Loading machine type: a2-highgpu-2g
googlecompute: Requesting instance creation...
googlecompute: Waiting for creation operation to complete...
googlecompute: Instance has been created!
==> googlecompute: Waiting for the instance to become running...
googlecompute: IP: 34.72.119.86
==> googlecompute: Using ssh communicator to connect: 34.72.119.86
==> googlecompute: Waiting for SSH to become available...
==> googlecompute: Connected to SSH!
==> googlecompute: Provisioning with shell script: /tmp/packer-shell700735704
googlecompute: Waiting for cloud-init
googlecompute: Done
==> googlecompute: Deleting instance...
googlecompute: Instance has been deleted!
==> googlecompute: Creating image...
==> googlecompute: Deleting disk...
googlecompute: Disk has been deleted!
Build 'googlecompute' finished after 9 minutes 53 seconds.
==> Wait completed after 9 minutes 53 seconds
==> Builds finished. The artifacts of successful builds are:
--> googlecompute: A disk image was created: packer-1605545458

Here we can see at the bottom of our output that we have a new image called packer-1605545458. So next time we launch a GCPCluster we can pass that as the source_image option and disable the bootstrapping that Dask Cloud Provider does because we have already baked that in.

cluster = GCPCluster(
    source_image="packer-1605545458",
    zone="us-central1-a",
    machine_type="a2-highgpu-2g",
    n_workers=1,
    docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.8",
    worker_class="dask_cuda.CUDAWorker",
    env_vars={"EXTRA_PIP_PACKAGES": "gcsfs"},
    bootstrap=False
)
client = Client(cluster)

Now when we start our cluster it only takes around 5 minutes instead of 15.

Being able to move your workload to the cloud by changing the Dask cluster manager you are using is extremely powerful. It allows you to explore and experiment with cheap resources and then easily scale up when you are ready.

To make this experience even more pleasant there are more things we can do.

Currently, you have to learn a little about Packer and create your own Packer build config. All of the information needed is already contained within GCPCluster, so things could be expanded to generate this config for you, or even to run packer on your behalf under the hood.
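
As a sketch of what such a generator might look like, the helper below assembles the same Packer config used earlier in this post from a handful of arguments. build_packer_config is a hypothetical function, not part of Dask Cloud Provider, and the defaults simply mirror the values shown above; the cloud_init argument is expected to be the string returned by get_cloud_init.

```python
import json


def build_packer_config(project_id, zone, machine_type, cloud_init,
                        source_image="ubuntu-minimal-1804-bionic-v20201014",
                        disk_size="50"):
    """Assemble a Packer config dict mirroring the one written by hand above."""
    wait_for_cloud_init = (
        "echo 'Waiting for cloud-init'; "
        "while [ ! -f /var/lib/cloud/instance/boot-finished ]; "
        "do sleep 1; done; echo 'Done'"
    )
    return {
        "builders": [
            {
                "type": "googlecompute",
                "project_id": project_id,
                "source_image": source_image,
                "ssh_username": "packer",
                "zone": zone,
                "on_host_maintenance": "TERMINATE",
                "disk_size": disk_size,
                "machine_type": machine_type,
                "metadata": {"user-data": cloud_init},
            }
        ],
        "provisioners": [{"type": "shell", "inline": [wait_for_cloud_init]}],
    }


# Placeholder cloud-init text; in practice pass the output of get_cloud_init.
config = build_packer_config(
    "my-project-id", "us-central1-a", "a2-highgpu-2g", "#cloud-config\n"
)
packer_json = json.dumps(config, indent=2)
```

Keeping the zone and machine type as arguments means the image build and the later cluster launch can share one set of values.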

We also end up waiting twice when we launch a cluster. First, we start a Dask scheduler and wait for it to become available, then we start the workers and they connect to the cluster. Therefore we could cut the wait time in half if we colocated a worker with the scheduler or improved the worker so that it could be started at the same time as the scheduler and attempt to retry the connection if the scheduler came up slower.
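
The retry idea can be sketched generically: a worker keeps trying to reach the scheduler, backing off between attempts, until it succeeds or gives up. This is an illustration of the pattern only, not how Dask workers behave today; connect_with_retry and flaky_connect are hypothetical names, and the delays are kept tiny so the example runs quickly.

```python
import time


def connect_with_retry(connect, attempts=5, delay=0.01, backoff=2.0):
    """Call connect() until it succeeds, sleeping longer after each failure."""
    last_error = None
    for attempt in range(attempts):
        try:
            return connect()
        except ConnectionError as err:
            last_error = err
            if attempt < attempts - 1:
                time.sleep(delay)
                delay *= backoff
    raise ConnectionError(f"gave up after {attempts} attempts") from last_error


calls = {"n": 0}


def flaky_connect():
    """Pretend scheduler that only becomes reachable on the third attempt."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionRefusedError("scheduler not up yet")
    return "connected"


result = connect_with_retry(flaky_connect)
```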

Imagine being able to get access to untapped amounts of GPU resources in around two minutes!
