diff --git a/akka-persistence/src/main/scala/akka/persistence/fsm/FSM.scala b/akka-persistence/src/main/scala/akka/persistence/fsm/FSM.scala index 6ca95cb5c1..97796a90ef 100644 --- a/akka-persistence/src/main/scala/akka/persistence/fsm/FSM.scala +++ b/akka-persistence/src/main/scala/akka/persistence/fsm/FSM.scala @@ -33,13 +33,13 @@ object FSM { * [[akka.actor.FSM.SubscribeTransitionCallBack]] before sending any * [[akka.actor.FSM.Transition]] messages. */ - final case class CurrentState[S](fsmRef: ActorRef, state: S) + final case class CurrentState[S](fsmRef: ActorRef, state: S, timeout: Option[FiniteDuration] = None) /** * Message type which is used to communicate transitions between states to * all subscribed listeners (use [[akka.actor.FSM.SubscribeTransitionCallBack]]). */ - final case class Transition[S](fsmRef: ActorRef, from: S, to: S) + final case class Transition[S](fsmRef: ActorRef, from: S, to: S, timeout: Option[FiniteDuration] = None) /** * Send this to an [[akka.actor.FSM]] to request first the [[FSM.CurrentState]] @@ -622,24 +622,23 @@ trait FSM[S, D, E] extends Actor with Listeners with ActorLogging { // TODO Use context.watch(actor) and receive Terminated(actor) to clean up list listeners.add(actorRef) // send current state back as reference point - actorRef ! CurrentState(self, currentState.stateName) + actorRef ! CurrentState(self, currentState.stateName, currentState.timeout) case Listen(actorRef) ⇒ // TODO Use context.watch(actor) and receive Terminated(actor) to clean up list listeners.add(actorRef) // send current state back as reference point - actorRef ! CurrentState(self, currentState.stateName) + actorRef ! CurrentState(self, currentState.stateName, currentState.timeout) case UnsubscribeTransitionCallBack(actorRef) ⇒ listeners.remove(actorRef) case Deafen(actorRef) ⇒ listeners.remove(actorRef) - case value ⇒ { + case value ⇒ if (timeoutFuture.isDefined) { timeoutFuture.get.cancel() timeoutFuture = None } generation += 1 processMsg(value, sender()) - } } private def processMsg(value: Any, source: AnyRef): Unit = { diff --git a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala index 6541898209..419819fadb 100644 --- a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMActorSpec.scala @@ -195,7 +195,7 @@ abstract class PersistentFSMActorSpec(config: Config) extends PersistenceSpec(co expectMsg(CurrentState(fsmRef, LookingAround)) expectMsg(Transition(fsmRef, LookingAround, Shopping)) - expectNoMsg(0.6 seconds) //randomly chosen delay, less than the timeout, before stopping the FSM + expectNoMsg(0.6 seconds) // arbitrarily chosen delay, less than the timeout, before stopping the FSM fsmRef ! PoisonPill expectTerminated(fsmRef) @@ -203,13 +203,13 @@ abstract class PersistentFSMActorSpec(config: Config) extends PersistenceSpec(co watch(recoveredFsmRef) recoveredFsmRef ! SubscribeTransitionCallBack(testActor) - expectMsg(CurrentState(recoveredFsmRef, Shopping)) + expectMsg(CurrentState(recoveredFsmRef, Shopping, Some(1 second))) within(0.9 seconds, 1.9 seconds) { expectMsg(Transition(recoveredFsmRef, Shopping, Inactive)) } - expectNoMsg(0.9 seconds) //randomly chosen delay, less than the timeout, before stopping the FSM + expectNoMsg(0.6 seconds) // arbitrarily chosen delay, less than the timeout, before stopping the FSM recoveredFsmRef ! PoisonPill expectTerminated(recoveredFsmRef) @@ -217,11 +217,8 @@ abstract class PersistentFSMActorSpec(config: Config) extends PersistenceSpec(co watch(recoveredFsmRef) recoveredFsmRef ! SubscribeTransitionCallBack(testActor) - expectMsg(CurrentState(recoveredFsmRef, Inactive)) - - within(1.9 seconds, 2.9 seconds) { - expectTerminated(recoveredFsmRef) - } + expectMsg(CurrentState(recoveredFsmRef, Inactive, Some(2 seconds))) + expectTerminated(recoveredFsmRef) } "not trigger onTransition for stay()" taggedAs TimingTest in {