package com.sdnware.start01.hadoop;

import java.io.IOException;
import java.net.URI;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {

    // Mapper: splits each input line into tokens and emits (word, 1) pairs.
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
    // Reducer (also used as the combiner): sums the counts emitted for each word.
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
    public static void main(String[] args) throws Exception {
        // Needed on Windows so Hadoop can locate winutils.exe.
        System.setProperty("hadoop.home.dir", "D:/soft/hadoop-2.6.0");
        // Submit to the remote cluster as the "root" user.
        System.setProperty("HADOOP_USER_NAME", "root");

        Configuration conf = new Configuration();

        // Delete the output directory if it already exists, otherwise the job fails.
        FileSystem hdfs = FileSystem.get(new URI("hdfs://f1:9000"), conf);
        Path outpath = new Path("/user/result");
        if (hdfs.exists(outpath)) {
            hdfs.delete(outpath, true);
        }

        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Read the remote input file via the hdfs:// protocol.
        FileInputFormat.addInputPath(job, new Path(new URI("hdfs://f1:9000/user/wordcount")));
        FileOutputFormat.setOutputPath(job, new Path(new URI("hdfs://f1:9000/user/result")));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
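
The job above assumes the input already exists at hdfs://f1:9000/user/wordcount. If it does not, the file can be uploaded with the same FileSystem API used to clear the output directory. This is a minimal sketch, assuming a local file at D:/tmp/words.txt (a hypothetical path, not from the original); running hdfs dfs -put on the cluster works just as well.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadInput {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        // Connect to the same NameNode the WordCount job uses.
        FileSystem hdfs = FileSystem.get(new URI("hdfs://f1:9000"), conf);
        // Copy a local file (hypothetical path) to the job's input location.
        hdfs.copyFromLocalFile(new Path("D:/tmp/words.txt"), new Path("/user/wordcount"));
        hdfs.close();
    }
}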
The pom.xml is attached below:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sdnware</groupId>
    <artifactId>start01</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>start01</name>
    <url>http://maven.apache.org</url>

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>2.6.0-mr1-cdh5.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0-cdh5.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0-cdh5.10.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>