From cba535c9b735169100bf4304ad1f00750f07f64c Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 23 Nov 2012 11:22:01 +0100 Subject: [PATCH] Hardening AdaptiveLoadBalancingRouterSpec, see #2547 * Problems with OOME for large heaps * Create many small arrays instead of one large array * Reduce half-life to get faster updates --- .../AdaptiveLoadBalancingRouterSpec.scala | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/AdaptiveLoadBalancingRouterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/AdaptiveLoadBalancingRouterSpec.scala index f936c5d0a7..723ef6b8ec 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/AdaptiveLoadBalancingRouterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/AdaptiveLoadBalancingRouterSpec.scala @@ -8,7 +8,6 @@ import language.postfixOps import java.lang.management.ManagementFactory import scala.concurrent.Await import scala.concurrent.duration._ -import scala.concurrent.forkjoin.ThreadLocalRandom import com.typesafe.config.Config import com.typesafe.config.ConfigFactory @@ -26,22 +25,26 @@ import akka.testkit.{ LongRunningTest, DefaultTimeout, ImplicitSender } object AdaptiveLoadBalancingRouterMultiJvmSpec extends MultiNodeConfig { class Routee extends Actor { - var usedMemory: Array[Byte] = _ def receive = { case _ ⇒ sender ! Reply(Cluster(context.system).selfAddress) } } class Memory extends Actor with ActorLogging { - var usedMemory: Array[Byte] = _ + var usedMemory: Array[Array[Int]] = _ def receive = { case AllocateMemory ⇒ val heap = ManagementFactory.getMemoryMXBean.getHeapMemoryUsage // getMax can be undefined (-1) val max = math.max(heap.getMax, heap.getCommitted) val used = heap.getUsed - val allocate = (0.8 * (max - used)).toInt - usedMemory = Array.fill(allocate)(ThreadLocalRandom.current.nextInt(127).toByte) + log.debug("used heap before: [{}] bytes, of max [{}]", used, heap.getMax) + // allocate 70% of free space + val allocateBytes = (0.7 * (max - used)).toInt + val numberOfArrays = allocateBytes / 1024 + usedMemory = Array.ofDim(numberOfArrays, 248) // each 248 element Int array will use ~ 1 kB + log.debug("used heap after: [{}] bytes", ManagementFactory.getMemoryMXBean.getHeapMemoryUsage.getUsed) + sender ! "done" } } @@ -55,6 +58,7 @@ object AdaptiveLoadBalancingRouterMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" akka.cluster.metrics.collect-interval = 1s akka.cluster.metrics.gossip-interval = 1s + akka.cluster.metrics.moving-average-half-life = 2s akka.actor.deployment { /router3 = { router = adaptive @@ -157,7 +161,10 @@ abstract class AdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(AdaptiveLoa enterBarrier("gc") runOn(second) { - system.actorOf(Props[Memory], "memory") ! AllocateMemory + within(20.seconds) { + system.actorOf(Props[Memory], "memory") ! AllocateMemory + expectMsg("done") + } } enterBarrier("heap-allocated") @@ -166,13 +173,11 @@ abstract class AdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(AdaptiveLoa router2 // collect some metrics before we start - Thread.sleep(10000) + Thread.sleep(cluster.settings.MetricsInterval.toMillis * 10) - val iterationCount = 100 - for (i ← 0 until iterationCount) { + val iterationCount = 3000 + 1 to iterationCount foreach { _ ⇒ router2 ! "hit" - // wait a while between each message, since metrics is collected periodically - Thread.sleep(10) } val replies = receiveReplies(iterationCount)