Wednesday, 11 October 2017

distcp: Copying Data Between Secured And Non-Secured Cluster

Security settings dictate whether DistCp should be run on the source cluster or the destination cluster. As a rule of thumb, if one cluster is secure and the other is not, DistCp should be run from the secure cluster; otherwise there may be security-related issues. When data is copied from a secured cluster to a non-secured cluster, distcp may throw an exception with the following error:

------- Failed on local exception: Server asks us to fall back to SIMPLE auth, but this client is configured to only allow secure connections.; Host Details : local host is: xxx; destination host is: "yyy":8020;

If you encounter this situation, run distcp with the configuration ipc.client.fallback-to-simple-auth-allowed=true:

hadoop distcp -D ipc.client.fallback-to-simple-auth-allowed=true hdfs://xxx:8020/src_path hdfs://yyy:8020/target_path

Depending on the cluster setup, the above command may fail with the following error:

------- of FileException between local host is yyy; destination host is:xxx;

This means that RPC communication with the target cluster is blocked. In such cases the webhdfs protocol can be used instead, so the above distcp command can be rewritten as:

hadoop distcp -D ipc.client.fallback-to-simple-auth-allowed=true hdfs://xxx:8020/src_path webhdfs://yyy:50070/target_path
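The two command variants above differ only in the destination scheme and the fallback flag. A minimal Python sketch of how such a command line could be assembled in a wrapper script; the helper name build_distcp_command is hypothetical, and the hosts xxx and yyy are the placeholders used in this post:

```python
def build_distcp_command(src_uri, dst_uri, allow_simple_fallback=False):
    """Return the distcp argument list for copying src_uri to dst_uri."""
    cmd = ["hadoop", "distcp"]
    if allow_simple_fallback:
        # Needed when a secured client talks to a non-secured cluster.
        cmd += ["-D", "ipc.client.fallback-to-simple-auth-allowed=true"]
    cmd += [src_uri, dst_uri]
    return cmd


# RPC to the target is blocked, so the destination uses webhdfs instead of hdfs.
cmd = build_distcp_command(
    "hdfs://xxx:8020/src_path",
    "webhdfs://yyy:50070/target_path",
    allow_simple_fallback=True,
)
print(" ".join(cmd))
```

The list form can be passed directly to subprocess.run on a machine where the hadoop client is installed.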

Wednesday, 17 August 2016

Guide to plan your cluster memory in MapReduce2/ Yarn

Planning memory allocation is very important in MapReduce 2 (MR-2). In this post, I will explain how to plan memory resources in MR-2.

Scenario: Let us imagine that we have a four-node Hadoop cluster: one master and three datanodes. Each datanode has the following configuration:

Memory: 48GB.
vCPU: 12.
Disk: 12.

We need to plan the number of map and reduce tasks that will be running in the cluster and in each datanode.

Understanding a Container: A Container can be considered a block or unit that holds resources such as CPU and memory.

YARN manages all the available resources on each machine in the cluster. Based on the total available resources, YARN will allocate resource requests for applications such as MapReduce running in the cluster. YARN then provides processing capacity to each application (map or reduce) by allocating Containers.

Core and Disk: A general rule of thumb is that allocating 1-2 Containers per disk and per core gives the best balance for cluster utilization. So in our example cluster, where each datanode has 12 disks and 12 cores, we will allow a maximum of 20 Containers per node and leave the rest for the OS.

Memory: In our case, we have a Hadoop cluster of 3 datanodes with 48GB each. Considered as a pool, that is 144GB in total.

(Memory in each node) x (Number of datanodes)= Total available memory.

Now, we cannot give all available resources to YARN. Some memory has to be left for the OS on each server. The rule of thumb is to allocate 8GB to the OS, which leaves 40GB on each node. So our calculation will be:

(Memory in each node - 8GB for OS) x (Number of datanodes) = Total memory available for YARN, i.e. (48GB - 8GB) x 3 = 120GB.


The following property sets the maximum memory YARN can utilize on the node:

In yarn-site.xml, set yarn.nodemanager.resource.memory-mb to 40960 (40 GB expressed in MB).


The above property means that 40GB is available for Yarn on each node.

The next step is to tell YARN how to break up the total available resources into Containers. You do this by specifying the minimum unit of RAM to allocate for a Container. According to our calculation we want to run a maximum of 20 Containers per node, and thus need (40 GB total RAM) / (20 Containers) = 2 GB minimum per Container:

In yarn-site.xml, set yarn.scheduler.minimum-allocation-mb to 2048 (2 GB expressed in MB).


YARN will allocate Containers with RAM amounts of at least yarn.scheduler.minimum-allocation-mb.
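The sizing arithmetic so far can be checked with a short Python sketch. The function and key names are hypothetical; the inputs are the example cluster's figures (48 GB per node, 8 GB reserved for the OS, 3 datanodes, at most 20 Containers per node):

```python
def yarn_memory_plan(node_gb, os_gb, nodes, max_containers):
    """Derive the YARN memory figures for a cluster of identical datanodes."""
    yarn_per_node_gb = node_gb - os_gb            # memory left for YARN on one node
    yarn_per_node_mb = yarn_per_node_gb * 1024    # YARN properties are given in MB
    return {
        "yarn_per_node_mb": yarn_per_node_mb,
        "yarn_cluster_gb": yarn_per_node_gb * nodes,
        "min_container_mb": yarn_per_node_mb // max_containers,
    }


plan = yarn_memory_plan(node_gb=48, os_gb=8, nodes=3, max_containers=20)
for name, value in plan.items():
    print(name, "=", value)
```

For the example cluster this gives 40960 MB per node, 120 GB across the cluster, and a 2048 MB minimum Container.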

Configure MapReduce 2:

When configuring MapReduce2 resource utilization, there are three aspects to consider:

1. Physical RAM limit for each Map and Reduce task
2. The JVM heap size limit for each task
3. The amount of virtual memory each task will get

We can define the maximum memory each Map and Reduce task will use. Since each Map and each Reduce runs in a separate Container, these maximum memory settings should be equal to or greater than the YARN minimum Container allocation.

For our example cluster, we have the minimum RAM for a Container (yarn.scheduler.minimum-allocation-mb) = 2 GB. We’ll thus assign 4 GB for Map task Containers and 8 GB for Reduce task Containers.

In mapred-site.xml, set mapreduce.map.memory.mb to 4096 and mapreduce.reduce.memory.mb to 8192.


Each Container will run JVMs for the Map and Reduce tasks. The JVM heap size should be set lower than the Map and Reduce memory defined above, so that it stays within the bounds of the Container memory allocated by YARN.

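In mapred-site.xml, set mapreduce.map.java.opts to -Xmx3072m (a 3 GB heap inside the 4 GB Map Container) and mapreduce.reduce.java.opts to a heap below the 8 GB Reduce Container, for example -Xmx6144m.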


The above settings configure the upper limit of the physical RAM that Map and Reduce tasks will use. The virtual memory (physical + paged memory) upper limit for each Map and Reduce task is determined by the virtual memory ratio each YARN Container is allowed. This ratio is set by the following configuration, and its default value is 2.1:

In yarn-site.xml, the property is yarn.nodemanager.vmem-pmem-ratio (default 2.1).


Thus, with the above settings on our example cluster, each Map task will get the following memory allocations:

Total physical RAM allocated = 4 GB
JVM heap space upper limit within the Map task Container = 3 GB
Virtual memory upper limit = 4*2.1 = 8.4 GB

With YARN and MapReduce 2, there are no longer pre-configured static slots for Map and Reduce tasks. The entire cluster is available for dynamic resource allocation of Maps and Reduces as needed by the job. In our example cluster, with the above configurations, YARN will be able to allocate on each node up to 10 mappers (40/4) or 5 reducers (40/8), or any combination of the two that fits within 40 GB.
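To make the per-task limits and the possible mapper/reducer mixes concrete, here is a small Python sketch. The function names are hypothetical, and it assumes memory is the only constraint (vcores and scheduler queues are ignored):

```python
def task_limits(container_gb, heap_gb, vmem_ratio=2.1):
    """Memory limits for one task Container, in GB."""
    return {
        "physical_gb": container_gb,
        "heap_gb": heap_gb,
        # Virtual memory ceiling = physical limit x vmem-to-pmem ratio.
        "virtual_gb": round(container_gb * vmem_ratio, 1),
    }


def node_mixes(yarn_gb, map_gb, reduce_gb):
    """For each possible reducer count, the most mappers that still fit."""
    mixes = []
    for reducers in range(yarn_gb // reduce_gb + 1):
        remaining_gb = yarn_gb - reducers * reduce_gb
        mixes.append((remaining_gb // map_gb, reducers))
    return mixes


map_task = task_limits(container_gb=4, heap_gb=3)
mixes = node_mixes(yarn_gb=40, map_gb=4, reduce_gb=8)
print(map_task)
print(mixes)  # list of (mappers, reducers) pairs per node
```

With 40 GB per node the mixes run from 10 mappers and no reducers down to no mappers and 5 reducers.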

Monday, 27 June 2016

HDFS: Practical scenarios of HDFS Trash

Imagine you have accidentally deleted a file from your Windows machine. You don't have to worry much, because it will be in your Recycle Bin. HDFS works the same way: when trash is enabled and a file is deleted from HDFS, it is moved to a "Trash" folder instead of being removed immediately. This post discusses this feature of HDFS.

Parameter:  fs.trash.interval
The parameter "fs.trash.interval" is set in core-site.xml; any value greater than 0 (in minutes) enables trash. Once the trash feature is enabled, when something is removed from HDFS by using the rm command, the removed files or directories are not wiped out immediately. Instead, they are moved to a trash directory (for example, /user/${username}/.Trash).

Let us consider the sample output below:
hadoop dfs -rm -r /tmp/10gb

27/06/16 21:32:47 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 360 minutes, Emptier interval = 0 minutes. Moved: ‘hdfs://hdpnn/tmp/10gb’ to trash at: hdfs://hdpnn/user/manoj/.Trash/Current

From the above output, we can see two parameters:

Deletion interval: the time in minutes after which a checkpoint expires and is deleted. It is the value of fs.trash.interval. The NameNode runs a thread to periodically remove expired checkpoints from the file system.

Emptier interval: the time in minutes the NameNode waits between runs of the thread that manages checkpoints. On each run, the NameNode deletes checkpoints that are older than fs.trash.interval and creates a new checkpoint from /user/${username}/.Trash/Current. This frequency is determined by the value of fs.trash.checkpoint.interval, which must not be greater than the deletion interval. This ensures that in an emptier window, there are one or more checkpoints in the trash.

Let us take a configuration example:
fs.trash.interval = 360 (deletion interval = 6 hours)
fs.trash.checkpoint.interval = 60 (emptier interval = 1 hour)

This forces the NameNode to create a new checkpoint every hour and delete checkpoints that have existed for longer than 6 hours.
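The interplay of the two intervals can be sketched as a simplified timeline model in Python. This is an illustration under stated assumptions, not NameNode code: the function name removal_time is hypothetical, the emptier is assumed to run exactly on multiples of fs.trash.checkpoint.interval, and a checkpoint is assumed to be removed on the first run at which its age has reached fs.trash.interval. Real clusters will differ by scheduling jitter.

```python
import math


def removal_time(deleted_at, deletion_interval, checkpoint_interval):
    """Minute at which a file deleted at minute `deleted_at` leaves the trash."""
    if deletion_interval <= 0:
        raise ValueError("trash is disabled when fs.trash.interval is 0")
    if checkpoint_interval == 0:
        # A checkpoint interval of 0 falls back to the deletion interval.
        checkpoint_interval = deletion_interval
    if checkpoint_interval > deletion_interval:
        raise ValueError("checkpoint interval must not exceed deletion interval")
    # The file sits in .Trash/Current until the next emptier run checkpoints it.
    checkpoint_at = math.ceil(deleted_at / checkpoint_interval) * checkpoint_interval
    # The checkpoint is dropped at the first run where its age reaches the limit.
    runs_needed = math.ceil(deletion_interval / checkpoint_interval)
    return checkpoint_at + runs_needed * checkpoint_interval


# File deleted 30 minutes in, with the 360/60 configuration from the example.
print(removal_time(deleted_at=30, deletion_interval=360, checkpoint_interval=60))
```

Under this model the file is checkpointed at minute 60 and removed at minute 420, so it stays recoverable for a little over six hours.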

Briefly about checkpoint

A checkpoint is a directory under the user's trash that stores all files or directories that were deleted before the checkpoint was created. If you want to take a look at the trash directory, you can find it at /user/${username}/.Trash/{timestamp_of_checkpoint_creation}.

Empty the Trash using:  hadoop fs -expunge

This command causes the NameNode to permanently delete files from the trash that are older than the threshold. It immediately removes expired checkpoints from the file system.

Real world Scenario

It is always good to enable trash to avoid unexpected removals of HDFS files and directories. Enabling trash provides a chance to recover data lost through operational and user errors. At the same time, it is also important to set appropriate values for fs.trash.interval and fs.trash.checkpoint.interval. For example, if you frequently upload and delete files in HDFS, you probably want to set fs.trash.interval to a smaller value; otherwise the checkpoints would take up too much space.

Now, let us take a scenario where trash is enabled and you remove some files. The free HDFS capacity does not increase, because the files are not fully deleted. HDFS does not reclaim the space until the files are removed from the trash, which occurs only after the checkpoints have expired. Sometimes you might want to bypass trash when deleting files. In that case, you can run the rm command with the -skipTrash option as below:
hadoop fs -rm -skipTrash /path/to/permanently/delete

This bypasses the trash and removes the files immediately from the file system.

Monday, 7 December 2015

Hadoop: Export and import hive tables


We need to move a table named "daily_input" from the cluster whose master server is "hadoop-1-test-master" (source) to the cluster whose master server is "hadoop-2-test-master" (destination).



Run the following command in the hive shell of hadoop-1-test-master:
EXPORT TABLE daily_input TO '/tmp/backup/table-source/';

The above command will export the table "daily_input" to the HDFS of the source cluster, inside the folder "/tmp/backup/table-source/".


Run the command below in the destination cluster:
hadoop distcp -skipcrccheck -update h hdfs://

The above command will copy the exported table from the source "hadoop-1-test-master" to the destination "hadoop-2-test-master" HDFS folder "/tmp/backup/table-destination".


Run the following command in the destination hive shell:

IMPORT TABLE daily_input FROM '/tmp/backup/table-destination';

The above command will import the table "daily_input" into Hive on the destination cluster.

IMPORTANT: There is no need to manually create the table in the destination; it is created automatically during import.


Saturday, 5 December 2015

Hadoop: distcp to copy data between Hadoop Cluster

In this post, I will explain how to use distcp to migrate data between two clusters.

The distcp tool available in a Hadoop cluster helps to move data between two clusters. If the clusters are running different versions of Hadoop, run distcp with hftp:// as the source file system and hdfs:// as the destination file system. This uses the HFTP protocol for the source and the HDFS protocol for the destination. The default port used by HFTP is 50070, while the default port used for HDFS is 8020.

Source URI: hftp://namenode-location:50070/source-directory

where namenode-location refers to the source NameNode's hostname as defined by its configuration, and 50070 is the NameNode's HTTP server port, as defined by the config dfs.http.address.

Destination URI: hdfs://nameservice-id/destination-directory or hdfs://namenode-location

This refers to the destination NameNode as defined by its configuration fs.defaultFS.

NOTE: If you are using distcp as part of an upgrade, run the following distcp commands on the destination cluster only. For example, if you are copying from the hadoop1 cluster (version 1.0) to the hadoop2 cluster (version 2.0):

$ hadoop distcp -skipcrccheck -update hftp://hadoop1-namenode:50070/backup/source-directory hdfs://hadoop2-namenode:8020/backup/destination-directory

Or use a specific path, such as /hbase to move HBase data, for example:

$ hadoop distcp hftp://hadoop1-namenode:50070/hbase hdfs://hadoop2-namenode:8020/hbase

distcp is a general utility for copying files between distributed filesystems in different clusters.

Keep reading :)

Tuesday, 1 December 2015

Hadoop: Install and manage Zookeeper

In this post, I will explain how to install and set up a ZooKeeper cluster.

ZooKeeper is a distributed coordination service for distributed applications. It acts as a centralized repository where applications can store and retrieve data. Its general role is synchronization, serialization and coordination.

In a cluster, ZooKeeper normally runs on an odd number of nodes, such as 3 or 5. An odd number makes it possible to form a majority and prevents a split-brain scenario, which in turn prevents data inconsistencies.
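The majority rule behind the odd-number advice can be illustrated with a few lines of Python. The function names are hypothetical; the arithmetic is simply "more than half of the ensemble must agree":

```python
def quorum_size(ensemble_size):
    """Smallest number of nodes that forms a strict majority."""
    return ensemble_size // 2 + 1


def tolerated_failures(ensemble_size):
    """How many nodes can fail while a majority is still reachable."""
    return ensemble_size - quorum_size(ensemble_size)


for n in (3, 4, 5):
    print(n, "nodes: quorum", quorum_size(n), "tolerates", tolerated_failures(n))
```

A 4-node ensemble tolerates only one failure, the same as a 3-node ensemble, which is why the extra even node brings no benefit.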

Scenario: We have a 3 node cluster as below:

We have to set up a ZooKeeper cluster using these 3 nodes.

Installation steps:

Step 1: Download the package from Apache's official website:

Here, I downloaded the version zookeeper-3.4.6 to "/usr/local" of server "" .


Extract the tarball and rename the folder to "zookeeper" :

cd /usr/local/
tar -xvzf zookeeper-3.4.6.tar.gz
mv zookeeper-3.4.6 zookeeper

Now, create a directory called "/usr/local/zookeeper/data/", which will be the data directory for ZooKeeper.

mkdir /usr/local/zookeeper/data/

Append to the PATH variable:

export PATH=$PATH:/usr/local/zookeeper/bin

Step 2: Edit the ZooKeeper configuration file:

vim  /usr/local/zookeeper/conf/zoo.cfg



tickTime ---> The basic time unit, in milliseconds.

initLimit x tickTime = 20000 ms (20 seconds)

This means that whenever a quorum member joins, it has 20 seconds to connect to the leader and download the data initially. If it cannot do so within 20 seconds, it times out.

syncLimit x tickTime = 10000 ms (10 seconds)

If a follower cannot stay in sync with the leader within 10 seconds, the connection is considered dead: the follower is dropped from the quorum, and a leader election takes place if the leader has lost its majority.

Generally syncLimit is less than initLimit, because the initial download of the data takes more time.

2888 ---> Peer-to-peer port (followers use it to connect to the leader)
3888 ---> Leader election port
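The two timeouts are just tick counts multiplied by tickTime. A short Python sketch, assuming tickTime=2000, initLimit=10 and syncLimit=5; these sample values are an assumption chosen because they reproduce the 20-second and 10-second figures in this post, and the function names are hypothetical:

```python
SAMPLE_CFG = """
tickTime=2000
initLimit=10
syncLimit=5
"""


def parse_cfg(text):
    """Parse simple key=value lines into a dict, skipping blanks and comments."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        settings[key.strip()] = value.strip()
    return settings


def zk_timeouts_ms(settings):
    """Convert the tick-based limits into milliseconds."""
    tick = int(settings["tickTime"])
    return {
        "init_timeout_ms": int(settings["initLimit"]) * tick,
        "sync_timeout_ms": int(settings["syncLimit"]) * tick,
    }


timeouts = zk_timeouts_ms(parse_cfg(SAMPLE_CFG))
print(timeouts)
```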

Step 3: Create the myid file:

vim /usr/local/zookeeper/data/myid

Put the number "1" in it.

Perform Steps 1, 2 and 3 on the other two nodes "" and "", with the only difference being:

In ""

vim  /usr/local/zookeeper/data/myid ---> Put number "2"

In ""

vim  /usr/local/zookeeper/data/myid ---> Put number "3"

Step 4: Start ZooKeeper

Run the below command on all nodes.
---- start

To check the status:
------ status

Connecting to shell:
-------- -server hadoop-master-test:2181
> help

I will discuss more about ZooKeeper management in the next post.

Keep reading :)

Wednesday, 25 November 2015

MYSQL: Steps to repair and recover innodb db

In this post, I will explain how to recover from an InnoDB crash on a Linux server.


To confirm an InnoDB crash, first add the parameter "skip-innodb" to the [mysqld] section of the MySQL configuration file "/etc/my.cnf" and try to restart MySQL.

If it starts fine now, we can confirm that it is an InnoDB crash.

Now, remove the parameter "skip-innodb" from my.cnf and follow the steps below to recover InnoDB:


Add these lines to your /etc/my.cnf configuration file:

    [mysqld]
    innodb_force_recovery = 4


Restart MySQL now. The database should start, but with innodb_force_recovery set in my.cnf, all insert and update operations will be ignored.


Mysqldump all databases.

    mysqldump -A > dump.sql


Dump the user and db tables from the mysql database:

    mysqldump mysql user > user.sql
    mysqldump mysql db > db.sql

If the dump fails, create a copy of the current /var/lib/mysql directory:

    cp -pr /var/lib/mysql /var/lib/mysql_bak


Shut down the database, delete the data directory (better to move it instead), and execute mysql_install_db from the shell to create the MySQL default tables.



Remove the "innodb_force_recovery" line from your /etc/my.cnf file and restart the database.


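Restore everything from your backup except the default 'mysql' database. To restore the previous user and db privileges, do the following:

    # Copy the old mysql system database in under a temporary name
    cp -rp /var/lib/mysql_bak/mysql /var/lib/mysql/oldmysqldb
    # Dump the old privilege tables
    mysqldump oldmysqldb user > oldmysqldb_user.sql
    mysqldump oldmysqldb db > oldmysqldb_db.sql
    # Load them into the new mysql database and reload the privileges
    mysql mysql < oldmysqldb_user.sql
    mysql mysql < oldmysqldb_db.sql
    mysqladmin flush-privileges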

Keep reading :)