Merge pull request #24592 from akka/wip-24576-LargeMessageClusterSpec-patriknw

slowdown LargeMessageClusterSpec for tcp transport, #24576
This commit is contained in:
Patrik Nordwall 2018-03-05 16:20:46 +01:00 committed by GitHub
commit 0ea8c0d872
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -9,13 +9,13 @@ import akka.actor.ActorIdentity
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.actor.ExtendedActorSystem import akka.actor.ExtendedActorSystem
import akka.actor.Identify import akka.actor.Identify
import akka.actor.PoisonPill
import akka.cluster.ClusterEvent.UnreachableMember import akka.cluster.ClusterEvent.UnreachableMember
import akka.remote.RARP import akka.remote.RARP
import akka.remote.artery.ArterySettings import akka.remote.artery.ArterySettings
import akka.remote.testconductor.RoleName import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.MultiNodeSpec
import akka.serialization.SerializerWithStringManifest
import akka.testkit._ import akka.testkit._
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
@ -27,12 +27,21 @@ object LargeMessageClusterMultiJvmSpec extends MultiNodeConfig {
// Note that this test uses default configuration, // Note that this test uses default configuration,
// not MultiNodeClusterSpec.clusterConfig // not MultiNodeClusterSpec.clusterConfig
commonConfig(ConfigFactory.parseString( commonConfig(ConfigFactory.parseString(
""" s"""
akka { akka {
loglevel = DEBUG
cluster.debug.verbose-heartbeat-logging = on cluster.debug.verbose-heartbeat-logging = on
loggers = ["akka.testkit.TestEventListener"] loggers = ["akka.testkit.TestEventListener"]
actor.provider = cluster
actor {
provider = cluster
serializers {
test = "akka.cluster.LargeMessageClusterMultiJvmSpec$$SlowSerializer"
}
serialization-bindings {
"akka.cluster.LargeMessageClusterMultiJvmSpec$$Slow" = test
}
}
testconductor.barrier-timeout = 3 minutes testconductor.barrier-timeout = 3 minutes
@ -44,20 +53,29 @@ object LargeMessageClusterMultiJvmSpec extends MultiNodeConfig {
large-message-destinations = [ "/user/largeEcho", "/system/largeEchoProbe-3" ] large-message-destinations = [ "/user/largeEcho", "/system/largeEchoProbe-3" ]
advanced { advanced {
#maximum-frame-size = 2 MiB
#buffer-pool-size = 32
maximum-large-frame-size = 2 MiB maximum-large-frame-size = 2 MiB
large-buffer-pool-size = 32 large-buffer-pool-size = 32
compression {
#actor-refs.advertisement-interval = 2 second
#manifests.advertisement-interval = 2 second
}
} }
} }
} }
""")) """))
final case class Slow(payload: Array[Byte])
class SlowSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
override def identifier = 999
override def manifest(o: AnyRef) = "a"
override def toBinary(o: AnyRef) = o match {
case Slow(payload)
// simulate slow serialization to not completely overload the machine/network, see issue #24576
Thread.sleep(100)
payload
}
override def fromBinary(bytes: Array[Byte], manifest: String) = {
Slow(bytes)
}
}
} }
class LargeMessageClusterMultiJvmNode1 extends LargeMessageClusterSpec class LargeMessageClusterMultiJvmNode1 extends LargeMessageClusterSpec
@ -131,17 +149,18 @@ abstract class LargeMessageClusterSpec extends MultiNodeSpec(LargeMessageCluster
"not disturb cluster heartbeat messages when saturated" taggedAs LongRunningTest in { "not disturb cluster heartbeat messages when saturated" taggedAs LongRunningTest in {
// FIXME only enabled for Aeron transport until #24576 is fixed // for non Aeron transport we use the Slow message and SlowSerializer to slow down
// to not completely overload the machine/network, see issue #24576
val arterySettings = ArterySettings(system.settings.config.getConfig("akka.remote.artery")) val arterySettings = ArterySettings(system.settings.config.getConfig("akka.remote.artery"))
if (!arterySettings.Enabled || arterySettings.Transport != ArterySettings.AeronUpd) val aeronUdpEnabled = (arterySettings.Enabled && arterySettings.Transport == ArterySettings.AeronUpd)
pending
runOn(second) { runOn(second) {
val largeEcho2 = identify(second, "largeEcho") val largeEcho2 = identify(second, "largeEcho")
val largeEcho3 = identify(third, "largeEcho") val largeEcho3 = identify(third, "largeEcho")
val largeMsgSize = 1 * 1000 * 1000 val largeMsgSize = 1 * 1000 * 1000
val largeMsg = ("0" * largeMsgSize).getBytes("utf-8") val payload = ("0" * largeMsgSize).getBytes("utf-8")
val largeMsg = if (aeronUdpEnabled) payload else Slow(payload)
(1 to 3).foreach { _ (1 to 3).foreach { _
// this will ping-pong between second and third // this will ping-pong between second and third
largeEcho2.tell(largeMsg, largeEcho3) largeEcho2.tell(largeMsg, largeEcho3)