"Pig Latin / SQL Challenge" or "Analytic / Window Functions in PIG"


Here is a challenge for those who are new to Pig Latin.
We'll first download the MovieLens movie ratings data. Our goal is to prepare a report showing:

  • The best-rated movie of each decade, beginning with the 1940s
  • The rating of this movie
  • The average rating of movies from each of these decades

We expect a result set of six rows. Each row will represent a decade, from the 1940s up to the 1990s; the dataset contains no more recent movies.

We'll skip movies that have fewer than 100 ratings. Otherwise we might report movies rated 5 stars by only a few people.

We'll simply calculate the maximum average rating per decade and list the movie with this rating. No two movies share exactly the same maximum rating, so we don't have to worry about ties.
Decade  Avg Rating          Best Movie                   Best Movie Rating
1940    4.0237821309661413  A Romantic Film Noir         4.4567901234567901
1950    4.0871914327484486  A Romantic Thriller          4.3875598086124402
1960    4.0033519734014891  A Drama                      4.2922374429223744
1970    3.9011845364893668  A Harrison Ford Movie        4.3584905660377358
1980    3.7892266577953785  Another Harrison Ford Movie  4.2523809523809524
1990    3.5279244930323222  Nothing Spectacular          4.4910714285714286


No spoilers here. OK, maybe just some hints.
I'll first load the dataset into PostgreSQL and use SQL to fetch the result, and then upload the files into Hadoop to try the same with Pig Latin.
Feel free to skip the PostgreSQL part. But I recommend at least taking a look and comparing the SQL solution with the Pig Latin one.

 

The Dataset

The MovieLens dataset is available on the GroupLens website.
 
In this challenge, we'll use the MovieLens 100K dataset. Download the zip file and extract two files: "u.data" and "u.item".
u.data is a tab-delimited file, whereas u.item is pipe-delimited (|).
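For reference, the download-and-extract step can look like this (the URL is the GroupLens download location for ml-100k; adjust directory names to your environment):

oguz@dikanka:~$ wget https://files.grouplens.org/datasets/movielens/ml-100k.zip
oguz@dikanka:~$ unzip ml-100k.zip
oguz@dikanka:~$ mkdir -p moviedata && cp ml-100k/u.data ml-100k/u.item moviedata/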

u.data keeps the ratings, and contains four columns :
user_id (int), movie_id (int), rating (int), time (int)

u.item keeps the movie definitions. We need only the first three columns:

movie_id (int), movie_name (string), moviedate (string, 'DD-MMM-YYYY')

But the file actually contains more columns than these. We'll get rid of the extras.

While doing that, we'll also convert the encoding to UTF-8, to be aligned with the default PostgreSQL database encoding.
cut -d"|" -f 1,2,3 u.item | iconv -f ISO-8859-1 -t UTF-8 > movie_def.txt

Now we have a new file named movie_def.txt with only 3 columns.


Load data into PostgreSQL


Let's see how many rows we have in these files.

oguz@dikanka:~/moviedata$ wc -l u.data
100000 u.data
oguz@dikanka:~/moviedata$ wc -l movie_def.txt
1682 movie_def.txt


Finally, we have to make the files readable by other users, so that the postgres user can reach them.

oguz@dikanka:~/moviedata$ chmod 644 movie_def.txt
oguz@dikanka:~/moviedata$ chmod 644 u.data

Now let's create the tables in PostgreSQL, and load data into them.

oguz@dikanka:~/moviedata$ sudo su postgres
postgres@dikanka:/home/oguz$ psql
psql (10.10 (Ubuntu 10.10-0ubuntu0.18.04.1))
Type "help" for help.
postgres=# CREATE DATABASE movielens;
CREATE DATABASE
postgres=# \c movielens;
You are now connected to database "movielens" as user "postgres".
movielens=# CREATE TABLE ratings (user_id int,movie_id int,rating int,time int);
CREATE TABLE

movielens=# COPY ratings FROM '/home/oguz/moviedata/u.data';
COPY 100000

movielens=# CREATE TABLE movies (movie_id int,movie_name varchar(200),movie_date_str varchar(11));
CREATE TABLE


movielens=# COPY movies FROM '/home/oguz/moviedata/movie_def.txt' DELIMITER '|';
COPY 1682

movielens=# ALTER TABLE movies ADD COLUMN movie_date date;
ALTER TABLE

movielens=# UPDATE movies SET movie_date = cast(movie_date_str AS date) WHERE movie_date_str <> '';
UPDATE 1681
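The cast works because PostgreSQL parses the DD-MMM-YYYY spelling out of the box. A quick way to convince yourself (output shown with the default ISO DateStyle):

movielens=# SELECT cast('01-Jan-1995' AS date);
    date
------------
 1995-01-01
(1 row)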



The SQL Solution


So here's the SQL solution.

movielens=# WITH mov AS (
    SELECT movie_id, movie_name, 10*extract(decade FROM movie_date) AS decade
    FROM movies),
rat AS (
    SELECT movie_id, avg(rating) AS rating
    FROM ratings
    GROUP BY movie_id
    HAVING count(*) > 100),
dataset AS (
    SELECT mov.decade, mov.movie_name, rat.rating,
           MAX(rat.rating) OVER (PARTITION BY mov.decade) AS Best_Movie_Rating
    FROM mov
    JOIN rat ON mov.movie_id = rat.movie_id)
SELECT decade, avg(rating) AS Avg_Rating,
       max(CASE WHEN rating = Best_Movie_Rating THEN movie_name ELSE NULL END) AS Best_Movie,
       max(Best_Movie_Rating) AS Best_Movie_Rating
FROM dataset
WHERE decade >= 1940
GROUP BY decade
ORDER BY decade;
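One detail worth calling out: extract(decade FROM ...) returns the year divided by 10, so multiplying by 10 maps every year to the first year of its decade. For example:

movielens=# SELECT extract(decade FROM date '1942-06-01') AS dec;
 dec
-----
 194
(1 row)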



Decade  Avg Rating          Best Movie                      Best Movie Rating
1940    4.0237821309661413  Casablanca (1942)               4.4567901234567901
1950    4.0871914327484486  Rear Window (1954)              4.3875598086124402
1960    4.0033519734014891  To Kill a Mockingbird (1962)    4.2922374429223744
1970    3.9011845364893668  Star Wars (1977)                4.3584905660377358
1980    3.7892266577953785  Raiders of the Lost Ark (1981)  4.2523809523809524
1990    3.5279244930323222  Close Shave, A (1995)           4.4910714285714286


Upload Files into Hadoop

I switch to a specific user named hdpuser for uploading the files to Hadoop.

oguz@dikanka:~/moviedata$ su hdpuser
hdpuser@dikanka:~/moviedata$ hadoop fs -put u.data /user/hdpuser/movies
hdpuser@dikanka:~/moviedata$ hadoop fs -put u.item /user/hdpuser/movies
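If the target directory doesn't exist yet, create it first with "hadoop fs -mkdir -p /user/hdpuser/movies". A quick listing then verifies the upload:

hdpuser@dikanka:~/moviedata$ hadoop fs -ls /user/hdpuser/movies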
Once the files are uploaded, we can create our Pig script.

hdpuser@dikanka:~/moviedata$ vi pig_1

The PIG Solution

Below is the Pig Latin script I wrote. Take a look and compare it with yours.


-- aggregate the ratings per movie
ratings = LOAD '/user/hdpuser/movies/u.data' AS (userid:int, movieid:int, rating:int, time:int);
DESCRIBE ratings;
grp_ratings = GROUP ratings BY movieid;
avg_rat = FOREACH grp_ratings GENERATE group as movid, AVG(ratings.rating) as avgRating, COUNT(ratings.movieid) as cnt_rat;
-- keep only movies that have at least 100 ratings
avg_ratings = FILTER avg_rat BY cnt_rat >= 100;



-- load the movie definitions; the three-field schema truncates u.item's extra columns
movies = LOAD '/user/hdpuser/movies/u.item' USING PigStorage('|') AS (movieid:int, moviename:chararray, moviedate:chararray);
DESCRIBE movies;
-- derive the release year, keep movies from 1940 onwards, and compute the decade
mov = FOREACH movies GENERATE movieid, moviename, GetYear(ToDate(moviedate, 'DD-MMM-YYYY'));
mov1 = FILTER mov BY $2 >= 1940;
mov2 = FOREACH mov1 GENERATE $0 as movid, $1 as movnam, $2 as movyr, (int)FLOOR($2/10) * 10 as movdecade;



-- join average ratings to movies, then find each decade's maximum average rating
joined = JOIN avg_ratings BY movid, mov2 BY movid;
dataset = FOREACH joined GENERATE mov2::movdecade as movdecade, mov2::movnam as movnam, avg_ratings::avgRating as avgRating;
grp_dataset = GROUP dataset BY movdecade;

max_avg_ratings = FOREACH grp_dataset GENERATE $0 as movdecade, MAX($1.avgRating) as maxRating, AVG($1.avgRating) as avgRating;
-- join the per-decade maximum back to the detail rows to pick the best movie
joined1 = JOIN max_avg_ratings BY (movdecade,maxRating), dataset BY (movdecade,avgRating);
best_movies = FOREACH joined1 GENERATE max_avg_ratings::movdecade, max_avg_ratings::avgRating, dataset::movnam as bestMovie, max_avg_ratings::maxRating as bestMovieRating;

DUMP best_movies;
Let's run our script and see the results.
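The script sits in pig_1, so we can hand it to the Pig client for a batch run (MapReduce mode on my setup; -x local would also do for a quick test):

hdpuser@dikanka:~/moviedata$ pig pig_1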
(1940,4.023782130966142,Casablanca (1942),4.45679012345679)
(1950,4.087191432748448,Rear Window (1954),4.3875598086124405)
(1960,4.003351973401489,To Kill a Mockingbird (1962),4.292237442922374)
(1970,3.9011845364893674,Star Wars (1977),4.3584905660377355)
(1980,3.7892266577953784,Raiders of the Lost Ark (1981),4.252380952380952)
(1990,3.5229170457861416,Close Shave, A (1995),4.491071428571429)
2019-09-11 15:15:54,713 [main] INFO org.apache.pig.Main - Pig script completed in 23 seconds and 571 milliseconds (23571 ms)
Results look similar. The tiny differences in the ratings come down to data types: PostgreSQL's avg() returns an arbitrary-precision numeric, while Pig's AVG returns a double. The slightly larger gap in the 1990 row is likely caused by the rating-count filter: the SQL uses count(*) > 100, while the Pig script uses cnt_rat >= 100, so movies with exactly 100 ratings are counted only on the Pig side.


The Alternative PIG Solution


As you may have noticed, the SQL solution uses analytic window functions, whereas the Pig script joins the aggregated data set back to the detailed one.

We'll now look at an alternative Pig solution, which takes an approach similar to window functions.

Here it is.

REGISTER /usr/hdp/3.1.4.0-315/pig/piggybank.jar;
DEFINE rateOver org.apache.pig.piggybank.evaluation.Over('maxrat:double');
ratings = LOAD '/user/hdpuser/movies/u.data' AS (userid:int, movieid:int, rating:int, time:int);
grp_ratings = GROUP ratings BY movieid;
avg_rat = FOREACH grp_ratings GENERATE group as movid, AVG(ratings.rating) as avgRating, COUNT(ratings.movieid) as cnt_rat;
avg_ratings = FILTER avg_rat BY cnt_rat >= 100;


movies = LOAD '/user/hdpuser/movies/u.item' USING PigStorage('|') AS (movieid:int, moviename:chararray, moviedate:chararray);
mov = FOREACH movies GENERATE movieid, moviename, GetYear(ToDate(moviedate, 'DD-MMM-YYYY'));
mov1 = FILTER mov BY $2 >= 1940;
mov2 = FOREACH mov1 GENERATE $0 as movid, $1 as movnam, $2 as movyr, (int)FLOOR($2/10) * 10 as movdecade;


joined = JOIN avg_ratings BY movid, mov2 BY movid;
dataset = FOREACH joined GENERATE mov2::movid as movid, mov2::movdecade as movdecade, mov2::movnam as movnam, avg_ratings::avgRating as avgRating;

-- Over computes max(avgRating) within each decade; Stitch glues it back onto every row
A = FOREACH (GROUP dataset BY movdecade) GENERATE flatten(org.apache.pig.piggybank.evaluation.Stitch(dataset, rateOver($1.avgRating, 'max(double)')));
B = FOREACH A GENERATE flatten(stitched::movdecade) as movdecade, flatten(stitched::avgRating) as avgRating, flatten((double)$4) as maxRating, flatten(stitched::movnam) as movnam;
C = FOREACH B GENERATE movdecade, avgRating, maxRating, movnam, CASE WHEN avgRating == maxRating THEN movnam END as BestMovie;
-- A (and hence C) is still one row per movie, so aggregate once more per decade
D = FOREACH (GROUP C BY movdecade) GENERATE $0, AVG($1.avgRating) as DecadeAvgRating, MAX($1.maxRating) as DecadeMaxRating, MAX($1.BestMovie) as BestMovie;

DUMP D;
The REGISTER and DEFINE lines at the top and the statements A through D are what differ from the first solution. We register the piggybank library and use two of its functions, Over and Stitch.
Both functions are used in the row beginning with "A =".
Over generates an aggregate value (here, the maximum avgRating) over each group produced by the GROUP BY, and Stitch then joins the aggregated values back onto the detailed dataset.
The result of "A" is still granular by movie, so each movie has one row. That's why we have to aggregate it once more when building "D".


So this is the end of the challenge. I hope you found it helpful.
