Download the input data file from here

Extract the archive and place the data file (cite75_99.txt) in “/usr/demodata/”.

Compile and run the following program.

Note: This assumes that Hadoop 1.1.2 is installed and running properly.

 

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Hadoop (old {@code mapred} API) job that groups the values of a
 * comma-separated key/value text file by key.
 *
 * <p>The input is read with {@link KeyValueTextInputFormat} using {@code ","} as the
 * key/value separator. The mapper forwards every pair unchanged and the reducer emits,
 * for each key, a single comma-separated list of all values collected for that key.
 *
 * <p>The job is launched through {@link ToolRunner}, so the generic Hadoop command-line
 * options it parses are applied to the configuration used by {@link #run(String[])}.
 */
public class MyJob extends Configured implements Tool {

	/** Input file read by the job. */
	private static final String INPUT_PATH = "/usr/demodata/cite75_99.txt";

	/** Directory the job writes its results to. */
	private static final String OUTPUT_PATH = "/usr/demodata/output";

	/** Property read by {@link KeyValueTextInputFormat} to split a line into key and value. */
	private static final String SEPARATOR_PROPERTY = "key.value.separator.in.input.line";

	/**
	 * Identity mapper: emits each incoming key/value pair unchanged.
	 */
	public static class MapClass extends MapReduceBase implements Mapper<Object,Object,Object,Object>{

		/**
		 * Forwards the pair to the collector without modification.
		 *
		 * @param key      key of the input record
		 * @param value    value of the input record
		 * @param output   collector receiving the unchanged pair
		 * @param reporter progress reporter (unused)
		 * @throws IOException if the collector fails to accept the pair
		 */
		@Override
		public void map(Object key, Object value,
				OutputCollector<Object, Object> output, Reporter reporter)
				throws IOException {
			output.collect(key, value);
		}

	}

	/**
	 * Reducer that joins all values of a key into one comma-separated string.
	 */
	public static class Reduce extends MapReduceBase  implements Reducer<Object,Object,Object,Object>{

		/**
		 * Concatenates the string form of every value, separated by {@code ","},
		 * and emits the result as a {@link Text} under the same key.
		 *
		 * @param key      key shared by all {@code values}
		 * @param values   values grouped under {@code key}
		 * @param output   collector receiving the single joined record
		 * @param reporter progress reporter (unused)
		 * @throws IOException if the collector fails to accept the record
		 */
		@Override
		public void reduce(Object key, Iterator<Object> values,
				OutputCollector<Object, Object> output, Reporter reporter)
				throws IOException {
			// StringBuilder avoids re-copying the whole string on every append,
			// which matters for keys that have a large number of values.
			StringBuilder csv = new StringBuilder();
			while (values.hasNext()) {
				if (csv.length() > 0) {
					csv.append(',');
				}
				csv.append(values.next().toString());
			}
			output.collect(key, new Text(csv.toString()));
		}

	}

	/**
	 * Command-line entry point; runs the job through {@link ToolRunner} and exits
	 * with the status returned by {@link #run(String[])}.
	 *
	 * @param args command-line arguments handed to {@link ToolRunner}
	 * @throws Exception if the job cannot be configured or fails while running
	 */
	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new Configuration(), new MyJob(), args);
		System.exit(res);
	}

	/**
	 * Configures and submits the job, blocking until it finishes.
	 *
	 * @param args remaining command-line arguments (currently unused)
	 * @return {@code 0} once {@link JobClient#runJob(JobConf)} has returned normally
	 * @throws Exception if job submission or execution fails
	 */
	@Override
	public int run(String[] args) throws Exception{
		// Use the configuration injected via setConf (ToolRunner does this) so that
		// options already applied to it are honoured; fall back to a fresh one only
		// when run() is invoked directly on an unconfigured instance.
		Configuration conf = getConf();
		if (conf == null) {
			conf = new Configuration();
		}
		JobConf job = new JobConf(conf, MyJob.class);

		FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
		FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

		job.setJobName("Test Job");
		job.setMapperClass(MapClass.class);
		job.setReducerClass(Reduce.class);

		job.setInputFormat(KeyValueTextInputFormat.class);
		job.setOutputFormat(TextOutputFormat.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.set(SEPARATOR_PROPERTY, ",");

		JobClient.runJob(job);

		return 0;
	}

}