Akka 29900 stubbed timer (#29903)
* master: abstract over TimerScheduler * master: introduce effects for scheduled timer and cancelled timer. * master: introduce a failing test * master: introduce an effectfull timer scheduler into the effectfull actor ctx. * master: scalafmtall * akka-29900__stubbed_timer: compilation fix. * akka-29900__stubbed_timer: modify stubbed timer scheduler effects and behaviour to closely mimic the actual timer's contract. * akka-29900__stubbed_timer: more tests * akka-29900__stubbed_timer: scalafmtAll * akka-29900__stubbed_timer: fix a deprecation issue and a failed test. * akka-29900__stubbed_timer: scalafmtall * akka-29900__stubbed_timer: remove unused val * akka-29900__stubbed_timer: remove unused import * akka-29900__stubbed_timer: fmt * akka-29900__stubbed_timer: add java API for the new Effect. * akka-29900__stubbed_timer: unused import * akka-29900__stubbed_timer: fmt * akka-29900__stubbed_timer: add explicit return type * akka-29900__stubbed_timer: scalafmtAll * akka-29900__stubbed_timer: resolve mima issues * akka-29900__stubbed_timer: better asJava/asScala support for TimerScheduler * akka-29900__stubbed_timer: avoid invoking a deprecated method. * akka-29900__stubbed_timer: remove unuse import * akka-29900__stubbed_timer: couple more unused imports. * akka-29900__stubbed_timer: remove TimerScheduler.asJava/Scala se these are not needed. sort out mima related failures. * akka-29900__stubbed_timer: unused import + DoNotInherit annotation. * akka-29900__stubbed_timer: modify docs, add the timer related effects. * akka-29900__stubbed_timer: fmt * akka-29900__stubbed_timer: unused import * akka-29900__stubbed_timer: fmt * akka-29900__stubbed_timer: scala 2.13 compilation quircks * akka-29900__stubbed_timer: move the mima exclude file * akka-29900__stubbed_timer: small fixup * akka-29900__stubbed_timer: fmt
This commit is contained in:
parent
59a818153a
commit
f68f0cd805
13 changed files with 343 additions and 122 deletions
|
|
@ -0,0 +1,3 @@
|
||||||
|
# changes to package private and internal implementation classes (#29903)
|
||||||
|
|
||||||
|
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.testkit.typed.internal.StubbedActorContext.timer")
|
||||||
|
|
@ -203,6 +203,33 @@ object Effect {
|
||||||
def duration(): java.time.Duration = delay.asJava
|
def duration(): java.time.Duration = delay.asJava
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final case class TimerScheduled[U](
|
||||||
|
key: Any,
|
||||||
|
msg: U,
|
||||||
|
delay: FiniteDuration,
|
||||||
|
mode: TimerScheduled.TimerMode,
|
||||||
|
overriding: Boolean)(val send: () => Unit)
|
||||||
|
extends Effect {
|
||||||
|
def duration(): java.time.Duration = delay.asJava
|
||||||
|
}
|
||||||
|
|
||||||
|
object TimerScheduled {
|
||||||
|
sealed trait TimerMode
|
||||||
|
case object FixedRateMode extends TimerMode
|
||||||
|
case object FixedDelayMode extends TimerMode
|
||||||
|
case object SingleMode extends TimerMode
|
||||||
|
|
||||||
|
/*Java API*/
|
||||||
|
def fixedRateMode = FixedRateMode
|
||||||
|
def fixedDelayMode = FixedDelayMode
|
||||||
|
def singleMode = SingleMode
|
||||||
|
}
|
||||||
|
|
||||||
|
/*Java API*/
|
||||||
|
def timerScheduled = TimerScheduled
|
||||||
|
|
||||||
|
final case class TimerCancelled(key: Any) extends Effect
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to represent an empty list of effects - in other words, the behavior didn't do anything observable
|
* Used to represent an empty list of effects - in other words, the behavior didn't do anything observable
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -6,15 +6,16 @@ package akka.actor.testkit.typed.internal
|
||||||
|
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue
|
import java.util.concurrent.ConcurrentLinkedQueue
|
||||||
|
|
||||||
import scala.concurrent.duration.FiniteDuration
|
|
||||||
import scala.reflect.ClassTag
|
|
||||||
|
|
||||||
import akka.actor.{ ActorPath, Cancellable }
|
|
||||||
import akka.actor.testkit.typed.Effect
|
import akka.actor.testkit.typed.Effect
|
||||||
import akka.actor.testkit.typed.Effect._
|
import akka.actor.testkit.typed.Effect._
|
||||||
|
import akka.actor.typed.internal.TimerSchedulerCrossDslSupport
|
||||||
import akka.actor.typed.{ ActorRef, Behavior, Props }
|
import akka.actor.typed.{ ActorRef, Behavior, Props }
|
||||||
|
import akka.actor.{ ActorPath, Cancellable }
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
|
|
||||||
|
import scala.concurrent.duration.FiniteDuration
|
||||||
|
import scala.reflect.ClassTag
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
|
|
@ -83,4 +84,49 @@ import akka.annotation.InternalApi
|
||||||
effectQueue.offer(Scheduled(delay, target, message))
|
effectQueue.offer(Scheduled(delay, target, message))
|
||||||
super.scheduleOnce(delay, target, message)
|
super.scheduleOnce(delay, target, message)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override def mkTimer(): TimerSchedulerCrossDslSupport[T] = new TimerSchedulerCrossDslSupport[T] {
|
||||||
|
var activeTimers: Map[Any, Effect.TimerScheduled[T]] = Map.empty
|
||||||
|
|
||||||
|
override def startTimerWithFixedDelay(key: Any, msg: T, delay: FiniteDuration): Unit =
|
||||||
|
startTimer(key, msg, delay, Effect.TimerScheduled.FixedDelayMode)
|
||||||
|
|
||||||
|
override def startTimerAtFixedRate(key: Any, msg: T, interval: FiniteDuration): Unit =
|
||||||
|
startTimer(key, msg, interval, Effect.TimerScheduled.FixedRateMode)
|
||||||
|
|
||||||
|
override def startPeriodicTimer(key: Any, msg: T, interval: FiniteDuration): Unit =
|
||||||
|
startTimer(key, msg, interval, Effect.TimerScheduled.FixedRateMode)
|
||||||
|
|
||||||
|
override def startSingleTimer(key: Any, msg: T, delay: FiniteDuration): Unit =
|
||||||
|
startTimer(key, msg, delay, Effect.TimerScheduled.SingleMode)
|
||||||
|
|
||||||
|
override def isTimerActive(key: Any): Boolean = ???
|
||||||
|
|
||||||
|
override def cancel(key: Any): Unit = if (activeTimers.keySet(key)) {
|
||||||
|
val effect = Effect.TimerCancelled(key)
|
||||||
|
effectQueue.offer(effect)
|
||||||
|
activeTimers -= key
|
||||||
|
}
|
||||||
|
|
||||||
|
override def cancelAll(): Unit = activeTimers.foreach(cancel)
|
||||||
|
|
||||||
|
private def sendAction(key: Any): () => Unit = () => {
|
||||||
|
activeTimers.get(key).foreach {
|
||||||
|
case Effect.TimerScheduled(_, msg, _, mode, _) =>
|
||||||
|
mode match {
|
||||||
|
case Effect.TimerScheduled.SingleMode =>
|
||||||
|
activeTimers -= key
|
||||||
|
case _ =>
|
||||||
|
}
|
||||||
|
self ! msg
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
def startTimer(key: Any, msg: T, delay: FiniteDuration, mode: Effect.TimerScheduled.TimerMode) = {
|
||||||
|
val effect = Effect.TimerScheduled(key, msg, delay, mode, activeTimers.keySet(key))(sendAction(key))
|
||||||
|
activeTimers += (key -> effect)
|
||||||
|
effectQueue.offer(effect)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -93,6 +93,15 @@ object Effects {
|
||||||
def scheduled[U](delay: Duration, target: ActorRef[U], message: U): Scheduled[U] =
|
def scheduled[U](delay: Duration, target: ActorRef[U], message: U): Scheduled[U] =
|
||||||
Scheduled(delay.asScala, target, message)
|
Scheduled(delay.asScala, target, message)
|
||||||
|
|
||||||
|
def timerScheduled[U](
|
||||||
|
key: Any,
|
||||||
|
msg: U,
|
||||||
|
delay: Duration,
|
||||||
|
mode: TimerScheduled.TimerMode,
|
||||||
|
overriding: Boolean,
|
||||||
|
send: akka.japi.function.Effect): TimerScheduled[U] =
|
||||||
|
TimerScheduled(key, msg, delay.asScala, mode, overriding)(send.apply _)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to represent an empty list of effects - in other words, the behavior didn't do anything observable
|
* Used to represent an empty list of effects - in other words, the behavior didn't do anything observable
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -16,14 +16,13 @@ import org.junit.Test;
|
||||||
import org.scalatestplus.junit.JUnitSuite;
|
import org.scalatestplus.junit.JUnitSuite;
|
||||||
import org.slf4j.event.Level;
|
import org.slf4j.event.Level;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.*;
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
public class BehaviorTestKitTest extends JUnitSuite {
|
public class BehaviorTestKitTest extends JUnitSuite {
|
||||||
|
|
||||||
|
|
@ -367,4 +366,18 @@ public class BehaviorTestKitTest extends JUnitSuite {
|
||||||
assertEquals(Collections.singletonList(Done.getInstance()), d.getAllReceived());
|
assertEquals(Collections.singletonList(Done.getInstance()), d.getAllReceived());
|
||||||
test.expectEffectClass(Effect.Stopped.class);
|
test.expectEffectClass(Effect.Stopped.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void canUseTimerScheduledInJavaApi() {
|
||||||
|
// this is a compilation test
|
||||||
|
Effect.TimerScheduled<String> timerScheduled =
|
||||||
|
Effects.timerScheduled(
|
||||||
|
"my key",
|
||||||
|
"my msg",
|
||||||
|
Duration.ofSeconds(42),
|
||||||
|
Effect.timerScheduled().fixedDelayMode(),
|
||||||
|
false,
|
||||||
|
() -> {});
|
||||||
|
assertNotNull(timerScheduled);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,21 +4,21 @@
|
||||||
|
|
||||||
package akka.actor.testkit.typed.scaladsl
|
package akka.actor.testkit.typed.scaladsl
|
||||||
|
|
||||||
import scala.reflect.ClassTag
|
import akka.Done
|
||||||
|
import akka.actor.Address
|
||||||
|
import akka.actor.testkit.typed.Effect._
|
||||||
|
import akka.actor.testkit.typed.scaladsl.BehaviorTestKitSpec.Parent._
|
||||||
|
import akka.actor.testkit.typed.scaladsl.BehaviorTestKitSpec.{ Child, Parent }
|
||||||
|
import akka.actor.testkit.typed.{ CapturedLogEvent, Effect }
|
||||||
|
import akka.actor.typed.receptionist.{ Receptionist, ServiceKey }
|
||||||
|
import akka.actor.typed.scaladsl.Behaviors
|
||||||
|
import akka.actor.typed.{ ActorRef, Behavior, Props, Terminated }
|
||||||
import org.scalatest.matchers.should.Matchers
|
import org.scalatest.matchers.should.Matchers
|
||||||
import org.scalatest.wordspec.AnyWordSpec
|
import org.scalatest.wordspec.AnyWordSpec
|
||||||
import org.slf4j.event.Level
|
import org.slf4j.event.Level
|
||||||
|
|
||||||
import akka.Done
|
import scala.concurrent.duration.{ FiniteDuration, _ }
|
||||||
import akka.actor.Address
|
import scala.reflect.ClassTag
|
||||||
import akka.actor.testkit.typed.{ CapturedLogEvent, Effect }
|
|
||||||
import akka.actor.testkit.typed.Effect._
|
|
||||||
import akka.actor.testkit.typed.scaladsl.BehaviorTestKitSpec.{ Child, Parent }
|
|
||||||
import akka.actor.testkit.typed.scaladsl.BehaviorTestKitSpec.Parent._
|
|
||||||
import akka.actor.typed.{ ActorRef, Behavior, Props, Terminated }
|
|
||||||
import akka.actor.typed.receptionist.{ Receptionist, ServiceKey }
|
|
||||||
import akka.actor.typed.scaladsl.Behaviors
|
|
||||||
|
|
||||||
object BehaviorTestKitSpec {
|
object BehaviorTestKitSpec {
|
||||||
object Parent {
|
object Parent {
|
||||||
|
|
@ -46,8 +46,12 @@ object BehaviorTestKitSpec {
|
||||||
case class KillSession(session: ActorRef[String], replyTo: ActorRef[Done]) extends Command
|
case class KillSession(session: ActorRef[String], replyTo: ActorRef[Done]) extends Command
|
||||||
case class Log(what: String) extends Command
|
case class Log(what: String) extends Command
|
||||||
case class RegisterWithReceptionist(name: String) extends Command
|
case class RegisterWithReceptionist(name: String) extends Command
|
||||||
|
case class ScheduleCommand(key: Any, delay: FiniteDuration, mode: Effect.TimerScheduled.TimerMode, cmd: Command)
|
||||||
|
extends Command
|
||||||
|
case class CancelScheduleCommand(key: Any) extends Command
|
||||||
|
|
||||||
val init: Behavior[Command] = Behaviors
|
val init: Behavior[Command] = Behaviors.withTimers { timers =>
|
||||||
|
Behaviors
|
||||||
.receive[Command] { (context, message) =>
|
.receive[Command] { (context, message) =>
|
||||||
message match {
|
message match {
|
||||||
case SpawnChild =>
|
case SpawnChild =>
|
||||||
|
|
@ -116,6 +120,16 @@ object BehaviorTestKitSpec {
|
||||||
case RegisterWithReceptionist(name: String) =>
|
case RegisterWithReceptionist(name: String) =>
|
||||||
context.system.receptionist ! Receptionist.Register(ServiceKey[Command](name), context.self)
|
context.system.receptionist ! Receptionist.Register(ServiceKey[Command](name), context.self)
|
||||||
Behaviors.same
|
Behaviors.same
|
||||||
|
case ScheduleCommand(key, delay, mode, cmd) =>
|
||||||
|
mode match {
|
||||||
|
case Effect.TimerScheduled.SingleMode => timers.startSingleTimer(key, cmd, delay)
|
||||||
|
case Effect.TimerScheduled.FixedDelayMode => timers.startTimerWithFixedDelay(key, cmd, delay)
|
||||||
|
case Effect.TimerScheduled.FixedRateMode => timers.startTimerAtFixedRate(key, cmd, delay)
|
||||||
|
}
|
||||||
|
Behaviors.same
|
||||||
|
case CancelScheduleCommand(key) =>
|
||||||
|
timers.cancel(key)
|
||||||
|
Behaviors.same
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
.receiveSignal {
|
.receiveSignal {
|
||||||
|
|
@ -124,6 +138,7 @@ object BehaviorTestKitSpec {
|
||||||
Behaviors.same
|
Behaviors.same
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
object Child {
|
object Child {
|
||||||
|
|
||||||
|
|
@ -385,4 +400,82 @@ class BehaviorTestKitSpec extends AnyWordSpec with Matchers with LogCapturing {
|
||||||
testkit.receptionistInbox().hasMessages should equal(false)
|
testkit.receptionistInbox().hasMessages should equal(false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"timer support" must {
|
||||||
|
"schedule and cancel timers" in {
|
||||||
|
val testkit = BehaviorTestKit[Parent.Command](Parent.init)
|
||||||
|
testkit.run(ScheduleCommand("abc", 42.seconds, Effect.TimerScheduled.SingleMode, SpawnChild))
|
||||||
|
testkit.expectEffectPF {
|
||||||
|
case Effect.TimerScheduled(
|
||||||
|
"abc",
|
||||||
|
SpawnChild,
|
||||||
|
finiteDuration,
|
||||||
|
Effect.TimerScheduled.SingleMode,
|
||||||
|
false /*not overriding*/ ) =>
|
||||||
|
finiteDuration should equal(42.seconds)
|
||||||
|
}
|
||||||
|
testkit.run(CancelScheduleCommand("abc"))
|
||||||
|
testkit.expectEffectPF {
|
||||||
|
case Effect.TimerCancelled(key) =>
|
||||||
|
key should equal("abc")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"schedule and fire timers" in {
|
||||||
|
val testkit = BehaviorTestKit[Parent.Command](Parent.init)
|
||||||
|
testkit.run(ScheduleCommand("abc", 42.seconds, Effect.TimerScheduled.SingleMode, SpawnChild))
|
||||||
|
val send = testkit.expectEffectPF {
|
||||||
|
case e @ Effect.TimerScheduled(
|
||||||
|
"abc",
|
||||||
|
SpawnChild,
|
||||||
|
finiteDuration,
|
||||||
|
Effect.TimerScheduled.SingleMode,
|
||||||
|
false /*not overriding*/ ) =>
|
||||||
|
finiteDuration should equal(42.seconds)
|
||||||
|
e.send
|
||||||
|
}
|
||||||
|
send()
|
||||||
|
testkit.runOne()
|
||||||
|
testkit.expectEffectPF {
|
||||||
|
case Effect.Spawned(_, "child", _) =>
|
||||||
|
}
|
||||||
|
//no effect since the timer's mode was single, hence removed after fired
|
||||||
|
send()
|
||||||
|
testkit.selfInbox().hasMessages should be(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
"schedule and fire timers multiple times" in {
|
||||||
|
val testkit = BehaviorTestKit[Parent.Command](Parent.init)
|
||||||
|
testkit.run(ScheduleCommand("abc", 42.seconds, Effect.TimerScheduled.FixedRateMode, SpawnChild))
|
||||||
|
val send = testkit.expectEffectPF {
|
||||||
|
case e @ Effect.TimerScheduled(
|
||||||
|
"abc",
|
||||||
|
SpawnChild,
|
||||||
|
finiteDuration,
|
||||||
|
Effect.TimerScheduled.FixedRateMode,
|
||||||
|
false /*not overriding*/ ) =>
|
||||||
|
finiteDuration should equal(42.seconds)
|
||||||
|
e.send
|
||||||
|
}
|
||||||
|
send()
|
||||||
|
testkit.runOne()
|
||||||
|
val child: ActorRef[String] = testkit.expectEffectPF {
|
||||||
|
case spawned @ Effect.Spawned(_, "child", _) => spawned.asInstanceOf[Effect.Spawned[String]].ref
|
||||||
|
}
|
||||||
|
|
||||||
|
testkit.run(StopChild(child))
|
||||||
|
testkit.expectEffect {
|
||||||
|
Effect.Stopped("child")
|
||||||
|
}
|
||||||
|
//when scheduling with fixed rate the timer remains scheduled
|
||||||
|
send()
|
||||||
|
testkit.runOne()
|
||||||
|
testkit.expectEffectPF {
|
||||||
|
case Effect.Spawned(_, "child", _) =>
|
||||||
|
}
|
||||||
|
|
||||||
|
testkit.run(CancelScheduleCommand("abc"))
|
||||||
|
testkit.expectEffect(Effect.TimerCancelled("abc"))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,13 @@
|
||||||
|
# changes to package private and internal implementation classes (#29903)
|
||||||
|
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.TimerSchedulerImpl.wrapWithTimers")
|
||||||
|
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.TimerSchedulerImpl.withTimers")
|
||||||
|
ProblemFilters.exclude[FinalMethodProblem]("akka.actor.typed.internal.TimerSchedulerImpl.startTimerAtFixedRate")
|
||||||
|
ProblemFilters.exclude[FinalMethodProblem]("akka.actor.typed.internal.TimerSchedulerImpl.startTimerWithFixedDelay")
|
||||||
|
ProblemFilters.exclude[FinalMethodProblem]("akka.actor.typed.internal.TimerSchedulerImpl.startPeriodicTimer")
|
||||||
|
ProblemFilters.exclude[FinalMethodProblem]("akka.actor.typed.internal.TimerSchedulerImpl.startSingleTimer")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.ActorContextImpl.timer")
|
||||||
|
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.internal.ActorContextImpl.akka$actor$typed$internal$ActorContextImpl$$_timer")
|
||||||
|
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.internal.ActorContextImpl.akka$actor$typed$internal$ActorContextImpl$$_timer_=")
|
||||||
|
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.TimerSchedulerImpl.withTimers")
|
||||||
|
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.TimerSchedulerImpl.wrapWithTimers")
|
||||||
|
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.adapter.ActorContextAdapter.timer")
|
||||||
|
|
@ -95,7 +95,7 @@ import scala.util.Success
|
||||||
|
|
||||||
private var messageAdapterRef: OptionVal[ActorRef[Any]] = OptionVal.None
|
private var messageAdapterRef: OptionVal[ActorRef[Any]] = OptionVal.None
|
||||||
private var _messageAdapters: List[(Class[_], Any => T)] = Nil
|
private var _messageAdapters: List[(Class[_], Any => T)] = Nil
|
||||||
private var _timer: OptionVal[TimerSchedulerImpl[T]] = OptionVal.None
|
private var _timer: OptionVal[TimerSchedulerCrossDslSupport[T]] = OptionVal.None
|
||||||
|
|
||||||
// _currentActorThread is on purpose not volatile. Used from `checkCurrentActorThread`.
|
// _currentActorThread is on purpose not volatile. Used from `checkCurrentActorThread`.
|
||||||
// It will always see the right value when accessed from the right thread.
|
// It will always see the right value when accessed from the right thread.
|
||||||
|
|
@ -103,15 +103,17 @@ import scala.util.Success
|
||||||
private var _currentActorThread: OptionVal[Thread] = OptionVal.None
|
private var _currentActorThread: OptionVal[Thread] = OptionVal.None
|
||||||
|
|
||||||
// context-shared timer needed to allow for nested timer usage
|
// context-shared timer needed to allow for nested timer usage
|
||||||
def timer: TimerSchedulerImpl[T] = _timer match {
|
def timer: TimerSchedulerCrossDslSupport[T] = _timer match {
|
||||||
case OptionVal.Some(timer) => timer
|
case OptionVal.Some(timer) => timer
|
||||||
case OptionVal.None =>
|
case OptionVal.None =>
|
||||||
checkCurrentActorThread()
|
checkCurrentActorThread()
|
||||||
val timer = new TimerSchedulerImpl[T](this)
|
val timer = mkTimer()
|
||||||
_timer = OptionVal.Some(timer)
|
_timer = OptionVal.Some(timer)
|
||||||
timer
|
timer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected[this] def mkTimer(): TimerSchedulerCrossDslSupport[T] = new TimerSchedulerImpl[T](this)
|
||||||
|
|
||||||
override private[akka] def hasTimer: Boolean = _timer.isDefined
|
override private[akka] def hasTimer: Boolean = _timer.isDefined
|
||||||
|
|
||||||
override private[akka] def cancelAllTimers(): Unit = {
|
override private[akka] def cancelAllTimers(): Unit = {
|
||||||
|
|
|
||||||
|
|
@ -7,18 +7,14 @@ package internal
|
||||||
|
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
|
|
||||||
import scala.concurrent.duration.FiniteDuration
|
import akka.actor.{ Cancellable, NotInfluenceReceiveTimeout }
|
||||||
|
import akka.actor.typed.scaladsl.{ ActorContext, LoggerOps }
|
||||||
import org.slf4j.Logger
|
|
||||||
|
|
||||||
import akka.actor.Cancellable
|
|
||||||
import akka.actor.NotInfluenceReceiveTimeout
|
|
||||||
import akka.actor.typed.scaladsl.ActorContext
|
|
||||||
import akka.actor.typed.scaladsl.LoggerOps
|
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
import akka.dispatch.ExecutionContexts
|
import akka.dispatch.ExecutionContexts
|
||||||
import akka.util.JavaDurationConverters._
|
|
||||||
import akka.util.OptionVal
|
import akka.util.OptionVal
|
||||||
|
import org.slf4j.Logger
|
||||||
|
|
||||||
|
import scala.concurrent.duration.FiniteDuration
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
|
|
@ -29,11 +25,11 @@ import akka.util.OptionVal
|
||||||
override def toString = s"TimerMsg(key=$key, generation=$generation, owner=$owner)"
|
override def toString = s"TimerMsg(key=$key, generation=$generation, owner=$owner)"
|
||||||
}
|
}
|
||||||
|
|
||||||
def withTimers[T](factory: TimerSchedulerImpl[T] => Behavior[T]): Behavior[T] = {
|
def withTimers[T](factory: TimerSchedulerCrossDslSupport[T] => Behavior[T]): Behavior[T] = {
|
||||||
scaladsl.Behaviors.setup[T](wrapWithTimers(factory))
|
scaladsl.Behaviors.setup[T](wrapWithTimers(factory))
|
||||||
}
|
}
|
||||||
|
|
||||||
def wrapWithTimers[T](factory: TimerSchedulerImpl[T] => Behavior[T])(ctx: ActorContext[T]): Behavior[T] =
|
def wrapWithTimers[T](factory: TimerSchedulerCrossDslSupport[T] => Behavior[T])(ctx: ActorContext[T]): Behavior[T] =
|
||||||
ctx match {
|
ctx match {
|
||||||
case ctxImpl: ActorContextImpl[T] =>
|
case ctxImpl: ActorContextImpl[T] =>
|
||||||
val timerScheduler = ctxImpl.timer
|
val timerScheduler = ctxImpl.timer
|
||||||
|
|
@ -55,12 +51,32 @@ import akka.util.OptionVal
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@InternalApi private[akka] trait TimerSchedulerCrossDslSupport[T]
|
||||||
|
extends scaladsl.TimerScheduler[T]
|
||||||
|
with javadsl.TimerScheduler[T] {
|
||||||
|
import akka.util.JavaDurationConverters._
|
||||||
|
|
||||||
|
override final def startTimerWithFixedDelay(key: Any, msg: T, delay: Duration): Unit =
|
||||||
|
startTimerWithFixedDelay(key, msg, delay.asScala)
|
||||||
|
|
||||||
|
override final def startTimerAtFixedRate(key: Any, msg: T, interval: Duration): Unit =
|
||||||
|
startTimerAtFixedRate(key, msg, interval.asScala)
|
||||||
|
|
||||||
|
override final def startPeriodicTimer(key: Any, msg: T, interval: Duration): Unit = {
|
||||||
|
//this follows the deprecation note in the super class
|
||||||
|
startTimerWithFixedDelay(key, msg, interval.asScala)
|
||||||
|
}
|
||||||
|
|
||||||
|
override final def startSingleTimer(key: Any, msg: T, delay: Duration): Unit =
|
||||||
|
startSingleTimer(key, msg, delay.asScala)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
@InternalApi private[akka] class TimerSchedulerImpl[T](ctx: ActorContext[T])
|
@InternalApi private[akka] class TimerSchedulerImpl[T](ctx: ActorContext[T])
|
||||||
extends scaladsl.TimerScheduler[T]
|
extends scaladsl.TimerScheduler[T]
|
||||||
with javadsl.TimerScheduler[T] {
|
with TimerSchedulerCrossDslSupport[T] {
|
||||||
import TimerSchedulerImpl._
|
import TimerSchedulerImpl._
|
||||||
|
|
||||||
private var timers: Map[Any, Timer[T]] = Map.empty
|
private var timers: Map[Any, Timer[T]] = Map.empty
|
||||||
|
|
@ -69,27 +85,15 @@ import akka.util.OptionVal
|
||||||
override def startTimerAtFixedRate(key: Any, msg: T, interval: FiniteDuration): Unit =
|
override def startTimerAtFixedRate(key: Any, msg: T, interval: FiniteDuration): Unit =
|
||||||
startTimer(key, msg, interval, FixedRateMode)
|
startTimer(key, msg, interval, FixedRateMode)
|
||||||
|
|
||||||
override def startTimerAtFixedRate(key: Any, msg: T, interval: Duration): Unit =
|
|
||||||
startTimerAtFixedRate(key, msg, interval.asScala)
|
|
||||||
|
|
||||||
override def startTimerWithFixedDelay(key: Any, msg: T, delay: FiniteDuration): Unit =
|
override def startTimerWithFixedDelay(key: Any, msg: T, delay: FiniteDuration): Unit =
|
||||||
startTimer(key, msg, delay, FixedDelayMode)
|
startTimer(key, msg, delay, FixedDelayMode)
|
||||||
|
|
||||||
override def startTimerWithFixedDelay(key: Any, msg: T, delay: Duration): Unit =
|
|
||||||
startTimerWithFixedDelay(key, msg, delay.asScala)
|
|
||||||
|
|
||||||
override def startPeriodicTimer(key: Any, msg: T, interval: FiniteDuration): Unit =
|
override def startPeriodicTimer(key: Any, msg: T, interval: FiniteDuration): Unit =
|
||||||
startTimer(key, msg, interval, FixedRateMode)
|
startTimer(key, msg, interval, FixedRateMode)
|
||||||
|
|
||||||
override def startPeriodicTimer(key: Any, msg: T, interval: java.time.Duration): Unit =
|
|
||||||
startPeriodicTimer(key, msg, interval.asScala)
|
|
||||||
|
|
||||||
override def startSingleTimer(key: Any, msg: T, delay: FiniteDuration): Unit =
|
override def startSingleTimer(key: Any, msg: T, delay: FiniteDuration): Unit =
|
||||||
startTimer(key, msg, delay, SingleMode)
|
startTimer(key, msg, delay, SingleMode)
|
||||||
|
|
||||||
def startSingleTimer(key: Any, msg: T, delay: java.time.Duration): Unit =
|
|
||||||
startSingleTimer(key, msg, delay.asScala)
|
|
||||||
|
|
||||||
private def startTimer(key: Any, msg: T, delay: FiniteDuration, mode: TimerMode): Unit = {
|
private def startTimer(key: Any, msg: T, delay: FiniteDuration, mode: TimerMode): Unit = {
|
||||||
timers.get(key) match {
|
timers.get(key) match {
|
||||||
case Some(t) => cancelTimer(t)
|
case Some(t) => cancelTimer(t)
|
||||||
|
|
@ -170,5 +174,4 @@ import akka.util.OptionVal
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -117,7 +117,8 @@ import akka.util.OptionVal
|
||||||
if (c.hasTimer) {
|
if (c.hasTimer) {
|
||||||
msg match {
|
msg match {
|
||||||
case timerMsg: TimerMsg =>
|
case timerMsg: TimerMsg =>
|
||||||
c.timer.interceptTimerMsg(ctx.log, timerMsg) match {
|
//we can only get this kind of message if the timer is of this concrete class
|
||||||
|
c.timer.asInstanceOf[TimerSchedulerImpl[T]].interceptTimerMsg(ctx.log, timerMsg) match {
|
||||||
case OptionVal.None => // means TimerMsg not applicable, discard
|
case OptionVal.None => // means TimerMsg not applicable, discard
|
||||||
case OptionVal.Some(m) =>
|
case OptionVal.Some(m) =>
|
||||||
next(Behavior.interpretMessage(behavior, c, m), m)
|
next(Behavior.interpretMessage(behavior, c, m), m)
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,8 @@ package akka.actor.typed.javadsl
|
||||||
|
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
|
|
||||||
|
import akka.annotation.DoNotInherit
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Support for scheduled `self` messages in an actor.
|
* Support for scheduled `self` messages in an actor.
|
||||||
* It is used with `Behaviors.withTimers`, which also takes care of the
|
* It is used with `Behaviors.withTimers`, which also takes care of the
|
||||||
|
|
@ -14,7 +16,10 @@ import java.time.Duration
|
||||||
*
|
*
|
||||||
* `TimerScheduler` is not thread-safe, i.e. it must only be used within
|
* `TimerScheduler` is not thread-safe, i.e. it must only be used within
|
||||||
* the actor that owns it.
|
* the actor that owns it.
|
||||||
|
*
|
||||||
|
* Not for user extension.
|
||||||
*/
|
*/
|
||||||
|
@DoNotInherit
|
||||||
trait TimerScheduler[T] {
|
trait TimerScheduler[T] {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -164,5 +169,4 @@ trait TimerScheduler[T] {
|
||||||
* Cancel all timers.
|
* Cancel all timers.
|
||||||
*/
|
*/
|
||||||
def cancelAll(): Unit
|
def cancelAll(): Unit
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,8 @@
|
||||||
|
|
||||||
package akka.actor.typed.scaladsl
|
package akka.actor.typed.scaladsl
|
||||||
|
|
||||||
|
import akka.annotation.DoNotInherit
|
||||||
|
|
||||||
import scala.concurrent.duration.FiniteDuration
|
import scala.concurrent.duration.FiniteDuration
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -14,7 +16,10 @@ import scala.concurrent.duration.FiniteDuration
|
||||||
*
|
*
|
||||||
* `TimerScheduler` is not thread-safe, i.e. it must only be used within
|
* `TimerScheduler` is not thread-safe, i.e. it must only be used within
|
||||||
* the actor that owns it.
|
* the actor that owns it.
|
||||||
|
*
|
||||||
|
* Not for user extension.
|
||||||
*/
|
*/
|
||||||
|
@DoNotInherit
|
||||||
trait TimerScheduler[T] {
|
trait TimerScheduler[T] {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ limitations:
|
||||||
|
|
||||||
* Spawning of @scala[`Future`]@java[`CompletionStage`] or other asynchronous task and you rely on a callback to
|
* Spawning of @scala[`Future`]@java[`CompletionStage`] or other asynchronous task and you rely on a callback to
|
||||||
complete before observing the effect you want to test.
|
complete before observing the effect you want to test.
|
||||||
* Usage of scheduler or timers not supported.
|
* Usage of scheduler is not supported.
|
||||||
* `EventSourcedBehavior` can't be tested.
|
* `EventSourcedBehavior` can't be tested.
|
||||||
* Interactions with other actors must be stubbed.
|
* Interactions with other actors must be stubbed.
|
||||||
* Blackbox testing style.
|
* Blackbox testing style.
|
||||||
|
|
@ -112,6 +112,8 @@ The @apidoc[BehaviorTestKit] keeps track other effects you can verify, look at t
|
||||||
* WatchedWith
|
* WatchedWith
|
||||||
* Unwatched
|
* Unwatched
|
||||||
* Scheduled
|
* Scheduled
|
||||||
|
* TimerScheduled
|
||||||
|
* TimerCancelled
|
||||||
|
|
||||||
### Checking for Log Messages
|
### Checking for Log Messages
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue