"Pig Latin / SQL Challenge" or "Analytic / Window Functions in PIG"
Here is a challenge for those who are new to Pig Latin.
We'll first download the MovieLens movie ratings data. Our goal is to prepare a report showing:
- The best rated movie of each decade, beginning with the 1940s
- The rating of this movie
- The average rating of all movies from that decade
We expect a result set of six rows. Each row will represent one decade, from the 1940s up to the 1990s; the dataset contains no more recent data.
We'll skip movies with fewer than 100 ratings. Otherwise we might report movies rated 5 stars by only a handful of people.
We'll simply calculate the maximum average rating per decade and list the movie holding that rating. No two movies share exactly the same maximum rating, so we don't have to worry about ties.
Decade | Avg Rating | Best Movie | Best Movie Rating |
1940 | 4.0237821309661413 | A Romantic Film Noir | 4.4567901234567901 |
1950 | 4.0871914327484486 | A Romantic Thriller | 4.3875598086124402 |
1960 | 4.0033519734014891 | A Drama | 4.2922374429223744 |
1970 | 3.9011845364893668 | A Harrison Ford Movie | 4.3584905660377358 |
1980 | 3.7892266577953785 | Another Harrison Ford Movie | 4.2523809523809524 |
1990 | 3.5279244930323222 | Nothing Spectacular | 4.4910714285714286 |
No spoilers here. Ok, maybe just some hints.
I'll first load the dataset into PostgreSQL and use SQL to fetch the result, and then upload the files into Hadoop to try the same with Pig Latin.
Feel free to skip the PostgreSQL part, but I recommend at least taking a look and comparing the SQL solution with the Pig Latin one.
The dataset
The MovieLens dataset is available on the GroupLens website.
In this challenge, we'll use the MovieLens 100K Dataset. Download the zip file and extract two files: "u.data" and "u.item".
u.data is a tab-delimited file, whereas u.item is delimited with pipes (|).
u.data holds the ratings and contains four columns:
user_id (int), movie_id (int), rating (int), time (int)
u.item holds the movie definitions. We need only the first three columns:
movie_id (int), movie_name (string), moviedate (string, 'DD-MMM-YYYY')
But the file actually contains more columns. We'll get rid of them.
While doing that, we'll also change the encoding to UTF-8 to align with the default PostgreSQL database encoding.
cut -d"|" -f 1,2,3 u.item | iconv -f ISO-8859-1 -t UTF-8 > movie_def.txt
Now we have a new file named movie_def.txt with only 3 columns.
Load data into PostgreSQL
Let's see how many rows we have in these files.
oguz@dikanka:~/moviedata$ wc -l u.data
100000 u.data
oguz@dikanka:~/moviedata$ wc -l movie_def.txt
1682 movie_def.txt
Finally, we have to make the files readable by other users, so that the postgres user can reach them.
oguz@dikanka:~/moviedata$ chmod 644 movie_def.txt
oguz@dikanka:~/moviedata$ chmod 644 u.data
Now let's create the tables in PostgreSQL, and load data into them.
oguz@dikanka:~/moviedata$ sudo su postgres
postgres@dikanka:/home/oguz$ psql
psql (10.10 (Ubuntu 10.10-0ubuntu0.18.04.1))
Type "help" for help.
postgres=# CREATE DATABASE movielens;
CREATE DATABASE
postgres=# \c movielens;
You are now connected to database "movielens" as user "postgres".
movielens=# CREATE TABLE ratings (user_id int,movie_id int,rating int,time int);
CREATE TABLE
movielens=# COPY ratings FROM '/home/oguz/moviedata/u.data';
COPY 100000
movielens=# CREATE TABLE movies (movie_id int,movie_name varchar(200),movie_date_str varchar(11));
CREATE TABLE
movielens=# COPY movies FROM '/home/oguz/moviedata/movie_def.txt' DELIMITER '|';
COPY 1682
movielens=# ALTER TABLE movies ADD COLUMN movie_date date;
ALTER TABLE
movielens=# UPDATE movies SET movie_date = cast(movie_date_str AS date) WHERE movie_date_str <> '';
UPDATE 1681
The SQL Solution
So here's the SQL solution. Two things worth noting before you read it: extract(decade from movie_date) returns the year divided by ten (194 for 1942, for example), which is why the query multiplies it by 10 to get a decade label like 1940. And the max(case when ...) expression in the final select is a common trick: after grouping by decade, it picks the movie name from the row whose rating equals the per-decade maximum.
movielens=# WITH mov AS (SELECT movie_id, movie_name, 10*extract(decade from movie_date) decade
from movies), rat as
(SELECT movie_id, avg(rating) rating from ratings
group by movie_id having count(*)>100
)
, dataset as
(select mov.decade, mov.movie_name, rat.rating,
MAX(rat.rating) OVER (partition by mov.decade) Best_Movie_Rating
from mov
JOIN rat
ON mov.movie_id=rat.movie_id
)
select decade, avg(rating) Avg_Rating,
max(case when rating=Best_Movie_Rating then movie_name else null end) as Best_Movie,
max(Best_Movie_Rating) as Best_Movie_Rating
from dataset where decade >= 1940
group by decade order by decade;
Decade | Avg Rating | Best Movie | Best Movie Rating |
1940 | 4.0237821309661413 | Casablanca (1942) | 4.4567901234567901 |
1950 | 4.0871914327484486 | Rear Window (1954) | 4.3875598086124402 |
1960 | 4.0033519734014891 | To Kill a Mockingbird (1962) | 4.2922374429223744 |
1970 | 3.9011845364893668 | Star Wars (1977) | 4.3584905660377358 |
1980 | 3.7892266577953785 | Raiders of the Lost Ark (1981) | 4.2523809523809524 |
1990 | 3.5279244930323222 | Close Shave, A (1995) | 4.4910714285714286 |
Upload Files into Hadoop
I switch to a specific user named hdpuser for uploading the files to Hadoop.
oguz@dikanka:~/moviedata$ su hdpuser
hdpuser@dikanka:~/moviedata$ hadoop fs -put u.data /user/hdpuser/movies
hdpuser@dikanka:~/moviedata$ hadoop fs -put movie_def.txt /user/hdpuser/movies
Once the files are uploaded, we can create our pig script.
hdpuser@dikanka:~/moviedata$ vi pig_1
The PIG Solution
Below is the Pig Latin script I wrote. Take a look and compare it with yours.
ratings = LOAD '/user/hdpuser/movies/u.data' AS (userid:int, movieid:int, rating:int, time:int);
DESCRIBE ratings;
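-- average rating and number of ratings per movie; we'll keep only movies rated at least 100 times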
grp_ratings = GROUP ratings BY movieid;
avg_rat = FOREACH grp_ratings GENERATE group as movid, AVG(ratings.rating) as avgRating , COUNT(ratings.movieid) as cnt_rat;
avg_ratings = FILTER avg_rat BY cnt_rat >= 100;
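-- movie definitions: we only need the id, the name and the release date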
movies = LOAD '/user/hdpuser/movies/movie_def.txt' USING PigStorage('|') AS (movieid:int, moviename:chararray, moviedate:chararray);
DESCRIBE movies;
mov = FOREACH movies GENERATE movieid, moviename, GetYear(ToDate(moviedate, 'DD-MMM-YYYY'));
mov1 = FILTER mov BY $2 >= 1940;
mov2 = FOREACH mov1 GENERATE $0 as movid, $1 as movnam, $2 as movyr, (int)FLOOR($2/10) * 10 as movdecade;
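-- join the filtered rating averages with the movie/decade info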
joined = JOIN avg_ratings BY movid, mov2 BY movid;
dataset = FOREACH joined GENERATE mov2::movdecade as movdecade, mov2::movnam as movnam, avg_ratings::avgRating as avgRating;
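-- per decade: the best (max) average rating and the mean of the per-movie averages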
grp_dataset = GROUP dataset BY movdecade;
max_avg_ratings = FOREACH grp_dataset GENERATE $0 as movdecade, MAX($1.avgRating) as maxRating, AVG($1.avgRating) as avgRating;
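-- join the per-decade max back to the detail rows to find which movie holds it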
joined1 = JOIN max_avg_ratings BY (movdecade,maxRating), dataset BY (movdecade,avgRating);
best_movies = FOREACH joined1 GENERATE max_avg_ratings::movdecade, max_avg_ratings::avgRating, dataset::movnam as bestMovie, max_avg_ratings::maxRating as bestMovieRating;
DUMP best_movies;
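While developing a script like this, it helps to peek at the intermediate relations before running the whole thing. Here is a minimal sketch using standard Pig operators; add lines like these temporarily wherever you want to look:

-- show a handful of rows from an intermediate relation, and its schema
some_rows = LIMIT avg_ratings 5;
DUMP some_rows;
DESCRIBE mov2;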
Let's run our script and see the results.
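Assuming Pig is on the path and we're still in the directory holding pig_1, the script can be started by passing the file name to pig:

hdpuser@dikanka:~/moviedata$ pig pig_1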
(1940,4.023782130966142,Casablanca (1942),4.45679012345679)
(1950,4.087191432748448,Rear Window (1954),4.3875598086124405)
(1960,4.003351973401489,To Kill a Mockingbird (1962),4.292237442922374)
(1970,3.9011845364893674,Star Wars (1977),4.3584905660377355)
(1980,3.7892266577953784,Raiders of the Lost Ark (1981),4.252380952380952)
(1990,3.5229170457861416,Close Shave, A (1995),4.491071428571429)
2019-09-11 15:15:54,713 [main] INFO org.apache.pig.Main - Pig script completed in 23 seconds and 571 milliseconds (23571 ms)
Results look similar. The tiny differences in the trailing digits come from floating-point representation, and the larger gap in the 1990s average is most likely caused by the rating-count cutoff: the SQL HAVING clause keeps movies with more than 100 ratings, while the Pig filter keeps movies with at least 100.
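If you want the two solutions to use exactly the same cutoff, one option is to align the Pig filter with the SQL HAVING clause; a one-line sketch:

-- keep only movies with strictly more than 100 ratings, matching HAVING count(*)>100
avg_ratings = FILTER avg_rat BY cnt_rat > 100;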
As you may have noticed, the SQL solution uses analytic window functions, whereas Pig joins the aggregated data set back to the detailed one.
We'll now provide an alternative Pig solution that takes an approach similar to window functions.
Find it below.
The Alternative PIG Solution
REGISTER /usr/hdp/3.1.4.0-315/pig/piggybank.jar;
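-- Over will compute an aggregate (here a max over doubles) within each group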
DEFINE rateOver org.apache.pig.piggybank.evaluation.Over('maxrat:double');
ratings = LOAD '/user/hdpuser/movies/u.data' AS (userid:int, movieid:int, rating:int, time:int);
grp_ratings = GROUP ratings BY movieid;
avg_rat = FOREACH grp_ratings GENERATE group as movid, AVG(ratings.rating) as avgRating , COUNT(ratings.movieid) as cnt_rat;
avg_ratings = FILTER avg_rat BY cnt_rat >= 100;
movies = LOAD '/user/hdpuser/movies/movie_def.txt' USING PigStorage('|') AS (movieid:int, moviename:chararray, moviedate:chararray);
mov = FOREACH movies GENERATE movieid, moviename, GetYear(ToDate(moviedate, 'DD-MMM-YYYY'));
mov1 = FILTER mov BY $2 >= 1940;
mov2 = FOREACH mov1 GENERATE $0 as movid, $1 as movnam, $2 as movyr, (int)FLOOR($2/10) * 10 as movdecade;
joined = JOIN avg_ratings BY movid, mov2 BY movid;
dataset = FOREACH joined GENERATE mov2::movid as movid, mov2::movdecade as movdecade, mov2::movnam as movnam, avg_ratings::avgRating as avgRating;
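-- Over computes max(avgRating) per decade group; Stitch appends that max to every detail row of the group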
A = FOREACH (GROUP dataset BY movdecade) GENERATE flatten(org.apache.pig.piggybank.evaluation.Stitch(dataset, rateOver($1.avgRating, 'max(double)')));
B = FOREACH A GENERATE flatten(stitched::movdecade) as movdecade, flatten(stitched::avgRating) as avgRating, flatten((double)$4)
as maxRating, flatten(stitched::movnam) as movnam;
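-- keep the movie name only on rows whose rating equals the decade maximum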
C = FOREACH B GENERATE movdecade, avgRating, maxRating, movnam, CASE WHEN avgRating==maxRating THEN movnam END as BestMovie;
D = FOREACH (GROUP C BY movdecade) GENERATE $0, AVG($1.avgRating) as DecadeAvgRating, MAX($1.maxRating) as DecadeMaxRating, MAX($1.BestMovie) as BestMovie;
DUMP D;
The parts that differ from the first solution are the REGISTER and DEFINE statements at the top and the rows assigned to A and B: we register the piggybank library and use two of its functions.
Both functions appear in the row beginning with "A =".
Over generates an aggregate value within each group produced by the GROUP BY clause, and Stitch then joins that aggregated value back onto the detailed dataset.
The result of "A" is still granular by movie, so each movie has one row. That's why we have to aggregate again when building "D".
So this is the end of the challenge. I hope you guys found this helpful.