NiFi Revisited : Aggregate Movie Ratings Data To Find Top 10 Movies


This post is a sample of data aggregation in NiFi.

If you've just started learning NiFi, check this blog post, which is a much more detailed walkthrough than this one.

Our goals are:

  1. Fetch the movie ratings data
  2. Calculate average rating per movie
  3. Find the top 10 rated movies
  4. Export the top 10 list in both CSV and AVRO formats.

Download Sample Dataset

The MovieLens dataset is available on the GroupLens website.

In this challenge, we'll use the MovieLens 100K Dataset. Download the zip file and extract the "u.data" file.
u.data is a tab-delimited file that holds the ratings, and contains four columns:

user_id (int), movie_id (int), rating (int), time (int)

Keep this file until we test our NiFi flow. 
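Before building the flow, it helps to know what answer to expect. The short Python sketch below is a local sanity check, not part of the NiFi flow; the 100-rating threshold and top-10 cut mirror the SQL we'll give QueryRecord later:

```python
from collections import defaultdict

def top_movies(lines, n=10, min_ratings=100):
    """Average rating per movie over tab-delimited
    user_id / movie_id / rating / timestamp rows."""
    totals = defaultdict(lambda: [0.0, 0])  # movie_id -> [sum, count]
    for line in lines:
        user_id, movie_id, rating, _timestamp = line.rstrip("\n").split("\t")
        totals[movie_id][0] += float(rating)
        totals[movie_id][1] += 1
    averages = [
        (movie_id, s / c)
        for movie_id, (s, c) in totals.items()
        if c >= min_ratings  # ignore rarely rated movies
    ]
    averages.sort(key=lambda pair: pair[1], reverse=True)
    return averages[:n]

# Usage (the file path is an assumption -- wherever you extracted u.data):
# with open("u.data") as f:
#     for movie_id, avg in top_movies(f):
#         print(movie_id, round(avg, 2))
```

If the NiFi flow is wired correctly, its CSV output should agree with what this script prints.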

GetFile

Create a GetFile processor, and point it to a local folder you created, to fetch input files.



Input Directory: /home/oguz/Documents/Olric/File_Source/



UpdateAttribute to alter the file name


Add an UpdateAttribute processor, with the filename attribute set to "Top10Movies.csv".


Connect GetFile processor to UpdateAttribute.


Source & Target Schemas


We'll now add a Controller Service of type AvroSchemaRegistry and define two schemas inside this registry:


InputSchema

{
  "type": "record",
  "namespace": "movies",
  "name": "movie",
  "fields": [
    { "name": "user_id", "type": ["int", "null"] },
    { "name": "movie_id", "type": ["int", "null"] },
    { "name": "rating", "type": ["float", "null"] },
    { "name": "timestamp", "type": ["int", "null"] }
  ]
}

OutputSchema

{
  "type": "record",
  "namespace": "movies",
  "name": "movie",
  "fields": [
    { "name": "movie_id", "type": ["int", "null"] },
    { "name": "rating", "type": ["float", "null"] }
  ]
}
Although rating is actually an integer in the source file, we read it as a float, since our average calculation will produce fractional values.
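To see why, consider what an integer-typed rating would do to an average (plain Python, just to illustrate the truncation; the numbers are made up):

```python
ratings = [5, 4, 4]                # raw ratings are whole numbers
avg = sum(ratings) / len(ratings)  # true average: 4.333...
truncated = int(avg)               # roughly what an int-typed field forces: 4
print(avg, truncated)
```

Declaring rating as float in both schemas avoids losing that fractional part.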

CSVReader


Add a controller service of type CSVReader, configured as follows:




Schema Access Strategy: Use Schema Name Property
Schema Registry: AvroSchemaRegistry
Schema Name: InputSchema
CSV Format: Tab Delimited
Treat First Line as Header: True
Ignore CSV Header Column Names: True


CSVRecordSetWriter

Create a CSVRecordSetWriter as well and configure it as seen below.



Schema Write Strategy: Set schema.name Attribute
Schema Access Strategy: Use Schema Name Property
Schema Registry: AvroSchemaRegistry
Schema Name: OutputSchema
CSV Format: Custom Format

QueryRecord to aggregate flowfiles


QueryRecord allows us to query incoming data with SQL, as if we were working on a relational database. Add a QueryRecord processor and configure it as follows.


Record Reader: CSVReader
Record Writer: CSVRecordSetWriter
summary (a dynamic property; its name becomes a new relationship):

select movie_id, rating from
   (select movie_id, avg(rating) as rating, count(user_id) as cnt
      from flowfile group by movie_id
   )
where cnt >= 100
order by rating desc limit 10

Connect UpdateAttribute processor to QueryRecord processor.


Also, navigate to the Settings tab of the QueryRecord processor and check the Original and Failure relationships under "Automatically Terminate Relationships". Otherwise, these would remain as unhandled relationships.
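QueryRecord runs its SQL through Apache Calcite, but the statement above is standard enough to try against almost any engine. Here is a rough equivalent using Python's built-in sqlite3, fed with synthetic rows (sqlite is only a stand-in for experimenting with the query, not what NiFi uses internally):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE flowfile (user_id INT, movie_id INT, rating REAL, timestamp INT)"
)
# A few synthetic rows; in NiFi the real input is the u.data flowfile.
rows = [(u, 1, 5.0, 0) for u in range(100)] + [(u, 2, 3.0, 0) for u in range(100)]
conn.executemany("INSERT INTO flowfile VALUES (?, ?, ?, ?)", rows)

top10 = conn.execute("""
    SELECT movie_id, rating FROM
      (SELECT movie_id, AVG(rating) AS rating, COUNT(user_id) AS cnt
         FROM flowfile GROUP BY movie_id)
    WHERE cnt >= 100
    ORDER BY rating DESC LIMIT 10
""").fetchall()
print(top10)  # [(1, 5.0), (2, 3.0)]
```

The inner query computes the per-movie average and rating count; the outer query keeps only movies with at least 100 ratings and takes the ten best.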

PutHDFS to upload target files into Hadoop


Now add a PutHDFS processor to upload our results into HDFS, configured as below.


Hadoop Configuration Resources: /etc/hadoop/3.1.4.0-315/0/hdfs-site.xml,/etc/hadoop/3.1.4.0-315/0/core-site.xml
Directory: /user/oguz/Top10Movies

Hadoop Configuration Resources must list the full paths to the hdfs-site.xml and core-site.xml files, which exist on any HDP node.

Directory is the target directory in HDFS. Ensure that the user running NiFi has permission to write to this folder. If the directory does not exist, NiFi will create it automatically.


Connect the QueryRecord processor to the PutHDFS processor, using the summary relationship.




This flow produces the result in CSV format and uploads it to HDFS.

But we're not done yet, since we also need the same result in AVRO format.

InferAvroSchema to populate schema from flowfiles


InferAvroSchema makes it easy to generate an Avro schema from the incoming files.

Add an InferAvroSchema processor. Leave most attributes at their defaults; we'll change just two things:

Choose flowfile-attribute from the Schema Output Destination drop-down.

Also set the Avro Record Name to "avrorec".

Terminate the failure, original, and unsupported content relationships automatically in the InferAvroSchema processor, under the Settings tab.

Connect the QueryRecord processor to the InferAvroSchema processor, also for the summary relationship.



ConvertCSVToAvro


Since we now have a populated Avro schema in hand, we can use it in a ConvertCSVToAvro processor.



We shall set the value of Record Schema as

${inferred.avro.schema}

Also, under the Settings tab, choose the failure and incompatible relationships for automatic termination.


Connect InferAvroSchema to ConvertCSVToAvro for the success relationship.

Our last connection is from ConvertCSVToAvro to the PutHDFS processor, also for the success relationship.

And we're done.


Test the Flow


Now that we're ready, we can copy u.data file to the folder where GetFile is listening.

And see what happens.


I see the two files above under Files view of Ambari. So it worked for me.


I can see that the most popular movie's id is 408.

Let's check which title that is. We can look it up in u.item, another file in the MovieLens dataset.

So, here it is :


Were you also expecting something else ?
