Problems on Apache Spark RDD, DataFrame, and SQL query using SQLContext, with solutions


Problem:
1. Download the data from the site below:
https://datasets.imdbws.com/
2. Download the movies data: title.ratings.tsv.gz and title.akas.tsv.gz
3. Find the top 5 rated movies with more than 100000 votes.
4. Storage details
Columns: titleId, title, region, language, averageRating, numVotes
Store the result at the following location: /home/cloudera/workspace/movies/<Method>/<formatname>
Store the result in the following formats.

a. Text file
Columns to be separated with tab "\t"
Compression: BZip2Codec
b. Sequence file.
Compression: BZip2Codec
c. JSON file.
Compression: BZip2Codec
d. Parquet.
Compression: uncompressed
e. ORC file.
f. Avro file.
Compression: uncompressed

Use the following methods:
Method 1: Use RDD
Method 2: Use DF
Method 3: Use SQL query.

Pre work:

[root@quickstart movies]# hadoop fs -mkdir /home/cloudera/workspace/movies
[root@quickstart movies]# hadoop fs -put /home/cloudera/Downloads/movies/title.ratings.tsv.gz /home/cloudera/workspace/movies
[root@quickstart movies]# hadoop fs -ls /home/cloudera/workspace/movies/
Found 1 items
-rw-r--r--   1 root supergroup    3984256 2018-02-18 15:26 /home/cloudera/workspace/movies/title.ratings.tsv.gz

val titleRating = sc.textFile("file:///home/cloudera/Downloads/movies/title.ratings.tsv.gz")
or
val titleRating = sc.textFile("/home/cloudera/workspace/movies/title.ratings.tsv.gz")

scala> titleRating.take(2)
res24: Array[String] = Array(tconst averageRating numVotes, tt0000001 5.8 1350)


[root@quickstart Downloads]# hadoop fs -put /home/cloudera/Downloads/movies/title.akas.tsv.gz /home/cloudera/workspace/movies
[root@quickstart Downloads]# hadoop fs -ls /home/cloudera/workspace/movies/
Found 2 items
-rw-r--r--   1 root supergroup   52301827 2018-02-18 15:32 /home/cloudera/workspace/movies/title.akas.tsv.gz
-rw-r--r--   1 root supergroup    3984256 2018-02-18 15:26 /home/cloudera/workspace/movies/title.ratings.tsv.gz

val title = sc.textFile("file:///home/cloudera/Downloads/movies/title.akas.tsv.gz")
or
val title = sc.textFile("/home/cloudera/workspace/movies/title.akas.tsv.gz")

scala> title.take(2)
res22: Array[String] = Array(titleId ordering title region language types attributes isOriginalTitle, tt0000001 1 Carmencita - spanyol tánc HU \N imdbDisplay \N 0)

Solution:
-----------------------------------------------------------------------------------------
Method 1: Using RDD

scala> titleRating.take(2)
res25: Array[String] = Array(tconst averageRating numVotes, tt0000001 5.8 1350)

scala> title.take(2)
res26: Array[String] = Array(titleId ordering title region language types attributes isOriginalTitle, tt0000001 1
Carmencita - spanyol tánc HU \N imdbDisplay \N 0)

Output columns: titleId,title,region,language,averageRating,numVotes

val titlefirst = title.first

val titleMap = title.filter(e => e!=titlefirst).map(e => {
   val splitted = e.split("\t")
   val titleId = splitted(0).trim
   val title = splitted(2).trim
   val region = splitted(3).trim
   val language = splitted(4).trim
   (titleId, (title, region, language)) 
})

scala> titleRating.take(2)
res24: Array[String] = Array(tconst averageRating numVotes, tt0000001 5.8 1350)

val titleRatingFirst = titleRating.first
val titleRatingMap = titleRating.filter(e=>e!=titleRatingFirst).map(e=>{
    val splitted = e.split("\t")
    val titleId = splitted(0).trim
    val averageRating = scala.util.Try(splitted(1).trim.toFloat) getOrElse(0.0f)
    val numVotes = scala.util.Try(splitted(2).trim.toInt) getOrElse(0)
    (titleId, (averageRating, numVotes))
})

val highVotedMovies = titleRatingMap.filter(e => {
   e._2._2 > 100000  // keep only titles with more than 100000 votes
})

val titleRatingMapJoin = highVotedMovies.join(titleMap).distinct

scala> titleRatingMapJoin.lookup("tt0000001")
res32: Seq[((Float, Int), (String, String, String))] = ArrayBuffer(((5.8,1350),(Carmencita - spanyol tánc,HU,\N)), ((5.8,1350),(Карменсита,RU,\N)), ((5.8,1350),(Carmencita,US,\N)), ((5.8,1350),(Carmencita,\N,\N)))

scala> titleRatingMapJoin.take(2)
res33: Array[(String, ((Float, Int), (String, String, String)))] = Array((tt0000001,((5.8,1350),(Carmencita - spanyol tánc,HU,\N))), (tt0000001,((5.8,1350),(Карменсита,RU,\N))))


val titleRatingSorted = titleRatingMapJoin.map(e=> {
   val averageRating = e._2._1._1
   val titleId = e._1
   val title = e._2._2._1
   val region = e._2._2._2
   val language = e._2._2._3
   val numVotes = e._2._1._2
   (averageRating, (titleId,title,region,language,averageRating,numVotes))
}).sortByKey(false)

val top5MoviesAvgRating = titleRatingSorted.take(5)

scala> val top5MovieMap = sc.parallelize(top5MoviesAvgRating)
top5MovieMap: org.apache.spark.rdd.RDD[(Float, (String, String, String, String, Float, Int))] = ParallelCollectionRDD[86] at parallelize at <console>:45
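As a side note, the full sortByKey followed by take(5) is not strictly necessary for a small top-N: RDD.top keeps only the n largest elements per partition and merges them on the driver, avoiding a full shuffle sort. A sketch using the same (averageRating, record) pair shape built above:

```scala
// Sketch: take the 5 highest-rated joined records without a full sort.
// titleRatingMapJoin has shape (titleId, ((averageRating, numVotes), (title, region, language))).
val top5Alt = titleRatingMapJoin.map(e => {
   val (titleId, ((averageRating, numVotes), (title, region, language))) = e
   (averageRating, (titleId, title, region, language, averageRating, numVotes))
}).top(5)(Ordering.by(_._1))
```

top returns an Array on the driver, so it can feed sc.parallelize exactly as take(5) does above.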

Saving the data in different file formats.

a. Text file
Columns to be separated with tab "\t"
Compression: BZip2Codec

val tabbedMovies = top5MovieMap.map(e=>{
   val titleId = e._2._1
   val title = e._2._2
   val region = e._2._3
   val language = e._2._4
   val averageRating = e._2._5
   val numVotes = e._2._6
   titleId + "\t" + title  + "\t" + region + "\t" +  language + "\t" +  averageRating + "\t" +  numVotes
})

tabbedMovies.saveAsTextFile("/home/cloudera/workspace/movies/rdd/text", classOf[org.apache.hadoop.io.compress.BZip2Codec])
b. Sequence file.
Compression: BZip2Codec

val tabbedMoviesSeq = top5MovieMap.map(e=>{
   val titleId = e._2._1
   val title = e._2._2
   val region = e._2._3
   val language = e._2._4
   val averageRating = e._2._5
   val numVotes = e._2._6
   (titleId, (titleId + "\t" + title  + "\t" + region + "\t" +  language + "\t" +  averageRating + "\t" +  numVotes))
})

tabbedMoviesSeq.saveAsSequenceFile("/home/cloudera/workspace/movies/rdd/Sequence", Some(classOf[org.apache.hadoop.io.compress.BZip2Codec]))
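To sanity-check the sequence file output, it can be read back with sc.sequenceFile; both key and value were written as plain strings, so String type parameters apply (a sketch, assuming the output path above):

```scala
// Read the BZip2-compressed sequence file back; the input format
// decompresses known codecs transparently.
val seqCheck = sc.sequenceFile[String, String]("/home/cloudera/workspace/movies/rdd/Sequence")
seqCheck.take(2).foreach(println)
```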

c. JSON file.
Compression: BZip2Codec
val tabbedMoviesJson = top5MovieMap.map(e=>{
   val titleId = e._2._1
   val title = e._2._2
   val region = e._2._3
   val language = e._2._4
   val averageRating = e._2._5
   val numVotes = e._2._6
   (titleId , title  , region ,  language ,  averageRating ,  numVotes)
}).toDF("titleId" , "title"  , "region" ,  "language" ,  "averageRating" ,  "numVotes")

tabbedMoviesJson.toJSON.saveAsTextFile("/home/cloudera/workspace/movies/rdd/json", classOf[org.apache.hadoop.io.compress.BZip2Codec])

d. Parquet.
Compression: uncompressed
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")

val tabbedMoviesDF = top5MovieMap.map(e=>{
   val titleId = e._2._1
   val title = e._2._2
   val region = e._2._3
   val language = e._2._4
   val averageRating = e._2._5
   val numVotes = e._2._6
   (titleId , title  , region ,  language ,  averageRating ,  numVotes)
}).toDF("titleId" , "title"  , "region" ,  "language" ,  "averageRating" ,  "numVotes")

tabbedMoviesDF.write.parquet("/home/cloudera/workspace/movies/rdd/parquet")
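A quick sanity check of the parquet output is to read it back through the same SQLContext (a sketch, assuming the path above):

```scala
// Read the parquet directory back and inspect the schema and rows.
val parquetCheck = sqlContext.read.parquet("/home/cloudera/workspace/movies/rdd/parquet")
parquetCheck.printSchema
parquetCheck.show
```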

e. ORC file.
tabbedMoviesDF.write.orc("/home/cloudera/workspace/movies/rdd/orc")

f. Avro file.
Compression: uncompressed

import com.databricks.spark.avro._
sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
tabbedMoviesDF.write.avro("/home/cloudera/workspace/movies/rdd/avro")

-----------------------------------------------------------------------------------------
Method 2: Using DataFrame (DF)

File location:
/home/cloudera/workspace/movies/title.ratings.tsv.gz
/home/cloudera/workspace/movies/title.akas.tsv.gz

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val title = sqlContext.read.text("/home/cloudera/workspace/movies/title.ratings.tsv.gz")
scala> title.limit(5).show
+--------------------+
|               value|
+--------------------+
|tconst averageRat...|
|  tt0000001 5.8 1350|
|   tt0000002 6.5 157|
|   tt0000003 6.6 933|
|    tt0000004 6.4 93|
+--------------------+

val titlerdd = title.rdd

case class Title(titleId:String, averageRating:Float, numVotes:Int)

val titlefirst = titlerdd.first
val titleMapped = titlerdd.filter(e=> e!=titlefirst).map(e=> {
   val rowStr = e.getString(0)
   val splitted = rowStr.split("\t")
   val titleId = splitted(0).trim
   val averageRating = scala.util.Try(splitted(1).trim.toFloat) getOrElse(0.0f)
   val numVotes = scala.util.Try(splitted(2).trim.toInt) getOrElse(0)
   Title(titleId, averageRating, numVotes)
})

val titleMappedDF =  titleMapped.toDF

scala> titleMappedDF.limit(2).show
+---------+-------------+--------+
|  titleId|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.8|    1350|
|tt0000002|          6.5|     157|
+---------+-------------+--------+

val titledetails = sqlContext.read.text("/home/cloudera/workspace/movies/title.akas.tsv.gz")

val titledetailsrdd = titledetails.rdd

val titledetailsrddfirst = titledetailsrdd.first

case class TitleDetail(titleId:String, title:String, region:String, language:String)

val titledetailsMap = titledetailsrdd.filter(e=>e!=titledetailsrddfirst).map(e=>{
    val row = e.getString(0)
    val splitted = row.split("\t")
    val titleId = splitted(0).trim
    val title = splitted(2).trim
    val region = splitted(3).trim
    val language = splitted(4).trim
    TitleDetail(titleId, title, region, language)
})

val titledetailsDF =  titledetailsMap.toDF

scala> titledetailsDF.limit(5).show
+---------+--------------------+------+--------+
|  titleId|               title|region|language|
+---------+--------------------+------+--------+
|tt0000001|Carmencita - span...|    HU|      \N|
|tt0000001|          Карменсита|    RU|      \N|
|tt0000001|          Carmencita|    US|      \N|
|tt0000001|          Carmencita|    \N|      \N|
|tt0000002|Le clown et ses c...|    \N|      \N|
+---------+--------------------+------+--------+
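As an aside, the manual line-splitting in both case-class mappers could be replaced by the spark-csv package if it is available on the classpath. This is a hedged sketch; com.databricks.spark.csv and its header/delimiter options are an assumption about the environment, not something used elsewhere in this post:

```scala
// Hypothetical alternative, assuming the com.databricks:spark-csv package is
// on the classpath: parse the TSV (header row, tab delimiter) directly into
// a DataFrame instead of splitting lines by hand.
val akasDF = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("delimiter", "\t")
  .load("/home/cloudera/workspace/movies/title.akas.tsv.gz")
  .select("titleId", "title", "region", "language")
```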

scala> titleMappedDF.join(titledetailsDF, titleMappedDF("titleId") === titledetailsDF("titleId"), "inner")
res99: org.apache.spark.sql.DataFrame = [titleId: string, averageRating: float, numVotes: int, titleId: string, title: string, region: string, language: string]

or
val movies = titleMappedDF.join(titledetailsDF, titleMappedDF("titleId").equalTo(titledetailsDF("titleId")), "inner").select(titleMappedDF("titleId"), titledetailsDF("title"),titledetailsDF("region"),titledetailsDF("language"),titleMappedDF("averageRating") , titleMappedDF("numVotes"))

scala> movies.select("titleId", "title", "region", "language", "averageRating", "numVotes").where("numVotes>100000").orderBy($"averageRating".desc).limit(5).show
+---------+--------------------+------+--------+-------------+--------+       
|  titleId|               title|region|language|averageRating|numVotes|
+---------+--------------------+------+--------+-------------+--------+
|tt4283088|Battle of the Bas...|    US|      \N|          9.9|  152823|
|tt4283094| The Winds of Winter|    US|      \N|          9.9|  105085|
|tt0944947|       Igra prestola|    RS|      \N|          9.5| 1292492|
|tt0944947|     Game of Thrones|    \N|      \N|          9.5| 1292492|
|tt0944947|Samefo Karis Tama...|    GE|      \N|          9.5| 1292492|
+---------+--------------------+------+--------+-------------+--------+

movies.select("titleId", "title", "region", "language", "averageRating", "numVotes").where("numVotes>100000").orderBy(movies("averageRating").desc).limit(5).show

movies.select("titleId", "title", "region", "language", "averageRating", "numVotes").where("numVotes>100000").orderBy(col("averageRating").desc).limit(5).show

movies.select("titleId", "title", "region", "language", "averageRating", "numVotes").where("numVotes>100000").orderBy(desc("averageRating")).limit(5).show

val top5movies =  movies.select("titleId", "title", "region", "language", "averageRating", "numVotes").where("numVotes>100000").orderBy($"averageRating".desc).limit(5)

a. Text file
Columns to be separated with tab "\t"
Compression: BZip2Codec

val moviestext = top5movies.rdd.map(e=>{
   val titleId = e.getString(0)
   val title = e.getString(1)
   val region = e.getString(2)
   val language = e.getString(3)
   val averageRating = e.getFloat(4)
   val numVotes = e.getInt(5)
   (titleId + "\t" + title + "\t" + region + "\t" + language + "\t" + averageRating + "\t" + numVotes)
})

moviestext.saveAsTextFile("/home/cloudera/workspace/movies/df/text", classOf[org.apache.hadoop.io.compress.BZip2Codec])

b. Sequence file.
Compression: BZip2Codec

val moviesSequence = top5movies.rdd.map(e=>{
   val titleId = e.getString(0)
   val title = e.getString(1)
   val region = e.getString(2)
   val language = e.getString(3)
   val averageRating = e.getFloat(4)
   val numVotes = e.getInt(5)
   (titleId, (titleId + "\t" + title + "\t" + region + "\t" + language + "\t" + averageRating + "\t" + numVotes))
})

moviesSequence.saveAsSequenceFile("/home/cloudera/workspace/movies/df/Sequence", Some(classOf[org.apache.hadoop.io.compress.BZip2Codec]))

c. JSON file.
Compression: BZip2Codec

top5movies.toJSON.saveAsTextFile("/home/cloudera/workspace/movies/df/json", classOf[org.apache.hadoop.io.compress.BZip2Codec])

d. Parquet.
Compression: uncompressed

sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")
top5movies.write.parquet("/home/cloudera/workspace/movies/df/parquet")

e. ORC file.

top5movies.write.orc("/home/cloudera/workspace/movies/df/orc")

f. Avro file.
Compression: uncompressed
sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
top5movies.write.avro("/home/cloudera/workspace/movies/df/avro")
-----------------------------------------------------------------------------------------
Method 3: Use SQL query.

File location:
/home/cloudera/workspace/movies/title.ratings.tsv.gz
/home/cloudera/workspace/movies/title.akas.tsv.gz

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val title = sqlContext.read.text("/home/cloudera/workspace/movies/title.ratings.tsv.gz")
scala> title.limit(5).show
+--------------------+
|               value|
+--------------------+
|tconst averageRat...|
|  tt0000001 5.8 1350|
|   tt0000002 6.5 157|
|   tt0000003 6.6 933|
|    tt0000004 6.4 93|
+--------------------+

val titlerdd = title.rdd

case class Title(titleId:String, averageRating:Float, numVotes:Int)

val titlefirst = titlerdd.first
val titleMapped = titlerdd.filter(e=> e!=titlefirst).map(e=> {
   val rowStr = e.getString(0)
   val splitted = rowStr.split("\t")
   val titleId = splitted(0).trim
   val averageRating = scala.util.Try(splitted(1).trim.toFloat) getOrElse(0.0f)
   val numVotes = scala.util.Try(splitted(2).trim.toInt) getOrElse(0)
   Title(titleId, averageRating, numVotes)
})

val titleMappedDF =  titleMapped.toDF

scala> titleMappedDF.limit(2).show
+---------+-------------+--------+
|  titleId|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.8|    1350|
|tt0000002|          6.5|     157|
+---------+-------------+--------+

val titledetails = sqlContext.read.text("/home/cloudera/workspace/movies/title.akas.tsv.gz")

val titledetailsrdd = titledetails.rdd

val titledetailsrddfirst = titledetailsrdd.first

case class TitleDetail(titleId:String, title:String, region:String, language:String)

val titledetailsMap = titledetailsrdd.filter(e=>e!=titledetailsrddfirst).map(e=>{
    val row = e.getString(0)
    val splitted = row.split("\t")
    val titleId = splitted(0).trim
    val title = splitted(2).trim
    val region = splitted(3).trim
    val language = splitted(4).trim
    TitleDetail(titleId, title, region, language)
})

val titledetailsDF =  titledetailsMap.toDF

titleMappedDF.registerTempTable("title")

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.sql("select * from title limit 2").show

Using the above method throws org.apache.spark.sql.AnalysisException: Table not found: title;

This happens because a temporary table is registered only in the catalog of the SQLContext that created the DataFrame; a freshly constructed SQLContext has its own, empty catalog. As a workaround, reuse the DataFrame's own context:

val sqlContext = titleMappedDF.sqlContext

scala> val sqlContext = titleMappedDF.sqlContext
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@38f221e1

scala> sqlContext.sql("select * from title limit 2").show
+---------+-------------+--------+
|  titleId|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.8|    1350|
|tt0000002|          6.5|     157|
+---------+-------------+--------+

Similarly, register the other table:

scala> titledetailsDF.registerTempTable("titledetail")

scala> sqlContext.sql("select * from titledetail limit 2").show
+---------+--------------------+------+--------+
|  titleId|               title|region|language|
+---------+--------------------+------+--------+
|tt0000001|Carmencita - span...|    HU|      \N|
|tt0000001|          Карменсита|    RU|      \N|
+---------+--------------------+------+--------+

val top5movies = sqlContext.sql("select td.titleId, title, region, language, averageRating, numVotes  from titledetail td inner join title t on (t.titleId=td.titleId) where numVotes>100000 order by averageRating desc limit 5")

scala> top5movies.show
+---------+--------------------+------+--------+-------------+--------+       
|  titleId|               title|region|language|averageRating|numVotes|
+---------+--------------------+------+--------+-------------+--------+
|tt4283088|Battle of the Bas...|    US|      \N|          9.9|  152823|
|tt4283094| The Winds of Winter|    US|      \N|          9.9|  105085|
|tt0944947|       Igra prestola|    RS|      \N|          9.5| 1292492|
|tt0944947|     Game of Thrones|    \N|      \N|          9.5| 1292492|
|tt0944947|Samefo Karis Tama...|    GE|      \N|          9.5| 1292492|
+---------+--------------------+------+--------+-------------+--------+


a. Text file
Columns to be separated with tab "\t"
Compression: BZip2Codec

val moviestext = top5movies.rdd.map(e=>{
   val titleId = e.getString(0)
   val title = e.getString(1)
   val region = e.getString(2)
   val language = e.getString(3)
   val averageRating = e.getFloat(4)
   val numVotes = e.getInt(5)
   (titleId + "\t" + title + "\t" + region + "\t" + language + "\t" + averageRating + "\t" + numVotes)
})

moviestext.saveAsTextFile("/home/cloudera/workspace/movies/sqlquery/text", classOf[org.apache.hadoop.io.compress.BZip2Codec])

b. Sequence file.
Compression: BZip2Codec

val moviesSequence = top5movies.rdd.map(e=>{
   val titleId = e.getString(0)
   val title = e.getString(1)
   val region = e.getString(2)
   val language = e.getString(3)
   val averageRating = e.getFloat(4)
   val numVotes = e.getInt(5)
   (titleId, (titleId + "\t" + title + "\t" + region + "\t" + language + "\t" + averageRating + "\t" + numVotes))
})

moviesSequence.saveAsSequenceFile("/home/cloudera/workspace/movies/sqlquery/Sequence", Some(classOf[org.apache.hadoop.io.compress.BZip2Codec]))

c. JSON file.
Compression: BZip2Codec

top5movies.toJSON.saveAsTextFile("/home/cloudera/workspace/movies/sqlquery/json", classOf[org.apache.hadoop.io.compress.BZip2Codec])

d. Parquet.
Compression: uncompressed

sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")
top5movies.write.parquet("/home/cloudera/workspace/movies/sqlquery/parquet")

e. ORC file.

top5movies.write.orc("/home/cloudera/workspace/movies/sqlquery/orc")

f. Avro file.
Compression: uncompressed
sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
top5movies.write.avro("/home/cloudera/workspace/movies/sqlquery/avro")
-----------------------------------------------------------------------------------------
