diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanInThrougputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanInThrougputSpec.scala index 5659044bc9..c12d7a4482 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanInThrougputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanInThrougputSpec.scala @@ -26,7 +26,7 @@ import akka.remote.artery.MaxThroughputSpec._ object FanInThroughputSpec extends MultiNodeConfig { val totalNumberOfNodes = - System.getProperty("MultiJvm.akka.test.FanInThroughputSpec.nrOfNodes") match { + System.getProperty("akka.test.FanInThroughputSpec.nrOfNodes") match { case null ⇒ 4 case value ⇒ value.toInt } @@ -41,6 +41,10 @@ object FanInThroughputSpec extends MultiNodeConfig { # for serious measurements you should increase the totalMessagesFactor (20) akka.test.FanInThroughputSpec.totalMessagesFactor = 10.0 akka.test.FanInThroughputSpec.real-message = off + akka.test.FanInThroughputSpec.actor-selection = off + akka.remote.artery.advanced { + inbound-lanes = 4 + } """)) .withFallback(MaxThroughputSpec.cfg) .withFallback(RemotingMultiNodeSpec.commonConfig)) @@ -61,6 +65,7 @@ abstract class FanInThroughputSpec extends RemotingMultiNodeSpec(FanInThroughput val totalMessagesFactor = system.settings.config.getDouble("akka.test.FanInThroughputSpec.totalMessagesFactor") val realMessage = system.settings.config.getBoolean("akka.test.FanInThroughputSpec.real-message") + val actorSelection = system.settings.config.getBoolean("akka.test.FanInThroughputSpec.actor-selection") var plot = PlotResult() @@ -85,9 +90,12 @@ abstract class FanInThroughputSpec extends RemotingMultiNodeSpec(FanInThroughput super.afterAll() } - def identifyReceiver(name: String, r: RoleName): ActorRef = { - system.actorSelection(node(r) / "user" / name) ! Identify(None) - expectMsgType[ActorIdentity](10.seconds).ref.get + def identifyReceiver(name: String, r: RoleName): Target = { + val sel = system.actorSelection(node(r) / "user" / name) + sel ! Identify(None) + val ref = expectMsgType[ActorIdentity](10.seconds).ref.get + if (actorSelection) ActorSelectionTarget(sel, ref) + else ActorRefTarget(ref) } val scenarios = List( @@ -146,7 +154,7 @@ abstract class FanInThroughputSpec extends RemotingMultiNodeSpec(FanInThroughput val ignore = TestProbe() val receivers = (1 to sendingNodes.size).map { n ⇒ identifyReceiver(receiverName + "-" + n, roles.head) - }.toArray[ActorRef] + }.toArray[Target] val idx = roles.indexOf(myself) - 1 val receiver = receivers(idx) @@ -171,7 +179,6 @@ abstract class FanInThroughputSpec extends RemotingMultiNodeSpec(FanInThroughput } "Max throughput of fan-in" must { - pending val reporter = BenchmarkFileReporter("FanInThroughputSpec", system) for (s ← scenarios) { s"be great for ${s.testName}, burstSize = ${s.burstSize}, payloadSize = ${s.payloadSize}" in test(s, reporter) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanOutThrougputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanOutThrougputSpec.scala index 74b5440ca0..3fe83e6e3b 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanOutThrougputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanOutThrougputSpec.scala @@ -26,7 +26,7 @@ import akka.remote.artery.MaxThroughputSpec._ object FanOutThroughputSpec extends MultiNodeConfig { val totalNumberOfNodes = - System.getProperty("MultiJvm.akka.test.FanOutThroughputSpec.nrOfNodes") match { + System.getProperty("akka.test.FanOutThroughputSpec.nrOfNodes") match { case null ⇒ 4 case value ⇒ value.toInt } @@ -41,6 +41,7 @@ object FanOutThroughputSpec extends MultiNodeConfig { # for serious measurements you should increase the totalMessagesFactor (20) akka.test.FanOutThroughputSpec.totalMessagesFactor = 10.0 akka.test.FanOutThroughputSpec.real-message = off + akka.test.FanOutThroughputSpec.actor-selection = off """)) .withFallback(MaxThroughputSpec.cfg) .withFallback(RemotingMultiNodeSpec.commonConfig)) @@ -61,6 +62,7 @@ abstract class FanOutThroughputSpec extends RemotingMultiNodeSpec(FanOutThroughp val totalMessagesFactor = system.settings.config.getDouble("akka.test.FanOutThroughputSpec.totalMessagesFactor") val realMessage = system.settings.config.getBoolean("akka.test.FanOutThroughputSpec.real-message") + val actorSelection = system.settings.config.getBoolean("akka.test.FanOutThroughputSpec.actor-selection") var plot = PlotResult() @@ -85,9 +87,12 @@ abstract class FanOutThroughputSpec extends RemotingMultiNodeSpec(FanOutThroughp super.afterAll() } - def identifyReceiver(name: String, r: RoleName): ActorRef = { - system.actorSelection(node(r) / "user" / name) ! Identify(None) - expectMsgType[ActorIdentity](10.seconds).ref.get + def identifyReceiver(name: String, r: RoleName): Target = { + val sel = system.actorSelection(node(r) / "user" / name) + sel ! Identify(None) + val ref = expectMsgType[ActorIdentity](10.seconds).ref.get + if (actorSelection) ActorSelectionTarget(sel, ref) + else ActorRefTarget(ref) } val burstSize = 2000 / senderReceiverPairs @@ -143,7 +148,7 @@ abstract class FanOutThroughputSpec extends RemotingMultiNodeSpec(FanOutThroughp runOn(roles.head) { enterBarrier(receiverName + "-started") val ignore = TestProbe() - val receivers = targetNodes.map(target ⇒ identifyReceiver(receiverName, target)).toArray[ActorRef] + val receivers = targetNodes.map(target ⇒ identifyReceiver(receiverName, target)).toArray[Target] val senders = for ((target, i) ← targetNodes.zipWithIndex) yield { val receiver = receivers(i) val plotProbe = TestProbe() diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala index 20718b1195..2475a15805 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -33,6 +33,7 @@ object MaxThroughputSpec extends MultiNodeConfig { # for serious measurements you should increase the totalMessagesFactor (20) akka.test.MaxThroughputSpec.totalMessagesFactor = 10.0 akka.test.MaxThroughputSpec.real-message = off + akka.test.MaxThroughputSpec.actor-selection = off akka { loglevel = INFO log-dead-letters = 10000 @@ -78,8 +79,8 @@ object MaxThroughputSpec extends MultiNodeConfig { akka.remote.default-remote-dispatcher { fork-join-executor { # parallelism-factor = 0.5 - parallelism-min = 2 - parallelism-max = 2 + parallelism-min = 4 + parallelism-max = 4 } # Set to 10 by default. Might be worthwhile to experiment with. # throughput = 100 @@ -97,6 +98,19 @@ object MaxThroughputSpec extends MultiNodeConfig { final case class EndResult(totalReceived: Long) extends JavaSerializable final case class FlowControl(burstStartTime: Long) extends Echo + sealed trait Target { + def tell(msg: Any, sender: ActorRef): Unit + def ref: ActorRef + } + + final case class ActorRefTarget(override val ref: ActorRef) extends Target { + override def tell(msg: Any, sender: ActorRef) = ref.tell(msg, sender) + } + + final case class ActorSelectionTarget(sel: ActorSelection, override val ref: ActorRef) extends Target { + override def tell(msg: Any, sender: ActorRef) = sel.tell(msg, sender) + } + def receiverProps(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean, numSenders: Int): Props = Props(new Receiver(reporter, payloadSize, printTaskRunnerMetrics, numSenders)).withDispatcher("akka.remote.default-remote-dispatcher") @@ -137,11 +151,11 @@ object MaxThroughputSpec extends MultiNodeConfig { } } - def senderProps(mainTarget: ActorRef, targets: Array[ActorRef], testSettings: TestSettings, plotRef: ActorRef, + def senderProps(mainTarget: Target, targets: Array[Target], testSettings: TestSettings, plotRef: ActorRef, printTaskRunnerMetrics: Boolean, reporter: BenchmarkFileReporter): Props = Props(new Sender(mainTarget, targets, testSettings, plotRef, printTaskRunnerMetrics, reporter)) - class Sender(target: ActorRef, targets: Array[ActorRef], testSettings: TestSettings, plotRef: ActorRef, printTaskRunnerMetrics: Boolean, reporter: BenchmarkFileReporter) + class Sender(target: Target, targets: Array[Target], testSettings: TestSettings, plotRef: ActorRef, printTaskRunnerMetrics: Boolean, reporter: BenchmarkFileReporter) extends Actor { val numTargets = targets.size @@ -161,7 +175,7 @@ object MaxThroughputSpec extends MultiNodeConfig { def receive = { case Run ⇒ if (compressionEnabled) { - target ! Warmup(payload) + target.tell(Warmup(payload), self) context.setReceiveTimeout(1.second) context.become(waitingForCompression) } else runWarmup() @@ -169,18 +183,22 @@ object MaxThroughputSpec extends MultiNodeConfig { def waitingForCompression: Receive = { case ReceivedActorRefCompressionTable(_, table) ⇒ - if (table.dictionary.contains(target)) { + val ref = target match { + case ActorRefTarget(ref) ⇒ ref + case ActorSelectionTarget(sel, _) ⇒ sel.anchor + } + if (table.dictionary.contains(ref)) { context.setReceiveTimeout(Duration.Undefined) runWarmup() } else - target ! Warmup(payload) + target.tell(Warmup(payload), self) case ReceiveTimeout ⇒ - target ! Warmup(payload) + target.tell(Warmup(payload), self) } def runWarmup(): Unit = { sendBatch(warmup = true) // first some warmup - targets.foreach(_ ! Start(target)) // then Start, which will echo back here + targets.foreach(_.tell(Start(target.ref), self)) // then Start, which will echo back here context.become(warmup) } @@ -268,9 +286,9 @@ object MaxThroughputSpec extends MultiNodeConfig { def sendFlowControl(t0: Long): Unit = { if (remaining <= 0) { context.become(waitingForEndResult) - targets.foreach(_ ! End) + targets.foreach(_.tell(End, self)) } else - target ! FlowControl(t0) + target.tell(FlowControl(t0), self) } } @@ -331,6 +349,7 @@ abstract class MaxThroughputSpec extends RemotingMultiNodeSpec(MaxThroughputSpec val totalMessagesFactor = system.settings.config.getDouble("akka.test.MaxThroughputSpec.totalMessagesFactor") val realMessage = system.settings.config.getBoolean("akka.test.MaxThroughputSpec.real-message") + val actorSelection = system.settings.config.getBoolean("akka.test.MaxThroughputSpec.actor-selection") var plot = PlotResult() @@ -355,9 +374,12 @@ abstract class MaxThroughputSpec extends RemotingMultiNodeSpec(MaxThroughputSpec super.afterAll() } - def identifyReceiver(name: String, r: RoleName = second): ActorRef = { - system.actorSelection(node(r) / "user" / name) ! Identify(None) - expectMsgType[ActorIdentity](10.seconds).ref.get + def identifyReceiver(name: String, r: RoleName = second): Target = { + val sel = system.actorSelection(node(r) / "user" / name) + sel ! Identify(None) + val ref = expectMsgType[ActorIdentity](10.seconds).ref.get + if (actorSelection) ActorSelectionTarget(sel, ref) + else ActorRefTarget(ref) } val scenarios = List( diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 51e2d3af27..54a531e388 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -734,11 +734,18 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R .via(hubKillSwitch.flow) .viaMat(inboundFlow(settings, _inboundCompressions))(Keep.both) - // select lane based on destination, to preserve message order + // Select lane based on destination to preserve message order, + // Also include the uid of the sending system in the hash to spread + // "hot" destinations, e.g. ActorSelection anchor. val partitioner: InboundEnvelope ⇒ Int = env ⇒ { env.recipient match { - case OptionVal.Some(r) ⇒ math.abs(r.path.uid) % inboundLanes - case OptionVal.None ⇒ 0 + case OptionVal.Some(r) ⇒ + val a = r.path.uid + val b = env.originUid + val hashA = 23 + a + val hash: Int = 23 * hashA + java.lang.Long.hashCode(b) + math.abs(hash) % inboundLanes + case OptionVal.None ⇒ 0 } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala index e00bfc456d..909930e664 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala @@ -377,12 +377,11 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { // Write fixed length parts byteBuffer.put(VersionOffset, header.version) byteBuffer.put(FlagsOffset, header.flags) - byteBuffer.putLong(UidOffset, header.uid) - byteBuffer.putInt(SerializerOffset, header.serializer) - // compression table version numbers byteBuffer.put(ActorRefCompressionTableVersionOffset, header.outboundActorRefCompression.version) byteBuffer.put(ClassManifestCompressionTableVersionOffset, header.outboundClassManifestCompression.version) + byteBuffer.putLong(UidOffset, header.uid) + byteBuffer.putInt(SerializerOffset, header.serializer) // maybe write some metadata // after metadata is written (or not), buffer is at correct position to continue writing literals @@ -421,12 +420,11 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { // Read fixed length parts header.setVersion(byteBuffer.get(VersionOffset)) header.setFlags(byteBuffer.get(FlagsOffset)) - header.setUid(byteBuffer.getLong(UidOffset)) - header.setSerializer(byteBuffer.getInt(SerializerOffset)) - // compression table versions (stored in the Tag) header._inboundActorRefCompressionTableVersion = byteBuffer.get(ActorRefCompressionTableVersionOffset) header._inboundClassManifestCompressionTableVersion = byteBuffer.get(ClassManifestCompressionTableVersionOffset) + header.setUid(byteBuffer.getLong(UidOffset)) + header.setSerializer(byteBuffer.getInt(SerializerOffset)) byteBuffer.position(MetadataContainerAndLiteralSectionOffset) if (header.flag(MetadataPresentFlag)) { diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala index 9eac28541e..1b3fdc5b43 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala @@ -9,6 +9,7 @@ import akka.testkit.{ AkkaSpec, ImplicitSender, TestActors, TestProbe } import com.typesafe.config.{ Config, ConfigFactory } import scala.concurrent.duration._ +import akka.actor.ActorSelection class RemoteSendConsistencySpec extends AbstractRemoteSendConsistencySpec(ArterySpecSupport.defaultConfig) @@ -33,18 +34,26 @@ abstract class AbstractRemoteSendConsistencySpec(config: Config) extends ArteryM } }), "echo") - val remoteRef = { + val echoSel = system.actorSelection(rootB / "user" / "echo") + val echoRef = { system.actorSelection(rootB / "user" / "echo") ! Identify(None) expectMsgType[ActorIdentity](5.seconds).ref.get } - remoteRef ! "ping" + echoRef ! "ping" expectMsg("pong") - remoteRef ! "ping" + echoRef ! "ping" expectMsg("pong") - remoteRef ! "ping" + echoRef ! "ping" + expectMsg("pong") + + // and actorSelection + echoSel ! "ping" + expectMsg("pong") + + echoSel ! "ping" expectMsg("pong") } @@ -116,6 +125,45 @@ abstract class AbstractRemoteSendConsistencySpec(config: Config) extends ArteryM } } + "be able to send messages with actorSelection concurrently preserving order" in { + systemB.actorOf(TestActors.echoActorProps, "echoA2") + systemB.actorOf(TestActors.echoActorProps, "echoB2") + systemB.actorOf(TestActors.echoActorProps, "echoC2") + + val selA = system.actorSelection(rootB / "user" / "echoA2") + val selB = system.actorSelection(rootB / "user" / "echoB2") + val selC = system.actorSelection(rootB / "user" / "echoC2") + + def senderProps(sel: ActorSelection) = Props(new Actor { + var counter = 1000 + sel ! counter + + override def receive: Receive = { + case i: Int ⇒ + if (i != counter) testActor ! s"Failed, expected $counter got $i" + else if (counter == 0) { + testActor ! "success2" + context.stop(self) + } else { + counter -= 1 + sel ! counter + } + } + }).withDeploy(Deploy.local) + + system.actorOf(senderProps(selA)) + system.actorOf(senderProps(selB)) + system.actorOf(senderProps(selC)) + system.actorOf(senderProps(selA)) + + within(10.seconds) { + expectMsg("success2") + expectMsg("success2") + expectMsg("success2") + expectMsg("success2") + } + } + } }