Skip to content

GetStarted_YARN

leewyang edited this page May 9, 2017 · 30 revisions

Getting Started TensorFlowOnSpark on Hadoop Cluster

Before you start, you should already be familiar with TensorFlow and have access to a Hadoop grid with Spark installed. If your grid has GPU nodes, they must have cuda installed locally.

Install Python 2.7

From your grid gateway, download/install Python into a local folder. This installation of Python will be distributed to the Spark executors, so that any custom dependencies, including TensorFlow, will be available to the executors.

# download and extract Python 2.7
export PYTHON_ROOT=~/Python
curl -O https://www.python.org/ftp/python/2.7.12/Python-2.7.12.tgz
tar -xvf Python-2.7.12.tgz
rm Python-2.7.12.tgz

# compile into local PYTHON_ROOT
pushd Python-2.7.12
./configure --prefix="${PYTHON_ROOT}" --enable-unicode=ucs4
make
make install
popd
rm -rf Python-2.7.12

# install pip
pushd "${PYTHON_ROOT}"
curl -O https://bootstrap.pypa.io/get-pip.py
bin/python get-pip.py
rm get-pip.py

# install tensorflow (and any custom dependencies)
${PYTHON_ROOT}/bin/pip install pydoop
# Note: add any extra dependencies here
popd

Install TensorFlow (w/o RDMA Support)

If you do not need RDMA support, you can just use ${PYTHON_ROOT}/bin/pip install tensorflow to install the current version of the public TensorFlow package into your Python distribution. If you need a specific version (e.g. Python 2.7 vs. 3.x, CPU vs. GPU, etc), you can follow these instructions from the TensorFlow site.

Install and compile TensorFlow w/ RDMA Support

If you have an environment which supports RDMA, and you wish to take advantage of that, you will need to download our modified version of TensorFlow and compile it as follows:

git clone [email protected]:yahoo/tensorflow.git
# For TensorFlow 0.12 w/ RDMA, checkout the 'yahoo' branch
# For TensorFlow 1.0 w/ RDMA, checkout the 'jun_r1.0' branch
# follow build instructions to install into ${PYTHON_ROOT}

Note: RDMA support has now been merged into TensorFlow/contrib.

Install and compile Hadoop InputFormat/OutputFormat for TFRecords

git clone https://github.com/tensorflow/ecosystem.git
# follow build instructions to generate tensorflow-hadoop-1.0-SNAPSHOT.jar
# copy jar to HDFS for easier reference
hadoop fs -put tensorflow-hadoop-1.0-SNAPSHOT.jar

Create a Python w/ TensorFlow zip package for Spark

pushd "${PYTHON_ROOT}"
zip -r Python.zip *
popd

# copy this Python distribution into HDFS
hadoop fs -put ${PYTHON_ROOT}/Python.zip

Install TensorFlowOnSpark

Next, clone this repo and build a zip package for Spark:

git clone [email protected]:yahoo/TensorFlowOnSpark.git
pushd TensorFlowOnSpark/src
zip -r ../tfspark.zip *
popd

Run MNIST example

Download/zip the MNIST dataset

mkdir ${HOME}/mnist
pushd ${HOME}/mnist >/dev/null
curl -O "http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz"
curl -O "http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz"
curl -O "http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz"
curl -O "http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz"
zip -r mnist.zip *
popd >/dev/null

Convert the MNIST zip files into HDFS files

# set environment variables (if not already done)
export PYTHON_ROOT=~/Python
export LD_LIBRARY_PATH=${PATH}
export PYSPARK_PYTHON=${PYTHON_ROOT}/bin/python
export SPARK_YARN_USER_ENV="PYSPARK_PYTHON=Python/bin/python"
export PATH=${PYTHON_ROOT}/bin/:$PATH
export QUEUE=gpu

# for CPU mode:
# export QUEUE=default
# remove --conf spark.executorEnv.LD_LIBRARY_PATH \
# remove --driver-library-path \

# save images and labels as CSV files
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 4G \
--archives hdfs:///user/${USER}/Python.zip#Python,mnist/mnist.zip#mnist \
--conf spark.executorEnv.LD_LIBRARY_PATH="/usr/local/cuda-7.5/lib64" \
--driver-library-path="/usr/local/cuda-7.5/lib64" \
TensorFlowOnSpark/examples/mnist/mnist_data_setup.py \
--output mnist/csv \
--format csv

