编写程序使用URL方式从HDFS 中读取一个文件。
用url读取hdfs中的数据流。
六 实验代码
package hdfs;
import java.net.URL;
import java.io.InputStream;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;
public class App1 {
public static final String HDFS_PATH="hdfs://192.168.80.100:9000/wlh/wlh.txt";
public static void main(String[] args)throws Exception{
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
final URL url=new URL(HDFS_PATH);
final InputStream in=url.openStream();
IOUtils.copyBytes(in, System.out, 1024, true);
}
}
编写程序,要求在HDFS文件系统中建立一个目录test,在test目录先创建文本文件file,并把文件从HDFS下载到本地
1.赋予写权限
cd /
cd /usr/local/hadoop
hadoop fs -chmod 777 /
service iptables stop
2.在E盘新建文本文档wlh.txt,上传到HDFS,然后从HDFS下载到D盘。
HDFS文件的上传与下载。
六 源代码
package hdfs;
import java.io.FileInputStream;
import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.fs.FileStatus;
public class App2 {
public static final String HDFS_PATH = "hdfs://192.168.80.100:9000";
public static final String DIR_PATH = "/test";
public static final String FILE_PATH = "/test/wlh.txt";
public static void main(String[] args) throws Exception {
final FileSystem fs = FileSystem.get(new URI(HDFS_PATH),
new Configuration());
makeDir(fs); /* 创建目录 */
// listDir(fs); /*显示文件列表*/
uploadFile(fs); /* 上传文件 */
downloadFile(fs); /* 下载文件 */
// deleteFile(fs); /*删除文件或目录*/
// writetoFile(fs); /*写入文件*/
// readFile(fs); /*读出文件*/
}
/* 创建目录 */
private static void makeDir(final FileSystem fs) throws IOException {
fs.mkdirs(new Path(DIR_PATH));
}
/* 上传文件 */
private static void uploadFile(final FileSystem fs) throws IOException,
FileNotFoundException {
final FSDataOutputStream out = fs.create(new Path(FILE_PATH));
final FileInputStream in = new FileInputStream("E:/wlh.txt");
IOUtils.copyBytes(in, out, 1024, true);
}
/* 下载文件 */
private static void downloadFile(final FileSystem fs) throws IOException {
String src_HDFS_PATH = HDFS_PATH + FILE_PATH;
String dst = "D:/";
Path src_Path = new Path(src_HDFS_PATH);
Path dst_Path = new Path(dst);
FileSystem fs1 = src_Path.getFileSystem(new Configuration());
fs1.copyToLocalFile(false, src_Path, dst_Path);
}
}
设通信话单中包含手机号码和每次网络访问的数据流量,编写MapReduce程序计算每个手机号对应的数据流量总和。
Map过程将数据分行,分别获得手机号和流量,将手机号作为Key,将流量作为Value,Reduce过程将每个相同Key的Value值加起来得到统计。
六 源代码
public class AverageGrade {
//自定义Mapper
private static class AverageGradeMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {
// 获取每行数据的值
String lineValue = value.toString();
// 将文件分成多行
StringTokenizer stringTokenizer = new StringTokenizer(lineValue, "\n");
while(stringTokenizer.hasMoreElements()){
//将一行数据进行分割
StringTokenizer tokenizer = new StringTokenizer(stringTokenizer.nextToken());
// 得到学生姓名
String strName = tokenizer.nextToken();
// 得到学生成绩
String strScore = tokenizer.nextToken();
// 用学生姓名作为key
Text name = new Text(strName);
// 用学生成绩作为value
IntWritable score = new IntWritable(Integer.valueOf(strScore));
//将key、value写回到context
context.write(name, score);
}
};
}
//自定义Reducer
private static class AverageGradeReducer extends Reducer<Text, IntWritable, Text, FloatWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
float sum = 0; //sum用来存放每个学生的总成绩
float counter = 0; //counter表示课程门数
for(IntWritable intWritable : values){
sum += intWritable.get();
//counter ++;
}
context.write(key, new FloatWritable(sum));
}
}
// 驱动代码
private int run(String[] args) throws Exception {
// 获取配置信息
Configuration configuration = new Configuration();
// 优化程序
String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
if(otherArgs.length != 2) {
System.err.println("Usage:SortData <in> <out>");
System.exit(2);
}
// 创建Job,设置配置信息和Job名称
Job job = new Job(configuration, "AverageGrade");
job.setJarByClass(AverageGrade.class); //要把程序打成jar包运行,需要这条语句
// 1.设置输入路径
Path inputDir = new Path(args[0]);
FileInputFormat.addInputPath(job, inputDir);
// 2. 设置mapper类和map()方法输出的 key、value类型
job.setMapperClass(AverageGradeMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 3. 设置Reducer类以及输出的key和value的类型
job.setReducerClass(AverageGradeReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FloatWritable.class);
// 4. 设置输出路径
Path OutputDir = new Path(args[1]);
FileOutputFormat.setOutputPath(job, OutputDir);
// 5.提交作业
boolean isSuccess = job.waitForCompletion(true);
return isSuccess ? 0 : 1;
}
// Client 区域
public static void main(String[] args) throws Exception{
args = new String[] { "hdfs://192.168.80.100:9000/wlh3/input",
"hdfs://192.168.80.100:9000/wlh3/output" };
// 6.结束程序
System.exit(new AverageGrade().run(args));
}
}
设气象单中记录了历年来每天的温度,数据格式包含日期和当天的温度,如2012030415表示在2012年03月04日的气温为15度,编写MapReduce程序计算每一年出现过的最高气温。。
同实验三。文档中按格式编写年月日最高温度。
同实验三,保留相同Key的最高Value值。
五、 实验结果
2012030415
2015070325
2013050616
2005040608
2015080930
六 源代码
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MyWordCount
{
static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>
{
final Text k2 = new Text(); // k2 存放一行中的单词
// v2 表示单词在该行中的出现次数
final IntWritable v2 = new IntWritable(1);
/* 定义map方法,主要功能是分割文本行中的单词,将单词及其在该行中的出现次数1写入context。形参value表示一行文本 */
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
// 以空格分割文本
final String[] splited = value.toString().split("\n");
String s1=null;//s1放温度/////////
for (String word : splited)
{
k2.set(word.substring(0,4));//取出年份/////////
s1=word.substring(8,10);//放温度///////////////////
v2.set(Integer.parseInt(s1));///////////////////////
context.write(k2, v2); // 把k2、v2写入到context中
}
}
}
static class MyReducer extends
Reducer<Text, IntWritable, Text, IntWritable>
{
// v3表示单词出现的总次数
final IntWritable v3 = new IntWritable(0);
/* 定义reduce方法,主要功能是遍历map()方法输出的“值”的集合,将所有的“值”相加,得到单词的总出现次数。 */
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException
{
int max = 0; // ////
for (IntWritable count : values)
{
//sum += count.get();
if(max<count.get())
{
max = count.get();
}///////////////////////////////
}
final Text k3 = key; // k3表示单词,是最后输出的“键”
v3.set(max); // v3表示最大的温度,是最后输出的“值”//////////
context.write(k3, v3); // 将单词及其总次数作为<key,value>写入context
}
}
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException
{
// 创建一个job对象
final Job job = new Job(new Configuration(), "MyWordCount");
job.setJarByClass(MyWordCount.class);// 把程序打成jar包运行
// 设置把输入文件处理成键值对的类
job.setInputFormatClass(TextInputFormat.class);
// 设置自定义的Mapper类和Reducer类
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
// 设置map()方法输出的k2、v2的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置输出的key和value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置job作业执行时输入文件的路径和输出路径
FileInputFormat.addInputPath(job, new Path(
"hdfs://192.168.80.100:9000/wlh/WordCount/input"));
FileOutputFormat.setOutputPath(job, new Path(
"hdfs://192.168.80.100:9000/wlh/WordCount/output"));
// 让作业运行,直到运行结束,程序退出
job.waitForCompletion(true);
}
}
大数据MapReduce用于图计算及其不足之处。
很多图算法可以转换为Mapreduce框架下的计算任务。下面以PageRank计算为例讲述如何在该框架下进行图计算。
Mapreduce框架下的输入往往是key-value数据对,其中,value可以是简单类型,比如数值或字符串,也可以是复杂的数据结构,比如数组 或者记录等。对于图数据来说,其内部表示方式以邻接表为宜,这样,输入数据的key为图节点ID,对应的value为复杂记录,其中记载了邻接表数据、 key节点的PageRank值等。
对很多图算法来说,Mapreduce内部计算过程中的Shuffle和Sort操作起到类似于通过图中节点出边进行消息传播的效果。从图14-7的PageRank伪码中可见此技巧的运用。
在该例的Map操作中,输入数据的key是图节点ID,value是图节点数据结构N,其中包括邻接表AdjacencyList信息以及节点对应的当前 PageRank值。第3行代码计算当前节点传播到邻接节点的PageRank分值,第5、6行代码将这些分值转换为新的key1-value1,以邻接 节点ID作为新的key,而从当前节点传播给邻接节点的分值作为新的value1。除此之外,还需要将当前节点的节点信息继续保留,以便进行后续的迭代过 程,所以第4行代码将输入记录本身再次原封不动地传播出去。
通过MapReduce内部的Shuffle和Sort操作,可以将相同key1对应的系列value1集中到一起,即将ID为key1的图节点从其他节点传入的PageRank部分分值聚合到一起,这起到了类似于消息传播的作用。图14-7示例里的 Reduce操作中,其对应的输入数据包括图节点ID以及对应的PageRank部分分值列表,伪码第4行到第8行累积这部分分值形成新的 PageRank值,同时判断某个value1是否是节点信息(第5行代码)。第9行代码则更新节点信息内的PageRank值,而第10行代码输出更新 后的节点信息。这样就完成了一轮PageRank迭代过程,而本次Reduce阶段的输出结果可以作为下一轮迭代Map阶段的输入数据。如此循环往复,直 到满足终止条件,即可输出最终的计算结果。
Mapreduce计算框架在Map操作后会通过网络通信将具有相同key值的中间结果记录映射到同一台机器上,以满足后续Reduce阶段操作的要求。 一般情况下,这种网络传输数据量非常大,往往会严重影响计算效率,而Combine操作即为减少网络传输以优化效率而提出。Combine操作在本地机器 Map操作后,首先将具有相同key值的Map结果数据value部分进行本地聚合,这样本来应该分别传输的项目即被合并,大大减少了网络传输量,加快了 计算速度。对于图计算,同样可以采用这种优化手段改善效率。
上面介绍了如何在Mapreduce框架下进行PageRank计算,很多其他图算法也可 用近似的思路处理,其关键点仍然是通过上述的Shuffle和Sort操作,将同一节点的入边聚合到一起,而Reduce操作可以类似例中的部分数值求 和,也可能是取边中的Max/Min等其他类型的操作,这依据应用各异,但基本思想无较大的区别。
MapReduce尽管已经成为主流的分布式计算模型,但有其适用范围,对于大量的数 据挖掘类科学计算和图挖掘算法来说,使用Mapreduce模型尽管经过变换也可以得到解决,但往往并非解决此类问题的最佳技术方案。根本原因在于:很多 科学计算或者图算法内在机制上需要进行多轮反复迭代,而如果采用Mapreduce模型,每一次迭代过程中产生的中间结果都需要反复在Map阶段写入本地 磁盘,在Reduce阶段写入GFS/HDFS文件系统中,下一轮迭代一般是在上一轮迭代的计算结果的基础上继续进行,这样需要再次将其加载入内存,计算 得出新的中间结果后仍然写入本地文件系统以及GFS/HDFS文件系统中。如此反复,且不必要的磁盘输入/输出严重影响计算效率。除此之外,每次迭代都需 要对任务重新进行初始化等任务管理开销也非常影响效率。
下面以Mapreduce模型计算图的单源最短路径的具体应用实例来说明此问题的严重性。所谓“单源最短路径”,就是对于图结构G<N,E>(N为图节点集合,E为图中边集合且边具有权值,这个权值代表两个节点间的距离),如果给定初始节点V,需要计算图中该节点到其他任意节点的最短距离是多少。这个例子的图结构如图14-9所示,图的内部表示采用邻接表方案。假设从源节点A出发,求其他节点到节点A的最短距离,在初始化阶段,设置源节点A的最短距离为0,其他节点的最短距离为INF(足够大的数值)。
对Mapreduce模型来说,计算分为两个阶段,即Map阶段和Reduce阶段。针对上述问题,Map阶段的最初输入即为稍加改造的图G的邻接表,除 了节点的邻接表信息外,还需要额外记载节点当前获得的最小距离数值。以常见的key-value方式表示为:key=节点ID,value=<节点 到源节点A的当前最小距离Dist,邻接表>。以源节点A为例,其Map阶段的输入为:<A, <0, <(B, 10),(D, 5)>>>,其他节点输入数据形式与此类似。
Map阶段对输入数据的转换逻辑为:计算key节点的邻接表中节点到源节点A的当前最短距离。即将key-value转换为key1-value1序列, 这里key1是key节点的邻接表中节点ID,value1为key1节点到源节点A的当前最短距离。以源节点A为例,其输入为<A, <0, <(B, 10), (D, 5)>>>,经过Map转换后,得到输出<B,10>和<D,5>,<B,10>的含义是:B节 点到A节点的当前最短距离是10(由A节点到源节点A距离0加上B节点到A节点距离10获得),<D,5>的含义与之类似。通过此步可以完成 Map阶段计算,图14-10展示了原始输入转换为Map阶段输出结果对应的KV数值。
在Map阶段产生结果后,系统会将临时结果写入本地磁盘文件中,以作为Reduce阶段的输入数据。Reduce阶段的逻辑为:对某个节点来说,从众多本 节点到源节点A的距离中选择最短的距离作为当前值。以节点B为例,从图14-10中Map阶段的输出可以看出,以B为key有两 项:<B,10>和<B,inf>,取其最小值得到新的最短距离为10,则可输出结果<B,<10,< (C,1),(D,2)>>>。图14-11展示了Reduce阶段的输出。
在Reduce阶段结束后,系统将结果写入GFS/HDFS文件系统中,这样完成了单源最短路径的一轮计算。使得图节点B和图节点D的当前最短路径获得了 更新。而为了能够获得最终的结果,还需要按照上述方式反复迭代,以本轮Reduce输出作为下一轮Map阶段的输入。由此可见,如果完成计算,则需要多次 将中间结果往文件系统输出,这会严重影响系统效率。这是为何Mapreduce框架不适宜做图应用的主要原因。