diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala index fa0ad212ad..602b4b5edc 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala @@ -43,6 +43,8 @@ object LatencySpec extends MultiNodeConfig { akka.test.LatencySpec.repeatCount = 1 akka { loglevel = ERROR + # avoid TestEventListener + loggers = ["akka.event.Logging$$DefaultLogger"] testconductor.barrier-timeout = ${barrierTimeout.toSeconds}s actor { provider = "akka.remote.RemoteActorRefProvider" diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala index 77e0039614..f7c5dc823d 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -3,18 +3,22 @@ */ package akka.remote.artery +import java.nio.ByteBuffer import java.util.concurrent.Executors import java.util.concurrent.TimeUnit.NANOSECONDS + import scala.concurrent.duration._ + import akka.actor._ import akka.remote.RemoteActorRefProvider import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.STMultiNodeSpec +import akka.serialization.ByteBufferSerializer +import akka.serialization.SerializerWithStringManifest import akka.testkit._ import com.typesafe.config.ConfigFactory -import java.net.InetAddress object MaxThroughputSpec extends MultiNodeConfig { val first = role("first") @@ -28,11 +32,20 @@ object MaxThroughputSpec extends MultiNodeConfig { akka.test.MaxThroughputSpec.totalMessagesFactor = 1.0 akka { loglevel = ERROR + # avoid TestEventListener + loggers = ["akka.event.Logging$$DefaultLogger"] testconductor.barrier-timeout = ${barrierTimeout.toSeconds}s actor { provider = "akka.remote.RemoteActorRefProvider" serialize-creators = false serialize-messages = false + + serializers { + test = "akka.remote.artery.MaxThroughputSpec$$TestSerializer" + } + serialization-bindings { + "akka.remote.artery.MaxThroughputSpec$$FlowControl" = test + } } remote.artery { enabled = on @@ -57,9 +70,13 @@ object MaxThroughputSpec extends MultiNodeConfig { Props(new Receiver(reporter, payloadSize)).withDispatcher("akka.remote.default-remote-dispatcher") class Receiver(reporter: RateReporter, payloadSize: Int) extends Actor { - var c = 0L + private var c = 0L def receive = { + case msg: Array[Byte] ⇒ + if (msg.length != payloadSize) throw new IllegalArgumentException("Invalid message") + reporter.onMessage(1, payloadSize) + c += 1 case Start ⇒ c = 0 sender() ! Start @@ -68,10 +85,7 @@ object MaxThroughputSpec extends MultiNodeConfig { context.stop(self) case m: Echo ⇒ sender() ! m - case msg: Array[Byte] ⇒ - if (msg.length != payloadSize) throw new IllegalArgumentException("Invalid message") - reporter.onMessage(1, payloadSize) - c += 1 + } } @@ -117,14 +131,14 @@ object MaxThroughputSpec extends MultiNodeConfig { val throughput = (totalReceived * 1000.0 / took) println( s"=== MaxThroughput ${self.path.name}: " + - f"throughput ${throughput}%,.0f msg/s, " + - f"${throughput * payloadSize}%,.0f bytes/s, " + + f"throughput ${throughput * testSettings.senderReceiverPairs}%,.0f msg/s, " + + f"${throughput * payloadSize * testSettings.senderReceiverPairs}%,.0f bytes/s, " + s"dropped ${totalMessages - totalReceived}, " + s"max round-trip $maxRoundTripMillis ms, " + s"burst size $burstSize, " + s"payload size $payloadSize, " + s"$took ms to deliver $totalReceived messages") - plotRef ! PlotResult().add(testName, throughput * payloadSize / 1024 / 1024) + plotRef ! PlotResult().add(testName, throughput * payloadSize * testSettings.senderReceiverPairs / 1024 / 1024) context.stop(self) } @@ -132,7 +146,8 @@ object MaxThroughputSpec extends MultiNodeConfig { val batchSize = math.min(remaining, burstSize) var i = 0 while (i < batchSize) { - target ! payload + // target ! payload + target.tell(payload, ActorRef.noSender) i += 1 } remaining -= batchSize @@ -153,6 +168,31 @@ object MaxThroughputSpec extends MultiNodeConfig { payloadSize: Int, senderReceiverPairs: Int) + class TestSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with ByteBufferSerializer { + + val FlowControlManifest = "A" + + override val identifier: Int = 100 + + override def manifest(o: AnyRef): String = + o match { + case _: FlowControl ⇒ FlowControlManifest + } + + override def toBinary(o: AnyRef, buf: ByteBuffer): Unit = + o match { + case FlowControl(burstStartTime) ⇒ buf.putLong(burstStartTime) + } + + override def fromBinary(buf: ByteBuffer, manifest: String): AnyRef = + manifest match { + case FlowControlManifest ⇒ FlowControl(buf.getLong) + } + + override def toBinary(o: AnyRef): Array[Byte] = ??? + override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = ??? + } + } class MaxThroughputSpecMultiJvmNode1 extends MaxThroughputSpec @@ -195,6 +235,12 @@ abstract class MaxThroughputSpec } val scenarios = List( + TestSettings( + testName = "warmup", + totalMessages = adjustedTotalMessages(20000), + burstSize = 1000, + payloadSize = 100, + senderReceiverPairs = 1), TestSettings( testName = "1-to-1", totalMessages = adjustedTotalMessages(20000), @@ -216,7 +262,7 @@ abstract class MaxThroughputSpec TestSettings( testName = "5-to-5", totalMessages = adjustedTotalMessages(20000), - burstSize = 1000, + burstSize = 200, // don't exceed the send queue capacity 200*5*3=3000 payloadSize = 100, senderReceiverPairs = 5)) @@ -269,7 +315,5 @@ abstract class MaxThroughputSpec s"be great for ${s.testName}, burstSize = ${s.burstSize}, payloadSize = ${s.payloadSize}" in test(s) } - // TODO add more tests, such as 5-to-5 sender receiver pairs - } }