# save images and labels as TFRecords
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 4G \
--archives hdfs:///user/${USER}/Python.zip#Python,mnist/mnist.zip#mnist \
--jars hdfs:///user/${USER}/tensorflow-hadoop-1.0-SNAPSHOT.jar \
--conf spark.executorEnv.LD_LIBRARY_PATH="/usr/local/cuda-7.5/lib64" \
--driver-library-path="/usr/local/cuda-7.5/lib64" \
TensorFlowOnSpark/examples/mnist/mnist_data_setup.py \
--output mnist/tfr \
--format tfr

Run distributed MNIST training (using feed_dict)

# for CPU mode:
# export QUEUE=default
# set --conf spark.executorEnv.LD_LIBRARY_PATH="$JAVA_HOME/jre/lib/amd64/server" \
# remove --driver-library-path \

# for CDH (per @wangyum)
# set "--conf spark.executorEnv.LD_LIBRARY_PATH="/opt/cloudera/parcels/CDH/lib64:$JAVA_HOME/jre/lib/amd64/server"

# hadoop fs -rm -r mnist_model
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 27G \
--py-files TensorFlowOnSpark/tfspark.zip,TensorFlowOnSpark/examples/mnist/spark/mnist_dist.py \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--archives hdfs:///user/${USER}/Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH="/usr/local/cuda-7.5/lib64:$JAVA_HOME/jre/lib/amd64/server" \
--driver-library-path="/usr/local/cuda-7.5/lib64" \
TensorFlowOnSpark/examples/mnist/spark/mnist_spark.py \
--images mnist/csv/train/images \
--labels mnist/csv/train/labels \
--mode train \
--model mnist_model
# to use infiniband, add --rdma

Run distributed MNIST inference (using feed_dict)

