diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala index b36185ae22..974e5901f5 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala @@ -19,6 +19,33 @@ import java.net.{ InetSocketAddress, ConnectException } import akka.remote.transport.ThrottlerTransportAdapter.{ SetThrottle, TokenBucket, Blackhole, Unthrottled } import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } +object Player { + + final class Waiter extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] { + + import FSM._ + import ClientFSM._ + + var waiting: ActorRef = _ + + def receive = { + case fsm: ActorRef ⇒ + waiting = sender(); fsm ! SubscribeTransitionCallBack(self) + case Transition(_, f: ClientFSM.State, t: ClientFSM.State) if f == Connecting && t == AwaitDone ⇒ // step 1, not there yet // // SI-5900 workaround + case Transition(_, f: ClientFSM.State, t: ClientFSM.State) if f == AwaitDone && t == Connected ⇒ // SI-5900 workaround + waiting ! Done; context stop self + case t: Transition[_] ⇒ + waiting ! Status.Failure(new RuntimeException("unexpected transition: " + t)); context stop self + case CurrentState(_, s: ClientFSM.State) if s == Connected ⇒ // SI-5900 workaround + waiting ! Done; context stop self + case _: CurrentState[_] ⇒ + } + + } + + def waiterProps = Props[Waiter] +} + /** * The Player is the client component of the * [[akka.remote.testconductor.TestConductorExt]] extension. It registers with @@ -51,22 +78,7 @@ trait Player { this: TestConductorExt ⇒ if (_client ne null) throw new IllegalStateException("TestConductorClient already started") _client = system.actorOf(Props(classOf[ClientFSM], name, controllerAddr), "TestConductorClient") - val a = system.actorOf(Props(new Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] { - var waiting: ActorRef = _ - def receive = { - case fsm: ActorRef ⇒ - waiting = sender(); fsm ! SubscribeTransitionCallBack(self) - case Transition(_, f: ClientFSM.State, t: ClientFSM.State) if (f == Connecting && t == AwaitDone) ⇒ // step 1, not there yet // // SI-5900 workaround - case Transition(_, f: ClientFSM.State, t: ClientFSM.State) if (f == AwaitDone && t == Connected) ⇒ // SI-5900 workaround - waiting ! Done; context stop self - case t: Transition[_] ⇒ - waiting ! Status.Failure(new RuntimeException("unexpected transition: " + t)); context stop self - case CurrentState(_, s: ClientFSM.State) if (s == Connected) ⇒ // SI-5900 workaround - waiting ! Done; context stop self - case _: CurrentState[_] ⇒ - } - })) - + val a = system.actorOf(Player.waiterProps) a ? client mapTo classTag[Done] }