Running Spark On YARN

March 29th 2015, 10:30 am · Category: Big Data

When a cluster runs distributed applications besides Spark, running Spark in standalone mode (with its embedded cluster manager) is a poor choice for overall cluster resource utilization. Scheduling and utilization are better when a single cluster manager has a global view of everything that is running, and waiting to run, on the cluster.

 

Without that single cluster manager, there are two main approaches for resources sharing and allocation:

 
  1. Making all cluster resources available to every application type at the same time. However, this leads to unfairly managed contention over resources.

  2. Dividing the pool of resources into smaller pools, one per application type. However, this leads to inefficient utilization: some applications may need more resources than their pool provides while, at the same time, other applications need less than theirs. A dynamic way of allocating resources therefore utilizes the cluster better.

 

Several cluster managers can do the job and overcome the issues highlighted above. Choosing one depends on the types of applications run on the cluster, since they all must speak the manager's language. One cluster manager that Spark applications can run on is Apache YARN. YARN's design allows different YARN applications to coexist on the same cluster, so a Spark application can run at the same time as other types of applications (such as Hadoop MapReduce jobs), which brings great benefits for manageability and cluster utilization.

 

In this post we will illustrate the benefits of running Spark on YARN, show how to run it, and mention some important notes to take care of along the way.

 

First we will try to understand the architecture of both Spark and YARN.

Spark Architecture:

Spark's architecture consists of a driver program, executors, and a cluster manager.

 
 

Driver Program: The driver program is responsible for managing the job flow and scheduling the tasks that run on the executors.

Executors: Executors are processes that run computation and store data for a Spark application.

Cluster Manager: The cluster manager is responsible for starting executor processes and for deciding where and when they run. Spark supports pluggable cluster managers: YARN, Mesos, and its own "standalone" cluster manager.

 

YARN Architecture:

YARN Architecture consists of Resource Manager, Node Manager, Application Master and Container.

 
 

Resource Manager: manages the use of resources across the cluster.

Node Manager: launches and monitors containers on cluster machines.

Application Master: manages the lifecycle of an application running on the cluster.

Container: A collection of physical resources (CPU cores and memory) on a single node in the cluster, allocated for the use of a single application process.
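As a concrete illustration, the sizes of the containers a Node Manager can hand out are bounded by a few yarn-site.xml properties. A minimal sketch follows; the values shown are arbitrary examples, not recommendations:

```xml
<!-- yarn-site.xml: illustrative container resource limits -->
<property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>8192</value> <!-- total memory a Node Manager may give to containers -->
</property>
<property>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>1024</value> <!-- smallest container the Resource Manager will grant -->
</property>
<property>
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>8192</value> <!-- largest container the Resource Manager will grant -->
</property>
```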

 

Spark on YARN:

When running Spark on YARN, each Spark executor runs as a YARN container. Spark supports two modes for running on YARN: yarn-cluster mode and yarn-client mode.

 

YARN-Client Mode:

  • In yarn-client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
  • yarn-client mode makes sense for interactive and debugging uses where you want to see your application’s output immediately (on the client process side).
 
 

YARN-Cluster Mode:

  • In yarn-cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application.
  • yarn-cluster mode makes sense for production jobs.
 
 

Why Run on YARN?

Running Spark on YARN has some benefits:

  • YARN allows the cluster resources to be shared dynamically between the different frameworks that run on it. For example, you can run a MapReduce job and then a Spark job without any changes to the YARN configuration.
  • You can use YARN schedulers for categorizing, isolating, and prioritizing workloads.
  • YARN is the only cluster manager for Spark that supports security. With YARN, Spark can use secure authentication between its processes.
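For example, with YARN's Capacity Scheduler you can define a dedicated queue for Spark jobs in capacity-scheduler.xml. This is a hedged sketch; the queue name and capacity percentages are made up for illustration:

```xml
<!-- capacity-scheduler.xml: carve out an illustrative "spark" queue -->
<property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>default,spark</value>
</property>
<property>
    <name>yarn.scheduler.capacity.root.default.capacity</name>
    <value>70</value>
</property>
<property>
    <name>yarn.scheduler.capacity.root.spark.capacity</name>
    <value>30</value>
</property>
```

A job can then be submitted to that queue by passing --queue spark to spark-submit.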

How to Run on YARN

We used Cloudera Manager to install Spark and YARN. No special configuration was needed to get Spark running on YARN; we just changed Spark's master address to yarn-client or yarn-cluster.

 

Here are some important issues that we met while running Spark on YARN:

  • Spark copies the Spark assembly JAR file to HDFS each time you run spark-submit. You can avoid this copy by uploading the assembly JAR to HDFS once, then setting the SPARK_JAR environment variable to that HDFS path:
     
    hdfs dfs -mkdir -p /user/spark/share/lib
    hdfs dfs -put $SPARK_HOME/assembly/lib/spark-assembly_*.jar \
        /user/spark/share/lib/spark-assembly.jar
    export SPARK_JAR=hdfs://<nn>:<port>/user/spark/share/lib/spark-assembly.jar
    

  • Important configuration options when submitting a job:
    --executor-cores NUM       Number of cores per executor (default: 1)
    --num-executors NUM        Number of executors to launch (default: 2)
    --executor-memory MEM      Amount of memory to use per executor process (e.g. 4g)
    --driver-memory MEM        Amount of memory to use for the driver process (e.g. 1g)
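Putting these flags together, a submission might look like the sketch below. It cannot run without a YARN cluster; the sizing values are arbitrary illustrations, and the class and jar path follow the SparkPi example later in this post:

```shell
spark-submit \
  --master yarn-cluster \
  --num-executors 4 \
  --executor-cores 2 \
  --executor-memory 4g \
  --driver-memory 1g \
  --class org.apache.spark.examples.SparkPi \
  $SPARK_HOME/examples/lib/spark-examples_version.jar 10
```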
    

  • We noticed that YARN uses more memory than we set for each executor; after searching, we discovered that YARN allocates:
    • executor memory + spark.yarn.executor.memoryOverhead for each executor.
    • driver memory + spark.yarn.driver.memoryOverhead for the driver.
    • This memory overhead is the amount of off-heap memory (in megabytes) allocated per executor or driver. It accounts for things like VM overheads, interned strings, and other native overheads, and it tends to grow with the executor size (typically 6-10%).
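A rough sketch of this sizing, assuming an overhead of max(384 MB, 10% of executor memory); the 10% is our assumption picked from the 6-10% range above, and the exact default fraction depends on the Spark version:

```shell
# Estimate the YARN container size for one executor (illustrative sketch).
# Assumption: overhead = max(384 MB, 10% of executor memory).
executor_mem_mb=4096                       # e.g. --executor-memory 4g
overhead_mb=$(( executor_mem_mb / 10 ))    # 10% of the executor heap
if [ "$overhead_mb" -lt 384 ]; then overhead_mb=384; fi
echo $(( executor_mem_mb + overhead_mb ))  # memory YARN actually reserves: 4505
```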

  • The local directories used by Spark executors to store map output files and RDDs spilled to disk are the local directories configured for YARN (the Hadoop YARN setting yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it is ignored.
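The directories in question are set in yarn-site.xml; a sketch follows, where the paths are made-up examples:

```xml
<!-- yarn-site.xml: where Node Managers place container working data,
     and hence where Spark executors spill shuffle files and on-disk RDDs -->
<property>
    <name>yarn.nodemanager.local-dirs</name>
    <value>/data/1/yarn/local,/data/2/yarn/local</value>
</property>
```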

  • Sharing application files (e.g. the application jar) with executors:
    • In yarn-client mode and Spark standalone mode, a link to the jar on the client machine is created, and all executors use this link to download the jar.
    • In yarn-cluster mode, the jar is uploaded to HDFS before the job runs, and all executors download it from HDFS, so the job takes some extra time at start-up for the upload.

Comparison Between Spark on YARN and Standalone Mode:

|                                        | Spark on YARN (yarn-cluster) | Spark on YARN (yarn-client) | Spark Standalone           |
| Driver runs in                         | Application Master           | Client                      | Client                     |
| Who requests resources                 | Application Master           | Application Master          | Client                     |
| Who starts executor processes          | YARN NodeManager             | YARN NodeManager            | Spark Worker (slave)       |
| Support for spark-shell                | No                           | Yes                         | Yes                        |
| Sharing jar with executors             | Uploads jar to HDFS          | Creates link on the client  | Creates link on the client |
| Shares resources with other frameworks | Yes                          | Yes                         | No                         |
 

Example:

Running SparkPi in Standalone Mode

spark-submit \
--master spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT \
--class org.apache.spark.examples.SparkPi \
$SPARK_HOME/examples/lib/spark-examples_version.jar 10

Running SparkPi in YARN Client Mode

spark-submit \
--master yarn-client \
--class org.apache.spark.examples.SparkPi \
$SPARK_HOME/examples/lib/spark-examples_version.jar 10

Running SparkPi in YARN Cluster Mode

spark-submit \
--master yarn-cluster \
--class org.apache.spark.examples.SparkPi \
$SPARK_HOME/examples/lib/spark-examples_version.jar 10
 

Configuring the NameNode Metadata Directory

In this post, we will show the configuration Hadoop needs to start the NameNode successfully. Usually we format the NameNode before starting Hadoop, but a common problem is that the NameNode metadata files are written (with the default configuration) into the tmp directory, which the operating system wipes on every restart. Below are the steps to change this default behaviour.

Steps 

  1. In hdfs-site.xml, add the following property, where the value is the path where the NameNode metadata will be written:
    <property>
        <name>dfs.name.dir</name>
        <value>/<hadoop installation path>/hadoop-1.2.1/name/data</value>
    </property>
  2. Stop all Hadoop daemons:
    ~$ stop-all.sh
  3. Change the directory permissions to give the owner full access (the leading 7 grants the owner read, write, and execute):
    ~$ sudo chmod 750 /<hadoop installation path>/hadoop-1.2.1/name/data
  4. Format the NameNode:
    ~$ hadoop namenode -format
    The output should be similar to:
    15/03/25 12:27:06 INFO namenode.NameNode: STARTUP_MSG: 
    /************************************************************
    STARTUP_MSG: Starting NameNode
    STARTUP_MSG:   host = baddar-pc/127.0.1.1
    STARTUP_MSG:   args = [-format]
    STARTUP_MSG:   version = 1.2.1
    STARTUP_MSG:   build = https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.2 -r 1503152; compiled by 'mattf' on Mon Jul 22 15:23:09 PDT 2013
    STARTUP_MSG:   java = 1.8.0_40
    ************************************************************/
    Re-format filesystem in /home/baddar/hadoop-1.2.1/name/data ? (Y or N) Y
    15/03/25 12:27:11 INFO util.GSet: Computing capacity for map BlocksMap
    15/03/25 12:27:11 INFO util.GSet: VM type       = 64-bit
    15/03/25 12:27:11 INFO util.GSet: 2.0% max memory = 932184064
    15/03/25 12:27:11 INFO util.GSet: capacity      = 2^21 = 2097152 entries
    15/03/25 12:27:11 INFO util.GSet: recommended=2097152, actual=2097152
    15/03/25 12:27:11 INFO namenode.FSNamesystem: fsOwner=baddar
    15/03/25 12:27:11 INFO namenode.FSNamesystem: supergroup=supergroup
    15/03/25 12:27:11 INFO namenode.FSNamesystem: isPermissionEnabled=true
    15/03/25 12:27:11 INFO namenode.FSNamesystem: dfs.block.invalidate.limit=100
    15/03/25 12:27:11 INFO namenode.FSNamesystem: isAccessTokenEnabled=false accessKeyUpdateInterval=0 min(s), accessTokenLifetime=0 min(s)
    15/03/25 12:27:11 INFO namenode.FSEditLog: dfs.namenode.edits.toleration.length = 0
    15/03/25 12:27:11 INFO namenode.NameNode: Caching file names occuring more than 10 times 
    15/03/25 12:27:11 INFO common.Storage: Image file /home/baddar/hadoop-1.2.1/name/data/current/fsimage of size 112 bytes saved in 0 seconds.
    15/03/25 12:27:11 INFO namenode.FSEditLog: closing edit log: position=4, editlog=/home/baddar/hadoop-1.2.1/name/data/current/edits
    15/03/25 12:27:11 INFO namenode.FSEditLog: close success: truncate to 4, editlog=/home/baddar/hadoop-1.2.1/name/data/current/edits
    15/03/25 12:27:11 INFO common.Storage: Storage directory /home/baddar/hadoop-1.2.1/name/data has been successfully formatted.
    15/03/25 12:27:11 INFO namenode.NameNode: SHUTDOWN_MSG: 
    /************************************************************
    SHUTDOWN_MSG: Shutting down NameNode at baddar-pc/127.0.1.1
    ************************************************************/

    Note that the NameNode metadata is written to the specified path.
  5. Make sure the NameNode metadata was written to the path (list all files recursively):
    $ ls -aR /home/baddar/hadoop-1.2.1/name/data/
    /home/baddar/hadoop-1.2.1/name/data/:
    .  ..  current  image  in_use.lock  previous.checkpoint

    /home/baddar/hadoop-1.2.1/name/data/current:
    .  ..  edits  fsimage  fstime  VERSION

    /home/baddar/hadoop-1.2.1/name/data/image:
    .  ..  fsimage

    /home/baddar/hadoop-1.2.1/name/data/previous.checkpoint:
    .  ..  edits  fsimage  fstime  VERSION
  6. Start all Hadoop daemons:
    ~$ start-all.sh
    The output should be:
    starting namenode, logging to /home/baddar/hadoop-1.2.1/libexec/../logs/hadoop-baddar-namenode-baddar-pc.out
    localhost: starting datanode, logging to /home/baddar/hadoop-1.2.1/libexec/../logs/hadoop-baddar-datanode-baddar-pc.out
    localhost: starting secondarynamenode, logging to /home/baddar/hadoop-1.2.1/libexec/../logs/hadoop-baddar-secondarynamenode-baddar-pc.out
    starting jobtracker, logging to /home/baddar/hadoop-1.2.1/libexec/../logs/hadoop-baddar-jobtracker-baddar-pc.out
    localhost: starting tasktracker, logging to /home/baddar/hadoop-1.2.1/libexec/../logs/hadoop-baddar-tasktracker-baddar-pc.out
  7. Make sure that all daemons are started:
    ~$ jps

    23678 TaskTracker
    23060 NameNode
    23406 SecondaryNameNode
    23978 Jps
    23504 JobTracker

Testing applications in the Android environment is costly because they must be tested on many devices with different specs, which is expensive and time-consuming. The best solution is therefore to test on the default Android emulator, but the first question that comes to mind is: what should we do about the emulator's poor performance?