知识点
相关文章
更多最近更新
更多Hadoop中DBInputFormat和DBOutputFormat使用
2019-03-28 12:56|来源: 网络
一、背景
为了方便MapReduce直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInputFormat和DBOutputFormat两个类。通过DBInputFormat类把数据库表数据读入到HDFS,根据DBOutputFormat类把MapReduce产生的结果集导入到数据库表中。
推荐阅读:
Hadoop 中利用 MapReduce 读写 MySQL 数据 http://www.linuxidc.com/Linux/2013-07/88117.htm
二、技术细节
1、DBInputFormat(Mysql为例),先创建表:
CREATE TABLE studentinfo (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(32) NOT NULL);2、由于0.20版本对DBInputFormat和DBOutputFormat支持不是很好,该例用了0.19版本来说明这两个类的用法。3、DBInputFormat用法如下:
public class DBInput {
// DROP TABLE IF EXISTS `hadoop`.`studentinfo`;
// CREATE TABLE studentinfo (
// id INTEGER NOT NULL PRIMARY KEY,
// name VARCHAR(32) NOT NULL);
public static class StudentinfoRecord implements Writable, DBWritable {
int id;
String name;
public StudentinfoRecord() {
}
public void readFields(DataInput in) throws IOException {
this.id = in.readInt();
this.name = Text.readString(in);
}
public void write(DataOutput out) throws IOException {
out.writeInt(this.id);
Text.writeString(out, this.name);
}
public void readFields(ResultSet result) throws SQLException {
this.id = result.getInt(1);
this.name = result.getString(2);
}
public void write(PreparedStatement stmt) throws SQLException {
stmt.setInt(1, this.id);
stmt.setString(2, this.name);
}
public String toString() {
return new String(this.id + " " + this.name);
}
}
public class DBInputMapper extends MapReduceBase implements
Mapper<LongWritable, StudentinfoRecord, LongWritable, Text> {
public void map(LongWritable key, StudentinfoRecord value,
OutputCollector<LongWritable, Text> collector, Reporter reporter)
throws IOException {
collector.collect(new LongWritable(value.id), new Text(value
.toString()));
}
}
public static void main(String[] args) throws IOException {
JobConf conf = new JobConf(DBInput.class);
DistributedCache.addFileToClassPath(new Path(
"/lib/mysql-connector-java-5.1.0-bin.jar"), conf);
conf.setMapperClass(DBInputMapper.class);
conf.setReducerClass(IdentityReducer.class);
conf.setMapOutputKeyClass(LongWritable.class);
conf.setMapOutputValueClass(Text.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
conf.setInputFormat(DBInputFormat.class);
FileOutputFormat.setOutputPath(conf, new Path("/hua01"));
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
"jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
String[] fields = { "id", "name" };
DBInputFormat.setInput(conf, StudentinfoRecord.class, "studentinfo",
null, "id", fields);
JobClient.runJob(conf);
}
}
a)StudnetinfoRecord类的变量为表字段,实现Writable和DBWritable两个接口。
实现Writable的方法:
public void readFields(DataInput in) throws IOException {
this.id = in.readInt();
this.name = Text.readString(in);
}
public void write(DataOutput out) throws IOException {
out.writeInt(this.id);
Text.writeString(out, this.name);
}
实现DBWritable的方法:
public void readFields(ResultSet result) throws SQLException {
this.id = result.getInt(1);
this.name = result.getString(2);
}
public void write(PreparedStatement stmt) throws SQLException {
stmt.setInt(1, this.id);
stmt.setString(2, this.name);
}
b)读入Mapper的value类型是StudnetinfoRecord。
c)配置如何连入数据库,读出表studentinfo数据。
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
"jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
String[] fields = { "id", "name" };
DBInputFormat.setInput(conf, StudentinfoRecord.class, "studentinfo", null, "id", fields);
相关问答
更多-
oracle 中的 nulls last 在 hadoop 的 hive 上 怎么写。。[2022-05-08]
order by nulls last 不是标准sql,oracle设计的 可以这么写,再指定一个虚拟列来辅助排序 order by case when col is null then 0 else 1 end , col desc -
请问hadoop中metrics的作用?[2022-05-05]
bg7.png 相信下面概念你已经知道了:jms即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。它可以在两个应用程序之间异步通信,也就是说,如果你做一个监控系统,那么你想通过监控系统获取hadoop的运行信息。前提是hadoop需要有借口提供这些信息才可以。也就是你说的Hadoop Metrics收集机制。你可以把二者结合起来,只要hadoop的metrics提供 ... -
HTrace在Hadoop 2.7.3中(HTrace in Hadoop 2.7.3)[2022-03-02]
感谢您试用HTrace! 对不起,版本问题现在是如此的痛苦。 使用cloudera的Hadoop CDH5.5发行版及更高版本配置HTrace更容易。 这里有一个很好的描述: http : //blog.cloudera.com/blog/2015/12/new-in-cloudera-labs-apache-htrace-incubating/如果你想坚持一个Apache发布源代码而不是供应商发布,请尝试使用Hadoop 3.0.0-alpha1。 http://hadoop.apache.org/rel ... -
我相信你不应该在classname中包含“ .class ”。 代替 job.setJarByClass(hadoopshingles.Saishingles.class); 它应该是 job.setJarByClass(hadoopshingles.Saishingles); I believe you should not be including ".class" in the classname. Instead of job.setJarByClass(hadoopshingles.Saishin ...
-
Eclipse类未找到异常,即使jar包含在类路径中也是如此(Eclipse Class not found Exception even when jars are included in classpath)[2023-06-05]
您需要在Biginsight lib文件夹中包含jar以将其作为Biginsights Application运行。 还要添加biginsights-env文件的路径。 默认位置: / opt / ibm / biginsights / IHC / lib 要作为java应用程序运行,应该添加到classpath。 You need to include jar in Biginsight lib folder to run it as Biginsights Application. Also add p ... -
这个问题引起了我的兴趣,因为我喜欢数学。 我怀疑它主要是科学或数学数据集和问题,将受益于矩阵乘法和操作。 请查看此处以了解可能需要此类型的应用程序: http : //grids.ucs.indiana.edu/ptliupages/publications/DryadReport.pdf This question intrigued me as I like maths. I suspect that it would mostly be scientific or mathematical datas ...
-
下载osquery https://code.facebook.com/projects/658950180885092 并安装 发出这个命令osqueryi 当出现提示时,使用此sql命令查看所有正在运行的java进程并找到pids SELECT名称,路径,pid FROM进程,其中name =“java”; 你会在Mac上看到这样的东西 +------+-------------------------------------------------------------------------- ...
-
Hadoop和RDBMS(Hadoop and RDBMS)[2023-09-10]
您可以使用Sqoop将数据从RDBMS导入Hadoop。 Hadoop会处理非结构化数据,因为您将约束(创建结构化数据)推到了最后。 这也允许创建什么样的结构,这将定义您可以提取的信息类型。 永远不会说您无法处理结构化数据,但获得的里程数很低。 RDBMS可以高效地处理结构化数据。 You can use Sqoop to import data from RDBMS to Hadoop. Hadoop shines at processing unstructured data because you a ... -
问题是您将凭据传递到错误的位置。 请尝试使用此代码: public int run(String[] args) throws Exception { String driverClassName = DRIVER_CLASS; String url = DB_URL; if(args.length > 1) { driverClassName = args[0]; url = args[1]; } initialize(driverCl ...
-
在hadoop中阻塞池(Block pool in hadoop)[2022-11-16]
它是关于每个数据块的元数据 。 hadoop中的文件被分成块,然后这些块存储在不同的数据节点上。 但是要再次访问这些数据,我们需要知道这些块的存储位置 。 namenode在块池的帮助下完成了这个任务。 因此,块池是关于hadoop集群上每个文件的每个块的元数据。 它们存储在namenode的内存中,而不是存储在磁盘上。 因此,如果namenode关闭,则需要重建此信息。 现在在Hadoop Federation中,我们有多个名称空间的概念。 不同的名称节点负责不同的名称空间。 假设我们有两台机器充当nam ...