Get Started with NiFi: Partitioning CSV Files Based on Column Value


This tutorial demonstrates how an incoming data file can be divided into multiple files based on a column value, using Apache NiFi.



The NiFi flow will:
  • Fetch files from a local folder
  • Divide the content rows into several files using the PartitionRecord processor
  • Modify the file names to include the column value used for partitioning
  • Upload the output files to HDFS


Here we go.

Download Sample Dataset

The MovieLens dataset is available on the GroupLens website.
 
In this tutorial, we'll use the MovieLens 100K dataset. Download the zip file and extract the "u.data" file.
u.data is a tab-delimited file that holds the ratings and contains four columns:

user_id (int), movie_id (int), rating (int), timestamp (int)
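
For reference, the rows of u.data look roughly like this (tab separated; the values below are illustrative):

196     242     3       881250949
186     302     3       891717742
22      377     1       878887116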

Keep this file until we test our NiFi flow.

Create a Process Group


Click "New Process Group" to create a group named "grp_Partition_Sample", or, whatever you like, actually.


Before developing the flow, we need to create a local folder named "File_Source". I created it at the following path:
/home/oguz/Documents/Olric/File_Source/

Now let's double-click the process group. We have a blank canvas to work on.

GetFile


To add our first processor, drag the processor icon from the toolbar and drop it somewhere on the canvas.



A pop-up window will appear. All we have to do is find the GetFile processor in the list and click Add.


Double-click the GetFile processor to edit its properties. The only thing we'll change here is the "Input Directory" property. We'll put the full path of our File_Source folder here:
/home/oguz/Documents/Olric/File_Source/


Now we have a processor that will fetch files from the File_Source folder. It checks this folder continuously and pulls in every file it finds, to be processed in the NiFi flow. Since the "Keep Source File" property is set to "false" by default, it will *NOT* copy the file; the folder will be emptied instead. Unless we process the content and put it somewhere else, the file is gone.

Of course, our processor is not started, so it's not active yet. We'll start it later. Keep in mind that a processor can't be configured while it's running; we have to stop processors to configure them.


Create CSV Schema For Source Files


Our source files are CSV files, and NiFi needs to know more about them: which columns are included, what their data types are, which column separator is used, and so on. All of this is defined as a schema in NiFi.

We do this by creating a controller service of type AvroSchemaRegistry.

A controller service is a shared configuration that can be used by multiple processors.

A controller service of type AvroSchemaRegistry holds schema definitions, which are used for CSV or Avro files.

So let's go ahead and create it. Check the Operate panel on the left side. If no processor is selected, it shows the name of the process group. If you see a processor's name there, click on an empty area of the canvas so that the panel shows the process group again. Then click the "Configuration" button in the panel.


The pop-up window has two tabs. Click the plus sign under the "Controller Services" tab and add an "AvroSchemaRegistry" service. Then click the configure button of the newly added service.


In the pop-up window, navigate to the Properties tab and click the plus sign to add a new schema.
This controller service can contain multiple schemas, but we'll create just one for now.
Name it "InputSchema" and paste the following text block as the value.

{     "type": "record",
     "namespace": "movies",
     "name": "movie",
     "fields": [

       { "name": "user_id", "type": ["int", "null"] },
       { "name": "movie_id", "type": ["int", "null"] },
       { "name": "rating", "type": ["int", "null"] },
       { "name": "timestamp", "type": ["int", "null"] }
     ]
}

If needed, you can enlarge the pop-up editor pane from its bottom-right corner. Note that hitting Enter closes the editor pane, so use Shift + Enter for line breaks while editing (and Shift + Tab instead of Tab).

Create CSV Schema for Target Files

As mentioned above, the same "AvroSchemaRegistry" controller service can host multiple schemas. Let's define our target schema as well.

Click the configure button of the service once more. Add another schema and name it "OutputSchema". Let's assume that we don't need the last column, timestamp, in the output. Then our output schema will be:

{     "type": "record",
     "namespace": "movies",
     "name": "movie",
     "fields": [

       { "name": "user_id", "type": ["int", "null"] },
       { "name": "movie_id", "type": ["int", "null"] },
       { "name": "rating", "type": ["int", "null"] }
     ]
}

Once we've added both schemas, we can click the Enable icon to enable our AvroSchemaRegistry.

Controller services, just like processors, can only be configured when disabled.


Here is how our AvroSchemaRegistry should look:


CSV Reader and CSV Writer

Our NiFi flow will split each incoming flowfile into multiple flowfiles, based on the movie_id column. This is done with a PartitionRecord processor.

To do that, it needs two more controller services: a CSVReader and a CSVRecordSetWriter. Let's add them.

Once added, configure the CSVReader as follows:


Schema Access Strategy: Use Schema Name Property
Schema Registry: AvroSchemaRegistry
Schema Name: InputSchema
CSV Format: Tab Delimited
Treat First Line as Header: True
Ignore CSV Header Column Names: True

So we declared that our source files are tab-delimited CSV files containing a header line, but that we'll ignore the field names in that header and use the column names from InputSchema instead.
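For example, with this configuration a source row like 196 242 3 881250949 (tab separated) is read as user_id = 196, movie_id = 242, rating = 3, timestamp = 881250949; the names come from InputSchema, not from the file's header line. (Values here are illustrative.)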
And here's how we configure our CSVRecordSetWriter.


Schema Write Strategy: Set schema.name Attribute
Schema Access Strategy: Use Schema Name Property
Schema Registry: AvroSchemaRegistry
Schema Name: OutputSchema
CSV Format: Custom Format


So our output CSV will be a comma-delimited file and will not contain the timestamp column of the tab-delimited source files.
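
Assuming the writer's default settings (the header line is included by default), each output flowfile should contain comma-delimited content along these lines (illustrative values):

user_id,movie_id,rating
196,242,3
186,302,3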

Make sure you enable all three controller services before you close the configuration window.


PartitionRecord to Split Files

Now it's time to divide our flowfiles, based on the movie_id column.

Add a PartitionRecord processor and configure it as shown below.


Record Reader: CSVReader
Record Writer: CSVRecordSetWriter
movie_id: /movie_id
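
The last entry is a user-defined property: PartitionRecord groups the records of each incoming flowfile by the value found at the RecordPath /movie_id and emits one flowfile per distinct value, and it also stores that value in a flowfile attribute named movie_id, which we'll use later to build the file name. As a rough mental model only (this is plain Python for illustration, not what NiFi runs internally), the step does something like this:

# Conceptual sketch: group the rows of the tab-delimited ratings file by movie_id
# and write one comma-delimited file per distinct value, dropping the timestamp
# column, just like the CSVReader / CSVRecordSetWriter / PartitionRecord combination.
import csv
from collections import defaultdict

partitions = defaultdict(list)   # movie_id -> rows that belong to that partition

with open("u.data", newline="") as source:
    reader = csv.reader(source, delimiter="\t")
    next(reader, None)               # skip the first line, as our CSVReader is configured to treat it as a header
    for row in reader:
        if len(row) < 4:
            continue                 # ignore malformed or empty lines
        user_id, movie_id, rating, _timestamp = row[:4]
        partitions[movie_id].append([user_id, movie_id, rating])

# Each partition would become its own flowfile, later named MovieRatings_<movie_id>.csv.
for movie_id, rows in partitions.items():
    with open(f"MovieRatings_{movie_id}.csv", "w", newline="") as target:
        writer = csv.writer(target)                          # comma delimited, like our CSVRecordSetWriter
        writer.writerow(["user_id", "movie_id", "rating"])   # header line
        writer.writerows(rows)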


The next thing we'll do is build a connection between these two processors. Hover over the GetFile processor; an arrow icon will appear on top of it. Drag this arrow and drop it onto the PartitionRecord processor.

A pop-up window will show up.



By default, the connection is built for the "success" relationship, which is what we want.

This means only successfully processed flowfiles follow this path. Failures can be handled as well, but that is out of scope for this post.




Now we have a flow with two steps. The GetFile processor has a red square icon, which means the processor has no issues but is not running.
The PartitionRecord processor has a yellow warning icon; we can see the warnings by hovering over it.

It says relationship "success" is invalid because it's not terminated yet. Fair enough, since we haven't finished the flow.

Let's take a look at the connection as well. It is a "success" connection, and there are no flowfiles in the queue.

If a processor is not running while the processors before it are, flowfiles will queue up in the connection. You can right-click the connection and list the files in the queue, and even view their contents, which is really useful when you need to debug your flow.

UpdateAttribute to alter file names


Add a third processor of type UpdateAttribute. We'll add one property, filename, and set it to:

MovieRatings_${movie_id}.csv


This ensures that the movie_id value, which PartitionRecord stores in a flowfile attribute, becomes part of the target file names.
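
With our sample data, the output file names will look something like MovieRatings_1.csv, MovieRatings_50.csv, and so on, with one file per distinct movie_id value.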

Connect the UpdateAttribute processor to the end of our flow, again choosing "success" as the relationship.


PutHDFS to upload target files into Hadoop


Our fourth and last processor will be of type PutHDFS. You may prefer a PutFile processor instead if you don't have access to an HDFS environment right now.

Our PutHDFS processor is configured as below.


Hadoop Configuration Resources: /etc/hadoop/3.1.4.0-315/0/hdfs-site.xml,/etc/hadoop/3.1.4.0-315/0/core-site.xml
Directory: /user/oguz/Movie_Rating_Details

Hadoop Configuration Resources should contain the full paths to the hdfs-site.xml and core-site.xml files, which exist on any HDP node.

Directory is the target directory in HDFS. Make sure the user running NiFi has permission to write to this folder. If the directory does not exist, NiFi will create it automatically.

Also, navigate to the Settings tab of the PutHDFS processor and check the success and failure relationships under "Automatically Terminate Relationships". Otherwise, they will remain unhandled.

Test the Flow


Right-click an empty spot on the canvas and choose Start. This will start our NiFi flow. Here's what it should look like:


Now copy the u.data file into the source folder we created (/home/oguz/Documents/Olric/File_Source/).
It will disappear within a few seconds. Then you'll -hopefully- see the output files uploaded to HDFS.
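
If you want to double-check the result, a standard HDFS listing such as hdfs dfs -ls /user/oguz/Movie_Rating_Details (run as a user with access to that directory) should show one MovieRatings_<movie_id>.csv file per distinct movie_id in the dataset.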


Debug the Flow


If things go wrong, I recommend stopping all processors and starting them one by one. That gives you the chance to look at the flowfiles waiting in the queues.

The right-click menu of a connection gives you the option to review the files in its queue.

Also, you can empty the queue to reset the flow.

Good luck!
