diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeMessageClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeMessageClusterSpec.scala index 14d9fe24ec..761c4f313d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeMessageClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeMessageClusterSpec.scala @@ -9,13 +9,13 @@ import akka.actor.ActorIdentity import akka.actor.ActorRef import akka.actor.ExtendedActorSystem import akka.actor.Identify -import akka.actor.PoisonPill import akka.cluster.ClusterEvent.UnreachableMember import akka.remote.RARP import akka.remote.artery.ArterySettings import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec +import akka.serialization.SerializerWithStringManifest import akka.testkit._ import com.typesafe.config.ConfigFactory @@ -27,12 +27,21 @@ object LargeMessageClusterMultiJvmSpec extends MultiNodeConfig { // Note that this test uses default configuration, // not MultiNodeClusterSpec.clusterConfig commonConfig(ConfigFactory.parseString( - """ + s""" akka { - loglevel = DEBUG cluster.debug.verbose-heartbeat-logging = on loggers = ["akka.testkit.TestEventListener"] - actor.provider = cluster + + actor { + provider = cluster + + serializers { + test = "akka.cluster.LargeMessageClusterMultiJvmSpec$$SlowSerializer" + } + serialization-bindings { + "akka.cluster.LargeMessageClusterMultiJvmSpec$$Slow" = test + } + } testconductor.barrier-timeout = 3 minutes @@ -44,20 +53,29 @@ object LargeMessageClusterMultiJvmSpec extends MultiNodeConfig { large-message-destinations = [ "/user/largeEcho", "/system/largeEchoProbe-3" ] advanced { - #maximum-frame-size = 2 MiB - #buffer-pool-size = 32 maximum-large-frame-size = 2 MiB large-buffer-pool-size = 32 - - compression { - #actor-refs.advertisement-interval = 2 second - #manifests.advertisement-interval = 2 second - } } } } """)) + final case class Slow(payload: Array[Byte]) + + class SlowSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest { + override def identifier = 999 + override def manifest(o: AnyRef) = "a" + override def toBinary(o: AnyRef) = o match { + case Slow(payload) ⇒ + // simulate slow serialization to not completely overload the machine/network, see issue #24576 + Thread.sleep(100) + payload + } + override def fromBinary(bytes: Array[Byte], manifest: String) = { + Slow(bytes) + } + } + } class LargeMessageClusterMultiJvmNode1 extends LargeMessageClusterSpec @@ -131,17 +149,18 @@ abstract class LargeMessageClusterSpec extends MultiNodeSpec(LargeMessageCluster "not disturb cluster heartbeat messages when saturated" taggedAs LongRunningTest in { - // FIXME only enabled for Aeron transport until #24576 is fixed + // for non Aeron transport we use the Slow message and SlowSerializer to slow down + // to not completely overload the machine/network, see issue #24576 val arterySettings = ArterySettings(system.settings.config.getConfig("akka.remote.artery")) - if (!arterySettings.Enabled || arterySettings.Transport != ArterySettings.AeronUpd) - pending + val aeronUdpEnabled = (arterySettings.Enabled && arterySettings.Transport == ArterySettings.AeronUpd) runOn(second) { val largeEcho2 = identify(second, "largeEcho") val largeEcho3 = identify(third, "largeEcho") val largeMsgSize = 1 * 1000 * 1000 - val largeMsg = ("0" * largeMsgSize).getBytes("utf-8") + val payload = ("0" * largeMsgSize).getBytes("utf-8") + val largeMsg = if (aeronUdpEnabled) payload else Slow(payload) (1 to 3).foreach { _ ⇒ // this will ping-pong between second and third largeEcho2.tell(largeMsg, largeEcho3)