In the early 2000s, Google designed and implemented a programming model called MapReduce for processing and generating large data sets in parallel across large clusters of machines. Google later published the model so that anyone could implement and use it.
Hadoop is an implementation of the MapReduce programming model that enables distributed and parallel processing of large data sets across clusters of computers using simple programming models.
Hadoop is designed to scale up from a single server to clusters of thousands of machines, utilizing the local computation and storage of each server. This enables Hadoop to process massive amounts of data in time frames that would not be practical on a single machine or a traditional centralized system.
Hadoop is designed to detect node failures in the cluster and handle them at the application layer. Hence Hadoop provides a highly available service on top of a cluster of computers, each of which may be prone to failure.
Apache Hadoop contains the following core modules.
1. Hadoop Common: Hadoop Common contains the common utilities and libraries that support the other Hadoop modules.
2. Hadoop Distributed File System (HDFS): Hadoop HDFS is a distributed file system that provides high-throughput and fault-tolerant access to application data.
3. Hadoop YARN: Hadoop YARN is a framework for job scheduling and cluster resource management.
4. Hadoop MapReduce: Hadoop MapReduce is the implementation of the MapReduce programming model that enables parallel processing of large data sets.
Following are some Apache projects related to Hadoop, grouped by category.
Database:
1. HBase - HBase is a distributed and scalable database that supports structured data storage for very large tables.
2. Hive - Hive is a data warehouse platform that facilitates reading, writing, and managing large datasets residing in distributed storage using SQL.
3. Cassandra - Cassandra is a scalable multi-master NoSQL database with no single points of failure.
Data-Flow:
1. Pig - Pig is a high-level data-flow language and execution framework for parallel computation.
2. Tez - Tez is a generalized data-flow programming framework, built on Hadoop YARN, which provides a powerful and flexible engine to execute an arbitrary DAG of tasks to process data for both batch and interactive use cases.
Data-Collection:
1. Flume - Flume is a distributed, reliable service for efficiently collecting, aggregating, and moving large amounts of streaming log data.
2. Chukwa - Chukwa is a data collection system for monitoring and managing large distributed systems.
Compute:
1. Spark - Spark is a fast and general compute engine for Hadoop data. Spark provides a simple and expressive programming model that supports a wide range of applications, including ETL, machine learning, stream processing, and graph computation.
2. Mahout - Mahout is a scalable machine learning and data mining library.
Administration:
1. Ambari - Ambari is a web-based tool for provisioning, managing, and monitoring Apache Hadoop clusters.
2. ZooKeeper - ZooKeeper is a high-performance coordination service for distributed applications.
Hadoop is an open source project developed by the Apache community. In addition to core Hadoop, Apache has many other projects that are related to Hadoop and the Big Data ecosystem.
Third-party vendors package together the core Apache Hadoop framework and related projects, add proprietary utilities and capabilities on top, and distribute the result as a unified product.
Cloudera, Hortonworks and MapR are three popular commercial distributions of Hadoop.
Hadoop provides the Mapper and Reducer classes that support the implementation of MapReduce phases.
Mapper - The Hadoop framework provides the org.apache.hadoop.mapreduce.Mapper class that defines the map() method. The programmer overrides the map() method to implement the map phase. The map() method takes three parameters - a key, a value, and a Context object to which the output is written.
Reducer - The Hadoop framework provides the org.apache.hadoop.mapreduce.Reducer class that defines the reduce() method. The programmer overrides the reduce() method to implement the reduce phase. Similar to the map() method, the reduce() method also takes three parameters - a key, the list (Iterable) of values for that key, and a Context object to which the output is written.
The output types of the map() function should match the input types of the reduce() function.
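The relationship between the two signatures can be seen in the class declarations of a word count job, sketched below. The nesting of the mapper and reducer as static classes of the WordCount driver and the omitted method bodies are illustrative; the point is the generic type parameters - the Mapper's output key/value types (Text, IntWritable) are declared as the Reducer's input key/value types.
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {

    //Mapper<input key, input value, output key, output value>
    public static class WordcountMapper extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            //emits intermediate key/value pairs of type <Text, IntWritable>
        }
    }

    //Reducer<input key, input value, output key, output value>
    //The input types (Text, IntWritable) match the Mapper's output types.
    public static class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            //emits final key/value pairs of type <Text, IntWritable>
        }
    }
}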
Hadoop provides the 'org.apache.hadoop.mapreduce.Job' class that is used to configure a MapReduce job, submit the job, control the execution of the job and query the state of the job.
Following are the key steps in a Job program.
1. Create an instance of Job class.
2. Set job specific parameters.
3. Set mapper, reducer and optionally a combiner class for the job.
4. Set input and output paths.
5. Submit job and poll for completion.
The code snippet below highlights these steps.
//create job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "wordcount");
job.setJarByClass(WordCount.class);
//set job specific parameters
job.setJobName("wordcount");
//set mapper, reducer and combiner
job.setMapperClass(WordcountMapper.class);
job.setCombinerClass(WordcountReducer.class);
job.setReducerClass(WordcountReducer.class);
//set key and value classes
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//set input and output paths
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//Submit the job and poll for progress
System.exit(job.waitForCompletion(true) ? 0 : 1);
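Once the driver class is compiled and packaged into a jar, the job is typically submitted from the command line with the hadoop jar command; the jar name, main class, and input/output paths below are illustrative.
% hadoop jar wordcount.jar WordCount /user/input /user/output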
Hadoop throws an IllegalStateException if you call a set method on the Job after it has been submitted for execution.
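As a small illustration, reusing the job object from the snippet above (the particular setter is arbitrary):
job.submit(); //submits the job to the cluster and returns without waiting for completion
job.setNumReduceTasks(2); //throws IllegalStateException - the job has already been submitted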
Hadoop provides the org.apache.hadoop.mapreduce.Mapper class that can be extended to implement the map functionality. The Mapper class maps input key/value pairs to a set of intermediate key/value pairs. Maps (instances of the Mapper class) are the individual tasks which transform input key/value pairs into intermediate key/value pairs. The transformed intermediate records need not be of the same type as the input records. An input pair may map to zero or many output pairs.
The Mapper class provides the following methods.
setup() - setup() is called once at the beginning of the map task and is used to set up one-time resources for the task.
map() - map() is called once for each key/value pair of the input split and performs the map functionality.
cleanup() - cleanup() is called once at the end of the map task and is used to clean up resources.
run() - The run() method can be overridden to get more complete control over the execution of the Mapper.
The map() method of the Mapper class has to be overridden to implement the map phase of the MapReduce functionality. The map() method is called once for each key/value pair in the input split.
The key, the value, and a Context object are passed to the map() method.
The setup() method is called once at the beginning of the map task and is used to set up resources.
The cleanup() method is called once at the end of the map task and is used to clean up resources.
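As an illustration of setup() and cleanup(), the sketch below shows a hypothetical Mapper that emits each input line along with its length; the class name and logic are illustrative and not part of the word count example.
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LineLengthMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text line = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //called once before any map() call - acquire one-time resources here,
        //for example read settings from context.getConfiguration()
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        //called once for each key/value pair in the input split
        line.set(value);
        context.write(line, new IntWritable(value.getLength()));
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        //called once after the last map() call - release resources acquired in setup()
    }
}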
The word count program is a simple MapReduce program that is commonly used to demonstrate MapReduce functionality. You can use this program if the interviewer does not ask for a specific functionality.
The map() function in a word count program takes each line as input, splits it into words, and outputs key/value pairs of the form <word, 1>.
//the word and one fields are reused across calls to map()
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

//map function - key, value and context are passed as parameters
//key - the byte offset of the line in the input file (with the default TextInputFormat)
//value - the line of text
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
    }
}
Following are the three primary phases of a Reducer.
Shuffle - In this phase the Reducer copies the sorted output from each Mapper using HTTP across the network.
Sort - In this phase the framework merge-sorts the Reducer inputs by key (since different Mappers may have output the same key). The shuffle and sort phases occur simultaneously, i.e. while outputs are being fetched they are merged.
Reduce - In this phase the reduce(Object, Iterable, org.apache.hadoop.mapreduce.Reducer.Context) method is called for each <key, (collection of values)> in the sorted inputs.
The Reducer class provides the following methods.
setup() - setup() is called once at the beginning of the reduce task and is used to set up one-time resources for the reduce task.
reduce() - reduce() is called once for each key, with the list of values for that key, and performs the reduce functionality.
cleanup() - cleanup() is called once at the end of the reduce task and is used to clean up resources.
run() - The run() method can be overridden to get more complete control over the execution of the Reducer.
The reduce() method of the Reducer class has to be overridden to implement the reduce phase of the MapReduce functionality. The reduce() method is called once for each key.
The key, the Iterable of values, and a Context object are passed to the reduce() method.
The word count program is a simple MapReduce program that is commonly used to demonstrate MapReduce functionality. You can use this program if the interviewer does not ask for a specific functionality.
The reduce() function in a word count program takes the key/value pairs output by the mappers and aggregates the count for each key.
//reduce function - the key, the list of counts for that key, and context are passed as parameters
public void reduce(Text word, Iterable<IntWritable> counts, Context context) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable count : counts) {
        sum += count.get();
    }
    context.write(word, new IntWritable(sum));
}
Apache YARN, which stands for 'Yet Another Resource Negotiator', is Hadoop's cluster resource management system.
YARN provides APIs for requesting and working with Hadoop's cluster resources. These APIs are usually used by components of Hadoop's distributed frameworks, such as MapReduce, Spark, and Tez, which are built on top of YARN. User applications typically do not use the YARN APIs directly. Instead, they use higher-level APIs provided by the framework (MapReduce, Spark, etc.), which hide the resource management details from the user.
The basic idea of YARN is to split the functionality of resource management and job scheduling/monitoring into separate daemons. YARN consists of the following components.
ResourceManager - The ResourceManager is a global component or daemon, one per cluster, which manages resource requests and allocates resources across the nodes of the cluster.
NodeManager - The NodeManager runs on each node of the cluster and is responsible for launching and monitoring containers and reporting their status back to the ResourceManager.
ApplicationMaster - The ApplicationMaster is a per-application component that is responsible for negotiating resource container requirements from the ResourceManager and working with the NodeManagers to execute and monitor the container tasks.
Container - A container in the YARN framework is a Unix process running on a node that executes an application-specific task with a constrained set of resources (memory, CPU, etc.). A container in YARN is an abstract notion and is not a physical component.
The YARN ResourceManager is a global component or daemon, one per cluster, which manages resource requests and allocates resources across the nodes of the cluster.
The ResourceManager has two main components - the Scheduler and the ApplicationsManager.
Scheduler - The Scheduler is responsible for allocating resources to the running applications, based on the abstract notion of resource containers that have a constrained set of resources.
ApplicationsManager - The ApplicationsManager is responsible for accepting job submissions, negotiating the first container for executing the application-specific ApplicationMaster, and providing the service for restarting the ApplicationMaster container on failure.
The YARN Scheduler is responsible for allocating resources to user applications based on a defined scheduling policy. YARN provides three scheduling options - the FIFO Scheduler, the Capacity Scheduler, and the Fair Scheduler.
FIFO Scheduler - The FIFO Scheduler puts application requests in a queue and runs them in the order of submission.
Capacity Scheduler - The Capacity Scheduler keeps a separate, dedicated queue for smaller jobs and starts them as soon as they are submitted.
Fair Scheduler - The Fair Scheduler dynamically balances and allocates resources between all the running jobs.
You can configure the ResourceManager to use CapacityScheduler by setting the value of property 'yarn.resourcemanager.scheduler.class' to 'org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler' in the file 'conf/yarn-site.xml'.
You can configure the ResourceManager to use FairScheduler by setting the value of property 'yarn.resourcemanager.scheduler.class' to 'org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler' in the file 'conf/yarn-site.xml'.
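For example, the corresponding entry in conf/yarn-site.xml for the FairScheduler would look like the snippet below; the CapacityScheduler is configured the same way, with its fully qualified class name as the value.
<property>
  <name>yarn.resourcemanager.scheduler.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>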
The ResourceManager is responsible for scheduling applications and tracking resources in a cluster. Prior to Hadoop 2.4, the ResourceManager could not be set up for high availability and was a single point of failure in a YARN cluster.
Since Hadoop 2.4, the YARN ResourceManager can be set up for high availability. High availability of the ResourceManager is achieved through an Active/Standby architecture. At any point in time, one ResourceManager is active and one or more ResourceManagers are in standby mode. If the active ResourceManager fails, one of the standby ResourceManagers transitions to the active mode.
HDFS, which stands for 'Hadoop Distributed File System', is a distributed file system that comes with the Hadoop platform. HDFS is designed for storing large files on clusters of commodity hardware, with high throughput streaming access to the file system data.
Large Data Sets - HDFS is designed to store very large data sets across hundreds or thousands of commodity machines. Individual files can be hundreds of megabytes, gigabytes, or terabytes in size.
Streaming Data Access - HDFS is designed for a write-once, read-many-times pattern. HDFS is designed for applications that need high throughput of data access rather than low latency of data access.
Fault Tolerant - An HDFS instance may consist of hundreds or thousands of server machines, each storing part of the file system's data. HDFS is architected for detection of faults and quick, automatic recovery from node failures.
Simple data coherency model - HDFS is designed for applications that need a write-once, read-many-times access model for files. A file once created, written, and closed need not be changed except for appends and truncates. Appending content to the end of a file is supported, but files cannot be updated at arbitrary points. This assumption simplifies data coherency issues and enables high-throughput data access.
HDFS follows a master-slave architecture. Following are the main components of an HDFS installation.
NameNode - An HDFS cluster consists of a single NameNode, which is a master server that manages the file system namespace and regulates access to files by clients.
DataNode - An HDFS cluster consists of many DataNodes, usually one per node in the cluster, each of which manages the storage attached to the node it runs on.
Data Blocks - Files in HDFS are split into one or more blocks and these blocks are stored and replicated across a set of DataNodes.
HDFS follows a master-slave architecture. NameNode is a piece of software that acts as the master and is designed to run on any commodity server. Following are the key responsibilities of the NameNode.
- The NameNode manages the HDFS filesystem namespace. It maintains the filesystem tree and the metadata for all the files and directories in that tree. The NameNode stores this information on disk in two files - the namespace image and the edit log.
- The NameNode executes file system namespace operations like opening files, closing files, and renaming files and directories.
- The NameNode maintains the mapping between the blocks of a data file and the DataNodes on which those blocks exist.
HDFS splits large files into smaller chunks called data blocks, stores them as independent units, and replicates the blocks across multiple nodes in a cluster.
The default HDFS block size is 128 MB in Hadoop 2 and later.
HDFS is designed to be fault-tolerant. Large HDFS data files are split into smaller chunks called blocks and each block is stored in multiple DataNodes across the cluster. The block size and the replication factor can be configured per file.
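As an illustration, the replication factor of an individual file can also be changed programmatically through the FileSystem API. The sketch below assumes an existing file; the URI and path are illustrative.
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SetReplication {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost/"), conf);
        //raise the replication factor of a single file to 5
        fs.setReplication(new Path("/interviewgrid/hadoop_questions.txt"), (short) 5);
        fs.close();
    }
}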
HDFS is rack aware in multi-rack environments, and takes this into consideration when replicating blocks for fault tolerance. HDFS ensures that block replicas are placed on DataNodes in different racks, so if a rack goes down the data is still available from a DataNode on another rack.
The NameNode manages and stores the metadata of the HDFS filesystem namespace. If the NameNode goes down or its data gets corrupted, then the HDFS filesystem cannot be used.
Hadoop provides two mechanisms to make the NameNode resilient to failures.
1. Backup files - The first mechanism is to back up the files that store the persistent state of the filesystem, so that if those files are corrupted the backup copies can be used. Hadoop can be configured so that the NameNode writes its persistent state to multiple filesystems, as shown in the sketch after this list.
2. Secondary NameNode - The second mechanism is to run a secondary NameNode. Despite its name, the secondary NameNode is not a hot standby; it periodically merges the namespace image with the edit log and keeps a copy of the merged namespace image, which can be used to recover the filesystem metadata if the primary NameNode fails. (Hadoop 2 also introduced NameNode High Availability, in which an active and a standby NameNode run in the cluster and the standby takes over when the active NameNode fails.)
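For the first mechanism, the NameNode can be pointed at multiple storage directories (for example a local disk and a remote NFS mount) through the dfs.namenode.name.dir property in hdfs-site.xml, which takes a comma-separated list of directories; the paths below are illustrative.
<property>
  <name>dfs.namenode.name.dir</name>
  <value>/data/1/dfs/nn,/remote/nfs/dfs/nn</value>
</property>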
You can copy files from the HDFS filesystem to the local filesystem via the command-line interface by using the -copyToLocal sub-command.
% hadoop fs -copyToLocal /temp/test1.txt /docs/test1.txt
The Hadoop API provides the class org.apache.hadoop.fs.FileSystem to access the HDFS filesystem. You call the open() method on the FileSystem class and pass the HDFS file path as a parameter. The open() method returns an FSDataInputStream from which the data can be read.
Configuration conf = new Configuration();
String uri = "hdfs://localhost/interviewgrid/hadoop_questions.txt";
FileSystem fs = FileSystem.get(URI.create(uri), conf);
FSDataInputStream in = fs.open(new Path(uri)); //read the file contents from the returned stream
The Hadoop API provides the class org.apache.hadoop.fs.FileSystem to access the HDFS filesystem. You call the create() method on the FileSystem class and pass the HDFS file path as a parameter. The create() method returns an output stream (FSDataOutputStream) to which you can write the data.
Configuration conf = new Configuration();
String uri = "hdfs://localhost/interviewgrid/hadoop_questions.txt";
FileSystem fs = FileSystem.get(URI.create(uri), conf);
FSDataOutputStream out = fs.create(new Path(uri)); //write data to the returned stream and close it when done