diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala index 0098da695b..8a6ac1935f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala @@ -110,6 +110,9 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou case a ⇒ a } + def currentRoutees(router: ActorRef) = + Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees + "A cluster router with a RoundRobin router" must { "start cluster with 2 nodes" taggedAs LongRunningTest in { awaitClusterUp(first, second) @@ -121,6 +124,9 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou runOn(first) { router1.isInstanceOf[RoutedActorRef] must be(true) + // max-nr-of-instances-per-node=2 times 2 nodes + awaitCond(currentRoutees(router1).size == 4) + val iterationCount = 10 for (i ← 0 until iterationCount) { router1 ! "hit" @@ -146,6 +152,9 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou enterBarrier("myservice-started") runOn(first) { + // 2 nodes, 1 routee on each node + awaitCond(currentRoutees(router4).size == 2) + val iterationCount = 10 for (i ← 0 until iterationCount) { router4 ! "hit" @@ -169,6 +178,9 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou awaitClusterUp(first, second, third, fourth) runOn(first) { + // max-nr-of-instances-per-node=2 times 4 nodes + awaitCond(currentRoutees(router1).size == 8) + val iterationCount = 10 for (i ← 0 until iterationCount) { router1 ! "hit" @@ -188,6 +200,9 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou // cluster consists of first, second, third and fourth runOn(first) { + // 4 nodes, 1 routee on each node + awaitCond(currentRoutees(router4).size == 4) + val iterationCount = 10 for (i ← 0 until iterationCount) { router4 ! "hit" @@ -205,6 +220,9 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou "deploy routees to only remote nodes when allow-local-routees = off" taggedAs LongRunningTest in { runOn(first) { + // max-nr-of-instances-per-node=1 times 3 nodes + awaitCond(currentRoutees(router3).size == 3) + val iterationCount = 10 for (i ← 0 until iterationCount) { router3 ! "hit" @@ -227,6 +245,9 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou runOn(first) { router2.isInstanceOf[RoutedActorRef] must be(true) + // totalInstances = 3, maxInstancesPerNode = 1 + awaitCond(currentRoutees(router2).size == 3) + val iterationCount = 10 for (i ← 0 until iterationCount) { router2 ! "hit" @@ -235,8 +256,8 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou val replies = receiveReplies(DeployRoutee, iterationCount) // note that router2 has totalInstances = 3, maxInstancesPerNode = 1 - val currentRoutees = Await.result(router2 ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees - val routeeAddresses = currentRoutees map fullAddress + val routees = currentRoutees(router2) + val routeeAddresses = routees map fullAddress routeeAddresses.size must be(3) replies.values.sum must be(iterationCount) @@ -249,8 +270,8 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou muteMarkingAsUnreachable() runOn(first) { - def currentRoutees = Await.result(router2 ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees - def routeeAddresses = (currentRoutees map fullAddress).toSet + def routees = currentRoutees(router2) + def routeeAddresses = (routees map fullAddress).toSet val notUsedAddress = ((roles map address).toSet -- routeeAddresses).head diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index c007b3d7d2..d76bbbf071 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -154,7 +154,7 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A wrappedTransport.associate(remoteAddress) onComplete { // Slight modification of pipe, only success is sent, failure is propagated to a separate future case Success(handle) ⇒ self ! (handle, statusPromise) - case Failure(e) ⇒ statusPromise.failure(e) + case Failure(e) ⇒ statusPromise.failure(e) } // Finished outbound association and got back the handle case (handle: AssociationHandle, statusPromise: Promise[AssociationHandle]) ⇒