Specific minimal scheduler API for typed #26971

* migration guide entry
* ManualTime was making assumptions about scheduler
* Add API compile tests for good measure
* named scheduleAtFixedRate instead of schedule
This commit is contained in:
Johan Andrén 2019-05-24 07:38:38 +02:00 committed by Patrik Nordwall
parent dd6924465b
commit 604523ba18
24 changed files with 262 additions and 52 deletions

View file

@ -7,18 +7,17 @@ package akka.actor.testkit.typed.internal
import java.util.concurrent.{ CompletionStage, ThreadFactory }
import akka.actor.typed.internal.ActorRefImpl
import akka.actor.typed.{
ActorRef,
ActorSystem,
Behavior,
DispatcherSelector,
Dispatchers,
Extension,
ExtensionId,
Logger,
Props,
Settings
}
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.DispatcherSelector
import akka.actor.typed.Dispatchers
import akka.actor.typed.Extension
import akka.actor.typed.ExtensionId
import akka.actor.typed.Logger
import akka.actor.typed.Props
import akka.actor.typed.Scheduler
import akka.actor.typed.Settings
import akka.annotation.InternalApi
import akka.util.Timeout
import akka.{ actor => untyped }
@ -72,7 +71,7 @@ import com.github.ghik.silencer.silent
override def logConfiguration(): Unit = log.info(settings.toString)
override def scheduler: untyped.Scheduler = throw new UnsupportedOperationException("no scheduler")
override def scheduler: Scheduler = throw new UnsupportedOperationException("no scheduler")
private val terminationPromise = Promise[Done]
override def terminate(): Unit = terminationPromise.trySuccess(Done)

View file

@ -6,8 +6,11 @@ package akka.actor.testkit.typed.javadsl
import java.time.Duration
import akka.actor.Scheduler
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props }
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.Props
import akka.actor.typed.Scheduler
import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.internal.TestKitUtils
import akka.actor.testkit.typed.scaladsl

View file

@ -5,9 +5,12 @@
package akka.actor.testkit.typed.javadsl
import java.time.Duration
import akka.actor.typed.ActorSystem
import akka.actor.typed.internal.adapter.SchedulerAdapter
import com.typesafe.config.Config
import akka.util.JavaDurationConverters._
import scala.annotation.varargs
/**
@ -29,11 +32,17 @@ object ManualTime {
*/
def get[A](system: ActorSystem[A]): ManualTime =
system.scheduler match {
case sc: akka.testkit.ExplicitlyTriggeredScheduler => new ManualTime(sc)
case _ =>
case adapter: SchedulerAdapter =>
adapter.untypedScheduler match {
case sc: akka.testkit.ExplicitlyTriggeredScheduler => new ManualTime(sc)
case _ =>
throw new IllegalArgumentException(
"ActorSystem not configured with explicitly triggered scheduler, " +
"make sure to include akka.actor.testkit.typed.scaladsl.ManualTime.config() when setting up the test")
}
case s =>
throw new IllegalArgumentException(
"ActorSystem not configured with explicitly triggered scheduler, " +
"make sure to include akka.actor.testkit.typed.javadsl.ManualTime.config() when setting up the test")
s"ActorSystem.scheduler is not an untyped SchedulerAdapter but a ${s.getClass.getName}, this is not supported")
}
}

View file

@ -4,10 +4,13 @@
package akka.actor.testkit.typed.javadsl
import akka.actor.Scheduler
import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.internal.TestKitUtils
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props }
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.Props
import akka.actor.typed.Scheduler
import akka.util.Timeout
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory

View file

