Introduction
Hadoop as a platform enables us to store and process vast amounts of data. Storage capacity and processing power are directly related to the size of our Hadoop cluster, and scaling it up is as simple as adding new nodes to the cluster. The Amazon Elastic MapReduce (EMR) service gives us the opportunity to easily and cost-effectively provision as much or as little capacity as we need to process our data. We are also going to leverage Amazon Simple Storage Service (Amazon S3) to store our input and output data.
In this series of articles I'm going to show how easily we can build and run a custom Hadoop MapReduce job using the Amazon Elastic MapReduce web service. The first part demonstrates the basic use of EMR for running a custom Hadoop MapReduce job.
Setting up a Hadoop cluster on Amazon Elastic MapReduce
For the purpose of this article we're going to use the Amazon Elastic MapReduce service to provide us with a small one-node Hadoop cluster.
To get started we need an Amazon Web Services (AWS) account so we can create a new job flow to execute our custom Hadoop job. Using the Elastic MapReduce Command Line Interface (EMR CLI), creating our cluster is as simple as issuing the following command:
$ ruby elastic-mapreduce --create --alive
Created job flow j-2JDYPP5IFRQ0L
This command returns a job flow ID, which we will use whenever we work with the cluster from the EMR CLI. Using the returned job flow ID we can check the status of our cluster at any time by running this command:
$ ruby elastic-mapreduce -j j-2JDYPP5IFRQ0L --list
j-2JDYPP5IFRQ0L     STARTING     xxxxxxxxxxx.compute-1.amazonaws.com     Development Job Flow (requires manual termination)
For detailed information about our job flow we can use the following command:
$ ruby elastic-mapreduce -j j-2JDYPP5IFRQ0L --describe
{
  "JobFlows": [
    {
      "LogUri": "s3n:\/\/xxxxxxx\/logs\/",
      "Name": "Development Job Flow (requires manual termination)",
      "BootstrapActions": [],
      "SupportedProducts": [],
      "ExecutionStatusDetail": {
        "EndDateTime": null,
        "CreationDateTime": 1327869971.0,
        "LastStateChangeReason": "Waiting for steps to run",
        "State": "WAITING",
        "StartDateTime": 1327870299.0,
        "ReadyDateTime": 1327870300.0
      },
      "Steps": [],
      "AmiVersion": "latest",
      "JobFlowId": "j-2JDYPP5IFRQ0L",
      "Instances": {
        "Ec2KeyName": "bigdata-emr",
        "Ec2SubnetId": null,
        "InstanceCount": 1,
        "NormalizedInstanceHours": 1,
        "Placement": {
          "AvailabilityZone": "us-east-1a"
        },
        "KeepJobFlowAliveWhenNoSteps": true,
        "SlaveInstanceType": null,
        "MasterInstanceType": "m1.small",
        "MasterPublicDnsName": "xxxxxxxxxxxx.compute-1.amazonaws.com",
        "MasterInstanceId": "i-3124f354",
        "InstanceGroups": [
          {
            "EndDateTime": null,
            "Name": "Master Instance Group",
            "InstanceRole": "MASTER",
            "BidPrice": null,
            "CreationDateTime": 1327869971.0,
            "LaunchGroup": null,
            "LastStateChangeReason": "",
            "InstanceGroupId": "ig-2VJ74F6NXZI47",
            "State": "RUNNING",
            "Market": "ON_DEMAND",
            "InstanceType": "m1.small",
            "StartDateTime": 1327870208.0,
            "InstanceRunningCount": 1,
            "ReadyDateTime": 1327870298.0,
            "InstanceRequestCount": 1
          }
        ],
        "TerminationProtected": false,
        "HadoopVersion": "0.20.205"
      }
    }
  ]
}
Once our job flow reaches the WAITING state, we're ready to submit our Hadoop job.
Note: Detailed instructions on how to install, set up, and use the Amazon EMR CLI can be found in the Amazon Elastic MapReduce – Getting Started Guide.
Creating a custom Hadoop job for Elastic MapReduce
The next step is to create our custom Hadoop map-reduce job and execute it on our EMR cluster.
For the purpose of this article we have developed a custom map-reduce job for face detection in images using the Open Computer Vision Library (OpenCV) and its Java interface, JavaCV.
In my next article I will explain the details of the map-reduce job, setting up the Hadoop cluster to work with the OpenCV libraries, and reading and writing our data from Amazon S3. Here we present only the basic steps needed to get a custom Hadoop job running on the EMR cluster.
The snippet below shows our Hadoop job class:
public class S3FaceDetectionJob extends Configured implements Tool {

    @Override
    public int run(String[] arg0) throws Exception {
        if (arg0.length != 1) {
            throw new IOException("Incorrect number of arguments for job. Expected: outputPath");
        }

        Job job = new Job(getConf());
        job.setJarByClass(S3FaceDetectionJob.class);
        job.setMapperClass(S3FaceDetectionMapper.class);
        job.setNumReduceTasks(0);

        DistributedCache.createSymlink(getConf());

        job.setInputFormatClass(S3ObjectInputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(arg0[0]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new S3FaceDetectionJob(), args);
        System.exit(res);
    }
}
It is important to underline the following line:
DistributedCache.createSymlink(getConf());
Our job has multiple dependencies on third-party libraries (OpenCV, JavaCV, JavaCPP, ...), some of which are Java libraries while others are native Linux binaries. Because of that, we decided not to include the libraries in the Hadoop job jar; instead we pass all of them through the Hadoop distributed cache when submitting the job. This approach may seem more complex than packaging all dependencies inside the job jar, but I have found it to be the only way to use native Linux libraries inside a Hadoop map-reduce job without installing the libraries on every node in the Hadoop cluster.
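To make the idea concrete, here is a minimal, hypothetical sketch of how a mapper could pick up those symlinked native libraries at runtime. The mapper class name, its key/value types, and the exact set and order of libraries loaded are assumptions for illustration only; the real S3FaceDetectionMapper is described in the next article.

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative only. Because DistributedCache.createSymlink() is enabled and the
// libraries are passed with -files using URI fragments
// (e.g. ...libopencv_core.so.2.3.1#libopencv_core.so), each native library shows up
// as a symlink in the task's working directory under its fragment name.
public class NativeLibraryMapper extends Mapper<Text, Text, Text, Text> {

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Load the native OpenCV libraries from the distributed cache symlinks.
        // System.load() needs an absolute path, so we resolve the symlink name
        // against the task's working directory. The list here is illustrative.
        for (String lib : new String[] { "libopencv_core.so", "libopencv_objdetect.so" }) {
            System.load(new File(lib).getAbsolutePath());
        }
    }

    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // JavaCV-based face detection would run here once the native libraries are loaded.
    }
}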
As we’re using Amazon S3 for reading and writing our data in our job, we will also use S3 to upload our job jar and all its dependencies. With s3cmd command we can easily upload all files we need to S3.
Here’s the layout of our S3 bucket after we have successfully uploaded our job and all it’s dependencies to S3:
$ s3cmd ls s3://abh.bigdata/pocs/bin/
2012-01-12 23:25    471593   s3://abh.bigdata/pocs/bin/face-recognition-1.0-SNAPSHOT.jar

$ s3cmd ls s3://abh.bigdata/pocs/lib/
                       DIR   s3://abh.bigdata/pocs/lib/native/
2012-01-29 21:52         0   s3://abh.bigdata/pocs/lib/
2012-01-29 21:52   3753801   s3://abh.bigdata/pocs/lib/aws-java-sdk-1.2.15.jar
2012-01-29 21:52     76906   s3://abh.bigdata/pocs/lib/javacpp.jar
2012-01-29 21:52   1202842   s3://abh.bigdata/pocs/lib/javacv-linux-x86.jar
2012-01-29 21:52    866704   s3://abh.bigdata/pocs/lib/javacv.jar
Running the job
Once we have the EMR cluster up and running, and the input data, job, and necessary dependencies uploaded to Amazon S3, we're ready to run our job. The following article describes how to run various types of EMR job flows (Hive, Pig, streaming, or custom jar): How to Create and Debug an Amazon Elastic MapReduce Job Flow.
In our case we are not able to use the proposed method of running a custom Hadoop job jar, because our job relies on the Hadoop distributed cache functionality, which is not supported in the latest version of the EMR CLI. Because of that, we will run the job directly from our EMR cluster after logging in to it over SSH.
To establish an SSH connection with our cluster we need the RSA private key (.pem) downloaded from the AWS management console. With the .pem file available, we connect to the Hadoop master node (the "MasterPublicDnsName" found in the job flow description) using the following command:
$ ruby elastic-mapreduce -j j-2JDYPP5IFRQ0L --ssh --key-pair-file emr-bigdata.pem
Once connected, we have to set up the s3cmd tool and download our Hadoop job jar:
[email protected]:~$ s3cmd --configure

Enter new values or accept defaults in brackets with Enter.
Refer to user manual for detailed description of all options.

Access key and Secret key are your identifiers for Amazon S3
Access Key: xxxxxxxxxxxxxxxxxxx
Secret Key: xxxxxxxxxxxxxxxxxxxxxxxxx

Encryption password is used to protect your files from reading
by unauthorized persons while in transfer to S3
Encryption password: xxxxxx
Path to GPG program [/usr/bin/gpg]:

When using secure HTTPS protocol all communication with Amazon S3
servers is protected from 3rd party eavesdropping. This method is
slower than plain HTTP and can't be used if you're behind a proxy
Use HTTPS protocol [No]:

On some networks all internet access must go through a HTTP proxy.
Try setting it here if you can't conect to S3 directly
HTTP Proxy server name:

New settings:
  Access Key: xxxxxxxxxxxxxxxxx
  Secret Key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
  Encryption password: xxxxxxxxxxx
  Path to GPG program: /usr/bin/gpg
  Use HTTPS protocol: False
  HTTP Proxy server name:
  HTTP Proxy server port: 0

Test access with supplied credentials? [Y/n]
Please wait...
Success. Your access key and secret key worked fine

Now verifying that encryption works...
Success. Encryption and decryption worked fine

Save settings? [y/N] y
Configuration saved to '/home/hadoop/.s3cfg'
$ s3cmd get s3://abh.bigdata/pocs/bin/face-recognition-1.0-SNAPSHOT.jar
s3://abh.bigdata/pocs/bin/face-recognition-1.0-SNAPSHOT.jar -> ./face-recognition-1.0-SNAPSHOT.jar  [1 of 1]
 471593 of 471593   100% in    1s   394.16 kB/s  done
To simplify running our job, we can create a small batch script:
#!/bin/bash

# S3 prefix used for selecting job input data
s3_prefix=users/10

# Job output directory
out_dir=s3://abh.bigdata/pocs/out/$s3_prefix

# We have to clear the output directory
s3cmd del --recursive $out_dir

# Run the face detection job, passing in all of the job's dependencies
hadoop jar face-recognition-1.0-SNAPSHOT.jar com.atlantbh.bigdata.pocs.fd.hadoop.S3FaceDetectionJob \
  -libjars s3://abh.bigdata/pocs/lib/javacv.jar,s3://abh.bigdata/pocs/lib/javacv-linux-x86.jar,s3://abh.bigdata/pocs/lib/javacpp.jar \
  -files s3://abh.bigdata/pocs/lib/native/cv2.so#cv2.so,s3://abh.bigdata/pocs/lib/native/libopencv_calib3d.so.2.3.1#libopencv_calib3d.so.2.3,s3://abh.bigdata/pocs/lib/native/libopencv_contrib.so.2.3.1#libopencv_contrib.so.2.3,s3://abh.bigdata/pocs/lib/native/libopencv_core.so.2.3.1#libopencv_core.so,s3://abh.bigdata/pocs/lib/native/libopencv_features2d.so.2.3.1#libopencv_features2d.so.2.3,s3://abh.bigdata/pocs/lib/native/libopencv_flann.so.2.3.1#libopencv_flann.so.2.3,s3://abh.bigdata/pocs/lib/native/libopencv_gpu.so.2.3.1#libopencv_gpu.so,s3://abh.bigdata/pocs/lib/native/libopencv_highgui.so.2.3.1#libopencv_highgui.so,s3://abh.bigdata/pocs/lib/native/libopencv_imgproc.so.2.3.1#libopencv_imgproc.so,s3://abh.bigdata/pocs/lib/native/libopencv_legacy.so.2.3.1#libopencv_legacy.so,s3://abh.bigdata/pocs/lib/native/libopencv_ml.so.2.3.1#libopencv_ml.so,s3://abh.bigdata/pocs/lib/native/libopencv_objdetect.so.2.3.1#libopencv_objdetect.so,s3://abh.bigdata/pocs/lib/native/libopencv_ts.so.2.3.1#libopencv_ts.so,s3://abh.bigdata/pocs/lib/native/libopencv_video.so.2.3.1#libopencv_video.so \
  -D s3.bucket.name=xxxxxxx_dev -D s3
Following is the job's output logged to the console:
12/01/29 23:26:48 INFO mapred.JobClient: Default number of map tasks: null
12/01/29 23:26:48 INFO mapred.JobClient: Setting default number of map tasks based on cluster size to : 4
12/01/29 23:26:48 INFO mapred.JobClient: Default number of reduce tasks: 0
12/01/29 23:26:48 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/cv2.so#cv2.so' for reading
12/01/29 23:26:49 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_calib3d.so.2.3.1#libopencv_calib3d.so.2.3' for reading
12/01/29 23:26:49 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_contrib.so.2.3.1#libopencv_contrib.so.2.3' for reading
12/01/29 23:26:49 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_core.so.2.3.1#libopencv_core.so' for reading
12/01/29 23:26:50 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_features2d.so.2.3.1#libopencv_features2d.so.2.3' for reading
12/01/29 23:26:51 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_flann.so.2.3.1#libopencv_flann.so.2.3' for reading
12/01/29 23:26:51 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_gpu.so.2.3.1#libopencv_gpu.so' for reading
12/01/29 23:26:52 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_highgui.so.2.3.1#libopencv_highgui.so' for reading
12/01/29 23:26:52 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_imgproc.so.2.3.1#libopencv_imgproc.so' for reading
12/01/29 23:26:53 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_legacy.so.2.3.1#libopencv_legacy.so' for reading
12/01/29 23:26:53 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_ml.so.2.3.1#libopencv_ml.so' for reading
12/01/29 23:26:53 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_objdetect.so.2.3.1#libopencv_objdetect.so' for reading
12/01/29 23:26:53 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_ts.so.2.3.1#libopencv_ts.so' for reading
12/01/29 23:26:54 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/native/libopencv_video.so.2.3.1#libopencv_video.so' for reading
12/01/29 23:26:54 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/javacv.jar' for reading
12/01/29 23:26:55 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/javacv-linux-x86.jar' for reading
12/01/29 23:26:55 INFO s3native.NativeS3FileSystem: Opening 's3://abh.bigdata/pocs/lib/javacpp.jar' for reading
12/01/29 23:26:56 WARN S3.S3InputFormat: Using s3.input.numOfKeys value to determine input splits
12/01/29 23:26:58 INFO S3.S3InputFormat: Number of input splits=4
12/01/29 23:26:58 INFO mapred.JobClient: Running job: job_201201292050_0001
12/01/29 23:26:59 INFO mapred.JobClient:  map 0% reduce 0%
12/01/29 23:27:59 INFO mapred.JobClient:  map 25% reduce 0%
12/01/29 23:28:05 INFO mapred.JobClient:  map 50% reduce 0%
12/01/29 23:28:29 INFO mapred.JobClient:  map 75% reduce 0%
12/01/29 23:28:32 INFO mapred.JobClient:  map 100% reduce 0%
12/01/29 23:28:37 INFO mapred.JobClient: Job complete: job_201201292050_0001
12/01/29 23:28:37 INFO mapred.JobClient: Counters: 21
12/01/29 23:28:37 INFO mapred.JobClient:   Job Counters
12/01/29 23:28:37 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=152951
12/01/29 23:28:37 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
12/01/29 23:28:37 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
12/01/29 23:28:37 INFO mapred.JobClient:     Launched map tasks=4
12/01/29 23:28:37 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
12/01/29 23:28:37 INFO mapred.JobClient:   File Output Format Counters
12/01/29 23:28:37 INFO mapred.JobClient:     Bytes Written=9254
12/01/29 23:28:37 INFO mapred.JobClient:   FileSystemCounters
12/01/29 23:28:37 INFO mapred.JobClient:     HDFS_BYTES_READ=703
12/01/29 23:28:37 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=129232
12/01/29 23:28:37 INFO mapred.JobClient:     S3_BYTES_WRITTEN=9254
12/01/29 23:28:37 INFO mapred.JobClient:   File Input Format Counters
12/01/29 23:28:37 INFO mapred.JobClient:     Bytes Read=0
12/01/29 23:28:37 INFO mapred.JobClient:   com.atlantbh.bigdata.pocs.fd.hadoop.S3FaceDetectionJob$S3FaceDetectionMapper$FaceDetectionCounters
12/01/29 23:28:37 INFO mapred.JobClient:     SUCCESS=738
12/01/29 23:28:37 INFO mapred.JobClient:     FACES_DETECTED=53
12/01/29 23:28:37 INFO mapred.JobClient:     FAILED=151
12/01/29 23:28:37 INFO mapred.JobClient:   Map-Reduce Framework
12/01/29 23:28:37 INFO mapred.JobClient:     Map input records=889
12/01/29 23:28:37 INFO mapred.JobClient:     Physical memory (bytes) snapshot=250867712
12/01/29 23:28:37 INFO mapred.JobClient:     Spilled Records=0
12/01/29 23:28:37 INFO mapred.JobClient:     CPU time spent (ms)=25860
12/01/29 23:28:37 INFO mapred.JobClient:     Total committed heap usage (bytes)=65011712
12/01/29 23:28:37 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=2384003072
12/01/29 23:28:37 INFO mapred.JobClient:     Map output records=53
12/01/29 23:28:37 INFO mapred.JobClient:     SPLIT_RAW_BYTES=703
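The SUCCESS, FACES_DETECTED, and FAILED values reported under FaceDetectionCounters above are custom Hadoop counters incremented by the mapper. As a rough, hypothetical sketch (only the counter names come from the log; the mapper's key/value types and the detectFaces helper are assumptions for illustration), such counters could be declared and updated like this:

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical fragment showing how a custom counters enum nested in the mapper
// produces the FaceDetectionCounters group seen in the job output.
public class CounterExampleMapper extends Mapper<Text, Text, Text, Text> {

    public static enum FaceDetectionCounters {
        SUCCESS,
        FACES_DETECTED,
        FAILED
    }

    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            int facesFound = detectFaces(value); // assumed helper, detailed in the next article
            context.getCounter(FaceDetectionCounters.SUCCESS).increment(1);
            context.getCounter(FaceDetectionCounters.FACES_DETECTED).increment(facesFound);
        } catch (Exception e) {
            context.getCounter(FaceDetectionCounters.FAILED).increment(1);
        }
    }

    private int detectFaces(Text value) {
        // Placeholder for the JavaCV-based detection logic.
        return 0;
    }
}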
The result of our job is a set of text files (one file per mapper) containing information about the images (S3 bucket, S3 key, size, ...) in which a face was detected, including the bounding box of the detected face:
[bucketName=xxxxxx, key=users/10/avatar_10e5213a-756a-416c-bc0d-267807fe97ad.jpg, size=6339, ETag=dcca74b9ce159bde603b5f75f8e49542, storageClass=STANDARD]  (36, 25; 56, 56)
[bucketName=xxxxxx, key=users/10/avatar_25cddcbf-dc27-4217-851b-a309e9332342.jpg, size=4020, ETag=2bc1a423c5cfa5002f0e0c11eafb4da8, storageClass=STANDARD]  (14, 10; 22, 22)
[bucketName=xxxxxx, key=users/10/avatar_266d6a7b-4670-4cbd-bf77-ffeb1e17b44b.jpg, size=4020
Once when we’re done playing with our EMR cluster, we have to terminate it:
$ ruby elastic-mapreduce -j j-2JDYPP5IFRQ0L --terminate
Terminated job flow j-2JDYPP5IFRQ0L
Summary
This article demonstrated the basic steps we have to follow to get a custom map-reduce job running on Amazon Elastic MapReduce.