Hadoop中DBInputFormat和DBOutputFormat使用

2019-03-28 12:56|来源: 网络

一、背景

为了方便MapReduce直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInputFormat和DBOutputFormat两个类。通过DBInputFormat类把数据库表数据读入到HDFS,根据DBOutputFormat类把MapReduce产生的结果集导入到数据库表中。

推荐阅读:

Hadoop 中利用 MapReduce 读写 MySQL 数据 http://www.linuxidc.com/Linux/2013-07/88117.htm

二、技术细节

1、DBInputFormat(Mysql为例),先创建表:

CREATE TABLE studentinfo (

  id INTEGER NOT NULL PRIMARY KEY,

  name VARCHAR(32) NOT NULL);2、由于0.20版本对DBInputFormat和DBOutputFormat支持不是很好,该例用了0.19版本来说明这两个类的用法。3、DBInputFormat用法如下:

public class DBInput {
  // DROP TABLE IF EXISTS `hadoop`.`studentinfo`;
  // CREATE TABLE studentinfo (
  // id INTEGER NOT NULL PRIMARY KEY,
  // name VARCHAR(32) NOT NULL);

  public static class StudentinfoRecord implements Writable, DBWritable {
    int id;
    String name;
    public StudentinfoRecord() {

    }
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = Text.readString(in);
    }
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        Text.writeString(out, this.name);
    }
    public void readFields(ResultSet result) throws SQLException {
        this.id = result.getInt(1);
        this.name = result.getString(2);
    }
    public void write(PreparedStatement stmt) throws SQLException {
        stmt.setInt(1, this.id);
        stmt.setString(2, this.name);
    }
    public String toString() {
        return new String(this.id + " " + this.name);
    }
  }
  public class DBInputMapper extends MapReduceBase implements
        Mapper<LongWritable, StudentinfoRecord, LongWritable, Text> {
    public void map(LongWritable key, StudentinfoRecord value,
          OutputCollector<LongWritable, Text> collector, Reporter reporter)
          throws IOException {
        collector.collect(new LongWritable(value.id), new Text(value
            .toString()));
    }
  }
  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(DBInput.class);
    DistributedCache.addFileToClassPath(new Path(
          "/lib/mysql-connector-java-5.1.0-bin.jar"), conf);
   
    conf.setMapperClass(DBInputMapper.class);
    conf.setReducerClass(IdentityReducer.class);

    conf.setMapOutputKeyClass(LongWritable.class);
    conf.setMapOutputValueClass(Text.class);
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);
   
    conf.setInputFormat(DBInputFormat.class);
    FileOutputFormat.setOutputPath(conf, new Path("/hua01"));
    DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
          "jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
    String[] fields = { "id", "name" };
    DBInputFormat.setInput(conf, StudentinfoRecord.class, "studentinfo",
 null, "id", fields);

    JobClient.runJob(conf);
  }
}

a)StudnetinfoRecord类的变量为表字段,实现Writable和DBWritable两个接口。

实现Writable的方法:

 public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = Text.readString(in);
    }
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        Text.writeString(out, this.name);
    }

实现DBWritable的方法:

public void readFields(ResultSet result) throws SQLException {
        this.id = result.getInt(1);
        this.name = result.getString(2);
    }
    public void write(PreparedStatement stmt) throws SQLException {
        stmt.setInt(1, this.id);
        stmt.setString(2, this.name);
    }

b)读入Mapper的value类型是StudnetinfoRecord。

c)配置如何连入数据库,读出表studentinfo数据。

DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
          "jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
    String[] fields = { "id", "name" };
    DBInputFormat.setInput(conf, StudentinfoRecord.class, "studentinfo",  null, "id", fields);

接下来请看: http://www.linuxidc.com/Linux/2013-07/88119p2.htm

相关问答

更多
  • order by nulls last 不是标准sql,oracle设计的 可以这么写,再指定一个虚拟列来辅助排序 order by case when col is null then 0 else 1 end , col desc
  • bg7.png 相信下面概念你已经知道了:jms即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。它可以在两个应用程序之间异步通信,也就是说,如果你做一个监控系统,那么你想通过监控系统获取hadoop的运行信息。前提是hadoop需要有借口提供这些信息才可以。也就是你说的Hadoop Metrics收集机制。你可以把二者结合起来,只要hadoop的metrics提供 ...
  • 感谢您试用HTrace! 对不起,版本问题现在是如此的痛苦。 使用cloudera的Hadoop CDH5.5发行版及更高版本配置HTrace更容易。 这里有一个很好的描述: http : //blog.cloudera.com/blog/2015/12/new-in-cloudera-labs-apache-htrace-incubating/如果你想坚持一个Apache发布源代码而不是供应商发布,请尝试使用Hadoop 3.0.0-alpha1。 http://hadoop.apache.org/rel ...
  • 我相信你不应该在classname中包含“ .class ”。 代替 job.setJarByClass(hadoopshingles.Saishingles.class); 它应该是 job.setJarByClass(hadoopshingles.Saishingles); I believe you should not be including ".class" in the classname. Instead of job.setJarByClass(hadoopshingles.Saishin ...
  • 您需要在Biginsight lib文件夹中包含jar以将其作为Biginsights Application运行。 还要添加biginsights-env文件的路径。 默认位置: / opt / ibm / biginsights / IHC / lib 要作为java应用程序运行,应该添加到classpath。 You need to include jar in Biginsight lib folder to run it as Biginsights Application. Also add p ...
  • 这个问题引起了我的兴趣,因为我喜欢数学。 我怀疑它主要是科学或数学数据集和问题,将受益于矩阵乘法和操作。 请查看此处以了解可能需要此类型的应用程序: http : //grids.ucs.indiana.edu/ptliupages/publications/DryadReport.pdf This question intrigued me as I like maths. I suspect that it would mostly be scientific or mathematical datas ...
  • 下载osquery https://code.facebook.com/projects/658950180885092 并安装 发出这个命令osqueryi 当出现提示时,使用此sql命令查看所有正在运行的java进程并找到pids SELECT名称,路径,pid FROM进程,其中name =“java”; 你会在Mac上看到这样的东西 +------+-------------------------------------------------------------------------- ...
  • 您可以使用Sqoop将数据从RDBMS导入Hadoop。 Hadoop会处理非结构化数据,因为您将约束(创建结构化数据)推到了最后。 这也允许创建什么样的结构,这将定义您可以提取的信息类型。 永远不会说您无法处理结构化数据,但获得的里程数很低。 RDBMS可以高效地处理结构化数据。 You can use Sqoop to import data from RDBMS to Hadoop. Hadoop shines at processing unstructured data because you a ...
  • 问题是您将凭据传递到错误的位置。 请尝试使用此代码: public int run(String[] args) throws Exception { String driverClassName = DRIVER_CLASS; String url = DB_URL; if(args.length > 1) { driverClassName = args[0]; url = args[1]; } initialize(driverCl ...
  • 它是关于每个数据块的元数据 。 hadoop中的文件被分成块,然后这些块存储在不同的数据节点上。 但是要再次访问这些数据,我们需要知道这些块的存储位置 。 namenode在块池的帮助下完成了这个任务。 因此,块池是关于hadoop集群上每个文件的每个块的元数据。 它们存储在namenode的内存中,而不是存储在磁盘上。 因此,如果namenode关闭,则需要重建此信息。 现在在Hadoop Federation中,我们有多个名称空间的概念。 不同的名称节点负责不同的名称空间。 假设我们有两台机器充当nam ...