A paper on identifying and skipping already-processed data: an effort to minimize wasted cloud resources in Hadoop when processing data from HDFS.

Problem

The main problem that I describe and resolve in this paper is identifying and skipping data that has already been processed on Hadoop. Solving it also helped us with several related problems, such as:

  • processing only the data that wasn’t processed before a job on Hadoop was killed
  • processing only the set of new data that was generated by some other job or added by the user
  • making sure that we never process the same data twice in one job run: we save any data that fails to process into separate files (one part of that data can fail multiple times) and send those files as input to the job the next time we run it

Input splits

When a job is created, Hadoop splits the input files and submits N mappers (where N is the number of splits) to process that data. By default, the block size of the files stored in HDFS determines the split size. If we have two input files of 10MB each, created with a block size of 2MB, each file yields five splits and Hadoop will submit 10 mappers to process the input data. Each mapper is assigned exactly one input split.
File input splits are defined by the input file name, offset, and length.
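
The arithmetic above can be sketched in plain Java. This is a simplified model, not Hadoop's FileInputFormat: the class SplitPlanner, its Split type, and the file names are hypothetical, and it assumes the split size equals the block size (real Hadoop also takes minimum and maximum split sizes into account).

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of split creation; real Hadoop also applies min/max split sizes.
class SplitPlanner {

    /** One input split: file name, start offset, and length (both in bytes). */
    static final class Split {
        final String file;
        final long start;
        final long length;

        Split(String file, long start, long length) {
            this.file = file;
            this.start = start;
            this.length = length;
        }

        @Override
        public String toString() {
            return file + ":" + start + "+" + length;
        }
    }

    /** Cuts a file of fileLength bytes into consecutive splits of at most blockSize bytes. */
    static List<Split> computeSplits(String file, long fileLength, long blockSize) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be positive");
        }
        List<Split> splits = new ArrayList<>();
        for (long start = 0; start < fileLength; start += blockSize) {
            splits.add(new Split(file, start, Math.min(blockSize, fileLength - start)));
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        List<Split> splits = new ArrayList<>();
        splits.addAll(computeSplits("a.log", 10 * mb, 2 * mb));
        splits.addAll(computeSplits("b.log", 10 * mb, 2 * mb));
        // One mapper would be submitted per split.
        System.out.println("mappers needed: " + splits.size());
        for (Split split : splits) {
            System.out.println(split);
        }
    }
}
```

For the two 10MB files from the example, the sketch reports 10 splits, each described by file name, offset, and length.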

Solution

Now that we understand how input splits work, it follows that one way to skip processed data is to skip processed input splits.

Hadoop always creates identical input splits from one file, as long as neither the file contents nor the split configuration (for example, the block size) has changed. This works for us since we never append to input/output data files. Keep in mind that this could be a show-stopper for some applications.

As we already said, each file input split is defined by the file name, offset (start), and length. Combined, these three attributes can be used as a unique split identifier. The idea is to take this identifier and persist it somewhere as soon as the mapper has finished processing that split.
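
A minimal sketch of such an identifier, using only the JDK: the three attributes are joined into one string and hashed with MD5 so the result can be used as a file name. The class SplitIds, its method names, and the "file:start+length" layout are assumptions made for illustration.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: derives a stable identifier from file name, offset, and length.
class SplitIds {

    /** Returns the MD5 digest of the text as 32 lowercase hex characters. */
    static String md5Hex(String text) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(text.getBytes(StandardCharsets.UTF_8));
            // Pad to 32 characters so leading zero bytes are not lost.
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    /** Same (file, start, length) always yields the same id; any change yields another. */
    static String splitId(String file, long start, long length) {
        return md5Hex(file + ":" + start + "+" + length);
    }

    public static void main(String[] args) {
        System.out.println(splitId("a.log", 0L, 2097152L));
        System.out.println(splitId("a.log", 2097152L, 2097152L));
    }
}
```

Hashing is a convenience rather than a requirement: it gives a fixed-length name with no path separators, which suits storing the identifier as a file name.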

The next time we run the job, we filter out all file input splits that we have previously processed (by looking up the persisted input split identifiers) and return only those input splits that weren’t processed before.
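
The effect across two runs can be modeled in memory. In this sketch the set of persisted identifiers is a plain HashSet and the split identifiers are readable strings; the class RerunSketch and its methods are hypothetical and only illustrate the filtering logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// In-memory model of two job runs; a real job would persist the ids outside the JVM.
class RerunSketch {

    /** Returns the splits whose identifier is not in processedIds, keeping their order. */
    static List<String> filterProcessed(List<String> splitIds, Set<String> processedIds) {
        List<String> unprocessed = new ArrayList<>();
        for (String id : splitIds) {
            if (!processedIds.contains(id)) {
                unprocessed.add(id);
            }
        }
        return unprocessed;
    }

    /**
     * Simulates one job run: every pending split is attempted, and only the splits
     * that are not listed in failing get recorded in processedIds.
     */
    static List<String> runJob(List<String> splitIds, Set<String> processedIds,
                               Set<String> failing) {
        List<String> attempted = filterProcessed(splitIds, processedIds);
        for (String id : attempted) {
            if (!failing.contains(id)) {
                processedIds.add(id);
            }
        }
        return attempted;
    }

    public static void main(String[] args) {
        List<String> splits = Arrays.asList("a.log:0+2", "a.log:2+2", "a.log:4+2");
        Set<String> processed = new HashSet<>();

        // First run: the middle split fails, so its id is never persisted.
        Set<String> failing = new HashSet<>(Arrays.asList("a.log:2+2"));
        System.out.println("run 1 attempted: " + runJob(splits, processed, failing));

        // Second run: only the split that failed earlier is handed out again.
        System.out.println("run 2 attempted: " + runJob(splits, processed, new HashSet<>()));
    }
}
```

The first run attempts all three splits; the second run attempts only `a.log:2+2`, the one whose identifier was never recorded.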

By doing this we are effectively skipping previously processed data. Thankfully all of this is pretty simple to implement as we will see in the implementation section.

Implementation

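import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Returns only the splits whose identifier has not been persisted by an earlier run.
public static List<InputSplit> filterProcessedSplits(List<InputSplit> inputSplits,
        JobContext job) throws IOException {
    FileSystem hdfs = FileSystem.get(job.getConfiguration());
    Path outputDir = FileOutputFormat.getOutputPath(job);
    // META_PATH must resolve to ".meta" to match the path used in saveInputSplitId().
    Path splitsDir = new Path(outputDir, META_PATH + "/splits");

    // Nothing has been processed yet if the metadata directory does not exist.
    if (!hdfs.exists(splitsDir)) {
        return inputSplits;
    }

    // Each processed split is represented by an empty file named after its id.
    Set<String> processedSplitIds = new HashSet<String>();
    for (FileStatus s : hdfs.listStatus(splitsDir)) {
        processedSplitIds.add(s.getPath().getName());
    }

    List<InputSplit> unprocessedSplits = new ArrayList<InputSplit>();
    for (InputSplit split : inputSplits) {
        if (!processedSplitIds.contains(getInputSplitId(split))) {
            unprocessedSplits.add(split);
        }
    }
    return unprocessedSplits;
}

// Builds the split id as the MD5 hex digest of the split's string form
// (for a file split: file name, start, and length).
public static String getInputSplitId(InputSplit split) {
    return MD5Hash.digest(split.toString()).toString();
}

// Writes an empty marker file as a task side-effect file; it reaches the output
// directory only if the task completes successfully.
public static void saveInputSplitId(TaskInputOutputContext<?, ?, ?, ?> context,
        String splitId) throws IOException, InterruptedException {
    Path workingDir = FileOutputFormat.getWorkOutputPath(context);
    FileSystem hdfs = FileSystem.get(context.getConfiguration());
    Path path = new Path(workingDir, ".meta/splits/" + splitId);
    hdfs.create(path).close();
}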

These methods can either be implemented directly in a base InputFormat/Mapper class that you use for the rest of your custom input formats and mappers, or be separated into a utility class. We decided to implement them in a separate utility class (InputSplitUtils).

The first thing that we need to do is persist the input split identifier as soon as the task processing the data has finished successfully. We don’t want to persist the identifier if the task fails or gets killed for some reason.

In our current implementation we use HDFS to persist the split identifiers by storing empty files, with the identifier as the file name, in a metadata directory. There are of course other ways to store the identifiers (a database, ZooKeeper, etc.).

Hadoop provides a very convenient way to store files that will be persisted only if the task completes successfully by using task side-effect files.

Task side-effect files

In a nutshell, task side-effect files are files that are written to the output directory only if the task completes successfully. While the task runs, they are kept in a special working directory that can be obtained from the getWorkOutputPath() method of FileOutputFormat. This enables us to write the split identifier to HDFS in the setup() method of the mapper and forget about it. If the mapper finishes successfully, the split identifier is persisted; otherwise it is discarded.
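
The keep-on-success behavior can be imitated on a local filesystem to make the idea concrete. The sketch below is not Hadoop code and does not reproduce Hadoop's actual directory layout: the class SideEffectSketch, the "_work" directory name, and the attempt ids are all hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Local-filesystem model of "keep the file only if the task succeeds"; not Hadoop code.
class SideEffectSketch {

    /**
     * Writes a split marker into a per-attempt work directory, then either promotes it
     * to the output directory (success) or discards it with the work directory (failure).
     */
    static void runAttempt(Path outputDir, String attemptId, String splitId, boolean succeeds) {
        try {
            Path workDir = outputDir.resolve("_work").resolve(attemptId);
            Path marker = workDir.resolve(".meta").resolve("splits").resolve(splitId);
            Files.createDirectories(marker.getParent());
            Files.createFile(marker); // what the mapper's setup() would do

            if (succeeds) {
                Path committed = outputDir.resolve(".meta").resolve("splits").resolve(splitId);
                Files.createDirectories(committed.getParent());
                Files.move(marker, committed);
            }
            deleteRecursively(workDir); // the work directory never outlives the attempt
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** True if a marker for splitId has been promoted to the output directory. */
    static boolean isProcessed(Path outputDir, String splitId) {
        return Files.exists(outputDir.resolve(".meta").resolve("splits").resolve(splitId));
    }

    /** Creates a fresh temporary directory that plays the role of the job output. */
    static Path newOutputDir() {
        try {
            return Files.createTempDirectory("job-output");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void deleteRecursively(Path dir) throws IOException {
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order visits children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        }
    }

    public static void main(String[] args) {
        Path out = newOutputDir();
        runAttempt(out, "attempt_0", "split-a", true);
        runAttempt(out, "attempt_1", "split-b", false);
        System.out.println("split-a processed: " + isProcessed(out, "split-a"));
        System.out.println("split-b processed: " + isProcessed(out, "split-b"));
    }
}
```

Only the marker of the successful attempt ends up under the output directory, which is why the mapper can write it as early as setup().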

Mapper

The only change that we need to make to the mapper is to persist the split identifier in its setup() method: getInputSplitId() retrieves the split id and saveInputSplitId() persists it.

@Override
protected void setup(Context context)
        throws IOException, InterruptedException {
    // Record this split as processed; the marker is kept only if the task succeeds.
    String splitId = getInputSplitId(context.getInputSplit());
    saveInputSplitId(context, splitId);
}

File input format

Before mappers are submitted, the file input format takes care of creating the input splits. This is where we are going to filter any processed input splits by overriding the getSplits() method.

@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
    // Let the parent format compute the splits, then drop the processed ones.
    return filterProcessedSplits(super.getSplits(job), job);
}

Disadvantages

There are some downsides of using this method of identifying and skipping processed data that could be show-stoppers depending on your application.

  • if you are appending data to your input data files, this approach might not work for you unless your application is fine with processing some data (usually the last block of the file) multiple times. File appending is usually not used on HDFS, so this is not a problem in most cases
  • using reducers makes this method more complicated, as you will have to discard any persisted input split identifiers if any reducers are killed during execution (and later process those input splits again)
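
The append case from the first point can be illustrated with a small plain-Java sketch. It assumes splits are cut at a fixed block size and identified by a "file:start+length" string; the class AppendEffect, its methods, and the file name are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Shows which split identifiers change when data is appended to a file.
class AppendEffect {

    /** Lists "file:start+length" keys for consecutive splits of at most blockSize bytes. */
    static List<String> splitKeys(String file, long fileLength, long blockSize) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be positive");
        }
        List<String> keys = new ArrayList<>();
        for (long start = 0; start < fileLength; start += blockSize) {
            long length = Math.min(blockSize, fileLength - start);
            keys.add(file + ":" + start + "+" + length);
        }
        return keys;
    }

    /** Returns the current keys that were not seen in the earlier run. */
    static List<String> notYetProcessed(List<String> currentKeys, Set<String> processedKeys) {
        List<String> pending = new ArrayList<>();
        for (String key : currentKeys) {
            if (!processedKeys.contains(key)) {
                pending.add(key);
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // First run: a 9MB file with 2MB blocks ends in a 1MB split.
        Set<String> processed = new HashSet<>(splitKeys("f.log", 9 * mb, 2 * mb));
        // 1MB is appended; the last split now covers 2MB and gets a new key.
        List<String> pending = notYetProcessed(splitKeys("f.log", 10 * mb, 2 * mb), processed);
        System.out.println("splits processed again: " + pending);
    }
}
```

After the append, only the last split has a key that was not seen before, so the megabyte that was already processed in the first run is processed again together with the appended data.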
