首页 >> 实验教学 >> 实验项目 >> 详细内容
实验教学
 
实验项目 >> 正文
云计算实验报告
日期:2021-12-09 18:30:40  发布人:nclgjsj  浏览量:174

 

实验一  远程读取HDFS中的一个文件

  • 实验目的
  • 熟练运用Eclipse平台进行hadoop编程
  • 深入理解MapReduce
  • 熟悉Linuxhadoop命令相关操作
  • 实验内容及要求

编写程序使用URL方式从HDFS 中读取一个文件。

  • 实验步骤
  • 启动hadoop,向hdfs中上传一个文本文件。
  • 检查物理机和主机能否ping通。
  • 编写程序,远程读取文件数据
  • 调试运行。

 

  • 算法思想

url读取hdfs中的数据流。

 

 

  • 实验结果

 

 

 实验代码

package hdfs;

import java.net.URL;

import java.io.InputStream;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;

import org.apache.hadoop.io.IOUtils;

public class App1 {

public static final String HDFS_PATH="hdfs://192.168.80.100:9000/wlh/wlh.txt";

public static void main(String[] args)throws Exception{

URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());

final URL url=new URL(HDFS_PATH);

final InputStream in=url.openStream();

IOUtils.copyBytes(in, System.out, 1024, true);

}

}

 

 

 

 

 

 

 

 

 

 

 

 

 

实验二  用程序实现HDFS文件相关操作

  • 实验目的
  • 熟练运用Eclipse平台进行hadoop编程
  • 深入理解HDFS系统
  • 熟悉LinuxHDFS命令相关操作
  • 实验内容及要求

编写程序,要求在HDFS文件系统中建立一个目录test,在test目录先创建文本文件file,并把文件从HDFS下载到本地

  • 实验步骤

  1.赋予写权限

cd /

cd /usr/local/hadoop

hadoop fs -chmod 777 /

service iptables stop

2.E盘新建文本文档wlh.txt,上传到HDFS,然后从HDFS下载到D盘。

  • 算法思想

HDFS文件的上传与下载。

 

  • 实验结果

 

  代码

package hdfs;

import java.io.FileInputStream;

import java.io.ByteArrayInputStream;

import java.io.FileNotFoundException;

import java.io.IOException;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.apache.hadoop.fs.FileStatus;

public class App2 {

public static final String HDFS_PATH = "hdfs://192.168.80.100:9000";

public static final String DIR_PATH = "/test";

public static final String FILE_PATH = "/test/wlh.txt";

 

public static void main(String[] args) throws Exception {

final FileSystem fs = FileSystem.get(new URI(HDFS_PATH),

new Configuration());

 

makeDir(fs); /* 创建目录 */

// listDir(fs); /*显示文件列表*/

uploadFile(fs); /* 上传文件 */

downloadFile(fs); /* 下载文件 */

// deleteFile(fs); /*删除文件或目录*/

// writetoFile(fs); /*写入文件*/

// readFile(fs); /*读出文件*/

 

}

 

/* 创建目录 */

private static void makeDir(final FileSystem fs) throws IOException {

fs.mkdirs(new Path(DIR_PATH));

}

 

/* 上传文件 */

private static void uploadFile(final FileSystem fs) throws IOException,

FileNotFoundException {

final FSDataOutputStream out = fs.create(new Path(FILE_PATH));

final FileInputStream in = new FileInputStream("E:/wlh.txt");

IOUtils.copyBytes(in, out, 1024, true);

}

 

/* 下载文件 */

private static void downloadFile(final FileSystem fs) throws IOException {

String src_HDFS_PATH = HDFS_PATH + FILE_PATH;

String dst = "D:/";

Path src_Path = new Path(src_HDFS_PATH);

Path dst_Path = new Path(dst);

FileSystem fs1 = src_Path.getFileSystem(new Configuration());

fs1.copyToLocalFile(false, src_Path, dst_Path);

}

 

 

}

实验三  手机流量统计

  • 实验目的
  • 熟练运用Eclipse平台进行hadoop编程
  • 深入理解MapReduce
  • 熟悉Linuxhadoop命令相关操作
  • 实验内容及要求

设通信话单中包含手机号码和每次网络访问的数据流量,编写MapReduce程序计算每个手机号对应的数据流量总和。

  • 实验步骤
  1. 在虚拟机中编写一定格式的手机号码 流量记录文档。
  2. 上传到HDFS中。
  3. MapReduce处理数据。
  4. 查看实验结果。
  • 算法思想

