Hadoop Map/Reduce内存限制

2019-03-28 13:40|来源: 网络

如何设置Hadoop  Map/Reduce任务的内存限制?

Parameter

Type

Meaning

mapred.cluster.map.memory.mb set by admin, cluster-wide Cluster definition of memory per map slot. The maximum amount of memory, in MB, each map task on a tasktracker can consume.
mapred.cluster.reduce.memory.mb set by admin, cluster-wide Cluster definition of memory per reduce slot. The maximum amount of memory, in MB, each reduce task on a tasktracker can consume.
mapred.job.map.memory.mb set by user, per-job Job requirement for map tasks. The maximum amount of memory each map task of a job can consume, in MB.
mapred.job.reduce.memory.mb set by user, per-job job requirement for reduce tasks. The maximum amount of memory each reduce task of a job can consume, in MB.
mapred.cluster.max.map.memory.mb set by admin, cluster-wide Max limit on jobs. The maximum value that can be specified by a user via mapred.job.map.memory.mb, in MB. A job that asks for more than this number will be failed at submission itself.
mapred.cluster.max.reduce.memory.mb set by admin, cluster-wide Max limit on jobs. The maximum value that can be specified by a user via mapred.job.reduce.memory.mb, in MB. A job that asks for more than this number will be failed at submission itself.

不设置时默认都是-1,无限制

相关介绍请参考Hadoop-0.20.2 作业内存控制策略分析 http://www.linuxidc.com/Linux/2012-06/63310.htm

设置时请注意其大小关系。比如你设置了mapred.cluster.map.memory.mb为1024 ,然后你提交任务时没有设置mapred.job.map.memory.mb(默认为-1,无限制),此时便会报如下错误:

  1. 2012-06-13 16:18:10,951 ERROR exec.Task (SessionState.java:printError(380)) - Job Submission failed with exception 'org.apache.hadoop.ipc.RemoteException(java.io.IOException: job_201206131602_0003(-1 memForMapTasks -1 memForReduceTasks): Invalid job requirements.  
  2.         at org.apache.hadoop.mapred.JobTracker.checkMemoryRequirements(JobTracker.java:5160)  
  3.         at org.apache.hadoop.mapred.JobTracker.submitJob(JobTracker.java:3949)  
  4.         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  
  5.         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)  
  6.         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)  
  7.         at java.lang.reflect.Method.invoke(Method.java:597)  
  8.         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:523)  
  9.         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1383)  
  10.         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1379)  
  11.         at java.security.AccessController.doPrivileged(Native Method)  
  12.         at javax.security.auth.Subject.doAs(Subject.java:396)  
  13.         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)  
  14.         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1377)  
  15. )'  
  16. org.apache.hadoop.ipc.RemoteException: java.io.IOException: job_201206131602_0003(-1 memForMapTasks -1 memForReduceTasks): Invalid job requirements.  
  17.         at org.apache.hadoop.mapred.JobTracker.checkMemoryRequirements(JobTracker.java:5160)  
  18.         at org.apache.hadoop.mapred.JobTracker.submitJob(JobTracker.java:3949)  
  19.         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  
  20.         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)  
  21.         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)  
  22.         at java.lang.reflect.Method.invoke(Method.java:597)  
  23.         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:523)  
  24.         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1383)  
  25.         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1379)  
  26.         at java.security.AccessController.doPrivileged(Native Method)  
  27.         at javax.security.auth.Subject.doAs(Subject.java:396)  
  28.         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)  
  29.         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1377)  
  30.   
  31.         at org.apache.hadoop.ipc.Client.call(Client.java:1030)  
  32.         at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:224)  
  33.         at org.apache.hadoop.mapred.$Proxy7.submitJob(Unknown Source)  
  34.         at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:862)  
  35.         at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:791)  
  36.         at java.security.AccessController.doPrivileged(Native Method)  
  37.         at javax.security.auth.Subject.doAs(Subject.java:396)  
  38.         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)  
  39.         at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:791)  
  40.         at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:765)  
  41.         at org.apache.hadoop.hive.ql.exec.ExecDriver.execute(ExecDriver.java:452)  
  42.         at org.apache.hadoop.hive.ql.exec.MapRedTask.execute(MapRedTask.java:136)  
  43.         at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:133)  
  44.         at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:57)  
  45.         at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1332)  
  46.         at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1123)  
  47.         at org.apache.hadoop.hive.ql.Driver.run(Driver.java:931)  
  48.         at org.apache.hadoop.hive.service.HiveServer$HiveServerHandler.execute(HiveServer.java:191)  
  49.         at org.apache.hadoop.hive.service.ThriftHive$Processor$execute.getResult(ThriftHive.java:629)  
  50.         at org.apache.hadoop.hive.service.ThriftHive$Processor$execute.getResult(ThriftHive.java:617)  
  51.         at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:32)  
  52.         at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:34)  
  53.         at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:176)  

