GPU621/Apache Spark

= Apache Hadoop =
[https://hadoop.apache.org/ ''' Apache Hadoop'''] is an open-source framework that allows for the storage and distributed processing of large data sets across clusters of computers using simple programming models. Hadoop is an implementation of MapReduce, an application programming model developed by Google. MapReduce has three basic operations: Map, Shuffle, and Reduce. In the Map phase, each worker node applies a map function to its local data and writes the output to temporary storage. In the Shuffle phase, worker nodes redistribute the data based on output keys so that all data belonging to one key ends up on the same worker node. Finally, in the Reduce phase, each worker node processes its group of output keys in parallel.
[[File: MapReduce.PNG|thumb|upright=1.2|right|alt=Hadoop cluster|3.1 MapReduce]]
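As a rough illustration of the model (plain Python for clarity, not Hadoop code; the documents are made up), the sketch below runs the three phases just described over a few in-memory strings to produce a word count:
<pre>
# Framework-free sketch of the MapReduce model: map, shuffle, reduce.
from collections import defaultdict

def map_phase(document):
    # Each mapper emits a (word, 1) pair for every word in its local data.
    for word in document.split():
        yield (word, 1)

def shuffle_phase(mapped_pairs):
    # The shuffle groups all values for a key so one reducer sees them all.
    groups = defaultdict(list)
    for key, value in mapped_pairs:
        groups[key].append(value)
    return groups

def reduce_phase(groups):
    # Each reducer processes its group of keys, here by summing the counts.
    return {key: sum(values) for key, values in groups.items()}

documents = ["the quick brown fox", "the lazy dog", "the fox"]
mapped = [pair for doc in documents for pair in map_phase(doc)]
print(reduce_phase(shuffle_phase(mapped)))  # {'the': 3, 'quick': 1, ...}
</pre>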
== Architecture ==
Hadoop has a master-slave architecture, as shown in figure 3.2. A small Hadoop cluster consists of a single master and multiple worker nodes. The master node consists of a Job Tracker, Task Tracker, NameNode, and DataNode, while a worker node acts as both a Task Tracker and a DataNode. A file on HDFS is split into multiple blocks, and each block is replicated within the Hadoop cluster. The NameNode is the master server, while the DataNodes store and maintain the blocks. The DataNodes are responsible for retrieving the blocks when requested by the NameNode, and they also perform block creation, deletion, and replication upon instruction from the NameNode. [[File: Hadoop_1.png|thumb|upright=1.2|right|alt=Hadoop cluster|3.2 A multi-node Hadoop Cluster]]
== Components ==
=== Hadoop MapReduce ===
Hadoop MapReduce is the processing component of the Hadoop ecosystem. It assigns data fragments from the HDFS to separate map tasks in the cluster, processes the chunks in parallel, and combines the pieces into the desired result.
 
== Applications ==
* In the healthcare sector for curing diseases, reducing medical costs, predicting and managing epidemics, and maintaining the quality of human life by keeping track of large-scale health index and metrics.
* In the financial industry to assess risk, build investment models, and create trading algorithms.
* In the retail industry to analyze structured and unstructured data to understand and serve their customers, fraud detection and prevention, inventory forecasting, etc.
= Apache Spark =
== Architecture ==
[[File: Cluster-overview.png|thumb|upright=1.5|right|alt=Spark cluster|4.1 Spark Cluster components]]One of the distinguishing features of Spark is that it processes data in RAM using a concept known as Resilient Distributed Datasets (RDDs): an immutable distributed collection of objects which can contain any type of Python, Java, or Scala objects, including user-defined classes. Each dataset is divided into logical partitions, which may be computed on different nodes of the cluster. Spark's RDDs function as a working set for distributed programs, offering a restricted form of distributed shared memory. Another important abstraction in Spark is the Directed Acyclic Graph (DAG), the scheduling layer that implements stage-oriented scheduling.
At a fundamental level, an Apache Spark application consists of two main components: a driver, which converts the user's code into multiple tasks that can be distributed across worker nodes, and executors, which run on those nodes and execute the tasks assigned to them. The processes are coordinated by the SparkContext object in the driver program. The SparkContext can connect to several types of cluster managers which allocate resources across applications. Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for the application. Next, it sends the application code to the executors and finally sends tasks to the executors to run.
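These pieces can be seen in a minimal PySpark sketch (a hypothetical script assuming a local pyspark installation; the app name and data are illustrative). The script itself is the driver program: creating the SparkContext connects to a cluster manager, the RDD is partitioned across executors, and the transformations and final action are turned into tasks that run on those executors.
<pre>
# driver_sketch.py - illustrative PySpark driver program
from pyspark import SparkConf, SparkContext

# The driver creates the SparkContext, which connects to a cluster manager
# (local[*] here; on a cluster this could be YARN or Mesos) and acquires executors.
conf = SparkConf().setAppName("rdd-sketch").setMaster("local[*]")
sc = SparkContext(conf=conf)

# An RDD: an immutable collection split into partitions across executors.
numbers = sc.parallelize(range(1000000), numSlices=8)

# Transformations only build up the DAG; nothing executes yet.
squares_of_evens = numbers.filter(lambda n: n % 2 == 0).map(lambda n: n * n)

# An action triggers the DAG scheduler to split the work into stages and
# ship tasks to the executors, which compute their partitions in parallel.
print(squares_of_evens.sum())

sc.stop()
</pre>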
== Components ==
[[File: Spark.png|thumb|upright=1.5|right|alt=Spark cluster|4.2 Spark Stack]]
=== Spark Core ===
Spark Core is the basic building block of Spark, which includes all components for job scheduling, performing various memory operations, fault tolerance, task dispatching, basic input/output functionalities, etc.
= Comparison: Spark vs Hadoop MapReduce =
[[File: Comparison.PNG|thumb|upright=2|right|alt=Spark cluster|5.1 Spark vs Hadoop MapReduce]]
=== Performance ===
Spark processes data in RAM while Hadoop persists data back to the disk after a map or reduce action. Spark has been found to run '''100 times faster in-memory''', and '''10 times faster on disk'''. Spark won the 2014 Gray Sort Benchmark where it sorted 100TB of data using 206 machines in 23 minutes beating a Hadoop MapReduce cluster's previous world record of 72 minutes using 2100 nodes.
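The in-memory advantage is easiest to see in an iterative job. In the illustrative PySpark sketch below (the input path and filter terms are placeholders), caching an RDD lets repeated actions reuse partitions held in RAM instead of re-reading the input from disk:
<pre>
from pyspark import SparkContext

sc = SparkContext(appName="cache-sketch")

# Placeholder path; any Hadoop-compatible URI (hdfs://, gs://, file://) works.
lines = sc.textFile("hdfs:///data/logs/*.txt")
errors = lines.filter(lambda line: "ERROR" in line).cache()  # keep in memory

# Both actions reuse the cached partitions rather than re-reading the files,
# which is where Spark's in-memory speedup over MapReduce comes from.
total = errors.count()
timeouts = errors.filter(lambda line: "timeout" in line).count()

print(total, timeouts)
sc.stop()
</pre>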
=== Compatibility ===
Spark can run as a standalone application or on top of Hadoop YARN or Apache Mesos. Spark supports data sources that implement Hadoop input format, so it can integrate with all the same data sources and file formats that Hadoop supports.
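Because of this compatibility, the same PySpark code can read from HDFS or, with the Cloud Storage connector that Dataproc pre-installs, directly from a bucket. The paths below are placeholders:
<pre>
from pyspark import SparkContext

sc = SparkContext(appName="input-format-sketch")

# textFile() uses Hadoop's TextInputFormat underneath, so any
# Hadoop-supported filesystem URI works unchanged.
from_hdfs = sc.textFile("hdfs:///user/demo/input/*.txt")
from_gcs = sc.textFile("gs://my-bucket/input/*.txt")  # via the GCS connector

# Hadoop InputFormats can also be named explicitly.
kv = sc.newAPIHadoopFile(
    "hdfs:///user/demo/input/part-00000",
    "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text")

print(from_hdfs.count(), from_gcs.count(), kv.count())
sc.stop()
</pre>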
[[File:Hadoop-vs-spark.png|right|300px]]
=== Data Processing ===
In addition to plain data processing, Spark can process graphs and ships with the MLlib machine learning library. Due to its high performance, Spark can handle both real-time and batch processing, whereas Hadoop MapReduce is designed for batch processing only.
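As a small illustration of the real-time side, a Spark Streaming (DStream) word count looks much like a batch job; the socket source and port below are arbitrary choices for the example:
<pre>
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="streaming-sketch")
ssc = StreamingContext(sc, batchDuration=5)  # 5-second micro-batches

# Count words arriving on a TCP socket (e.g. fed by `nc -lk 9999`).
lines = ssc.socketTextStream("localhost", 9999)
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()

ssc.start()
ssc.awaitTermination()
</pre>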
=== Security ===
Both Spark and Hadoop have access to support for Kerberos authentication, but Hadoop has more fine-grained security controls for HDFS.
= Spark vs Hadoop Wordcount Performance =
[[File:Google-cloud-dataproc.png]]
 
# We will use the Google Cloud Platform '''Dataproc''' service to deploy a cluster of 6 virtual machine (VM) nodes (1 master, 5 workers) that is automatically configured for both Hadoop and Spark.
# Store the .jar and .py wordcount files and the input data in the '''Cloud Storage Bucket'''.
# Run '''Dataproc''' Hadoop MapReduce and Spark jobs to count the number of words in large text files, and compare the execution times of Hadoop and Spark.
 
=== What is Dataproc? ===
 
Dataproc is a managed Spark and Hadoop service that automates tasks for rapid cluster creation and management. Users can perform large-scale data processing with Spark or Hadoop on the same cluster. Virtual machine (VM) nodes in the cluster are created in minutes, with every node pre-configured and pre-installed with Hadoop, Spark, and other tools. Usage is charged by virtual CPU per hour, with standard and higher-performance hardware configurations available at different rates.
=== Setting up Dataproc and Google Cloud Storage===
[[File:Googlecloud-setup-6b.jpg]]
 
'''We will create 5 worker nodes and 1 master node using the N1 series general-purpose machine type with 4 vCPUs, 15 GB of memory, and a 32-50 GB disk for all nodes. The console shows the cost of your machine configuration per hour; machines with more memory, computing power, etc. cost more per hour of use.'''

'''Create a cluster with 1 standard master node and 5 worker nodes'''
* Name your cluster and choose a region and zone
* Select a low-cost machine configuration
** e.g. General Purpose N1, 4 vCPUs, 15 GB memory for all nodes
** 32 GB Standard Persistent Disk
[[File:Googlecloud-dataprocsetup-19.jpg]]
'''Allow API access to all Google Cloud services in the project.'''
[[File:Googlecloud-setupdataproc-91.jpg]]
To view the individual nodes in the cluster, go to '''Menu -> Virtual Machines -> VM Instances'''
[[File:Googlecloud-setup-11b.jpg]]
'''Ensure that Dataproc, Compute Engine, and Cloud Storage APIs are all enabled'''
# Go to '''Menu -> API & Services -> Library'''
# Search for the API name and enable it if it is not already enabled.
'''Create a Cloud Storage Bucket'''
# Go to '''Menu -> Storage -> Browser -> Create Bucket'''
# Make a note of the bucket name.
# To copy a file from the VM local disk to the Cloud Storage bucket, enter the following command in the shell:
<Code> gsutil cp /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar gs://<myBucketName>/ </Code>
 
 
'''WordCount.java from the Hadoop MapReduce examples'''
 
<pre>
// WordCount.java - Hadoop MapReduce word count example
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // Mapper: emits a (word, 1) pair for every token in the input.
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  // Reducer (also used as the combiner): sums the counts for each word.
  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
</pre>
'''Note:''' Running the job will create the output folder.<br/>For subsequent jobs, '''be sure to delete the output folder''' or Hadoop and Spark will not run.<br/>This limitation exists to prevent existing output from being overwritten.
[[File:Dataproc-hadoop.jpg]]
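The .py wordcount file submitted for the Spark job is not reproduced on this page; a minimal PySpark equivalent of the Java example above might look like the following sketch (the input and output URIs are passed as job arguments and are placeholders):
<pre>
# wordcount.py - minimal PySpark word count (illustrative sketch)
import sys
from pyspark import SparkContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        sys.exit("Usage: wordcount.py <input_uri> <output_uri>")

    sc = SparkContext(appName="wordcount")

    counts = (sc.textFile(sys.argv[1])            # e.g. gs://<myBucketName>/input/
                .flatMap(lambda line: line.split())
                .map(lambda word: (word, 1))
                .reduceByKey(lambda a, b: a + b))

    # Like the MapReduce job, this fails if the output folder already exists.
    counts.saveAsTextFile(sys.argv[2])            # e.g. gs://<myBucketName>/output/
    sc.stop()
</pre>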
== Results ==
=== Hadoop Counters ===
* Number of splits: 66
* Total input files to process: 8
* GS: Number of MB read: 8,291
* GS: Number of MB written: 138
* Launched map tasks: 66
* Launched reduce tasks: 19
* Map input records (millions): 191.1
* Map output records (millions): 1,237.1
* Reduce input records (millions): 9.3
* Reduce output records (millions): 3.6
* CPU time spent (s): 3,597
[[File:Results2.jpg]]
[[File:Results.jpg]]
== Conclusion ==
[[File:Googlecloud-hdfs.jpg|thumb|upright=2|right|alt=Spark cluster|5.1 Spark vs Hadoop MapReduce]]Using the same hardware (RAM, CPUs, HDD) across a 6-node cluster and processing the same data (8 .txt files totalling 7.77 GB), we see only an approximately 12% performance improvement between Hadoop MapReduce and Spark using a word count algorithm. This falls far short of the claimed 10 times faster on disk and 100 times faster in-memory.
Spark does require more memory than Hadoop to cache data in memory; however, that should not be a limitation in this case, as the worker nodes have 15 GB of memory and none of the input files exceeds 2 GB. This is more than enough space for Spark to store the input data in memory as resilient distributed datasets (RDDs). It is worth noting that Spark performs best when iterating over the same data many times, while MapReduce was designed for single-pass jobs. Furthermore, typical use cases likely involve hundreds to thousands of nodes with terabytes of data.
Further testing and analysis of Spark's internal data could be done to determine whether any bottlenecks exist that limit Spark's performance. For example, how well is the cluster utilizing the hardware, namely the RAM?
One possible explanation is the use of a Google Cloud Storage bucket to store the data rather than the Hadoop Distributed File System (HDFS). Both jobs operate directly on data in Cloud Storage rather than in HDFS. This may be reducing the data access time for Hadoop MapReduce or adding data access overhead for Apache Spark, as opposed to having the input data stored directly on the VM data nodes.
== Progress ==
# Nov 28, 2020 - Added setup step for Google Cloud account setup
# Nov 29, 2020 - Added Testing setup and job execution for Dataproc Hadoop and Spark
# Nov 30, 2020 - Added Testing results and conclusion.