Map过程将数据分行,分别获得手机号和流量,将手机号作为Key,将流量作为Value,Reduce过程将每个相同Key的Value值加起来得到统计。

  • 实验结果

 

 

  源代码

public class AverageGrade {

//自定义Mapper

private static class AverageGradeMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

@Override

protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {

// 获取每行数据的值

String lineValue = value.toString();

// 将文件分成多行

StringTokenizer stringTokenizer = new StringTokenizer(lineValue, "\n");

while(stringTokenizer.hasMoreElements()){

//将一行数据进行分割

StringTokenizer tokenizer = new StringTokenizer(stringTokenizer.nextToken());

// 得到学生姓名

String strName = tokenizer.nextToken();

// 得到学生成绩

String strScore = tokenizer.nextToken();

// 用学生姓名作为key

Text name = new Text(strName);

// 用学生成绩作为value

IntWritable score = new IntWritable(Integer.valueOf(strScore));

//keyvalue写回到context

context.write(name, score);

}

};

}

 

//自定义Reducer

private static class AverageGradeReducer extends Reducer<Text, IntWritable, Text, FloatWritable> {

@Override

protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {

float sum = 0;  //sum用来存放每个学生的总成绩

float counter = 0; //counter表示课程门数

for(IntWritable intWritable : values){

sum += intWritable.get();

//counter ++;

}

context.write(key, new FloatWritable(sum));

}

}

 

// 驱动代码

private int run(String[] args) throws Exception {

// 获取配置信息

Configuration configuration = new Configuration();

// 优化程序

String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();

if(otherArgs.length != 2) {

System.err.println("Usage:SortData <in> <out>");

System.exit(2);

}

 

// 创建Job,设置配置信息和Job名称

Job job = new Job(configuration, "AverageGrade");

job.setJarByClass(AverageGrade.class); //要把程序打成jar包运行,需要这条语句

// 1.设置输入路径

Path inputDir = new Path(args[0]);

FileInputFormat.addInputPath(job, inputDir);

 

// 2. 设置mapper类和map()方法输出的 keyvalue类型

job.setMapperClass(AverageGradeMapper.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);

 

// 3. 设置Reducer类以及输出的keyvalue的类型

job.setReducerClass(AverageGradeReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(FloatWritable.class);

 

// 4. 设置输出路径

Path OutputDir = new Path(args[1]);

FileOutputFormat.setOutputPath(job, OutputDir);

 

// 5.提交作业

boolean isSuccess = job.waitForCompletion(true);

return isSuccess ? 0 : 1;

}

 

// Client 区域

public static void main(String[] args) throws Exception{

args = new String[] { "hdfs://192.168.80.100:9000/wlh3/input",

"hdfs://192.168.80.100:9000/wlh3/output" };

// 6.结束程序

System.exit(new AverageGrade().run(args));

}

}

 

实验四  计算年度最高温

  • 实验目的
  • 熟练运用Eclipse平台进行hadoop编程
  • 深入理解MapReduce
  • 熟悉Linuxhadoop命令相关操作
  • 实验内容及要求

设气象单中记录了历年来每天的温度,数据格式包含日期和当天的温度,如2012030415表示在20120304日的气温为15度,编写MapReduce程序计算每一年出现过的最高气温。。

  • 实验步骤

同实验三。文档中按格式编写年月日最高温度。

  • 算法思想

同实验三,保留相同Key的最高Value值。

五、 实验结果

2012030415

2015070325

2013050616

2005040608

2015080930

 

 源代码

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class MyWordCount

