Hadoop中的Speculative Task

2019-03-28 13:24|来源: 网络

Speculative Task称为推测式任务,是Map/Reduce框架中的重要优化算法,是为了解决某些运行速度较慢的task,影响整个job的执行进度的问题。在分布式集群环境下,因为程序bug、负载不均衡、资源分布不均、slave node处理能力不同,会出现同一个job的多个task运行速度不一致,有的task运行速度明显慢于其他task的情况,这些task的出现会影响整个job的整体运行速度。Speculative task技术会针对那些慢于平均进度的Task启动Speculative Task,此时如果原Task在Speculative Task前完成,则Speculative Task会被终止,同样的,如果Speculative Task先于原Task完成则原来的Task会被终止。

Hadoop平台下,我们会发现启用了speculative execution选项后,会为每一个task启用多个相同的attempt tasks(执行这些tasks的slave节点不同),当任意一个attempt task执行完成后,将kill掉所有其他相同的attempt tasks,从而提高每个task的平均执行效率。

然而,这些attempt tasks同样占用slave节点的资源,当节点资源有限的情况下,会因为这些attempt tasks的出现造成大量的资源浪费,以致频繁出现out of memory/java heap space error的问题(由于我们使用的平台资源有限,在实验过程中这种问题频繁出现)。通过关闭speculative execution选项,可以解决这个问题,具体来说有两种做法(hadoop默认是开启speculative execution选项):

1)在mapred-site.xml中设置

  1. <property> 
  2. <name>mapred.map.tasks.speculative.execution</name> 
  3. <value>false</value> 
  4. </property> 
  5. <property> 
  6. <name>mapred.reduce.tasks.speculative.execution</name> 
  7. <value>false</value> 
  8. </property> 

2)在提交job,配置job configuration时设置

  1. conf.setBoolean("mapred.map.tasks.speculative.execution"false); 
  2. conf.setBoolean("mapred.reduce.tasks.speculative.execution"false); 

前一种方法是通过修改hadoop配置文件,将对所有job均不执行speculative execution;后一种方法仅对当前设置的job不执行speculative execution,相比较来说,后一种方法更具有灵活性。

我们目前使用的是Hadoop1.0.0版本,使用了后一种配置方法,运行后,每一个task只有一个attempt task在执行,并且,由于减少了slave节点计算资源的浪费,每个task的执行速度有所提升。但缺点是,当一个task在执行过程中出现failed(无论当前执行进度如何),只能重新开启一个同样的task执行,这也在一定程度上造成了速度的下降。所以,是否开启speculative execution选项也是一个权衡的过程。

另外,在查找资料的过程中还发现,有人反映当使用前面第一种方法开启speculative execution选项后,将无法使用第二种方法动态关闭speculative excution;如果使用第一种方法关闭speculative execution选项后,则可以使用第二种方法开启speculative execution选项(https://issues.apache.org/jira/browse/MAPREDUCE-3404)。这似乎是hadoop的版本问题,因为我们在1.0.0下的实验过程中并没有发现这个问题。

相关问答

更多
  • async关键字使编译器自动处理此问题。 异步方法隐式地“包装”了任务中的返回值。 async Task GetNumber() { return 42; } VS Task GetNumber() { return Task.FromResult(42); } The async keyword causes the compiler to take care of this automatically. Async methods implicitly "wrap ...
  • 您的整个过程可以在映射器本身内完成。 使用mapper的setup函数初始化HashMap。 直接在HashMap中搜索fieldId并获取它的值并将其写入上下文。 在可迭代值的for循环内的reducer中输出相同的内容。 Your whole process can be done inside the mapper itself. Use setup function of mapper to initialize the HashMap. Directly search for the fieldI ...
  • 我会试试这个: public void BatchStart(List definition) { Task.WaitAll( definition.Select (a => Task.Factory.StartNew( () => (TaskResult)a.MethodTocall.DynamicInvoke(a.ARguments)).ContinueWith(tas ...
  • 尽管Eric Lippert提到你不应该获取Task.Result,但仍有几个问题要回答: 如何检查对象是否是泛型类型 obj.GetType().IsGenericType 是否是任务 obj.GetType().GetGenericTypeDefinition() == typeof(Task<>) 如何访问通用属性值 obj.GetType().GetProperty("Result").GetValue(obj) // This value could be null Des ...
  • 就YARN而言,在群集上运行的程序称为应用程序。 就MapReduce而言,它们被称为作业。 因此,如果您在YARN上运行MapReduce,则作业和应用程序是相同的(如果仔细观察,作业ID和应用程序ID是相同的)。 MapReduce作业由几个任务组成(它们可以是map或reduce任务)。 如果某个任务失败,它将在另一个节点上再次启动。 这些都是任务尝试。 容器是一个YARN术语。 这是资源分配的单位。 例如,MapReduce任务将在单个容器中运行。 In terms of YARN, the pro ...
  • 我不相信推测执行时间目前是可配置的。 另一方面,可能没有必要调整它。 推测执行意味着让您摆脱缓慢运行的任务(通常是由于硬件性能下降)。 如果你有可用的群集资源,那么spec exec正在进行中,让它这样做有什么害处? 请注意,对于中等或较大尺寸的作业,分钟不被视为“重要”,并且超出正常水平。 值得注意的是,虽然mapper spec exec几乎总是很好并且系统开销很低,但是reducer spec exec可能会受到伤害并且可能应该被禁用。 理由是,如果映射器进展缓慢并且存在数据为本地(正常)的可用资源, ...
  • 我不知道这是否导致执行停止,但也许是因为你在这里关闭循环变量 : DoSomeWork(someValue)); 您需要创建一个局部变量并为其分配someValue ,然后使用该局部变量,如我的链接问题中所述,如下所示: foreach(var someValue in aCollection) { var localCopy = someValue; var t = Task.Factory.StartNew(() => DoSomeWork(localCopy)); tasks.A ...
  • 如果任务失败,Application Master将重新尝试重新启动它。 该任务将重新启动。 有一个参数表示允许重新尝试的次数。 如果超过则整个应用程序被终止。 If a task fails, the Application Master will reattempt to start it afresh. The task will be restarted afresh. There is a parameter for how many times the reattempt is allowed. ...
  • 您可以指定TaskScheduler以在ContinueWith某些重载中使用。 您决定在哪里运行该代码。 这里不能指定调度程序。 在第一个等待点之后,异步方法在捕获的SynchronizationContext或当前TaskScheduler上运行。 确实,异步方法确实安排了延续。 (我遗漏了你可以有定制的等待者的事实。) 您的异步示例在主线程上同步运行以完成。 ContinueWith不仅创建任务而且还安排任务,这是否正确。 是的,在您指定的调度程序上。 是否正确等待不仅创建了文章中显示的继续任务,而且 ...
  • 一个更完整的例子: http : //play.golang.org/p/1RzDiw7F9t package main import ( "fmt" "math/rand" ) type Task interface { Do() error ID() int64 } type XTask struct { id int64 // other stuff } func NewXTask( /*task parameters...*/) *XTask ...