diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala index f71bb0116b..97b886dc79 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala @@ -41,62 +41,71 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod } } - runOn(remote) { - system.actorOf(Props(new Actor { - def receive = { - case x ⇒ testActor ! x - } - }), "echo") - } + @volatile var target: ActorRef = system.deadLetters + @volatile var proxy: ActorRef = system.deadLetters - val target = system.actorFor(node(remote) / "user" / "echo") - - var proxy: ActorRef = _ def expectState(s: State) = expectMsg(FSM.CurrentState(proxy, s)) def expectTransition(s1: State, s2: State) = expectMsg(FSM.Transition(proxy, s1, s2)) - - runOn(local) { - //#demo - import akka.contrib.pattern.ReliableProxy - - proxy = system.actorOf(Props(new ReliableProxy(target, 100.millis)), "proxy") - //#demo - proxy ! FSM.SubscribeTransitionCallBack(testActor) - expectState(Idle) - //#demo - proxy ! "hello" - //#demo - expectTransition(Idle, Active) - expectTransition(Active, Idle) - } - runOn(remote) { - expectMsg("hello") - } + + def sendN(n: Int) = (1 to n) foreach (proxy ! _) + def expectN(n: Int) = (1 to n) foreach { n ⇒ expectMsg(n); lastSender must be === target } "A ReliableProxy" must { + "initialize properly" in { + runOn(remote) { + target = system.actorOf(Props(new Actor { + def receive = { + case x ⇒ testActor ! x + } + }), "echo") + } + + enterBarrier("initialize") + + runOn(local) { + //#demo + import akka.contrib.pattern.ReliableProxy + + target = system.actorFor(node(remote) / "user" / "echo") + proxy = system.actorOf(Props(new ReliableProxy(target, 100.millis)), "proxy") + //#demo + proxy ! FSM.SubscribeTransitionCallBack(testActor) + expectState(Idle) + //#demo + proxy ! "hello" + //#demo + expectTransition(Idle, Active) + expectTransition(Active, Idle) + } + + runOn(remote) { + expectMsg("hello") + } + } + "forward messages in sequence" in { runOn(local) { - (1 to 100) foreach (proxy ! _) + sendN(100) expectTransition(Idle, Active) expectTransition(Active, Idle) } runOn(remote) { within(1 second) { - (1 to 100) foreach { n ⇒ expectMsg(n); lastSender must be === target } + expectN(100) } } enterBarrier("test1a") runOn(local) { - (1 to 100) foreach (proxy ! _) + sendN(100) expectTransition(Idle, Active) expectTransition(Active, Idle) } runOn(remote) { within(1 second) { - (1 to 100) foreach { n ⇒ expectMsg(n); lastSender must be === target } + expectN(100) } } @@ -106,7 +115,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod "retry when sending fails" in { runOn(local) { testConductor.blackhole(local, remote, Direction.Send).await - (1 to 100) foreach (proxy ! _) + sendN(100) within(1 second) { expectTransition(Idle, Active) expectNoMsg @@ -127,7 +136,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod } runOn(remote) { within(1 second) { - (1 to 100) foreach { n ⇒ expectMsg(n); lastSender must be === target } + expectN(100) } } @@ -137,7 +146,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod "retry when receiving fails" in { runOn(local) { testConductor.blackhole(local, remote, Direction.Receive).await - (1 to 100) foreach (proxy ! _) + sendN(100) within(1 second) { expectTransition(Idle, Active) expectNoMsg @@ -145,7 +154,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod } runOn(remote) { within(1 second) { - (1 to 100) foreach { n ⇒ expectMsg(n); lastSender must be === target } + expectN(100) } } @@ -162,7 +171,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod "resend across a slow link" in { runOn(local) { testConductor.throttle(local, remote, Direction.Send, rateMBit = 0.1).await - (1 to 50) foreach (proxy ! _) + sendN(50) within(5 seconds) { expectTransition(Idle, Active) expectTransition(Active, Idle) @@ -170,7 +179,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod } runOn(remote) { within(5 seconds) { - (1 to 50) foreach { n ⇒ expectMsg(n); lastSender must be === target } + expectN(50) } } @@ -179,7 +188,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod runOn(local) { testConductor.throttle(local, remote, Direction.Send, rateMBit = -1).await testConductor.throttle(local, remote, Direction.Receive, rateMBit = 0.1).await - (1 to 50) foreach (proxy ! _) + sendN(50) within(5 seconds) { expectTransition(Idle, Active) expectTransition(Active, Idle) @@ -187,7 +196,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod } runOn(remote) { within(1 second) { - (1 to 50) foreach { n ⇒ expectMsg(n); lastSender must be === target } + expectN(50) } }