@ -7,15 +7,21 @@ package akka.actor.testkit.typed.scaladsl
import java.util.concurrent.TimeoutException
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props }
import akka.annotation.{ ApiMayChange, InternalApi }
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.Props
import akka.actor.typed.Scheduler
import akka.annotation.ApiMayChange
import akka.annotation.InternalApi
import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.internal.{ ActorTestKitGuardian, TestKitUtils }
import com.typesafe.config.{ Config, ConfigFactory }
import akka.actor.testkit.typed.internal.ActorTestKitGuardian
import akka.actor.testkit.typed.internal.TestKitUtils
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.Scheduler
import akka.util.Timeout
object ActorTestKit {

View file

@ -4,12 +4,12 @@
package akka.actor.testkit.typed.scaladsl
import akka.actor.Scheduler
import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.internal.TestKitUtils
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.Scheduler
import akka.actor.typed.Props
import akka.util.Timeout
import com.typesafe.config.Config

View file

@ -5,6 +5,7 @@
package akka.actor.testkit.typed.scaladsl
import akka.actor.typed.ActorSystem
import akka.actor.typed.internal.adapter.SchedulerAdapter
import com.typesafe.config.{ Config, ConfigFactory }
import scala.annotation.varargs
@ -30,11 +31,17 @@ object ManualTime {
*/
def apply()(implicit system: ActorSystem[_]): ManualTime =
system.scheduler match {
case sc: akka.testkit.ExplicitlyTriggeredScheduler => new ManualTime(sc)
case _ =>
case adapter: SchedulerAdapter =>
adapter.untypedScheduler match {
case sc: akka.testkit.ExplicitlyTriggeredScheduler => new ManualTime(sc)
case _ =>
throw new IllegalArgumentException(
"ActorSystem not configured with explicitly triggered scheduler, " +
"make sure to include akka.actor.testkit.typed.scaladsl.ManualTime.config() when setting up the test")
}
case s =>
throw new IllegalArgumentException(
"ActorSystem not configured with explicitly triggered scheduler, " +
"make sure to include akka.actor.testkit.typed.scaladsl.ManualTime.config() when setting up the test")
s"ActorSystem.scheduler is not an untyped SchedulerAdapter but a ${s.getClass.getName}, this is not supported")
}
}

View file

@ -4,9 +4,9 @@
package jdocs.akka.actor.testkit.typed.javadsl;
import akka.actor.Scheduler;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.Scheduler;
import akka.actor.typed.javadsl.AskPattern;
import akka.actor.typed.javadsl.Behaviors;
// #test-header

View file

@ -4,7 +4,7 @@
package docs.akka.actor.testkit.typed.scaladsl
import akka.actor.Scheduler
import akka.actor.typed.Scheduler
//#test-header
import akka.actor.testkit.typed.scaladsl.ActorTestKit

View file

@ -0,0 +1,26 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed;
import java.time.Duration;
public class SchedulerTest {
public void compileOnly() {
// accepts a lambda
ActorSystem<Void> system = null;
system
.scheduler()
.scheduleAtFixedRate(
Duration.ofMillis(10),
Duration.ofMillis(10),
() -> system.log().info("Woo!"),
system.executionContext());
system
.scheduler()
.scheduleOnce(
Duration.ofMillis(10), () -> system.log().info("Woo!"), system.executionContext());
}
}

View file

@ -0,0 +1,20 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed
import scala.concurrent.duration._
class SchedulerSpec {
def compileOnly(): Unit = {
val system: ActorSystem[Nothing] = ???
import system.executionContext
// verify a lambda works
system.scheduler.scheduleAtFixedRate(10.milliseconds, 10.milliseconds, () => system.log.info("Woho!"))
system.scheduler.scheduleOnce(10.milliseconds, () => system.log.info("Woho!"))
}
}

View file

@ -24,7 +24,7 @@ import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Props
import akka.util.Timeout
import akka.actor.Scheduler
import akka.actor.typed.Scheduler
//#imports2

View file