{

static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>

{

final Text k2 = new Text(); // k2 存放一行中的单词

// v2 表示单词在该行中的出现次数

final IntWritable v2 = new IntWritable(1);

 

/* 定义map方法,主要功能是分割文本行中的单词,将单词及其在该行中的出现次数1写入context。形参value表示一行文本 */

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException

{

// 以空格分割文本

final String[] splited = value.toString().split("\n");

String s1=null;//s1放温度/////////

for (String word : splited)

{

k2.set(word.substring(0,4));//取出年份/////////

s1=word.substring(8,10);//放温度///////////////////

v2.set(Integer.parseInt(s1));///////////////////////

context.write(k2, v2); // 把k2、v2写入到context中

}

}

}

 

static class MyReducer extends

Reducer<Text, IntWritable, Text, IntWritable>

{

// v3表示单词出现的总次数

final IntWritable v3 = new IntWritable(0);

 

/* 定义reduce方法,主要功能是遍历map()方法输出的“值”的集合,将所有的“值”相加,得到单词的总出现次数。 */

protected void reduce(Text key, Iterable<IntWritable> values,

Context context) throws IOException, InterruptedException

{

int max = 0; // ////

for (IntWritable count : values)

{

//sum += count.get();

if(max<count.get())

{

max = count.get();

}///////////////////////////////

}

final Text k3 = key; // k3表示单词,是最后输出的“键”

v3.set(max); // v3表示最大的温度,是最后输出的“值”//////////

context.write(k3, v3); // 将单词及其总次数作为<key,value>写入context

}

}

 

public static void main(String[] args) throws IOException,

InterruptedException, ClassNotFoundException

{

// 创建一个job对象

final Job job = new Job(new Configuration(), "MyWordCount");

job.setJarByClass(MyWordCount.class);// 把程序打成jar包运行

// 设置把输入文件处理成键值对的类

job.setInputFormatClass(TextInputFormat.class);

// 设置自定义的Mapper类和Reducer类

job.setMapperClass(MyMapper.class);

job.setReducerClass(MyReducer.class);

// 设置map()方法输出的k2、v2的类型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);

// 设置输出的key和value的类型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

// 设置job作业执行时输入文件的路径和输出路径

FileInputFormat.addInputPath(job, new Path(

"hdfs://192.168.80.100:9000/wlh/WordCount/input"));

FileOutputFormat.setOutputPath(job, new Path(

"hdfs://192.168.80.100:9000/wlh/WordCount/output"));

// 让作业运行,直到运行结束,程序退出

job.waitForCompletion(true);

}

}

 

 

实验五  大数据应用场景

大数据MapReduce用于图计算及其不足之处。

    很多图算法可以转换为Mapreduce框架下的计算任务。下面以PageRank计算为例讲述如何在该框架下进行图计算。

Mapreduce框架下的输入往往是key-value数据对,其中,value可以是简单类型,比如数值或字符串,也可以是复杂的数据结构,比如数组 或者记录等。对于图数据来说,其内部表示方式以邻接表为宜,这样,输入数据的key为图节点ID,对应的value为复杂记录,其中记载了邻接表数据、 key节点的PageRank值等。

对很多图算法来说,Mapreduce内部计算过程中的ShuffleSort操作起到类似于通过图中节点出边进行消息传播的效果。从图14-7PageRank伪码中可见此技巧的运用。

       在该例的Map操作中,输入数据的key是图节点IDvalue是图节点数据结构N,其中包括邻接表AdjacencyList信息以及节点对应的当前 PageRank值。第3行代码计算当前节点传播到邻接节点的PageRank分值,第56行代码将这些分值转换为新的key1-value1,以邻接 节点ID作为新的key,而从当前节点传播给邻接节点的分值作为新的value1。除此之外,还需要将当前节点的节点信息继续保留,以便进行后续的迭代过 程,所以第4行代码将输入记录本身再次原封不动地传播出去。

       通过MapReduce内部的Shuffle和Sort操作,可以将相同key1对应的系列value1集中到一起,即将ID为key1的图节点从其他节点传入的PageRank部分分值聚合到一起,这起到了类似于消息传播的作用。图14-7示例里的 Reduce操作中,其对应的输入数据包括图节点ID以及对应的PageRank部分分值列表,伪码第4行到第8行累积这部分分值形成新的 PageRank值,同时判断某个value1是否是节点信息(第5行代码)。第9行代码则更新节点信息内的PageRank值,而第10行代码输出更新 后的节点信息。这样就完成了一轮PageRank迭代过程,而本次Reduce阶段的输出结果可以作为下一轮迭代Map阶段的输入数据。如此循环往复,直 到满足终止条件,即可输出最终的计算结果。

Mapreduce计算框架在Map操作后会通过网络通信将具有相同key值的中间结果记录映射到同一台机器上,以满足后续Reduce阶段操作的要求。 一般情况下,这种网络传输数据量非常大,往往会严重影响计算效率,而Combine操作即为减少网络传输以优化效率而提出。Combine操作在本地机器 Map操作后,首先将具有相同key值的Map结果数据value部分进行本地聚合,这样本来应该分别传输的项目即被合并,大大减少了网络传输量,加快了 计算速度。对于图计算,同样可以采用这种优化手段改善效率

