From f68f0cd80580253d3ba0f739cd533579a3d90c49 Mon Sep 17 00:00:00 2001 From: eyal farago Date: Mon, 21 Dec 2020 15:05:56 +0200 Subject: [PATCH] Akka 29900 stubbed timer (#29903) * master: abstract over TimerScheduler * master: introduce effects for scheduled timer and cancelled timer. * master: introduce a failing test * master: introduce an effectfull timer scheduler into the effectfull actor ctx. * master: scalafmtall * akka-29900__stubbed_timer: compilation fix. * akka-29900__stubbed_timer: modify stubbed timer scheduler effects and behaviour to closely mimic the actual timer's contract. * akka-29900__stubbed_timer: more tests * akka-29900__stubbed_timer: scalafmtAll * akka-29900__stubbed_timer: fix a deprecation issue and a failed test. * akka-29900__stubbed_timer: scalafmtall * akka-29900__stubbed_timer: remove unused val * akka-29900__stubbed_timer: remove unused import * akka-29900__stubbed_timer: fmt * akka-29900__stubbed_timer: add java API for the new Effect. * akka-29900__stubbed_timer: unused import * akka-29900__stubbed_timer: fmt * akka-29900__stubbed_timer: add explicit return type * akka-29900__stubbed_timer: scalafmtAll * akka-29900__stubbed_timer: resolve mima issues * akka-29900__stubbed_timer: better asJava/asScala support for TimerScheduler * akka-29900__stubbed_timer: avoid invoking a deprecated method. * akka-29900__stubbed_timer: remove unuse import * akka-29900__stubbed_timer: couple more unused imports. * akka-29900__stubbed_timer: remove TimerScheduler.asJava/Scala se these are not needed. sort out mima related failures. * akka-29900__stubbed_timer: unused import + DoNotInherit annotation. * akka-29900__stubbed_timer: modify docs, add the timer related effects. * akka-29900__stubbed_timer: fmt * akka-29900__stubbed_timer: unused import * akka-29900__stubbed_timer: fmt * akka-29900__stubbed_timer: scala 2.13 compilation quircks * akka-29900__stubbed_timer: move the mima exclude file * akka-29900__stubbed_timer: small fixup * akka-29900__stubbed_timer: fmt --- .../29903-actor-testkit-typed-timer-support | 3 + .../akka/actor/testkit/typed/Effect.scala | 27 ++ .../internal/EffectfulActorContext.scala | 54 +++- .../actor/testkit/typed/javadsl/Effects.scala | 9 + .../typed/javadsl/BehaviorTestKitTest.java | 19 +- .../typed/scaladsl/BehaviorTestKitSpec.scala | 261 ++++++++++++------ ...r-testkit-timer-support.backwards.excludes | 13 + .../typed/internal/ActorContextImpl.scala | 8 +- .../typed/internal/TimerSchedulerImpl.scala | 53 ++-- .../typed/internal/adapter/ActorAdapter.scala | 3 +- .../actor/typed/javadsl/TimerScheduler.scala | 6 +- .../actor/typed/scaladsl/TimerScheduler.scala | 5 + .../src/main/paradox/typed/testing-sync.md | 4 +- 13 files changed, 343 insertions(+), 122 deletions(-) create mode 100644 akka-actor-testkit-typed/src/main/mima-filters/2.6.10.backwards.excludes/29903-actor-testkit-typed-timer-support create mode 100644 akka-actor-typed/src/main/mima-filters/2.6.10.backwards.excludes/behavior-testkit-timer-support.backwards.excludes diff --git a/akka-actor-testkit-typed/src/main/mima-filters/2.6.10.backwards.excludes/29903-actor-testkit-typed-timer-support b/akka-actor-testkit-typed/src/main/mima-filters/2.6.10.backwards.excludes/29903-actor-testkit-typed-timer-support new file mode 100644 index 0000000000..1b5b0b8177 --- /dev/null +++ b/akka-actor-testkit-typed/src/main/mima-filters/2.6.10.backwards.excludes/29903-actor-testkit-typed-timer-support @@ -0,0 +1,3 @@ +# changes to package private and internal implementation classes (#29903) + +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.testkit.typed.internal.StubbedActorContext.timer") \ No newline at end of file diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/Effect.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/Effect.scala index 99ccbe49ef..1984306713 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/Effect.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/Effect.scala @@ -203,6 +203,33 @@ object Effect { def duration(): java.time.Duration = delay.asJava } + final case class TimerScheduled[U]( + key: Any, + msg: U, + delay: FiniteDuration, + mode: TimerScheduled.TimerMode, + overriding: Boolean)(val send: () => Unit) + extends Effect { + def duration(): java.time.Duration = delay.asJava + } + + object TimerScheduled { + sealed trait TimerMode + case object FixedRateMode extends TimerMode + case object FixedDelayMode extends TimerMode + case object SingleMode extends TimerMode + + /*Java API*/ + def fixedRateMode = FixedRateMode + def fixedDelayMode = FixedDelayMode + def singleMode = SingleMode + } + + /*Java API*/ + def timerScheduled = TimerScheduled + + final case class TimerCancelled(key: Any) extends Effect + /** * Used to represent an empty list of effects - in other words, the behavior didn't do anything observable */ diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/EffectfulActorContext.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/EffectfulActorContext.scala index dd9d867428..eab585560d 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/EffectfulActorContext.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/EffectfulActorContext.scala @@ -6,15 +6,16 @@ package akka.actor.testkit.typed.internal import java.util.concurrent.ConcurrentLinkedQueue -import scala.concurrent.duration.FiniteDuration -import scala.reflect.ClassTag - -import akka.actor.{ ActorPath, Cancellable } import akka.actor.testkit.typed.Effect import akka.actor.testkit.typed.Effect._ +import akka.actor.typed.internal.TimerSchedulerCrossDslSupport import akka.actor.typed.{ ActorRef, Behavior, Props } +import akka.actor.{ ActorPath, Cancellable } import akka.annotation.InternalApi +import scala.concurrent.duration.FiniteDuration +import scala.reflect.ClassTag + /** * INTERNAL API */ @@ -83,4 +84,49 @@ import akka.annotation.InternalApi effectQueue.offer(Scheduled(delay, target, message)) super.scheduleOnce(delay, target, message) } + + override def mkTimer(): TimerSchedulerCrossDslSupport[T] = new TimerSchedulerCrossDslSupport[T] { + var activeTimers: Map[Any, Effect.TimerScheduled[T]] = Map.empty + + override def startTimerWithFixedDelay(key: Any, msg: T, delay: FiniteDuration): Unit = + startTimer(key, msg, delay, Effect.TimerScheduled.FixedDelayMode) + + override def startTimerAtFixedRate(key: Any, msg: T, interval: FiniteDuration): Unit = + startTimer(key, msg, interval, Effect.TimerScheduled.FixedRateMode) + + override def startPeriodicTimer(key: Any, msg: T, interval: FiniteDuration): Unit = + startTimer(key, msg, interval, Effect.TimerScheduled.FixedRateMode) + + override def startSingleTimer(key: Any, msg: T, delay: FiniteDuration): Unit = + startTimer(key, msg, delay, Effect.TimerScheduled.SingleMode) + + override def isTimerActive(key: Any): Boolean = ??? + + override def cancel(key: Any): Unit = if (activeTimers.keySet(key)) { + val effect = Effect.TimerCancelled(key) + effectQueue.offer(effect) + activeTimers -= key + } + + override def cancelAll(): Unit = activeTimers.foreach(cancel) + + private def sendAction(key: Any): () => Unit = () => { + activeTimers.get(key).foreach { + case Effect.TimerScheduled(_, msg, _, mode, _) => + mode match { + case Effect.TimerScheduled.SingleMode => + activeTimers -= key + case _ => + } + self ! msg + } + + } + + def startTimer(key: Any, msg: T, delay: FiniteDuration, mode: Effect.TimerScheduled.TimerMode) = { + val effect = Effect.TimerScheduled(key, msg, delay, mode, activeTimers.keySet(key))(sendAction(key)) + activeTimers += (key -> effect) + effectQueue.offer(effect) + } + } } diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/javadsl/Effects.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/javadsl/Effects.scala index acdd799b90..53dd176bf0 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/javadsl/Effects.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/javadsl/Effects.scala @@ -93,6 +93,15 @@ object Effects { def scheduled[U](delay: Duration, target: ActorRef[U], message: U): Scheduled[U] = Scheduled(delay.asScala, target, message) + def timerScheduled[U]( + key: Any, + msg: U, + delay: Duration, + mode: TimerScheduled.TimerMode, + overriding: Boolean, + send: akka.japi.function.Effect): TimerScheduled[U] = + TimerScheduled(key, msg, delay.asScala, mode, overriding)(send.apply _) + /** * Used to represent an empty list of effects - in other words, the behavior didn't do anything observable */ diff --git a/akka-actor-testkit-typed/src/test/java/akka/actor/testkit/typed/javadsl/BehaviorTestKitTest.java b/akka-actor-testkit-typed/src/test/java/akka/actor/testkit/typed/javadsl/BehaviorTestKitTest.java index 22f0b16d13..3a828dcfd1 100644 --- a/akka-actor-testkit-typed/src/test/java/akka/actor/testkit/typed/javadsl/BehaviorTestKitTest.java +++ b/akka-actor-testkit-typed/src/test/java/akka/actor/testkit/typed/javadsl/BehaviorTestKitTest.java @@ -16,14 +16,13 @@ import org.junit.Test; import org.scalatestplus.junit.JUnitSuite; import org.slf4j.event.Level; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.stream.IntStream; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class BehaviorTestKitTest extends JUnitSuite { @@ -367,4 +366,18 @@ public class BehaviorTestKitTest extends JUnitSuite { assertEquals(Collections.singletonList(Done.getInstance()), d.getAllReceived()); test.expectEffectClass(Effect.Stopped.class); } + + @Test + public void canUseTimerScheduledInJavaApi() { + // this is a compilation test + Effect.TimerScheduled timerScheduled = + Effects.timerScheduled( + "my key", + "my msg", + Duration.ofSeconds(42), + Effect.timerScheduled().fixedDelayMode(), + false, + () -> {}); + assertNotNull(timerScheduled); + } } diff --git a/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/BehaviorTestKitSpec.scala b/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/BehaviorTestKitSpec.scala index bc1c4decbf..dc12a0bcad 100644 --- a/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/BehaviorTestKitSpec.scala +++ b/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/BehaviorTestKitSpec.scala @@ -4,21 +4,21 @@ package akka.actor.testkit.typed.scaladsl -import scala.reflect.ClassTag - +import akka.Done +import akka.actor.Address +import akka.actor.testkit.typed.Effect._ +import akka.actor.testkit.typed.scaladsl.BehaviorTestKitSpec.Parent._ +import akka.actor.testkit.typed.scaladsl.BehaviorTestKitSpec.{ Child, Parent } +import akka.actor.testkit.typed.{ CapturedLogEvent, Effect } +import akka.actor.typed.receptionist.{ Receptionist, ServiceKey } +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.{ ActorRef, Behavior, Props, Terminated } import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec import org.slf4j.event.Level -import akka.Done -import akka.actor.Address -import akka.actor.testkit.typed.{ CapturedLogEvent, Effect } -import akka.actor.testkit.typed.Effect._ -import akka.actor.testkit.typed.scaladsl.BehaviorTestKitSpec.{ Child, Parent } -import akka.actor.testkit.typed.scaladsl.BehaviorTestKitSpec.Parent._ -import akka.actor.typed.{ ActorRef, Behavior, Props, Terminated } -import akka.actor.typed.receptionist.{ Receptionist, ServiceKey } -import akka.actor.typed.scaladsl.Behaviors +import scala.concurrent.duration.{ FiniteDuration, _ } +import scala.reflect.ClassTag object BehaviorTestKitSpec { object Parent { @@ -46,83 +46,98 @@ object BehaviorTestKitSpec { case class KillSession(session: ActorRef[String], replyTo: ActorRef[Done]) extends Command case class Log(what: String) extends Command case class RegisterWithReceptionist(name: String) extends Command + case class ScheduleCommand(key: Any, delay: FiniteDuration, mode: Effect.TimerScheduled.TimerMode, cmd: Command) + extends Command + case class CancelScheduleCommand(key: Any) extends Command - val init: Behavior[Command] = Behaviors - .receive[Command] { (context, message) => - message match { - case SpawnChild => - context.spawn(Child.initial, "child") - Behaviors.same - case SpawnChildren(numberOfChildren) if numberOfChildren > 0 => - 0.until(numberOfChildren).foreach { i => - context.spawn(Child.initial, s"child$i") - } - Behaviors.same - case SpawnChildrenWithProps(numberOfChildren, props) if numberOfChildren > 0 => - 0.until(numberOfChildren).foreach { i => - context.spawn(Child.initial, s"child$i", props) - } - Behaviors.same - case SpawnAnonymous(numberOfChildren) if numberOfChildren > 0 => - 0.until(numberOfChildren).foreach { _ => - context.spawnAnonymous(Child.initial) - } - Behaviors.same - case SpawnAnonymousWithProps(numberOfChildren, props) if numberOfChildren > 0 => - 0.until(numberOfChildren).foreach { _ => - context.spawnAnonymous(Child.initial, props) - } - Behaviors.same - case StopChild(child) => - context.stop(child) - Behaviors.same - case SpawnAdapter => - context.spawnMessageAdapter { (r: Reproduce) => - SpawnAnonymous(r.times) - } - Behaviors.same - case SpawnAdapterWithName(name) => - context.spawnMessageAdapter({ (r: Reproduce) => - SpawnAnonymous(r.times) - }, name) - Behaviors.same - case SpawnAndWatchUnwatch(name) => - val c = context.spawn(Child.initial, name) - context.watch(c) - context.unwatch(c) - Behaviors.same - case m @ SpawnAndWatchWith(name) => - val c = context.spawn(Child.initial, name) - context.watchWith(c, m) - Behaviors.same - case SpawnSession(replyTo, sessionHandler) => - val session = context.spawnAnonymous[String](Behaviors.receiveMessage { message => - sessionHandler ! message + val init: Behavior[Command] = Behaviors.withTimers { timers => + Behaviors + .receive[Command] { (context, message) => + message match { + case SpawnChild => + context.spawn(Child.initial, "child") Behaviors.same - }) - replyTo ! session - Behaviors.same - case KillSession(session, replyTo) => - context.stop(session) - replyTo ! Done - Behaviors.same - case CreateMessageAdapter(messageClass, f, replyTo) => - val adaptor = context.messageAdapter(f)(ClassTag(messageClass)) - replyTo.foreach(_ ! adaptor.unsafeUpcast) - Behaviors.same - case Log(what) => - context.log.info(what) - Behaviors.same - case RegisterWithReceptionist(name: String) => - context.system.receptionist ! Receptionist.Register(ServiceKey[Command](name), context.self) + case SpawnChildren(numberOfChildren) if numberOfChildren > 0 => + 0.until(numberOfChildren).foreach { i => + context.spawn(Child.initial, s"child$i") + } + Behaviors.same + case SpawnChildrenWithProps(numberOfChildren, props) if numberOfChildren > 0 => + 0.until(numberOfChildren).foreach { i => + context.spawn(Child.initial, s"child$i", props) + } + Behaviors.same + case SpawnAnonymous(numberOfChildren) if numberOfChildren > 0 => + 0.until(numberOfChildren).foreach { _ => + context.spawnAnonymous(Child.initial) + } + Behaviors.same + case SpawnAnonymousWithProps(numberOfChildren, props) if numberOfChildren > 0 => + 0.until(numberOfChildren).foreach { _ => + context.spawnAnonymous(Child.initial, props) + } + Behaviors.same + case StopChild(child) => + context.stop(child) + Behaviors.same + case SpawnAdapter => + context.spawnMessageAdapter { (r: Reproduce) => + SpawnAnonymous(r.times) + } + Behaviors.same + case SpawnAdapterWithName(name) => + context.spawnMessageAdapter({ (r: Reproduce) => + SpawnAnonymous(r.times) + }, name) + Behaviors.same + case SpawnAndWatchUnwatch(name) => + val c = context.spawn(Child.initial, name) + context.watch(c) + context.unwatch(c) + Behaviors.same + case m @ SpawnAndWatchWith(name) => + val c = context.spawn(Child.initial, name) + context.watchWith(c, m) + Behaviors.same + case SpawnSession(replyTo, sessionHandler) => + val session = context.spawnAnonymous[String](Behaviors.receiveMessage { message => + sessionHandler ! message + Behaviors.same + }) + replyTo ! session + Behaviors.same + case KillSession(session, replyTo) => + context.stop(session) + replyTo ! Done + Behaviors.same + case CreateMessageAdapter(messageClass, f, replyTo) => + val adaptor = context.messageAdapter(f)(ClassTag(messageClass)) + replyTo.foreach(_ ! adaptor.unsafeUpcast) + Behaviors.same + case Log(what) => + context.log.info(what) + Behaviors.same + case RegisterWithReceptionist(name: String) => + context.system.receptionist ! Receptionist.Register(ServiceKey[Command](name), context.self) + Behaviors.same + case ScheduleCommand(key, delay, mode, cmd) => + mode match { + case Effect.TimerScheduled.SingleMode => timers.startSingleTimer(key, cmd, delay) + case Effect.TimerScheduled.FixedDelayMode => timers.startTimerWithFixedDelay(key, cmd, delay) + case Effect.TimerScheduled.FixedRateMode => timers.startTimerAtFixedRate(key, cmd, delay) + } + Behaviors.same + case CancelScheduleCommand(key) => + timers.cancel(key) + Behaviors.same + } + } + .receiveSignal { + case (context, Terminated(_)) => + context.log.debug("Terminated") Behaviors.same } - } - .receiveSignal { - case (context, Terminated(_)) => - context.log.debug("Terminated") - Behaviors.same - } + } } object Child { @@ -385,4 +400,82 @@ class BehaviorTestKitSpec extends AnyWordSpec with Matchers with LogCapturing { testkit.receptionistInbox().hasMessages should equal(false) } } + + "timer support" must { + "schedule and cancel timers" in { + val testkit = BehaviorTestKit[Parent.Command](Parent.init) + testkit.run(ScheduleCommand("abc", 42.seconds, Effect.TimerScheduled.SingleMode, SpawnChild)) + testkit.expectEffectPF { + case Effect.TimerScheduled( + "abc", + SpawnChild, + finiteDuration, + Effect.TimerScheduled.SingleMode, + false /*not overriding*/ ) => + finiteDuration should equal(42.seconds) + } + testkit.run(CancelScheduleCommand("abc")) + testkit.expectEffectPF { + case Effect.TimerCancelled(key) => + key should equal("abc") + } + } + + "schedule and fire timers" in { + val testkit = BehaviorTestKit[Parent.Command](Parent.init) + testkit.run(ScheduleCommand("abc", 42.seconds, Effect.TimerScheduled.SingleMode, SpawnChild)) + val send = testkit.expectEffectPF { + case e @ Effect.TimerScheduled( + "abc", + SpawnChild, + finiteDuration, + Effect.TimerScheduled.SingleMode, + false /*not overriding*/ ) => + finiteDuration should equal(42.seconds) + e.send + } + send() + testkit.runOne() + testkit.expectEffectPF { + case Effect.Spawned(_, "child", _) => + } + //no effect since the timer's mode was single, hence removed after fired + send() + testkit.selfInbox().hasMessages should be(false) + } + + "schedule and fire timers multiple times" in { + val testkit = BehaviorTestKit[Parent.Command](Parent.init) + testkit.run(ScheduleCommand("abc", 42.seconds, Effect.TimerScheduled.FixedRateMode, SpawnChild)) + val send = testkit.expectEffectPF { + case e @ Effect.TimerScheduled( + "abc", + SpawnChild, + finiteDuration, + Effect.TimerScheduled.FixedRateMode, + false /*not overriding*/ ) => + finiteDuration should equal(42.seconds) + e.send + } + send() + testkit.runOne() + val child: ActorRef[String] = testkit.expectEffectPF { + case spawned @ Effect.Spawned(_, "child", _) => spawned.asInstanceOf[Effect.Spawned[String]].ref + } + + testkit.run(StopChild(child)) + testkit.expectEffect { + Effect.Stopped("child") + } + //when scheduling with fixed rate the timer remains scheduled + send() + testkit.runOne() + testkit.expectEffectPF { + case Effect.Spawned(_, "child", _) => + } + + testkit.run(CancelScheduleCommand("abc")) + testkit.expectEffect(Effect.TimerCancelled("abc")) + } + } } diff --git a/akka-actor-typed/src/main/mima-filters/2.6.10.backwards.excludes/behavior-testkit-timer-support.backwards.excludes b/akka-actor-typed/src/main/mima-filters/2.6.10.backwards.excludes/behavior-testkit-timer-support.backwards.excludes new file mode 100644 index 0000000000..d48bcc53c1 --- /dev/null +++ b/akka-actor-typed/src/main/mima-filters/2.6.10.backwards.excludes/behavior-testkit-timer-support.backwards.excludes @@ -0,0 +1,13 @@ +# changes to package private and internal implementation classes (#29903) +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.TimerSchedulerImpl.wrapWithTimers") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.TimerSchedulerImpl.withTimers") +ProblemFilters.exclude[FinalMethodProblem]("akka.actor.typed.internal.TimerSchedulerImpl.startTimerAtFixedRate") +ProblemFilters.exclude[FinalMethodProblem]("akka.actor.typed.internal.TimerSchedulerImpl.startTimerWithFixedDelay") +ProblemFilters.exclude[FinalMethodProblem]("akka.actor.typed.internal.TimerSchedulerImpl.startPeriodicTimer") +ProblemFilters.exclude[FinalMethodProblem]("akka.actor.typed.internal.TimerSchedulerImpl.startSingleTimer") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.ActorContextImpl.timer") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.internal.ActorContextImpl.akka$actor$typed$internal$ActorContextImpl$$_timer") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.internal.ActorContextImpl.akka$actor$typed$internal$ActorContextImpl$$_timer_=") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.TimerSchedulerImpl.withTimers") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.TimerSchedulerImpl.wrapWithTimers") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.adapter.ActorContextAdapter.timer") diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala index 8950a299f8..25c2c1685b 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala @@ -95,7 +95,7 @@ import scala.util.Success private var messageAdapterRef: OptionVal[ActorRef[Any]] = OptionVal.None private var _messageAdapters: List[(Class[_], Any => T)] = Nil - private var _timer: OptionVal[TimerSchedulerImpl[T]] = OptionVal.None + private var _timer: OptionVal[TimerSchedulerCrossDslSupport[T]] = OptionVal.None // _currentActorThread is on purpose not volatile. Used from `checkCurrentActorThread`. // It will always see the right value when accessed from the right thread. @@ -103,15 +103,17 @@ import scala.util.Success private var _currentActorThread: OptionVal[Thread] = OptionVal.None // context-shared timer needed to allow for nested timer usage - def timer: TimerSchedulerImpl[T] = _timer match { + def timer: TimerSchedulerCrossDslSupport[T] = _timer match { case OptionVal.Some(timer) => timer case OptionVal.None => checkCurrentActorThread() - val timer = new TimerSchedulerImpl[T](this) + val timer = mkTimer() _timer = OptionVal.Some(timer) timer } + protected[this] def mkTimer(): TimerSchedulerCrossDslSupport[T] = new TimerSchedulerImpl[T](this) + override private[akka] def hasTimer: Boolean = _timer.isDefined override private[akka] def cancelAllTimers(): Unit = { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala index affbf8fa1d..b21f6e57e9 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala @@ -7,18 +7,14 @@ package internal import java.time.Duration -import scala.concurrent.duration.FiniteDuration - -import org.slf4j.Logger - -import akka.actor.Cancellable -import akka.actor.NotInfluenceReceiveTimeout -import akka.actor.typed.scaladsl.ActorContext -import akka.actor.typed.scaladsl.LoggerOps +import akka.actor.{ Cancellable, NotInfluenceReceiveTimeout } +import akka.actor.typed.scaladsl.{ ActorContext, LoggerOps } import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts -import akka.util.JavaDurationConverters._ import akka.util.OptionVal +import org.slf4j.Logger + +import scala.concurrent.duration.FiniteDuration /** * INTERNAL API @@ -29,11 +25,11 @@ import akka.util.OptionVal override def toString = s"TimerMsg(key=$key, generation=$generation, owner=$owner)" } - def withTimers[T](factory: TimerSchedulerImpl[T] => Behavior[T]): Behavior[T] = { + def withTimers[T](factory: TimerSchedulerCrossDslSupport[T] => Behavior[T]): Behavior[T] = { scaladsl.Behaviors.setup[T](wrapWithTimers(factory)) } - def wrapWithTimers[T](factory: TimerSchedulerImpl[T] => Behavior[T])(ctx: ActorContext[T]): Behavior[T] = + def wrapWithTimers[T](factory: TimerSchedulerCrossDslSupport[T] => Behavior[T])(ctx: ActorContext[T]): Behavior[T] = ctx match { case ctxImpl: ActorContextImpl[T] => val timerScheduler = ctxImpl.timer @@ -55,12 +51,32 @@ import akka.util.OptionVal } } +@InternalApi private[akka] trait TimerSchedulerCrossDslSupport[T] + extends scaladsl.TimerScheduler[T] + with javadsl.TimerScheduler[T] { + import akka.util.JavaDurationConverters._ + + override final def startTimerWithFixedDelay(key: Any, msg: T, delay: Duration): Unit = + startTimerWithFixedDelay(key, msg, delay.asScala) + + override final def startTimerAtFixedRate(key: Any, msg: T, interval: Duration): Unit = + startTimerAtFixedRate(key, msg, interval.asScala) + + override final def startPeriodicTimer(key: Any, msg: T, interval: Duration): Unit = { + //this follows the deprecation note in the super class + startTimerWithFixedDelay(key, msg, interval.asScala) + } + + override final def startSingleTimer(key: Any, msg: T, delay: Duration): Unit = + startSingleTimer(key, msg, delay.asScala) +} + /** * INTERNAL API */ @InternalApi private[akka] class TimerSchedulerImpl[T](ctx: ActorContext[T]) extends scaladsl.TimerScheduler[T] - with javadsl.TimerScheduler[T] { + with TimerSchedulerCrossDslSupport[T] { import TimerSchedulerImpl._ private var timers: Map[Any, Timer[T]] = Map.empty @@ -69,27 +85,15 @@ import akka.util.OptionVal override def startTimerAtFixedRate(key: Any, msg: T, interval: FiniteDuration): Unit = startTimer(key, msg, interval, FixedRateMode) - override def startTimerAtFixedRate(key: Any, msg: T, interval: Duration): Unit = - startTimerAtFixedRate(key, msg, interval.asScala) - override def startTimerWithFixedDelay(key: Any, msg: T, delay: FiniteDuration): Unit = startTimer(key, msg, delay, FixedDelayMode) - override def startTimerWithFixedDelay(key: Any, msg: T, delay: Duration): Unit = - startTimerWithFixedDelay(key, msg, delay.asScala) - override def startPeriodicTimer(key: Any, msg: T, interval: FiniteDuration): Unit = startTimer(key, msg, interval, FixedRateMode) - override def startPeriodicTimer(key: Any, msg: T, interval: java.time.Duration): Unit = - startPeriodicTimer(key, msg, interval.asScala) - override def startSingleTimer(key: Any, msg: T, delay: FiniteDuration): Unit = startTimer(key, msg, delay, SingleMode) - def startSingleTimer(key: Any, msg: T, delay: java.time.Duration): Unit = - startSingleTimer(key, msg, delay.asScala) - private def startTimer(key: Any, msg: T, delay: FiniteDuration, mode: TimerMode): Unit = { timers.get(key) match { case Some(t) => cancelTimer(t) @@ -170,5 +174,4 @@ import akka.util.OptionVal } } } - } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala index c57d90b3c2..71a496cec2 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala @@ -117,7 +117,8 @@ import akka.util.OptionVal if (c.hasTimer) { msg match { case timerMsg: TimerMsg => - c.timer.interceptTimerMsg(ctx.log, timerMsg) match { + //we can only get this kind of message if the timer is of this concrete class + c.timer.asInstanceOf[TimerSchedulerImpl[T]].interceptTimerMsg(ctx.log, timerMsg) match { case OptionVal.None => // means TimerMsg not applicable, discard case OptionVal.Some(m) => next(Behavior.interpretMessage(behavior, c, m), m) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/TimerScheduler.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/TimerScheduler.scala index 0f823abd3c..8e30a8eab3 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/TimerScheduler.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/TimerScheduler.scala @@ -6,6 +6,8 @@ package akka.actor.typed.javadsl import java.time.Duration +import akka.annotation.DoNotInherit + /** * Support for scheduled `self` messages in an actor. * It is used with `Behaviors.withTimers`, which also takes care of the @@ -14,7 +16,10 @@ import java.time.Duration * * `TimerScheduler` is not thread-safe, i.e. it must only be used within * the actor that owns it. + * + * Not for user extension. */ +@DoNotInherit trait TimerScheduler[T] { /** @@ -164,5 +169,4 @@ trait TimerScheduler[T] { * Cancel all timers. */ def cancelAll(): Unit - } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/TimerScheduler.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/TimerScheduler.scala index a0842f2a95..95c42bb188 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/TimerScheduler.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/TimerScheduler.scala @@ -4,6 +4,8 @@ package akka.actor.typed.scaladsl +import akka.annotation.DoNotInherit + import scala.concurrent.duration.FiniteDuration /** @@ -14,7 +16,10 @@ import scala.concurrent.duration.FiniteDuration * * `TimerScheduler` is not thread-safe, i.e. it must only be used within * the actor that owns it. + * + * Not for user extension. */ +@DoNotInherit trait TimerScheduler[T] { /** diff --git a/akka-docs/src/main/paradox/typed/testing-sync.md b/akka-docs/src/main/paradox/typed/testing-sync.md index f2808548d9..16ba52f767 100644 --- a/akka-docs/src/main/paradox/typed/testing-sync.md +++ b/akka-docs/src/main/paradox/typed/testing-sync.md @@ -11,7 +11,7 @@ limitations: * Spawning of @scala[`Future`]@java[`CompletionStage`] or other asynchronous task and you rely on a callback to complete before observing the effect you want to test. -* Usage of scheduler or timers not supported. +* Usage of scheduler is not supported. * `EventSourcedBehavior` can't be tested. * Interactions with other actors must be stubbed. * Blackbox testing style. @@ -112,6 +112,8 @@ The @apidoc[BehaviorTestKit] keeps track other effects you can verify, look at t * WatchedWith * Unwatched * Scheduled + * TimerScheduled + * TimerCancelled ### Checking for Log Messages