diff --git a/akka-actor-tests/src/test/scala/akka/actor/TimerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TimerSpec.scala new file mode 100644 index 0000000000..42e95a7233 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/actor/TimerSpec.scala @@ -0,0 +1,266 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.actor + +import java.util.concurrent.atomic.AtomicInteger + +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace +import akka.testkit._ +import scala.concurrent.Await + +object TimerSpec { + sealed trait Command + case class Tick(n: Int) extends Command + case object Bump extends Command + case class SlowThenBump(latch: TestLatch) extends Command + with NoSerializationVerificationNeeded + case object End extends Command + case class Throw(e: Throwable) extends Command + case object Cancel extends Command + case class SlowThenThrow(latch: TestLatch, e: Throwable) extends Command + with NoSerializationVerificationNeeded + + sealed trait Event + case class Tock(n: Int) extends Event + case class GotPostStop(timerActive: Boolean) extends Event + case class GotPreRestart(timerActive: Boolean) extends Event + + class Exc extends RuntimeException("simulated exc") with NoStackTrace + + def target(monitor: ActorRef, interval: FiniteDuration, repeat: Boolean, initial: () ⇒ Int): Props = + Props(new Target(monitor, interval, repeat, initial)) + + class Target(monitor: ActorRef, interval: FiniteDuration, repeat: Boolean, initial: () ⇒ Int) extends Actor with Timers { + private var bumpCount = initial() + + if (repeat) + timers.startPeriodicTimer("T", Tick(bumpCount), interval) + else + timers.startSingleTimer("T", Tick(bumpCount), interval) + + override def preRestart(reason: Throwable, message: Option[Any]): Unit = { + monitor ! GotPreRestart(timers.isTimerActive("T")) + // don't call super.preRestart to avoid postStop + } + + override def postStop(): Unit = { + monitor ! GotPostStop(timers.isTimerActive("T")) + } + + def bump(): Unit = { + bumpCount += 1 + timers.startPeriodicTimer("T", Tick(bumpCount), interval) + } + + override def receive = { + case Tick(n) ⇒ + monitor ! Tock(n) + case Bump ⇒ + bump() + case SlowThenBump(latch) ⇒ + Await.ready(latch, 10.seconds) + bump() + case End ⇒ + context.stop(self) + case Cancel ⇒ + timers.cancel("T") + case Throw(e) ⇒ + throw e + case SlowThenThrow(latch, e) ⇒ + Await.ready(latch, 10.seconds) + throw e + } + } + + def fsmTarget(monitor: ActorRef, interval: FiniteDuration, repeat: Boolean, initial: () ⇒ Int): Props = + Props(new FsmTarget(monitor, interval, repeat, initial)) + + object TheState + + class FsmTarget(monitor: ActorRef, interval: FiniteDuration, repeat: Boolean, initial: () ⇒ Int) extends FSM[TheState.type, Int] { + + private var restarting = false + + override def preRestart(reason: Throwable, message: Option[Any]): Unit = { + restarting = true + super.preRestart(reason, message) + monitor ! GotPreRestart(isTimerActive("T")) + } + + override def postStop(): Unit = { + super.postStop() + if (!restarting) + monitor ! GotPostStop(isTimerActive("T")) + } + + def bump(bumpCount: Int): State = { + setTimer("T", Tick(bumpCount + 1), interval, repeat) + stay using (bumpCount + 1) + } + + { + val i = initial() + startWith(TheState, i) + setTimer("T", Tick(i), interval, repeat) + } + + when(TheState) { + case Event(Tick(n), _) ⇒ + monitor ! Tock(n) + stay + case Event(Bump, bumpCount) ⇒ + bump(bumpCount) + case Event(SlowThenBump(latch), bumpCount) ⇒ + Await.ready(latch, 10.seconds) + bump(bumpCount) + case Event(End, _) ⇒ + stop() + case Event(Cancel, _) ⇒ + cancelTimer("T") + stay + case Event(Throw(e), _) ⇒ + throw e + case Event(SlowThenThrow(latch, e), _) ⇒ + Await.ready(latch, 10.seconds) + throw e + } + + initialize() + } + +} + +class TimerSpec extends AbstractTimerSpec { + override def testName: String = "Timers" + override def target(monitor: ActorRef, interval: FiniteDuration, repeat: Boolean, initial: () ⇒ Int = () ⇒ 1): Props = + TimerSpec.target(monitor, interval, repeat, initial) +} + +class FsmTimerSpec extends AbstractTimerSpec { + override def testName: String = "FSM Timers" + override def target(monitor: ActorRef, interval: FiniteDuration, repeat: Boolean, initial: () ⇒ Int = () ⇒ 1): Props = + TimerSpec.fsmTarget(monitor, interval, repeat, initial) +} + +abstract class AbstractTimerSpec extends AkkaSpec { + import TimerSpec._ + + val interval = 1.second + val dilatedInterval = interval.dilated + + def target(monitor: ActorRef, interval: FiniteDuration, repeat: Boolean, initial: () ⇒ Int = () ⇒ 1): Props + + def testName: String + + testName must { + "schedule non-repeated ticks" taggedAs TimingTest in { + val probe = TestProbe() + val ref = system.actorOf(target(probe.ref, 10.millis, repeat = false)) + + probe.expectMsg(Tock(1)) + probe.expectNoMsg(100.millis) + + ref ! End + probe.expectMsg(GotPostStop(false)) + } + + "schedule repeated ticks" taggedAs TimingTest in { + val probe = TestProbe() + val ref = system.actorOf(target(probe.ref, dilatedInterval, repeat = true)) + probe.within((interval * 4) - 100.millis) { + probe.expectMsg(Tock(1)) + probe.expectMsg(Tock(1)) + probe.expectMsg(Tock(1)) + } + + ref ! End + probe.expectMsg(GotPostStop(false)) + } + + "replace timer" taggedAs TimingTest in { + val probe = TestProbe() + val ref = system.actorOf(target(probe.ref, dilatedInterval, repeat = true)) + probe.expectMsg(Tock(1)) + val latch = new TestLatch(1) + // next Tock(1) enqueued in mailboxed, but should be discarded because of new timer + ref ! SlowThenBump(latch) + probe.expectNoMsg(interval + 100.millis) + latch.countDown() + probe.expectMsg(Tock(2)) + + ref ! End + probe.expectMsg(GotPostStop(false)) + } + + "cancel timer" taggedAs TimingTest in { + val probe = TestProbe() + val ref = system.actorOf(target(probe.ref, dilatedInterval, repeat = true)) + probe.expectMsg(Tock(1)) + ref ! Cancel + probe.expectNoMsg(dilatedInterval + 100.millis) + + ref ! End + probe.expectMsg(GotPostStop(false)) + } + + "cancel timers when restarted" taggedAs TimingTest in { + val probe = TestProbe() + val ref = system.actorOf(target(probe.ref, dilatedInterval, repeat = true)) + ref ! Throw(new Exc) + probe.expectMsg(GotPreRestart(false)) + + ref ! End + probe.expectMsg(GotPostStop(false)) + } + + "discard timers from old incarnation after restart, alt 1" taggedAs TimingTest in { + val probe = TestProbe() + val startCounter = new AtomicInteger(0) + val ref = system.actorOf(target(probe.ref, dilatedInterval, repeat = true, + initial = () ⇒ startCounter.incrementAndGet())) + probe.expectMsg(Tock(1)) + + val latch = new TestLatch(1) + // next Tock(1) is enqueued in mailbox, but should be discarded by new incarnation + ref ! SlowThenThrow(latch, new Exc) + probe.expectNoMsg(interval + 100.millis) + latch.countDown() + probe.expectMsg(GotPreRestart(false)) + probe.expectNoMsg(interval / 2) + probe.expectMsg(Tock(2)) // this is from the startCounter increment + + ref ! End + probe.expectMsg(GotPostStop(false)) + } + + "discard timers from old incarnation after restart, alt 2" taggedAs TimingTest in { + val probe = TestProbe() + val ref = system.actorOf(target(probe.ref, dilatedInterval, repeat = true)) + probe.expectMsg(Tock(1)) + // change state so that we see that the restart starts over again + ref ! Bump + + probe.expectMsg(Tock(2)) + + val latch = new TestLatch(1) + // next Tock(2) is enqueued in mailbox, but should be discarded by new incarnation + ref ! SlowThenThrow(latch, new Exc) + probe.expectNoMsg(interval + 100.millis) + latch.countDown() + probe.expectMsg(GotPreRestart(false)) + probe.expectMsg(Tock(1)) + + ref ! End + probe.expectMsg(GotPostStop(false)) + } + + "cancel timers when stopped" in { + val probe = TestProbe() + val ref = system.actorOf(target(probe.ref, dilatedInterval, repeat = true)) + ref ! End + probe.expectMsg(GotPostStop(false)) + } + } +} diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 85c7f4132b..fab26a2217 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -272,6 +272,7 @@ class ActorInterruptedException private[akka] (cause: Throwable) extends AkkaExc */ @SerialVersionUID(1L) final case class UnhandledMessage(@BeanProperty message: Any, @BeanProperty sender: ActorRef, @BeanProperty recipient: ActorRef) + extends NoSerializationVerificationNeeded /** * Classes for passing status back to the sender. diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index 3f909b031d..9390b2d5c3 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -9,6 +9,7 @@ import scala.collection.mutable import akka.routing.{ Deafen, Listen, Listeners } import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration._ +import akka.annotation.InternalApi object FSM { @@ -87,8 +88,9 @@ object FSM { /** * INTERNAL API */ - // FIXME: what about the cancellable? - private[akka] final case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(context: ActorContext) + @InternalApi + private[akka] final case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int, + owner: AnyRef)(context: ActorContext) extends NoSerializationVerificationNeeded { private var ref: Option[Cancellable] = _ private val scheduler = context.system.scheduler @@ -419,7 +421,7 @@ trait FSM[S, D] extends Actor with Listeners with ActorLogging { if (timers contains name) { timers(name).cancel } - val timer = Timer(name, msg, repeat, timerGen.next)(context) + val timer = Timer(name, msg, repeat, timerGen.next, this)(context) timer.schedule(self, timeout) timers(name) = timer } @@ -616,8 +618,8 @@ trait FSM[S, D] extends Actor with Listeners with ActorLogging { if (generation == gen) { processMsg(StateTimeout, "state timeout") } - case t @ Timer(name, msg, repeat, gen) ⇒ - if ((timers contains name) && (timers(name).generation == gen)) { + case t @ Timer(name, msg, repeat, gen, owner) ⇒ + if ((owner eq this) && (timers contains name) && (timers(name).generation == gen)) { if (timeoutFuture.isDefined) { timeoutFuture.get.cancel() timeoutFuture = None @@ -781,10 +783,10 @@ trait LoggingFSM[S, D] extends FSM[S, D] { this: Actor ⇒ private[akka] abstract override def processEvent(event: Event, source: AnyRef): Unit = { if (debugEvent) { val srcstr = source match { - case s: String ⇒ s - case Timer(name, _, _, _) ⇒ "timer " + name - case a: ActorRef ⇒ a.toString - case _ ⇒ "unknown" + case s: String ⇒ s + case Timer(name, _, _, _, _) ⇒ "timer " + name + case a: ActorRef ⇒ a.toString + case _ ⇒ "unknown" } log.debug("processing {} from {} in state {}", event, srcstr, stateName) } diff --git a/akka-actor/src/main/scala/akka/actor/Timers.scala b/akka-actor/src/main/scala/akka/actor/Timers.scala new file mode 100644 index 0000000000..387212cbe6 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/Timers.scala @@ -0,0 +1,119 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.actor + +import scala.concurrent.duration.FiniteDuration +import akka.annotation.DoNotInherit +import akka.util.OptionVal + +/** + * Scala API: Mix in Timers into your Actor to get support for scheduled + * `self` messages via [[TimerScheduler]]. + * + * Timers are bound to the lifecycle of the actor that owns it, + * and thus are cancelled automatically when it is restarted or stopped. + */ +trait Timers extends Actor { + + private val _timers = new TimerSchedulerImpl(context) + + /** + * Start and cancel timers via the enclosed `TimerScheduler`. + */ + final def timers: TimerScheduler = _timers + + override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = { + timers.cancelAll() + super.aroundPreRestart(reason, message) + } + + override protected[akka] def aroundPostStop(): Unit = { + timers.cancelAll() + super.aroundPostStop() + } + + override protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = { + msg match { + case timerMsg: TimerSchedulerImpl.TimerMsg ⇒ + _timers.interceptTimerMsg(timerMsg) match { + case OptionVal.Some(m) ⇒ super.aroundReceive(receive, m) + case OptionVal.None ⇒ // discard + } + case _ ⇒ + super.aroundReceive(receive, msg) + } + } + +} + +/** + * Java API: Support for scheduled `self` messages via [[TimerScheduler]]. + * + * Timers are bound to the lifecycle of the actor that owns it, + * and thus are cancelled automatically when it is restarted or stopped. + */ +abstract class AbstractActorWithTimers extends AbstractActor with Timers { + /** + * Start and cancel timers via the enclosed `TimerScheduler`. + */ + final def getTimers: TimerScheduler = timers +} + +/** + * Support for scheduled `self` messages in an actor. + * It is used by mixing in trait `Timers` in Scala or extending `AbstractActorWithTimers` + * in Java. + * + * Timers are bound to the lifecycle of the actor that owns it, + * and thus are cancelled automatically when it is restarted or stopped. + * + * `TimerScheduler` is not thread-safe, i.e. it must only be used within + * the actor that owns it. + */ +@DoNotInherit abstract class TimerScheduler { + + /** + * Start a periodic timer that will send `msg` to the `self` actor at + * a fixed `interval`. + * + * Each timer has a key and if a new timer with same key is started + * the previous is cancelled and it's guaranteed that a message from the + * previous timer is not received, even though it might already be enqueued + * in the mailbox when the new timer is started. + */ + def startPeriodicTimer(key: Any, msg: Any, interval: FiniteDuration): Unit + + /** + * Start a timer that will send `msg` once to the `self` actor after + * the given `timeout`. + * + * Each timer has a key and if a new timer with same key is started + * the previous is cancelled and it's guaranteed that a message from the + * previous timer is not received, even though it might already be enqueued + * in the mailbox when the new timer is started. + */ + def startSingleTimer(key: Any, msg: Any, timeout: FiniteDuration): Unit + + /** + * Check if a timer with a given `key` is active. + */ + def isTimerActive(key: Any): Boolean + + /** + * Cancel a timer with a given `key`. + * If canceling a timer that was already canceled, or key never was used to start a timer + * this operation will do nothing. + * + * It is guaranteed that a message from a canceled timer, including its previous incarnation + * for the same key, will not be received by the actor, even though the message might already + * be enqueued in the mailbox when cancel is called. + */ + def cancel(key: Any): Unit + + /** + * Cancel all timers. + */ + def cancelAll(): Unit + +} diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/TimerSchedulerImpl.scala b/akka-actor/src/main/scala/akka/actor/dungeon/TimerSchedulerImpl.scala new file mode 100644 index 0000000000..a2ba58c670 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/dungeon/TimerSchedulerImpl.scala @@ -0,0 +1,111 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.actor + +import scala.concurrent.duration.FiniteDuration + +import akka.annotation.InternalApi +import akka.event.Logging +import akka.util.OptionVal + +/** + * INTERNAL API + */ +@InternalApi private[akka] object TimerSchedulerImpl { + final case class Timer(key: Any, msg: Any, repeat: Boolean, generation: Int, task: Cancellable) + final case class TimerMsg(key: Any, generation: Int, owner: TimerSchedulerImpl) + extends NoSerializationVerificationNeeded +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] class TimerSchedulerImpl(ctx: ActorContext) extends TimerScheduler { + import TimerSchedulerImpl._ + + private val log = Logging(ctx.system, classOf[TimerScheduler]) + private var timers: Map[Any, Timer] = Map.empty + private var timerGen = 0 + private def nextTimerGen(): Int = { + timerGen += 1 + timerGen + } + + override def startPeriodicTimer(key: Any, msg: Any, interval: FiniteDuration): Unit = + startTimer(key, msg, interval, repeat = true) + + override def startSingleTimer(key: Any, msg: Any, timeout: FiniteDuration): Unit = + startTimer(key, msg, timeout, repeat = false) + + private def startTimer(key: Any, msg: Any, timeout: FiniteDuration, repeat: Boolean): Unit = { + timers.get(key) match { + case Some(t) ⇒ cancelTimer(t) + case None ⇒ + } + val nextGen = nextTimerGen() + + val timerMsg = TimerMsg(key, nextGen, this) + val task = + if (repeat) + ctx.system.scheduler.schedule(timeout, timeout, ctx.self, timerMsg)(ctx.dispatcher) + else + ctx.system.scheduler.scheduleOnce(timeout, ctx.self, timerMsg)(ctx.dispatcher) + + val nextTimer = Timer(key, msg, repeat, nextGen, task) + log.debug("Start timer [{}] with generation [{}]", key, nextGen) + timers = timers.updated(key, nextTimer) + } + + override def isTimerActive(key: Any): Boolean = + timers.contains(key) + + override def cancel(key: Any): Unit = { + timers.get(key) match { + case None ⇒ // already removed/canceled + case Some(t) ⇒ cancelTimer(t) + } + } + + private def cancelTimer(timer: Timer): Unit = { + log.debug("Cancel timer [{}] with generation [{}]", timer.key, timer.generation) + timer.task.cancel() + timers -= timer.key + } + + override def cancelAll(): Unit = { + log.debug("Cancel all timers") + timers.valuesIterator.foreach { timer ⇒ + timer.task.cancel() + } + timers = Map.empty + } + + def interceptTimerMsg(timerMsg: TimerMsg): OptionVal[AnyRef] = { + timers.get(timerMsg.key) match { + case None ⇒ + // it was from canceled timer that was already enqueued in mailbox + log.debug("Received timer [{}] that has been removed, discarding", timerMsg.key) + OptionVal.None // message should be ignored + case Some(t) ⇒ + if (timerMsg.owner ne this) { + // after restart, it was from an old instance that was enqueued in mailbox before canceled + log.debug("Received timer [{}] from old restarted instance, discarding", timerMsg.key) + OptionVal.None // message should be ignored + } else if (timerMsg.generation == t.generation) { + // valid timer + log.debug("Received timer [{}]", timerMsg.key) + if (!t.repeat) + timers -= t.key + OptionVal.Some(t.msg.asInstanceOf[AnyRef]) + } else { + // it was from an old timer that was enqueued in mailbox before canceled + log.debug( + "Received timer [{}] from from old generation [{}], expected generation [{}], discarding", + timerMsg.key, timerMsg.generation, t.generation) + OptionVal.None // message should be ignored + } + } + } + +} diff --git a/akka-docs/rst/java/code/jdocs/actor/TimerDocTest.java b/akka-docs/rst/java/code/jdocs/actor/TimerDocTest.java new file mode 100644 index 0000000000..a6725dc6b8 --- /dev/null +++ b/akka-docs/rst/java/code/jdocs/actor/TimerDocTest.java @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ + +package jdocs.actor; + +//#timers +import java.util.concurrent.TimeUnit; +import scala.concurrent.duration.Duration; +import akka.actor.AbstractActorWithTimers; + +//#timers + +public class TimerDocTest { + + static + //#timers + public class MyActor extends AbstractActorWithTimers { + + private static Object TICK_KEY = "TickKey"; + private static final class FirstTick { + } + private static final class Tick { + } + + public MyActor() { + getTimers().startSingleTimer(TICK_KEY, new FirstTick(), + Duration.create(500, TimeUnit.MILLISECONDS)); + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(FirstTick.class, message -> { + // do something useful here + getTimers().startPeriodicTimer(TICK_KEY, new Tick(), + Duration.create(1, TimeUnit.SECONDS)); + }) + .match(Tick.class, message -> { + // do something useful here + }) + .build(); + } + } + //#timers +} \ No newline at end of file diff --git a/akka-docs/rst/scala/code/docs/actor/TimerDocSpec.scala b/akka-docs/rst/scala/code/docs/actor/TimerDocSpec.scala new file mode 100644 index 0000000000..dfae34d9cf --- /dev/null +++ b/akka-docs/rst/scala/code/docs/actor/TimerDocSpec.scala @@ -0,0 +1,33 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package docs.actor + +import akka.actor.Actor +import scala.concurrent.duration._ + +object TimerDocSpec { + //#timers + import akka.actor.Timers + + object MyActor { + private case object TickKey + private case object FirstTick + private case object Tick + private case object LaterTick + } + + class MyActor extends Actor with Timers { + import MyActor._ + timers.startSingleTimer(TickKey, FirstTick, 500.millis) + + def receive = { + case FirstTick => + // do something useful here + timers.startPeriodicTimer(TickKey, Tick, 1.second) + case Tick => + // do something useful here + } + } + //#timers +} diff --git a/akka-docs/src/main/paradox/scala/actors.md b/akka-docs/src/main/paradox/scala/actors.md index bfac206982..7022f5927a 100644 --- a/akka-docs/src/main/paradox/scala/actors.md +++ b/akka-docs/src/main/paradox/scala/actors.md @@ -905,6 +905,29 @@ Messages marked with `NotInfluenceReceiveTimeout` will not reset the timer. This `ReceiveTimeout` should be fired by external inactivity but not influenced by internal activity, e.g. scheduled tick messages. + + +## Timers, scheduled messages + +Messages can be scheduled to be sent at a later point by using the @ref[Scheduler](scheduler.md) directly, +but when scheduling periodic or single messages in an actor to itself it's more convenient and safe +to use the support for named timers. The lifecycle of scheduled messages can be difficult to manage +when the actor is restarted and that is taken care of by the timers. + +Scala +: @@snip [ActorDocSpec.scala]($code$/scala/docs/actor/TimerDocSpec.scala) { #timers } + +Java +: @@snip [ActorDocTest.java]($code$/java/jdocs/actor/TimerDocTest.java) { #timers } + +Each timer has a key and can be replaced or cancelled. It's guaranteed that a message from the +previous incarnation of the timer with the same key is not received, even though it might already +be enqueued in the mailbox when it was cancelled or the new timer was started. + +The timers are bound to the lifecycle of the actor that owns it, and thus are cancelled +automatically when it is restarted or stopped. Note that the `TimerScheduler` is not thread-safe, +i.e. it must only be used within the actor that owns it. + ## Stopping actors diff --git a/akka-docs/src/main/paradox/scala/howto.md b/akka-docs/src/main/paradox/scala/howto.md index 456b626c14..c6f75fedb0 100644 --- a/akka-docs/src/main/paradox/scala/howto.md +++ b/akka-docs/src/main/paradox/scala/howto.md @@ -122,47 +122,7 @@ The pattern is described [Discovering Message Flows in Actor System with the Spi ## Scheduling Periodic Messages -This pattern describes how to schedule periodic messages to yourself in two different -ways. - -The first way is to set up periodic message scheduling in the constructor of the actor, -and cancel that scheduled sending in `postStop` or else we might have multiple registered -message sends to the same actor. - -@@@ note - -With this approach the scheduled periodic message send will be restarted with the actor on restarts. -This also means that the time period that elapses between two tick messages during a restart may drift -off based on when you restart the scheduled message sends relative to the time that the last message was -sent, and how long the initial delay is. Worst case scenario is `interval` plus `initialDelay`. - -@@@ - -Scala -: @@snip [SchedulerPatternSpec.scala]($code$/scala/docs/pattern/SchedulerPatternSpec.scala) { #schedule-constructor } - -Java -: @@snip [SchedulerPatternTest.java]($code$/java/jdocs/pattern/SchedulerPatternTest.java) { #schedule-constructor } - -The second variant sets up an initial one shot message send in the `preStart` method -of the actor, and the then the actor when it receives this message sets up a new one shot -message send. You also have to override `postRestart` so we don't call `preStart` -and schedule the initial message send again. - -@@@ note - -With this approach we won't fill up the mailbox with tick messages if the actor is -under pressure, but only schedule a new tick message when we have seen the previous one. - -@@@ - -Scala -: @@snip [SchedulerPatternSpec.scala]($code$/scala/docs/pattern/SchedulerPatternSpec.scala) { #schedule-receive } - -Java -: @@snip [SchedulerPatternTest.java]($code$/java/jdocs/pattern/SchedulerPatternTest.java) { #schedule-receive } - -@@@ div { .group-java } +See @ref:[Actor Timers](actors.md#actors-timers) ## Single-Use Actor Trees with High-Level Error Reporting diff --git a/akka-docs/src/main/paradox/scala/scheduler.md b/akka-docs/src/main/paradox/scala/scheduler.md index ba0edabab5..24d6946565 100644 --- a/akka-docs/src/main/paradox/scala/scheduler.md +++ b/akka-docs/src/main/paradox/scala/scheduler.md @@ -10,6 +10,10 @@ You can schedule sending of messages to actors and execution of tasks (functions or Runnable). You will get a `Cancellable` back that you can call `cancel` on to cancel the execution of the scheduled operation. +When scheduling periodic or single messages in an actor to itself it is recommended to +use the @ref:[Actor Timers](actors.md#actors-timers) instead of using the `Scheduler` +directly. + The scheduler in Akka is designed for high-throughput of thousands up to millions of triggers. The prime use-case being triggering Actor receive timeouts, Future timeouts, circuit breakers and other time dependent events which happen all-the-time and in many diff --git a/akka-docs/src/test/java/jdocs/actor/TimerDocTest.java b/akka-docs/src/test/java/jdocs/actor/TimerDocTest.java new file mode 100644 index 0000000000..1d384f94ef --- /dev/null +++ b/akka-docs/src/test/java/jdocs/actor/TimerDocTest.java @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ + +package jdocs.actor; + +//#timers +import java.util.concurrent.TimeUnit; +import scala.concurrent.duration.Duration; +import akka.actor.AbstractActorWithTimers; + +//#timers + +public class TimerDocTest { + + static + //#timers + public class MyActor extends AbstractActorWithTimers { + + private static Object TICK_KEY = "TickKey"; + private static final class FirstTick { + } + private static final class Tick { + } + + public MyActor() { + getTimers().startSingleTimer(TICK_KEY, new FirstTick(), + Duration.create(500, TimeUnit.MILLISECONDS)); + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(FirstTick.class, message -> { + // do something useful here + getTimers().startPeriodicTimer(TICK_KEY, new Tick(), + Duration.create(1, TimeUnit.SECONDS)); + }) + .match(Tick.class, message -> { + // do something useful here + }) + .build(); + } + } + //#timers +} diff --git a/akka-docs/src/test/java/jdocs/pattern/SchedulerPatternTest.java b/akka-docs/src/test/java/jdocs/pattern/SchedulerPatternTest.java deleted file mode 100644 index 237abd2799..0000000000 --- a/akka-docs/src/test/java/jdocs/pattern/SchedulerPatternTest.java +++ /dev/null @@ -1,173 +0,0 @@ -/** - * Copyright (C) 2009-2017 Lightbend Inc. - */ - -package jdocs.pattern; - -import akka.actor.*; -import akka.testkit.*; -import akka.testkit.TestEvent.Mute; -import akka.testkit.TestEvent.UnMute; -import jdocs.AbstractJavaTest; -import akka.testkit.javadsl.TestKit; -import org.junit.*; -import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; -import java.util.Arrays; -import java.util.concurrent.TimeUnit; - -public class SchedulerPatternTest extends AbstractJavaTest { - - @ClassRule - public static AkkaJUnitActorSystemResource actorSystemResource = - new AkkaJUnitActorSystemResource("SchedulerPatternTest", AkkaSpec.testConf()); - - private final ActorSystem system = actorSystemResource.getSystem(); - - static - //#schedule-constructor - public class ScheduleInConstructor extends AbstractActor { - - private final Cancellable tick = getContext().getSystem().scheduler().schedule( - Duration.create(500, TimeUnit.MILLISECONDS), - Duration.create(1, TimeUnit.SECONDS), - getSelf(), "tick", getContext().dispatcher(), null); - //#schedule-constructor - // this variable and constructor is declared here to not show up in the docs - final ActorRef target; - public ScheduleInConstructor(ActorRef target) { - this.target = target; - } - //#schedule-constructor - - @Override - public void postStop() { - tick.cancel(); - } - - @Override - public Receive createReceive() { - return receiveBuilder() - .matchEquals("tick", message -> { - // do something useful here - //#schedule-constructor - target.tell(message, getSelf()); - //#schedule-constructor - }) - .matchEquals("restart", message -> { - throw new ArithmeticException(); - }) - .build(); - } - } - //#schedule-constructor - - static - //#schedule-receive - public class ScheduleInReceive extends AbstractActor { - //#schedule-receive - // this variable and constructor is declared here to not show up in the docs - final ActorRef target; - public ScheduleInReceive(ActorRef target) { - this.target = target; - } - //#schedule-receive - - @Override - public void preStart() { - getContext().getSystem().scheduler().scheduleOnce( - Duration.create(500, TimeUnit.MILLISECONDS), - getSelf(), "tick", getContext().dispatcher(), null); - } - - // override postRestart so we don't call preStart and schedule a new message - @Override - public void postRestart(Throwable reason) { - } - - @Override - public Receive createReceive() { - return receiveBuilder() - .matchEquals("tick", message -> { - // send another periodic tick after the specified delay - getContext().getSystem().scheduler().scheduleOnce( - Duration.create(1, TimeUnit.SECONDS), - getSelf(), "tick", getContext().dispatcher(), null); - // do something useful here - //#schedule-receive - target.tell(message, getSelf()); - //#schedule-receive - }) - .matchEquals("restart", message -> { - throw new ArithmeticException(); - }) - .build(); - } - } - //#schedule-receive - - @Test - @Ignore // no way to tag this as timing sensitive - public void scheduleInConstructor() { - new TestSchedule(system) {{ - final TestKit probe = new TestKit(system); - final Props props = Props.create(ScheduleInConstructor.class, probe.getRef()); - testSchedule(probe, props, duration("3000 millis"), duration("2000 millis")); - }}; - } - - @Test - @Ignore // no way to tag this as timing sensitive - public void scheduleInReceive() { - new TestSchedule(system) {{ - final TestKit probe = new TestKit(system); - final Props props = Props.create(ScheduleInReceive.class, probe.getRef()); - testSchedule(probe, props, duration("3000 millis"), duration("2500 millis")); - }}; - } - - @Test - public void doNothing() { - // actorSystemResource.after is not called when all tests are ignored - } - - public static class TestSchedule extends TestKit { - private ActorSystem system; - - public TestSchedule(ActorSystem system) { - super(system); - this.system = system; - } - - public void testSchedule(final TestKit probe, Props props, - FiniteDuration startDuration, - FiniteDuration afterRestartDuration) { - Iterable filter = - Arrays.asList(new akka.testkit.EventFilter[]{ - (akka.testkit.EventFilter) new ErrorFilter(ArithmeticException.class)}); - try { - system.eventStream().publish(new Mute(filter)); - - final ActorRef actor = system.actorOf(props); - within(startDuration, () -> { - probe.expectMsgEquals("tick"); - probe.expectMsgEquals("tick"); - probe.expectMsgEquals("tick"); - return null; - }); - - actor.tell("restart", getRef()); - within(afterRestartDuration, () -> { - probe.expectMsgEquals("tick"); - probe.expectMsgEquals("tick"); - return null; - }); - - system.stop(actor); - } - finally { - system.eventStream().publish(new UnMute(filter)); - } - } - } -} diff --git a/akka-docs/src/test/scala/docs/actor/TimerDocSpec.scala b/akka-docs/src/test/scala/docs/actor/TimerDocSpec.scala new file mode 100644 index 0000000000..dfae34d9cf --- /dev/null +++ b/akka-docs/src/test/scala/docs/actor/TimerDocSpec.scala @@ -0,0 +1,33 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package docs.actor + +import akka.actor.Actor +import scala.concurrent.duration._ + +object TimerDocSpec { + //#timers + import akka.actor.Timers + + object MyActor { + private case object TickKey + private case object FirstTick + private case object Tick + private case object LaterTick + } + + class MyActor extends Actor with Timers { + import MyActor._ + timers.startSingleTimer(TickKey, FirstTick, 500.millis) + + def receive = { + case FirstTick => + // do something useful here + timers.startPeriodicTimer(TickKey, Tick, 1.second) + case Tick => + // do something useful here + } + } + //#timers +} diff --git a/akka-docs/src/test/scala/docs/pattern/SchedulerPatternSpec.scala b/akka-docs/src/test/scala/docs/pattern/SchedulerPatternSpec.scala deleted file mode 100644 index 5dde1d4cdb..0000000000 --- a/akka-docs/src/test/scala/docs/pattern/SchedulerPatternSpec.scala +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Copyright (C) 2009-2017 Lightbend Inc. - */ - -package docs.pattern - -import language.postfixOps - -import akka.actor.{ Props, ActorRef, Actor } -import scala.concurrent.duration._ -import akka.testkit.{ TimingTest, AkkaSpec, filterException } -import docs.pattern.SchedulerPatternSpec.ScheduleInConstructor - -object SchedulerPatternSpec { - //#schedule-constructor - class ScheduleInConstructor extends Actor { - import context.dispatcher - val tick = - context.system.scheduler.schedule(500 millis, 1000 millis, self, "tick") - //#schedule-constructor - // this var and constructor is declared here to not show up in the docs - var target: ActorRef = null - def this(target: ActorRef) = { this(); this.target = target } - //#schedule-constructor - - override def postStop() = tick.cancel() - - def receive = { - case "tick" => - // do something useful here - //#schedule-constructor - target ! "tick" - case "restart" => - throw new ArithmeticException - //#schedule-constructor - } - } - //#schedule-constructor - - //#schedule-receive - class ScheduleInReceive extends Actor { - import context._ - //#schedule-receive - // this var and constructor is declared here to not show up in the docs - var target: ActorRef = null - def this(target: ActorRef) = { this(); this.target = target } - //#schedule-receive - - override def preStart() = - system.scheduler.scheduleOnce(500 millis, self, "tick") - - // override postRestart so we don't call preStart and schedule a new message - override def postRestart(reason: Throwable) = {} - - def receive = { - case "tick" => - // send another periodic tick after the specified delay - system.scheduler.scheduleOnce(1000 millis, self, "tick") - // do something useful here - //#schedule-receive - target ! "tick" - case "restart" => - throw new ArithmeticException - //#schedule-receive - } - } - //#schedule-receive -} - -class SchedulerPatternSpec extends AkkaSpec { - - def testSchedule(actor: ActorRef, startDuration: FiniteDuration, - afterRestartDuration: FiniteDuration) = { - - filterException[ArithmeticException] { - within(startDuration) { - expectMsg("tick") - expectMsg("tick") - expectMsg("tick") - } - actor ! "restart" - within(afterRestartDuration) { - expectMsg("tick") - expectMsg("tick") - } - system.stop(actor) - } - } - - "send periodic ticks from the constructor" taggedAs TimingTest in { - testSchedule( - system.actorOf(Props(classOf[ScheduleInConstructor], testActor)), - 3000 millis, 2000 millis) - } - - "send ticks from the preStart and receive" taggedAs TimingTest in { - testSchedule( - system.actorOf(Props(classOf[ScheduleInConstructor], testActor)), - 3000 millis, 2500 millis) - } -} diff --git a/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala index 17fcdd48bb..c0beb256ee 100644 --- a/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala +++ b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala @@ -288,9 +288,9 @@ object PersistentFSM { /** * INTERNAL API */ - // FIXME: what about the cancellable? @InternalApi - private[persistence] final case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(context: ActorContext) + private[persistence] final case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int, + owner: AnyRef)(context: ActorContext) extends NoSerializationVerificationNeeded { private var ref: Option[Cancellable] = _ private val scheduler = context.system.scheduler diff --git a/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSMBase.scala b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSMBase.scala index 0608dd4410..93a6bed4e8 100644 --- a/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSMBase.scala +++ b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSMBase.scala @@ -211,7 +211,7 @@ trait PersistentFSMBase[S, D, E] extends Actor with Listeners with ActorLogging if (timers contains name) { timers(name).cancel } - val timer = Timer(name, msg, repeat, timerGen.next)(context) + val timer = Timer(name, msg, repeat, timerGen.next, this)(context) timer.schedule(self, timeout) timers(name) = timer } @@ -412,8 +412,8 @@ trait PersistentFSMBase[S, D, E] extends Actor with Listeners with ActorLogging if (generation == gen) { processMsg(StateTimeout, "state timeout") } - case t @ Timer(name, msg, repeat, gen) ⇒ - if ((timers contains name) && (timers(name).generation == gen)) { + case t @ Timer(name, msg, repeat, gen, owner) ⇒ + if ((owner eq this) && (timers contains name) && (timers(name).generation == gen)) { if (timeoutFuture.isDefined) { timeoutFuture.get.cancel() timeoutFuture = None @@ -575,10 +575,10 @@ trait LoggingPersistentFSM[S, D, E] extends PersistentFSMBase[S, D, E] { this: A private[akka] abstract override def processEvent(event: Event, source: AnyRef): Unit = { if (debugEvent) { val srcstr = source match { - case s: String ⇒ s - case Timer(name, _, _, _) ⇒ "timer " + name - case a: ActorRef ⇒ a.toString - case _ ⇒ "unknown" + case s: String ⇒ s + case Timer(name, _, _, _, _) ⇒ "timer " + name + case a: ActorRef ⇒ a.toString + case _ ⇒ "unknown" } log.debug("processing {} from {} in state {}", event, srcstr, stateName) } diff --git a/project/MiMa.scala b/project/MiMa.scala index 7110fdcab8..064a307eef 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -1230,6 +1230,15 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.transport.AssociationHandle.disassociate") ), "2.5.3" -> Seq( + + // #15733 Timers + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.fsm.PersistentFSM#Timer.apply"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.fsm.PersistentFSM#Timer.copy"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.fsm.PersistentFSM#Timer.this"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.FSM#Timer.copy"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.FSM#Timer.this"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.FSM#Timer.apply"), + // #22789 Source.maybe rewritten as a graph stage ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.MaybePublisher"), ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.MaybePublisher$MaybeSubscription"),