From db0e170d32413d91edd8a36d2bfe7251e9debd4b Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 28 Apr 2017 11:37:38 +0200 Subject: [PATCH] support for safe timers, periodic scheduling, #16742 * implemented an Intercept behavior that can handle the scheduled TimerMsg, even though it's of a different type * Intercept is similar to Tap but more powerful, implemented Tap with Intercept. Intercept is internal API. * When wrapping a behavior, e.g. Tap, the outer behavior must also be Deferred if the wrapped behavior is Deferred * PostStop not signaled when stopped voluntarily, intercept messages to cancel timers when stopped is returned --- .../java/akka/typed/javadsl/ActorCompile.java | 10 + .../test/scala/akka/typed/DeferredSpec.scala | 54 +++- .../test/scala/akka/typed/RestarterSpec.scala | 8 +- .../src/test/scala/akka/typed/TimerSpec.scala | 231 ++++++++++++++++++ .../src/test/scala/akka/typed/TypedSpec.scala | 24 +- .../src/main/scala/akka/typed/Behavior.scala | 9 + .../akka/typed/internal/BehaviorImpl.scala | 143 +++++++++-- .../scala/akka/typed/internal/Restarter.scala | 5 +- .../typed/internal/TimerSchedulerImpl.scala | 152 ++++++++++++ .../main/scala/akka/typed/javadsl/Actor.scala | 16 +- .../akka/typed/javadsl/TimerScheduler.scala | 62 +++++ .../scala/akka/typed/scaladsl/Actor.scala | 16 +- .../akka/typed/scaladsl/TimerScheduler.scala | 62 +++++ 13 files changed, 744 insertions(+), 48 deletions(-) create mode 100644 akka-typed-tests/src/test/scala/akka/typed/TimerSpec.scala create mode 100644 akka-typed/src/main/scala/akka/typed/internal/TimerSchedulerImpl.scala create mode 100644 akka-typed/src/main/scala/akka/typed/javadsl/TimerScheduler.scala create mode 100644 akka-typed/src/main/scala/akka/typed/scaladsl/TimerScheduler.scala diff --git a/akka-typed-tests/src/test/java/akka/typed/javadsl/ActorCompile.java b/akka-typed-tests/src/test/java/akka/typed/javadsl/ActorCompile.java index 7bae597a9e..9830562f2e 100644 --- a/akka-typed-tests/src/test/java/akka/typed/javadsl/ActorCompile.java +++ b/akka-typed-tests/src/test/java/akka/typed/javadsl/ActorCompile.java @@ -8,6 +8,9 @@ import akka.typed.ActorContext; import static akka.typed.javadsl.Actor.*; +import java.util.concurrent.TimeUnit; +import scala.concurrent.duration.Duration; + public class ActorCompile { interface MyMsg {} @@ -56,6 +59,13 @@ public class ActorCompile { }); } + { + Behavior b = Actor.withTimers(timers -> { + timers.startPeriodicTimer("key", new MyMsgB("tick"), Duration.create(1, TimeUnit.SECONDS)); + return Actor.ignore(); + }); + } + static class MyBehavior extends ExtensibleBehavior { diff --git a/akka-typed-tests/src/test/scala/akka/typed/DeferredSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/DeferredSpec.scala index 9be2432718..b61b975302 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/DeferredSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/DeferredSpec.scala @@ -8,6 +8,7 @@ import scala.concurrent.duration._ import scala.util.control.NoStackTrace import akka.typed.scaladsl.Actor +import akka.typed.scaladsl.Actor.BehaviorDecorators import akka.typed.scaladsl.AskPattern._ import akka.typed.testkit.EffectfulActorContext import akka.typed.testkit.TestKitSettings @@ -62,16 +63,10 @@ class DeferredSpec extends TypedSpec { } - trait RealTests { + trait RealTests extends StartSupport { implicit def system: ActorSystem[TypedSpec.Command] implicit val testSettings = TestKitSettings(system) - val nameCounter = Iterator.from(0) - def nextName(): String = s"a-${nameCounter.next()}" - - def start(behv: Behavior[Command]): ActorRef[Command] = - Await.result(system ? TypedSpec.Create(behv, nextName()), 3.seconds.dilated) - def `must create underlying`(): Unit = { val probe = TestProbe[Event]("evt") val behv = Actor.deferred[Command] { _ ⇒ @@ -96,6 +91,51 @@ class DeferredSpec extends TypedSpec { probe.expectNoMsg(100.millis) } + def `must create underlying when nested`(): Unit = { + val probe = TestProbe[Event]("evt") + val behv = Actor.deferred[Command] { _ ⇒ + Actor.deferred[Command] { _ ⇒ + probe.ref ! Started + target(probe.ref) + } + } + start(behv) + probe.expectMsg(Started) + } + + def `must undefer underlying when wrapped by widen`(): Unit = { + val probe = TestProbe[Event]("evt") + val behv = Actor.deferred[Command] { _ ⇒ + probe.ref ! Started + target(probe.ref) + }.widen[Command] { + case m ⇒ m + } + probe.expectNoMsg(100.millis) // not yet + val ref = start(behv) + // it's supposed to be created immediately (not waiting for first message) + probe.expectMsg(Started) + ref ! Ping + probe.expectMsg(Pong) + } + + def `must undefer underlying when wrapped by monitor`(): Unit = { + // monitor is implemented with tap, so this is testing both + val probe = TestProbe[Event]("evt") + val monitorProbe = TestProbe[Command]("monitor") + val behv = Actor.monitor(monitorProbe.ref, Actor.deferred[Command] { _ ⇒ + probe.ref ! Started + target(probe.ref) + }) + probe.expectNoMsg(100.millis) // not yet + val ref = start(behv) + // it's supposed to be created immediately (not waiting for first message) + probe.expectMsg(Started) + ref ! Ping + monitorProbe.expectMsg(Ping) + probe.expectMsg(Pong) + } + } object `A Restarter (stubbed, native)` extends StubbedTests with NativeSystem diff --git a/akka-typed-tests/src/test/scala/akka/typed/RestarterSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/RestarterSpec.scala index d2e48efa78..fdcb4412f5 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/RestarterSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/RestarterSpec.scala @@ -223,17 +223,11 @@ class RestarterSpec extends TypedSpec { } } - trait RealTests { + trait RealTests extends StartSupport { import akka.typed.scaladsl.adapter._ implicit def system: ActorSystem[TypedSpec.Command] implicit val testSettings = TestKitSettings(system) - val nameCounter = Iterator.from(0) - def nextName(): String = s"a-${nameCounter.next()}" - - def start(behv: Behavior[Command]): ActorRef[Command] = - Await.result(system ? TypedSpec.Create(behv, nextName()), 3.seconds.dilated) - def `must receive message`(): Unit = { val probe = TestProbe[Event]("evt") val behv = restarter[Throwable]().wrap(target(probe.ref)) diff --git a/akka-typed-tests/src/test/scala/akka/typed/TimerSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/TimerSpec.scala new file mode 100644 index 0000000000..09707eebae --- /dev/null +++ b/akka-typed-tests/src/test/scala/akka/typed/TimerSpec.scala @@ -0,0 +1,231 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicReference + +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace + +import akka.typed.scaladsl.Actor +import akka.typed.scaladsl.AskPattern._ +import akka.typed.scaladsl.TimerScheduler +import akka.typed.testkit.TestKitSettings +import akka.typed.testkit.scaladsl._ +import org.scalatest.concurrent.Eventually.eventually + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class TimerSpec extends TypedSpec(""" + #akka.loglevel = DEBUG + """) { + + sealed trait Command + case class Tick(n: Int) extends Command + case object Bump extends Command + case class SlowThenBump(latch: CountDownLatch) extends Command + case object End extends Command + case class Throw(e: Throwable) extends Command + case object Cancel extends Command + case class SlowThenThrow(latch: CountDownLatch, e: Throwable) extends Command + + sealed trait Event + case class Tock(n: Int) extends Event + case class GotPostStop(timerActive: Boolean) extends Event + case class GotPreRestart(timerActive: Boolean) extends Event + + class Exc extends RuntimeException("simulated exc") with NoStackTrace + + trait RealTests extends StartSupport { + implicit def system: ActorSystem[TypedSpec.Command] + implicit val testSettings = TestKitSettings(system) + + val interval = 1.second + val dilatedInterval = interval.dilated + + def target(monitor: ActorRef[Event], timer: TimerScheduler[Command], bumpCount: Int): Behavior[Command] = { + def bump(): Behavior[Command] = { + val nextCount = bumpCount + 1 + timer.startPeriodicTimer("T", Tick(nextCount), interval) + target(monitor, timer, nextCount) + } + + Actor.immutable[Command] { (ctx, cmd) ⇒ + cmd match { + case Tick(n) ⇒ + monitor ! Tock(n) + Actor.same + case Bump ⇒ + bump() + case SlowThenBump(latch) ⇒ + latch.await(10, TimeUnit.SECONDS) + bump() + case End ⇒ + Actor.stopped + case Cancel ⇒ + timer.cancel("T") + Actor.same + case Throw(e) ⇒ + throw e + case SlowThenThrow(latch, e) ⇒ + latch.await(10, TimeUnit.SECONDS) + throw e + } + } onSignal { + case (ctx, PreRestart) ⇒ + monitor ! GotPreRestart(timer.isTimerActive("T")) + Actor.same + case (ctx, PostStop) ⇒ + monitor ! GotPostStop(timer.isTimerActive("T")) + Actor.same + } + } + + def `01 must schedule non-repeated ticks`(): Unit = { + val probe = TestProbe[Event]("evt") + val behv = Actor.withTimers[Command] { timer ⇒ + timer.startSingleTimer("T", Tick(1), 10.millis) + target(probe.ref, timer, 1) + } + + val ref = start(behv) + probe.expectMsg(Tock(1)) + probe.expectNoMsg(100.millis) + + ref ! End + } + + def `02 must schedule repeated ticks`(): Unit = { + val probe = TestProbe[Event]("evt") + val behv = Actor.withTimers[Command] { timer ⇒ + timer.startPeriodicTimer("T", Tick(1), interval) + target(probe.ref, timer, 1) + } + + val ref = start(behv) + probe.within((interval * 4) - 100.millis) { + probe.expectMsg(Tock(1)) + probe.expectMsg(Tock(1)) + probe.expectMsg(Tock(1)) + } + + ref ! End + } + + def `03 must replace timer`(): Unit = { + val probe = TestProbe[Event]("evt") + val behv = Actor.withTimers[Command] { timer ⇒ + timer.startPeriodicTimer("T", Tick(1), interval) + target(probe.ref, timer, 1) + } + + val ref = start(behv) + probe.expectMsg(Tock(1)) + val latch = new CountDownLatch(1) + // next Tock(1) enqueued in mailboxed, but should be discarded because of new timer + ref ! SlowThenBump(latch) + probe.expectNoMsg(interval + 100.millis) + latch.countDown() + probe.expectMsg(Tock(2)) + + ref ! End + } + + def `04 must cancel timer`(): Unit = { + val probe = TestProbe[Event]("evt") + val behv = Actor.withTimers[Command] { timer ⇒ + timer.startPeriodicTimer("T", Tick(1), interval) + target(probe.ref, timer, 1) + } + + val ref = start(behv) + probe.expectMsg(Tock(1)) + ref ! Cancel + probe.expectNoMsg(dilatedInterval + 100.millis) + + ref ! End + } + + def `05 must discard timers from old incarnation after restart, alt 1`(): Unit = { + val probe = TestProbe[Event]("evt") + val startCounter = new AtomicInteger(0) + val behv = Actor.restarter[Exception]().wrap(Actor.withTimers[Command] { timer ⇒ + timer.startPeriodicTimer("T", Tick(startCounter.incrementAndGet()), interval) + target(probe.ref, timer, 1) + }) + + val ref = start(behv) + probe.expectMsg(Tock(1)) + + val latch = new CountDownLatch(1) + // next Tock(1) is enqueued in mailbox, but should be discarded by new incarnation + ref ! SlowThenThrow(latch, new Exc) + probe.expectNoMsg(interval + 100.millis) + latch.countDown() + probe.expectMsg(GotPreRestart(false)) + probe.expectNoMsg(interval / 2) + probe.expectMsg(Tock(2)) + + ref ! End + } + + def `06 must discard timers from old incarnation after restart, alt 2`(): Unit = { + val probe = TestProbe[Event]("evt") + val behv = Actor.restarter[Exception]().wrap(Actor.withTimers[Command] { timer ⇒ + timer.startPeriodicTimer("T", Tick(1), interval) + target(probe.ref, timer, 1) + }) + + val ref = start(behv) + probe.expectMsg(Tock(1)) + // change state so that we see that the restart starts over again + ref ! Bump + + probe.expectMsg(Tock(2)) + + val latch = new CountDownLatch(1) + // next Tock(2) is enqueued in mailbox, but should be discarded by new incarnation + ref ! SlowThenThrow(latch, new Exc) + probe.expectNoMsg(interval + 100.millis) + latch.countDown() + probe.expectMsg(GotPreRestart(false)) + probe.expectMsg(Tock(1)) + + ref ! End + } + + def `07 must cancel timers when stopped from exception`(): Unit = { + val probe = TestProbe[Event]("evt") + val behv = Actor.withTimers[Command] { timer ⇒ + timer.startPeriodicTimer("T", Tick(1), interval) + target(probe.ref, timer, 1) + } + val ref = start(behv) + ref ! Throw(new Exc) + probe.expectMsg(GotPostStop(false)) + } + + def `08 must cancel timers when stopped voluntarily`(): Unit = { + val probe = TestProbe[Event]("evt") + val timerRef = new AtomicReference[TimerScheduler[Command]] + val behv = Actor.withTimers[Command] { timer ⇒ + timerRef.set(timer) + timer.startPeriodicTimer("T", Tick(1), interval) + target(probe.ref, timer, 1) + } + val ref = start(behv) + ref ! End + // PostStop is not signalled when stopped voluntarily + eventually { + timerRef.get().isTimerActive("T") should ===(false) + } + } + } + + object `A Restarter (real, native)` extends RealTests with NativeSystem + object `A Restarter (real, adapted)` extends RealTests with AdaptedSystem + +} diff --git a/akka-typed-tests/src/test/scala/akka/typed/TypedSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/TypedSpec.scala index 5bd3860a25..7757711392 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/TypedSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/TypedSpec.scala @@ -30,6 +30,7 @@ import scala.util.control.NonFatal import org.scalatest.exceptions.TestFailedException import akka.typed.scaladsl.AskPattern import scala.util.control.NoStackTrace +import akka.typed.testkit.TestKitSettings /** * Helper class for writing tests for typed Actors with ScalaTest. @@ -46,6 +47,8 @@ abstract class TypedSpec(val config: Config) extends TypedSpecSetup { def this() = this(ConfigFactory.empty) + def this(config: String) = this(ConfigFactory.parseString(config)) + // extension point def setTimeout: Timeout = Timeout(1.minute) @@ -62,11 +65,26 @@ abstract class TypedSpec(val config: Config) extends TypedSpecSetup { sys } - trait NativeSystem { - def system = nativeSystem + trait StartSupport { + def system: ActorSystem[TypedSpec.Command] + + private val nameCounter = Iterator.from(0) + def nextName(prefix: String = "a"): String = s"$prefix-${nameCounter.next()}" + + def start[T](behv: Behavior[T]): ActorRef[T] = { + import akka.typed.scaladsl.AskPattern._ + import akka.typed.testkit.scaladsl._ + implicit val testSettings = TestKitSettings(system) + Await.result(system ? TypedSpec.Create(behv, nextName()), 3.seconds.dilated) + } } + + trait NativeSystem { + def system: ActorSystem[TypedSpec.Command] = nativeSystem + } + trait AdaptedSystem { - def system = adaptedSystem + def system: ActorSystem[TypedSpec.Command] = adaptedSystem } implicit val timeout = setTimeout diff --git a/akka-typed/src/main/scala/akka/typed/Behavior.scala b/akka-typed/src/main/scala/akka/typed/Behavior.scala index 10d4bdd6e1..bd1576964c 100644 --- a/akka-typed/src/main/scala/akka/typed/Behavior.scala +++ b/akka-typed/src/main/scala/akka/typed/Behavior.scala @@ -7,6 +7,7 @@ import scala.annotation.tailrec import akka.util.LineNumbers import akka.annotation.{ DoNotInherit, InternalApi } import akka.typed.scaladsl.{ ActorContext ⇒ SAC } +import akka.util.OptionVal /** * The behavior of an actor defines how it reacts to the messages that it @@ -217,6 +218,14 @@ object Behavior { */ def isUnhandled[T](behavior: Behavior[T]): Boolean = behavior eq UnhandledBehavior + /** + * Returns true if the given behavior is the special `Unhandled` marker. + */ + def isDeferred[T](behavior: Behavior[T]): Boolean = behavior match { + case _: DeferredBehavior[T] ⇒ true + case _ ⇒ false + } + /** * Execute the behavior with the given message */ diff --git a/akka-typed/src/main/scala/akka/typed/internal/BehaviorImpl.scala b/akka-typed/src/main/scala/akka/typed/internal/BehaviorImpl.scala index 2a435e31d9..eceff42b7a 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/BehaviorImpl.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/BehaviorImpl.scala @@ -9,6 +9,8 @@ import akka.annotation.InternalApi import akka.typed.{ ActorContext ⇒ AC } import akka.typed.scaladsl.{ ActorContext ⇒ SAC } import akka.typed.scaladsl.Actor +import scala.reflect.ClassTag +import scala.annotation.tailrec /** * INTERNAL API @@ -23,21 +25,40 @@ import akka.typed.scaladsl.Actor def as[U] = ctx.asInstanceOf[AC[U]] } - final case class Widened[T, U](behavior: Behavior[T], matcher: PartialFunction[U, T]) extends ExtensibleBehavior[U] { - private def postProcess(behv: Behavior[T], ctx: AC[T]): Behavior[U] = - if (isUnhandled(behv)) unhandled - else if (isAlive(behv)) { - val next = canonicalize(behv, behavior, ctx) - if (next eq behavior) same else Widened(next, matcher) - } else stopped + def widened[T, U](behavior: Behavior[T], matcher: PartialFunction[U, T]): Behavior[U] = { + behavior match { + case d: DeferredBehavior[T] ⇒ + DeferredBehavior[U] { ctx ⇒ + val c = ctx.asInstanceOf[akka.typed.ActorContext[T]] + val b = Behavior.validateAsInitial(Behavior.undefer(d, c)) + Widened(b, matcher) + } + case _ ⇒ + Widened(behavior, matcher) + } + } + + private final case class Widened[T, U](behavior: Behavior[T], matcher: PartialFunction[U, T]) extends ExtensibleBehavior[U] { + @tailrec + private def canonical(b: Behavior[T], ctx: AC[T]): Behavior[U] = { + if (isUnhandled(b)) unhandled + else if ((b eq SameBehavior) || (b eq this)) same + else if (!Behavior.isAlive(b)) Behavior.stopped + else { + b match { + case d: DeferredBehavior[T] ⇒ canonical(Behavior.undefer(d, ctx), ctx) + case _ ⇒ Widened(b, matcher) + } + } + } override def receiveSignal(ctx: AC[U], signal: Signal): Behavior[U] = - postProcess(Behavior.interpretSignal(behavior, ctx.as[T], signal), ctx.as[T]) + canonical(Behavior.interpretSignal(behavior, ctx.as[T], signal), ctx.as[T]) override def receiveMessage(ctx: AC[U], msg: U): Behavior[U] = matcher.applyOrElse(msg, nullFun) match { case null ⇒ unhandled - case transformed ⇒ postProcess(Behavior.interpretMessage(behavior, ctx.as[T], transformed), ctx.as[T]) + case transformed ⇒ canonical(Behavior.interpretMessage(behavior, ctx.as[T], transformed), ctx.as[T]) } override def toString: String = s"${behavior.toString}.widen(${LineNumbers(matcher)})" @@ -54,25 +75,105 @@ import akka.typed.scaladsl.Actor override def toString = s"Immutable(${LineNumbers(onMessage)})" } - final case class Tap[T]( + def tap[T]( onMessage: Function2[SAC[T], T, _], onSignal: Function2[SAC[T], Signal, _], - behavior: Behavior[T]) extends ExtensibleBehavior[T] { + behavior: Behavior[T]): Behavior[T] = { + intercept[T, T]( + beforeMessage = (ctx, msg) ⇒ { + onMessage(ctx, msg) + msg + }, + beforeSignal = (ctx, sig) ⇒ { + onSignal(ctx, sig) + true + }, + afterMessage = (ctx, msg, b) ⇒ b, // TODO optimize by using more ConstantFun + afterSignal = (ctx, sig, b) ⇒ b, + behavior)(ClassTag(classOf[Any])) + } + + /** + * Intercept another `behavior` by invoking `beforeMessage` for + * messages of type `U`. That can be another type than the type of + * the behavior. `beforeMessage` may transform the incoming message, + * or discard it by returning `null`. Note that `beforeMessage` is + * only invoked for messages of type `U`. + * + * Signals can also be intercepted but not transformed. They can + * be discarded by returning `false` from the `beforeOnSignal` function. + * + * The returned behavior from processing messages and signals can also be + * intercepted, e.g. to return another `Behavior`. The passed message to + * `afterMessage` is the message returned from `beforeMessage` (possibly + * different than the incoming message). + */ + def intercept[T, U <: Any: ClassTag]( + beforeMessage: Function2[SAC[U], U, T], + beforeSignal: Function2[SAC[T], Signal, Boolean], + afterMessage: Function3[SAC[T], T, Behavior[T], Behavior[T]], + afterSignal: Function3[SAC[T], Signal, Behavior[T], Behavior[T]], + behavior: Behavior[T], + toStringPrefix: String = "Intercept"): Behavior[T] = { + behavior match { + case d: DeferredBehavior[T] ⇒ + DeferredBehavior[T] { ctx ⇒ + val c = ctx.asInstanceOf[akka.typed.ActorContext[T]] + val b = Behavior.validateAsInitial(Behavior.undefer(d, c)) + Intercept(beforeMessage, beforeSignal, afterMessage, afterSignal, b, toStringPrefix) + } + case _ ⇒ + Intercept(beforeMessage, beforeSignal, afterMessage, afterSignal, behavior, toStringPrefix) + } + } + + private final case class Intercept[T, U <: Any: ClassTag]( + beforeOnMessage: Function2[SAC[U], U, T], + beforeOnSignal: Function2[SAC[T], Signal, Boolean], + afterMessage: Function3[SAC[T], T, Behavior[T], Behavior[T]], + afterSignal: Function3[SAC[T], Signal, Behavior[T], Behavior[T]], + behavior: Behavior[T], + toStringPrefix: String = "Intercept") extends ExtensibleBehavior[T] { + + @tailrec + private def canonical(b: Behavior[T], ctx: ActorContext[T]): Behavior[T] = { + if (isUnhandled(b)) unhandled + else if ((b eq SameBehavior) || (b eq this)) same + else if (!Behavior.isAlive(b)) Behavior.stopped + else { + b match { + case d: DeferredBehavior[T] ⇒ canonical(Behavior.undefer(d, ctx), ctx) + case _ ⇒ Intercept(beforeOnMessage, beforeOnSignal, afterMessage, afterSignal, b) + } + } + } - private def canonical(behv: Behavior[T]): Behavior[T] = - if (isUnhandled(behv)) unhandled - else if ((behv eq SameBehavior) || (behv eq this)) same - else if (isAlive(behv)) Tap(onMessage, onSignal, behv) - else stopped override def receiveSignal(ctx: AC[T], signal: Signal): Behavior[T] = { - onSignal(ctx.asScala, signal) - canonical(Behavior.interpretSignal(behavior, ctx, signal)) + val next: Behavior[T] = + if (beforeOnSignal(ctx.asScala, signal)) + Behavior.interpretSignal(behavior, ctx, signal) + else + same + canonical(afterSignal(ctx.asScala, signal, next), ctx) } + override def receiveMessage(ctx: AC[T], msg: T): Behavior[T] = { - onMessage(ctx.asScala, msg) - canonical(Behavior.interpretMessage(behavior, ctx, msg)) + msg match { + case m: U ⇒ + val msg2 = beforeOnMessage(ctx.asScala.asInstanceOf[SAC[U]], m) + val next: Behavior[T] = + if (msg2 == null) + same + else + Behavior.interpretMessage(behavior, ctx, msg2) + canonical(afterMessage(ctx.asScala, msg2, next), ctx) + case _ ⇒ + val next: Behavior[T] = Behavior.interpretMessage(behavior, ctx, msg) + canonical(afterMessage(ctx.asScala, msg, next), ctx) + } } - override def toString = s"Tap(${LineNumbers(onSignal)},${LineNumbers(onMessage)},$behavior)" + + override def toString = s"$toStringPrefix(${LineNumbers(beforeOnMessage)},${LineNumbers(beforeOnSignal)},$behavior)" } } diff --git a/akka-typed/src/main/scala/akka/typed/internal/Restarter.scala b/akka-typed/src/main/scala/akka/typed/internal/Restarter.scala index 14ead33c7f..b2634e8e0e 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/Restarter.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/Restarter.scala @@ -34,7 +34,7 @@ import akka.typed.scaladsl.Actor def apply[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], strategy: SupervisorStrategy): Behavior[T] = Actor.deferred[T] { ctx ⇒ val c = ctx.asInstanceOf[akka.typed.ActorContext[T]] - val startedBehavior = initialUndefer(ctx.asInstanceOf[akka.typed.ActorContext[T]], initialBehavior) + val startedBehavior = initialUndefer(c, initialBehavior) strategy match { case Restart(-1, _, loggingEnabled) ⇒ new Restarter(initialBehavior, startedBehavior, loggingEnabled) @@ -220,9 +220,6 @@ import akka.typed.scaladsl.Actor strategy: Backoff, restartCount: Int, blackhole: Boolean) extends Supervisor[Any, Thr] { // TODO using Any here because the scheduled messages can't be of type T. - // Something to consider is that timer messages should typically not be part of the - // ordinary public message protocol and therefore those should perhaps be signals. - // https://github.com/akka/akka/issues/16742 import BackoffRestarter._ diff --git a/akka-typed/src/main/scala/akka/typed/internal/TimerSchedulerImpl.scala b/akka-typed/src/main/scala/akka/typed/internal/TimerSchedulerImpl.scala new file mode 100644 index 0000000000..8b3bf4cf7c --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/internal/TimerSchedulerImpl.scala @@ -0,0 +1,152 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed +package internal + +import scala.concurrent.duration.FiniteDuration + +import akka.actor.Cancellable +import akka.annotation.ApiMayChange +import akka.annotation.DoNotInherit +import akka.annotation.InternalApi +import akka.dispatch.ExecutionContexts +import akka.typed.ActorRef +import akka.typed.ActorRef.ActorRefOps +import akka.typed.javadsl +import akka.typed.scaladsl +import akka.typed.scaladsl.ActorContext +import scala.reflect.ClassTag + +/** + * INTERNAL API + */ +@InternalApi private[akka] object TimerSchedulerImpl { + final case class Timer[T](key: Any, msg: T, repeat: Boolean, generation: Int, task: Cancellable) + final case class TimerMsg(key: Any, generation: Int, owner: AnyRef) + + def withTimers[T](factory: TimerSchedulerImpl[T] ⇒ Behavior[T]): Behavior[T] = { + scaladsl.Actor.deferred[T] { ctx ⇒ + val timerScheduler = new TimerSchedulerImpl[T](ctx) + val behavior = factory(timerScheduler) + timerScheduler.intercept(behavior) + } + } +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] class TimerSchedulerImpl[T](ctx: ActorContext[T]) + extends scaladsl.TimerScheduler[T] with javadsl.TimerScheduler[T] { + import TimerSchedulerImpl._ + + // FIXME change to a class specific logger, see issue #21219 + private val log = ctx.system.log + private var timers: Map[Any, Timer[T]] = Map.empty + private val timerGen = Iterator from 1 + + override def startPeriodicTimer(key: Any, msg: T, interval: FiniteDuration): Unit = + startTimer(key, msg, interval, repeat = true) + + override def startSingleTimer(key: Any, msg: T, timeout: FiniteDuration): Unit = + startTimer(key, msg, timeout, repeat = false) + + private def startTimer(key: Any, msg: T, timeout: FiniteDuration, repeat: Boolean): Unit = { + timers.get(key) match { + case Some(t) ⇒ cancelTimer(t) + case None ⇒ + } + val nextGen = timerGen.next() + + val timerMsg = TimerMsg(key, nextGen, this) + val task = + if (repeat) + ctx.system.scheduler.schedule(timeout, timeout) { + ctx.self.upcast ! timerMsg + }(ExecutionContexts.sameThreadExecutionContext) + else + ctx.system.scheduler.scheduleOnce(timeout) { + ctx.self.upcast ! timerMsg + }(ExecutionContexts.sameThreadExecutionContext) + + val nextTimer = Timer(key, msg, repeat, nextGen, task) + log.debug("Start timer [{}] with generation [{}]", key, nextGen) + timers = timers.updated(key, nextTimer) + } + + override def isTimerActive(key: Any): Boolean = + timers.contains(key) + + override def cancel(key: Any): Unit = { + timers.get(key) match { + case None ⇒ // already removed/canceled + case Some(t) ⇒ cancelTimer(t) + } + } + + private def cancelTimer(timer: Timer[T]): Unit = { + log.debug("Cancel timer [{}] with generation [{}]", timer.key, timer.generation) + timer.task.cancel() + timers -= timer.key + } + + override def cancelAll(): Unit = { + log.debug("Cancel all timers") + timers.valuesIterator.foreach { timer ⇒ + timer.task.cancel() + } + timers = Map.empty + } + + private def interceptTimerMsg(ctx: ActorContext[TimerMsg], timerMsg: TimerMsg): T = { + timers.get(timerMsg.key) match { + case None ⇒ + // it was from canceled timer that was already enqueued in mailbox + log.debug("Received timer [{}] that has been removed, discarding", timerMsg.key) + null.asInstanceOf[T] // message should be ignored + case Some(t) ⇒ + if (timerMsg.owner ne this) { + // after restart, it was from an old instance that was enqueued in mailbox before canceled + log.debug("Received timer [{}] from old restarted instance, discarding", timerMsg.key) + null.asInstanceOf[T] // message should be ignored + } else if (timerMsg.generation == t.generation) { + // valid timer + log.debug("Received timer [{}]", timerMsg.key) + if (!t.repeat) + timers -= t.key + t.msg + } else { + // it was from an old timer that was enqueued in mailbox before canceled + log.debug( + "Received timer [{}] from from old generation [{}], expected generation [{}], discarding", + timerMsg.key, timerMsg.generation, t.generation) + null.asInstanceOf[T] // message should be ignored + } + } + } + + def intercept(behavior: Behavior[T]): Behavior[T] = { + // The scheduled TimerMsg is intercepted to guard against old messages enqueued + // in mailbox before timer was canceled. + // Intercept some signals to cancel timers when when restarting and stopping. + BehaviorImpl.intercept[T, TimerMsg]( + beforeMessage = interceptTimerMsg, + beforeSignal = (ctx, sig) ⇒ { + sig match { + case PreRestart | PostStop ⇒ cancelAll() + case _ ⇒ // unhandled + } + true + }, + afterMessage = (ctx, msg, b) ⇒ { + // PostStop is not signaled when voluntarily stopped + if (!Behavior.isAlive(b)) + cancelAll() + b + }, + afterSignal = (ctx, sig, b) ⇒ b, // TODO optimize by using more ConstantFun + behavior)(ClassTag(classOf[TimerSchedulerImpl.TimerMsg])) + } + +} diff --git a/akka-typed/src/main/scala/akka/typed/javadsl/Actor.scala b/akka-typed/src/main/scala/akka/typed/javadsl/Actor.scala index 4394795d5a..baf095b22f 100644 --- a/akka-typed/src/main/scala/akka/typed/javadsl/Actor.scala +++ b/akka-typed/src/main/scala/akka/typed/javadsl/Actor.scala @@ -16,6 +16,7 @@ import akka.typed.SupervisorStrategy import scala.reflect.ClassTag import akka.typed.internal.Restarter import akka.japi.pf.PFBuilder +import akka.typed.internal.TimerSchedulerImpl object Actor { @@ -177,7 +178,7 @@ object Actor { onMessage: Procedure2[ActorContext[T], T], onSignal: Procedure2[ActorContext[T], Signal], behavior: Behavior[T]): Behavior[T] = { - BehaviorImpl.Tap( + BehaviorImpl.tap( (ctx, msg) ⇒ onMessage.apply(ctx.asJava, msg), (ctx, sig) ⇒ onSignal.apply(ctx.asJava, sig), behavior) @@ -190,7 +191,7 @@ object Actor { * wrapped in a `monitor` call again. */ def monitor[T](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] = { - BehaviorImpl.Tap( + BehaviorImpl.tap( (ctx, msg) ⇒ monitor ! msg, unitFunction, behavior) @@ -239,6 +240,15 @@ object Actor { * @return a behavior of the widened type */ def widened[T, U](behavior: Behavior[T], selector: JFunction[PFBuilder[U, T], PFBuilder[U, T]]): Behavior[U] = - BehaviorImpl.Widened(behavior, selector.apply(new PFBuilder).build()) + BehaviorImpl.widened(behavior, selector.apply(new PFBuilder).build()) + + /** + * Support for scheduled `self` messages in an actor. + * It takes care of the lifecycle of the timers such as cancelling them when the actor + * is restarted or stopped. + * @see [[TimerScheduler]] + */ + def withTimers[T](factory: akka.japi.function.Function[TimerScheduler[T], Behavior[T]]): Behavior[T] = + TimerSchedulerImpl.withTimers(timers ⇒ factory.apply(timers)) } diff --git a/akka-typed/src/main/scala/akka/typed/javadsl/TimerScheduler.scala b/akka-typed/src/main/scala/akka/typed/javadsl/TimerScheduler.scala new file mode 100644 index 0000000000..1412043a29 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/javadsl/TimerScheduler.scala @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.javadsl + +import scala.concurrent.duration.FiniteDuration + +/** + * Support for scheduled `self` messages in an actor. + * It is used with `Actor.withTimers`, which also takes care of the + * lifecycle of the timers such as cancelling them when the actor + * is restarted or stopped. + * + * `TimerScheduler` is not thread-safe, i.e. it must only be used within + * the actor that owns it. + */ +trait TimerScheduler[T] { + + /** + * Start a periodic timer that will send `msg` to the `self` actor at + * a fixed `interval`. + * + * Each timer has a key and if a new timer with same key is started + * the previous is cancelled and it's guaranteed that a message from the + * previous timer is not received, even though it might already be enqueued + * in the mailbox when the new timer is started. + */ + def startPeriodicTimer(key: Any, msg: T, interval: FiniteDuration): Unit + + /** + * * Start a timer that will send `msg` once to the `self` actor after + * the given `timeout`. + * + * Each timer has a key and if a new timer with same key is started + * the previous is cancelled and it's guaranteed that a message from the + * previous timer is not received, even though it might already be enqueued + * in the mailbox when the new timer is started. + */ + def startSingleTimer(key: Any, msg: T, timeout: FiniteDuration): Unit + + /** + * Check if a timer with a given `key` is active. + */ + def isTimerActive(key: Any): Boolean + + /** + * Cancel a timer with a given `key`. + * If canceling a timer that was already canceled, or key never was used to start a timer + * this operation will do nothing. + * + * It is guaranteed that a message from a canceled timer, including its previous incarnation + * for the same key, will not be received by the actor, even though the message might already + * be enqueued in the mailbox when cancel is called. + */ + def cancel(key: Any): Unit + + /** + * Cancel all timers. + */ + def cancelAll(): Unit + +} diff --git a/akka-typed/src/main/scala/akka/typed/scaladsl/Actor.scala b/akka-typed/src/main/scala/akka/typed/scaladsl/Actor.scala index e467ff779d..50efc0c41c 100644 --- a/akka-typed/src/main/scala/akka/typed/scaladsl/Actor.scala +++ b/akka-typed/src/main/scala/akka/typed/scaladsl/Actor.scala @@ -9,6 +9,7 @@ import scala.reflect.ClassTag import akka.annotation.ApiMayChange import akka.annotation.DoNotInherit import akka.typed.internal.BehaviorImpl +import akka.typed.internal.TimerSchedulerImpl @ApiMayChange object Actor { @@ -34,7 +35,7 @@ object Actor { * }}} */ def widen[U](matcher: PartialFunction[U, T]): Behavior[U] = - BehaviorImpl.Widened(behavior, matcher) + BehaviorImpl.widened(behavior, matcher) } /** @@ -175,9 +176,9 @@ object Actor { */ def tap[T]( onMessage: Function2[ActorContext[T], T, _], - onSignal: Function2[ActorContext[T], Signal, _], + onSignal: Function2[ActorContext[T], Signal, _], // FIXME use partial function here also? behavior: Behavior[T]): Behavior[T] = - BehaviorImpl.Tap(onMessage, onSignal, behavior) + BehaviorImpl.tap(onMessage, onSignal, behavior) /** * Behavior decorator that copies all received message to the designated @@ -210,6 +211,15 @@ object Actor { def wrap[T](b: Behavior[T]): Behavior[T] = akka.typed.internal.Restarter(Behavior.validateAsInitial(b), strategy)(c) } + /** + * Support for scheduled `self` messages in an actor. + * It takes care of the lifecycle of the timers such as cancelling them when the actor + * is restarted or stopped. + * @see [[TimerScheduler]] + */ + def withTimers[T](factory: TimerScheduler[T] ⇒ Behavior[T]): Behavior[T] = + TimerSchedulerImpl.withTimers(factory) + // TODO // final case class Selective[T](timeout: FiniteDuration, selector: PartialFunction[T, Behavior[T]], onTimeout: () ⇒ Behavior[T]) diff --git a/akka-typed/src/main/scala/akka/typed/scaladsl/TimerScheduler.scala b/akka-typed/src/main/scala/akka/typed/scaladsl/TimerScheduler.scala new file mode 100644 index 0000000000..9189d66b59 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/scaladsl/TimerScheduler.scala @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.scaladsl + +import scala.concurrent.duration.FiniteDuration + +/** + * Support for scheduled `self` messages in an actor. + * It is used with `Actor.withTimers`. + * Timers are bound to the lifecycle of the actor that owns it, + * and thus are cancelled automatically when it is restarted or stopped. + * + * `TimerScheduler` is not thread-safe, i.e. it must only be used within + * the actor that owns it. + */ +trait TimerScheduler[T] { + + /** + * Start a periodic timer that will send `msg` to the `self` actor at + * a fixed `interval`. + * + * Each timer has a key and if a new timer with same key is started + * the previous is cancelled and it's guaranteed that a message from the + * previous timer is not received, even though it might already be enqueued + * in the mailbox when the new timer is started. + */ + def startPeriodicTimer(key: Any, msg: T, interval: FiniteDuration): Unit + + /** + * Start a timer that will send `msg` once to the `self` actor after + * the given `timeout`. + * + * Each timer has a key and if a new timer with same key is started + * the previous is cancelled and it's guaranteed that a message from the + * previous timer is not received, even though it might already be enqueued + * in the mailbox when the new timer is started. + */ + def startSingleTimer(key: Any, msg: T, timeout: FiniteDuration): Unit + + /** + * Check if a timer with a given `key` is active. + */ + def isTimerActive(key: Any): Boolean + + /** + * Cancel a timer with a given `key`. + * If canceling a timer that was already canceled, or key never was used to start a timer + * this operation will do nothing. + * + * It is guaranteed that a message from a canceled timer, including its previous incarnation + * for the same key, will not be received by the actor, even though the message might already + * be enqueued in the mailbox when cancel is called. + */ + def cancel(key: Any): Unit + + /** + * Cancel all timers. + */ + def cancelAll(): Unit + +}