At this point, the Driver's thread stack looks like this:
"Driver" #26 prio=5 os_prio=0 tid=0x00007f163a116000 nid=0x5192 waiting on condition [0x00007f15bb9a0000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000001a8c4f9e0> (a scala.concurrent.impl.Promise$CompletionLatch)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:619)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1988)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)
at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1128)
at org.apache.spark.rdd.RDD$$anonfun$treeReduce$1.apply(RDD.scala:1059)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.treeReduce(RDD.scala:1037)
at breeze.optimize.CachedDiffFunction.calculate(CachedDiffFunction.scala:23)
at breeze.optimize.LineSearch$$anon$1.calculate(LineSearch.scala:41)
at breeze.optimize.LineSearch$$anon$1.calculate(LineSearch.scala:30)
at breeze.optimize.StrongWolfeLineSearch.breeze$optimize$StrongWolfeLineSearch$$phi$1(StrongWolfe.scala:69)
at breeze.optimize.StrongWolfeLineSearch$$anonfun$minimize$1.apply$mcVI$sp(StrongWolfe.scala:142)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at breeze.optimize.StrongWolfeLineSearch.minimize(StrongWolfe.scala:141)
at breeze.optimize.LBFGS.determineStepSize(LBFGS.scala:78)
at breeze.optimize.LBFGS.determineStepSize(LBFGS.scala:40)
at breeze.optimize.FirstOrderMinimizer$$anonfun$infiniteIterations$1.apply(FirstOrderMinimizer.scala:64)
at breeze.optimize.FirstOrderMinimizer$$anonfun$infiniteIterations$1.apply(FirstOrderMinimizer.scala:62)
at scala.collection.Iterator$$anon$7.next(Iterator.scala:129)
at breeze.util.IteratorImplicits$RichIterator$$anon$2.next(Implicits.scala:71)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at app.package.AppClass.main(AppClass.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)
As the stack shows, the Driver is inside runJob, waiting for the executors to return their results.
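The frames above it (breeze LBFGS → CachedDiffFunction.calculate → RDD.treeReduce → treeAggregate → reduce → runJob) are a driver-side optimization loop that submits one Spark job per loss/gradient evaluation. Below is a minimal sketch of that pattern, not the actual application code: the toy data, squared loss, and dimensions are made up, but it shows why the driver thread parks in DAGScheduler.runJob, since treeAggregate blocks the calling thread until every task has reported back:

import breeze.linalg.DenseVector
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical sketch of the pattern in the Driver stack above: an LBFGS-style
// loss/gradient evaluation implemented as a treeAggregate over the data set.
// Every evaluation is a Spark job, and treeAggregate blocks the calling (driver)
// thread in SparkContext.runJob / DAGScheduler.runJob until all tasks return.
object TreeAggregateSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))

    val dim = 10
    // toy data: (label, features); the real job reads its training data elsewhere
    val data = sc.parallelize(
      Seq.fill(1000)((1.0, DenseVector.fill(dim)(scala.util.Random.nextDouble()))))
    val weights = DenseVector.zeros[Double](dim)

    // One "CachedDiffFunction.calculate" step: aggregate (gradient, loss) over all partitions.
    val zero = (DenseVector.zeros[Double](dim), 0.0)
    val (gradSum, lossSum) = data.treeAggregate(zero)(
      seqOp = { case ((grad, loss), (label, x)) =>
        val diff = weights.dot(x) - label            // squared-loss example
        (grad + x * diff, loss + 0.5 * diff * diff)
      },
      combOp = { case ((g1, l1), (g2, l2)) => (g1 + g2, l1 + l2) }
    )

    println(s"loss = $lossSum, grad dim = ${gradSum.length}")
    sc.stop()
  }
}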
On the problematic executor, the thread dump contains one suspicious thread that stays RUNNABLE for a long time:
"shuffle-client-5-4" #94 daemon prio=5 os_prio=0 tid=0x00007fbae0e42800 nid=0x2a3a runnable [0x00007fbae4760000]
java.lang.Thread.State: RUNNABLE
at io.netty.util.Recycler$Stack.scavengeSome(Recycler.java:476)
at io.netty.util.Recycler$Stack.scavenge(Recycler.java:454)
at io.netty.util.Recycler$Stack.pop(Recycler.java:435)
at io.netty.util.Recycler.get(Recycler.java:144)
at io.netty.buffer.PooledUnsafeDirectByteBuf.newInstance(PooledUnsafeDirectByteBuf.java:39)
at io.netty.buffer.PoolArena$DirectArena.newByteBuf(PoolArena.java:727)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:140)
at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:271)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)
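This thread is a Netty NIO event loop serving the executor's shuffle client: it is reading from a channel and allocating a pooled direct ByteBuf for the incoming data, and obtaining the pooled wrapper object (PooledUnsafeDirectByteBuf.newInstance) goes through io.netty.util.Recycler, which is where the thread appears to be stuck. A standalone sketch of that allocation path (not Spark code, just the Netty calls visible in the stack) looks like this:

import io.netty.buffer.{ByteBuf, PooledByteBufAllocator}

// Standalone sketch (not Spark's code) of the allocation path in the stack above:
// the pooled allocator hands out a direct buffer, and on platforms with sun.misc.Unsafe
// the wrapper object comes from io.netty.util.Recycler
// (PooledUnsafeDirectByteBuf.newInstance -> Recycler.get() -> Stack.pop() -> scavengeSome()).
object PooledAllocSketch {
  def main(args: Array[String]): Unit = {
    val alloc = PooledByteBufAllocator.DEFAULT
    val buf: ByteBuf = alloc.directBuffer(64 * 1024)
    try {
      buf.writeLong(42L)        // use the buffer
    } finally {
      buf.release()             // hand the wrapper back to the Recycler for reuse
    }
  }
}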
PS: memory on the problematic executor was largely free at the time, and the process state was normal:
-bash-4.2$ free -m
              total        used        free      shared  buff/cache   available
Mem:         257676       29251        5274         517      223150      226669
Swap:             0           0           0
This suggests a possible infinite loop here. Spark 2.1.1 uses Netty 4.0.42; the method from the executor stack, io.netty.util.Recycler$Stack.scavengeSome(), looks like this in that version:
boolean scavengeSome() {
    WeakOrderQueue cursor = this.cursor;
    if (cursor == null) {
        cursor = head;
        if (cursor == null) {
            return false;
        }
    }

    boolean success = false;
    WeakOrderQueue prev = this.prev;
    do {
        if (cursor.transfer(this)) {
            success = true;
            break;
        }

        WeakOrderQueue next = cursor.next;
        if (cursor.owner.get() == null) {
            // If the thread associated with the queue is gone, unlink it, after
            // performing a volatile read to confirm there is no data left to collect.
            // We never unlink the first queue, as we don't want to synchronize on updating the head.
            if (cursor.hasFinalData()) {
                for (;;) {
                    if (cursor.transfer(this)) {
                        success = true;
                    } else {
                        break;
                    }
                }
            }
            if (prev != null) {
                prev.next = next;
            }
        } else {
            prev = cursor;
        }

        cursor = next;

    } while (cursor != null && !success);

    this.prev = prev;
    this.cursor = cursor;
    return success;
}