@ -85,7 +85,7 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions { this: Inter
* It is recommended to use the ActorContexts scheduling capabilities for sending
* messages to actors instead of registering a Runnable for execution using this facility.
*/
def scheduler: untyped.Scheduler
def scheduler: Scheduler
/**
* Facilities for lookup up thread-pools from configuration.

View file

@ -0,0 +1,100 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed
import akka.actor.Cancellable
import akka.annotation.DoNotInherit
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
/**
* The ActorSystem facility for scheduling tasks.
*
* For scheduling within actors `Behaviors.withTimers` should be preferred.
*
* Not for user extension
*/
@DoNotInherit
trait Scheduler {
/**
*
* Schedules a Runnable to be run once with a delay, i.e. a time period that
* has to pass before the runnable is executed.
*
* @throws IllegalArgumentException if the given delays exceed the maximum
* reach (calculated as: `delay / tickNanos > Int.MaxValue`).
*
* Note: For scheduling within actors `Behaviors.withTimers` or `ActorContext.scheduleOnce` should be preferred.
*
* Scala API
*/
def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable
/**
* Schedules a Runnable to be run once with a delay, i.e. a time period that
* has to pass before the runnable is executed.
*
* @throws IllegalArgumentException if the given delays exceed the maximum
* reach (calculated as: `delay / tickNanos > Int.MaxValue`).
*
* Note: For scheduling within actors `Behaviors.withTimers` or `ActorContext.scheduleOnce` should be preferred.
*
* Java API
*/
def scheduleOnce(delay: java.time.Duration, runnable: Runnable, executor: ExecutionContext): Cancellable
/**
* Schedules a `Runnable` to be run repeatedly with an initial delay and
* a frequency. E.g. if you would like the function to be run after 2
* seconds and thereafter every 100ms you would set delay = Duration(2,
* TimeUnit.SECONDS) and interval = Duration(100, TimeUnit.MILLISECONDS). If
* the execution of the runnable takes longer than the interval, the
* subsequent execution will start immediately after the prior one completes
* (there will be no overlap of executions of the runnable). In such cases,
* the actual execution interval will differ from the interval passed to this
* method.
*
* If the `Runnable` throws an exception the repeated scheduling is aborted,
* i.e. the function will not be invoked any more.
*
* @throws IllegalArgumentException if the given delays exceed the maximum
* reach (calculated as: `delay / tickNanos > Int.MaxValue`).
*
* Note: For user scheduling needs `Behaviors.withTimers` should be preferred.
*
* Scala API
*/
def scheduleAtFixedRate(initialDelay: FiniteDuration, interval: FiniteDuration, runnable: Runnable)(
implicit executor: ExecutionContext): Cancellable
/**
* Schedules a `Runnable` to be run repeatedly with an initial delay and
* a frequency. E.g. if you would like the function to be run after 2
* seconds and thereafter every 100ms you would set delay to `Duration.ofSeconds(2)`,
* and interval to `Duration.ofMillis(100)`. If
* the execution of the runnable takes longer than the interval, the
* subsequent execution will start immediately after the prior one completes
* (there will be no overlap of executions of the runnable). In such cases,
* the actual execution interval will differ from the interval passed to this
* method.
*
* If the `Runnable` throws an exception the repeated scheduling is aborted,
* i.e. the function will not be invoked any more.
*
* @throws IllegalArgumentException if the given delays exceed the maximum
* reach (calculated as: `delay / tickNanos > Int.MaxValue`).
*
* Note: For user scheduling needs `Behaviors.withTimers` should be preferred.
*
* Java API
*/
def scheduleAtFixedRate(
initialDelay: java.time.Duration,
interval: java.time.Duration,
runnable: Runnable,
executor: ExecutionContext): Cancellable
}

View file

@ -76,13 +76,11 @@ import akka.util.OptionVal
val task =
if (repeat)
ctx.system.scheduler.schedule(delay, delay) {
ctx.self.unsafeUpcast ! timerMsg
}(ExecutionContexts.sameThreadExecutionContext)
ctx.system.scheduler.scheduleAtFixedRate(delay, delay, () => ctx.self.unsafeUpcast ! timerMsg)(
ExecutionContexts.sameThreadExecutionContext)
else
ctx.system.scheduler.scheduleOnce(delay) {
ctx.self.unsafeUpcast ! timerMsg
}(ExecutionContexts.sameThreadExecutionContext)
ctx.system.scheduler
.scheduleOnce(delay, () => ctx.self.unsafeUpcast ! timerMsg)(ExecutionContexts.sameThreadExecutionContext)
val nextTimer = Timer(key, msg, repeat, nextGen, task)
ctx.log.debug("Start timer [{}] with generation [{}]", key, nextGen)

View file

@ -83,7 +83,7 @@ import akka.event.LoggingFilterWithMarker
LoggingFilterWithMarker.wrap(untypedSystem.logFilter))
override def logConfiguration(): Unit = untypedSystem.logConfiguration()
override def name: String = untypedSystem.name
override def scheduler: akka.actor.Scheduler = untypedSystem.scheduler
override val scheduler: Scheduler = new SchedulerAdapter(untypedSystem.scheduler)
override def settings: Settings = new Settings(untypedSystem.settings)
override def startTime: Long = untypedSystem.startTime
override def threadFactory: java.util.concurrent.ThreadFactory = untypedSystem.threadFactory

View file

@ -0,0 +1,38 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.internal.adapter
import java.time.Duration
import akka.actor.Cancellable
import akka.actor.typed.Scheduler
import akka.annotation.InternalApi
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
/**
* INTERNAL API
*/
@InternalApi
private[akka] final class SchedulerAdapter(private[akka] val untypedScheduler: akka.actor.Scheduler) extends Scheduler {
override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(
implicit executor: ExecutionContext): Cancellable =
untypedScheduler.scheduleOnce(delay, runnable)
override def scheduleOnce(delay: Duration, runnable: Runnable, executor: ExecutionContext): Cancellable =
untypedScheduler.scheduleOnce(delay, runnable)(executor)
override def scheduleAtFixedRate(initialDelay: FiniteDuration, interval: FiniteDuration, runnable: Runnable)(
implicit executor: ExecutionContext): Cancellable =
untypedScheduler.schedule(initialDelay, interval, runnable)
override def scheduleAtFixedRate(
initialDelay: Duration,
interval: Duration,
runnable: Runnable,
executor: ExecutionContext): Cancellable =
untypedScheduler.schedule(initialDelay, interval, runnable)(executor)
}

