多表關聯和單表關聯類似,它也是通過對原始數據進行一定的處理,從其中挖掘出關心的信息。
輸入的是兩個文件:一個代表工廠表,包含工廠名列和地址編號列;另一個代表地址表,包含地址名列和地址編號列。要求從輸入數據中找出工廠名和地址名的對應關係,輸出「工廠名-地址名」表。
樣本如下:
factory:
factoryname addressID
Beijing Red Star 1
Shenzhen Thunder 3
Guangzhou Honda 2
Beijing Rising 1
Guangzhou Development Bank 2
Tencent 3
Bank of Beijing 1
address:
addressID addressname
1 Beijing
2 Guangzhou
3 Shenzhen
4 Xian
結果:
factoryname addressname
Beijing Red Star Beijing
Beijing Rising Beijing
Bank of Beijing Beijing
Guangzhou Honda Guangzhou
Guangzhou Development Bank Guangzhou
Shenzhen Thunder Shenzhen
Tencent Shenzhen
代碼如下:
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class MTjoin {
public static int time = 0;
/*
* 在map中先區分輸入行屬于左表還是右表,然后對兩列值進行分割,
* 保存連接列在key值,剩余列和左右表標志在value中,最后
輸出
*/
public static class Map extends Mapper {
// 實現map函數
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();// 每行文件
String relationtype = new String();// 左右表標識
// 輸入文件首行,不處理
if (line.contains("factoryname") == true
|| line.contains("addressed") == true) {
return;
}
// 輸入的一行預處理文本
StringTokenizer itr = new StringTokenizer(line);
String mapkey = new String();
String mapvalue = new String();
int i = 0;
while (itr.hasMoreTokens()) {
// 先讀取一個單詞
String token = itr.nextToken();
// 判斷該地址ID就把存到"values[0]"
if (token.charAt(0) >= '0' && token.charAt(0) <= '9') {
mapkey = token;
if (i > 0) {
relationtype = "1";
} else {
relationtype = "2";
}
continue;
}
// 存工廠名
mapvalue += token + " ";
i++;
}
// 輸出左右表
context.write(new Text(mapkey), new Text(relationtype + "+"+ mapvalue));
}
}
/*
* reduce解析map輸出,將value中數據按照左右表分別保存,
* 然后求出笛卡爾積,并輸出。
*/
public static class Reduce extends Reducer {
// 實現reduce函數
public void reduce(Text key, Iterable values, Context context)
throws IOException, InterruptedException {
// 輸出表頭
if (0 == time) {
context.write(new Text("factoryname"), new Text("addressname"));
time++;
}
int factorynum = 0;
String[] factory = new String[10];
int addressnum = 0;
String[] address = new String[10];
Iterator ite = values.iterator();
while (ite.hasNext()) {
String record = ite.next().toString();
int len = record.length();
int i = 2;
if (0 == len) {
continue;
}
// 取得左右表標識
char relationtype = record.charAt(0);
// 左表
if ('1' == relationtype) {
factory[factorynum] = record.substring(i);
factorynum++;
}
// 右表
if ('2' == relationtype) {
address[addressnum] = record.substring(i);
addressnum++;
}
}
// 求笛卡爾積
if (0 != factorynum && 0 != addressnum) {
for (int m = 0; m < factorynum; m++) {
for (int n = 0; n < addressnum; n++) {
// 輸出結果
context.write(new Text(factory[m]),
new Text(address[n]));
}
}
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// 這句話很關鍵
// conf.set("mapred.job.tracker", "192.168.1.2:9001");
//可使用args
// String[] ioArgs = new String[] { "MTjoin_in", "MTjoin_out" };
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: Multiple Table Join ");
System.exit(2);
}
Job job = new Job(conf, "Multiple Table Join");
job.setJarByClass(MTjoin.class);
// 設置Map和Reduce處理類
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
// 設置輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 設置輸入和輸出目錄
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
javac -classpath hadoop-core-1.1.2.jar:/opt/hadoop-1.1.2/lib/commons-cli-1.2.jar -d firstProject firstProject/MTjoin.java
jar -cvf MTJoin.jar -C firstProject/ .
刪除已經存在的output目錄(hadoop fs -rmr output),然後上傳輸入文件:
hadoop fs -put factory input
hadoop fs -put address input
運行
hadoop jar MTJoin.jar MTjoin input output
查看結果
hadoop fs -cat output/part-r-00000
?
作者:a331251021 發表于2013-8-4 16:20:52 原文鏈接
閱讀:72 評論:0 查看評論
原文地址:hadoop實例---多表關聯, 感謝原作者分享。
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com