Introduction

Hadoop as a platform enables us to store and process vast amounts of data. Storage capacity and processing power are directly related to the size of our Hadoop cluster, and scaling it up is as simple as adding new nodes. The Amazon Elastic MapReduce (EMR) service lets us easily and cost-effectively provision as much or as little capacity as we need to process our data. We are also going to use Amazon Simple Storage Service (Amazon S3) to store our input and output data.

In this series of articles I’m going to show how easily we can build and run a custom Hadoop MapReduce job using the Amazon Elastic MapReduce web service. This first part demonstrates the basic use of EMR for running a custom Hadoop MapReduce job.

Setting up a Hadoop cluster on Amazon Elastic MapReduce

For the purpose of this article we’re going to use the Amazon Elastic MapReduce service to provision a small, single-node Hadoop cluster.

To get started, we need an Amazon Web Services (AWS) account so we can create a new job flow to execute our custom Hadoop job. Using the Elastic MapReduce Command Line Interface (EMR CLI), creating our cluster is as simple as issuing the following command:

$ ruby elastic-mapreduce --create --alive
Created job flow j-2JDYPP5IFRQ0L

This command returns a job flow ID, which we will use whenever we work with the cluster from the EMR CLI. Using the returned job flow ID, we can check the status of our cluster at any time by running this command:

$ ruby elastic-mapreduce -j j-2JDYPP5IFRQ0L --list
j-2JDYPP5IFRQ0L     STARTING       xxxxxxxxxxx.compute-1.amazonaws.com         Development Job Flow (requires manual termination)

For detailed information about our job flow we can use the following command:

$ ruby elastic-mapreduce -j j-2JDYPP5IFRQ0L --describe
{
   "JobFlows":[
      {
         "LogUri":"s3n:\/\/xxxxxxx\/logs\/",
         "Name":"Development Job Flow (requires manual termination)",
         "BootstrapActions":[

         ],
         "SupportedProducts":[

         ],
         "ExecutionStatusDetail":{
            "EndDateTime":null,
            "CreationDateTime":1327869971.0,
            "LastStateChangeReason":"Waiting for steps to run",
            "State":"WAITING",
            "StartDateTime":1327870299.0,
            "ReadyDateTime":1327870300.0
         },
         "Steps":[

         ],
         "AmiVersion":"latest",
         "JobFlowId":"j-2JDYPP5IFRQ0L",
         "Instances":{
            "Ec2KeyName":"bigdata-emr",
            "Ec2SubnetId":null,
            "InstanceCount":1,
            "NormalizedInstanceHours":1,
            "Placement":{
               "AvailabilityZone":"us-east-1a"
            },
            "KeepJobFlowAliveWhenNoSteps":true,
            "SlaveInstanceType":null,
            "MasterInstanceType":"m1.small",
            "MasterPublicDnsName":"xxxxxxxxxxxx.compute-1.amazonaws.com",
            "MasterInstanceId":"i-3124f354",
            "InstanceGroups":[
               {
                  "EndDateTime":null,
                  "Name":"Master Instance Group",
                  "InstanceRole":"MASTER",
                  "BidPrice":null,
                  "CreationDateTime":1327869971.0,
                  "LaunchGroup":null,
                  "LastStateChangeReason":"",
                  "InstanceGroupId":"ig-2VJ74F6NXZI47",
                  "State":"RUNNING",
                  "Market":"ON_DEMAND",
                  "InstanceType":"m1.small",
                  "StartDateTime":1327870208.0,
                  "InstanceRunningCount":1,
                  "ReadyDateTime":1327870298.0,
                  "InstanceRequestCount":1
               }
            ],
            "TerminationProtected":false,
            "HadoopVersion":"0.20.205"
         }
      }
   ]
}


Once our job flow reaches the WAITING state, we’re ready to submit our Hadoop job.

Note: Detailed instructions on how to install, set up, and use the Amazon EMR CLI can be found in the Amazon Elastic MapReduce – Getting Started Guide.

Creating a custom Hadoop job for Elastic MapReduce

The next step is to create our custom Hadoop MapReduce job and execute it on our EMR cluster.

For the purpose of this article we have developed a custom MapReduce job for face detection in images using the Open Computer Vision Library (OpenCV) and its Java interface, JavaCV.
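
To give an idea of what the detection step itself looks like, below is a minimal, self-contained JavaCV sketch of Haar-cascade face detection on a single image. It is only an illustration of the technique, not code from our job: the cascade file, image path, and detection parameters are placeholders, and the package names correspond to the JavaCV releases of that time.

import static com.googlecode.javacv.cpp.opencv_core.*;
import static com.googlecode.javacv.cpp.opencv_highgui.*;
import static com.googlecode.javacv.cpp.opencv_objdetect.*;

public class FaceDetectionSketch {

    public static void main(String[] args) {
        // Haar cascade trained for frontal faces (file name is a placeholder)
        CvHaarClassifierCascade cascade =
                new CvHaarClassifierCascade(cvLoad("haarcascade_frontalface_alt.xml"));

        // Load the image to analyze in grayscale (path is a placeholder)
        IplImage image = cvLoadImage("avatar.jpg", CV_LOAD_IMAGE_GRAYSCALE);

        // Working memory for the detector
        CvMemStorage storage = CvMemStorage.create();

        // Run the Haar detector; 1.1 scale step and 3 neighbors are common defaults
        CvSeq faces = cvHaarDetectObjects(image, cascade, storage, 1.1, 3, CV_HAAR_DO_CANNY_PRUNING);

        // Print the bounding box of every detected face
        for (int i = 0; i < faces.total(); i++) {
            CvRect r = new CvRect(cvGetSeqElem(faces, i));
            System.out.printf("(%d, %d; %d, %d)%n", r.x(), r.y(), r.width(), r.height());
        }
    }
}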

In my next article I will explain the details of the MapReduce job, setting up the Hadoop cluster to work with the OpenCV libraries, and reading and writing our data from Amazon S3. Here we present only the basic steps needed to get a custom Hadoop job running on the EMR cluster.

The snippet below shows our Hadoop job class:

public class S3FaceDetectionJob extends Configured implements Tool {

    @Override
    public int run(String[] arg0) throws Exception {

        if (arg0.length != 1) {
            throw new IOException("Incorrect number of arguments for job. Expected: outputPath");
        }

        Job job = new Job(getConf());
        job.setJarByClass(S3FaceDetectionJob.class);

        // Map-only job: detection results are written directly by the mapper
        job.setMapperClass(S3FaceDetectionMapper.class);
        job.setNumReduceTasks(0);

        // Symlink distributed cache files into the task working directory
        DistributedCache.createSymlink(getConf());

        // Custom input format that reads the input objects from Amazon S3
        job.setInputFormatClass(S3ObjectInputFormat.class);

        FileOutputFormat.setOutputPath(job, new Path(arg0[0]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new S3FaceDetectionJob(), args);
        System.exit(res);
    }
}
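
Because the job extends Configured and is launched through ToolRunner, Hadoop’s generic options parser takes care of the -libjars, -files, and -D key=value arguments we will pass on the command line later, placing them into the job configuration before run() is executed. As a small illustration (the property name is the one used in the batch script further below; the fallback value is made up), such a property could be read inside run() like this:

// Inside run(): read a property supplied on the command line as -D s3.bucket.name=...
// The fallback value here is purely illustrative.
String bucketName = getConf().get("s3.bucket.name", "some-default-bucket");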


It is important to underline the following line:

DistributedCache.createSymlink(getConf());
Our job depends on several third-party libraries (OpenCV, JavaCV, JavaCPP, ...), some of which are Java libraries while others are native Linux binaries. We therefore decided not to package the libraries into the Hadoop job jar; instead, we pass them all through the Hadoop distributed cache when submitting the job. This approach may seem more complex than bundling every dependency inside the job jar, but I have found it to be the only way to use native Linux libraries inside a Hadoop MapReduce job without installing them on every node of the cluster.
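
To show how the cached native libraries can actually be picked up at runtime, here is a minimal sketch of a mapper setup() method that loads them from the task’s working directory, where the distributed cache places the symlinks when createSymlink() is used together with the #name fragments shown in the batch script below. The mapper type parameters, library names, and loading order are illustrative assumptions, not our actual implementation.

import java.io.File;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative mapper skeleton: loads native OpenCV libraries that the
// distributed cache has symlinked into the task's current working directory.
public class NativeLibraryLoadingMapper extends Mapper<Text, Text, Text, Text> {

    @Override
    protected void setup(Context context) {
        // Load the core library before the libraries that depend on it
        loadFromWorkingDir("libopencv_core.so");
        loadFromWorkingDir("libopencv_imgproc.so");
        loadFromWorkingDir("libopencv_highgui.so");
        loadFromWorkingDir("libopencv_objdetect.so");
    }

    private static void loadFromWorkingDir(String libraryName) {
        // System.load() requires an absolute path, so resolve the symlink
        // relative to the current working directory of the task
        System.load(new File(libraryName).getAbsolutePath());
    }
}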

Since our job reads and writes its data from Amazon S3, we will also use S3 to host the job jar and all of its dependencies. With the s3cmd tool we can easily upload all the files we need to S3.

Here’s the layout of our S3 bucket after we have successfully uploaded the job and all of its dependencies:

$ s3cmd ls s3://abh.bigdata/pocs/bin/
2012-01-12 23:25    471593   s3://abh.bigdata/pocs/bin/face-recognition-1.0-SNAPSHOT.jar
$ s3cmd ls s3://abh.bigdata/pocs/lib/
DIR   s3://abh.bigdata/pocs/lib/native/
2012-01-29 21:52         0   s3://abh.bigdata/pocs/lib/
2012-01-29 21:52   3753801   s3://abh.bigdata/pocs/lib/aws-java-sdk-1.2.15.jar
2012-01-29 21:52     76906   s3://abh.bigdata/pocs/lib/javacpp.jar
2012-01-29 21:52   1202842   s3://abh.bigdata/pocs/lib/javacv-linux-x86.jar
2012-01-29 21:52    866704   s3://abh.bigdata/pocs/lib/javacv.jar

Running the job

Once we have the EMR cluster up and running, and the input data, job jar, and dependencies uploaded to Amazon S3, we’re ready to run our job. The article How to Create and Debug an Amazon Elastic MapReduce Job Flow describes how to run the various types of EMR job flows (Hive, Pig, streaming, or custom jar).

In our case we cannot use the proposed method of running a custom Hadoop job jar, because our job relies on the Hadoop distributed cache, which is not supported in the latest version of the EMR CLI. Instead, we will log in to the EMR cluster over SSH and run the job directly from there.

To establish an SSH connection to our cluster we need the RSA private key (.pem) downloaded from the AWS Management Console. With the .pem file available, we connect to the Hadoop master node (the "MasterPublicDnsName" from the job flow description) using the following command:

$ ruby elastic-mapreduce -j j-2JDYPP5IFRQ0L --ssh --key-pair-file emr-bigdata.pem

Once connected, we have to set up the s3cmd tool and download our Hadoop job jar:

[email protected]:~$ s3cmd --configure

Enter new values or accept defaults in brackets with Enter.
Refer to user manual for detailed description of all options.

Access key and Secret key are your identifiers for Amazon S3
Access Key: xxxxxxxxxxxxxxxxxxx
Secret Key: xxxxxxxxxxxxxxxxxxxxxxxxx

Encryption password is used to protect your files from reading
by unauthorized persons while in transfer to S3
Encryption password: xxxxxx
Path to GPG program [/usr/bin/gpg]:

When using secure HTTPS protocol all communication with Amazon S3
servers is protected from 3rd party eavesdropping. This method is
slower than plain HTTP and can't be used if you're behind a proxy
Use HTTPS protocol [No]:

On some networks all internet access must go through a HTTP proxy.
Try setting it here if you can't conect to S3 directly
HTTP Proxy server name:

New settings:
Access Key: xxxxxxxxxxxxxxxxx
Secret Key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
Encryption password: xxxxxxxxxxx
Path to GPG program: /usr/bin/gpg
Use HTTPS protocol: False
HTTP Proxy server name:
HTTP Proxy server port: 0

Test access with supplied credentials? [Y/n]
Please wait...
Success. Your access key and secret key worked fine

Now verifying that encryption works...
Success. Encryption and decryption worked fine

Save settings? [y/N] y
Configuration saved to '/home/hadoop/.s3cfg'


$ s3cmd get s3://abh.bigdata/pocs/bin/face-recognition-1.0-SNAPSHOT.jar
s3://abh.bigdata/pocs/bin/face-recognition-1.0-SNAPSHOT.jar -> ./face-recognition-1.0-SNAPSHOT.jar  [1 of 1]
471593 of 471593   100% in    1s   394.16 kB/s  done

To simplify running our job, we can create a small batch script:

#!/bin/bash

# S3 prefix used for selecting job input data
s3_prefix=users/10

# Job output directory
out_dir=s3://abh.bigdata/pocs/out/$s3_prefix

# We have to clear output directory
s3cmd del --recursive $out_dir

# Run face detection job passing in all job's dependencies
hadoop jar face-recognition-1.0-SNAPSHOT.jar com.atlantbh.bigdata.pocs.fd.hadoop.S3FaceDetectionJob \
  -libjars s3://abh.bigdata/pocs/lib/javacv.jar,s3://abh.bigdata/pocs/lib/javacv-linux-x86.jar,s3://abh.bigdata/pocs/lib/javacpp.jar \
  -files s3://abh.bigdata/pocs/lib/native/cv2.so#cv2.so,\
s3://abh.bigdata/pocs/lib/native/libopencv_calib3d.so.2.3.1#libopencv_calib3d.so.2.3,\
s3://abh.bigdata/pocs/lib/native/libopencv_contrib.so.2.3.1#libopencv_contrib.so.2.3,\
s3://abh.bigdata/pocs/lib/native/libopencv_core.so.2.3.1#libopencv_core.so,\
s3://abh.bigdata/pocs/lib/native/libopencv_features2d.so.2.3.1#libopencv_features2d.so.2.3,\
s3://abh.bigdata/pocs/lib/native/libopencv_flann.so.2.3.1#libopencv_flann.so.2.3,\
s3://abh.bigdata/pocs/lib/native/libopencv_gpu.so.2.3.1#libopencv_gpu.so,\
s3://abh.bigdata/pocs/lib/native/libopencv_highgui.so.2.3.1#libopencv_highgui.so,\
s3://abh.bigdata/pocs/lib/native/libopencv_imgproc.so.2.3.1#libopencv_imgproc.so,\
s3://abh.bigdata/pocs/lib/native/libopencv_legacy.so.2.3.1#libopencv_legacy.so,\
s3://abh.bigdata/pocs/lib/native/libopencv_ml.so.2.3.1#libopencv_ml.so,\
s3://abh.bigdata/pocs/lib/native/libopencv_objdetect.so.2.3.1#libopencv_objdetect.so,\
s3://abh.bigdata/pocs/lib/native/libopencv_ts.so.2.3.1#libopencv_ts.so,\
s3://abh.bigdata/pocs/lib/native/libopencv_video.so.2.3.1#libopencv_video.so \
  -D s3.bucket.name=xxxxxxx_dev -D s3

The following is the job’s output as logged to the console:

12/01/29 23:26:48 INFO mapred.JobClient: Default number of map tasks: null
12/01/29 23:26:48 INFO mapred.JobClient: Setting default number of map tasks based on cluster size to : 4
12/01/29 23:26:48 INFO mapred.JobClient: Default number of reduce tasks: 0
12/01/29 23:26:48 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/cv2.so#cv2.so' for reading
12/01/29 23:26:49 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_calib3d.so.2.3.1#libopencv_calib3d.so.2.3' for reading
12/01/29 23:26:49 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_contrib.so.2.3.1#libopencv_contrib.so.2.3' for reading
12/01/29 23:26:49 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_core.so.2.3.1#libopencv_core.so' for reading
12/01/29 23:26:50 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_features2d.so.2.3.1#libopencv_features2d.so.2.3' for reading
12/01/29 23:26:51 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_flann.so.2.3.1#libopencv_flann.so.2.3' for reading
12/01/29 23:26:51 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_gpu.so.2.3.1#libopencv_gpu.so' for reading
12/01/29 23:26:52 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_highgui.so.2.3.1#libopencv_highgui.so' for reading
12/01/29 23:26:52 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_imgproc.so.2.3.1#libopencv_imgproc.so' for reading
12/01/29 23:26:53 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_legacy.so.2.3.1#libopencv_legacy.so' for reading
12/01/29 23:26:53 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_ml.so.2.3.1#libopencv_ml.so' for reading
12/01/29 23:26:53 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_objdetect.so.2.3.1#libopencv_objdetect.so' for reading
12/01/29 23:26:53 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_ts.so.2.3.1#libopencv_ts.so' for reading
12/01/29 23:26:54 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_video.so.2.3.1#libopencv_video.so' for reading
12/01/29 23:26:54 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/javacv.jar' for reading
12/01/29 23:26:55 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/javacv-linux-x86.jar' for reading
12/01/29 23:26:55 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/javacpp.jar' for reading
12/01/29 23:26:56 WARN S3.S3InputFormat: Using s3.input.numOfKeys value to determine input splits
12/01/29 23:26:58 INFO S3.S3InputFormat: Number of input splits=4
12/01/29 23:26:58 INFO mapred.JobClient: Running job: job_201201292050_0001
12/01/29 23:26:59 INFO mapred.JobClient:  map 0% reduce 0%
12/01/29 23:27:59 INFO mapred.JobClient:  map 25% reduce 0%
12/01/29 23:28:05 INFO mapred.JobClient:  map 50% reduce 0%
12/01/29 23:28:29 INFO mapred.JobClient:  map 75% reduce 0%
12/01/29 23:28:32 INFO mapred.JobClient:  map 100% reduce 0%
12/01/29 23:28:37 INFO mapred.JobClient: Job complete: job_201201292050_0001
12/01/29 23:28:37 INFO mapred.JobClient: Counters: 21
12/01/29 23:28:37 INFO mapred.JobClient:   Job Counters
12/01/29 23:28:37 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=152951
12/01/29 23:28:37 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
12/01/29 23:28:37 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
12/01/29 23:28:37 INFO mapred.JobClient:     Launched map tasks=4
12/01/29 23:28:37 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
12/01/29 23:28:37 INFO mapred.JobClient:   File Output Format Counters
12/01/29 23:28:37 INFO mapred.JobClient:     Bytes Written=9254
12/01/29 23:28:37 INFO mapred.JobClient:   FileSystemCounters
12/01/29 23:28:37 INFO mapred.JobClient:     HDFS_BYTES_READ=703
12/01/29 23:28:37 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=129232
12/01/29 23:28:37 INFO mapred.JobClient:     S3_BYTES_WRITTEN=9254
12/01/29 23:28:37 INFO mapred.JobClient:   File Input Format Counters
12/01/29 23:28:37 INFO mapred.JobClient:     Bytes Read=0
12/01/29 23:28:37 INFO mapred.JobClient:   com.atlantbh.bigdata.pocs.fd.hadoop.S3FaceDetectionJob$S3FaceDetectionMapper$FaceDetectionCounters
12/01/29 23:28:37 INFO mapred.JobClient:     SUCCESS=738
12/01/29 23:28:37 INFO mapred.JobClient:     FACES_DETECTED=53
12/01/29 23:28:37 INFO mapred.JobClient:     FAILED=151
12/01/29 23:28:37 INFO mapred.JobClient:   Map-Reduce Framework
12/01/29 23:28:37 INFO mapred.JobClient:     Map input records=889
12/01/29 23:28:37 INFO mapred.JobClient:     Physical memory (bytes) snapshot=250867712
12/01/29 23:28:37 INFO mapred.JobClient:     Spilled Records=0
12/01/29 23:28:37 INFO mapred.JobClient:     CPU time spent (ms)=25860
12/01/29 23:28:37 INFO mapred.JobClient:     Total committed heap usage (bytes)=65011712
12/01/29 23:28:37 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=2384003072
12/01/29 23:28:37 INFO mapred.JobClient:     Map output records=53
12/01/29 23:28:37 INFO mapred.JobClient:     SPLIT_RAW_BYTES=703

The result of our job is a set of text files (one per mapper) containing information about each image in which a face was detected (S3 bucket, S3 key, size, ...), including the bounding box of the detected face:

[bucketName=xxxxxx, key=users/10/avatar_10e5213a-756a-416c-bc0d-267807fe97ad.jpg, size=6339, ETag=dcca74b9ce159bde603b5f75f8e49542, storageClass=STANDARD] (36, 25; 56, 56)
[bucketName=xxxxxx, key=users/10/avatar_25cddcbf-dc27-4217-851b-a309e9332342.jpg, size=4020, ETag=2bc1a423c5cfa5002f0e0c11eafb4da8, storageClass=STANDARD] (14, 10; 22, 22)
[bucketName=xxxxxx, key=users/10/avatar_266d6a7b-4670-4cbd-bf77-ffeb1e17b44b.jpg, size=4020

Once we’re done playing with our EMR cluster, we have to terminate it:

$ ruby elastic-mapreduce -j j-2JDYPP5IFRQ0L --terminate
Terminated job flow j-2JDYPP5IFRQ0L
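
Since the aws-java-sdk jar is already among our dependencies, the same thing could, if preferred, be done programmatically. A minimal sketch using the AWS SDK for Java (the credentials below are placeholders):

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsRequest;

public class TerminateJobFlow {

    public static void main(String[] args) {
        // Placeholder credentials; use your own AWS access and secret keys
        AmazonElasticMapReduceClient emr =
                new AmazonElasticMapReduceClient(new BasicAWSCredentials("ACCESS_KEY", "SECRET_KEY"));

        // Terminate the job flow created earlier
        emr.terminateJobFlows(new TerminateJobFlowsRequest().withJobFlowIds("j-2JDYPP5IFRQ0L"));
    }
}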

Summary

This article demonstrated the basic steps needed to get a custom MapReduce job running on Amazon Elastic MapReduce.
