[HadoopinAction]第1章Hadoop简介-创新互联

  • 编写可扩展、分布式的数据密集型程序和基础知识

    成都创新互联服务电话:13518219792,为您提供成都网站建设网页设计及定制高端网站建设服务,成都创新互联网页制作领域十余年,包括花箱等多个行业拥有多年的网站维护经验,选择成都创新互联,为企业锦上添花。
  • 理解Hadoop和MapReduce

  • 编写和运行一个基本的MapReduce程序

1、什么是Hadoop

Hadoop是一个开源的框架,可编写和运行分布式应用处理大规模数据。

Hadoop与众不同之处在于以下几点:

  1. 方便——Hadoop运行在由一般商用机器构成的大型集群上,或者云计算服务之上;

  2. 健壮——Hadoop致力于在一般商用硬件上运行,其架构假设硬件会频繁地出现失效;

  3. 可扩展——Hadoop通过增加集群节点,可以线性地扩展以处理更大的数据集;

  4. 简单——Hadoop运行用户快速编写出高效的并行代码。

2、了解分布式系统和Hadoop

理解分布式系统(向外扩展)和大型单机服务器(向上扩展)之间的对比,考虑现有I/O技术的性价比。

理解Hadoop和其他分布式架构(SETI@home)的区别:

Hadoop设计理念是代码向数据迁移,而SETI@home设计理念是数据迁移。

要运行的程序在规模上比数据小几个数量级,更容易移动;此外,在网络上移动数据要比在其上加载代码更花时间,不如让数据不动而将可执行代码移动到数据所在机器上去。

3、比较SQL数据库和Hadoop

SQL(结构化查询语言)是针对结构化数据设计的,而Hadoop最初的许多应用针对的是文本这种非结构化数据。让我们从特定的视角将Hadoop与典型SQL数据库做更详细的比较:

  1. 用向外扩展代替向上扩展——扩展商用关系型数据库的代价会更加昂贵的

  2. 用键/值对代替关系表——Hadoop使用键/值对作为基本数据单元,可足够灵活地处理较少结构化的数据类型

  3. 用函数式编程(MapReduce)代替声明式查询(SQL)——在MapReduce中,实际的数据处理步骤是由你指定的,很类似于SQL引擎的一个执行计划

  4. 用离线处理代替在线处理——Hadoop是专为离线处理和大规模数据分析而设计的,并不适合那种对几个记录随机读写的在线事务处理模式

4、理解MapReduce

MapReduce是一种数据处理模型,大的优点是容易扩展到多个计算节点上处理数据;

在MapReduce模型中,数据处理原语被称为mapper和reducer;

分解一个数据处理应用为mapper和reducer有时是繁琐的,但是一旦一MapReduce的形式写好了一个应用程序,仅需修改配置就可以将它扩展到集群中几百、几千,甚至几万台机器上运行。

[动手扩展一个简单程序]

少量文档处理方式:对于每个文档,使用分词过程逐个提取单词;对于每个单词,在多重集合wordcount中的相应项上加1;最后display()函数打印出wordcount中的所有条目。

大量文档处理方式:将工作分布到多台机器上,每台机器处理这些文档的不同部分,当所有机器都完成时,第二个处理阶段将合并这些结果。

一些细节可能会妨碍程序按预期工作,如文档读取过量导致中央存储服务器的带宽性能跟不上、多重集合wordcount条目过多超过计算机的内存容量。此外,第二阶段只有一个计算机处理wordcount任务,容易出现瓶颈,所以可以采用分布的方式运转,以某种方式将其分割到多台计算机上,使之能够独立运行,即需要在第一阶段后将wordcount分区,使得第二阶段的每台计算机仅需处理一个分区。

为了使它工作在一个分布式计算机集群上,需要添加以下功能:

  • 存储文件到许多计算机上(第一阶段)

  • 编写一个基于磁盘的散列表,使得处理不受内存容量限制

  • 划分来自第一阶段的中间数据(即wordcount)

  • 洗牌这些分区到第二阶段中合适的计算机上

