In search applications, pagination is nearly always required, and Solr supports it through basic paging: you specify the start and rows parameters, where start is the offset of the first returned result and rows is the number of documents to return. Basic paging makes partial index exporting and migration problematic. Because Solr must sort all matching results before returning the requested subset, a large start value requires a large amount of memory. For instance, start=1000000 and rows=10 forces the engine to sort 1,000,010 documents just to return ten. In a distributed environment the situation is worse, because the engine has to fetch about a million documents from each shard, sort them, and only then return the result set.
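
To make the cost concrete, here is a minimal illustration of such a deep-paging request, assuming a local Solr instance and a core named collection1 (both hypothetical):

    # Basic paging with start/rows: Solr must collect and sort the top
    # start + rows = 1,000,010 documents (per shard) before returning just 10.
    curl "http://localhost:8983/solr/collection1/select?q=*:*&start=1000000&rows=10&wt=json"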

Shared storage and HDFS (pros and cons)

Shared storage in a distributed compute cluster

Figure 1 - Shared storage in a distributed compute cluster.

HDFS was built to scale out using disks directly attached to commodity servers, forming the basis for a distributed processing framework that exploits the data locality principle: moving code to data is much cheaper than the other way around. Hence, incorporating a shared storage system, as opposed to directly attached storage (DAS), to host HDFS is not straightforward, as centralized storage runs against the nature of HDFS.


Moreover, shared storage systems have some potential limitations. As a shared service, the storage must efficiently handle multiple concurrent IO requests from the numerous clients of the compute cluster. Also, the network is the channel for moving data to and from the shared storage, which can consume much of the network's bandwidth. In a DAS model the network is also used for sharing data, but data-local computations reduce network bandwidth consumption significantly. Furthermore, compared to the network, the internal per-machine data buses are faster: besides the underlying hardware technology, they do not incur the overhead imposed by the networking stack of infrastructure and protocols (layered protocol headers, checksum computations, and so on).


On the other hand, shared storage systems have unique, attractive features that are worth considering, whether when building a new distributed data processing cluster or when there is a need to exploit an existing infrastructure with a pre-installed shared storage system. One important feature is the separation of compute from storage resources. This allows independent administration and scaling of each resource type: it becomes possible to scale compute or storage alone without unnecessarily scaling the other. In addition, compute resources are almost fully utilized for computation rather than spending cycles serving shared data to other consumers, which helps ensure efficiency and high availability for data sharing.


Moreover, shared storage systems provide application-agnostic features that offload more work from compute resources while strengthening the cluster: security, backups and snapshots, efficient capacity utilization via compression, enhanced IO performance by leveraging the latest storage and network technologies, and failure handling and high availability. With a shared storage system, the namenode is no longer a single point of failure, as the metadata can be backed up safely without the need for the additional namenodes designed as part of the Hadoop HA (High Availability) feature. Besides, data replication, as an inherent feature of shared storage systems, relieves HDFS daemons of the block replication burden, saving CPU, memory and network bandwidth, since fewer block replicas simplify write pipelines and shrink block pools.


How HDFS works with shared storage

For HDFS, we can categorize shared storage systems into two main groups: HDFS-aware and HDFS-agnostic. HDFS-aware storage systems provide native support for HDFS, sometimes to the extent that the HDFS daemons run inside the storage system itself. In that case, HDFS comes already tuned for the shared storage system, and the rest of the cluster resources become pure compute resources. HDFS-agnostic storage systems, in contrast, know nothing about HDFS, so incorporating them requires much more involvement.


For an HDFS-agnostic shared storage system, aside from the recommendations of the storage system provider, a typical deployment allocates a separate storage partition for each host in the cluster and distributes the HDFS daemons such that one or two master hosts run the namenode and the secondary namenode daemons, and each of the remaining slave hosts runs at least one datanode daemon. Given such a configuration, let's inspect the cost of read and write operations.
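
As a side note, a minimal sketch of what the datanode side of such a deployment might look like, with hypothetical mount points and Hadoop 2.x property names (dfs.data.dir in Hadoop 1.x):

    # Illustrative hdfs-site.xml fragment for one slave host; the properties
    # belong inside the <configuration> element.
    cat <<'EOF'
    <property>
      <name>dfs.datanode.data.dir</name>
      <value>/hdfs/data1</value>   <!-- this host's partition on the shared storage -->
    </property>
    <property>
      <!-- block replication adds little when the shared storage already replicates -->
      <name>dfs.replication</name>
      <value>1</value>
    </property>
    EOF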

Data flow while Host 2 performs a write with replication factor of 2

Figure 2 - Data flow while Host 2 performs a write with replication factor of 2.

Figure 2 illustrates the data flow if a client on Host 2 is writing two replicas, one local on Host 2 and another remote on Host 1. The following are the paths for the data transmission:

  1. The client on Host 2 transmits the data to the colocal data node over kernel network.
  2. The data node on Host 2 transmits the data to the shared storage over network.
  3. The data node on Host 2 transmits the data to the data node on Host 1 over network.
  4. The data node on Host 1 transmits the data to the shared storage over network.


The same data is transmitted over the network three times and written to the same storage twice. Hence, write operations are very expensive in terms of network bandwidth and storage capacity. It is pointless to use block replication on a shared storage system, especially if the shared storage already provides data replication.

Data flow while either host performs a read

Figure 3 - Data flow while either host performs a read.

In Figure 3, if a client on Host 1 is reading a block maintained by the colocal data node, the following are the possible data transmission paths:

  1. The data is transmitted over network from the shared storage to the data node on Host 1 (link C).
  2. The data node on Host 1 transmits the data to the colocal client (link A) over kernel network.


However, if a client on Host 2 reads a block only maintained by the remote data node on Host 1, possible paths are:

  1. The data is transmitted over network from the shared storage to the data node on Host 1 (link C).
  2. The data node on Host 1 transmits the data over network to the remote client on Host 2 (link B).


In a remote read, the same data is transmitted over the network twice, while in a local read it is transmitted only once. Hence, remote reads are more expensive than local reads in terms of network bandwidth consumption. The following table summarizes the cost of data transmission in the different scenarios discussed above.


Operation                                                  Data Transmission Paths (Cost)
                                                           Network    Kernel Network
Local write (one replica)                                  1          1
Remote write (one local replica and one remote replica)    3          1
Local read                                                 1          1
Remote read                                                2          0


Increasing block replication increases data locality and read performance, but it decreases write performance and wastes storage. It would be great if there were a way to minimize the length of the data paths by taking shortcuts.


Short-circuit reads

Short-circuiting local and remote reads from a shared storage

Figure 4 - Short-circuiting local and remote reads from a shared storage.

Short-circuit local reads


HDFS provides one such shortcut for local reads. If the client is reading a file from a collocated datanode, the client can read the file directly from disk, bypassing the datanode. In Figure 4, a client on Host 1 requests to read a file from the colocal datanode. The datanode redirects the client to the required block in the filesystem, and the client reads the file directly from the shared storage over the network.


Short-circuit remote reads

By analogy with short-circuit local reads, a short-circuit remote read happens when a client requests a read from a remote datanode and the remote datanode gives the client direct access to the file. This is only a valid scenario if the data is stored on a shared storage system to which all hosts have direct access. However, HDFS is not aware of whether it is hosted on shared storage or not. Hence, HDFS does not provide a shortcut for remote reads, but it is still possible to hack around that.


The HDFS client determines whether it is colocal with a datanode by checking the datanode IP address against the list of network interfaces on the client's host. If the datanode IP is one of them, the client knows that the datanode is colocal. Then there are two possible ways for the client to get the file: the legacy short-circuit read or the new short-circuit read. In the legacy implementation, the client requests, over TCP, the absolute path of the block from the datanode; after receiving the path information, the client opens the block file and reads its content directly. In the new implementation, the client connects to the datanode through a Unix domain socket; the datanode opens the file and passes the file descriptor over the Unix domain socket to the client, which can then consume the file directly through the descriptor.
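
For reference, a sketch of the hdfs-site.xml properties involved (Hadoop 2.x names; the socket path and user are illustrative values): dfs.client.read.shortcircuit turns the feature on, dfs.domain.socket.path is used by the new implementation, and the last two properties switch to and authorize the legacy implementation.

    # Shown for illustration; the properties belong inside <configuration>.
    cat <<'EOF'
    <property>
      <name>dfs.client.read.shortcircuit</name>
      <value>true</value>
    </property>
    <property>
      <name>dfs.domain.socket.path</name>
      <value>/var/run/hadoop-hdfs/dn._PORT</value>
    </property>
    <property>
      <name>dfs.client.use.legacy.blockreader.local</name>
      <value>true</value>
    </property>
    <property>
      <name>dfs.block.local-path-access.user</name>
      <value>hdfsclient</value>
    </property>
    EOF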


HDFS short-circuit read mainly involves the following three actions:

  1. Datanode locality check
  2. Interprocess communication through Unix domain socket (new implementation only)
  3. File direct access


Datanode locality check: It is possible to get around this check by adding to each host a set of dummy network interfaces, one for each remote datanode. Each dummy interface is assigned the same IP address as the corresponding remote datanode. It is important to hide these interfaces from the network so that the real datanodes remain reachable.


Interprocess communication through Unix domain socket: A Unix domain socket works at the kernel level and connects parties on the same host only. Processes rendezvous at a local file but share data through memory. To the best of our knowledge, it is possible to forward a Unix domain socket between two hosts, enabling two remote processes to communicate through it; OpenSSH, for example, can forward Unix domain sockets. However, Hadoop provides only one configuration parameter for the socket address (dfs.domain.socket.path), so the datanode and the DFS client on the same node use the same socket address, which makes it difficult to forward the DFS client connection to the appropriate remote datanode. So, it is not straightforward to hack the new short-circuit implementation into short-circuiting remote reads.
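
As an illustration of the forwarding mechanism only (it does not remove the single dfs.domain.socket.path limitation just described), OpenSSH 6.7 and later can forward a local Unix domain socket to a socket on a remote host; the socket paths and user below are made up:

    # Forward a local domain socket on this host to the datanode's socket on host1.
    ssh -nNT \
        -L /var/run/hadoop-hdfs/dn_host1.sock:/var/run/hadoop-hdfs/dn.sock \
        hdfs@host1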


File direct access: Clients can have direct access to files maintained by a remote datanode if the files are hosted on a shared file system to which all hosts have access. Each datanode should be assigned an independent data directory that is hosted on the shared file system and that has the same mount point on all hosts. For example, datanode 1 on Host 1 is assigned the data directory /hdfs/data1/ and datanode 2 on Host 2 is assigned the data directory /hdfs/data2/. Both directories are hosted by a shared file system (say NFS) and are mounted on every host at the mount points /hdfs/data1/ and /hdfs/data2/, respectively.
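
A minimal sketch of such a layout using NFS, assuming an NFS server named nfs-server that exports one directory per datanode (server name, export paths and options are illustrative):

    # /etc/exports on the NFS server:
    #   /export/hdfs/data1  *(rw,sync,no_root_squash)
    #   /export/hdfs/data2  *(rw,sync,no_root_squash)

    # On every host in the cluster, mount both directories at identical mount points.
    sudo mkdir -p /hdfs/data1 /hdfs/data2
    sudo mount -t nfs nfs-server:/export/hdfs/data1 /hdfs/data1
    sudo mount -t nfs nfs-server:/export/hdfs/data2 /hdfs/data2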


Fortunately, we can hack around the legacy implementation of short-circuit local reads, as it does not involve any Unix domain socket communication. The following table summarizes the cost of data transmission in the different read scenarios after enabling short-circuit remote reads. Unfortunately, HDFS does not provide a shortcut for writes, so the write cost stays the same.


Operation      Data Transmission Paths (Cost)
               Network    Kernel Network
Local read     1          0
Remote read    1          0


Now, local and remote reads have the same reduced cost. This is a substantial improvement to HDFS for reading from a shared storage.


Proof of concept

Experiment Setup

Short-circuit remote read experiment setup (SCRR is a shorthand for Short Circuit Remote Read)

Figure 5 - Short-circuit remote read experiment setup (SCRR is a shorthand for Short Circuit Remote Read).

As illustrated in Figure 5, we built a small cluster of three machines with the following configuration. SCRR3 is the master node; it hosts an NFS server simulating the shared storage and exposing two locations (/home/host/hdfs1 and /home/host/hdfs2) as the storage directories for the cluster datanodes, and it also hosts the cluster namenode and secondary namenode. SCRR2 and SCRR1 are both slave nodes, each hosting a datanode. The datanode on SCRR1 is configured with a storage directory hosted on the shared storage and mounted at /home/host/hdfs1, while the datanode on SCRR2 has a storage directory hosted on the shared storage and mounted at /home/host/hdfs2. Both storage directories are mounted on both nodes (SCRR1 and SCRR2) at the same mount points, so that a datanode's directory can be accessed on both nodes through the same path.


In our experiment, we assign each node a specific role: SCRR3 has the shared storage role, SCRR2 the DFS client role, and SCRR1 the datanode role. We configure the HDFS cluster with a minimum replication factor of one and enable the legacy short-circuit local read. We shut down the datanode on SCRR2 and configure a dummy network interface with the same address as the datanode on SCRR1. We upload a file to the HDFS cluster (which will be kept by the datanode on SCRR1). We then run a simple DFS client on SCRR2 that reads the file byte by byte, counting the number of bytes read. After the DFS client finishes, we disable the short-circuit remote read configuration by deleting the dummy network interface and re-run the same DFS client. We repeatedly run the same DFS client, alternating between the two modes (short circuit enabled and short circuit disabled), while watching the network bandwidth consumption.
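
A rough shell equivalent of that client and of the bandwidth monitoring, assuming the test file was uploaded to a hypothetical HDFS path /scrr/testfile and that the sysstat package is installed:

    # Read the whole file through the DFS client and count the bytes read.
    hdfs dfs -cat /scrr/testfile | wc -c

    # In another terminal on each node, watch per-interface network throughput.
    sar -n DEV 1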



Network bandwidth consumption on each node.

Figure 6 - Network bandwidth consumption on each node.

Figure 6 shows the network bandwidth consumption on each node while performing the experiment. The short circuit is enabled in runs B, D and F and disabled in runs A, C, E and G. The following table summarizes the network bandwidth consumption on each node in terms of data transmission and reception, as well as what data is being communicated. It is clear how short-circuiting remote reads from shared storage significantly reduces network bandwidth consumption, not to mention the applications' running time.


Run A (short circuit disabled), datanode traffic: High
    Datanode:       receives (data from shared storage), transmits (data to DFS client)
    DFS Client:     receives (data from datanode), transmits (acks …)
    Shared Storage: receives (acks …), transmits (data to datanode)

Run B (short circuit enabled), datanode traffic: Idle
    Datanode:       idle (short circuit)
    DFS Client:     receives (data from shared storage), transmits (acks …)
    Shared Storage: receives (acks …), transmits (data to DFS client)

Runs C, E, G (short circuit disabled), datanode traffic: Low
    Datanode:       receives (acks …, data already in its OS cache), transmits (data to DFS client)
    DFS Client:     receives (data from datanode), transmits (acks …)
    Shared Storage: mostly idle (data cached on the datanode side)

Runs D, F (short circuit enabled), datanode traffic: Idle
    Datanode:       idle (short circuit)
    DFS Client:     mostly idle (data in its OS cache)
    Shared Storage: mostly idle (data cached on the DFS client side)

Appendix: Dummy network interface configuration on Linux:

Assume a cluster of three slaves


Host     IP Address     MAC Address
Host1                   00:00:00:00:00:01
Host2                   00:00:00:00:00:02
Host3                   00:00:00:00:00:03

  1. On each host, create a dummy network interface and assign it a suitable name, one for each remote datanode. For example, on Host1:
    sudo ip link add dummyDataNode2 type dummy
    sudo ip link add dummyDataNode3 type dummy
  2. On each host, assign each dummy interface the same IP address as its corresponding real datanode. For example, on Host 3:
    sudo ifconfig dummyDataNode1
    sudo ifconfig dummyDataNode2
  3. Make sure that all machines have the MAC address of the real datanode. For example, on Host 1, assuming eth0 is the interface through which Host 1 is supposed to communicate with HDFS cluster network:
    sudo arp -s 00:00:00:00:00:02 -i eth0
    sudo arp -s 00:00:00:00:00:03 -i eth0
  4. Make sure that all machines have a route to the real datanodes through the correct interface. For example, on Host 2:
    sudo route add -host dev eth0
    sudo route add -host dev eth0
  5. On each host, hide the dummy interface from the network by disabling ARP for the dummy device and deleting all routing entries that involve the dummy device. Use both the route and ip-route commands to query for the routing entries involving the dummy device. For example, on Host 1:
Switch off ARP on the dummy devices so that they neither send nor respond to any ARP requests:
	sudo ifconfig dummyDataNode2 -arp
	sudo ifconfig dummyDataNode3 -arp

  Query routes with route command:

    Kernel IP routing table
    Destination     Gateway   Genmask   Flags   Metric   Ref   Use   Iface
    default                             UG      0        0     0     eth0
                    *                   U       1        0     0     eth0
                    *                   U       1        0     0     dummyDataNode2
                    *                   U       1        0     0     dummyDataNode3

  Delete the routing entries associated with the dummy interfaces:
	sudo route del -net netmask dev dummyDataNode2
	sudo route del -net netmask dev dummyDataNode3

  Query routes with ip-route command for each dummy interface:
	ip route show table local dev dummyDataNode2

	broadcast  proto kernel  scope link  src 
	local  proto kernel  scope host  src
	broadcast  proto kernel  scope link  src
	ip route show table local dev dummyDataNode3

	broadcast  proto kernel  scope link  src
	local  proto kernel  scope host  src
	broadcast  proto kernel  scope link  src

  Delete the routing entries associated with the dummy interfaces:
	sudo ip route del table local dev dummyDataNode2 to local
	sudo ip route del table local dev dummyDataNode2 to broadcast
	sudo ip route del table local dev dummyDataNode2 to broadcast
	sudo ip route del table local dev dummyDataNode3 to local
	sudo ip route del table local dev dummyDataNode3 to broadcast
	sudo ip route del table local dev dummyDataNode3 to broadcast



A RAM disk is a portion of RAM that is treated as if it were disk storage. That is, it can host a file system through which applications deal with files in the same way as files on any other disk, using the same file API, but with much higher performance.
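
For example, on Linux a RAM disk can be created with tmpfs; the size and mount point below are arbitrary:

    # Mount an 8 GB RAM-backed file system; files under /mnt/ramdisk are accessed
    # through the ordinary file API but live in RAM.
    sudo mkdir -p /mnt/ramdisk
    sudo mount -t tmpfs -o size=8g tmpfs /mnt/ramdisk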


RAM is the fastest storage medium accessible to applications compared with other storage media (especially HDD and SSD). However, being very expensive, RAM is a scarce resource with relatively limited capacity (measured in gigabytes, while the capacity of other storage media is measured in terabytes). Moreover, it is volatile memory that loses its data after a restart, shutdown or other power-loss scenario. It is important to keep these limitations in mind while thinking of ways to leverage the outstanding performance of a RAM disk.

  • Scarcity limits the amount of data that can be stored, which can rule a solution out. However, the limit can be pushed further by paying for more capacity, as servers now ship with hundreds of gigabytes of RAM.

  • The volatility of RAM can make or break a solution idea; RAM is not a fit for persistent data, but it fits volatile, temporary data.

  • RAM performance is always a win for the solution; it is the reason why we think of RAM disks in the first place.

In the big data applications domain, we list two possible generic use cases:

  • Storing temporary and intermediate discardable data

  • Storing persistent data backed by persistent storage medium

Storing temporary and intermediate discardable data:

In the big data applications domain, most temporary and intermediate data is volatile in nature; it is non-permanent (by definition) and discardable, such that the existence of the data is tied to the existence of the application. As long as the application is running, this data is generated and kept because it is required for the application to finish processing. Once the application is done, it is no longer necessary to keep this data, and it becomes discardable.


Moreover, in the majority of big data applications, the discardability of temporary data extends even further: it is discardable not only after the application terminates but also during the application's lifetime. That is, an application can regenerate this data (as a whole or in parts) if for some reason it is lost. In general, this is a design principle of big data frameworks.


In a MapReduce framework, shuffle files are intermediate files that are written to disk and transferred over the network. They are volatile in the same way described above. However, this data is comparable in size to the input data; in almost all situations it is as big as the (big) input data, unless the processing framework does something more with it, such as writing the files compressed or partitioned, either across time (i.e. processing one partition at a time) or across several machines. Writing these files to a RAM disk would definitely increase shuffle performance, especially when multiple jobs with different workloads are running.
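
A hedged sketch of pointing such shuffle/local directories at a RAM disk like the tmpfs mount shown earlier; the /mnt/ramdisk paths are assumptions, while spark.local.dir (Spark standalone) and mapred.local.dir (Hadoop 1.x MapReduce) are the standard properties for these directories:

    # Spark: write shuffle and spill files to the RAM disk.
    echo "spark.local.dir    /mnt/ramdisk/spark" >> $SPARK_HOME/conf/spark-defaults.conf

    # Hadoop 1.x MapReduce: set mapred.local.dir in mapred-site.xml to a RAM disk
    # path such as /mnt/ramdisk/mapred (mapreduce.cluster.local.dir in Hadoop 2.x).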

RAM Disk vs. App and OS Buffers

It is worth mentioning that there are at least two levels of buffering for disk IO: application-level buffers and OS-level buffers. With appropriate configuration of both levels, it is possible to reach a level of performance similar to that of a RAM disk. However, the appropriate configuration is not always attainable: application buffer capacities differ from one application and workload to another, while the OS buffers are affected by other IO from the same or other applications. So, having a dedicated, adaptable buffer for temporary volatile data will more reliably deliver the required performance.


Storing persistent data backed by persistent storage medium:

There are situations in which computing reusable data is very expensive in terms of computing resources. Moreover, such expensive data may be used frequently afterwards (for example, by other analytical tasks and low-latency jobs). Hence, persisting this data becomes mandatory to save computation resources and to ensure real-time performance for its consumers.


To devise a scheme for persisting such data, a number of factors should be considered. For how long will the data be persisted? How frequently will it be consumed? Will it be shared across different applications? If the data is required for a relatively long period, a persistent storage medium is obligatory. If the data is consumed frequently, a reasonably fast storage medium is preferred. In addition, sharing data efficiently across applications is not straightforward. A storage medium that satisfies all three aspects, persistence, usage frequency and sharing, would be perfect for this case.

 RAM Disk Backed by Persistent Disk

All these aspects can be satisfied by combining different storage types in a tiered storage model: the RAM disk sits at the top tier, backed at lower tiers by persistent storage. The RAM disk provides high performance while the persistent storage provides persistence. If data in the RAM disk is lost, a suitable failover plan can restore it from the persistent storage. Also, sharing data among applications becomes a matter of reading and writing files, which simplifies the implementation of applications.


A simple implementation of the tiered storage model is to save data in two locations: a copy on the RAM disk and another copy on the persistent disk. A consumer application first tries to read the data from the RAM disk; if it is not found, the application reads it from the persistent disk while rewriting it to the RAM disk for subsequent use.
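
A minimal sketch of that read-through pattern in shell, with hypothetical cache locations:

    #!/usr/bin/env bash
    # Read a data set from the RAM disk copy if present; otherwise fall back to the
    # persistent copy and repopulate the RAM disk for subsequent consumers.
    RAMDISK=/mnt/ramdisk/cache
    PERSISTENT=/data/cache

    read_cached() {
        local name="$1"
        if [ ! -f "$RAMDISK/$name" ]; then
            mkdir -p "$RAMDISK"
            cp "$PERSISTENT/$name" "$RAMDISK/$name"   # warm the fast tier
        fi
        cat "$RAMDISK/$name"
    }

    read_cached expensive-result.csv > /dev/null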


It is worth mentioning that Tachyon is an in-memory distributed file system backed by HDFS. Also, at the time of writing, HDFS is being extended in phases to support heterogeneous storage types (such as RAM disk, regular disk, SSD …) and different data storage policies (hot, warm, cold data …). Together, these two features allow the implementation of the tiered storage system we seek.



Running Spark On YARN


When it comes to running other distributed applications besides Spark, running Spark in standalone mode (with its embedded cluster manager) is not a good choice for cluster resource utilization. It is better, in terms of resource scheduling and utilization, to have a single cluster manager with a global view of what is running, and what wants to run, on the cluster.


Without that single cluster manager, there are two main approaches to resource sharing and allocation:

  1. Making all cluster resources available to all types of applications at the same time. However, that leads to an unfairly managed contention for resources.

  2. Dividing the pool of resources into smaller pools, one for each application type. However, that leads to inefficient utilization: some applications may need more resources than their pool provides while, at the same time, other applications need less than theirs. Hence, a dynamic way of allocating resources leads to better utilization.


There are different cluster managers that can do the job and overcome the issues highlighted above. Choosing one depends on the types of applications run on the cluster, as they all have to speak the language of the manager. One of the cluster managers that Spark applications can run on is Apache YARN. The design of Apache YARN allows different YARN applications to coexist on the same cluster, so a Spark application can run at the same time as other types of applications (like Hadoop MapReduce jobs), which brings great benefits for manageability and cluster utilization.


In this post we illustrate the benefits of running Spark on YARN, show how to run Spark on YARN, and mention important notes to be aware of when doing so.


First we will try to understand the architecture of both Spark and YARN.

Spark Architecture:

Spark's architecture consists of a Driver Program, Executors and a Cluster Manager.


Driver Program: The driver program is responsible for managing the job flow and scheduling tasks to run on the executors.

Executors: Executors are processes that run computation and store data for a Spark application.

Cluster Manager: The cluster manager is responsible for starting executor processes and for deciding where and when they run. Spark supports pluggable cluster managers: YARN, Mesos, and its own "standalone" cluster manager.


YARN Architecture:

YARN's architecture consists of a Resource Manager, Node Managers, Application Masters and Containers.


Resource Manager: manages the use of resources across the cluster.

Node Manager: launches and monitors containers on cluster machines.

Application Master: manages the lifecycle of an application running on the cluster.

Container: a collection of physical resources (CPU cores and memory) on a single node of the cluster, allocated for running an application's worker processes.


Spark on YARN:

When running Spark on YARN, each Spark executor runs as a YARN container. Spark supports two modes for running on YARN: yarn-cluster mode and yarn-client mode.


YARN-Client Mode:

  • In yarn-client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
  • yarn-client mode makes sense for interactive and debugging uses where you want to see your application’s output immediately (on the client process side).

YARN-Cluster Mode:

  • In yarn-cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application.
  • yarn-cluster mode makes sense for production jobs.

Why Run on YARN?

Running Spark on YARN has some benefits:

  • YARN allows the cluster resources to be shared dynamically between the different frameworks that run on it. For example, you can run a MapReduce job and then a Spark job without any changes to the YARN configuration.
  • You can use YARN schedulers for categorizing, isolating, and prioritizing workloads.
  • YARN is the only cluster manager for Spark that supports security. With YARN, Spark can use secure authentication between its processes.

How to Run on YARN

We used Cloudera Manager to install Spark and YARN. No special configuration was needed to get Spark running on YARN; we just changed Spark's master address to yarn-client or yarn-cluster.


We want to mention some important issues that we met while running Spark on YARN:

  • Spark copies the Spark assembly JAR file to HDFS each time you run spark-submit. You can avoid this copy by manually uploading the Spark assembly JAR to HDFS once and setting the SPARK_JAR environment variable to that HDFS path:
    hdfs dfs -mkdir -p /user/spark/share/lib
    hdfs dfs -put $SPARK_HOME/assembly/lib/spark-assembly_*.jar \
        /user/spark/share/lib/spark-assembly.jar
    export SPARK_JAR=hdfs://<nn>:<port>/user/spark/share/lib/spark-assembly.jar

  • Important configuration options when submitting the job:
    --executor-cores NUM       Number of cores per executor (Default: 1)
    --num-executors NUM        Number of executors to launch (Default: 2)
    --executor-memory NUM      Amount of memory to use per executor process.
    --driver-memory NUM        Amount of memory to use for the driver process.

  • We noticed that YARN uses more memory than we set for each executor; after some searching, we discovered that YARN uses:
    • executor memory + spark.yarn.executor.memoryOverhead for the executor.
    • driver memory + spark.yarn.driver.memoryOverhead for the driver.
    • We found that this memory overhead is the amount of off-heap memory (in megabytes) allocated per executor or driver. It accounts for things like VM overheads, interned strings and other native overheads, and it tends to grow with the executor size (typically 6-10%). An illustrative submission is shown after this list.

  • The local directories used by Spark executors to save map output files and RDDs that spill to disk will be the local directories configured for YARN (the Hadoop YARN config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored.

  • Sharing application files (e.g. jar) with executors:
    • In yarn-client mode and Spark standalone mode, a link to the jar on the client machine is created and all executors receive this link to download the jar.
    • In yarn-cluster mode, the jar is uploaded to HDFS before the job runs and all executors download it from there, so it takes some time at the beginning to upload the jar.
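
To make the memory overhead note above concrete, here is an illustrative submission (all numbers are made up): each executor container requested from YARN is roughly executor-memory plus spark.yarn.executor.memoryOverhead, i.e. about 4 GB + 512 MB per executor here.

    spark-submit \
    --master yarn-cluster \
    --num-executors 4 \
    --executor-cores 2 \
    --executor-memory 4g \
    --conf spark.yarn.executor.memoryOverhead=512 \
    --class org.apache.spark.examples.SparkPi \
    $SPARK_HOME/examples/lib/spark-examples_version.jar 100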

Comparison Between Spark on YARN and Standalone mode:

                                   Spark on YARN           Spark on YARN           Spark Standalone
                                   (yarn-cluster)          (yarn-client)
Driver runs in                     Application Master      Client                  Client
Who requests resources             Application Master      Application Master      Client
Who starts executor processes      YARN NodeManager        YARN NodeManager        Spark Worker (Slave)
Support for Spark shell            No                      Yes                     Yes
Sharing the jar with executors     uploads jar to HDFS     link to jar on client   link to jar on client
Share cluster resources among
different frameworks               Yes                     Yes                     No


Running SparkPi in Standalone Mode

spark-submit \
--master spark://<master-host>:7077 \
--class org.apache.spark.examples.SparkPi \
$SPARK_HOME/examples/lib/spark-examples_version.jar 10

Running SparkPi in YARN Client Mode

spark-submit \
--master yarn-client \
--class org.apache.spark.examples.SparkPi \
$SPARK_HOME/examples/lib/spark-examples_version.jar 10

Running SparkPi in YARN Cluster Mode

spark-submit \
--master yarn-cluster \
--class org.apache.spark.examples.SparkPi \
$SPARK_HOME/examples/lib/spark-examples_version.jar 10


In this post, we show the configuration Hadoop needs to start the namenode successfully. Usually we format the namenode before starting Hadoop, but a common problem is that, with the default configuration, the namenode formatting files are written into the tmp directory, which the operating system clears every time it starts. Below are the steps to change this default behaviour.


  1. In hdfs-site.xml, put the following property (dfs.name.dir is the Hadoop 1.x property that holds the namenode metadata path):
        <property>
          <name>dfs.name.dir</name>
          <value>/<hadoop installation path>/hadoop-1.2.1/name/data</value>
        </property>
  2. Create the metadata directory:
    ~$ mkdir -p /<hadoop installation path>/hadoop-1.2.1/name/data
  3. Change the directory permissions to give the user full access (the first digit, 7, grants the owning user read, write and execute):
    ~$ sudo chmod 750 /<hadoop installation path>/hadoop-1.2.1/name/data
  4. format the name node
    ~$ hadoop namenode -format
    The output should be 
    15/03/25 12:27:06 INFO namenode.NameNode: STARTUP_MSG: 
    STARTUP_MSG: Starting NameNode
    STARTUP_MSG:   host = baddar-pc/
    STARTUP_MSG:   args = [-format]
    STARTUP_MSG:   version = 1.2.1
    STARTUP_MSG:   build = -r 1503152; compiled by 'mattf' on Mon Jul 22 15:23:09 PDT 2013
    STARTUP_MSG:   java = 1.8.0_40
    Re-format filesystem in /home/baddar/hadoop-1.2.1/name/data ? (Y or N) Y
    15/03/25 12:27:11 INFO util.GSet: Computing capacity for map BlocksMap
    15/03/25 12:27:11 INFO util.GSet: VM type       = 64-bit
    15/03/25 12:27:11 INFO util.GSet: 2.0% max memory = 932184064
    15/03/25 12:27:11 INFO util.GSet: capacity      = 2^21 = 2097152 entries
    15/03/25 12:27:11 INFO util.GSet: recommended=2097152, actual=2097152
    15/03/25 12:27:11 INFO namenode.FSNamesystem: fsOwner=baddar
    15/03/25 12:27:11 INFO namenode.FSNamesystem: supergroup=supergroup
    15/03/25 12:27:11 INFO namenode.FSNamesystem: isPermissionEnabled=true
    15/03/25 12:27:11 INFO namenode.FSNamesystem: dfs.block.invalidate.limit=100
    15/03/25 12:27:11 INFO namenode.FSNamesystem: isAccessTokenEnabled=false accessKeyUpdateInterval=0 min(s), accessTokenLifetime=0 min(s)
    15/03/25 12:27:11 INFO namenode.FSEditLog: dfs.namenode.edits.toleration.length = 0
    15/03/25 12:27:11 INFO namenode.NameNode: Caching file names occuring more than 10 times 
    15/03/25 12:27:11 INFO common.Storage: Image file /home/baddar/hadoop-1.2.1/name/data/current/fsimage of size 112 bytes saved in 0 seconds.
    15/03/25 12:27:11 INFO namenode.FSEditLog: closing edit log: position=4, editlog=/home/baddar/hadoop-1.2.1/name/data/current/edits
    15/03/25 12:27:11 INFO namenode.FSEditLog: close success: truncate to 4, editlog=/home/baddar/hadoop-1.2.1/name/data/current/edits
    15/03/25 12:27:11 INFO common.Storage: Storage directory /home/baddar/hadoop-1.2.1/name/data has been successfully formatted.
    15/03/25 12:27:11 INFO namenode.NameNode: SHUTDOWN_MSG: 
    SHUTDOWN_MSG: Shutting down NameNode at baddar-pc/

    Note that the namenode metadata is written to the specified path
  5. Make sure the namenode metadata is written to that path (list it recursively):
    $ ls -aR /home/baddar/hadoop-1.2.1/name/data/
    /home/baddar/hadoop-1.2.1/name/data/:
    .  ..  current  image  in_use.lock  previous.checkpoint

    /home/baddar/hadoop-1.2.1/name/data/current:
    .  ..  edits  fsimage  fstime  VERSION

    /home/baddar/hadoop-1.2.1/name/data/image:
    .  ..  fsimage

    /home/baddar/hadoop-1.2.1/name/data/previous.checkpoint:
    .  ..  edits  fsimage  fstime  VERSION
  6. Start all Hadoop daemons:
    ~$ start-all.sh
    The output should be:
    starting namenode, logging to /home/baddar/hadoop-1.2.1/libexec/../logs/hadoop-baddar-namenode-baddar-pc.out
    localhost: starting datanode, logging to /home/baddar/hadoop-1.2.1/libexec/../logs/hadoop-baddar-datanode-baddar-pc.out
    localhost: starting secondarynamenode, logging to /home/baddar/hadoop-1.2.1/libexec/../logs/hadoop-baddar-secondarynamenode-baddar-pc.out
    starting jobtracker, logging to /home/baddar/hadoop-1.2.1/libexec/../logs/hadoop-baddar-jobtracker-baddar-pc.out
    localhost: starting tasktracker, logging to /home/baddar/hadoop-1.2.1/libexec/../logs/hadoop-baddar-tasktracker-baddar-pc.out
  7. make sure that all daemons are started 
    ~$ jps

    23678 TaskTracker
    23060 NameNode
    23406 SecondaryNameNode
    23978 Jps
    23504 JobTracker