${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 27G \
--py-files TensorFlowOnSpark/tfspark.zip,TensorFlowOnSpark/examples/mnist/spark/mnist_dist.py \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--archives hdfs:///user/${USER}/Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH="/usr/local/cuda-7.5/lib64:$JAVA_HOME/jre/lib/amd64/server" \
--driver-library-path="/usr/local/cuda-7.5/lib64" \
TensorFlowOnSpark/examples/mnist/spark/mnist_spark.py \
--images mnist/csv/test/images \
--labels mnist/csv/test/labels \
--mode inference \
--model mnist_model \
--output predictions

Run distributed MNIST training (using QueueRunners)

# for CPU mode:
# export QUEUE=default
# set --conf spark.executorEnv.LD_LIBRARY_PATH="$JAVA_HOME/jre/lib/amd64/server" \
# remove --driver-library-path \

# for CDH (per @wangyum)
# set "--conf spark.executorEnv.LD_LIBRARY_PATH="/opt/cloudera/parcels/CDH/lib64:$JAVA_HOME/jre/lib/amd64/server"

# hadoop fs -rm -r mnist_model
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 27G \
--py-files TensorFlowOnSpark/tfspark.zip,tensorflow/examples/mnist/tf/mnist_dist.py \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--archives hdfs:///user/${USER}/Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH="/usr/local/cuda-7.5/lib64:$JAVA_HOME/jre/lib/amd64/server" \
--driver-library-path="/usr/local/cuda-7.5/lib64" \
TensorFlowOnSpark/examples/mnist/tf/mnist_spark.py \
--images mnist/tfr/train \
--format tfr \
--mode train \
--model mnist_model
# to use infiniband, replace the last line with --model mnist_model --rdma

Run distributed MNIST inference (using QueueRunners)

# hadoop fs -rm -r predictions
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 27G \
--py-files TensorFlowOnSpark/tfspark.zip,TensorFlowOnSpark/examples/mnist/tf/mnist_dist.py \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--archives hdfs:///user/${USER}/Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH="/usr/local/cuda-7.5/lib64:$JAVA_HOME/jre/lib/amd64/server" \
--driver-library-path="/usr/local/cuda-7.5/lib64" \
TensorFlowOnSpark/examples/mnist/tf/mnist_spark.py \
--images mnist/tfr/test \
--mode inference \
--model mnist_model \
--output predictions

Run TensorFlowOnSpark using Spark Streaming

Spark also includes a streaming mode, which allows you feed data to your Spark applications in an online/streaming mode vs. reading a static list of files from disk. In this mode, Spark watches a location on disk (or listens on a network port) for new data to arrive and batches the incoming data into a sequence of RDDs for your application.

Convert the MNIST zip files to image-label records

Since streaming data arrives over time, we need to produce a version of the data that encodes the images with the labels. For simplicity, we use a simple concatenation of the label with the image CSV, joined by a pipe '|' character.

# set environment variables (if not already done)
export PYTHON_ROOT=~/Python
export PYSPARK_PYTHON=${PYTHON_ROOT}/bin/python
export SPARK_YARN_USER_ENV="PYSPARK_PYTHON=Python/bin/python"
export PATH=${PYTHON_ROOT}/bin/:$PATH
export QUEUE=gpu

# for CPU mode:
# export QUEUE=default
# remove --conf spark.executorEnv.LD_LIBRARY_PATH
# remove --driver-library-path

# hadoop fs -rm -r mnist/csv2
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 4G \
--archives hdfs:///user/${USER}/Python.zip#Python,mnist/mnist.zip#mnist \
--conf spark.executorEnv.LD_LIBRARY_PATH="/usr/local/cuda-7.5/lib64" \
--driver-library-path="/usr/local/cuda-7.5/lib64" \
TensorFlowOnSpark/examples/mnist/mnist_data_setup.py \
--output mnist/csv2 \
--format csv2

Run distributed MNIST training (using Spark Streaming)

# for CPU mode:
# export QUEUE=default
# set --conf spark.executorEnv.LD_LIBRARY_PATH="$JAVA_HOME/jre/lib/amd64/server"
# remove --driver-library-path

# create a folder for new streaming data to arrive
hadoop fs -mkdir stream_data

# hadoop fs -rm -r mnist_model stream_data/*
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 27G \
--py-files TensorFlowOnSpark/tfspark.zip,TensorFlowOnSpark/examples/mnist/streaming/mnist_dist.py \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--conf spark.streaming.stopGracefullyOnShutdown=true \
--archives hdfs:///user/${USER}/Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH="/usr/local/cuda-7.5/lib64:$JAVA_HOME/jre/lib/amd64/server" \
--driver-library-path="/usr/local/cuda-7.5/lib64" \
TensorFlowOnSpark/examples/mnist/streaming/mnist_spark.py \
--images stream_data \
--format csv2 \
--mode train \
--model mnist_model

# make a temp copy of the data, so we can atomically move them into the stream_data input dir
hadoop fs -mkdir temp stream_data
hadoop fs -cp mnist/csv2/train/* temp

# drop data into the stream (monitor spark logs after each command to view behavior)
hadoop fs -mv temp/part-00000 stream_data
hadoop fs -mv temp/part-00001 stream_data
hadoop fs -mv temp/part-0000[2-9] stream_data

# shutdown job, since this normally runs forever, waiting for new data to arrive
# the host and port of the reservation server will be in the driver logs, e.g.
# "listening for reservations at ('gpbl191n01.blue.ygrid.yahoo.com', 38254)"
${PYTHON_ROOT}/bin/python TensorFlowOnSpark/com/yahoo/ml/tf/reservation_client.py <host> <port>

Run distributed MNIST inference (using Spark Streaming)

# for CPU mode:
# export QUEUE=default
# set --conf spark.executorEnv.LD_LIBRARY_PATH="$JAVA_HOME/jre/lib/amd64/server"
# remove --driver-library-path

# hadoop fs -rm -r -skipTrash predictions/* stream_data/*
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 27G \
--py-files TensorFlowOnSpark/tfspark.zip,TensorFlowOnSpark/examples/mnist/streaming/mnist_dist.py \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--conf spark.streaming.stopGracefullyOnShutdown=true \
--archives hdfs:///user/${USER}/Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH="/usr/local/cuda-7.5/lib64:$JAVA_HOME/jre/lib/amd64/server" \
--driver-library-path="/usr/local/cuda-7.5/lib64" \
TensorFlowOnSpark/examples/mnist/streaming/mnist_spark.py \
--images stream_data \
--format csv2 \
--mode inference \
--model mnist_model \
--output predictions/batch

# make a temp copy of the data, so we can atomically move them into the stream_data input dir
hadoop fs -mkdir temp stream_data
hadoop fs -cp mnist/csv2/test/* temp

# drop data into the stream (monitor spark logs after each command to view behavior)
hadoop fs -mv temp/part-00000 stream_data
hadoop fs -mv temp/part-00001 stream_data
hadoop fs -mv temp/part-0000[2-9] stream_data

# shutdown job, since this normally runs forever, waiting for new data to arrive
# Note: the host and port of the reservation server will be in the driver logs, e.g.
# "listening for reservations at ('<host>', <port>)"
${PYTHON_ROOT}/bin/python TensorFlowOnSpark/src/com/yahoo/ml/tf/reservation_client.py <host> <port>
Clone this wiki locally