Spark on Mac

Introduction to Apache Spark

Apache Spark is getting its due attention as a lightning-fast distributed computing engine and an improvement over Hadoop MapReduce. Several features make it well suited for analytics on big data:

  1. In-memory storage and computation for iterative algorithms (see the sketch after this list)
  2. Support for object-oriented and functional programming with Scala, Python and Java
  3. An interactive shell for quick experiments, plus deployment as packaged binaries
  4. Integration with YARN and Mesos, the resource managers of next-generation Hadoop
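
As a rough sketch of point 1, the hypothetical snippet below caches a dataset in memory once and then reuses it across several passes. The file path and the computation are made up for illustration, but the pattern of reusing a cached RDD is what gives Spark its edge over disk-based MapReduce for iterative work.

// hypothetical sketch: cache once, then iterate many times over the in-memory RDD
val numbers = sc.textFile("hdfs://localhost:9000/datasets/numbers.txt").map(_.toDouble).cache()

// each pass reads the cached RDD from memory instead of re-reading from disk
var total = 0.0
for (i <- 1 to 10) {
  total += numbers.map(x => x * i).sum()
}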

Installation

Install Java

$ java -version
java version "1.8.0_25"
Java(TM) SE Runtime Environment (build 1.8.0_25-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.25-b02, mixed mode)

Install Homebrew

$ ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

Install Hadoop

$ brew install hadoop

Brew installs Hadoop in the folder shown below. Set it as HADOOP_HOME in your ~/.bash_profile:

$ vi ~/.bash_profile
export HADOOP_HOME=/usr/local/Cellar/hadoop/2.6.0
$ source ~/.bash_profile
  • Edit the configuration files
/usr/local/Cellar/hadoop/2.6.0$ cd libexec/etc/hadoop/
vi core-site.xml

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/usr/local/Cellar/hadoop/namenode</value>
    </property>
</configuration>

vi yarn-site.xml

<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
</configuration>

vi mapred-site.xml

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>

vi hdfs-site.xml

<configuration>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/usr/local/Cellar/hadoop/namenode</value>
    </property>
</configuration>

Install Apache Maven

$ brew install maven

$ vi ~/.bash_profile
export MAVEN_HOME=/usr/local/Cellar/maven/3.2.5
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$MAVEN_HOME/bin

Install Apache Spark

Download the latest Spark source tarball and extract it into /usr/local:

$ tar xvf ~/Downloads/spark-1.3.1.tar.gz -C /usr/local
/usr/local/spark-1.3.1$ mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package

Once the Spark build completes successfully, add the following:

$ vi ~/.bash_profile
export SPARK_HOME=/usr/local/spark-1.3.1
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$MAVEN_HOME/bin:$SPARK_HOME/bin:$SPARK_HOME/sbin
$ source ~/.bash_profile

Resolving Errors

Error 1: Detected Maven Version: 3.2.5 is not in the allowed range 3.3.3.
Solution: Upgrade to a newer version of Maven.

$ brew update
$ brew upgrade maven

Error 2: [error] missing or invalid dependency detected while loading class file ‘WebUI.class’.
Solution: Switch the build to Scala 2.11 from the Spark directory:

$ cd /usr/local/spark-1.5.2
$ ./dev/change-scala-version.sh 2.11

Data Exploration

  • Start Hadoop
$ $HADOOP_HOME/sbin/start-dfs.sh
$ hadoop dfsadmin -safemode leave
$ hadoop fs -mkdir -p /user/<username>/datasets/

Download the restaurant_ratings dataset.

Open the file in Excel and save the data as restaurant_ratings.csv

Copy the data you want to parse into HDFS:

$ hadoop fs -copyFromLocal ./restaurant_ratings.csv /user/<username>/datasets/
  • Start YARN
$ $HADOOP_HOME/sbin/start-yarn.sh
  • Start the Spark interactive shell
$ spark-shell --master yarn-client

scala>

// load data
 val ratings = sc.textFile("/user/<username>/datasets/restaurant_ratings.csv")
// peek at the first 10 records
 ratings.take(10)
// helper to detect the header line
 def isHeader(line: String): Boolean = line.contains("userID")
// filter out the header
 val rhRatings = ratings.filter(line => !isHeader(line))
// define a case class to represent each record
 case class RestRating(userId: String, placeId: String, rating: Int)
// parse a CSV line into a RestRating
 def parse(line: String): RestRating = {
   val words = line.split(",")
   val userId = words(0)
   val placeId = words(1)
   val rating = words(2).toInt
   RestRating(userId, placeId, rating)
 }
// apply the parse function to the RDD[String] to get an RDD[RestRating]
 val parsedRecords = rhRatings.map(parse)
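// parsedRecords is reused several times below; as an optional sketch, it can be cached in memory too
 parsedRecords.cache()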
// extract just the ratings
 val ratingValues = parsedRecords.map(rr => rr.rating)
// cache the ratings in memory
 ratingValues.cache()
// take a small local sample
 val samples = parsedRecords.take(20)
// pick two columns from a local sample (take returns an Array, not an RDD)
 val sampledPairs = parsedRecords.take(20).map(rr => (rr.userId, rr.rating))
 import org.apache.spark.SparkContext._
// group by userId (_1 is the first element of each (userId, rating) tuple)
 val grouped = sampledPairs.groupBy(pair => pair._1)
// print the number of ratings per user in the sample
 grouped.mapValues(x => x.size).foreach(println)
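// note: the groupBy above only looks at the 20 locally sampled records; as a sketch,
// the per-user counts over the full dataset can be computed on the RDD with countByKey
 val ratingsPerUser = parsedRecords.map(rr => (rr.userId, 1)).countByKey()
 ratingsPerUser.foreach(println)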
// Summary statistics
// count how many times each rating value occurs
 val ratingCounts = parsedRecords.map(rr => rr.rating).countByValue()
// e.g. scala.collection.Map[Int,Long] = Map(0 -> 254, 2 -> 486, 1 -> 421)
// convert to a sequence so we can sort by either column
 val ratingCountSeq = ratingCounts.toSeq
// sort by rating value (column 1)
 ratingCountSeq.sortBy(_._1).foreach(println)
// sort by count (column 2)
 ratingCountSeq.sortBy(_._2).foreach(println)
// sort by count in descending order
 ratingCountSeq.sortBy(_._2).reverse.foreach(println)
// overall statistics (count, mean, stdev, max, min)
 val ratingStats = parsedRecords.map(rr => rr.rating).stats()
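// stats() returns an org.apache.spark.util.StatCounter; its fields can be read individually, e.g.
 println(ratingStats.mean)
 println(ratingStats.stdev)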