From 3fdbf553a11e0120ffd44162f767bd28e99647b2 Mon Sep 17 00:00:00 2001
From: Perforce Administrator
Date: Wed, 21 Aug 2019 20:29:27 +0000
Subject: [PATCH 1/5] commit by Octane

---
 Config | 37 +++++++++++++++++++++++++++++++++++++
 1 file changed, 37 insertions(+)
 create mode 100644 Config

diff --git a/Config b/Config
new file mode 100644
index 0000000..8ab9ce1
--- /dev/null
+++ b/Config
@@ -0,0 +1,37 @@
+# -*-perl-*-
+
+package.Spark-memory = {
+    interfaces = (1.0);
+
+    deploy = {
+        generic = true;
+    };
+
+    build-environment = {
+        chroot = basic;
+        network-access = blocked;
+    };
+
+    # Use NoOpBuild. See https://w.amazon.com/index.php/BrazilBuildSystem/NoOpBuild
+    build-system = no-op;
+    build-tools = {
+        1.0 = {
+            NoOpBuild = 1.0;
+        };
+    };
+
+    # Use runtime-dependencies for when you want to bring in additional
+    # packages when deploying.
+    # Use dependencies instead if you intend for these dependencies to
+    # be exported to other packages that build against you.
+    dependencies = {
+        1.0 = {
+        };
+    };
+
+    runtime-dependencies = {
+        1.0 = {
+        };
+    };
+
+};

From be37d37b933b321a490d0d5ddbd77747264ea8fb Mon Sep 17 00:00:00 2001
From: Andrew Long
Date: Wed, 21 Aug 2019 13:54:35 -0700
Subject: [PATCH 2/5] [BONFIRE] Added support for EMR and fixed a namespace
 issue that prevented Spark from starting the ExecutorPlugin

CR: https://code.amazon.com/reviews/CR-12240438
---
 core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala   | 2 +-
 .../org/apache/spark/memory/SparkMemoryManagerHandle.scala   | 1 -
 pom.xml                                                      | 4 +---
 3 files changed, 2 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala b/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala
index 9a85c11..33b4190 100644
--- a/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala
+++ b/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala
@@ -327,7 +327,7 @@ object MemoryMonitor {
   }
 }
 
-class MemoryMonitorExecutorExtension extends ExecutorPlugin {
+class MemoryMonitorExecutorExtension extends ExecutorPlugin with org.apache.spark.ExecutorPlugin {
   // the "extension class" api just lets you invoke a constructor. We really just want to
   // call this static method, so that's good enough.
   MemoryMonitor.installIfSysProps()
diff --git a/core/src/main/scala/org/apache/spark/memory/SparkMemoryManagerHandle.scala b/core/src/main/scala/org/apache/spark/memory/SparkMemoryManagerHandle.scala
index 387d7a8..63ef183 100644
--- a/core/src/main/scala/org/apache/spark/memory/SparkMemoryManagerHandle.scala
+++ b/core/src/main/scala/org/apache/spark/memory/SparkMemoryManagerHandle.scala
@@ -1,7 +1,6 @@
 package org.apache.spark.memory
 
 import com.cloudera.spark.{Reflector, IncrementBytes, MemoryGetter}
-import org.apache.spark.util.{Utils, ThreadStackTrace}
 import org.apache.spark.{SparkContext, SparkEnv}
 
 class SparkMemoryManagerHandle(
diff --git a/pom.xml b/pom.xml
index 019475d..6c1f041 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,7 +36,6 @@
       <artifactId>sumac_2.11</artifactId>
       <version>0.3.0</version>
     </dependency>
-
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
@@ -46,10 +45,9 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.11</artifactId>
-      <version>2.3.0</version>
+      <version>2.4.3</version>
       <scope>provided</scope>
     </dependency>
-
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_2.11</artifactId>

From 68529eece73e8aa9d812e8dcbbf93b67be26a50a Mon Sep 17 00:00:00 2001
From: Andrew Long
Date: Wed, 21 Aug 2019 14:32:12 -0700
Subject: [PATCH 3/5] [BONFIRE] Updated the README.md to include an
 introduction and an example error message generated by YARN

---
 README.md | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 776cefe..969a3fd 100644
--- a/README.md
+++ b/README.md
@@ -1,11 +1,23 @@
 Spark Memory Monitor
 ========================
 
+Intro
+----------
+Memory management in Spark is a complicated system that often leads to confusing, difficult-to-diagnose problems. Typically the user gets an error message showing that YARN has killed an executor for exceeding physical memory limits, but with no insight into why. Furthermore, the usual tools for investigating Java memory use are ineffective because Spark makes extensive use of off-heap memory, which leaves a portion of the process memory "unaccounted for". spark-memory is a tool that provides visibility into these parts of memory use that are not normally visible, giving insight into peak memory use.
-Usage
+> ExecutorLostFailure (executor 19 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 5.6 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.
+
+Example YARN error message
+
+
+Building spark-memory
 -----------
 Build with `mvn package`, `sbt`, etc.
 
+
+Modifying spark-submit to use spark-memory
+-----------
+
 Include that jar in your spark application. You could bundle it directly, or just include it with `--jars`.
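+
+For example, a `spark-submit` invocation might look roughly like the following. This is only a sketch: the plugin class comes from this package, the `spark.executor.plugins` setting assumes Spark 2.4's executor-plugin support, and the `-Dmemory.monitor.*` property is a placeholder for whatever system properties `MemoryMonitorArgs` actually reads.
+
+    # NOTE: the -D property below is illustrative; see MemoryMonitorArgs for the
+    # system properties the monitor really understands.
+    spark-submit \
+      --jars /path/to/spark-memory.jar \
+      --conf spark.executor.plugins=com.cloudera.spark.MemoryMonitorExecutorExtension \
+      --conf "spark.executor.extraJavaOptions=-Dmemory.monitor.enabled=true" \
+      --class com.example.YourApp \
+      your-app.jar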
 
 The monitoring is configured via java system properties:

From 0ebbf8d7ce46e02127a7173fe21420e2ba57bbc4 Mon Sep 17 00:00:00 2001
From: Andrew Long
Date: Wed, 21 Aug 2019 14:50:29 -0700
Subject: [PATCH 4/5] [BONFIRE] refactored the MemoryMonitorExecutorExtension
 into its own class

---
 .../com/cloudera/spark/MemoryMonitor.scala          | 53 ----------------
 .../MemoryMonitorExecutorExtension.scala            | 62 +++++++++++++++++++
 2 files changed, 62 insertions(+), 53 deletions(-)
 create mode 100644 core/src/main/scala/com/cloudera/spark/MemoryMonitorExecutorExtension.scala

diff --git a/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala b/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala
index 33b4190..6ee6d53 100644
--- a/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala
+++ b/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala
@@ -327,60 +327,7 @@ object MemoryMonitor {
   }
 }
 
-class MemoryMonitorExecutorExtension extends ExecutorPlugin with org.apache.spark.ExecutorPlugin {
-  // the "extension class" api just lets you invoke a constructor. We really just want to
-  // call this static method, so that's good enough.
-  MemoryMonitor.installIfSysProps()
-  val args = MemoryMonitorArgs.sysPropsArgs
-
-  val monitoredTaskCount = new AtomicInteger(0)
-
-  val scheduler = if (args.stagesToPoll != null && args.stagesToPoll.nonEmpty) {
-    // TODO share polling executors?
-    new ScheduledThreadPoolExecutor(1, new ThreadFactory {
-      override def newThread(r: Runnable): Thread = {
-        val t = new Thread(r, "thread-dump poll thread")
-        t.setDaemon(true)
-        t
-      }
-    })
-  } else {
-    null
-  }
-  val pollingTask = new AtomicReference[ScheduledFuture[_]]()
-
-  override def taskStart(taskContext: TaskContext): Unit = {
-    if (args.stagesToPoll.contains(taskContext.stageId())) {
-      if (monitoredTaskCount.getAndIncrement() == 0) {
-        // TODO schedule thread polling
-        val task = scheduler.scheduleWithFixedDelay(new Runnable {
-          override def run(): Unit = {
-            val d = MemoryMonitor.dateFormat.format(System.currentTimeMillis())
-            println(s"Polled thread dump @ $d")
-            MemoryMonitor.showThreadDump(MemoryMonitor.getThreadInfo)
-          }
-        }, 0, args.threadDumpFreqMillis, TimeUnit.MILLISECONDS)
-        pollingTask.set(task)
-      }
-    }
-  }
-
-  override def onTaskFailure(context: TaskContext, error: Throwable): Unit = {
-    removeActiveTask(context)
-  }
-
-  override def onTaskCompletion(context: TaskContext): Unit = {
-    removeActiveTask(context)
-  }
-  private def removeActiveTask(context: TaskContext): Unit = {
-    if (args.stagesToPoll.contains(context.stageId())) {
-      if (monitoredTaskCount.decrementAndGet() == 0) {
-        pollingTask.get().cancel(false)
-      }
-    }
-  }
-}
 
 
 class MemoryMonitorArgs extends FieldArgs {
   var enabled = false
diff --git a/core/src/main/scala/com/cloudera/spark/MemoryMonitorExecutorExtension.scala b/core/src/main/scala/com/cloudera/spark/MemoryMonitorExecutorExtension.scala
new file mode 100644
index 0000000..69b52ae
--- /dev/null
+++ b/core/src/main/scala/com/cloudera/spark/MemoryMonitorExecutorExtension.scala
@@ -0,0 +1,62 @@
+package com.cloudera.spark
+
+import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
+import java.util.concurrent.{ScheduledFuture, ScheduledThreadPoolExecutor, ThreadFactory, TimeUnit}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.executor.ExecutorPlugin
+
+class MemoryMonitorExecutorExtension extends ExecutorPlugin with org.apache.spark.ExecutorPlugin {
+  // the "extension class" api just lets you invoke a constructor. We really just want to
+  // call this static method, so that's good enough.
+  MemoryMonitor.installIfSysProps()
+  val args = MemoryMonitorArgs.sysPropsArgs
+
+  val monitoredTaskCount = new AtomicInteger(0)
+
+  val scheduler = if (args.stagesToPoll != null && args.stagesToPoll.nonEmpty) {
+    // TODO share polling executors?
+    new ScheduledThreadPoolExecutor(1, new ThreadFactory {
+      override def newThread(r: Runnable): Thread = {
+        val t = new Thread(r, "thread-dump poll thread")
+        t.setDaemon(true)
+        t
+      }
+    })
+  } else {
+    null
+  }
+  val pollingTask = new AtomicReference[ScheduledFuture[_]]()
+
+  override def taskStart(taskContext: TaskContext): Unit = {
+    if (args.stagesToPoll.contains(taskContext.stageId())) {
+      if (monitoredTaskCount.getAndIncrement() == 0) {
+        // TODO schedule thread polling
+        val task = scheduler.scheduleWithFixedDelay(new Runnable {
+          override def run(): Unit = {
+            val d = MemoryMonitor.dateFormat.format(System.currentTimeMillis())
+            println(s"Polled thread dump @ $d")
+            MemoryMonitor.showThreadDump(MemoryMonitor.getThreadInfo)
+          }
+        }, 0, args.threadDumpFreqMillis, TimeUnit.MILLISECONDS)
+        pollingTask.set(task)
+      }
+    }
+  }
+
+  override def onTaskFailure(context: TaskContext, error: Throwable): Unit = {
+    removeActiveTask(context)
+  }
+
+  override def onTaskCompletion(context: TaskContext): Unit = {
+    removeActiveTask(context)
+  }
+
+  private def removeActiveTask(context: TaskContext): Unit = {
+    if (args.stagesToPoll.contains(context.stageId())) {
+      if (monitoredTaskCount.decrementAndGet() == 0) {
+        pollingTask.get().cancel(false)
+      }
+    }
+  }
+}

From 565220661bfdd5725333f9a75532d90b363806aa Mon Sep 17 00:00:00 2001
From: Andrew Long
Date: Wed, 21 Aug 2019 16:40:00 -0700
Subject: [PATCH 5/5] [BONFIRE] Added test to verify that the correct
 ExecutorPlugin classes were implemented

---
 .../spark/MemoryMonitorExecutorExtension.scala      | 10 ++++++++++
 1 file changed, 10 insertions(+)
 create mode 100644 core/src/test/scala/com/cloudera/spark/MemoryMonitorExecutorExtension.scala

diff --git a/core/src/test/scala/com/cloudera/spark/MemoryMonitorExecutorExtension.scala b/core/src/test/scala/com/cloudera/spark/MemoryMonitorExecutorExtension.scala
new file mode 100644
index 0000000..8580b87
--- /dev/null
+++ b/core/src/test/scala/com/cloudera/spark/MemoryMonitorExecutorExtension.scala
@@ -0,0 +1,10 @@
+package com.cloudera.spark
+
+import org.scalatest.FunSuite
+
+class MemoryMonitorExecutorExtensionSuite extends FunSuite {
+  test("MemoryMonitorExecutorExtension should extend the correct class of ExecutorPlugin") {
+    assert(classOf[MemoryMonitorExecutorExtension].getInterfaces.contains(classOf[org.apache.spark.executor.ExecutorPlugin]))
+    assert(classOf[MemoryMonitorExecutorExtension].getInterfaces.contains(classOf[org.apache.spark.ExecutorPlugin]))
+  }
+}