上面介绍了如何在Mapreduce框架下进行PageRank计算,很多其他图算法也可 用近似的思路处理,其关键点仍然是通过上述的Shuffle和Sort操作,将同一节点的入边聚合到一起,而Reduce操作可以类似例中的部分数值求 和,也可能是取边中的Max/Min等其他类型的操作,这依据应用各异,但基本思想无较大的区别。

MapReduce尽管已经成为主流的分布式计算模型,但有其适用范围,对于大量的数 据挖掘类科学计算和图挖掘算法来说,使用Mapreduce模型尽管经过变换也可以得到解决,但往往并非解决此类问题的最佳技术方案。根本原因在于:很多 科学计算或者图算法内在机制上需要进行多轮反复迭代,而如果采用Mapreduce模型,每一次迭代过程中产生的中间结果都需要反复在Map阶段写入本地 磁盘,在Reduce阶段写入GFS/HDFS文件系统中,下一轮迭代一般是在上一轮迭代的计算结果的基础上继续进行,这样需要再次将其加载入内存,计算 得出新的中间结果后仍然写入本地文件系统以及GFS/HDFS文件系统中。如此反复,且不必要的磁盘输入/输出严重影响计算效率。除此之外,每次迭代都需 要对任务重新进行初始化等任务管理开销也非常影响效率。

 

 

 

 

 

 

 

 

       下面以Mapreduce模型计算图的单源最短路径的具体应用实例来说明此问题的严重性。所谓“单源最短路径”,就是对于图结构G<N,E>(N为图节点集合,E为图中边集合且边具有权值,这个权值代表两个节点间的距离),如果给定初始节点V,需要计算图中该节点到其他任意节点的最短距离是多少。这个例子的图结构如图14-9所示,图的内部表示采用邻接表方案。假设从源节点A出发,求其他节点到节点A的最短距离,在初始化阶段,设置源节点A的最短距离为0,其他节点的最短距离为INF(足够大的数值)。

      Mapreduce模型来说,计算分为两个阶段,即Map阶段和Reduce阶段。针对上述问题,Map阶段的最初输入即为稍加改造的图G的邻接表,除 了节点的邻接表信息外,还需要额外记载节点当前获得的最小距离数值。以常见的key-value方式表示为:key=节点IDvalue=<节点 到源节点A的当前最小距离Dist,邻接表>。以源节点A为例,其Map阶段的输入为:<A, <0, <(B, 10),(D, 5)>>>,其他节点输入数据形式与此类似。

      Map阶段对输入数据的转换逻辑为:计算key节点的邻接表中节点到源节点A的当前最短距离。即将key-value转换为key1-value1序列, 这里key1key节点的邻接表中节点IDvalue1key1节点到源节点A的当前最短距离。以源节点A为例,其输入为<A, <0, <(B, 10), (D, 5)>>>,经过Map转换后,得到输出<B,10><D,5><B,10>的含义是:B节 点到A节点的当前最短距离是10(由A节点到源节点A距离0加上B节点到A节点距离10获得),<D,5>的含义与之类似。通过此步可以完成 Map阶段计算,图14-10展示了原始输入转换为Map阶段输出结果对应的KV数值。

       Map阶段产生结果后,系统会将临时结果写入本地磁盘文件中,以作为Reduce阶段的输入数据。Reduce阶段的逻辑为:对某个节点来说,从众多本 节点到源节点A的距离中选择最短的距离作为当前值。以节点B为例,从图14-10Map阶段的输出可以看出,以Bkey有两 项:<B,10><B,inf>,取其最小值得到新的最短距离为10,则可输出结果<B,<10,< (C,1),(D,2)>>>。图14-11展示了Reduce阶段的输出。

      Reduce阶段结束后,系统将结果写入GFS/HDFS文件系统中,这样完成了单源最短路径的一轮计算。使得图节点B和图节点D的当前最短路径获得了 更新。而为了能够获得最终的结果,还需要按照上述方式反复迭代,以本轮Reduce输出作为下一轮Map阶段的输入。由此可见,如果完成计算,则需要多次 将中间结果往文件系统输出,这会严重影响系统效率。这是为何Mapreduce框架不适宜做图应用的主要原因。

 

核发:nclgjsj 点击数:174收藏本页