GPU621/Apache Spark Fall 2022

From CDOT Wiki
Revision as of 23:08, 3 December 2022 by RobinYu (talk | contribs) (Build the application)

Group Information

Alan Huang; Jianchang Yu; Tim Lin;

Apache Spark Core API

RDD Overview

One of the most important concepts in Spark is the resilient distributed dataset (RDD). An RDD is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file, or an existing Java collection in the driver program, and transforming it. We will introduce some key APIs provided by Spark Core 2.2.1 using Java 8. You can find more information about RDDs in the RDD Programming Guide: https://spark.apache.org/docs/2.2.1/rdd-programming-guide.html

Spark Library Installation Using Maven

A Maven project is an easy way to set up an Apache Spark application. To add the required libraries, copy the following into the project's pom.xml.

   <properties>
       <maven.compiler.source>8</maven.compiler.source>
       <maven.compiler.target>8</maven.compiler.target>
   </properties>
   <dependencies>
       <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-core_2.10</artifactId>
           <version>2.2.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-sql_2.10</artifactId>
           <version>2.2.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-hdfs</artifactId>
           <version>2.2.0</version>
       </dependency>
   </dependencies>

Create And Set Up Spark

Spark normally runs on a cluster, so we first need to create a JavaSparkContext object, which tells Spark how to access a cluster. To create a JavaSparkContext, you first build a SparkConf object that contains information about your application. We will cover running Spark on a cluster later; for now, let's run Spark locally. The master URL local[*] runs Spark on the local machine with as many worker threads as there are logical cores. To do that, we will need the following code:

  //create and set up spark
  SparkConf conf = new SparkConf().setAppName("HelloSpark").setMaster("local[*]");
  JavaSparkContext sc = new JavaSparkContext(conf);
  sc.setLogLevel("WARN");

Create RDDs

There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

1. Parallelized Collections

Let’s start with some Java collections by calling JavaSparkContext’s parallelize method on an existing Collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel.

       //create input data list
       List<Integer> inputData = new ArrayList<>();
       inputData.add(11);
       inputData.add(22);
       inputData.add(33);
       inputData.add(44);
       //create an RDD from the list
       JavaRDD<Integer> javaRDD = sc.parallelize(inputData);
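
To make "copied to form a distributed dataset" more concrete, here is a minimal plain-Java sketch of how a list could be cut into partitions. It does not use Spark at all: the class name PartitionSketch and the even-slicing rule are assumptions for illustration, not Spark's actual implementation. In Spark itself, the number of partitions can be requested with sc.parallelize(inputData, numSlices).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only (hypothetical class, no Spark dependency).
class PartitionSketch {

    // Cuts the list into numSlices contiguous slices of nearly equal size.
    static <T> List<List<T>> slice(List<T> data, int numSlices) {
        List<List<T>> slices = new ArrayList<>();
        for (int i = 0; i < numSlices; i++) {
            int start = (int) ((long) i * data.size() / numSlices);
            int end = (int) ((long) (i + 1) * data.size() / numSlices);
            slices.add(new ArrayList<>(data.subList(start, end)));
        }
        return slices;
    }

    public static void main(String[] args) {
        List<Integer> inputData = Arrays.asList(11, 22, 33, 44);
        // Each inner list stands for one partition that a worker could process.
        System.out.println(slice(inputData, 2)); // [[11, 22], [33, 44]]
        System.out.println(slice(inputData, 3)); // [[11], [22], [33, 44]]
    }
}
```

Each slice can then be processed independently, which is what allows the work to be spread across several threads or machines.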

2. External Datasets

The other way is to create an RDD from any storage source supported by Hadoop, including your local file system, HDFS, Amazon S3, etc. Text file RDDs can be created using JavaSparkContext’s textFile method. This method takes a URI for the file (either a local path on the machine, or a hdfs://, s3n://, etc. URI) and reads it as a collection of lines.

       //From a local file
       JavaRDD<String> sentences = sc.textFile("src/main/resources/subtitles/input.txt");
       //From an S3 file (an alternative to the local file above)
       JavaRDD<String> s3Sentences = sc.textFile("s3://gpu621-demo/input.txt");

RDD APIs

There are two types of RDD APIs: transformations and actions. A transformation returns a new RDD, so we can use it to derive a child RDD from a parent RDD. An action performs a computation on the actual dataset and returns a non-RDD result to the driver program or writes it to external storage; it does not return a new RDD.

1. Transformations

1.1 map(func)

The map function applies the function it receives to every element of an RDD and returns a new RDD containing the results, one output element per input element.

       //create input data list
       List<Integer> inputData = new ArrayList<>();
       inputData.add(11);
       inputData.add(22);
       inputData.add(33);
       inputData.add(44);
       //create RDD
       JavaRDD<Integer> javaRDD = sc.parallelize(inputData);
       // map from one RDD to another RDD
       JavaRDD<Double> mapRDD = javaRDD.map(value -> Math.sqrt(value));
       //Output (order may vary): 3.3166247903554, 4.69041575982343, 5.744562646538029, 6.6332495807108
       mapRDD.foreach(value->System.out.println(value));

1.2 filter(func)

filter() keeps only the elements that satisfy a condition. It takes a predicate function and returns a new RDD containing the elements for which the predicate returns true.

       //create input data list
       List<Integer> inputData = new ArrayList<>();
       inputData.add(11);
       inputData.add(22);
       inputData.add(33);
       inputData.add(44);
       // create RDD
       JavaRDD<Integer> javaRDD = sc.parallelize(inputData);
       //use filter
       JavaRDD<Integer> filterRDD = javaRDD.filter(value -> value >= 30);
       //Output: 33 44
       filterRDD.foreach(value->System.out.println(value));

1.3 flatMap(func)

flatMap is similar to map, but the function can return zero or more output elements for each input element, and the results are flattened into a single RDD.

       //create input data list
       List<String> inputData = new ArrayList<>();
       inputData.add("WARN: Monday 1 May 0101");
       inputData.add("ERROR: Monday 1 May 0102");
       inputData.add("WARN: Monday 2 May 0301");
       inputData.add("ERROR: Monday 3 May 0401");
       inputData.add("FATAL: Monday 4 May 0406");
       // create RDD
       JavaRDD<String> sentencesRDD = sc.parallelize(inputData);
       JavaRDD<String> wordsRDD = sentencesRDD.flatMap(value -> Arrays.asList(value.split(" ")).iterator());
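
The difference between one output per input (map) and many outputs per input (flatMap) can be tried without a cluster. The sketch below uses java.util.stream, which has operations of the same names; it is an analogy in plain Java, not the Spark API, and the class name FlatMapSketch is made up for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java analogy using java.util.stream (hypothetical class, no Spark dependency).
class FlatMapSketch {

    // map: exactly one output element per input line (here, its word count).
    static List<Integer> wordCounts(List<String> lines) {
        return lines.stream()
                .map(line -> line.split(" ").length)
                .collect(Collectors.toList());
    }

    // flatMap: each line yields several words, flattened into one list.
    static List<String> words(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "WARN: Monday 1 May 0101",
                "ERROR: Monday 1 May 0102");
        System.out.println(wordCounts(lines)); // [5, 5]
        System.out.println(words(lines));
        // [WARN:, Monday, 1, May, 0101, ERROR:, Monday, 1, May, 0102]
    }
}
```

Two input lines produce two results with map, but ten with flatMap.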

1.4 mapToPair()

mapToPair is similar to map, but it produces a JavaPairRDD, that is, an RDD of key-value pairs. This transformation is specific to the Java API; in the Scala and Python APIs, map itself can return key-value pairs, so it covers both cases.

       List<String> inputData = new ArrayList<>();
       inputData.add("WARN: Monday 1 May 0101");
       inputData.add("ERROR: Monday 1 May 0102");
       inputData.add("WARN: Monday 2 May 0301");
       inputData.add("ERROR: Monday 3 May 0401");
       inputData.add("FATAL: Monday 4 May 0406");
       // create RDD
       JavaRDD<String> logMessages = sc.parallelize(inputData);
       //use Fluent API
       JavaPairRDD<String, Long> levelRDD= logMessages.mapToPair(value ->new Tuple2<>(value.split(":")[0], 1L));
       // (WARN,1) (ERROR,1) (ERROR,1) (FATAL,1) (WARN,1)
       levelRDD.foreach(tuple2->System.out.println(tuple2));

1.5 reduceByKey(func)

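When we call reduceByKey on a dataset of (K, V) pairs, it returns a dataset of (K, V) pairs in which the values for each key are aggregated using the given reduce function. Pairs with the same key are first combined locally within each partition, and the partial results are then merged across partitions.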

       List<String> inputData = new ArrayList<>();
       inputData.add("WARN: Monday 1 May 0101");
       inputData.add("ERROR: Monday 1 May 0102");
       inputData.add("WARN: Monday 2 May 0301");
       inputData.add("ERROR: Monday 3 May 0401");
       inputData.add("FATAL: Monday 4 May 0406");
       // create RDD
       JavaRDD<String> logMessages = sc.parallelize(inputData);
       //use Fluent API
       JavaPairRDD<String, Long> levelRDD= logMessages.mapToPair(value ->new Tuple2<>(value.split(":")[0], 1L));
       // (WARN,2) (ERROR,2) (FATAL,1) 
       levelRDD.reduceByKey((a, b) -> a + b).foreach(t -> System.out.println(t));
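
To illustrate what happens behind reduceByKey, the following plain-Java sketch models it in two steps: values are first combined per key inside each partition, then the partial results are merged. This is a simplified model for illustration, not Spark code; the class ReduceByKeySketch, its method names, and the way the five log lines are split into two partitions are all assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified plain-Java model of reduceByKey (hypothetical class, no Spark dependency).
class ReduceByKeySketch {

    // Step 1: count log levels inside a single partition (no data leaves the node).
    static Map<String, Long> combineLocally(List<String> partition) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : partition) {
            String level = line.split(":")[0];
            counts.merge(level, 1L, Long::sum);
        }
        return counts;
    }

    // Step 2: merge the per-partition results key by key.
    static Map<String, Long> mergeAll(List<Map<String, Long>> partials) {
        Map<String, Long> totals = new TreeMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((level, count) -> totals.merge(level, count, Long::sum));
        }
        return totals;
    }

    public static void main(String[] args) {
        // Assumed split of the input into two partitions.
        List<String> partition1 = Arrays.asList(
                "WARN: Monday 1 May 0101", "ERROR: Monday 1 May 0102", "WARN: Monday 2 May 0301");
        List<String> partition2 = Arrays.asList(
                "ERROR: Monday 3 May 0401", "FATAL: Monday 4 May 0406");

        Map<String, Long> totals = mergeAll(Arrays.asList(
                combineLocally(partition1), combineLocally(partition2)));
        System.out.println(totals); // {ERROR=2, FATAL=1, WARN=2}
    }
}
```

Combining inside each partition first means that only one partial count per key and partition has to be moved, rather than every individual pair.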


1.6 sortByKey()

When we apply sortByKey() to a dataset of (K, V) pairs, it returns a new RDD with the pairs sorted by the key K. The order is ascending by default; sortByKey(false) sorts in descending order.
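
As a rough illustration of the resulting order, the sketch below sorts a small list of (key, value) pairs in plain Java, ascending or descending by key. It only mirrors the ordering that sortByKey produces; it is not Spark code, and the class SortByKeySketch, the helper pair, and the sample data are invented for this example.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java illustration of ordering pairs by key (hypothetical class, no Spark dependency).
class SortByKeySketch {

    // Small helper to build a (key, value) pair.
    static <K, V> Map.Entry<K, V> pair(K key, V value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Returns a new list sorted by key; the input list is left unchanged.
    static <K extends Comparable<K>, V> List<Map.Entry<K, V>> sortByKey(
            List<Map.Entry<K, V>> pairs, boolean ascending) {
        Comparator<Map.Entry<K, V>> byKey = Map.Entry.comparingByKey();
        return pairs.stream()
                .sorted(ascending ? byKey : byKey.reversed())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // (frequency, word) pairs, made up for the example
        List<Map.Entry<Long, String>> pairs =
                Arrays.asList(pair(3L, "the"), pair(1L, "spark"), pair(2L, "rdd"));
        System.out.println(sortByKey(pairs, true));  // [1=spark, 2=rdd, 3=the]
        System.out.println(sortByKey(pairs, false)); // [3=the, 2=rdd, 1=spark]
    }
}
```

Like a transformation, the helper returns a new collection instead of modifying its input.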

2. Actions

An action does not produce a new RDD the way a transformation does. Actions are RDD operations that return non-RDD values, which are delivered to the driver program or written to an external storage system. Because transformations are lazy, nothing is computed until an action is called; the action is what sets the pending transformations in motion.
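
Lazy evaluation can be observed with a small plain-Java analogy. Java streams behave similarly: intermediate operations such as map only describe the work, and a terminal operation such as collect triggers it. The sketch below is not Spark code; the class LazyEvaluationSketch and the call counter are made up to show the idea.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java analogy for lazy evaluation (hypothetical class, no Spark dependency).
class LazyEvaluationSketch {

    // Plays the role of a transformation: it only describes the computation.
    // The counter records how often the mapping function has actually run.
    static Stream<Double> describe(List<Integer> input, AtomicInteger calls) {
        return input.stream().map(value -> {
            calls.incrementAndGet();
            return Math.sqrt(value);
        });
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Stream<Double> pipeline = describe(Arrays.asList(11, 22, 33, 44), calls);
        System.out.println(calls.get()); // prints 0: nothing has been computed yet

        // Plays the role of an action: the terminal operation runs the pipeline.
        List<Double> roots = pipeline.collect(Collectors.toList());
        System.out.println(calls.get()); // prints 4: one call per element
        System.out.println(roots.size()); // prints 4
    }
}
```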

1. reduce(func)

Aggregate the elements of the dataset using a function func (which takes two arguments and returns one).

       //create input data list
       List<Integer> inputData = new ArrayList<>();
       inputData.add(11);
       inputData.add(22);
       inputData.add(33);
       inputData.add(44);
       //use RDD to run reduce function
       JavaRDD<Integer> javaRDD = sc.parallelize(inputData);
       Integer result = javaRDD.reduce((Integer a, Integer b) -> a + b);
       //output: 110
       System.out.println(result);

2. count()

count() returns the number of elements in the RDD.

3. take(n)

The action take(n) returns the first n elements of the RDD. It tries to access as few partitions as possible, so the result is a biased selection rather than a representative sample of the dataset, and we should not make assumptions about the order of the elements.
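
The idea of reading as few partitions as possible can be sketched in plain Java. The model below walks through the partitions in order and stops as soon as n elements have been gathered. It is a simplified illustration, not Spark's implementation; the class TakeSketch and the partition layout are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified plain-Java model of take(n) (hypothetical class, no Spark dependency).
class TakeSketch {

    // Collects up to n elements, reading partitions in order and
    // skipping the remaining partitions once enough elements are found.
    static List<Integer> take(List<List<Integer>> partitions, int n) {
        List<Integer> result = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            if (result.size() >= n) {
                break; // enough elements: later partitions are never read
            }
            for (Integer value : partition) {
                if (result.size() >= n) {
                    break;
                }
                result.add(value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed layout: four elements spread over three partitions.
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(11, 22), Arrays.asList(33), Arrays.asList(44));
        System.out.println(take(partitions, 3)); // [11, 22, 33]
        System.out.println(take(partitions, 1)); // [11]
    }
}
```

In this model, take(3) never touches the last partition, which is why the result says little about the data stored there.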

4. collect()

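The action collect() is the simplest and most common operation; it returns the entire content of the RDD to the driver program. Because the whole dataset must fit in the driver's memory, it is best suited to small results.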

5. foreach()

foreach(func) is useful when we want to apply an operation to each element of an RDD without returning a value to the driver.

Useful Case

Scenario: you have a video caption script and want to sort all the words by frequency. How should we do it?

       //create and set up spark
       SparkConf conf = new SparkConf().setAppName("HelloSpark").setMaster("local[*]");
       JavaSparkContext sc = new JavaSparkContext(conf);
       sc.setLogLevel("WARN");
       //read data from text file
       //note that for production mode, we should not use file from local machine
       //should use S3 or other resources
       JavaRDD<String> sentences = sc.textFile("src/main/resources/subtitles/input.txt");
       //remove the time stamp
       JavaRDD<String> lettersOnlyRDD = sentences.map(sentence -> sentence.replaceAll("[^a-zA-Z\\s]", "").toLowerCase());
       //remove the blank line
       JavaRDD<String> removeBlankLineRDD = lettersOnlyRDD.filter(sentence -> sentence.trim().length() > 0);
       //map to only words
       JavaRDD<String> wordsRDD = removeBlankLineRDD.flatMap(sentence -> Arrays.asList(sentence.split(" ")).iterator());

       //create pair RDD
       JavaPairRDD<String, Long> pairRDD = wordsRDD.mapToPair(word -> new Tuple2<>(word, 1L));
       //get frequency
       JavaPairRDD<String, Long> totals = pairRDD.reduceByKey((a, b) -> a + b);
       //make frequency the key
       JavaPairRDD<Long, String> reversedMap = totals.mapToPair(t -> new Tuple2<>(t._2, t._1));
       //sort the RDD by frequency, highest first
       List<Tuple2<Long, String>> results = reversedMap.sortByKey(false).collect();
       //print out the result
       results.forEach(t->System.out.println(t));
       //close
       sc.close();

Deploy Apache Spark Application On AWS

Amazon EMR is an AWS cloud big data service for petabyte-scale data processing, interactive analytics, and machine learning using open-source frameworks such as Apache Spark, Apache Hive, and Presto. EMR is easy to use and relatively low cost, so it’s a good starting point for Spark beginners.

Prerequisite

From here on, I will assume you have an AWS account and basic knowledge of AWS services, such as how to use an S3 bucket and how to add a role or policy to a service.

Also, you will need to have basic knowledge about SSH and Linux commands.

Create an EMR cluster

Search for EMR in the AWS service panel and select it.

Create cluster.png

Click the Create Cluster button.

Cluster name.png

Enter a cluster name and choose a release version. Here I will choose EMR-5.11.1 as the release version. For the applications, there are many options; we will choose Spark, as it is our main topic.

Ec2.png

Next, we need to choose an instance type. The cluster runs on multiple EC2 instances, and different instance types have different features and different prices; please refer to the EC2 pricing table. Here I will choose the c4.large type because it is the least expensive one. For the number of instances, I will choose 3, that is, one master node and two core nodes.

For the security settings, choose an EC2 key pair you have already used for other services, or create a new one.

Click the Create Cluster button and wait for the cluster to be set up.

Cluster info.png

You will see a page like this. Next, we need to add an inbound rule to the security group for the master node, which acts like a firewall.

Inbound rule.png

We need to open port 22 (SSH) and port 18080 (the Spark history server web UI) to your IP address so that you can reach the master EC2 instance.

Then, you can try to SSH into the master node using:

ssh -i <private_key.pem> hadoop@<MasterPublicDNS>

You should see a welcome page like this:

Welcome page.png

Create an S3 bucket

Unlike the previous case, where we ran on a single computer, we now need to run the application on multiple nodes. Reading the input from one node's local disk no longer makes sense: the other nodes cannot see that file, and it will often be too big for one node to handle. We need to put the file somewhere all nodes can share, so we will use an S3 object as the input file. S3 is another service AWS provides; a single object on S3 can be as large as 5 TB. I will skip the details here, so please look up how to create a new bucket to hold both the input file and the application package. For this demo, make the bucket publicly accessible so that you do not run into permission issues later on.


S3 bucket.png

Build the application

1. Change code

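To run the application on the cloud cluster, we need to modify the code. Note that a master set in code with setMaster("local[*]") takes precedence over the one supplied by spark-submit, so remove that call if the job should run on the cluster's executors. First, let’s change the file path from the local file to the S3 object.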

From

JavaRDD<String> sentences = sc.textFile("src/main/resources/subtitles/input.txt");

To

JavaRDD<String> sentences = sc.textFile("s3://gpu621-demo/input.txt"); 

Also, add the entry point class to the pom file:

   <build>
       <plugins>
           <plugin>
               <groupId>org.apache.maven.plugins</groupId>
               <artifactId>maven-jar-plugin</artifactId>
               <version>3.0.2</version>
               <configuration>
                   <archive>
                       <manifest>
                           <mainClass>ca.wonderfish.spark8.Main</mainClass>
                       </manifest>
                   </archive>
               </configuration>
           </plugin>
       </plugins>
   </build>


2. Build the package

Build the package from the command line (mvn package) or from the IDE. If you are using IntelliJ IDEA, click package under Lifecycle in the Maven tab.


Build package.png

You will get a jar file under the target folder.

Upload the jar file to your S3 bucket

Upload the jar file into the S3 bucket you created before.

Run the application on the cluster

SSH into the master node of the cluster. Then copy the jar file from S3 to the current directory on the master node using:

aws s3 cp s3://<yourbucket>/<jarFileName.jar> .

Then you can run the app using:

spark-submit <jarFileName.jar>

Then you should see the log info and the output of your application.


That’s how we deploy a Spark app on AWS EMR.