Hardening AdaptiveLoadBalancingRouterSpec, see #2547
* Problems with OOME for large heaps * Create many small arrays instead of one large array * Reduce half-life to get faster updates
This commit is contained in:
parent
c57f84fd28
commit
cba535c9b7
1 changed files with 16 additions and 11 deletions
|
|
@ -8,7 +8,6 @@ import language.postfixOps
|
||||||
import java.lang.management.ManagementFactory
|
import java.lang.management.ManagementFactory
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import scala.concurrent.forkjoin.ThreadLocalRandom
|
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
|
|
||||||
|
|
@ -26,22 +25,26 @@ import akka.testkit.{ LongRunningTest, DefaultTimeout, ImplicitSender }
|
||||||
object AdaptiveLoadBalancingRouterMultiJvmSpec extends MultiNodeConfig {
|
object AdaptiveLoadBalancingRouterMultiJvmSpec extends MultiNodeConfig {
|
||||||
|
|
||||||
class Routee extends Actor {
|
class Routee extends Actor {
|
||||||
var usedMemory: Array[Byte] = _
|
|
||||||
def receive = {
|
def receive = {
|
||||||
case _ ⇒ sender ! Reply(Cluster(context.system).selfAddress)
|
case _ ⇒ sender ! Reply(Cluster(context.system).selfAddress)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class Memory extends Actor with ActorLogging {
|
class Memory extends Actor with ActorLogging {
|
||||||
var usedMemory: Array[Byte] = _
|
var usedMemory: Array[Array[Int]] = _
|
||||||
def receive = {
|
def receive = {
|
||||||
case AllocateMemory ⇒
|
case AllocateMemory ⇒
|
||||||
val heap = ManagementFactory.getMemoryMXBean.getHeapMemoryUsage
|
val heap = ManagementFactory.getMemoryMXBean.getHeapMemoryUsage
|
||||||
// getMax can be undefined (-1)
|
// getMax can be undefined (-1)
|
||||||
val max = math.max(heap.getMax, heap.getCommitted)
|
val max = math.max(heap.getMax, heap.getCommitted)
|
||||||
val used = heap.getUsed
|
val used = heap.getUsed
|
||||||
val allocate = (0.8 * (max - used)).toInt
|
log.debug("used heap before: [{}] bytes, of max [{}]", used, heap.getMax)
|
||||||
usedMemory = Array.fill(allocate)(ThreadLocalRandom.current.nextInt(127).toByte)
|
// allocate 70% of free space
|
||||||
|
val allocateBytes = (0.7 * (max - used)).toInt
|
||||||
|
val numberOfArrays = allocateBytes / 1024
|
||||||
|
usedMemory = Array.ofDim(numberOfArrays, 248) // each 248 element Int array will use ~ 1 kB
|
||||||
|
log.debug("used heap after: [{}] bytes", ManagementFactory.getMemoryMXBean.getHeapMemoryUsage.getUsed)
|
||||||
|
sender ! "done"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -55,6 +58,7 @@ object AdaptiveLoadBalancingRouterMultiJvmSpec extends MultiNodeConfig {
|
||||||
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString("""
|
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString("""
|
||||||
akka.cluster.metrics.collect-interval = 1s
|
akka.cluster.metrics.collect-interval = 1s
|
||||||
akka.cluster.metrics.gossip-interval = 1s
|
akka.cluster.metrics.gossip-interval = 1s
|
||||||
|
akka.cluster.metrics.moving-average-half-life = 2s
|
||||||
akka.actor.deployment {
|
akka.actor.deployment {
|
||||||
/router3 = {
|
/router3 = {
|
||||||
router = adaptive
|
router = adaptive
|
||||||
|
|
@ -157,7 +161,10 @@ abstract class AdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(AdaptiveLoa
|
||||||
enterBarrier("gc")
|
enterBarrier("gc")
|
||||||
|
|
||||||
runOn(second) {
|
runOn(second) {
|
||||||
|
within(20.seconds) {
|
||||||
system.actorOf(Props[Memory], "memory") ! AllocateMemory
|
system.actorOf(Props[Memory], "memory") ! AllocateMemory
|
||||||
|
expectMsg("done")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
enterBarrier("heap-allocated")
|
enterBarrier("heap-allocated")
|
||||||
|
|
||||||
|
|
@ -166,13 +173,11 @@ abstract class AdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(AdaptiveLoa
|
||||||
router2
|
router2
|
||||||
|
|
||||||
// collect some metrics before we start
|
// collect some metrics before we start
|
||||||
Thread.sleep(10000)
|
Thread.sleep(cluster.settings.MetricsInterval.toMillis * 10)
|
||||||
|
|
||||||
val iterationCount = 100
|
val iterationCount = 3000
|
||||||
for (i ← 0 until iterationCount) {
|
1 to iterationCount foreach { _ ⇒
|
||||||
router2 ! "hit"
|
router2 ! "hit"
|
||||||
// wait a while between each message, since metrics is collected periodically
|
|
||||||
Thread.sleep(10)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
val replies = receiveReplies(iterationCount)
|
val replies = receiveReplies(iterationCount)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue