From 896aa7e33b0a1e9aadbacd08a7f0fe69c2d54cf7 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 2 Apr 2018 09:30:49 +0200 Subject: [PATCH] use java.time.Duration in Typed javadsl, #24646 (#24804) --- .../actor/typed/javadsl/ActorCompile.java | 18 ++--- .../akka/typed/InteractionPatternsTest.java | 24 +++--- .../akka/actor/typed/SupervisorStrategy.scala | 73 ++++++++++++++++++- .../typed/internal/ActorContextImpl.scala | 7 ++ .../typed/internal/TimerSchedulerImpl.scala | 7 ++ .../actor/typed/javadsl/ActorContext.scala | 9 +-- .../actor/typed/javadsl/TimerScheduler.scala | 6 +- .../actor/typed/scaladsl/ActorContext.scala | 2 +- .../typed/ClusterShardingSettings.scala | 25 +++++-- .../typed/internal/ReplicatorBehavior.scala | 13 ++-- .../ddata/typed/javadsl/Replicator.scala | 64 ++++++++-------- .../akka/cluster/typed/ClusterSingleton.scala | 11 ++- .../typed/javadsl/PersistentActorTest.java | 9 +-- .../akka/testkit/typed/TestKitSettings.scala | 11 ++- .../typed/internal/TestProbeImpl.scala | 38 ++++++++-- .../testkit/typed/javadsl/ActorTestKit.scala | 9 ++- .../akka/testkit/typed/javadsl/Effects.scala | 11 +-- .../testkit/typed/javadsl/ManualTime.scala | 12 +-- .../testkit/typed/javadsl/TestProbe.scala | 61 +++++++++------- .../typed/javadsl/ManualTimerExampleTest.java | 11 ++- .../testkit/typed/javadsl/TestProbeTest.java | 16 ++-- 21 files changed, 287 insertions(+), 150 deletions(-) diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java index 47937f7807..197408e0ba 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java @@ -7,10 +7,10 @@ package akka.actor.typed.javadsl; import akka.actor.typed.*; import akka.actor.typed.ActorContext; +import java.time.Duration; + import static akka.actor.typed.javadsl.Behaviors.*; -import java.util.concurrent.TimeUnit; -import scala.concurrent.duration.Duration; @SuppressWarnings("unused") public class ActorCompile { @@ -65,7 +65,7 @@ public class ActorCompile { { Behavior b = Behaviors.withTimers(timers -> { - timers.startPeriodicTimer("key", new MyMsgB("tick"), Duration.create(1, TimeUnit.SECONDS)); + timers.startPeriodicTimer("key", new MyMsgB("tick"), Duration.ofSeconds(1)); return Behaviors.ignore(); }); } @@ -92,20 +92,20 @@ public class ActorCompile { SupervisorStrategy strategy2 = SupervisorStrategy.restart().withLoggingEnabled(false); SupervisorStrategy strategy3 = SupervisorStrategy.resume(); SupervisorStrategy strategy4 = - SupervisorStrategy.restartWithLimit(3, Duration.create(1, TimeUnit.SECONDS)); + SupervisorStrategy.restartWithLimit(3, Duration.ofSeconds(1)); SupervisorStrategy strategy5 = SupervisorStrategy.restartWithBackoff( - Duration.create(200, TimeUnit.MILLISECONDS), - Duration.create(10, TimeUnit.SECONDS), + Duration.ofMillis(200), + Duration.ofSeconds(10), 0.1); BackoffSupervisorStrategy strategy6 = SupervisorStrategy.restartWithBackoff( - Duration.create(200, TimeUnit.MILLISECONDS), - Duration.create(10, TimeUnit.SECONDS), + Duration.ofMillis(200), + Duration.ofSeconds(10), 0.1); - SupervisorStrategy strategy7 = strategy6.withResetBackoffAfter(Duration.create(2, TimeUnit.SECONDS)); + SupervisorStrategy strategy7 = strategy6.withResetBackoffAfter(Duration.ofSeconds(2)); Behavior behv = Behaviors.supervise( diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java index e1465e2060..93ddc2e945 100644 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java @@ -14,10 +14,10 @@ import akka.util.Timeout; import org.junit.Test; import org.scalatest.junit.JUnitSuite; import scala.concurrent.Await; -import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import java.net.URI; +import java.time.Duration; import java.util.*; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; @@ -239,7 +239,8 @@ public class InteractionPatternsTest extends JUnitSuite { ref.tell(new PrintMe("message 2")); // #fire-and-forget-doit - Await.ready(system.terminate(), Duration.create(3, TimeUnit.SECONDS)); + system.terminate(); + system.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS); } //#timer @@ -284,12 +285,12 @@ public class InteractionPatternsTest extends JUnitSuite { private static class TimeoutMsg implements Msg { } - public static Behavior behavior(ActorRef target, FiniteDuration after, int maxSize) { + public static Behavior behavior(ActorRef target, Duration after, int maxSize) { return Behaviors.withTimers(timers -> idle(timers, target, after, maxSize)); } private static Behavior idle(TimerScheduler timers, ActorRef target, - FiniteDuration after, int maxSize) { + Duration after, int maxSize) { return Behaviors.receive(Msg.class) .onMessage(Msg.class, (ctx, msg) -> { timers.startSingleTimer(TIMER_KEY, new TimeoutMsg(), after); @@ -301,7 +302,7 @@ public class InteractionPatternsTest extends JUnitSuite { } private static Behavior active(List buffer, TimerScheduler timers, - ActorRef target, FiniteDuration after, int maxSize) { + ActorRef target, Duration after, int maxSize) { return Behaviors.receive(Msg.class) .onMessage(TimeoutMsg.class, (ctx, msg) -> { target.tell(new Batch(buffer)); @@ -326,19 +327,20 @@ public class InteractionPatternsTest extends JUnitSuite { final ActorSystem system = ActorSystem.create(Behaviors.empty(), "timers-sample"); TestProbe probe = TestProbe.create("batcher", system); ActorRef bufferer = Await.result(system.systemActorOf( - behavior(probe.ref(), new FiniteDuration(1, TimeUnit.SECONDS), 10), - "batcher", Props.empty(), akka.util.Timeout.apply(1, TimeUnit.SECONDS)), - new FiniteDuration(1, TimeUnit.SECONDS)); + behavior(probe.ref(), Duration.ofSeconds(1), 10), + "batcher", Props.empty(), akka.util.Timeout.create(Duration.ofSeconds(1))), + FiniteDuration.create(3, TimeUnit.SECONDS)); ExcitingMessage msgOne = new ExcitingMessage("one"); ExcitingMessage msgTwo = new ExcitingMessage("two"); bufferer.tell(msgOne); bufferer.tell(msgTwo); - probe.expectNoMessage(new FiniteDuration(1, TimeUnit.MILLISECONDS)); - probe.expectMessage(new FiniteDuration(2, TimeUnit.SECONDS), + probe.expectNoMessage(Duration.ofMillis(1)); + probe.expectMessage(Duration.ofSeconds(2), new Batch(Arrays.asList(msgOne, msgTwo))); - Await.ready(system.terminate(), Duration.create(3, TimeUnit.SECONDS)); + system.terminate(); + system.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS); } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala index c92cbafafb..2948ac4416 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala @@ -4,10 +4,14 @@ package akka.actor.typed +import java.time + import akka.annotation.InternalApi import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.Duration +import akka.util.JavaDurationConverters._ + object SupervisorStrategy { /** @@ -33,7 +37,7 @@ object SupervisorStrategy { val stop: SupervisorStrategy = Stop(loggingEnabled = true) /** - * Restart with a limit of number of restart retries. + * Scala API: Restart with a limit of number of restart retries. * The number of restarts are limited to a number of restart attempts (`maxNrOfRetries`) * within a time range (`withinTimeRange`). When the time window has elapsed without reaching * `maxNrOfRetries` the restart count is reset. @@ -49,7 +53,23 @@ object SupervisorStrategy { Restart(maxNrOfRetries, withinTimeRange, loggingEnabled = true) /** - * It supports exponential back-off between the given `minBackoff` and + * Java API: Restart with a limit of number of restart retries. + * The number of restarts are limited to a number of restart attempts (`maxNrOfRetries`) + * within a time range (`withinTimeRange`). When the time window has elapsed without reaching + * `maxNrOfRetries` the restart count is reset. + * + * The strategy is applied also if the actor behavior is deferred and throws an exception during + * startup. + * + * @param maxNrOfRetries the number of times a child actor is allowed to be restarted, + * if the limit is exceeded the child actor is stopped + * @param withinTimeRange duration of the time window for maxNrOfRetries + */ + def restartWithLimit(maxNrOfRetries: Int, withinTimeRange: java.time.Duration): SupervisorStrategy = + restartWithLimit(maxNrOfRetries, withinTimeRange.asScala) + + /** + * Scala API: It supports exponential back-off between the given `minBackoff` and * `maxBackoff` durations. For example, if `minBackoff` is 3 seconds and * `maxBackoff` 30 seconds the start attempts will be delayed with * 3, 6, 12, 24, 30, 30 seconds. The exponential back-off counter is reset @@ -81,6 +101,39 @@ object SupervisorStrategy { randomFactor: Double): BackoffSupervisorStrategy = Backoff(minBackoff, maxBackoff, randomFactor, resetBackoffAfter = minBackoff, loggingEnabled = true) + /** + * Java API: It supports exponential back-off between the given `minBackoff` and + * `maxBackoff` durations. For example, if `minBackoff` is 3 seconds and + * `maxBackoff` 30 seconds the start attempts will be delayed with + * 3, 6, 12, 24, 30, 30 seconds. The exponential back-off counter is reset + * if the actor is not terminated within the `minBackoff` duration. + * + * In addition to the calculated exponential back-off an additional + * random delay based the given `randomFactor` is added, e.g. 0.2 adds up to 20% + * delay. The reason for adding a random delay is to avoid that all failing + * actors hit the backend resource at the same time. + * + * During the back-off incoming messages are dropped. + * + * If no new exception occurs within the `minBackoff` duration the exponentially + * increased back-off timeout is reset. + * + * The strategy is applied also if the actor behavior is deferred and throws an exception during + * startup. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + */ + def restartWithBackoff( + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double): BackoffSupervisorStrategy = + restartWithBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor) + /** * INTERNAL API */ @@ -124,6 +177,11 @@ object SupervisorStrategy { override def withResetBackoffAfter(timeout: FiniteDuration): BackoffSupervisorStrategy = copy(resetBackoffAfter = timeout) + + override def withResetBackoffAfter(timeout: java.time.Duration): BackoffSupervisorStrategy = + withResetBackoffAfter(timeout.asScala) + + override def getResetBackoffAfter: java.time.Duration = resetBackoffAfter.asJava } } @@ -136,10 +194,19 @@ sealed abstract class SupervisorStrategy { sealed abstract class BackoffSupervisorStrategy extends SupervisorStrategy { def resetBackoffAfter: FiniteDuration + def getResetBackoffAfter: java.time.Duration + /** - * The back-off algorithm is reset if the actor does not crash within the + * Scala API: The back-off algorithm is reset if the actor does not crash within the * specified `resetBackoffAfter`. By default, the `resetBackoffAfter` has * the same value as `minBackoff`. */ def withResetBackoffAfter(timeout: FiniteDuration): BackoffSupervisorStrategy + + /** + * Java API: The back-off algorithm is reset if the actor does not crash within the + * specified `resetBackoffAfter`. By default, the `resetBackoffAfter` has + * the same value as `minBackoff`. + */ + def withResetBackoffAfter(timeout: java.time.Duration): BackoffSupervisorStrategy } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala index 591583892f..06c1d45df2 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala @@ -20,6 +20,7 @@ import scala.util.Try import akka.annotation.InternalApi import akka.util.OptionVal import akka.util.Timeout +import akka.util.JavaDurationConverters._ /** * INTERNAL API @@ -68,6 +69,12 @@ import akka.util.Timeout override def getLog: Logger = log + override def setReceiveTimeout(d: java.time.Duration, msg: T): Unit = + setReceiveTimeout(d.asScala, msg) + + override def schedule[U](delay: java.time.Duration, target: ActorRef[U], msg: U): akka.actor.Cancellable = + schedule(delay.asScala, target, msg) + override def spawn[U](behavior: akka.actor.typed.Behavior[U], name: String): akka.actor.typed.ActorRef[U] = spawn(behavior, name, Props.empty) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala index 58f46f81df..0a5dd43fe5 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala @@ -12,6 +12,7 @@ import akka.actor.typed.ActorRef.ActorRefOps import akka.actor.typed.scaladsl.ActorContext import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts +import akka.util.JavaDurationConverters._ import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag @@ -51,9 +52,15 @@ import scala.reflect.ClassTag override def startPeriodicTimer(key: Any, msg: T, interval: FiniteDuration): Unit = startTimer(key, msg, interval, repeat = true) + override def startPeriodicTimer(key: Any, msg: T, interval: java.time.Duration): Unit = + startPeriodicTimer(key, msg, interval.asScala) + override def startSingleTimer(key: Any, msg: T, timeout: FiniteDuration): Unit = startTimer(key, msg, timeout, repeat = false) + def startSingleTimer(key: Any, msg: T, timeout: java.time.Duration): Unit = + startSingleTimer(key, msg, timeout.asScala) + private def startTimer(key: Any, msg: T, timeout: FiniteDuration, repeat: Boolean): Unit = { timers.get(key) match { case Some(t) ⇒ cancelTimer(t) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala index 84d63d840e..e6adb94ea2 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala @@ -4,6 +4,7 @@ package akka.actor.typed.javadsl +import java.time.Duration import java.util.function.{ BiFunction, Function ⇒ JFunction } import akka.annotation.DoNotInherit @@ -12,8 +13,6 @@ import akka.actor.typed._ import java.util.Optional import akka.util.Timeout - -import scala.concurrent.duration.FiniteDuration import scala.concurrent.ExecutionContextExecutor /** @@ -181,13 +180,13 @@ trait ActorContext[T] { /** * Schedule the sending of a notification in case no other * message is received during the given period of time. The timeout starts anew - * with each received message. Provide `Duration.Undefined` to switch off this + * with each received message. Use `cancelReceiveTimeout` to switch off this * mechanism. * * *Warning*: This method is not thread-safe and must not be accessed from threads other * than the ordinary actor message processing thread, such as [[java.util.concurrent.CompletionStage]] callbacks. */ - def setReceiveTimeout(d: FiniteDuration, msg: T): Unit + def setReceiveTimeout(d: Duration, msg: T): Unit /** * Cancel the sending of receive timeout notifications. @@ -208,7 +207,7 @@ trait ActorContext[T] { * This method is thread-safe and can be called from other threads than the ordinary * actor message processing thread, such as [[java.util.concurrent.CompletionStage]] callbacks. */ - def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): akka.actor.Cancellable + def schedule[U](delay: Duration, target: ActorRef[U], msg: U): akka.actor.Cancellable /** * This Actor’s execution context. It can be used to run asynchronous tasks diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/TimerScheduler.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/TimerScheduler.scala index db533d5d1f..0f19b31884 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/TimerScheduler.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/TimerScheduler.scala @@ -4,7 +4,7 @@ package akka.actor.typed.javadsl -import scala.concurrent.duration.FiniteDuration +import java.time.Duration /** * Support for scheduled `self` messages in an actor. @@ -26,7 +26,7 @@ trait TimerScheduler[T] { * previous timer is not received, even though it might already be enqueued * in the mailbox when the new timer is started. */ - def startPeriodicTimer(key: Any, msg: T, interval: FiniteDuration): Unit + def startPeriodicTimer(key: Any, msg: T, interval: Duration): Unit /** * * Start a timer that will send `msg` once to the `self` actor after @@ -37,7 +37,7 @@ trait TimerScheduler[T] { * previous timer is not received, even though it might already be enqueued * in the mailbox when the new timer is started. */ - def startSingleTimer(key: Any, msg: T, timeout: FiniteDuration): Unit + def startSingleTimer(key: Any, msg: T, timeout: Duration): Unit /** * Check if a timer with a given `key` is active. diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala index f9110000ed..008d88499e 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala @@ -161,7 +161,7 @@ trait ActorContext[T] { this: akka.actor.typed.javadsl.ActorContext[T] ⇒ /** * Schedule the sending of a notification in case no other * message is received during the given period of time. The timeout starts anew - * with each received message. Provide `Duration.Undefined` to switch off this + * with each received message. Use `cancelReceiveTimeout` to switch off this * mechanism. * * *Warning*: This method is not thread-safe and must not be accessed from threads other diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala index ea6e5894a3..94984eb60f 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala @@ -4,16 +4,18 @@ package akka.cluster.sharding.typed +import scala.concurrent.duration.FiniteDuration + import akka.actor.NoSerializationVerificationNeeded +import akka.actor.typed.ActorSystem import akka.annotation.InternalApi +import akka.cluster.ClusterSettings.DataCenter import akka.cluster.sharding.{ ClusterShardingSettings ⇒ UntypedShardingSettings } import akka.cluster.singleton.{ ClusterSingletonManagerSettings ⇒ UntypedClusterSingletonManagerSettings } -import akka.actor.typed.ActorSystem -import akka.cluster.typed.{ Cluster, ClusterSingletonManagerSettings } +import akka.cluster.typed.Cluster +import akka.cluster.typed.ClusterSingletonManagerSettings import com.typesafe.config.Config -import akka.cluster.ClusterSettings.DataCenter - -import scala.concurrent.duration.FiniteDuration +import akka.util.JavaDurationConverters._ object ClusterShardingSettings { @@ -147,21 +149,31 @@ object ClusterShardingSettings { def withBufferSize(value: Int): TuningParameters = copy(bufferSize = value) def withCoordinatorFailureBackoff(value: FiniteDuration): TuningParameters = copy(coordinatorFailureBackoff = value) + def withCoordinatorFailureBackoff(value: java.time.Duration): TuningParameters = withCoordinatorFailureBackoff(value.asScala) def withEntityRecoveryConstantRateStrategyFrequency(value: FiniteDuration): TuningParameters = copy(entityRecoveryConstantRateStrategyFrequency = value) + def withEntityRecoveryConstantRateStrategyFrequency(value: java.time.Duration): TuningParameters = withEntityRecoveryConstantRateStrategyFrequency(value.asScala) def withEntityRecoveryConstantRateStrategyNumberOfEntities(value: Int): TuningParameters = copy(entityRecoveryConstantRateStrategyNumberOfEntities = value) def withEntityRecoveryStrategy(value: java.lang.String): TuningParameters = copy(entityRecoveryStrategy = value) def withEntityRestartBackoff(value: FiniteDuration): TuningParameters = copy(entityRestartBackoff = value) + def withEntityRestartBackoff(value: java.time.Duration): TuningParameters = withEntityRestartBackoff(value.asScala) def withHandOffTimeout(value: FiniteDuration): TuningParameters = copy(handOffTimeout = value) + def withHandOffTimeout(value: java.time.Duration): TuningParameters = withHandOffTimeout(value.asScala) def withKeepNrOfBatches(value: Int): TuningParameters = copy(keepNrOfBatches = value) def withLeastShardAllocationMaxSimultaneousRebalance(value: Int): TuningParameters = copy(leastShardAllocationMaxSimultaneousRebalance = value) def withLeastShardAllocationRebalanceThreshold(value: Int): TuningParameters = copy(leastShardAllocationRebalanceThreshold = value) def withRebalanceInterval(value: FiniteDuration): TuningParameters = copy(rebalanceInterval = value) + def withRebalanceInterval(value: java.time.Duration): TuningParameters = withRebalanceInterval(value.asScala) def withRetryInterval(value: FiniteDuration): TuningParameters = copy(retryInterval = value) + def withRetryInterval(value: java.time.Duration): TuningParameters = withRetryInterval(value.asScala) def withShardFailureBackoff(value: FiniteDuration): TuningParameters = copy(shardFailureBackoff = value) + def withShardFailureBackoff(value: java.time.Duration): TuningParameters = withShardFailureBackoff(value.asScala) def withShardStartTimeout(value: FiniteDuration): TuningParameters = copy(shardStartTimeout = value) + def withShardStartTimeout(value: java.time.Duration): TuningParameters = withShardStartTimeout(value.asScala) def withSnapshotAfter(value: Int): TuningParameters = copy(snapshotAfter = value) def withUpdatingStateTimeout(value: FiniteDuration): TuningParameters = copy(updatingStateTimeout = value) + def withUpdatingStateTimeout(value: java.time.Duration): TuningParameters = withUpdatingStateTimeout(value.asScala) def withWaitingForStateTimeout(value: FiniteDuration): TuningParameters = copy(waitingForStateTimeout = value) + def withWaitingForStateTimeout(value: java.time.Duration): TuningParameters = withWaitingForStateTimeout(value.asScala) private def copy( bufferSize: Int = bufferSize, @@ -234,7 +246,8 @@ final class ClusterShardingSettings( val tuningParameters: ClusterShardingSettings.TuningParameters, val coordinatorSingletonSettings: ClusterSingletonManagerSettings) extends NoSerializationVerificationNeeded { - import akka.cluster.sharding.typed.ClusterShardingSettings.{ StateStoreModeDData, StateStoreModePersistence } + import akka.cluster.sharding.typed.ClusterShardingSettings.StateStoreModeDData + import akka.cluster.sharding.typed.ClusterShardingSettings.StateStoreModePersistence require( stateStoreMode == StateStoreModePersistence || stateStoreMode == StateStoreModeDData, s"Unknown 'state-store-mode' [$stateStoreMode], " + diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/internal/ReplicatorBehavior.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/internal/ReplicatorBehavior.scala index b475bb5d2f..77f3f9675a 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/internal/ReplicatorBehavior.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/internal/ReplicatorBehavior.scala @@ -16,6 +16,7 @@ import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ import akka.util.Timeout +import akka.util.JavaDurationConverters._ import akka.cluster.ddata.ReplicatedData import akka.actor.typed.Terminated @@ -67,8 +68,8 @@ import akka.actor.typed.Terminated case cmd: JReplicator.Get[d] ⇒ implicit val timeout = Timeout(cmd.consistency.timeout match { - case Duration.Zero ⇒ localAskTimeout - case t ⇒ t + additionalAskTimeout + case java.time.Duration.ZERO ⇒ localAskTimeout + case t ⇒ t.asScala + additionalAskTimeout }) import ctx.executionContext val reply = @@ -91,8 +92,8 @@ import akka.actor.typed.Terminated case cmd: JReplicator.Update[d] ⇒ implicit val timeout = Timeout(cmd.writeConsistency.timeout match { - case Duration.Zero ⇒ localAskTimeout - case t ⇒ t + additionalAskTimeout + case java.time.Duration.ZERO ⇒ localAskTimeout + case t ⇒ t.asScala + additionalAskTimeout }) import ctx.executionContext val reply = @@ -146,8 +147,8 @@ import akka.actor.typed.Terminated case cmd: JReplicator.Delete[d] ⇒ implicit val timeout = Timeout(cmd.consistency.timeout match { - case Duration.Zero ⇒ localAskTimeout - case t ⇒ t + additionalAskTimeout + case java.time.Duration.ZERO ⇒ localAskTimeout + case t ⇒ t.asScala + additionalAskTimeout }) import ctx.executionContext val reply = diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/Replicator.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/Replicator.scala index 0388aea984..40af107501 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/Replicator.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/Replicator.scala @@ -4,27 +4,23 @@ package akka.cluster.ddata.typed.javadsl -import java.util.function.{ Function ⇒ JFunction } -import akka.cluster.{ ddata ⇒ dd } -import akka.cluster.ddata.Key -import akka.cluster.ddata.ReplicatedData -import akka.actor.typed.ActorRef -import akka.actor.typed.Behavior -import akka.cluster.ddata.typed.internal.ReplicatorBehavior - -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.duration.Duration +import java.time.Duration import java.util.Optional import java.util.function.{ Function ⇒ JFunction } -import akka.actor.{ DeadLetterSuppression, NoSerializationVerificationNeeded } -import akka.actor.typed.{ ActorRef, Behavior } -import akka.annotation.{ DoNotInherit, InternalApi } +import scala.util.control.NoStackTrace + +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.DeadLetterSuppression +import akka.actor.NoSerializationVerificationNeeded +import akka.annotation.DoNotInherit +import akka.annotation.InternalApi +import akka.cluster.ddata.Key +import akka.cluster.ddata.ReplicatedData import akka.cluster.ddata.typed.internal.ReplicatorBehavior import akka.cluster.{ ddata ⇒ dd } - -import scala.concurrent.duration.{ Duration, FiniteDuration } -import scala.util.control.NoStackTrace +import akka.util.JavaDurationConverters._ /** * @see [[akka.cluster.ddata.Replicator]]. @@ -48,61 +44,61 @@ object Replicator { @DoNotInherit trait Command extends akka.cluster.ddata.typed.scaladsl.Replicator.Command sealed trait ReadConsistency { - def timeout: FiniteDuration + def timeout: Duration /** INTERNAL API */ @InternalApi private[akka] def toUntyped: dd.Replicator.ReadConsistency } case object ReadLocal extends ReadConsistency { - override def timeout: FiniteDuration = Duration.Zero + override def timeout: Duration = Duration.ZERO /** INTERNAL API */ @InternalApi private[akka] override def toUntyped = dd.Replicator.ReadLocal } - final case class ReadFrom(n: Int, timeout: FiniteDuration) extends ReadConsistency { + final case class ReadFrom(n: Int, timeout: Duration) extends ReadConsistency { require(n >= 2, "ReadFrom n must be >= 2, use ReadLocal for n=1") /** INTERNAL API */ - @InternalApi private[akka] override def toUntyped = dd.Replicator.ReadFrom(n, timeout) + @InternalApi private[akka] override def toUntyped = dd.Replicator.ReadFrom(n, timeout.asScala) } - final case class ReadMajority(timeout: FiniteDuration, minCap: Int = DefaultMajorityMinCap) extends ReadConsistency { - def this(timeout: FiniteDuration) = this(timeout, DefaultMajorityMinCap) + final case class ReadMajority(timeout: Duration, minCap: Int = DefaultMajorityMinCap) extends ReadConsistency { + def this(timeout: Duration) = this(timeout, DefaultMajorityMinCap) /** INTERNAL API */ - @InternalApi private[akka] override def toUntyped = dd.Replicator.ReadMajority(timeout, minCap) + @InternalApi private[akka] override def toUntyped = dd.Replicator.ReadMajority(timeout.asScala, minCap) } - final case class ReadAll(timeout: FiniteDuration) extends ReadConsistency { + final case class ReadAll(timeout: Duration) extends ReadConsistency { /** INTERNAL API */ - @InternalApi private[akka] override def toUntyped = dd.Replicator.ReadAll(timeout) + @InternalApi private[akka] override def toUntyped = dd.Replicator.ReadAll(timeout.asScala) } sealed trait WriteConsistency { - def timeout: FiniteDuration + def timeout: Duration /** INTERNAL API */ @InternalApi private[akka] def toUntyped: dd.Replicator.WriteConsistency } case object WriteLocal extends WriteConsistency { - override def timeout: FiniteDuration = Duration.Zero + override def timeout: Duration = Duration.ZERO /** INTERNAL API */ @InternalApi private[akka] override def toUntyped = dd.Replicator.WriteLocal } - final case class WriteTo(n: Int, timeout: FiniteDuration) extends WriteConsistency { + final case class WriteTo(n: Int, timeout: Duration) extends WriteConsistency { require(n >= 2, "WriteTo n must be >= 2, use WriteLocal for n=1") /** INTERNAL API */ - @InternalApi private[akka] override def toUntyped = dd.Replicator.WriteTo(n, timeout) + @InternalApi private[akka] override def toUntyped = dd.Replicator.WriteTo(n, timeout.asScala) } - final case class WriteMajority(timeout: FiniteDuration, minCap: Int = DefaultMajorityMinCap) extends WriteConsistency { - def this(timeout: FiniteDuration) = this(timeout, DefaultMajorityMinCap) + final case class WriteMajority(timeout: Duration, minCap: Int = DefaultMajorityMinCap) extends WriteConsistency { + def this(timeout: Duration) = this(timeout, DefaultMajorityMinCap) /** INTERNAL API */ - @InternalApi private[akka] override def toUntyped = dd.Replicator.WriteMajority(timeout, minCap) + @InternalApi private[akka] override def toUntyped = dd.Replicator.WriteMajority(timeout.asScala, minCap) } - final case class WriteAll(timeout: FiniteDuration) extends WriteConsistency { + final case class WriteAll(timeout: Duration) extends WriteConsistency { /** INTERNAL API */ - @InternalApi private[akka] override def toUntyped = dd.Replicator.WriteAll(timeout) + @InternalApi private[akka] override def toUntyped = dd.Replicator.WriteAll(timeout.asScala) } /** diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala index 3b6ab42fc6..a95aa1945a 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala @@ -10,6 +10,7 @@ import akka.cluster.ClusterSettings.DataCenter import akka.cluster.singleton.{ ClusterSingletonProxySettings, ClusterSingletonManagerSettings ⇒ UntypedClusterSingletonManagerSettings } import akka.cluster.typed.internal.AdaptedClusterSingletonImpl import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props } +import akka.util.JavaDurationConverters._ import com.typesafe.config.Config import scala.concurrent.duration._ @@ -60,8 +61,10 @@ final class ClusterSingletonSettings( def withNoDataCenter(): ClusterSingletonSettings = copy(dataCenter = None) def withRemovalMargin(removalMargin: FiniteDuration): ClusterSingletonSettings = copy(removalMargin = removalMargin) + def withRemovalMargin(removalMargin: java.time.Duration): ClusterSingletonSettings = withRemovalMargin(removalMargin.asScala) def withHandoverRetryInterval(handOverRetryInterval: FiniteDuration): ClusterSingletonSettings = copy(handOverRetryInterval = handOverRetryInterval) + def withHandoverRetryInterval(handOverRetryInterval: java.time.Duration): ClusterSingletonSettings = withHandoverRetryInterval(handOverRetryInterval.asScala) def withBufferSize(bufferSize: Int): ClusterSingletonSettings = copy(bufferSize = bufferSize) @@ -113,7 +116,7 @@ object ClusterSingleton extends ExtensionId[ClusterSingleton] { */ @InternalApi private[akka] object ClusterSingletonImpl { - def managerNameFor(singletonName: String) = s"singletonManager${singletonName}" + def managerNameFor(singletonName: String) = s"singletonManager$singletonName" } /** @@ -213,13 +216,17 @@ final class ClusterSingletonManagerSettings( def withRole(role: String): ClusterSingletonManagerSettings = copy(role = UntypedClusterSingletonManagerSettings.roleOption(role)) - def withRole(role: Option[String]) = copy(role = role) + def withRole(role: Option[String]): ClusterSingletonManagerSettings = copy(role = role) def withRemovalMargin(removalMargin: FiniteDuration): ClusterSingletonManagerSettings = copy(removalMargin = removalMargin) + def withRemovalMargin(removalMargin: java.time.Duration): ClusterSingletonManagerSettings = + withRemovalMargin(removalMargin.asScala) def withHandOverRetryInterval(retryInterval: FiniteDuration): ClusterSingletonManagerSettings = copy(handOverRetryInterval = retryInterval) + def withHandOverRetryInterval(retryInterval: java.time.Duration): ClusterSingletonManagerSettings = + withHandOverRetryInterval(retryInterval.asScala) private def copy( singletonName: String = singletonName, diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java index f9677dac97..ac851c1473 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java @@ -13,10 +13,9 @@ import akka.testkit.typed.javadsl.TestKitJunitResource; import akka.testkit.typed.javadsl.TestProbe; import org.junit.ClassRule; import org.junit.Test; -import scala.concurrent.duration.FiniteDuration; +import java.time.Duration; import java.util.*; -import java.util.concurrent.TimeUnit; import static java.util.Collections.singletonList; import static org.junit.Assert.assertEquals; @@ -200,7 +199,7 @@ public class PersistentActorTest { }) .matchCommand(IncrementLater.class, (ctx, state, command) -> { ActorRef delay = ctx.spawnAnonymous(Behaviors.withTimers(timers -> { - timers.startSingleTimer(Tick.instance, Tick.instance, FiniteDuration.create(10, TimeUnit.MILLISECONDS)); + timers.startSingleTimer(Tick.instance, Tick.instance, Duration.ofMillis(10)); return Behaviors.receive((context, o) -> Behaviors.stopped()); })); ctx.watchWith(delay, new DelayFinished()); @@ -208,7 +207,7 @@ public class PersistentActorTest { }) .matchCommand(DelayFinished.class, (ctx, state, finished) -> Effect().persist(new Incremented(10))) .matchCommand(Increment100OnTimeout.class, (ctx, state, msg) -> { - ctx.setReceiveTimeout(FiniteDuration.create(10, TimeUnit.MILLISECONDS), new Timeout()); + ctx.setReceiveTimeout(Duration.ofMillis(10), new Timeout()); return Effect().none(); }) .matchCommand(Timeout.class, @@ -334,7 +333,7 @@ public class PersistentActorTest { TestProbe probe = testKit.createTestProbe(); ActorRef c = testKit.spawn(counter("c12")); c.tell(new StopThenLog()); - probe.expectTerminated(c, FiniteDuration.create(1, TimeUnit.SECONDS)); + probe.expectTerminated(c, Duration.ofSeconds(1)); } // FIXME test with by state command handler } diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKitSettings.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKitSettings.scala index 0aa29aa8f7..a1026cbabe 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKitSettings.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKitSettings.scala @@ -64,7 +64,14 @@ final class TestKitSettings(val config: Config) { val ThrowOnShutdownTimeout: Boolean = config.getBoolean("throw-on-shutdown-timeout") /** - * Scale the `duration` with the configured `TestTimeFactor` + * Scala API: Scale the `duration` with the configured `TestTimeFactor` */ - def dilated(duration: FiniteDuration): FiniteDuration = (duration * TestTimeFactor).asInstanceOf[FiniteDuration] + def dilated(duration: FiniteDuration): FiniteDuration = + (duration * TestTimeFactor).asInstanceOf[FiniteDuration] + + /** + * Java API: Scale the `duration` with the configured `TestTimeFactor` + */ + def dilated(duration: java.time.Duration): java.time.Duration = + java.time.Duration.ofMillis((duration.toMillis * TestTimeFactor).toLong) } diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/internal/TestProbeImpl.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/internal/TestProbeImpl.scala index 460a08d6cf..7e9813c2af 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/internal/TestProbeImpl.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/internal/TestProbeImpl.scala @@ -4,6 +4,7 @@ package akka.testkit.typed.internal +import java.time import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{ BlockingDeque, LinkedBlockingDeque } import java.util.function.Supplier @@ -16,7 +17,7 @@ import akka.testkit.typed.scaladsl.{ TestDuration, TestProbe ⇒ ScalaTestProbe import akka.testkit.typed.{ FishingOutcome, TestKitSettings } import akka.util.PrettyDuration._ import akka.util.{ BoxedType, Timeout } - +import akka.util.JavaDurationConverters._ import scala.annotation.tailrec import scala.concurrent.Await import scala.concurrent.duration._ @@ -60,26 +61,33 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_]) private val testActor: ActorRef[M] = { // FIXME arbitrary timeout? - implicit val timeout = Timeout(3.seconds) + implicit val timeout: Timeout = Timeout(3.seconds) val futRef = system.systemActorOf(TestProbeImpl.testActor(queue, terminations), s"$name-${testActorId.incrementAndGet()}") Await.result(futRef, timeout.duration + 1.second) } - override def ref = testActor + override def ref: ActorRef[M] = testActor - override def remainingOrDefault = remainingOr(settings.SingleExpectDefaultTimeout.dilated) + override def remainingOrDefault: FiniteDuration = remainingOr(settings.SingleExpectDefaultTimeout.dilated) + + override def getRemainingOrDefault: java.time.Duration = remainingOrDefault.asJava override def remaining: FiniteDuration = end match { case f: FiniteDuration ⇒ f - now case _ ⇒ throw new AssertionError("`remaining` may not be called outside of `within`") } + override def getRemaining: java.time.Duration = remaining.asJava + override def remainingOr(duration: FiniteDuration): FiniteDuration = end match { case x if x eq Duration.Undefined ⇒ duration case x if !x.isFinite ⇒ throw new IllegalArgumentException("`end` cannot be infinite") case f: FiniteDuration ⇒ f - now } + override def getRemainingOr(duration: java.time.Duration): java.time.Duration = + remainingOr(duration.asScala).asJava + private def remainingOrDilated(max: Duration): FiniteDuration = max match { case x if x eq Duration.Undefined ⇒ remainingOrDefault case x if !x.isFinite ⇒ throw new IllegalArgumentException("max duration cannot be infinite") @@ -113,9 +121,15 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_]) override def expectMessage[T <: M](max: FiniteDuration, obj: T): T = expectMessage_internal(max.dilated, obj) + override def expectMessage[T <: M](max: java.time.Duration, obj: T): T = + expectMessage(max.asScala, obj) + override def expectMessage[T <: M](max: FiniteDuration, hint: String, obj: T): T = expectMessage_internal(max.dilated, obj, Some(hint)) + override def expectMessage[T <: M](max: java.time.Duration, hint: String, obj: T): T = + expectMessage(max.asScala, hint, obj) + private def expectMessage_internal[T <: M](max: Duration, obj: T, hint: Option[String] = None): T = { val o = receiveOne(max) val hintOrEmptyString = hint.map(": " + _).getOrElse("") @@ -144,9 +158,14 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_]) message } - override def expectNoMessage(max: FiniteDuration): Unit = { expectNoMessage_internal(max) } + override def expectNoMessage(max: FiniteDuration): Unit = + expectNoMessage_internal(max) - override def expectNoMessage(): Unit = { expectNoMessage_internal(settings.ExpectNoMessageDefaultTimeout.dilated) } + override def expectNoMessage(max: java.time.Duration): Unit = + expectNoMessage(max.asScala) + + override def expectNoMessage(): Unit = + expectNoMessage_internal(settings.ExpectNoMessageDefaultTimeout.dilated) private def expectNoMessage_internal(max: FiniteDuration) { val o = receiveOne(max) @@ -210,8 +229,11 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_]) assert(message.ref == actorRef, s"expected [${actorRef.path}] to stop, but saw [${message.ref.path}] stop") } - override def awaitAssert[A](max: Duration, interval: Duration, supplier: Supplier[A]): A = - awaitAssert(supplier.get(), max, interval) + override def expectTerminated[U](actorRef: ActorRef[U], max: java.time.Duration): Unit = + expectTerminated(actorRef, max.asScala) + + override def awaitAssert[A](max: java.time.Duration, interval: java.time.Duration, supplier: Supplier[A]): A = + awaitAssert(supplier.get(), if (max == java.time.Duration.ZERO) Duration.Undefined else max.asScala, interval.asScala) override def awaitAssert[A](a: ⇒ A, max: Duration = Duration.Undefined, interval: Duration = 100.millis): A = { val _max = remainingOrDilated(max) diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/ActorTestKit.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/ActorTestKit.scala index ea5589cd00..8f990e0d87 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/ActorTestKit.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/ActorTestKit.scala @@ -4,6 +4,8 @@ package akka.testkit.typed.javadsl +import java.time.Duration + import akka.actor.Scheduler import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props } import akka.testkit.typed.TestKitSettings @@ -11,8 +13,7 @@ import akka.testkit.typed.internal.TestKitUtils import akka.testkit.typed.scaladsl.{ ActorTestKit ⇒ ScalaTestKit } import akka.util.Timeout import com.typesafe.config.Config - -import scala.concurrent.duration.Duration +import akka.util.JavaDurationConverters._ object ActorTestKit { @@ -53,7 +54,7 @@ object ActorTestKit { * no exception is thrown. */ def shutdown(system: ActorSystem[_], duration: Duration, throwIfShutdownTimesOut: Boolean): Unit = { - TestKitUtils.shutdown(system, duration, throwIfShutdownTimesOut) + TestKitUtils.shutdown(system, duration.asScala, throwIfShutdownTimesOut) } /** @@ -75,7 +76,7 @@ object ActorTestKit { val settings = TestKitSettings.create(system) shutdown( system, - settings.DefaultActorSystemShutdownTimeout, + settings.DefaultActorSystemShutdownTimeout.asJava, settings.ThrowOnShutdownTimeout ) } diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/Effects.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/Effects.scala index 2312c84ab0..97f853f43c 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/Effects.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/Effects.scala @@ -4,10 +4,11 @@ package akka.testkit.typed.javadsl +import java.time.Duration + import akka.actor.typed.{ ActorRef, Behavior, Props } import akka.testkit.typed.Effect - -import scala.concurrent.duration.{ Duration, FiniteDuration } +import akka.util.JavaDurationConverters._ /** * Factories for behavior effects for [[BehaviorTestKit]], each effect has a suitable equals and can be used to compare @@ -57,14 +58,14 @@ object Effects { /** * The behavior set a new receive timeout, with `msg` as timeout notification */ - def receiveTimeoutSet[T](d: Duration, msg: T): Effect = ReceiveTimeoutSet(d, msg) + def receiveTimeoutSet[T](d: Duration, msg: T): Effect = ReceiveTimeoutSet(d.asScala, msg) /** * The behavior used `ctx.schedule` to schedule `msg` to be sent to `target` after `delay` * FIXME what about events scheduled through the scheduler? */ - def scheduled[U](delay: FiniteDuration, target: ActorRef[U], msg: U): Effect = - Scheduled(delay, target, msg) + def scheduled[U](delay: Duration, target: ActorRef[U], msg: U): Effect = + Scheduled(delay.asScala, target, msg) /** * Used to represent an empty list of effects - in other words, the behavior didn't do anything observable diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/ManualTime.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/ManualTime.scala index dbcb71febc..31144e2ee5 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/ManualTime.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/ManualTime.scala @@ -4,11 +4,11 @@ package akka.testkit.typed.javadsl +import java.time.Duration import akka.actor.typed.ActorSystem import com.typesafe.config.Config - +import akka.util.JavaDurationConverters._ import scala.annotation.varargs -import scala.concurrent.duration.{ Duration, FiniteDuration } /** * Manual time allows you to do async tests while controlling the scheduler of the system. @@ -48,12 +48,12 @@ final class ManualTime(delegate: akka.testkit.ExplicitlyTriggeredScheduler) { * If you want the amount of time passed to be dilated, apply the dilation before passing the delay to * this method. */ - def timePasses(amount: FiniteDuration): Unit = delegate.timePasses(amount) + def timePasses(amount: Duration): Unit = delegate.timePasses(amount.asScala) @varargs - def expectNoMessageFor(duration: FiniteDuration, on: TestProbe[_]*): Unit = { - delegate.timePasses(duration) - on.foreach(_.expectNoMessage(Duration.Zero)) + def expectNoMessageFor(duration: Duration, on: TestProbe[_]*): Unit = { + delegate.timePasses(duration.asScala) + on.foreach(_.expectNoMessage(Duration.ZERO)) } } diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/TestProbe.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/TestProbe.scala index dbdb0fa25c..c61c02cdc3 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/TestProbe.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/TestProbe.scala @@ -4,6 +4,7 @@ package akka.testkit.typed.javadsl +import java.time.Duration import java.util.function.Supplier import akka.actor.typed.{ ActorRef, ActorSystem } @@ -11,10 +12,11 @@ import akka.annotation.DoNotInherit import akka.testkit.typed.internal.TestProbeImpl import akka.testkit.typed.{ FishingOutcome, TestKitSettings } import akka.testkit.typed.scaladsl.TestDuration - -import scala.concurrent.duration.{ Duration, FiniteDuration } +import akka.util.JavaDurationConverters._ import scala.collection.JavaConverters._ -import scala.concurrent.duration._ +import scala.concurrent.duration.FiniteDuration + +import akka.annotation.InternalApi object FishingOutcomes { /** @@ -81,20 +83,20 @@ abstract class TestProbe[M] { * block or missing that it returns the properly dilated default for this * case from settings (key "akka.actor.typed.test.single-expect-default"). */ - def remainingOrDefault: FiniteDuration + def getRemainingOrDefault: Duration /** * Obtain time remaining for execution of the innermost enclosing `within` * block or throw an [[AssertionError]] if no `within` block surrounds this * call. */ - def remaining: FiniteDuration + def getRemaining: Duration /** * Obtain time remaining for execution of the innermost enclosing `within` * block or missing that it returns the given duration. */ - def remainingOr(duration: FiniteDuration): FiniteDuration + def getRemainingOr(duration: Duration): Duration /** * Execute code block while bounding its execution time between `min` and @@ -112,16 +114,19 @@ abstract class TestProbe[M] { * } * }}} */ - def within[T](min: FiniteDuration, max: FiniteDuration)(f: Supplier[T]): T = - within_internal(min, max, f.get()) + def within[T](min: Duration, max: Duration)(f: Supplier[T]): T = + within_internal(min.asScala, max.asScala, f.get()) /** * Same as calling `within(0 seconds, max)(f)`. */ - def within[T](max: FiniteDuration)(f: Supplier[T]): T = - within_internal(Duration.Zero, max, f.get()) + def within[T](max: Duration)(f: Supplier[T]): T = + within_internal(scala.concurrent.duration.Duration.Zero, max.asScala, f.get()) - protected def within_internal[T](min: FiniteDuration, max: FiniteDuration, f: ⇒ T): T + /** + * INTERNAL API + */ + @InternalApi protected def within_internal[T](min: FiniteDuration, max: FiniteDuration, f: ⇒ T): T /** * Same as `expectMessage(remainingOrDefault, obj)`, but correctly treating the timeFactor. @@ -135,7 +140,7 @@ abstract class TestProbe[M] { * * @return the received object */ - def expectMessage[T <: M](max: FiniteDuration, obj: T): T + def expectMessage[T <: M](max: Duration, obj: T): T /** * Receive one message from the test actor and assert that it equals the @@ -144,13 +149,13 @@ abstract class TestProbe[M] { * * @return the received object */ - def expectMessage[T <: M](max: FiniteDuration, hint: String, obj: T): T + def expectMessage[T <: M](max: Duration, hint: String, obj: T): T /** * Assert that no message is received for the specified time. * Supplied value is not dilated. */ - def expectNoMessage(max: FiniteDuration): Unit + def expectNoMessage(max: Duration): Unit /** * Assert that no message is received. Waits for the default period configured as `akka.actor.typed.test.expect-no-message-default` @@ -162,7 +167,7 @@ abstract class TestProbe[M] { * Expect the given actor to be stopped or stop withing the given timeout or * throw an [[AssertionError]]. */ - def expectTerminated[U](actorRef: ActorRef[U], max: FiniteDuration): Unit + def expectTerminated[U](actorRef: ActorRef[U], max: Duration): Unit /** * Evaluate the given assert every `interval` until it does not throw an exception and return the @@ -183,14 +188,14 @@ abstract class TestProbe[M] { * Note that the timeout is scaled using Duration.dilated, which uses the configuration entry "akka.test.timefactor". */ def awaitAssert[A](max: Duration, supplier: Supplier[A]): A = - awaitAssert(max, 100.millis, supplier) + awaitAssert(max, Duration.ofMillis(100), supplier) /** * Evaluate the given assert every 100 milliseconds until it does not throw an exception and return the * result. A max time is taken it from the innermost enclosing `within` block. */ def awaitAssert[A](supplier: Supplier[A]): A = - awaitAssert(Duration.Undefined, supplier) + awaitAssert(Duration.ZERO, supplier) // FIXME awaitAssert(Procedure): Unit would be nice for java people to not have to return null @@ -198,16 +203,19 @@ abstract class TestProbe[M] { * Same as `expectMessageType(clazz, remainingOrDefault)`, but correctly treating the timeFactor. */ def expectMessageClass[T](clazz: Class[T]): T = - expectMessageClass_internal(remainingOrDefault, clazz) + expectMessageClass_internal(getRemainingOrDefault.asScala, clazz) /** * Wait for a message of type M and return it when it arrives, or fail if the `max` timeout is hit. * The timeout is dilated. */ - def expectMessageClass[T](clazz: Class[T], max: FiniteDuration): T = - expectMessageClass_internal(max.dilated, clazz) + def expectMessageClass[T](clazz: Class[T], max: Duration): T = + expectMessageClass_internal(max.asScala.dilated, clazz) - protected def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C + /** + * INTERNAL API + */ + @InternalApi protected def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C /** * Java API: Allows for flexible matching of multiple messages within a timeout, the fisher function is fed each incoming @@ -226,15 +234,18 @@ abstract class TestProbe[M] { * The timeout is dilated. * @return The messages accepted in the order they arrived */ - def fishForMessage(max: FiniteDuration, fisher: java.util.function.Function[M, FishingOutcome]): java.util.List[M] = + def fishForMessage(max: Duration, fisher: java.util.function.Function[M, FishingOutcome]): java.util.List[M] = fishForMessage(max, "", fisher) /** * Same as the other `fishForMessageJava` but includes the provided hint in all error messages */ - def fishForMessage(max: FiniteDuration, hint: String, fisher: java.util.function.Function[M, FishingOutcome]): java.util.List[M] = - fishForMessage_internal(max, hint, fisher.apply).asJava + def fishForMessage(max: Duration, hint: String, fisher: java.util.function.Function[M, FishingOutcome]): java.util.List[M] = + fishForMessage_internal(max.asScala, hint, fisher.apply).asJava - protected def fishForMessage_internal(max: FiniteDuration, hint: String, fisher: M ⇒ FishingOutcome): List[M] + /** + * INTERNAL API + */ + @InternalApi protected def fishForMessage_internal(max: FiniteDuration, hint: String, fisher: M ⇒ FishingOutcome): List[M] } diff --git a/akka-testkit-typed/src/test/java/akka/testkit/typed/javadsl/ManualTimerExampleTest.java b/akka-testkit-typed/src/test/java/akka/testkit/typed/javadsl/ManualTimerExampleTest.java index ca9f2c602b..6e1aef52f1 100644 --- a/akka-testkit-typed/src/test/java/akka/testkit/typed/javadsl/ManualTimerExampleTest.java +++ b/akka-testkit-typed/src/test/java/akka/testkit/typed/javadsl/ManualTimerExampleTest.java @@ -5,14 +5,13 @@ package akka.testkit.typed.javadsl; //#manual-scheduling-simple -import java.util.concurrent.TimeUnit; import akka.actor.typed.Behavior; import akka.testkit.typed.javadsl.ManualTime; import akka.testkit.typed.javadsl.TestKitJunitResource; import org.junit.ClassRule; import org.scalatest.junit.JUnitSuite; -import scala.concurrent.duration.Duration; +import java.time.Duration; import akka.actor.typed.javadsl.Behaviors; @@ -34,7 +33,7 @@ public class ManualTimerExampleTest extends JUnitSuite { public void testScheduleNonRepeatedTicks() { TestProbe probe = testKit.createTestProbe(); Behavior behavior = Behaviors.withTimers(timer -> { - timer.startSingleTimer("T", new Tick(), Duration.create(10, TimeUnit.MILLISECONDS)); + timer.startSingleTimer("T", new Tick(), Duration.ofMillis(10)); return Behaviors.receive( (ctx, tick) -> { probe.ref().tell(new Tock()); return Behaviors.same(); @@ -43,12 +42,12 @@ public class ManualTimerExampleTest extends JUnitSuite { testKit.spawn(behavior); - manualTime.expectNoMessageFor(Duration.create(9, TimeUnit.MILLISECONDS), probe); + manualTime.expectNoMessageFor(Duration.ofMillis(9), probe); - manualTime.timePasses(Duration.create(2, TimeUnit.MILLISECONDS)); + manualTime.timePasses(Duration.ofMillis(2)); probe.expectMessageClass(Tock.class); - manualTime.expectNoMessageFor(Duration.create(10, TimeUnit.SECONDS), probe); + manualTime.expectNoMessageFor(Duration.ofSeconds(10), probe); } diff --git a/akka-testkit-typed/src/test/java/akka/testkit/typed/javadsl/TestProbeTest.java b/akka-testkit-typed/src/test/java/akka/testkit/typed/javadsl/TestProbeTest.java index 6c191ee079..aa1a03a13f 100644 --- a/akka-testkit-typed/src/test/java/akka/testkit/typed/javadsl/TestProbeTest.java +++ b/akka-testkit-typed/src/test/java/akka/testkit/typed/javadsl/TestProbeTest.java @@ -6,10 +6,8 @@ package akka.testkit.typed.javadsl; import akka.actor.typed.ActorRef; import akka.actor.typed.ActorSystem; -import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; -import java.util.concurrent.TimeUnit; +import java.time.Duration; public class TestProbeTest { @@ -21,12 +19,12 @@ public class TestProbeTest { // ... something ... return null; }); - probe.awaitAssert(FiniteDuration.apply(3, TimeUnit.SECONDS), () -> { + probe.awaitAssert(Duration.ofSeconds(3), () -> { // ... something ... return null; }); String awaitAssertResult = - probe.awaitAssert(FiniteDuration.apply(3, TimeUnit.SECONDS), FiniteDuration.apply(100, TimeUnit.MILLISECONDS), () -> { + probe.awaitAssert(Duration.ofSeconds(3), Duration.ofMillis(100), () -> { // ... something ... return "some result"; }); @@ -35,16 +33,16 @@ public class TestProbeTest { probe.expectNoMessage(); ActorRef ref = null; - probe.expectTerminated(ref, FiniteDuration.apply(1, TimeUnit.SECONDS)); + probe.expectTerminated(ref, Duration.ofSeconds(1)); - FiniteDuration remaining = probe.remaining(); - probe.fishForMessage(FiniteDuration.apply(3, TimeUnit.SECONDS), "hint", (msg) -> { + Duration remaining = probe.getRemaining(); + probe.fishForMessage(Duration.ofSeconds(3), "hint", (msg) -> { if (msg.equals("one")) return FishingOutcomes.continueAndIgnore(); else if (msg.equals("two")) return FishingOutcomes.complete(); else return FishingOutcomes.fail("error"); }); - String withinResult = probe.within(FiniteDuration.apply(3, TimeUnit.SECONDS), () -> { + String withinResult = probe.within(Duration.ofSeconds(3), () -> { // ... something ... return "result"; });