Boy scouting based on feedback, see #1935
This commit is contained in:
parent
b9a6ccaf41
commit
06f86e1091
4 changed files with 35 additions and 34 deletions
|
|
@ -1,7 +1,7 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.remote.router
|
||||
package akka.remote
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
|
|
@ -9,7 +9,6 @@ import akka.actor.Actor
|
|||
import akka.actor.ActorRef
|
||||
import akka.actor.Props
|
||||
import akka.pattern.ask
|
||||
import akka.remote.RemoteActorRef
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
|
|
|
|||
|
|
@ -6,6 +6,8 @@ package akka.remote.router
|
|||
import akka.actor.Actor
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.Props
|
||||
import akka.actor.PoisonPill
|
||||
import akka.actor.Address
|
||||
import akka.dispatch.Await
|
||||
import akka.pattern.ask
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
|
|
@ -14,13 +16,13 @@ import akka.routing.Broadcast
|
|||
import akka.routing.RandomRouter
|
||||
import akka.routing.RoutedActorRef
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
|
||||
object RandomRoutedRemoteActorMultiJvmSpec extends MultiNodeConfig {
|
||||
|
||||
class SomeActor extends Actor with Serializable {
|
||||
def receive = {
|
||||
case "hit" ⇒ sender ! self
|
||||
case "end" ⇒ context.stop(self)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -64,23 +66,22 @@ class RandomRoutedRemoteActorSpec extends MultiNodeSpec(RandomRoutedRemoteActorM
|
|||
val connectionCount = 3
|
||||
val iterationCount = 10
|
||||
|
||||
var replies = Map(
|
||||
node(first).address -> 0,
|
||||
node(second).address -> 0,
|
||||
node(third).address -> 0)
|
||||
for (i ← 0 until iterationCount; k ← 0 until connectionCount) {
|
||||
actor ! "hit"
|
||||
}
|
||||
|
||||
for (i ← 0 until iterationCount) {
|
||||
for (k ← 0 until connectionCount) {
|
||||
val nodeAddress = Await.result(actor ? "hit", timeout.duration).asInstanceOf[ActorRef].path.address
|
||||
replies = replies + (nodeAddress -> (replies(nodeAddress) + 1))
|
||||
}
|
||||
val replies: Map[Address, Int] = (receiveWhile(5 seconds, messages = connectionCount * iterationCount) {
|
||||
case ref: ActorRef ⇒ ref.path.address
|
||||
}).foldLeft(Map(node(first).address -> 0, node(second).address -> 0, node(third).address -> 0)) {
|
||||
case (replyMap, address) ⇒ replyMap + (address -> (replyMap(address) + 1))
|
||||
}
|
||||
|
||||
testConductor.enter("broadcast-end")
|
||||
actor ! Broadcast("end")
|
||||
actor ! Broadcast(PoisonPill)
|
||||
|
||||
testConductor.enter("end")
|
||||
replies.values foreach { _ must be > (0) }
|
||||
replies.get(node(fourth).address) must be(None)
|
||||
|
||||
// shut down the actor before we let the other node(s) shut down so we don't try to send
|
||||
// "Terminate" to a shut down node
|
||||
|
|
|
|||
|
|
@ -6,6 +6,8 @@ package akka.remote.router
|
|||
import akka.actor.Actor
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.Props
|
||||
import akka.actor.PoisonPill
|
||||
import akka.actor.Address
|
||||
import akka.dispatch.Await
|
||||
import akka.pattern.ask
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
|
|
@ -14,13 +16,13 @@ import akka.routing.Broadcast
|
|||
import akka.routing.RoundRobinRouter
|
||||
import akka.routing.RoutedActorRef
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
|
||||
object RoundRobinRoutedRemoteActorMultiJvmSpec extends MultiNodeConfig {
|
||||
|
||||
class SomeActor extends Actor with Serializable {
|
||||
def receive = {
|
||||
case "hit" ⇒ sender ! self
|
||||
case "end" ⇒ context.stop(self)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -64,23 +66,22 @@ class RoundRobinRoutedRemoteActorSpec extends MultiNodeSpec(RoundRobinRoutedRemo
|
|||
val connectionCount = 3
|
||||
val iterationCount = 10
|
||||
|
||||
var replies = Map(
|
||||
node(first).address -> 0,
|
||||
node(second).address -> 0,
|
||||
node(third).address -> 0)
|
||||
for (i ← 0 until iterationCount; k ← 0 until connectionCount) {
|
||||
actor ! "hit"
|
||||
}
|
||||
|
||||
for (i ← 0 until iterationCount) {
|
||||
for (k ← 0 until connectionCount) {
|
||||
val nodeAddress = Await.result(actor ? "hit", timeout.duration).asInstanceOf[ActorRef].path.address
|
||||
replies = replies + (nodeAddress -> (replies(nodeAddress) + 1))
|
||||
}
|
||||
val replies: Map[Address, Int] = (receiveWhile(5 seconds, messages = connectionCount * iterationCount) {
|
||||
case ref: ActorRef ⇒ ref.path.address
|
||||
}).foldLeft(Map(node(first).address -> 0, node(second).address -> 0, node(third).address -> 0)) {
|
||||
case (replyMap, address) ⇒ replyMap + (address -> (replyMap(address) + 1))
|
||||
}
|
||||
|
||||
testConductor.enter("broadcast-end")
|
||||
actor ! Broadcast("end")
|
||||
actor ! Broadcast(PoisonPill)
|
||||
|
||||
testConductor.enter("end")
|
||||
replies.values foreach { _ must be(10) }
|
||||
replies.values foreach { _ must be(iterationCount) }
|
||||
replies.get(node(fourth).address) must be(None)
|
||||
|
||||
// shut down the actor before we let the other node(s) shut down so we don't try to send
|
||||
// "Terminate" to a shut down node
|
||||
|
|
|
|||
|
|
@ -15,13 +15,14 @@ import akka.routing.ScatterGatherFirstCompletedRouter
|
|||
import akka.routing.RoutedActorRef
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
import akka.actor.PoisonPill
|
||||
import akka.actor.Address
|
||||
|
||||
object ScatterGatherRoutedRemoteActorMultiJvmSpec extends MultiNodeConfig {
|
||||
|
||||
class SomeActor extends Actor with Serializable {
|
||||
def receive = {
|
||||
case "hit" ⇒ sender ! self
|
||||
case "end" ⇒ context.stop(self)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -65,23 +66,22 @@ class ScatterGatherRoutedRemoteActorSpec extends MultiNodeSpec(ScatterGatherRout
|
|||
val connectionCount = 3
|
||||
val iterationCount = 10
|
||||
|
||||
for (i ← 0 until iterationCount) {
|
||||
for (k ← 0 until connectionCount) {
|
||||
actor ! "hit"
|
||||
}
|
||||
for (i ← 0 until iterationCount; k ← 0 until connectionCount) {
|
||||
actor ! "hit"
|
||||
}
|
||||
|
||||
val replies = (receiveWhile(5 seconds, messages = connectionCount * iterationCount) {
|
||||
case ref: ActorRef ⇒ (ref.path.address, 1)
|
||||
val replies: Map[Address, Int] = (receiveWhile(5 seconds, messages = connectionCount * iterationCount) {
|
||||
case ref: ActorRef ⇒ ref.path.address
|
||||
}).foldLeft(Map(node(first).address -> 0, node(second).address -> 0, node(third).address -> 0)) {
|
||||
case (m, (n, c)) ⇒ m + (n -> (m(n) + c))
|
||||
case (replyMap, address) ⇒ replyMap + (address -> (replyMap(address) + 1))
|
||||
}
|
||||
|
||||
testConductor.enter("broadcast-end")
|
||||
actor ! Broadcast("end")
|
||||
actor ! Broadcast(PoisonPill)
|
||||
|
||||
testConductor.enter("end")
|
||||
replies.values.sum must be === connectionCount * iterationCount
|
||||
replies.get(node(fourth).address) must be(None)
|
||||
|
||||
// shut down the actor before we let the other node(s) shut down so we don't try to send
|
||||
// "Terminate" to a shut down node
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue