Support for Actor timers, and fix bug in FSM, #15733

* backport of the timers from Akka Typed, #16742
* also fixed a small bug in FSM timers, which could result in that
  a timer from a previous incarnation was let through to new
  incarnation after restart
* no more need for the complicated "how to" section in docs of
  how to schedule periodic messages
This commit is contained in:
Patrik Nordwall 2017-06-16 11:31:00 +02:00
parent 71175eaf54
commit f8a1d635fa
17 changed files with 712 additions and 333 deletions

View file

@ -0,0 +1,266 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.actor
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.testkit._
import scala.concurrent.Await
object TimerSpec {
sealed trait Command
case class Tick(n: Int) extends Command
case object Bump extends Command
case class SlowThenBump(latch: TestLatch) extends Command
with NoSerializationVerificationNeeded
case object End extends Command
case class Throw(e: Throwable) extends Command
case object Cancel extends Command
case class SlowThenThrow(latch: TestLatch, e: Throwable) extends Command
with NoSerializationVerificationNeeded
sealed trait Event
case class Tock(n: Int) extends Event
case class GotPostStop(timerActive: Boolean) extends Event
case class GotPreRestart(timerActive: Boolean) extends Event
class Exc extends RuntimeException("simulated exc") with NoStackTrace
def target(monitor: ActorRef, interval: FiniteDuration, repeat: Boolean, initial: () Int): Props =
Props(new Target(monitor, interval, repeat, initial))
class Target(monitor: ActorRef, interval: FiniteDuration, repeat: Boolean, initial: () Int) extends Actor with Timers {
private var bumpCount = initial()
if (repeat)
timers.startPeriodicTimer("T", Tick(bumpCount), interval)
else
timers.startSingleTimer("T", Tick(bumpCount), interval)
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
monitor ! GotPreRestart(timers.isTimerActive("T"))
// don't call super.preRestart to avoid postStop
}
override def postStop(): Unit = {
monitor ! GotPostStop(timers.isTimerActive("T"))
}
def bump(): Unit = {
bumpCount += 1
timers.startPeriodicTimer("T", Tick(bumpCount), interval)
}
override def receive = {
case Tick(n)
monitor ! Tock(n)
case Bump
bump()
case SlowThenBump(latch)
Await.ready(latch, 10.seconds)
bump()
case End
context.stop(self)
case Cancel
timers.cancel("T")
case Throw(e)
throw e
case SlowThenThrow(latch, e)
Await.ready(latch, 10.seconds)
throw e
}
}
def fsmTarget(monitor: ActorRef, interval: FiniteDuration, repeat: Boolean, initial: () Int): Props =
Props(new FsmTarget(monitor, interval, repeat, initial))
object TheState
class FsmTarget(monitor: ActorRef, interval: FiniteDuration, repeat: Boolean, initial: () Int) extends FSM[TheState.type, Int] {
private var restarting = false
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
restarting = true
super.preRestart(reason, message)
monitor ! GotPreRestart(isTimerActive("T"))
}
override def postStop(): Unit = {
super.postStop()
if (!restarting)
monitor ! GotPostStop(isTimerActive("T"))
}
def bump(bumpCount: Int): State = {
setTimer("T", Tick(bumpCount + 1), interval, repeat)
stay using (bumpCount + 1)
}
{
val i = initial()
startWith(TheState, i)
setTimer("T", Tick(i), interval, repeat)
}
when(TheState) {
case Event(Tick(n), _)
monitor ! Tock(n)
stay
case Event(Bump, bumpCount)
bump(bumpCount)
case Event(SlowThenBump(latch), bumpCount)
Await.ready(latch, 10.seconds)
bump(bumpCount)
case Event(End, _)
stop()
case Event(Cancel, _)
cancelTimer("T")
stay
case Event(Throw(e), _)
throw e
case Event(SlowThenThrow(latch, e), _)
Await.ready(latch, 10.seconds)
throw e
}
initialize()
}
}
class TimerSpec extends AbstractTimerSpec {
override def testName: String = "Timers"
override def target(monitor: ActorRef, interval: FiniteDuration, repeat: Boolean, initial: () Int = () 1): Props =
TimerSpec.target(monitor, interval, repeat, initial)
}
class FsmTimerSpec extends AbstractTimerSpec {
override def testName: String = "FSM Timers"
override def target(monitor: ActorRef, interval: FiniteDuration, repeat: Boolean, initial: () Int = () 1): Props =
TimerSpec.fsmTarget(monitor, interval, repeat, initial)
}
abstract class AbstractTimerSpec extends AkkaSpec {
import TimerSpec._
val interval = 1.second
val dilatedInterval = interval.dilated
def target(monitor: ActorRef, interval: FiniteDuration, repeat: Boolean, initial: () Int = () 1): Props
def testName: String
testName must {
"schedule non-repeated ticks" taggedAs TimingTest in {
val probe = TestProbe()
val ref = system.actorOf(target(probe.ref, 10.millis, repeat = false))
probe.expectMsg(Tock(1))
probe.expectNoMsg(100.millis)
ref ! End
probe.expectMsg(GotPostStop(false))
}
"schedule repeated ticks" taggedAs TimingTest in {
val probe = TestProbe()
val ref = system.actorOf(target(probe.ref, dilatedInterval, repeat = true))
probe.within((interval * 4) - 100.millis) {
probe.expectMsg(Tock(1))
probe.expectMsg(Tock(1))
probe.expectMsg(Tock(1))
}
ref ! End
probe.expectMsg(GotPostStop(false))
}
"replace timer" taggedAs TimingTest in {
val probe = TestProbe()
val ref = system.actorOf(target(probe.ref, dilatedInterval, repeat = true))
probe.expectMsg(Tock(1))
val latch = new TestLatch(1)
// next Tock(1) enqueued in mailboxed, but should be discarded because of new timer
ref ! SlowThenBump(latch)
probe.expectNoMsg(interval + 100.millis)
latch.countDown()
probe.expectMsg(Tock(2))
ref ! End
probe.expectMsg(GotPostStop(false))
}
"cancel timer" taggedAs TimingTest in {
val probe = TestProbe()
val ref = system.actorOf(target(probe.ref, dilatedInterval, repeat = true))
probe.expectMsg(Tock(1))
ref ! Cancel
probe.expectNoMsg(dilatedInterval + 100.millis)
ref ! End
probe.expectMsg(GotPostStop(false))
}
"cancel timers when restarted" taggedAs TimingTest in {
val probe = TestProbe()
val ref = system.actorOf(target(probe.ref, dilatedInterval, repeat = true))
ref ! Throw(new Exc)
probe.expectMsg(GotPreRestart(false))
ref ! End
probe.expectMsg(GotPostStop(false))
}
"discard timers from old incarnation after restart, alt 1" taggedAs TimingTest in {
val probe = TestProbe()
val startCounter = new AtomicInteger(0)
val ref = system.actorOf(target(probe.ref, dilatedInterval, repeat = true,
initial = () startCounter.incrementAndGet()))
probe.expectMsg(Tock(1))
val latch = new TestLatch(1)
// next Tock(1) is enqueued in mailbox, but should be discarded by new incarnation
ref ! SlowThenThrow(latch, new Exc)
probe.expectNoMsg(interval + 100.millis)
latch.countDown()
probe.expectMsg(GotPreRestart(false))
probe.expectNoMsg(interval / 2)
probe.expectMsg(Tock(2)) // this is from the startCounter increment
ref ! End
probe.expectMsg(GotPostStop(false))
}
"discard timers from old incarnation after restart, alt 2" taggedAs TimingTest in {
val probe = TestProbe()
val ref = system.actorOf(target(probe.ref, dilatedInterval, repeat = true))
probe.expectMsg(Tock(1))
// change state so that we see that the restart starts over again
ref ! Bump
probe.expectMsg(Tock(2))
val latch = new TestLatch(1)
// next Tock(2) is enqueued in mailbox, but should be discarded by new incarnation
ref ! SlowThenThrow(latch, new Exc)
probe.expectNoMsg(interval + 100.millis)
latch.countDown()
probe.expectMsg(GotPreRestart(false))
probe.expectMsg(Tock(1))
ref ! End
probe.expectMsg(GotPostStop(false))
}
"cancel timers when stopped" in {
val probe = TestProbe()
val ref = system.actorOf(target(probe.ref, dilatedInterval, repeat = true))
ref ! End
probe.expectMsg(GotPostStop(false))
}
}
}

View file

@ -272,6 +272,7 @@ class ActorInterruptedException private[akka] (cause: Throwable) extends AkkaExc
*/
@SerialVersionUID(1L)
final case class UnhandledMessage(@BeanProperty message: Any, @BeanProperty sender: ActorRef, @BeanProperty recipient: ActorRef)
extends NoSerializationVerificationNeeded
/**
* Classes for passing status back to the sender.

View file

@ -9,6 +9,7 @@ import scala.collection.mutable
import akka.routing.{ Deafen, Listen, Listeners }
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import akka.annotation.InternalApi
object FSM {
@ -87,8 +88,9 @@ object FSM {
/**
* INTERNAL API
*/
// FIXME: what about the cancellable?
private[akka] final case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(context: ActorContext)
@InternalApi
private[akka] final case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int,
owner: AnyRef)(context: ActorContext)
extends NoSerializationVerificationNeeded {
private var ref: Option[Cancellable] = _
private val scheduler = context.system.scheduler
@ -419,7 +421,7 @@ trait FSM[S, D] extends Actor with Listeners with ActorLogging {
if (timers contains name) {
timers(name).cancel
}
val timer = Timer(name, msg, repeat, timerGen.next)(context)
val timer = Timer(name, msg, repeat, timerGen.next, this)(context)
timer.schedule(self, timeout)
timers(name) = timer
}
@ -616,8 +618,8 @@ trait FSM[S, D] extends Actor with Listeners with ActorLogging {
if (generation == gen) {
processMsg(StateTimeout, "state timeout")
}
case t @ Timer(name, msg, repeat, gen)
if ((timers contains name) && (timers(name).generation == gen)) {
case t @ Timer(name, msg, repeat, gen, owner)
if ((owner eq this) && (timers contains name) && (timers(name).generation == gen)) {
if (timeoutFuture.isDefined) {
timeoutFuture.get.cancel()
timeoutFuture = None
@ -782,7 +784,7 @@ trait LoggingFSM[S, D] extends FSM[S, D] { this: Actor ⇒
if (debugEvent) {
val srcstr = source match {
case s: String s
case Timer(name, _, _, _) "timer " + name
case Timer(name, _, _, _, _) "timer " + name
case a: ActorRef a.toString
case _ "unknown"
}

View file

@ -0,0 +1,119 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.actor
import scala.concurrent.duration.FiniteDuration
import akka.annotation.DoNotInherit
import akka.util.OptionVal
/**
* Scala API: Mix in Timers into your Actor to get support for scheduled
* `self` messages via [[TimerScheduler]].
*
* Timers are bound to the lifecycle of the actor that owns it,
* and thus are cancelled automatically when it is restarted or stopped.
*/
trait Timers extends Actor {
private val _timers = new TimerSchedulerImpl(context)
/**
* Start and cancel timers via the enclosed `TimerScheduler`.
*/
final def timers: TimerScheduler = _timers
override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = {
timers.cancelAll()
super.aroundPreRestart(reason, message)
}
override protected[akka] def aroundPostStop(): Unit = {
timers.cancelAll()
super.aroundPostStop()
}
override protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
msg match {
case timerMsg: TimerSchedulerImpl.TimerMsg
_timers.interceptTimerMsg(timerMsg) match {
case OptionVal.Some(m) super.aroundReceive(receive, m)
case OptionVal.None // discard
}
case _
super.aroundReceive(receive, msg)
}
}
}
/**
* Java API: Support for scheduled `self` messages via [[TimerScheduler]].
*
* Timers are bound to the lifecycle of the actor that owns it,
* and thus are cancelled automatically when it is restarted or stopped.
*/
abstract class AbstractActorWithTimers extends AbstractActor with Timers {
/**
* Start and cancel timers via the enclosed `TimerScheduler`.
*/
final def getTimers: TimerScheduler = timers
}
/**
* Support for scheduled `self` messages in an actor.
* It is used by mixing in trait `Timers` in Scala or extending `AbstractActorWithTimers`
* in Java.
*
* Timers are bound to the lifecycle of the actor that owns it,
* and thus are cancelled automatically when it is restarted or stopped.
*
* `TimerScheduler` is not thread-safe, i.e. it must only be used within
* the actor that owns it.
*/
@DoNotInherit abstract class TimerScheduler {
/**
* Start a periodic timer that will send `msg` to the `self` actor at
* a fixed `interval`.
*
* Each timer has a key and if a new timer with same key is started
* the previous is cancelled and it's guaranteed that a message from the
* previous timer is not received, even though it might already be enqueued
* in the mailbox when the new timer is started.
*/
def startPeriodicTimer(key: Any, msg: Any, interval: FiniteDuration): Unit
/**
* Start a timer that will send `msg` once to the `self` actor after
* the given `timeout`.
*
* Each timer has a key and if a new timer with same key is started
* the previous is cancelled and it's guaranteed that a message from the
* previous timer is not received, even though it might already be enqueued
* in the mailbox when the new timer is started.
*/
def startSingleTimer(key: Any, msg: Any, timeout: FiniteDuration): Unit
/**
* Check if a timer with a given `key` is active.
*/
def isTimerActive(key: Any): Boolean
/**
* Cancel a timer with a given `key`.
* If canceling a timer that was already canceled, or key never was used to start a timer
* this operation will do nothing.
*
* It is guaranteed that a message from a canceled timer, including its previous incarnation
* for the same key, will not be received by the actor, even though the message might already
* be enqueued in the mailbox when cancel is called.
*/
def cancel(key: Any): Unit
/**
* Cancel all timers.
*/
def cancelAll(): Unit
}

View file

@ -0,0 +1,111 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.actor
import scala.concurrent.duration.FiniteDuration
import akka.annotation.InternalApi
import akka.event.Logging
import akka.util.OptionVal
/**
* INTERNAL API
*/
@InternalApi private[akka] object TimerSchedulerImpl {
final case class Timer(key: Any, msg: Any, repeat: Boolean, generation: Int, task: Cancellable)
final case class TimerMsg(key: Any, generation: Int, owner: TimerSchedulerImpl)
extends NoSerializationVerificationNeeded
}
/**
* INTERNAL API
*/
@InternalApi private[akka] class TimerSchedulerImpl(ctx: ActorContext) extends TimerScheduler {
import TimerSchedulerImpl._
private val log = Logging(ctx.system, classOf[TimerScheduler])
private var timers: Map[Any, Timer] = Map.empty
private var timerGen = 0
private def nextTimerGen(): Int = {
timerGen += 1
timerGen
}
override def startPeriodicTimer(key: Any, msg: Any, interval: FiniteDuration): Unit =
startTimer(key, msg, interval, repeat = true)
override def startSingleTimer(key: Any, msg: Any, timeout: FiniteDuration): Unit =
startTimer(key, msg, timeout, repeat = false)
private def startTimer(key: Any, msg: Any, timeout: FiniteDuration, repeat: Boolean): Unit = {
timers.get(key) match {
case Some(t) cancelTimer(t)
case None
}
val nextGen = nextTimerGen()
val timerMsg = TimerMsg(key, nextGen, this)
val task =
if (repeat)
ctx.system.scheduler.schedule(timeout, timeout, ctx.self, timerMsg)(ctx.dispatcher)
else
ctx.system.scheduler.scheduleOnce(timeout, ctx.self, timerMsg)(ctx.dispatcher)
val nextTimer = Timer(key, msg, repeat, nextGen, task)
log.debug("Start timer [{}] with generation [{}]", key, nextGen)
timers = timers.updated(key, nextTimer)
}
override def isTimerActive(key: Any): Boolean =
timers.contains(key)
override def cancel(key: Any): Unit = {
timers.get(key) match {
case None // already removed/canceled
case Some(t) cancelTimer(t)
}
}
private def cancelTimer(timer: Timer): Unit = {
log.debug("Cancel timer [{}] with generation [{}]", timer.key, timer.generation)
timer.task.cancel()
timers -= timer.key
}
override def cancelAll(): Unit = {
log.debug("Cancel all timers")
timers.valuesIterator.foreach { timer
timer.task.cancel()
}
timers = Map.empty
}
def interceptTimerMsg(timerMsg: TimerMsg): OptionVal[AnyRef] = {
timers.get(timerMsg.key) match {
case None
// it was from canceled timer that was already enqueued in mailbox
log.debug("Received timer [{}] that has been removed, discarding", timerMsg.key)
OptionVal.None // message should be ignored
case Some(t)
if (timerMsg.owner ne this) {
// after restart, it was from an old instance that was enqueued in mailbox before canceled
log.debug("Received timer [{}] from old restarted instance, discarding", timerMsg.key)
OptionVal.None // message should be ignored
} else if (timerMsg.generation == t.generation) {
// valid timer
log.debug("Received timer [{}]", timerMsg.key)
if (!t.repeat)
timers -= t.key
OptionVal.Some(t.msg.asInstanceOf[AnyRef])
} else {
// it was from an old timer that was enqueued in mailbox before canceled
log.debug(
"Received timer [{}] from from old generation [{}], expected generation [{}], discarding",
timerMsg.key, timerMsg.generation, t.generation)
OptionVal.None // message should be ignored
}
}
}
}

View file

@ -0,0 +1,46 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package jdocs.actor;
//#timers
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.Duration;
import akka.actor.AbstractActorWithTimers;
//#timers
public class TimerDocTest {
static
//#timers
public class MyActor extends AbstractActorWithTimers {
private static Object TICK_KEY = "TickKey";
private static final class FirstTick {
}
private static final class Tick {
}
public MyActor() {
getTimers().startSingleTimer(TICK_KEY, new FirstTick(),
Duration.create(500, TimeUnit.MILLISECONDS));
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(FirstTick.class, message -> {
// do something useful here
getTimers().startPeriodicTimer(TICK_KEY, new Tick(),
Duration.create(1, TimeUnit.SECONDS));
})
.match(Tick.class, message -> {
// do something useful here
})
.build();
}
}
//#timers
}

View file

@ -0,0 +1,33 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package docs.actor
import akka.actor.Actor
import scala.concurrent.duration._
object TimerDocSpec {
//#timers
import akka.actor.Timers
object MyActor {
private case object TickKey
private case object FirstTick
private case object Tick
private case object LaterTick
}
class MyActor extends Actor with Timers {
import MyActor._
timers.startSingleTimer(TickKey, FirstTick, 500.millis)
def receive = {
case FirstTick =>
// do something useful here
timers.startPeriodicTimer(TickKey, Tick, 1.second)
case Tick =>
// do something useful here
}
}
//#timers
}

View file

@ -905,6 +905,29 @@ Messages marked with `NotInfluenceReceiveTimeout` will not reset the timer. This
`ReceiveTimeout` should be fired by external inactivity but not influenced by internal activity,
e.g. scheduled tick messages.
<a id="actors-timers"></a>
## Timers, scheduled messages
Messages can be scheduled to be sent at a later point by using the @ref[Scheduler](scheduler.md) directly,
but when scheduling periodic or single messages in an actor to itself it's more convenient and safe
to use the support for named timers. The lifecycle of scheduled messages can be difficult to manage
when the actor is restarted and that is taken care of by the timers.
Scala
: @@snip [ActorDocSpec.scala]($code$/scala/docs/actor/TimerDocSpec.scala) { #timers }
Java
: @@snip [ActorDocTest.java]($code$/java/jdocs/actor/TimerDocTest.java) { #timers }
Each timer has a key and can be replaced or cancelled. It's guaranteed that a message from the
previous incarnation of the timer with the same key is not received, even though it might already
be enqueued in the mailbox when it was cancelled or the new timer was started.
The timers are bound to the lifecycle of the actor that owns it, and thus are cancelled
automatically when it is restarted or stopped. Note that the `TimerScheduler` is not thread-safe,
i.e. it must only be used within the actor that owns it.
<a id="stopping-actors"></a>
## Stopping actors

View file

@ -122,47 +122,7 @@ The pattern is described [Discovering Message Flows in Actor System with the Spi
## Scheduling Periodic Messages
This pattern describes how to schedule periodic messages to yourself in two different
ways.
The first way is to set up periodic message scheduling in the constructor of the actor,
and cancel that scheduled sending in `postStop` or else we might have multiple registered
message sends to the same actor.
@@@ note
With this approach the scheduled periodic message send will be restarted with the actor on restarts.
This also means that the time period that elapses between two tick messages during a restart may drift
off based on when you restart the scheduled message sends relative to the time that the last message was
sent, and how long the initial delay is. Worst case scenario is `interval` plus `initialDelay`.
@@@
Scala
: @@snip [SchedulerPatternSpec.scala]($code$/scala/docs/pattern/SchedulerPatternSpec.scala) { #schedule-constructor }
Java
: @@snip [SchedulerPatternTest.java]($code$/java/jdocs/pattern/SchedulerPatternTest.java) { #schedule-constructor }
The second variant sets up an initial one shot message send in the `preStart` method
of the actor, and the then the actor when it receives this message sets up a new one shot
message send. You also have to override `postRestart` so we don't call `preStart`
and schedule the initial message send again.
@@@ note
With this approach we won't fill up the mailbox with tick messages if the actor is
under pressure, but only schedule a new tick message when we have seen the previous one.
@@@
Scala
: @@snip [SchedulerPatternSpec.scala]($code$/scala/docs/pattern/SchedulerPatternSpec.scala) { #schedule-receive }
Java
: @@snip [SchedulerPatternTest.java]($code$/java/jdocs/pattern/SchedulerPatternTest.java) { #schedule-receive }
@@@ div { .group-java }
See @ref:[Actor Timers](actors.md#actors-timers)
## Single-Use Actor Trees with High-Level Error Reporting

View file

@ -10,6 +10,10 @@ You can schedule sending of messages to actors and execution of tasks
(functions or Runnable). You will get a `Cancellable` back that you can call
`cancel` on to cancel the execution of the scheduled operation.
When scheduling periodic or single messages in an actor to itself it is recommended to
use the @ref:[Actor Timers](actors.md#actors-timers) instead of using the `Scheduler`
directly.
The scheduler in Akka is designed for high-throughput of thousands up to millions
of triggers. The prime use-case being triggering Actor receive timeouts, Future timeouts,
circuit breakers and other time dependent events which happen all-the-time and in many

View file

@ -0,0 +1,46 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package jdocs.actor;
//#timers
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.Duration;
import akka.actor.AbstractActorWithTimers;
//#timers
public class TimerDocTest {
static
//#timers
public class MyActor extends AbstractActorWithTimers {
private static Object TICK_KEY = "TickKey";
private static final class FirstTick {
}
private static final class Tick {
}
public MyActor() {
getTimers().startSingleTimer(TICK_KEY, new FirstTick(),
Duration.create(500, TimeUnit.MILLISECONDS));
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(FirstTick.class, message -> {
// do something useful here
getTimers().startPeriodicTimer(TICK_KEY, new Tick(),
Duration.create(1, TimeUnit.SECONDS));
})
.match(Tick.class, message -> {
// do something useful here
})
.build();
}
}
//#timers
}

View file

@ -1,173 +0,0 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package jdocs.pattern;
import akka.actor.*;
import akka.testkit.*;
import akka.testkit.TestEvent.Mute;
import akka.testkit.TestEvent.UnMute;
import jdocs.AbstractJavaTest;
import akka.testkit.javadsl.TestKit;
import org.junit.*;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
public class SchedulerPatternTest extends AbstractJavaTest {
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource("SchedulerPatternTest", AkkaSpec.testConf());
private final ActorSystem system = actorSystemResource.getSystem();
static
//#schedule-constructor
public class ScheduleInConstructor extends AbstractActor {
private final Cancellable tick = getContext().getSystem().scheduler().schedule(
Duration.create(500, TimeUnit.MILLISECONDS),
Duration.create(1, TimeUnit.SECONDS),
getSelf(), "tick", getContext().dispatcher(), null);
//#schedule-constructor
// this variable and constructor is declared here to not show up in the docs
final ActorRef target;
public ScheduleInConstructor(ActorRef target) {
this.target = target;
}
//#schedule-constructor
@Override
public void postStop() {
tick.cancel();
}
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("tick", message -> {
// do something useful here
//#schedule-constructor
target.tell(message, getSelf());
//#schedule-constructor
})
.matchEquals("restart", message -> {
throw new ArithmeticException();
})
.build();
}
}
//#schedule-constructor
static
//#schedule-receive
public class ScheduleInReceive extends AbstractActor {
//#schedule-receive
// this variable and constructor is declared here to not show up in the docs
final ActorRef target;
public ScheduleInReceive(ActorRef target) {
this.target = target;
}
//#schedule-receive
@Override
public void preStart() {
getContext().getSystem().scheduler().scheduleOnce(
Duration.create(500, TimeUnit.MILLISECONDS),
getSelf(), "tick", getContext().dispatcher(), null);
}
// override postRestart so we don't call preStart and schedule a new message
@Override
public void postRestart(Throwable reason) {
}
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("tick", message -> {
// send another periodic tick after the specified delay
getContext().getSystem().scheduler().scheduleOnce(
Duration.create(1, TimeUnit.SECONDS),
getSelf(), "tick", getContext().dispatcher(), null);
// do something useful here
//#schedule-receive
target.tell(message, getSelf());
//#schedule-receive
})
.matchEquals("restart", message -> {
throw new ArithmeticException();
})
.build();
}
}
//#schedule-receive
@Test
@Ignore // no way to tag this as timing sensitive
public void scheduleInConstructor() {
new TestSchedule(system) {{
final TestKit probe = new TestKit(system);
final Props props = Props.create(ScheduleInConstructor.class, probe.getRef());
testSchedule(probe, props, duration("3000 millis"), duration("2000 millis"));
}};
}
@Test
@Ignore // no way to tag this as timing sensitive
public void scheduleInReceive() {
new TestSchedule(system) {{
final TestKit probe = new TestKit(system);
final Props props = Props.create(ScheduleInReceive.class, probe.getRef());
testSchedule(probe, props, duration("3000 millis"), duration("2500 millis"));
}};
}
@Test
public void doNothing() {
// actorSystemResource.after is not called when all tests are ignored
}
public static class TestSchedule extends TestKit {
private ActorSystem system;
public TestSchedule(ActorSystem system) {
super(system);
this.system = system;
}
public void testSchedule(final TestKit probe, Props props,
FiniteDuration startDuration,
FiniteDuration afterRestartDuration) {
Iterable<akka.testkit.EventFilter> filter =
Arrays.asList(new akka.testkit.EventFilter[]{
(akka.testkit.EventFilter) new ErrorFilter(ArithmeticException.class)});
try {
system.eventStream().publish(new Mute(filter));
final ActorRef actor = system.actorOf(props);
within(startDuration, () -> {
probe.expectMsgEquals("tick");
probe.expectMsgEquals("tick");
probe.expectMsgEquals("tick");
return null;
});
actor.tell("restart", getRef());
within(afterRestartDuration, () -> {
probe.expectMsgEquals("tick");
probe.expectMsgEquals("tick");
return null;
});
system.stop(actor);
}
finally {
system.eventStream().publish(new UnMute(filter));
}
}
}
}

View file

@ -0,0 +1,33 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package docs.actor
import akka.actor.Actor
import scala.concurrent.duration._
object TimerDocSpec {
//#timers
import akka.actor.Timers
object MyActor {
private case object TickKey
private case object FirstTick
private case object Tick
private case object LaterTick
}
class MyActor extends Actor with Timers {
import MyActor._
timers.startSingleTimer(TickKey, FirstTick, 500.millis)
def receive = {
case FirstTick =>
// do something useful here
timers.startPeriodicTimer(TickKey, Tick, 1.second)
case Tick =>
// do something useful here
}
}
//#timers
}

View file

@ -1,101 +0,0 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package docs.pattern
import language.postfixOps
import akka.actor.{ Props, ActorRef, Actor }
import scala.concurrent.duration._
import akka.testkit.{ TimingTest, AkkaSpec, filterException }
import docs.pattern.SchedulerPatternSpec.ScheduleInConstructor
object SchedulerPatternSpec {
//#schedule-constructor
class ScheduleInConstructor extends Actor {
import context.dispatcher
val tick =
context.system.scheduler.schedule(500 millis, 1000 millis, self, "tick")
//#schedule-constructor
// this var and constructor is declared here to not show up in the docs
var target: ActorRef = null
def this(target: ActorRef) = { this(); this.target = target }
//#schedule-constructor
override def postStop() = tick.cancel()
def receive = {
case "tick" =>
// do something useful here
//#schedule-constructor
target ! "tick"
case "restart" =>
throw new ArithmeticException
//#schedule-constructor
}
}
//#schedule-constructor
//#schedule-receive
class ScheduleInReceive extends Actor {
import context._
//#schedule-receive
// this var and constructor is declared here to not show up in the docs
var target: ActorRef = null
def this(target: ActorRef) = { this(); this.target = target }
//#schedule-receive
override def preStart() =
system.scheduler.scheduleOnce(500 millis, self, "tick")
// override postRestart so we don't call preStart and schedule a new message
override def postRestart(reason: Throwable) = {}
def receive = {
case "tick" =>
// send another periodic tick after the specified delay
system.scheduler.scheduleOnce(1000 millis, self, "tick")
// do something useful here
//#schedule-receive
target ! "tick"
case "restart" =>
throw new ArithmeticException
//#schedule-receive
}
}
//#schedule-receive
}
class SchedulerPatternSpec extends AkkaSpec {
def testSchedule(actor: ActorRef, startDuration: FiniteDuration,
afterRestartDuration: FiniteDuration) = {
filterException[ArithmeticException] {
within(startDuration) {
expectMsg("tick")
expectMsg("tick")
expectMsg("tick")
}
actor ! "restart"
within(afterRestartDuration) {
expectMsg("tick")
expectMsg("tick")
}
system.stop(actor)
}
}
"send periodic ticks from the constructor" taggedAs TimingTest in {
testSchedule(
system.actorOf(Props(classOf[ScheduleInConstructor], testActor)),
3000 millis, 2000 millis)
}
"send ticks from the preStart and receive" taggedAs TimingTest in {
testSchedule(
system.actorOf(Props(classOf[ScheduleInConstructor], testActor)),
3000 millis, 2500 millis)
}
}

View file

@ -288,9 +288,9 @@ object PersistentFSM {
/**
* INTERNAL API
*/
// FIXME: what about the cancellable?
@InternalApi
private[persistence] final case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(context: ActorContext)
private[persistence] final case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int,
owner: AnyRef)(context: ActorContext)
extends NoSerializationVerificationNeeded {
private var ref: Option[Cancellable] = _
private val scheduler = context.system.scheduler

View file

@ -211,7 +211,7 @@ trait PersistentFSMBase[S, D, E] extends Actor with Listeners with ActorLogging
if (timers contains name) {
timers(name).cancel
}
val timer = Timer(name, msg, repeat, timerGen.next)(context)
val timer = Timer(name, msg, repeat, timerGen.next, this)(context)
timer.schedule(self, timeout)
timers(name) = timer
}
@ -412,8 +412,8 @@ trait PersistentFSMBase[S, D, E] extends Actor with Listeners with ActorLogging
if (generation == gen) {
processMsg(StateTimeout, "state timeout")
}
case t @ Timer(name, msg, repeat, gen)
if ((timers contains name) && (timers(name).generation == gen)) {
case t @ Timer(name, msg, repeat, gen, owner)
if ((owner eq this) && (timers contains name) && (timers(name).generation == gen)) {
if (timeoutFuture.isDefined) {
timeoutFuture.get.cancel()
timeoutFuture = None
@ -576,7 +576,7 @@ trait LoggingPersistentFSM[S, D, E] extends PersistentFSMBase[S, D, E] { this: A
if (debugEvent) {
val srcstr = source match {
case s: String s
case Timer(name, _, _, _) "timer " + name
case Timer(name, _, _, _, _) "timer " + name
case a: ActorRef a.toString
case _ "unknown"
}

View file

@ -1230,6 +1230,15 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.transport.AssociationHandle.disassociate")
),
"2.5.3" -> Seq(
// #15733 Timers
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.fsm.PersistentFSM#Timer.apply"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.fsm.PersistentFSM#Timer.copy"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.fsm.PersistentFSM#Timer.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.FSM#Timer.copy"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.FSM#Timer.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.FSM#Timer.apply"),
// #22789 Source.maybe rewritten as a graph stage
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.MaybePublisher"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.MaybePublisher$MaybeSubscription"),