相关问答

更多
  • 找到离存数据最近的一台机器运行和这个数据相关的map任务,reduce是按照你整理出的key有多少个来决定的。一个机器很难说,处理的快的处理多一点,保持所有机器使用平衡。 上面你都自己写了20个map,和文件大小个数有关,和数据条数无关。 要看你选择的输入格式是什么,默认是行偏移量,然后由你编写map函数,指定key和value是什么。相同的key整合起来传给reduce,由reduce进行下一步处理,最后输出到指定的地方。
  • map的数量 map的数量通常是由hadoop集群的DFS块大小确定的,也就是输入文件的总块数,正常的map数量的并行规模大致是每一个Node是10~100个,对于CPU消耗较小的作业可以设置Map数量为300个左右,但是由于hadoop的每一个任务在初始化时需要一定的时间,因此比较合理的情况是每个map执行的时间至少超过1分钟。具体的数据分片是这样的,InputFormat在默认情况下会根据hadoop集群的DFS块大小进行分片,每一个分片会由一个map任务来进行处理,当然用户还是可以通过参数mapred ...
  • 你是否期望每个reducer能够在完全相同的映射数据上工作? 但至少“钥匙”应该是不同的,因为它决定了哪个减速器要去。 您可以在mapper中多次输出输出,并将其输出为密钥(其中$ i代表第i个缩减器,$ key是您的原始密钥)。 您需要添加一个“分区程序”以确保这些记录是基于$ i分布在还原器中的。 然后使用“GroupingComparator”按原始$ key对记录进行分组。 有可能做到这一点,但不是在一个MR中以微不足道的方式。 Are you expecting every reducer to ...
  • 您也可以在单个Map Reduce作业中执行此操作。 您的Mapper将读取数据。 假设这是序列化格式,其结构类似于您的类(自定义可写)。 从映射器中,您可以以复杂键的形式收集输出,包括2个部分 - 您收集的内容:值,例如年龄:18。 这可以是文本或自定义可写。 根据您的使用案例,您可能需要使用分区程序来确保所有具有Age的键都转到单个reducer,而具有名称的键转到另一个reducer。 没有分区器所有具有Age:18的键将转到相同的reducer。 You can do this in a singl ...
  • 这个想法的问题是Hadoop没有“分布式内存”的概念。 如果你想让结果“在内存中”,下一个问题必须是“哪台机器的内存?” 如果你真的想要这样访问它,你将不得不编写自己的自定义输出格式,然后使用一些现有的框架在机器之间共享内存,或者再次编写自己的。 我的建议是简单地写入HDFS,然后对于非MapReduce业务逻辑,首先通过FileSystem API从HDFS读取数据,即: FileSystem fs = new JobClient(conf).getFs(); Path outputPath = new ...
  • 我认为你没有使用reducer,mapper输出是最终输出 如果你引入减速器,则会发生改组和排序,你会获得所需的输出。 请参考以下问题:减速机的改组和分拣阶段的目的是什么 示例Reducer实现: public class Reduce extends Reducer { public void reduce(Text key, Iterable values, Context context) throws IOExceptio ...
  • 这是除了最佳之外的一切,因为地图输出必须始终复制到另一台服务器。 但您可以简单地修改服务器上的mapred-site.xml。 mapred.tasktracker.map.tasks.maximum 4 The maximum number of map tasks that will be run simultaneously by a task tracker.
  • 您可能可以完成这项工作。 一种可能的方法是让每个hadoop节点运行一个嵌入式路由唯一弹性搜索节点。 这应该使查询更高效一些,因为节点会找出每个查询需要联系哪些节点,并利用有效的内部协议来执行此操作。 您可以通过添加更多es数据节点来进行水平缩放。 唯一的缺点是你的hadoop节点不会接近不同节点上的数据; 所以你有一点延迟通过网络。 但即使如此,你应该能够运行大量的看起来应该很便宜的查询这种方式。 由于它只有20GB,所以你的es节点很少需要去磁盘,并且可以在内存中执行所有操作,利用过滤器缓存等。 实际上 ...
  • JobConf conf = new JobConf(getConf(), ...); ... FileInputFormat.setInputPaths(conf, new Path("stored.xls")) ... JobClient.runJob(conf); ... setInputPaths将做到这一点。 JobConf conf = new JobConf(getConf(), ...); ... FileInputFormat.setInputPaths(conf, new Path(" ...
  • 我通过以下方式在Eclipse中开发Cassandra / Hadoop应用程序: 使用maven(m2e)为我的Eclipse项目收集和配置依赖项(Hadoop,Cassandra,Pig等) 创建测试用例(src / test / java中的类)来测试我的映射器和缩减器。 诀窍是使用扩展RecordWriter和StatusReporter的内部类动态构建上下文对象。 如果执行此操作,则在调用setup / map / cleanup或setup / reduce / cleanup之后,您可以断言正 ...