support for safe timers, periodic scheduling, #16742

* implemented an Intercept behavior that can handle the
  scheduled TimerMsg, even though it's of a different type
* Intercept is similar to Tap but more powerful, implemented
  Tap with Intercept. Intercept is internal API.
* When wrapping a behavior, e.g. Tap, the outer behavior must also be
  Deferred if the wrapped behavior is Deferred
* PostStop not signaled when stopped voluntarily, intercept messages
  to cancel timers when stopped is returned
This commit is contained in:
Patrik Nordwall 2017-04-28 11:37:38 +02:00
parent c55fec6a43
commit db0e170d32
13 changed files with 744 additions and 48 deletions

View file

@ -8,6 +8,9 @@ import akka.typed.ActorContext;
import static akka.typed.javadsl.Actor.*;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.Duration;
public class ActorCompile {
interface MyMsg {}
@ -56,6 +59,13 @@ public class ActorCompile {
});
}
{
Behavior<MyMsg> b = Actor.withTimers(timers -> {
timers.startPeriodicTimer("key", new MyMsgB("tick"), Duration.create(1, TimeUnit.SECONDS));
return Actor.ignore();
});
}
static class MyBehavior extends ExtensibleBehavior<MyMsg> {

View file

@ -8,6 +8,7 @@ import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.Actor.BehaviorDecorators
import akka.typed.scaladsl.AskPattern._
import akka.typed.testkit.EffectfulActorContext
import akka.typed.testkit.TestKitSettings
@ -62,16 +63,10 @@ class DeferredSpec extends TypedSpec {
}
trait RealTests {
trait RealTests extends StartSupport {
implicit def system: ActorSystem[TypedSpec.Command]
implicit val testSettings = TestKitSettings(system)
val nameCounter = Iterator.from(0)
def nextName(): String = s"a-${nameCounter.next()}"
def start(behv: Behavior[Command]): ActorRef[Command] =
Await.result(system ? TypedSpec.Create(behv, nextName()), 3.seconds.dilated)
def `must create underlying`(): Unit = {
val probe = TestProbe[Event]("evt")
val behv = Actor.deferred[Command] { _
@ -96,6 +91,51 @@ class DeferredSpec extends TypedSpec {
probe.expectNoMsg(100.millis)
}
def `must create underlying when nested`(): Unit = {
val probe = TestProbe[Event]("evt")
val behv = Actor.deferred[Command] { _
Actor.deferred[Command] { _
probe.ref ! Started
target(probe.ref)
}
}
start(behv)
probe.expectMsg(Started)
}
def `must undefer underlying when wrapped by widen`(): Unit = {
val probe = TestProbe[Event]("evt")
val behv = Actor.deferred[Command] { _
probe.ref ! Started
target(probe.ref)
}.widen[Command] {
case m m
}
probe.expectNoMsg(100.millis) // not yet
val ref = start(behv)
// it's supposed to be created immediately (not waiting for first message)
probe.expectMsg(Started)
ref ! Ping
probe.expectMsg(Pong)
}
def `must undefer underlying when wrapped by monitor`(): Unit = {
// monitor is implemented with tap, so this is testing both
val probe = TestProbe[Event]("evt")
val monitorProbe = TestProbe[Command]("monitor")
val behv = Actor.monitor(monitorProbe.ref, Actor.deferred[Command] { _
probe.ref ! Started
target(probe.ref)
})
probe.expectNoMsg(100.millis) // not yet
val ref = start(behv)
// it's supposed to be created immediately (not waiting for first message)
probe.expectMsg(Started)
ref ! Ping
monitorProbe.expectMsg(Ping)
probe.expectMsg(Pong)
}
}
object `A Restarter (stubbed, native)` extends StubbedTests with NativeSystem

View file

@ -223,17 +223,11 @@ class RestarterSpec extends TypedSpec {
}
}
trait RealTests {
trait RealTests extends StartSupport {
import akka.typed.scaladsl.adapter._
implicit def system: ActorSystem[TypedSpec.Command]
implicit val testSettings = TestKitSettings(system)
val nameCounter = Iterator.from(0)
def nextName(): String = s"a-${nameCounter.next()}"
def start(behv: Behavior[Command]): ActorRef[Command] =
Await.result(system ? TypedSpec.Create(behv, nextName()), 3.seconds.dilated)
def `must receive message`(): Unit = {
val probe = TestProbe[Event]("evt")
val behv = restarter[Throwable]().wrap(target(probe.ref))

View file

@ -0,0 +1,231 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.AskPattern._
import akka.typed.scaladsl.TimerScheduler
import akka.typed.testkit.TestKitSettings
import akka.typed.testkit.scaladsl._
import org.scalatest.concurrent.Eventually.eventually
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TimerSpec extends TypedSpec("""
#akka.loglevel = DEBUG
""") {
sealed trait Command
case class Tick(n: Int) extends Command
case object Bump extends Command
case class SlowThenBump(latch: CountDownLatch) extends Command
case object End extends Command
case class Throw(e: Throwable) extends Command
case object Cancel extends Command
case class SlowThenThrow(latch: CountDownLatch, e: Throwable) extends Command
sealed trait Event
case class Tock(n: Int) extends Event
case class GotPostStop(timerActive: Boolean) extends Event
case class GotPreRestart(timerActive: Boolean) extends Event
class Exc extends RuntimeException("simulated exc") with NoStackTrace
trait RealTests extends StartSupport {
implicit def system: ActorSystem[TypedSpec.Command]
implicit val testSettings = TestKitSettings(system)
val interval = 1.second
val dilatedInterval = interval.dilated
def target(monitor: ActorRef[Event], timer: TimerScheduler[Command], bumpCount: Int): Behavior[Command] = {
def bump(): Behavior[Command] = {
val nextCount = bumpCount + 1
timer.startPeriodicTimer("T", Tick(nextCount), interval)
target(monitor, timer, nextCount)
}
Actor.immutable[Command] { (ctx, cmd)
cmd match {
case Tick(n)
monitor ! Tock(n)
Actor.same
case Bump
bump()
case SlowThenBump(latch)
latch.await(10, TimeUnit.SECONDS)
bump()
case End
Actor.stopped
case Cancel
timer.cancel("T")
Actor.same
case Throw(e)
throw e
case SlowThenThrow(latch, e)
latch.await(10, TimeUnit.SECONDS)
throw e
}
} onSignal {
case (ctx, PreRestart)
monitor ! GotPreRestart(timer.isTimerActive("T"))
Actor.same
case (ctx, PostStop)
monitor ! GotPostStop(timer.isTimerActive("T"))
Actor.same
}
}
def `01 must schedule non-repeated ticks`(): Unit = {
val probe = TestProbe[Event]("evt")
val behv = Actor.withTimers[Command] { timer
timer.startSingleTimer("T", Tick(1), 10.millis)
target(probe.ref, timer, 1)
}
val ref = start(behv)
probe.expectMsg(Tock(1))
probe.expectNoMsg(100.millis)
ref ! End
}
def `02 must schedule repeated ticks`(): Unit = {
val probe = TestProbe[Event]("evt")
val behv = Actor.withTimers[Command] { timer
timer.startPeriodicTimer("T", Tick(1), interval)
target(probe.ref, timer, 1)
}
val ref = start(behv)
probe.within((interval * 4) - 100.millis) {
probe.expectMsg(Tock(1))
probe.expectMsg(Tock(1))
probe.expectMsg(Tock(1))
}
ref ! End
}
def `03 must replace timer`(): Unit = {
val probe = TestProbe[Event]("evt")
val behv = Actor.withTimers[Command] { timer
timer.startPeriodicTimer("T", Tick(1), interval)
target(probe.ref, timer, 1)
}
val ref = start(behv)
probe.expectMsg(Tock(1))
val latch = new CountDownLatch(1)
// next Tock(1) enqueued in mailboxed, but should be discarded because of new timer
ref ! SlowThenBump(latch)
probe.expectNoMsg(interval + 100.millis)
latch.countDown()
probe.expectMsg(Tock(2))
ref ! End
}
def `04 must cancel timer`(): Unit = {
val probe = TestProbe[Event]("evt")
val behv = Actor.withTimers[Command] { timer
timer.startPeriodicTimer("T", Tick(1), interval)
target(probe.ref, timer, 1)
}
val ref = start(behv)
probe.expectMsg(Tock(1))
ref ! Cancel
probe.expectNoMsg(dilatedInterval + 100.millis)
ref ! End
}
def `05 must discard timers from old incarnation after restart, alt 1`(): Unit = {
val probe = TestProbe[Event]("evt")
val startCounter = new AtomicInteger(0)
val behv = Actor.restarter[Exception]().wrap(Actor.withTimers[Command] { timer
timer.startPeriodicTimer("T", Tick(startCounter.incrementAndGet()), interval)
target(probe.ref, timer, 1)
})
val ref = start(behv)
probe.expectMsg(Tock(1))
val latch = new CountDownLatch(1)
// next Tock(1) is enqueued in mailbox, but should be discarded by new incarnation
ref ! SlowThenThrow(latch, new Exc)
probe.expectNoMsg(interval + 100.millis)
latch.countDown()
probe.expectMsg(GotPreRestart(false))
probe.expectNoMsg(interval / 2)
probe.expectMsg(Tock(2))
ref ! End
}
def `06 must discard timers from old incarnation after restart, alt 2`(): Unit = {
val probe = TestProbe[Event]("evt")
val behv = Actor.restarter[Exception]().wrap(Actor.withTimers[Command] { timer
timer.startPeriodicTimer("T", Tick(1), interval)
target(probe.ref, timer, 1)
})
val ref = start(behv)
probe.expectMsg(Tock(1))
// change state so that we see that the restart starts over again
ref ! Bump
probe.expectMsg(Tock(2))
val latch = new CountDownLatch(1)
// next Tock(2) is enqueued in mailbox, but should be discarded by new incarnation
ref ! SlowThenThrow(latch, new Exc)
probe.expectNoMsg(interval + 100.millis)
latch.countDown()
probe.expectMsg(GotPreRestart(false))
probe.expectMsg(Tock(1))
ref ! End
}
def `07 must cancel timers when stopped from exception`(): Unit = {
val probe = TestProbe[Event]("evt")
val behv = Actor.withTimers[Command] { timer
timer.startPeriodicTimer("T", Tick(1), interval)
target(probe.ref, timer, 1)
}
val ref = start(behv)
ref ! Throw(new Exc)
probe.expectMsg(GotPostStop(false))
}
def `08 must cancel timers when stopped voluntarily`(): Unit = {
val probe = TestProbe[Event]("evt")
val timerRef = new AtomicReference[TimerScheduler[Command]]
val behv = Actor.withTimers[Command] { timer
timerRef.set(timer)
timer.startPeriodicTimer("T", Tick(1), interval)
target(probe.ref, timer, 1)
}
val ref = start(behv)
ref ! End
// PostStop is not signalled when stopped voluntarily
eventually {
timerRef.get().isTimerActive("T") should ===(false)
}
}
}
object `A Restarter (real, native)` extends RealTests with NativeSystem
object `A Restarter (real, adapted)` extends RealTests with AdaptedSystem
}

View file

@ -30,6 +30,7 @@ import scala.util.control.NonFatal
import org.scalatest.exceptions.TestFailedException
import akka.typed.scaladsl.AskPattern
import scala.util.control.NoStackTrace
import akka.typed.testkit.TestKitSettings
/**
* Helper class for writing tests for typed Actors with ScalaTest.
@ -46,6 +47,8 @@ abstract class TypedSpec(val config: Config) extends TypedSpecSetup {
def this() = this(ConfigFactory.empty)
def this(config: String) = this(ConfigFactory.parseString(config))
// extension point
def setTimeout: Timeout = Timeout(1.minute)
@ -62,11 +65,26 @@ abstract class TypedSpec(val config: Config) extends TypedSpecSetup {
sys
}
trait NativeSystem {
def system = nativeSystem
trait StartSupport {
def system: ActorSystem[TypedSpec.Command]
private val nameCounter = Iterator.from(0)
def nextName(prefix: String = "a"): String = s"$prefix-${nameCounter.next()}"
def start[T](behv: Behavior[T]): ActorRef[T] = {
import akka.typed.scaladsl.AskPattern._
import akka.typed.testkit.scaladsl._
implicit val testSettings = TestKitSettings(system)
Await.result(system ? TypedSpec.Create(behv, nextName()), 3.seconds.dilated)
}
}
trait NativeSystem {
def system: ActorSystem[TypedSpec.Command] = nativeSystem
}
trait AdaptedSystem {
def system = adaptedSystem
def system: ActorSystem[TypedSpec.Command] = adaptedSystem
}
implicit val timeout = setTimeout

View file

@ -7,6 +7,7 @@ import scala.annotation.tailrec
import akka.util.LineNumbers
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.typed.scaladsl.{ ActorContext SAC }
import akka.util.OptionVal
/**
* The behavior of an actor defines how it reacts to the messages that it
@ -217,6 +218,14 @@ object Behavior {
*/
def isUnhandled[T](behavior: Behavior[T]): Boolean = behavior eq UnhandledBehavior
/**
* Returns true if the given behavior is the special `Unhandled` marker.
*/
def isDeferred[T](behavior: Behavior[T]): Boolean = behavior match {
case _: DeferredBehavior[T] true
case _ false
}
/**
* Execute the behavior with the given message
*/

View file

@ -9,6 +9,8 @@ import akka.annotation.InternalApi
import akka.typed.{ ActorContext AC }
import akka.typed.scaladsl.{ ActorContext SAC }
import akka.typed.scaladsl.Actor
import scala.reflect.ClassTag
import scala.annotation.tailrec
/**
* INTERNAL API
@ -23,21 +25,40 @@ import akka.typed.scaladsl.Actor
def as[U] = ctx.asInstanceOf[AC[U]]
}
final case class Widened[T, U](behavior: Behavior[T], matcher: PartialFunction[U, T]) extends ExtensibleBehavior[U] {
private def postProcess(behv: Behavior[T], ctx: AC[T]): Behavior[U] =
if (isUnhandled(behv)) unhandled
else if (isAlive(behv)) {
val next = canonicalize(behv, behavior, ctx)
if (next eq behavior) same else Widened(next, matcher)
} else stopped
def widened[T, U](behavior: Behavior[T], matcher: PartialFunction[U, T]): Behavior[U] = {
behavior match {
case d: DeferredBehavior[T]
DeferredBehavior[U] { ctx
val c = ctx.asInstanceOf[akka.typed.ActorContext[T]]
val b = Behavior.validateAsInitial(Behavior.undefer(d, c))
Widened(b, matcher)
}
case _
Widened(behavior, matcher)
}
}
private final case class Widened[T, U](behavior: Behavior[T], matcher: PartialFunction[U, T]) extends ExtensibleBehavior[U] {
@tailrec
private def canonical(b: Behavior[T], ctx: AC[T]): Behavior[U] = {
if (isUnhandled(b)) unhandled
else if ((b eq SameBehavior) || (b eq this)) same
else if (!Behavior.isAlive(b)) Behavior.stopped
else {
b match {
case d: DeferredBehavior[T] canonical(Behavior.undefer(d, ctx), ctx)
case _ Widened(b, matcher)
}
}
}
override def receiveSignal(ctx: AC[U], signal: Signal): Behavior[U] =
postProcess(Behavior.interpretSignal(behavior, ctx.as[T], signal), ctx.as[T])
canonical(Behavior.interpretSignal(behavior, ctx.as[T], signal), ctx.as[T])
override def receiveMessage(ctx: AC[U], msg: U): Behavior[U] =
matcher.applyOrElse(msg, nullFun) match {
case null unhandled
case transformed postProcess(Behavior.interpretMessage(behavior, ctx.as[T], transformed), ctx.as[T])
case transformed canonical(Behavior.interpretMessage(behavior, ctx.as[T], transformed), ctx.as[T])
}
override def toString: String = s"${behavior.toString}.widen(${LineNumbers(matcher)})"
@ -54,25 +75,105 @@ import akka.typed.scaladsl.Actor
override def toString = s"Immutable(${LineNumbers(onMessage)})"
}
final case class Tap[T](
def tap[T](
onMessage: Function2[SAC[T], T, _],
onSignal: Function2[SAC[T], Signal, _],
behavior: Behavior[T]) extends ExtensibleBehavior[T] {
behavior: Behavior[T]): Behavior[T] = {
intercept[T, T](
beforeMessage = (ctx, msg) {
onMessage(ctx, msg)
msg
},
beforeSignal = (ctx, sig) {
onSignal(ctx, sig)
true
},
afterMessage = (ctx, msg, b) b, // TODO optimize by using more ConstantFun
afterSignal = (ctx, sig, b) b,
behavior)(ClassTag(classOf[Any]))
}
/**
* Intercept another `behavior` by invoking `beforeMessage` for
* messages of type `U`. That can be another type than the type of
* the behavior. `beforeMessage` may transform the incoming message,
* or discard it by returning `null`. Note that `beforeMessage` is
* only invoked for messages of type `U`.
*
* Signals can also be intercepted but not transformed. They can
* be discarded by returning `false` from the `beforeOnSignal` function.
*
* The returned behavior from processing messages and signals can also be
* intercepted, e.g. to return another `Behavior`. The passed message to
* `afterMessage` is the message returned from `beforeMessage` (possibly
* different than the incoming message).
*/
def intercept[T, U <: Any: ClassTag](
beforeMessage: Function2[SAC[U], U, T],
beforeSignal: Function2[SAC[T], Signal, Boolean],
afterMessage: Function3[SAC[T], T, Behavior[T], Behavior[T]],
afterSignal: Function3[SAC[T], Signal, Behavior[T], Behavior[T]],
behavior: Behavior[T],
toStringPrefix: String = "Intercept"): Behavior[T] = {
behavior match {
case d: DeferredBehavior[T]
DeferredBehavior[T] { ctx
val c = ctx.asInstanceOf[akka.typed.ActorContext[T]]
val b = Behavior.validateAsInitial(Behavior.undefer(d, c))
Intercept(beforeMessage, beforeSignal, afterMessage, afterSignal, b, toStringPrefix)
}
case _
Intercept(beforeMessage, beforeSignal, afterMessage, afterSignal, behavior, toStringPrefix)
}
}
private final case class Intercept[T, U <: Any: ClassTag](
beforeOnMessage: Function2[SAC[U], U, T],
beforeOnSignal: Function2[SAC[T], Signal, Boolean],
afterMessage: Function3[SAC[T], T, Behavior[T], Behavior[T]],
afterSignal: Function3[SAC[T], Signal, Behavior[T], Behavior[T]],
behavior: Behavior[T],
toStringPrefix: String = "Intercept") extends ExtensibleBehavior[T] {
@tailrec
private def canonical(b: Behavior[T], ctx: ActorContext[T]): Behavior[T] = {
if (isUnhandled(b)) unhandled
else if ((b eq SameBehavior) || (b eq this)) same
else if (!Behavior.isAlive(b)) Behavior.stopped
else {
b match {
case d: DeferredBehavior[T] canonical(Behavior.undefer(d, ctx), ctx)
case _ Intercept(beforeOnMessage, beforeOnSignal, afterMessage, afterSignal, b)
}
}
}
private def canonical(behv: Behavior[T]): Behavior[T] =
if (isUnhandled(behv)) unhandled
else if ((behv eq SameBehavior) || (behv eq this)) same
else if (isAlive(behv)) Tap(onMessage, onSignal, behv)
else stopped
override def receiveSignal(ctx: AC[T], signal: Signal): Behavior[T] = {
onSignal(ctx.asScala, signal)
canonical(Behavior.interpretSignal(behavior, ctx, signal))
val next: Behavior[T] =
if (beforeOnSignal(ctx.asScala, signal))
Behavior.interpretSignal(behavior, ctx, signal)
else
same
canonical(afterSignal(ctx.asScala, signal, next), ctx)
}
override def receiveMessage(ctx: AC[T], msg: T): Behavior[T] = {
onMessage(ctx.asScala, msg)
canonical(Behavior.interpretMessage(behavior, ctx, msg))
msg match {
case m: U
val msg2 = beforeOnMessage(ctx.asScala.asInstanceOf[SAC[U]], m)
val next: Behavior[T] =
if (msg2 == null)
same
else
Behavior.interpretMessage(behavior, ctx, msg2)
canonical(afterMessage(ctx.asScala, msg2, next), ctx)
case _
val next: Behavior[T] = Behavior.interpretMessage(behavior, ctx, msg)
canonical(afterMessage(ctx.asScala, msg, next), ctx)
}
}
override def toString = s"Tap(${LineNumbers(onSignal)},${LineNumbers(onMessage)},$behavior)"
override def toString = s"$toStringPrefix(${LineNumbers(beforeOnMessage)},${LineNumbers(beforeOnSignal)},$behavior)"
}
}

View file

@ -34,7 +34,7 @@ import akka.typed.scaladsl.Actor
def apply[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], strategy: SupervisorStrategy): Behavior[T] =
Actor.deferred[T] { ctx
val c = ctx.asInstanceOf[akka.typed.ActorContext[T]]
val startedBehavior = initialUndefer(ctx.asInstanceOf[akka.typed.ActorContext[T]], initialBehavior)
val startedBehavior = initialUndefer(c, initialBehavior)
strategy match {
case Restart(-1, _, loggingEnabled)
new Restarter(initialBehavior, startedBehavior, loggingEnabled)
@ -220,9 +220,6 @@ import akka.typed.scaladsl.Actor
strategy: Backoff, restartCount: Int, blackhole: Boolean) extends Supervisor[Any, Thr] {
// TODO using Any here because the scheduled messages can't be of type T.
// Something to consider is that timer messages should typically not be part of the
// ordinary public message protocol and therefore those should perhaps be signals.
// https://github.com/akka/akka/issues/16742
import BackoffRestarter._

View file

@ -0,0 +1,152 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed
package internal
import scala.concurrent.duration.FiniteDuration
import akka.actor.Cancellable
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.typed.ActorRef
import akka.typed.ActorRef.ActorRefOps
import akka.typed.javadsl
import akka.typed.scaladsl
import akka.typed.scaladsl.ActorContext
import scala.reflect.ClassTag
/**
* INTERNAL API
*/
@InternalApi private[akka] object TimerSchedulerImpl {
final case class Timer[T](key: Any, msg: T, repeat: Boolean, generation: Int, task: Cancellable)
final case class TimerMsg(key: Any, generation: Int, owner: AnyRef)
def withTimers[T](factory: TimerSchedulerImpl[T] Behavior[T]): Behavior[T] = {
scaladsl.Actor.deferred[T] { ctx
val timerScheduler = new TimerSchedulerImpl[T](ctx)
val behavior = factory(timerScheduler)
timerScheduler.intercept(behavior)
}
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] class TimerSchedulerImpl[T](ctx: ActorContext[T])
extends scaladsl.TimerScheduler[T] with javadsl.TimerScheduler[T] {
import TimerSchedulerImpl._
// FIXME change to a class specific logger, see issue #21219
private val log = ctx.system.log
private var timers: Map[Any, Timer[T]] = Map.empty
private val timerGen = Iterator from 1
override def startPeriodicTimer(key: Any, msg: T, interval: FiniteDuration): Unit =
startTimer(key, msg, interval, repeat = true)
override def startSingleTimer(key: Any, msg: T, timeout: FiniteDuration): Unit =
startTimer(key, msg, timeout, repeat = false)
private def startTimer(key: Any, msg: T, timeout: FiniteDuration, repeat: Boolean): Unit = {
timers.get(key) match {
case Some(t) cancelTimer(t)
case None
}
val nextGen = timerGen.next()
val timerMsg = TimerMsg(key, nextGen, this)
val task =
if (repeat)
ctx.system.scheduler.schedule(timeout, timeout) {
ctx.self.upcast ! timerMsg
}(ExecutionContexts.sameThreadExecutionContext)
else
ctx.system.scheduler.scheduleOnce(timeout) {
ctx.self.upcast ! timerMsg
}(ExecutionContexts.sameThreadExecutionContext)
val nextTimer = Timer(key, msg, repeat, nextGen, task)
log.debug("Start timer [{}] with generation [{}]", key, nextGen)
timers = timers.updated(key, nextTimer)
}
override def isTimerActive(key: Any): Boolean =
timers.contains(key)
override def cancel(key: Any): Unit = {
timers.get(key) match {
case None // already removed/canceled
case Some(t) cancelTimer(t)
}
}
private def cancelTimer(timer: Timer[T]): Unit = {
log.debug("Cancel timer [{}] with generation [{}]", timer.key, timer.generation)
timer.task.cancel()
timers -= timer.key
}
override def cancelAll(): Unit = {
log.debug("Cancel all timers")
timers.valuesIterator.foreach { timer
timer.task.cancel()
}
timers = Map.empty
}
private def interceptTimerMsg(ctx: ActorContext[TimerMsg], timerMsg: TimerMsg): T = {
timers.get(timerMsg.key) match {
case None
// it was from canceled timer that was already enqueued in mailbox
log.debug("Received timer [{}] that has been removed, discarding", timerMsg.key)
null.asInstanceOf[T] // message should be ignored
case Some(t)
if (timerMsg.owner ne this) {
// after restart, it was from an old instance that was enqueued in mailbox before canceled
log.debug("Received timer [{}] from old restarted instance, discarding", timerMsg.key)
null.asInstanceOf[T] // message should be ignored
} else if (timerMsg.generation == t.generation) {
// valid timer
log.debug("Received timer [{}]", timerMsg.key)
if (!t.repeat)
timers -= t.key
t.msg
} else {
// it was from an old timer that was enqueued in mailbox before canceled
log.debug(
"Received timer [{}] from from old generation [{}], expected generation [{}], discarding",
timerMsg.key, timerMsg.generation, t.generation)
null.asInstanceOf[T] // message should be ignored
}
}
}
def intercept(behavior: Behavior[T]): Behavior[T] = {
// The scheduled TimerMsg is intercepted to guard against old messages enqueued
// in mailbox before timer was canceled.
// Intercept some signals to cancel timers when when restarting and stopping.
BehaviorImpl.intercept[T, TimerMsg](
beforeMessage = interceptTimerMsg,
beforeSignal = (ctx, sig) {
sig match {
case PreRestart | PostStop cancelAll()
case _ // unhandled
}
true
},
afterMessage = (ctx, msg, b) {
// PostStop is not signaled when voluntarily stopped
if (!Behavior.isAlive(b))
cancelAll()
b
},
afterSignal = (ctx, sig, b) b, // TODO optimize by using more ConstantFun
behavior)(ClassTag(classOf[TimerSchedulerImpl.TimerMsg]))
}
}

View file

@ -16,6 +16,7 @@ import akka.typed.SupervisorStrategy
import scala.reflect.ClassTag
import akka.typed.internal.Restarter
import akka.japi.pf.PFBuilder
import akka.typed.internal.TimerSchedulerImpl
object Actor {
@ -177,7 +178,7 @@ object Actor {
onMessage: Procedure2[ActorContext[T], T],
onSignal: Procedure2[ActorContext[T], Signal],
behavior: Behavior[T]): Behavior[T] = {
BehaviorImpl.Tap(
BehaviorImpl.tap(
(ctx, msg) onMessage.apply(ctx.asJava, msg),
(ctx, sig) onSignal.apply(ctx.asJava, sig),
behavior)
@ -190,7 +191,7 @@ object Actor {
* wrapped in a `monitor` call again.
*/
def monitor[T](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] = {
BehaviorImpl.Tap(
BehaviorImpl.tap(
(ctx, msg) monitor ! msg,
unitFunction,
behavior)
@ -239,6 +240,15 @@ object Actor {
* @return a behavior of the widened type
*/
def widened[T, U](behavior: Behavior[T], selector: JFunction[PFBuilder[U, T], PFBuilder[U, T]]): Behavior[U] =
BehaviorImpl.Widened(behavior, selector.apply(new PFBuilder).build())
BehaviorImpl.widened(behavior, selector.apply(new PFBuilder).build())
/**
* Support for scheduled `self` messages in an actor.
* It takes care of the lifecycle of the timers such as cancelling them when the actor
* is restarted or stopped.
* @see [[TimerScheduler]]
*/
def withTimers[T](factory: akka.japi.function.Function[TimerScheduler[T], Behavior[T]]): Behavior[T] =
TimerSchedulerImpl.withTimers(timers factory.apply(timers))
}

View file

@ -0,0 +1,62 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.javadsl
import scala.concurrent.duration.FiniteDuration
/**
* Support for scheduled `self` messages in an actor.
* It is used with `Actor.withTimers`, which also takes care of the
* lifecycle of the timers such as cancelling them when the actor
* is restarted or stopped.
*
* `TimerScheduler` is not thread-safe, i.e. it must only be used within
* the actor that owns it.
*/
trait TimerScheduler[T] {
/**
* Start a periodic timer that will send `msg` to the `self` actor at
* a fixed `interval`.
*
* Each timer has a key and if a new timer with same key is started
* the previous is cancelled and it's guaranteed that a message from the
* previous timer is not received, even though it might already be enqueued
* in the mailbox when the new timer is started.
*/
def startPeriodicTimer(key: Any, msg: T, interval: FiniteDuration): Unit
/**
* * Start a timer that will send `msg` once to the `self` actor after
* the given `timeout`.
*
* Each timer has a key and if a new timer with same key is started
* the previous is cancelled and it's guaranteed that a message from the
* previous timer is not received, even though it might already be enqueued
* in the mailbox when the new timer is started.
*/
def startSingleTimer(key: Any, msg: T, timeout: FiniteDuration): Unit
/**
* Check if a timer with a given `key` is active.
*/
def isTimerActive(key: Any): Boolean
/**
* Cancel a timer with a given `key`.
* If canceling a timer that was already canceled, or key never was used to start a timer
* this operation will do nothing.
*
* It is guaranteed that a message from a canceled timer, including its previous incarnation
* for the same key, will not be received by the actor, even though the message might already
* be enqueued in the mailbox when cancel is called.
*/
def cancel(key: Any): Unit
/**
* Cancel all timers.
*/
def cancelAll(): Unit
}

View file

@ -9,6 +9,7 @@ import scala.reflect.ClassTag
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
import akka.typed.internal.BehaviorImpl
import akka.typed.internal.TimerSchedulerImpl
@ApiMayChange
object Actor {
@ -34,7 +35,7 @@ object Actor {
* }}}
*/
def widen[U](matcher: PartialFunction[U, T]): Behavior[U] =
BehaviorImpl.Widened(behavior, matcher)
BehaviorImpl.widened(behavior, matcher)
}
/**
@ -175,9 +176,9 @@ object Actor {
*/
def tap[T](
onMessage: Function2[ActorContext[T], T, _],
onSignal: Function2[ActorContext[T], Signal, _],
onSignal: Function2[ActorContext[T], Signal, _], // FIXME use partial function here also?
behavior: Behavior[T]): Behavior[T] =
BehaviorImpl.Tap(onMessage, onSignal, behavior)
BehaviorImpl.tap(onMessage, onSignal, behavior)
/**
* Behavior decorator that copies all received message to the designated
@ -210,6 +211,15 @@ object Actor {
def wrap[T](b: Behavior[T]): Behavior[T] = akka.typed.internal.Restarter(Behavior.validateAsInitial(b), strategy)(c)
}
/**
* Support for scheduled `self` messages in an actor.
* It takes care of the lifecycle of the timers such as cancelling them when the actor
* is restarted or stopped.
* @see [[TimerScheduler]]
*/
def withTimers[T](factory: TimerScheduler[T] Behavior[T]): Behavior[T] =
TimerSchedulerImpl.withTimers(factory)
// TODO
// final case class Selective[T](timeout: FiniteDuration, selector: PartialFunction[T, Behavior[T]], onTimeout: () Behavior[T])

View file

@ -0,0 +1,62 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.scaladsl
import scala.concurrent.duration.FiniteDuration
/**
* Support for scheduled `self` messages in an actor.
* It is used with `Actor.withTimers`.
* Timers are bound to the lifecycle of the actor that owns it,
* and thus are cancelled automatically when it is restarted or stopped.
*
* `TimerScheduler` is not thread-safe, i.e. it must only be used within
* the actor that owns it.
*/
trait TimerScheduler[T] {
/**
* Start a periodic timer that will send `msg` to the `self` actor at
* a fixed `interval`.
*
* Each timer has a key and if a new timer with same key is started
* the previous is cancelled and it's guaranteed that a message from the
* previous timer is not received, even though it might already be enqueued
* in the mailbox when the new timer is started.
*/
def startPeriodicTimer(key: Any, msg: T, interval: FiniteDuration): Unit
/**
* Start a timer that will send `msg` once to the `self` actor after
* the given `timeout`.
*
* Each timer has a key and if a new timer with same key is started
* the previous is cancelled and it's guaranteed that a message from the
* previous timer is not received, even though it might already be enqueued
* in the mailbox when the new timer is started.
*/
def startSingleTimer(key: Any, msg: T, timeout: FiniteDuration): Unit
/**
* Check if a timer with a given `key` is active.
*/
def isTimerActive(key: Any): Boolean
/**
* Cancel a timer with a given `key`.
* If canceling a timer that was already canceled, or key never was used to start a timer
* this operation will do nothing.
*
* It is guaranteed that a message from a canceled timer, including its previous incarnation
* for the same key, will not be received by the actor, even though the message might already
* be enqueued in the mailbox when cancel is called.
*/
def cancel(key: Any): Unit
/**
* Cancel all timers.
*/
def cancelAll(): Unit
}