View file

@ -8,7 +8,7 @@ package javadsl
import java.time.Duration
import java.util.concurrent.CompletionStage
import akka.actor.Scheduler
import akka.actor.typed.Scheduler
import akka.actor.typed.scaladsl.AskPattern._
import akka.japi.function.{ Function => JFunction }
import akka.util.JavaDurationConverters._

View file

@ -7,8 +7,9 @@ package akka.actor.typed.scaladsl
import java.util.concurrent.TimeoutException
import scala.concurrent.Future
import akka.actor.{ Address, RootActorPath, Scheduler }
import akka.actor.{ Address, RootActorPath }
import akka.actor.typed.ActorRef
import akka.actor.typed.Scheduler
import akka.actor.typed.internal.{ adapter => adapt }
import akka.annotation.InternalApi
import akka.pattern.PromiseActorRef

View file

@ -9,7 +9,7 @@ import akka.actor.testkit.typed.TestKitSettings
import akka.cluster.ddata.SelfUniqueAddress
// #sample
import akka.actor.Scheduler
import akka.actor.typed.Scheduler
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors

View file

@ -245,8 +245,10 @@ Akka Typed APIs are still marked as [may change](../common/may-change.md) and th
* Factory method `Entity.ofPersistentEntity` is renamed to `Entity.ofEventSourcedEntity` in the Java API for Akka Cluster Sharding Typed.
* New abstract class `EventSourcedEntityWithEnforcedReplies` in Java API for Akka Cluster Sharding Typed and corresponding factory method `Entity.ofEventSourcedEntityWithEnforcedReplies` to ease the creation of `EventSourcedBehavior` with enforced replies.
* New method `EventSourcedEntity.withEnforcedReplies` added to Scala API to ease the creation of `EventSourcedBehavior` with enforced replies.
* `ActorSystem.scheduler` previously gave access to the untyped `akka.actor.Scheduler` but now returns a typed specific `akka.actor.typed.Scheduler`. Additionally `.schedule` has been renamed to `.scheduleAtFixedRate`. Actors that needs to schedule tasks should prefer `Behaviors.withTimers`.
* `Routers.pool` now take a factory function rather than a `Behavior` to protect against accidentally sharing same behavior instance and state across routees.
### Akka Typed Stream API changes
* `ActorSoruce.actorRef` relying on `PartialFunction` has been replaced in the Java API with a variant more suitable to be called by Java.

View file

@ -95,14 +95,12 @@ private[akka] final class BehaviorSetup[C, E, S](
implicit val ec: ExecutionContext = context.executionContext
val timer =
if (snapshot)
context.system.scheduler
.scheduleOnce(settings.recoveryEventTimeout, context.self.toUntyped, RecoveryTickEvent(snapshot = true))
context.scheduleOnce(settings.recoveryEventTimeout, context.self, RecoveryTickEvent(snapshot = true))
else
context.system.scheduler.schedule(
context.system.scheduler.scheduleAtFixedRate(
settings.recoveryEventTimeout,
settings.recoveryEventTimeout,
context.self.toUntyped,
RecoveryTickEvent(snapshot = false))
() => context.self ! RecoveryTickEvent(snapshot = false))
recoveryTimer = OptionVal.Some(timer)
}

View file

@ -4,10 +4,10 @@
package akka.persistence.typed.javadsl;
import akka.actor.Scheduler;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Scheduler;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.function.Procedure;
import akka.persistence.typed.SnapshotSelectionCriteria;

View file

@ -64,7 +64,7 @@ object PersistentActorCompileOnlyTest {
def performSideEffect(sender: ActorRef[AcknowledgeSideEffect], correlationId: Int, data: String): Unit = {
import akka.actor.typed.scaladsl.AskPattern._
implicit val timeout: akka.util.Timeout = 1.second
implicit val scheduler: akka.actor.Scheduler = ???
implicit val scheduler: akka.actor.typed.Scheduler = ???
implicit val ec: ExecutionContext = ???
val response: Future[RecoveryComplete.Response] =