MapReduce程序执行分为两个主要阶段,为mapping和reducing,每个阶段均定义为一个数据处理函数,分别称为mapper和reducer。在mapping阶段,MapReduce获取输入数据并将数据单元装入mapper;在reduce阶段,reducer处理来自mapper的所有输出,并给出最终结果。简而言之,mapper意味着将输入进行过滤与转换,使reducer可以完成聚合。

另外,为了扩展分布式的单词统计程序,不得不编写了partitioning和shuffling函数。

在MapReduce框架中编写应用程序就是定制化mapper和reducer的过程,以下是完整的数据流:

  1. 应用的输入必须组织为一个键/值对的列表list();

  2. 含有键/值对的列表被拆分,进而通过调用mapper的map函数对每个单独的键/值对进行处理;

  3. 所有mapper的输出被聚合到一个包含对的巨大列表中;

  4. 每个reducer分别处理每个被聚合起来的,并输出

5、用Hadoop统计单词——运行第一个程序

  • Linux操作系统

  • JDK1.6以上运行环境

  • Hadoop操作环境

Usage:hadoop [—config configdir] COMMAND

这里COMMAND为下列其中一个:

namenode -format                       格式化DFS文件系统

secondarynamenode                     运行DFS的第二个namenode

namenode                              运行DFS的namenode

datanode                               运行一个DFS的datanode

dfsadmin                               运行一个DFS的admin客户端

fsck                                   运行一个DFS文件系统的检查工具

fs                                     运行一个普通的文件系统用户客户端

balancer                               运行一个集群负载均衡工具

jobtracker                              运行MapReduce的jobtracker节点

pipes                                  运行一个pipes作业

tasktracker                             运行一个MapReduce的tasktracker节点

job                                    处理MapReduce作业

version                                 打印版本

jar                                运行一个jar文件

distcp                  递归地复制文件或者目录

archive  -archiveName NAME *   生成一个Hadoop档案

daemonlog                             获取或设置每个daemon的log级别

CLASSNAME                           运行名为CLASSNAME的类大多数命令会在使用w/o参数

                                      时打出帮助信息。

运行单词统计示例程序的命令形式如下:

hadoop jar hadoop-*-examples.jar wordcount [-m ] [-r reduces] input output

编译修改后的单词统计程序的命令形式如下:

javac -classpath hadoop-*-core.jar -d playground/classes playground/src/WordCount.java

jar -cvf playground/src/wordcount.jar -C playground/classes/

运行修改后的单词统计程序的命令形式如下:

hadoop jar playground/wordcount.jar org.apache.hadoop.examples.WordCount input output


代码清单 WordCount.java

package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
  public static class TokenizerMapper 
       extends Mapper{
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());   //(1)使用空格进行分词
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());   //(2)把Token放入Text对象中
        context.write(word, one);
      }
    }
  }
  public static class IntSumReducer 
       extends Reducer {
    private IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);   //(3)输出每个Token的统计结果
    }
  }
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount  [...] ");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

在(1)的位置上wordcount以默认配置使用了Java的StringTokenizer,这里仅基于空格来分词。为了在分词过程中忽略标准的标点符号,将它们加入到stringTokenizer的定界符列表中:

        StringTokenizer itr = new StringTokenizer(value.toString(),” \t\n\r\f,.:;?![]’");

因为希望单词统计忽略大小写,把它们转换为Text对象前先将所有的单词都变成小写:

     word.set(itr.nextToken().toLowerCase());

希望仅仅显示出现次数大于4次的单词:

     if (sum > 4) context.write(key, result);


6、hadoop历史

创始人:Doug Cutting

2004年左右——Google发表了两篇论文来论述Google文件系统(GFS)和MapReduce框架。

2006年1月——雅虎聘用Doug,让他和一个专项团队一起改进Hadoop,并将其作为一个开源项目。

另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


分享题目:[HadoopinAction]第1章Hadoop简介-创新互联
分享URL:http://pcwzsj.com/article/epcjd.html