Machine learning at scale

Distributed ML with Dask and Kubernetes on GCP

In just 3 steps 😎

Nicolas Grislain

At Sarus, we build tools to work with privacy-sensitive data. Our product is built around three ideas:

  • Remote execution of code, to keep sensitive data undisclosed.
  • Synthetic data generation, to give the user something to build with.
  • Differential privacy, to control privacy loss.

We therefore spend a lot of time training machine learning models for synthetic data generation, and we design tools for remote execution. Dask is relevant to both: it is a great library for parallel computing in Python, but it is also a well-designed piece of software and a great source of inspiration for seamless remote execution of code.

In this short tutorial, I cover how to deploy a Dask cluster on Kubernetes on a public cloud such as GCP.

Step 1: Start a Kubernetes cluster

On the GCP console, create a GKE cluster:

In our case, it will be a basic cluster named sarus-cluster, in the europe-west1 region, under our current project: sarus-public.
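
If you prefer the command line to the console, a roughly equivalent cluster can be created with gcloud (a sketch: the console wizard sets many sensible defaults for you):

gcloud container clusters create sarus-cluster --region europe-west1 --project sarus-public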

Then you need to fetch credentials to access the cluster from your computer. To do so, simply run the following command in a terminal:

gcloud container clusters get-credentials sarus-cluster --region europe-west1 --project sarus-public

If you don’t have gcloud installed, have a look at the Google Cloud SDK documentation; it installs easily on most platforms.

Your Kubernetes cluster should now be running. To control the cluster, you need the kubectl command-line utility; the Kubernetes documentation gives an overview of the tool and how to install it.
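
If you installed gcloud with the Google Cloud SDK installer, one convenient option (an assumption on our part; any installation method from the Kubernetes documentation works just as well) is to install kubectl as a gcloud component:

gcloud components install kubectl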

You can check the nodes of your cluster by running the following command:

# kubectl get nodes

Step 2: Deploy Dask using Helm

To deploy Dask, we will use the Helm package manager. To do so, simply install Helm and run the following commands:

helm repo add dask https://helm.dask.org/
helm repo update
helm install sarus-dask dask/dask
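
The chart usually prints some notes at install time (how to reach the scheduler, the dashboard and the bundled Jupyter server); you can display them again later with the standard helm status command:

helm status sarus-dask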

If you are not happy with the default configuration of the Dask Helm chart, you can use a configuration file:

# dask_config.yaml
worker:
  replicas: 7
  resources:
    limits:
      cpu: 1
      memory: 2G
    requests:
      cpu: 0.5
      memory: 1G
  env:
    # Extra packages pip-installed in each worker container at startup
    - name: EXTRA_PIP_PACKAGES
      value: pandas scikit-learn --upgrade

and run:

helm install sarus-dask dask/dask -f dask_config.yaml
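
If the release is already installed, the same configuration file can be applied with a standard helm upgrade instead of reinstalling:

helm upgrade sarus-dask dask/dask -f dask_config.yaml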

You can check that the deployment went well and that the various services provided by Dask are running properly using:

# kubectl get deployment
NAME                   READY   UP-TO-DATE   AVAILABLE   AGE
sarus-dask-jupyter     1/1     1            1           5m21s
sarus-dask-scheduler   1/1     1            1           5m21s
sarus-dask-worker      3/3     3            3           5m21s
# kubectl get service
...

You can check the state of the cluster through a web UI. However, because the Dask cluster does not expose any address reachable from outside GCP, you need to forward local HTTP requests to the cluster with the following command:

kubectl port-forward service/sarus-dask-scheduler 8001:80

You can then check http://localhost:8001/status and 🎉 you should see something like this:

Not so interesting for now 🤔.

Step 3: Connect to the cluster and voilà!

To connect to the cluster, make sure the dask and dask_kubernetes Python packages are installed locally, then launch a Python interpreter and run:

from dask_kubernetes import HelmCluster
from dask.distributed import Client

# Wrap the existing Helm release and connect a client to its scheduler
cluster = HelmCluster(release_name="sarus-dask")
client = Client(cluster)

This connects you to the cluster. You are then ready to launch a simple job:

import dask.array as da

# A lazy 10000 x 10000 random array, split into 1000 x 1000 chunks
x = da.random.random((10000, 10000), chunks=(1000, 1000))
y = x + x.T
z = y[::2, 5000:].mean(axis=1)
# Trigger the distributed computation on the workers
z.compute()

You can watch the workers getting busy in the web UI:

Now we are talking!
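
Because HelmCluster wraps the Helm release itself, you can also resize the worker pool directly from Python. A hedged sketch (the exact API depends on your dask_kubernetes version):

from dask_kubernetes import HelmCluster

# Re-attach to the release and patch the worker deployment to 5 replicas
# (assumes HelmCluster.scale is available in your dask_kubernetes version)
cluster = HelmCluster(release_name="sarus-dask")
cluster.scale(5)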

You can also manipulate pandas-like dataframes.

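Here is a minimal sketch, using Dask's built-in timeseries demo dataset (an assumption on our part), that runs a pandas-style grouped aggregation on the cluster:

import dask.datasets

# A lazy Dask dataframe: synthetic time-series data, one row per second
df = dask.datasets.timeseries()

# Pandas-like API; nothing is computed until .compute() is called
mean_x_by_name = df.groupby("name").x.mean()

# Trigger the distributed computation on the workers
print(mean_x_by_name.compute())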

We can see the cluster storing the data in the web UI:

Dask can also be used to launch machine learning jobs in parallel. Here is an example of a grid search over the hyper-parameters of a support vector classifier:

import joblib
from sklearn.datasets import make_classification
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV

# A small synthetic classification problem
X, y = make_classification(n_samples=1000, random_state=0)

# 8 x 3 x 2 = 48 hyper-parameter combinations, each fitted with 3-fold cross-validation
param_grid = {"C": [0.001, 0.01, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0],
              "kernel": ['rbf', 'poly', 'sigmoid'],
              "shrinking": [True, False]}
grid_search = GridSearchCV(SVC(gamma='auto', random_state=0, probability=True),
                           param_grid=param_grid,
                           return_train_score=False,
                           cv=3,
                           n_jobs=-1)

# Dispatch the fits to the Dask workers through joblib's Dask backend
with joblib.parallel_backend('dask'):
    grid_search.fit(X, y)
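
When you are done experimenting, you may want to free the resources. A minimal cleanup with standard Helm and gcloud commands is to uninstall the release and, if you no longer need it, delete the GKE cluster:

helm uninstall sarus-dask
gcloud container clusters delete sarus-cluster --region europe-west1 --project sarus-public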

This demonstrates how powerful the combination of cloud + Kubernetes + Dask can be. It also shows how mature the data-engineering ecosystem has become: getting up and running is a matter of a few clicks and commands.

Of course, there is a lot to do to make this ready for a production environment (from a security and permission-management perspective at least), but Kubernetes already does a lot, such as automated resource provisioning, auto-scaling and self-healing.

In a later post we will show how to run real-life jobs on large datasets in GCS or S3 buckets. If you are interested in how to run remote jobs with strong privacy protection and super easy UX, contact us for a demo of Sarus.

The computation graph of the dataframe aggregation above.

About the author

Nicolas Grislain

Cofounder & CSO @ Sarus
