use java.time.Duration in Typed javadsl, #24646 (#24804)

This commit is contained in:
Patrik Nordwall 2018-04-02 09:30:49 +02:00 committed by Christopher Batey
parent 6229c6f34e
commit 896aa7e33b
21 changed files with 287 additions and 150 deletions

View file

@ -7,10 +7,10 @@ package akka.actor.typed.javadsl;
import akka.actor.typed.*; import akka.actor.typed.*;
import akka.actor.typed.ActorContext; import akka.actor.typed.ActorContext;
import java.time.Duration;
import static akka.actor.typed.javadsl.Behaviors.*; import static akka.actor.typed.javadsl.Behaviors.*;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.Duration;
@SuppressWarnings("unused") @SuppressWarnings("unused")
public class ActorCompile { public class ActorCompile {
@ -65,7 +65,7 @@ public class ActorCompile {
{ {
Behavior<MyMsg> b = Behaviors.withTimers(timers -> { Behavior<MyMsg> b = Behaviors.withTimers(timers -> {
timers.startPeriodicTimer("key", new MyMsgB("tick"), Duration.create(1, TimeUnit.SECONDS)); timers.startPeriodicTimer("key", new MyMsgB("tick"), Duration.ofSeconds(1));
return Behaviors.ignore(); return Behaviors.ignore();
}); });
} }
@ -92,20 +92,20 @@ public class ActorCompile {
SupervisorStrategy strategy2 = SupervisorStrategy.restart().withLoggingEnabled(false); SupervisorStrategy strategy2 = SupervisorStrategy.restart().withLoggingEnabled(false);
SupervisorStrategy strategy3 = SupervisorStrategy.resume(); SupervisorStrategy strategy3 = SupervisorStrategy.resume();
SupervisorStrategy strategy4 = SupervisorStrategy strategy4 =
SupervisorStrategy.restartWithLimit(3, Duration.create(1, TimeUnit.SECONDS)); SupervisorStrategy.restartWithLimit(3, Duration.ofSeconds(1));
SupervisorStrategy strategy5 = SupervisorStrategy strategy5 =
SupervisorStrategy.restartWithBackoff( SupervisorStrategy.restartWithBackoff(
Duration.create(200, TimeUnit.MILLISECONDS), Duration.ofMillis(200),
Duration.create(10, TimeUnit.SECONDS), Duration.ofSeconds(10),
0.1); 0.1);
BackoffSupervisorStrategy strategy6 = BackoffSupervisorStrategy strategy6 =
SupervisorStrategy.restartWithBackoff( SupervisorStrategy.restartWithBackoff(
Duration.create(200, TimeUnit.MILLISECONDS), Duration.ofMillis(200),
Duration.create(10, TimeUnit.SECONDS), Duration.ofSeconds(10),
0.1); 0.1);
SupervisorStrategy strategy7 = strategy6.withResetBackoffAfter(Duration.create(2, TimeUnit.SECONDS)); SupervisorStrategy strategy7 = strategy6.withResetBackoffAfter(Duration.ofSeconds(2));
Behavior<MyMsg> behv = Behavior<MyMsg> behv =
Behaviors.supervise( Behaviors.supervise(

View file

@ -14,10 +14,10 @@ import akka.util.Timeout;
import org.junit.Test; import org.junit.Test;
import org.scalatest.junit.JUnitSuite; import org.scalatest.junit.JUnitSuite;
import scala.concurrent.Await; import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.FiniteDuration;
import java.net.URI; import java.net.URI;
import java.time.Duration;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -239,7 +239,8 @@ public class InteractionPatternsTest extends JUnitSuite {
ref.tell(new PrintMe("message 2")); ref.tell(new PrintMe("message 2"));
// #fire-and-forget-doit // #fire-and-forget-doit
Await.ready(system.terminate(), Duration.create(3, TimeUnit.SECONDS)); system.terminate();
system.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS);
} }
//#timer //#timer
@ -284,12 +285,12 @@ public class InteractionPatternsTest extends JUnitSuite {
private static class TimeoutMsg implements Msg { private static class TimeoutMsg implements Msg {
} }
public static Behavior<Msg> behavior(ActorRef<Batch> target, FiniteDuration after, int maxSize) { public static Behavior<Msg> behavior(ActorRef<Batch> target, Duration after, int maxSize) {
return Behaviors.withTimers(timers -> idle(timers, target, after, maxSize)); return Behaviors.withTimers(timers -> idle(timers, target, after, maxSize));
} }
private static Behavior<Msg> idle(TimerScheduler<Msg> timers, ActorRef<Batch> target, private static Behavior<Msg> idle(TimerScheduler<Msg> timers, ActorRef<Batch> target,
FiniteDuration after, int maxSize) { Duration after, int maxSize) {
return Behaviors.receive(Msg.class) return Behaviors.receive(Msg.class)
.onMessage(Msg.class, (ctx, msg) -> { .onMessage(Msg.class, (ctx, msg) -> {
timers.startSingleTimer(TIMER_KEY, new TimeoutMsg(), after); timers.startSingleTimer(TIMER_KEY, new TimeoutMsg(), after);
@ -301,7 +302,7 @@ public class InteractionPatternsTest extends JUnitSuite {
} }
private static Behavior<Msg> active(List<Msg> buffer, TimerScheduler<Msg> timers, private static Behavior<Msg> active(List<Msg> buffer, TimerScheduler<Msg> timers,
ActorRef<Batch> target, FiniteDuration after, int maxSize) { ActorRef<Batch> target, Duration after, int maxSize) {
return Behaviors.receive(Msg.class) return Behaviors.receive(Msg.class)
.onMessage(TimeoutMsg.class, (ctx, msg) -> { .onMessage(TimeoutMsg.class, (ctx, msg) -> {
target.tell(new Batch(buffer)); target.tell(new Batch(buffer));
@ -326,19 +327,20 @@ public class InteractionPatternsTest extends JUnitSuite {
final ActorSystem<Object> system = ActorSystem.create(Behaviors.empty(), "timers-sample"); final ActorSystem<Object> system = ActorSystem.create(Behaviors.empty(), "timers-sample");
TestProbe<Batch> probe = TestProbe.create("batcher", system); TestProbe<Batch> probe = TestProbe.create("batcher", system);
ActorRef<Msg> bufferer = Await.result(system.systemActorOf( ActorRef<Msg> bufferer = Await.result(system.systemActorOf(
behavior(probe.ref(), new FiniteDuration(1, TimeUnit.SECONDS), 10), behavior(probe.ref(), Duration.ofSeconds(1), 10),
"batcher", Props.empty(), akka.util.Timeout.apply(1, TimeUnit.SECONDS)), "batcher", Props.empty(), akka.util.Timeout.create(Duration.ofSeconds(1))),
new FiniteDuration(1, TimeUnit.SECONDS)); FiniteDuration.create(3, TimeUnit.SECONDS));
ExcitingMessage msgOne = new ExcitingMessage("one"); ExcitingMessage msgOne = new ExcitingMessage("one");
ExcitingMessage msgTwo = new ExcitingMessage("two"); ExcitingMessage msgTwo = new ExcitingMessage("two");
bufferer.tell(msgOne); bufferer.tell(msgOne);
bufferer.tell(msgTwo); bufferer.tell(msgTwo);
probe.expectNoMessage(new FiniteDuration(1, TimeUnit.MILLISECONDS)); probe.expectNoMessage(Duration.ofMillis(1));
probe.expectMessage(new FiniteDuration(2, TimeUnit.SECONDS), probe.expectMessage(Duration.ofSeconds(2),
new Batch(Arrays.asList(msgOne, msgTwo))); new Batch(Arrays.asList(msgOne, msgTwo)));
Await.ready(system.terminate(), Duration.create(3, TimeUnit.SECONDS)); system.terminate();
system.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS);
} }

View file

@ -4,10 +4,14 @@
package akka.actor.typed package akka.actor.typed
import java.time
import akka.annotation.InternalApi import akka.annotation.InternalApi
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.Duration import scala.concurrent.duration.Duration
import akka.util.JavaDurationConverters._
object SupervisorStrategy { object SupervisorStrategy {
/** /**
@ -33,7 +37,7 @@ object SupervisorStrategy {
val stop: SupervisorStrategy = Stop(loggingEnabled = true) val stop: SupervisorStrategy = Stop(loggingEnabled = true)
/** /**
* Restart with a limit of number of restart retries. * Scala API: Restart with a limit of number of restart retries.
* The number of restarts are limited to a number of restart attempts (`maxNrOfRetries`) * The number of restarts are limited to a number of restart attempts (`maxNrOfRetries`)
* within a time range (`withinTimeRange`). When the time window has elapsed without reaching * within a time range (`withinTimeRange`). When the time window has elapsed without reaching
* `maxNrOfRetries` the restart count is reset. * `maxNrOfRetries` the restart count is reset.
@ -49,7 +53,23 @@ object SupervisorStrategy {
Restart(maxNrOfRetries, withinTimeRange, loggingEnabled = true) Restart(maxNrOfRetries, withinTimeRange, loggingEnabled = true)
/** /**
* It supports exponential back-off between the given `minBackoff` and * Java API: Restart with a limit of number of restart retries.
* The number of restarts are limited to a number of restart attempts (`maxNrOfRetries`)
* within a time range (`withinTimeRange`). When the time window has elapsed without reaching
* `maxNrOfRetries` the restart count is reset.
*
* The strategy is applied also if the actor behavior is deferred and throws an exception during
* startup.
*
* @param maxNrOfRetries the number of times a child actor is allowed to be restarted,
* if the limit is exceeded the child actor is stopped
* @param withinTimeRange duration of the time window for maxNrOfRetries
*/
def restartWithLimit(maxNrOfRetries: Int, withinTimeRange: java.time.Duration): SupervisorStrategy =
restartWithLimit(maxNrOfRetries, withinTimeRange.asScala)
/**
* Scala API: It supports exponential back-off between the given `minBackoff` and
* `maxBackoff` durations. For example, if `minBackoff` is 3 seconds and * `maxBackoff` durations. For example, if `minBackoff` is 3 seconds and
* `maxBackoff` 30 seconds the start attempts will be delayed with * `maxBackoff` 30 seconds the start attempts will be delayed with
* 3, 6, 12, 24, 30, 30 seconds. The exponential back-off counter is reset * 3, 6, 12, 24, 30, 30 seconds. The exponential back-off counter is reset
@ -81,6 +101,39 @@ object SupervisorStrategy {
randomFactor: Double): BackoffSupervisorStrategy = randomFactor: Double): BackoffSupervisorStrategy =
Backoff(minBackoff, maxBackoff, randomFactor, resetBackoffAfter = minBackoff, loggingEnabled = true) Backoff(minBackoff, maxBackoff, randomFactor, resetBackoffAfter = minBackoff, loggingEnabled = true)
/**
* Java API: It supports exponential back-off between the given `minBackoff` and
* `maxBackoff` durations. For example, if `minBackoff` is 3 seconds and
* `maxBackoff` 30 seconds the start attempts will be delayed with
* 3, 6, 12, 24, 30, 30 seconds. The exponential back-off counter is reset
* if the actor is not terminated within the `minBackoff` duration.
*
* In addition to the calculated exponential back-off an additional
* random delay based the given `randomFactor` is added, e.g. 0.2 adds up to 20%
* delay. The reason for adding a random delay is to avoid that all failing
* actors hit the backend resource at the same time.
*
* During the back-off incoming messages are dropped.
*
* If no new exception occurs within the `minBackoff` duration the exponentially
* increased back-off timeout is reset.
*
* The strategy is applied also if the actor behavior is deferred and throws an exception during
* startup.
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
*/
def restartWithBackoff(
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double): BackoffSupervisorStrategy =
restartWithBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor)
/** /**
* INTERNAL API * INTERNAL API
*/ */
@ -124,6 +177,11 @@ object SupervisorStrategy {
override def withResetBackoffAfter(timeout: FiniteDuration): BackoffSupervisorStrategy = override def withResetBackoffAfter(timeout: FiniteDuration): BackoffSupervisorStrategy =
copy(resetBackoffAfter = timeout) copy(resetBackoffAfter = timeout)
override def withResetBackoffAfter(timeout: java.time.Duration): BackoffSupervisorStrategy =
withResetBackoffAfter(timeout.asScala)
override def getResetBackoffAfter: java.time.Duration = resetBackoffAfter.asJava
} }
} }
@ -136,10 +194,19 @@ sealed abstract class SupervisorStrategy {
sealed abstract class BackoffSupervisorStrategy extends SupervisorStrategy { sealed abstract class BackoffSupervisorStrategy extends SupervisorStrategy {
def resetBackoffAfter: FiniteDuration def resetBackoffAfter: FiniteDuration
def getResetBackoffAfter: java.time.Duration
/** /**
* The back-off algorithm is reset if the actor does not crash within the * Scala API: The back-off algorithm is reset if the actor does not crash within the
* specified `resetBackoffAfter`. By default, the `resetBackoffAfter` has * specified `resetBackoffAfter`. By default, the `resetBackoffAfter` has
* the same value as `minBackoff`. * the same value as `minBackoff`.
*/ */
def withResetBackoffAfter(timeout: FiniteDuration): BackoffSupervisorStrategy def withResetBackoffAfter(timeout: FiniteDuration): BackoffSupervisorStrategy
/**
* Java API: The back-off algorithm is reset if the actor does not crash within the
* specified `resetBackoffAfter`. By default, the `resetBackoffAfter` has
* the same value as `minBackoff`.
*/
def withResetBackoffAfter(timeout: java.time.Duration): BackoffSupervisorStrategy
} }

View file

@ -20,6 +20,7 @@ import scala.util.Try
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.util.OptionVal import akka.util.OptionVal
import akka.util.Timeout import akka.util.Timeout
import akka.util.JavaDurationConverters._
/** /**
* INTERNAL API * INTERNAL API
@ -68,6 +69,12 @@ import akka.util.Timeout
override def getLog: Logger = log override def getLog: Logger = log
override def setReceiveTimeout(d: java.time.Duration, msg: T): Unit =
setReceiveTimeout(d.asScala, msg)
override def schedule[U](delay: java.time.Duration, target: ActorRef[U], msg: U): akka.actor.Cancellable =
schedule(delay.asScala, target, msg)
override def spawn[U](behavior: akka.actor.typed.Behavior[U], name: String): akka.actor.typed.ActorRef[U] = override def spawn[U](behavior: akka.actor.typed.Behavior[U], name: String): akka.actor.typed.ActorRef[U] =
spawn(behavior, name, Props.empty) spawn(behavior, name, Props.empty)

View file

@ -12,6 +12,7 @@ import akka.actor.typed.ActorRef.ActorRefOps
import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.ActorContext
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts import akka.dispatch.ExecutionContexts
import akka.util.JavaDurationConverters._
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag import scala.reflect.ClassTag
@ -51,9 +52,15 @@ import scala.reflect.ClassTag
override def startPeriodicTimer(key: Any, msg: T, interval: FiniteDuration): Unit = override def startPeriodicTimer(key: Any, msg: T, interval: FiniteDuration): Unit =
startTimer(key, msg, interval, repeat = true) startTimer(key, msg, interval, repeat = true)
override def startPeriodicTimer(key: Any, msg: T, interval: java.time.Duration): Unit =
startPeriodicTimer(key, msg, interval.asScala)
override def startSingleTimer(key: Any, msg: T, timeout: FiniteDuration): Unit = override def startSingleTimer(key: Any, msg: T, timeout: FiniteDuration): Unit =
startTimer(key, msg, timeout, repeat = false) startTimer(key, msg, timeout, repeat = false)
def startSingleTimer(key: Any, msg: T, timeout: java.time.Duration): Unit =
startSingleTimer(key, msg, timeout.asScala)
private def startTimer(key: Any, msg: T, timeout: FiniteDuration, repeat: Boolean): Unit = { private def startTimer(key: Any, msg: T, timeout: FiniteDuration, repeat: Boolean): Unit = {
timers.get(key) match { timers.get(key) match {
case Some(t) cancelTimer(t) case Some(t) cancelTimer(t)

View file

@ -4,6 +4,7 @@
package akka.actor.typed.javadsl package akka.actor.typed.javadsl
import java.time.Duration
import java.util.function.{ BiFunction, Function JFunction } import java.util.function.{ BiFunction, Function JFunction }
import akka.annotation.DoNotInherit import akka.annotation.DoNotInherit
@ -12,8 +13,6 @@ import akka.actor.typed._
import java.util.Optional import java.util.Optional
import akka.util.Timeout import akka.util.Timeout
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.ExecutionContextExecutor import scala.concurrent.ExecutionContextExecutor
/** /**
@ -181,13 +180,13 @@ trait ActorContext[T] {
/** /**
* Schedule the sending of a notification in case no other * Schedule the sending of a notification in case no other
* message is received during the given period of time. The timeout starts anew * message is received during the given period of time. The timeout starts anew
* with each received message. Provide `Duration.Undefined` to switch off this * with each received message. Use `cancelReceiveTimeout` to switch off this
* mechanism. * mechanism.
* *
* *Warning*: This method is not thread-safe and must not be accessed from threads other * *Warning*: This method is not thread-safe and must not be accessed from threads other
* than the ordinary actor message processing thread, such as [[java.util.concurrent.CompletionStage]] callbacks. * than the ordinary actor message processing thread, such as [[java.util.concurrent.CompletionStage]] callbacks.
*/ */
def setReceiveTimeout(d: FiniteDuration, msg: T): Unit def setReceiveTimeout(d: Duration, msg: T): Unit
/** /**
* Cancel the sending of receive timeout notifications. * Cancel the sending of receive timeout notifications.
@ -208,7 +207,7 @@ trait ActorContext[T] {
* This method is thread-safe and can be called from other threads than the ordinary * This method is thread-safe and can be called from other threads than the ordinary
* actor message processing thread, such as [[java.util.concurrent.CompletionStage]] callbacks. * actor message processing thread, such as [[java.util.concurrent.CompletionStage]] callbacks.
*/ */
def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): akka.actor.Cancellable def schedule[U](delay: Duration, target: ActorRef[U], msg: U): akka.actor.Cancellable
/** /**
* This Actors execution context. It can be used to run asynchronous tasks * This Actors execution context. It can be used to run asynchronous tasks

View file

@ -4,7 +4,7 @@
package akka.actor.typed.javadsl package akka.actor.typed.javadsl
import scala.concurrent.duration.FiniteDuration import java.time.Duration
/** /**
* Support for scheduled `self` messages in an actor. * Support for scheduled `self` messages in an actor.
@ -26,7 +26,7 @@ trait TimerScheduler[T] {
* previous timer is not received, even though it might already be enqueued * previous timer is not received, even though it might already be enqueued
* in the mailbox when the new timer is started. * in the mailbox when the new timer is started.
*/ */
def startPeriodicTimer(key: Any, msg: T, interval: FiniteDuration): Unit def startPeriodicTimer(key: Any, msg: T, interval: Duration): Unit
/** /**
* * Start a timer that will send `msg` once to the `self` actor after * * Start a timer that will send `msg` once to the `self` actor after
@ -37,7 +37,7 @@ trait TimerScheduler[T] {
* previous timer is not received, even though it might already be enqueued * previous timer is not received, even though it might already be enqueued
* in the mailbox when the new timer is started. * in the mailbox when the new timer is started.
*/ */
def startSingleTimer(key: Any, msg: T, timeout: FiniteDuration): Unit def startSingleTimer(key: Any, msg: T, timeout: Duration): Unit
/** /**
* Check if a timer with a given `key` is active. * Check if a timer with a given `key` is active.

View file

@ -161,7 +161,7 @@ trait ActorContext[T] { this: akka.actor.typed.javadsl.ActorContext[T] ⇒
/** /**
* Schedule the sending of a notification in case no other * Schedule the sending of a notification in case no other
* message is received during the given period of time. The timeout starts anew * message is received during the given period of time. The timeout starts anew
* with each received message. Provide `Duration.Undefined` to switch off this * with each received message. Use `cancelReceiveTimeout` to switch off this
* mechanism. * mechanism.
* *
* *Warning*: This method is not thread-safe and must not be accessed from threads other * *Warning*: This method is not thread-safe and must not be accessed from threads other

View file

@ -4,16 +4,18 @@
package akka.cluster.sharding.typed package akka.cluster.sharding.typed
import scala.concurrent.duration.FiniteDuration
import akka.actor.NoSerializationVerificationNeeded import akka.actor.NoSerializationVerificationNeeded
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.sharding.{ ClusterShardingSettings UntypedShardingSettings } import akka.cluster.sharding.{ ClusterShardingSettings UntypedShardingSettings }
import akka.cluster.singleton.{ ClusterSingletonManagerSettings UntypedClusterSingletonManagerSettings } import akka.cluster.singleton.{ ClusterSingletonManagerSettings UntypedClusterSingletonManagerSettings }
import akka.actor.typed.ActorSystem import akka.cluster.typed.Cluster
import akka.cluster.typed.{ Cluster, ClusterSingletonManagerSettings } import akka.cluster.typed.ClusterSingletonManagerSettings
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.cluster.ClusterSettings.DataCenter import akka.util.JavaDurationConverters._
import scala.concurrent.duration.FiniteDuration
object ClusterShardingSettings { object ClusterShardingSettings {
@ -147,21 +149,31 @@ object ClusterShardingSettings {
def withBufferSize(value: Int): TuningParameters = copy(bufferSize = value) def withBufferSize(value: Int): TuningParameters = copy(bufferSize = value)
def withCoordinatorFailureBackoff(value: FiniteDuration): TuningParameters = copy(coordinatorFailureBackoff = value) def withCoordinatorFailureBackoff(value: FiniteDuration): TuningParameters = copy(coordinatorFailureBackoff = value)
def withCoordinatorFailureBackoff(value: java.time.Duration): TuningParameters = withCoordinatorFailureBackoff(value.asScala)
def withEntityRecoveryConstantRateStrategyFrequency(value: FiniteDuration): TuningParameters = copy(entityRecoveryConstantRateStrategyFrequency = value) def withEntityRecoveryConstantRateStrategyFrequency(value: FiniteDuration): TuningParameters = copy(entityRecoveryConstantRateStrategyFrequency = value)
def withEntityRecoveryConstantRateStrategyFrequency(value: java.time.Duration): TuningParameters = withEntityRecoveryConstantRateStrategyFrequency(value.asScala)
def withEntityRecoveryConstantRateStrategyNumberOfEntities(value: Int): TuningParameters = copy(entityRecoveryConstantRateStrategyNumberOfEntities = value) def withEntityRecoveryConstantRateStrategyNumberOfEntities(value: Int): TuningParameters = copy(entityRecoveryConstantRateStrategyNumberOfEntities = value)
def withEntityRecoveryStrategy(value: java.lang.String): TuningParameters = copy(entityRecoveryStrategy = value) def withEntityRecoveryStrategy(value: java.lang.String): TuningParameters = copy(entityRecoveryStrategy = value)
def withEntityRestartBackoff(value: FiniteDuration): TuningParameters = copy(entityRestartBackoff = value) def withEntityRestartBackoff(value: FiniteDuration): TuningParameters = copy(entityRestartBackoff = value)
def withEntityRestartBackoff(value: java.time.Duration): TuningParameters = withEntityRestartBackoff(value.asScala)
def withHandOffTimeout(value: FiniteDuration): TuningParameters = copy(handOffTimeout = value) def withHandOffTimeout(value: FiniteDuration): TuningParameters = copy(handOffTimeout = value)
def withHandOffTimeout(value: java.time.Duration): TuningParameters = withHandOffTimeout(value.asScala)
def withKeepNrOfBatches(value: Int): TuningParameters = copy(keepNrOfBatches = value) def withKeepNrOfBatches(value: Int): TuningParameters = copy(keepNrOfBatches = value)
def withLeastShardAllocationMaxSimultaneousRebalance(value: Int): TuningParameters = copy(leastShardAllocationMaxSimultaneousRebalance = value) def withLeastShardAllocationMaxSimultaneousRebalance(value: Int): TuningParameters = copy(leastShardAllocationMaxSimultaneousRebalance = value)
def withLeastShardAllocationRebalanceThreshold(value: Int): TuningParameters = copy(leastShardAllocationRebalanceThreshold = value) def withLeastShardAllocationRebalanceThreshold(value: Int): TuningParameters = copy(leastShardAllocationRebalanceThreshold = value)
def withRebalanceInterval(value: FiniteDuration): TuningParameters = copy(rebalanceInterval = value) def withRebalanceInterval(value: FiniteDuration): TuningParameters = copy(rebalanceInterval = value)
def withRebalanceInterval(value: java.time.Duration): TuningParameters = withRebalanceInterval(value.asScala)
def withRetryInterval(value: FiniteDuration): TuningParameters = copy(retryInterval = value) def withRetryInterval(value: FiniteDuration): TuningParameters = copy(retryInterval = value)
def withRetryInterval(value: java.time.Duration): TuningParameters = withRetryInterval(value.asScala)
def withShardFailureBackoff(value: FiniteDuration): TuningParameters = copy(shardFailureBackoff = value) def withShardFailureBackoff(value: FiniteDuration): TuningParameters = copy(shardFailureBackoff = value)
def withShardFailureBackoff(value: java.time.Duration): TuningParameters = withShardFailureBackoff(value.asScala)
def withShardStartTimeout(value: FiniteDuration): TuningParameters = copy(shardStartTimeout = value) def withShardStartTimeout(value: FiniteDuration): TuningParameters = copy(shardStartTimeout = value)
def withShardStartTimeout(value: java.time.Duration): TuningParameters = withShardStartTimeout(value.asScala)
def withSnapshotAfter(value: Int): TuningParameters = copy(snapshotAfter = value) def withSnapshotAfter(value: Int): TuningParameters = copy(snapshotAfter = value)
def withUpdatingStateTimeout(value: FiniteDuration): TuningParameters = copy(updatingStateTimeout = value) def withUpdatingStateTimeout(value: FiniteDuration): TuningParameters = copy(updatingStateTimeout = value)
def withUpdatingStateTimeout(value: java.time.Duration): TuningParameters = withUpdatingStateTimeout(value.asScala)
def withWaitingForStateTimeout(value: FiniteDuration): TuningParameters = copy(waitingForStateTimeout = value) def withWaitingForStateTimeout(value: FiniteDuration): TuningParameters = copy(waitingForStateTimeout = value)
def withWaitingForStateTimeout(value: java.time.Duration): TuningParameters = withWaitingForStateTimeout(value.asScala)
private def copy( private def copy(
bufferSize: Int = bufferSize, bufferSize: Int = bufferSize,
@ -234,7 +246,8 @@ final class ClusterShardingSettings(
val tuningParameters: ClusterShardingSettings.TuningParameters, val tuningParameters: ClusterShardingSettings.TuningParameters,
val coordinatorSingletonSettings: ClusterSingletonManagerSettings) extends NoSerializationVerificationNeeded { val coordinatorSingletonSettings: ClusterSingletonManagerSettings) extends NoSerializationVerificationNeeded {
import akka.cluster.sharding.typed.ClusterShardingSettings.{ StateStoreModeDData, StateStoreModePersistence } import akka.cluster.sharding.typed.ClusterShardingSettings.StateStoreModeDData
import akka.cluster.sharding.typed.ClusterShardingSettings.StateStoreModePersistence
require( require(
stateStoreMode == StateStoreModePersistence || stateStoreMode == StateStoreModeDData, stateStoreMode == StateStoreModePersistence || stateStoreMode == StateStoreModeDData,
s"Unknown 'state-store-mode' [$stateStoreMode], " + s"Unknown 'state-store-mode' [$stateStoreMode], " +

View file

@ -16,6 +16,7 @@ import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._
import akka.util.Timeout import akka.util.Timeout
import akka.util.JavaDurationConverters._
import akka.cluster.ddata.ReplicatedData import akka.cluster.ddata.ReplicatedData
import akka.actor.typed.Terminated import akka.actor.typed.Terminated
@ -67,8 +68,8 @@ import akka.actor.typed.Terminated
case cmd: JReplicator.Get[d] case cmd: JReplicator.Get[d]
implicit val timeout = Timeout(cmd.consistency.timeout match { implicit val timeout = Timeout(cmd.consistency.timeout match {
case Duration.Zero localAskTimeout case java.time.Duration.ZERO localAskTimeout
case t t + additionalAskTimeout case t t.asScala + additionalAskTimeout
}) })
import ctx.executionContext import ctx.executionContext
val reply = val reply =
@ -91,8 +92,8 @@ import akka.actor.typed.Terminated
case cmd: JReplicator.Update[d] case cmd: JReplicator.Update[d]
implicit val timeout = Timeout(cmd.writeConsistency.timeout match { implicit val timeout = Timeout(cmd.writeConsistency.timeout match {
case Duration.Zero localAskTimeout case java.time.Duration.ZERO localAskTimeout
case t t + additionalAskTimeout case t t.asScala + additionalAskTimeout
}) })
import ctx.executionContext import ctx.executionContext
val reply = val reply =
@ -146,8 +147,8 @@ import akka.actor.typed.Terminated
case cmd: JReplicator.Delete[d] case cmd: JReplicator.Delete[d]
implicit val timeout = Timeout(cmd.consistency.timeout match { implicit val timeout = Timeout(cmd.consistency.timeout match {
case Duration.Zero localAskTimeout case java.time.Duration.ZERO localAskTimeout
case t t + additionalAskTimeout case t t.asScala + additionalAskTimeout
}) })
import ctx.executionContext import ctx.executionContext
val reply = val reply =

View file

@ -4,27 +4,23 @@
package akka.cluster.ddata.typed.javadsl package akka.cluster.ddata.typed.javadsl
import java.util.function.{ Function JFunction } import java.time.Duration
import akka.cluster.{ ddata dd }
import akka.cluster.ddata.Key
import akka.cluster.ddata.ReplicatedData
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.cluster.ddata.typed.internal.ReplicatorBehavior
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.Duration
import java.util.Optional import java.util.Optional
import java.util.function.{ Function JFunction } import java.util.function.{ Function JFunction }
import akka.actor.{ DeadLetterSuppression, NoSerializationVerificationNeeded } import scala.util.control.NoStackTrace
import akka.actor.typed.{ ActorRef, Behavior }
import akka.annotation.{ DoNotInherit, InternalApi } import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.DeadLetterSuppression
import akka.actor.NoSerializationVerificationNeeded
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.cluster.ddata.Key
import akka.cluster.ddata.ReplicatedData
import akka.cluster.ddata.typed.internal.ReplicatorBehavior import akka.cluster.ddata.typed.internal.ReplicatorBehavior
import akka.cluster.{ ddata dd } import akka.cluster.{ ddata dd }
import akka.util.JavaDurationConverters._
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.util.control.NoStackTrace
/** /**
* @see [[akka.cluster.ddata.Replicator]]. * @see [[akka.cluster.ddata.Replicator]].
@ -48,61 +44,61 @@ object Replicator {
@DoNotInherit trait Command extends akka.cluster.ddata.typed.scaladsl.Replicator.Command @DoNotInherit trait Command extends akka.cluster.ddata.typed.scaladsl.Replicator.Command
sealed trait ReadConsistency { sealed trait ReadConsistency {
def timeout: FiniteDuration def timeout: Duration
/** INTERNAL API */ /** INTERNAL API */
@InternalApi private[akka] def toUntyped: dd.Replicator.ReadConsistency @InternalApi private[akka] def toUntyped: dd.Replicator.ReadConsistency
} }
case object ReadLocal extends ReadConsistency { case object ReadLocal extends ReadConsistency {
override def timeout: FiniteDuration = Duration.Zero override def timeout: Duration = Duration.ZERO
/** INTERNAL API */ /** INTERNAL API */
@InternalApi private[akka] override def toUntyped = dd.Replicator.ReadLocal @InternalApi private[akka] override def toUntyped = dd.Replicator.ReadLocal
} }
final case class ReadFrom(n: Int, timeout: FiniteDuration) extends ReadConsistency { final case class ReadFrom(n: Int, timeout: Duration) extends ReadConsistency {
require(n >= 2, "ReadFrom n must be >= 2, use ReadLocal for n=1") require(n >= 2, "ReadFrom n must be >= 2, use ReadLocal for n=1")
/** INTERNAL API */ /** INTERNAL API */
@InternalApi private[akka] override def toUntyped = dd.Replicator.ReadFrom(n, timeout) @InternalApi private[akka] override def toUntyped = dd.Replicator.ReadFrom(n, timeout.asScala)
} }
final case class ReadMajority(timeout: FiniteDuration, minCap: Int = DefaultMajorityMinCap) extends ReadConsistency { final case class ReadMajority(timeout: Duration, minCap: Int = DefaultMajorityMinCap) extends ReadConsistency {
def this(timeout: FiniteDuration) = this(timeout, DefaultMajorityMinCap) def this(timeout: Duration) = this(timeout, DefaultMajorityMinCap)
/** INTERNAL API */ /** INTERNAL API */
@InternalApi private[akka] override def toUntyped = dd.Replicator.ReadMajority(timeout, minCap) @InternalApi private[akka] override def toUntyped = dd.Replicator.ReadMajority(timeout.asScala, minCap)
} }
final case class ReadAll(timeout: FiniteDuration) extends ReadConsistency { final case class ReadAll(timeout: Duration) extends ReadConsistency {
/** INTERNAL API */ /** INTERNAL API */
@InternalApi private[akka] override def toUntyped = dd.Replicator.ReadAll(timeout) @InternalApi private[akka] override def toUntyped = dd.Replicator.ReadAll(timeout.asScala)
} }
sealed trait WriteConsistency { sealed trait WriteConsistency {
def timeout: FiniteDuration def timeout: Duration
/** INTERNAL API */ /** INTERNAL API */
@InternalApi private[akka] def toUntyped: dd.Replicator.WriteConsistency @InternalApi private[akka] def toUntyped: dd.Replicator.WriteConsistency
} }
case object WriteLocal extends WriteConsistency { case object WriteLocal extends WriteConsistency {
override def timeout: FiniteDuration = Duration.Zero override def timeout: Duration = Duration.ZERO
/** INTERNAL API */ /** INTERNAL API */
@InternalApi private[akka] override def toUntyped = dd.Replicator.WriteLocal @InternalApi private[akka] override def toUntyped = dd.Replicator.WriteLocal
} }
final case class WriteTo(n: Int, timeout: FiniteDuration) extends WriteConsistency { final case class WriteTo(n: Int, timeout: Duration) extends WriteConsistency {
require(n >= 2, "WriteTo n must be >= 2, use WriteLocal for n=1") require(n >= 2, "WriteTo n must be >= 2, use WriteLocal for n=1")
/** INTERNAL API */ /** INTERNAL API */
@InternalApi private[akka] override def toUntyped = dd.Replicator.WriteTo(n, timeout) @InternalApi private[akka] override def toUntyped = dd.Replicator.WriteTo(n, timeout.asScala)
} }
final case class WriteMajority(timeout: FiniteDuration, minCap: Int = DefaultMajorityMinCap) extends WriteConsistency { final case class WriteMajority(timeout: Duration, minCap: Int = DefaultMajorityMinCap) extends WriteConsistency {
def this(timeout: FiniteDuration) = this(timeout, DefaultMajorityMinCap) def this(timeout: Duration) = this(timeout, DefaultMajorityMinCap)
/** INTERNAL API */ /** INTERNAL API */
@InternalApi private[akka] override def toUntyped = dd.Replicator.WriteMajority(timeout, minCap) @InternalApi private[akka] override def toUntyped = dd.Replicator.WriteMajority(timeout.asScala, minCap)
} }
final case class WriteAll(timeout: FiniteDuration) extends WriteConsistency { final case class WriteAll(timeout: Duration) extends WriteConsistency {
/** INTERNAL API */ /** INTERNAL API */
@InternalApi private[akka] override def toUntyped = dd.Replicator.WriteAll(timeout) @InternalApi private[akka] override def toUntyped = dd.Replicator.WriteAll(timeout.asScala)
} }
/** /**

View file

@ -10,6 +10,7 @@ import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.singleton.{ ClusterSingletonProxySettings, ClusterSingletonManagerSettings UntypedClusterSingletonManagerSettings } import akka.cluster.singleton.{ ClusterSingletonProxySettings, ClusterSingletonManagerSettings UntypedClusterSingletonManagerSettings }
import akka.cluster.typed.internal.AdaptedClusterSingletonImpl import akka.cluster.typed.internal.AdaptedClusterSingletonImpl
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props } import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props }
import akka.util.JavaDurationConverters._
import com.typesafe.config.Config import com.typesafe.config.Config
import scala.concurrent.duration._ import scala.concurrent.duration._
@ -60,8 +61,10 @@ final class ClusterSingletonSettings(
def withNoDataCenter(): ClusterSingletonSettings = copy(dataCenter = None) def withNoDataCenter(): ClusterSingletonSettings = copy(dataCenter = None)
def withRemovalMargin(removalMargin: FiniteDuration): ClusterSingletonSettings = copy(removalMargin = removalMargin) def withRemovalMargin(removalMargin: FiniteDuration): ClusterSingletonSettings = copy(removalMargin = removalMargin)
def withRemovalMargin(removalMargin: java.time.Duration): ClusterSingletonSettings = withRemovalMargin(removalMargin.asScala)
def withHandoverRetryInterval(handOverRetryInterval: FiniteDuration): ClusterSingletonSettings = copy(handOverRetryInterval = handOverRetryInterval) def withHandoverRetryInterval(handOverRetryInterval: FiniteDuration): ClusterSingletonSettings = copy(handOverRetryInterval = handOverRetryInterval)
def withHandoverRetryInterval(handOverRetryInterval: java.time.Duration): ClusterSingletonSettings = withHandoverRetryInterval(handOverRetryInterval.asScala)
def withBufferSize(bufferSize: Int): ClusterSingletonSettings = copy(bufferSize = bufferSize) def withBufferSize(bufferSize: Int): ClusterSingletonSettings = copy(bufferSize = bufferSize)
@ -113,7 +116,7 @@ object ClusterSingleton extends ExtensionId[ClusterSingleton] {
*/ */
@InternalApi @InternalApi
private[akka] object ClusterSingletonImpl { private[akka] object ClusterSingletonImpl {
def managerNameFor(singletonName: String) = s"singletonManager${singletonName}" def managerNameFor(singletonName: String) = s"singletonManager$singletonName"
} }
/** /**
@ -213,13 +216,17 @@ final class ClusterSingletonManagerSettings(
def withRole(role: String): ClusterSingletonManagerSettings = copy(role = UntypedClusterSingletonManagerSettings.roleOption(role)) def withRole(role: String): ClusterSingletonManagerSettings = copy(role = UntypedClusterSingletonManagerSettings.roleOption(role))
def withRole(role: Option[String]) = copy(role = role) def withRole(role: Option[String]): ClusterSingletonManagerSettings = copy(role = role)
def withRemovalMargin(removalMargin: FiniteDuration): ClusterSingletonManagerSettings = def withRemovalMargin(removalMargin: FiniteDuration): ClusterSingletonManagerSettings =
copy(removalMargin = removalMargin) copy(removalMargin = removalMargin)
def withRemovalMargin(removalMargin: java.time.Duration): ClusterSingletonManagerSettings =
withRemovalMargin(removalMargin.asScala)
def withHandOverRetryInterval(retryInterval: FiniteDuration): ClusterSingletonManagerSettings = def withHandOverRetryInterval(retryInterval: FiniteDuration): ClusterSingletonManagerSettings =
copy(handOverRetryInterval = retryInterval) copy(handOverRetryInterval = retryInterval)
def withHandOverRetryInterval(retryInterval: java.time.Duration): ClusterSingletonManagerSettings =
withHandOverRetryInterval(retryInterval.asScala)
private def copy( private def copy(
singletonName: String = singletonName, singletonName: String = singletonName,

View file

@ -13,10 +13,9 @@ import akka.testkit.typed.javadsl.TestKitJunitResource;
import akka.testkit.typed.javadsl.TestProbe; import akka.testkit.typed.javadsl.TestProbe;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
import java.time.Duration;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -200,7 +199,7 @@ public class PersistentActorTest {
}) })
.matchCommand(IncrementLater.class, (ctx, state, command) -> { .matchCommand(IncrementLater.class, (ctx, state, command) -> {
ActorRef<Object> delay = ctx.spawnAnonymous(Behaviors.withTimers(timers -> { ActorRef<Object> delay = ctx.spawnAnonymous(Behaviors.withTimers(timers -> {
timers.startSingleTimer(Tick.instance, Tick.instance, FiniteDuration.create(10, TimeUnit.MILLISECONDS)); timers.startSingleTimer(Tick.instance, Tick.instance, Duration.ofMillis(10));
return Behaviors.receive((context, o) -> Behaviors.stopped()); return Behaviors.receive((context, o) -> Behaviors.stopped());
})); }));
ctx.watchWith(delay, new DelayFinished()); ctx.watchWith(delay, new DelayFinished());
@ -208,7 +207,7 @@ public class PersistentActorTest {
}) })
.matchCommand(DelayFinished.class, (ctx, state, finished) -> Effect().persist(new Incremented(10))) .matchCommand(DelayFinished.class, (ctx, state, finished) -> Effect().persist(new Incremented(10)))
.matchCommand(Increment100OnTimeout.class, (ctx, state, msg) -> { .matchCommand(Increment100OnTimeout.class, (ctx, state, msg) -> {
ctx.setReceiveTimeout(FiniteDuration.create(10, TimeUnit.MILLISECONDS), new Timeout()); ctx.setReceiveTimeout(Duration.ofMillis(10), new Timeout());
return Effect().none(); return Effect().none();
}) })
.matchCommand(Timeout.class, .matchCommand(Timeout.class,
@ -334,7 +333,7 @@ public class PersistentActorTest {
TestProbe<State> probe = testKit.createTestProbe(); TestProbe<State> probe = testKit.createTestProbe();
ActorRef<Command> c = testKit.spawn(counter("c12")); ActorRef<Command> c = testKit.spawn(counter("c12"));
c.tell(new StopThenLog()); c.tell(new StopThenLog());
probe.expectTerminated(c, FiniteDuration.create(1, TimeUnit.SECONDS)); probe.expectTerminated(c, Duration.ofSeconds(1));
} }
// FIXME test with by state command handler // FIXME test with by state command handler
} }

View file

@ -64,7 +64,14 @@ final class TestKitSettings(val config: Config) {
val ThrowOnShutdownTimeout: Boolean = config.getBoolean("throw-on-shutdown-timeout") val ThrowOnShutdownTimeout: Boolean = config.getBoolean("throw-on-shutdown-timeout")
/** /**
* Scale the `duration` with the configured `TestTimeFactor` * Scala API: Scale the `duration` with the configured `TestTimeFactor`
*/ */
def dilated(duration: FiniteDuration): FiniteDuration = (duration * TestTimeFactor).asInstanceOf[FiniteDuration] def dilated(duration: FiniteDuration): FiniteDuration =
(duration * TestTimeFactor).asInstanceOf[FiniteDuration]
/**
* Java API: Scale the `duration` with the configured `TestTimeFactor`
*/
def dilated(duration: java.time.Duration): java.time.Duration =
java.time.Duration.ofMillis((duration.toMillis * TestTimeFactor).toLong)
} }

View file

@ -4,6 +4,7 @@
package akka.testkit.typed.internal package akka.testkit.typed.internal
import java.time
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ BlockingDeque, LinkedBlockingDeque } import java.util.concurrent.{ BlockingDeque, LinkedBlockingDeque }
import java.util.function.Supplier import java.util.function.Supplier
@ -16,7 +17,7 @@ import akka.testkit.typed.scaladsl.{ TestDuration, TestProbe ⇒ ScalaTestProbe
import akka.testkit.typed.{ FishingOutcome, TestKitSettings } import akka.testkit.typed.{ FishingOutcome, TestKitSettings }
import akka.util.PrettyDuration._ import akka.util.PrettyDuration._
import akka.util.{ BoxedType, Timeout } import akka.util.{ BoxedType, Timeout }
import akka.util.JavaDurationConverters._
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
@ -60,26 +61,33 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_])
private val testActor: ActorRef[M] = { private val testActor: ActorRef[M] = {
// FIXME arbitrary timeout? // FIXME arbitrary timeout?
implicit val timeout = Timeout(3.seconds) implicit val timeout: Timeout = Timeout(3.seconds)
val futRef = system.systemActorOf(TestProbeImpl.testActor(queue, terminations), s"$name-${testActorId.incrementAndGet()}") val futRef = system.systemActorOf(TestProbeImpl.testActor(queue, terminations), s"$name-${testActorId.incrementAndGet()}")
Await.result(futRef, timeout.duration + 1.second) Await.result(futRef, timeout.duration + 1.second)
} }
override def ref = testActor override def ref: ActorRef[M] = testActor
override def remainingOrDefault = remainingOr(settings.SingleExpectDefaultTimeout.dilated) override def remainingOrDefault: FiniteDuration = remainingOr(settings.SingleExpectDefaultTimeout.dilated)
override def getRemainingOrDefault: java.time.Duration = remainingOrDefault.asJava
override def remaining: FiniteDuration = end match { override def remaining: FiniteDuration = end match {
case f: FiniteDuration f - now case f: FiniteDuration f - now
case _ throw new AssertionError("`remaining` may not be called outside of `within`") case _ throw new AssertionError("`remaining` may not be called outside of `within`")
} }
override def getRemaining: java.time.Duration = remaining.asJava
override def remainingOr(duration: FiniteDuration): FiniteDuration = end match { override def remainingOr(duration: FiniteDuration): FiniteDuration = end match {
case x if x eq Duration.Undefined duration case x if x eq Duration.Undefined duration
case x if !x.isFinite throw new IllegalArgumentException("`end` cannot be infinite") case x if !x.isFinite throw new IllegalArgumentException("`end` cannot be infinite")
case f: FiniteDuration f - now case f: FiniteDuration f - now
} }
override def getRemainingOr(duration: java.time.Duration): java.time.Duration =
remainingOr(duration.asScala).asJava
private def remainingOrDilated(max: Duration): FiniteDuration = max match { private def remainingOrDilated(max: Duration): FiniteDuration = max match {
case x if x eq Duration.Undefined remainingOrDefault case x if x eq Duration.Undefined remainingOrDefault
case x if !x.isFinite throw new IllegalArgumentException("max duration cannot be infinite") case x if !x.isFinite throw new IllegalArgumentException("max duration cannot be infinite")
@ -113,9 +121,15 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_])
override def expectMessage[T <: M](max: FiniteDuration, obj: T): T = expectMessage_internal(max.dilated, obj) override def expectMessage[T <: M](max: FiniteDuration, obj: T): T = expectMessage_internal(max.dilated, obj)
override def expectMessage[T <: M](max: java.time.Duration, obj: T): T =
expectMessage(max.asScala, obj)
override def expectMessage[T <: M](max: FiniteDuration, hint: String, obj: T): T = override def expectMessage[T <: M](max: FiniteDuration, hint: String, obj: T): T =
expectMessage_internal(max.dilated, obj, Some(hint)) expectMessage_internal(max.dilated, obj, Some(hint))
override def expectMessage[T <: M](max: java.time.Duration, hint: String, obj: T): T =
expectMessage(max.asScala, hint, obj)
private def expectMessage_internal[T <: M](max: Duration, obj: T, hint: Option[String] = None): T = { private def expectMessage_internal[T <: M](max: Duration, obj: T, hint: Option[String] = None): T = {
val o = receiveOne(max) val o = receiveOne(max)
val hintOrEmptyString = hint.map(": " + _).getOrElse("") val hintOrEmptyString = hint.map(": " + _).getOrElse("")
@ -144,9 +158,14 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_])
message message
} }
override def expectNoMessage(max: FiniteDuration): Unit = { expectNoMessage_internal(max) } override def expectNoMessage(max: FiniteDuration): Unit =
expectNoMessage_internal(max)
override def expectNoMessage(): Unit = { expectNoMessage_internal(settings.ExpectNoMessageDefaultTimeout.dilated) } override def expectNoMessage(max: java.time.Duration): Unit =
expectNoMessage(max.asScala)
override def expectNoMessage(): Unit =
expectNoMessage_internal(settings.ExpectNoMessageDefaultTimeout.dilated)
private def expectNoMessage_internal(max: FiniteDuration) { private def expectNoMessage_internal(max: FiniteDuration) {
val o = receiveOne(max) val o = receiveOne(max)
@ -210,8 +229,11 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_])
assert(message.ref == actorRef, s"expected [${actorRef.path}] to stop, but saw [${message.ref.path}] stop") assert(message.ref == actorRef, s"expected [${actorRef.path}] to stop, but saw [${message.ref.path}] stop")
} }
override def awaitAssert[A](max: Duration, interval: Duration, supplier: Supplier[A]): A = override def expectTerminated[U](actorRef: ActorRef[U], max: java.time.Duration): Unit =
awaitAssert(supplier.get(), max, interval) expectTerminated(actorRef, max.asScala)
override def awaitAssert[A](max: java.time.Duration, interval: java.time.Duration, supplier: Supplier[A]): A =
awaitAssert(supplier.get(), if (max == java.time.Duration.ZERO) Duration.Undefined else max.asScala, interval.asScala)
override def awaitAssert[A](a: A, max: Duration = Duration.Undefined, interval: Duration = 100.millis): A = { override def awaitAssert[A](a: A, max: Duration = Duration.Undefined, interval: Duration = 100.millis): A = {
val _max = remainingOrDilated(max) val _max = remainingOrDilated(max)

View file

@ -4,6 +4,8 @@
package akka.testkit.typed.javadsl package akka.testkit.typed.javadsl
import java.time.Duration
import akka.actor.Scheduler import akka.actor.Scheduler
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props } import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props }
import akka.testkit.typed.TestKitSettings import akka.testkit.typed.TestKitSettings
@ -11,8 +13,7 @@ import akka.testkit.typed.internal.TestKitUtils
import akka.testkit.typed.scaladsl.{ ActorTestKit ScalaTestKit } import akka.testkit.typed.scaladsl.{ ActorTestKit ScalaTestKit }
import akka.util.Timeout import akka.util.Timeout
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.util.JavaDurationConverters._
import scala.concurrent.duration.Duration
object ActorTestKit { object ActorTestKit {
@ -53,7 +54,7 @@ object ActorTestKit {
* no exception is thrown. * no exception is thrown.
*/ */
def shutdown(system: ActorSystem[_], duration: Duration, throwIfShutdownTimesOut: Boolean): Unit = { def shutdown(system: ActorSystem[_], duration: Duration, throwIfShutdownTimesOut: Boolean): Unit = {
TestKitUtils.shutdown(system, duration, throwIfShutdownTimesOut) TestKitUtils.shutdown(system, duration.asScala, throwIfShutdownTimesOut)
} }
/** /**
@ -75,7 +76,7 @@ object ActorTestKit {
val settings = TestKitSettings.create(system) val settings = TestKitSettings.create(system)
shutdown( shutdown(
system, system,
settings.DefaultActorSystemShutdownTimeout, settings.DefaultActorSystemShutdownTimeout.asJava,
settings.ThrowOnShutdownTimeout settings.ThrowOnShutdownTimeout
) )
} }

View file

@ -4,10 +4,11 @@
package akka.testkit.typed.javadsl package akka.testkit.typed.javadsl
import java.time.Duration
import akka.actor.typed.{ ActorRef, Behavior, Props } import akka.actor.typed.{ ActorRef, Behavior, Props }
import akka.testkit.typed.Effect import akka.testkit.typed.Effect
import akka.util.JavaDurationConverters._
import scala.concurrent.duration.{ Duration, FiniteDuration }
/** /**
* Factories for behavior effects for [[BehaviorTestKit]], each effect has a suitable equals and can be used to compare * Factories for behavior effects for [[BehaviorTestKit]], each effect has a suitable equals and can be used to compare
@ -57,14 +58,14 @@ object Effects {
/** /**
* The behavior set a new receive timeout, with `msg` as timeout notification * The behavior set a new receive timeout, with `msg` as timeout notification
*/ */
def receiveTimeoutSet[T](d: Duration, msg: T): Effect = ReceiveTimeoutSet(d, msg) def receiveTimeoutSet[T](d: Duration, msg: T): Effect = ReceiveTimeoutSet(d.asScala, msg)
/** /**
* The behavior used `ctx.schedule` to schedule `msg` to be sent to `target` after `delay` * The behavior used `ctx.schedule` to schedule `msg` to be sent to `target` after `delay`
* FIXME what about events scheduled through the scheduler? * FIXME what about events scheduled through the scheduler?
*/ */
def scheduled[U](delay: FiniteDuration, target: ActorRef[U], msg: U): Effect = def scheduled[U](delay: Duration, target: ActorRef[U], msg: U): Effect =
Scheduled(delay, target, msg) Scheduled(delay.asScala, target, msg)
/** /**
* Used to represent an empty list of effects - in other words, the behavior didn't do anything observable * Used to represent an empty list of effects - in other words, the behavior didn't do anything observable

View file

@ -4,11 +4,11 @@
package akka.testkit.typed.javadsl package akka.testkit.typed.javadsl
import java.time.Duration
import akka.actor.typed.ActorSystem import akka.actor.typed.ActorSystem
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.util.JavaDurationConverters._
import scala.annotation.varargs import scala.annotation.varargs
import scala.concurrent.duration.{ Duration, FiniteDuration }
/** /**
* Manual time allows you to do async tests while controlling the scheduler of the system. * Manual time allows you to do async tests while controlling the scheduler of the system.
@ -48,12 +48,12 @@ final class ManualTime(delegate: akka.testkit.ExplicitlyTriggeredScheduler) {
* If you want the amount of time passed to be dilated, apply the dilation before passing the delay to * If you want the amount of time passed to be dilated, apply the dilation before passing the delay to
* this method. * this method.
*/ */
def timePasses(amount: FiniteDuration): Unit = delegate.timePasses(amount) def timePasses(amount: Duration): Unit = delegate.timePasses(amount.asScala)
@varargs @varargs
def expectNoMessageFor(duration: FiniteDuration, on: TestProbe[_]*): Unit = { def expectNoMessageFor(duration: Duration, on: TestProbe[_]*): Unit = {
delegate.timePasses(duration) delegate.timePasses(duration.asScala)
on.foreach(_.expectNoMessage(Duration.Zero)) on.foreach(_.expectNoMessage(Duration.ZERO))
} }
} }

View file

@ -4,6 +4,7 @@
package akka.testkit.typed.javadsl package akka.testkit.typed.javadsl
import java.time.Duration
import java.util.function.Supplier import java.util.function.Supplier
import akka.actor.typed.{ ActorRef, ActorSystem } import akka.actor.typed.{ ActorRef, ActorSystem }
@ -11,10 +12,11 @@ import akka.annotation.DoNotInherit
import akka.testkit.typed.internal.TestProbeImpl import akka.testkit.typed.internal.TestProbeImpl
import akka.testkit.typed.{ FishingOutcome, TestKitSettings } import akka.testkit.typed.{ FishingOutcome, TestKitSettings }
import akka.testkit.typed.scaladsl.TestDuration import akka.testkit.typed.scaladsl.TestDuration
import akka.util.JavaDurationConverters._
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.concurrent.duration._ import scala.concurrent.duration.FiniteDuration
import akka.annotation.InternalApi
object FishingOutcomes { object FishingOutcomes {
/** /**
@ -81,20 +83,20 @@ abstract class TestProbe[M] {
* block or missing that it returns the properly dilated default for this * block or missing that it returns the properly dilated default for this
* case from settings (key "akka.actor.typed.test.single-expect-default"). * case from settings (key "akka.actor.typed.test.single-expect-default").
*/ */
def remainingOrDefault: FiniteDuration def getRemainingOrDefault: Duration
/** /**
* Obtain time remaining for execution of the innermost enclosing `within` * Obtain time remaining for execution of the innermost enclosing `within`
* block or throw an [[AssertionError]] if no `within` block surrounds this * block or throw an [[AssertionError]] if no `within` block surrounds this
* call. * call.
*/ */
def remaining: FiniteDuration def getRemaining: Duration
/** /**
* Obtain time remaining for execution of the innermost enclosing `within` * Obtain time remaining for execution of the innermost enclosing `within`
* block or missing that it returns the given duration. * block or missing that it returns the given duration.
*/ */
def remainingOr(duration: FiniteDuration): FiniteDuration def getRemainingOr(duration: Duration): Duration
/** /**
* Execute code block while bounding its execution time between `min` and * Execute code block while bounding its execution time between `min` and
@ -112,16 +114,19 @@ abstract class TestProbe[M] {
* } * }
* }}} * }}}
*/ */
def within[T](min: FiniteDuration, max: FiniteDuration)(f: Supplier[T]): T = def within[T](min: Duration, max: Duration)(f: Supplier[T]): T =
within_internal(min, max, f.get()) within_internal(min.asScala, max.asScala, f.get())
/** /**
* Same as calling `within(0 seconds, max)(f)`. * Same as calling `within(0 seconds, max)(f)`.
*/ */
def within[T](max: FiniteDuration)(f: Supplier[T]): T = def within[T](max: Duration)(f: Supplier[T]): T =
within_internal(Duration.Zero, max, f.get()) within_internal(scala.concurrent.duration.Duration.Zero, max.asScala, f.get())
protected def within_internal[T](min: FiniteDuration, max: FiniteDuration, f: T): T /**
* INTERNAL API
*/
@InternalApi protected def within_internal[T](min: FiniteDuration, max: FiniteDuration, f: T): T
/** /**
* Same as `expectMessage(remainingOrDefault, obj)`, but correctly treating the timeFactor. * Same as `expectMessage(remainingOrDefault, obj)`, but correctly treating the timeFactor.
@ -135,7 +140,7 @@ abstract class TestProbe[M] {
* *
* @return the received object * @return the received object
*/ */
def expectMessage[T <: M](max: FiniteDuration, obj: T): T def expectMessage[T <: M](max: Duration, obj: T): T
/** /**
* Receive one message from the test actor and assert that it equals the * Receive one message from the test actor and assert that it equals the
@ -144,13 +149,13 @@ abstract class TestProbe[M] {
* *
* @return the received object * @return the received object
*/ */
def expectMessage[T <: M](max: FiniteDuration, hint: String, obj: T): T def expectMessage[T <: M](max: Duration, hint: String, obj: T): T
/** /**
* Assert that no message is received for the specified time. * Assert that no message is received for the specified time.
* Supplied value is not dilated. * Supplied value is not dilated.
*/ */
def expectNoMessage(max: FiniteDuration): Unit def expectNoMessage(max: Duration): Unit
/** /**
* Assert that no message is received. Waits for the default period configured as `akka.actor.typed.test.expect-no-message-default` * Assert that no message is received. Waits for the default period configured as `akka.actor.typed.test.expect-no-message-default`
@ -162,7 +167,7 @@ abstract class TestProbe[M] {
* Expect the given actor to be stopped or stop withing the given timeout or * Expect the given actor to be stopped or stop withing the given timeout or
* throw an [[AssertionError]]. * throw an [[AssertionError]].
*/ */
def expectTerminated[U](actorRef: ActorRef[U], max: FiniteDuration): Unit def expectTerminated[U](actorRef: ActorRef[U], max: Duration): Unit
/** /**
* Evaluate the given assert every `interval` until it does not throw an exception and return the * Evaluate the given assert every `interval` until it does not throw an exception and return the
@ -183,14 +188,14 @@ abstract class TestProbe[M] {
* Note that the timeout is scaled using Duration.dilated, which uses the configuration entry "akka.test.timefactor". * Note that the timeout is scaled using Duration.dilated, which uses the configuration entry "akka.test.timefactor".
*/ */
def awaitAssert[A](max: Duration, supplier: Supplier[A]): A = def awaitAssert[A](max: Duration, supplier: Supplier[A]): A =
awaitAssert(max, 100.millis, supplier) awaitAssert(max, Duration.ofMillis(100), supplier)
/** /**
* Evaluate the given assert every 100 milliseconds until it does not throw an exception and return the * Evaluate the given assert every 100 milliseconds until it does not throw an exception and return the
* result. A max time is taken it from the innermost enclosing `within` block. * result. A max time is taken it from the innermost enclosing `within` block.
*/ */
def awaitAssert[A](supplier: Supplier[A]): A = def awaitAssert[A](supplier: Supplier[A]): A =
awaitAssert(Duration.Undefined, supplier) awaitAssert(Duration.ZERO, supplier)
// FIXME awaitAssert(Procedure): Unit would be nice for java people to not have to return null // FIXME awaitAssert(Procedure): Unit would be nice for java people to not have to return null
@ -198,16 +203,19 @@ abstract class TestProbe[M] {
* Same as `expectMessageType(clazz, remainingOrDefault)`, but correctly treating the timeFactor. * Same as `expectMessageType(clazz, remainingOrDefault)`, but correctly treating the timeFactor.
*/ */
def expectMessageClass[T](clazz: Class[T]): T = def expectMessageClass[T](clazz: Class[T]): T =
expectMessageClass_internal(remainingOrDefault, clazz) expectMessageClass_internal(getRemainingOrDefault.asScala, clazz)
/** /**
* Wait for a message of type M and return it when it arrives, or fail if the `max` timeout is hit. * Wait for a message of type M and return it when it arrives, or fail if the `max` timeout is hit.
* The timeout is dilated. * The timeout is dilated.
*/ */
def expectMessageClass[T](clazz: Class[T], max: FiniteDuration): T = def expectMessageClass[T](clazz: Class[T], max: Duration): T =
expectMessageClass_internal(max.dilated, clazz) expectMessageClass_internal(max.asScala.dilated, clazz)
protected def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C /**
* INTERNAL API
*/
@InternalApi protected def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C
/** /**
* Java API: Allows for flexible matching of multiple messages within a timeout, the fisher function is fed each incoming * Java API: Allows for flexible matching of multiple messages within a timeout, the fisher function is fed each incoming
@ -226,15 +234,18 @@ abstract class TestProbe[M] {
* The timeout is dilated. * The timeout is dilated.
* @return The messages accepted in the order they arrived * @return The messages accepted in the order they arrived
*/ */
def fishForMessage(max: FiniteDuration, fisher: java.util.function.Function[M, FishingOutcome]): java.util.List[M] = def fishForMessage(max: Duration, fisher: java.util.function.Function[M, FishingOutcome]): java.util.List[M] =
fishForMessage(max, "", fisher) fishForMessage(max, "", fisher)
/** /**
* Same as the other `fishForMessageJava` but includes the provided hint in all error messages * Same as the other `fishForMessageJava` but includes the provided hint in all error messages
*/ */
def fishForMessage(max: FiniteDuration, hint: String, fisher: java.util.function.Function[M, FishingOutcome]): java.util.List[M] = def fishForMessage(max: Duration, hint: String, fisher: java.util.function.Function[M, FishingOutcome]): java.util.List[M] =
fishForMessage_internal(max, hint, fisher.apply).asJava fishForMessage_internal(max.asScala, hint, fisher.apply).asJava
protected def fishForMessage_internal(max: FiniteDuration, hint: String, fisher: M FishingOutcome): List[M] /**
* INTERNAL API
*/
@InternalApi protected def fishForMessage_internal(max: FiniteDuration, hint: String, fisher: M FishingOutcome): List[M]
} }

View file

@ -5,14 +5,13 @@
package akka.testkit.typed.javadsl; package akka.testkit.typed.javadsl;
//#manual-scheduling-simple //#manual-scheduling-simple
import java.util.concurrent.TimeUnit;
import akka.actor.typed.Behavior; import akka.actor.typed.Behavior;
import akka.testkit.typed.javadsl.ManualTime; import akka.testkit.typed.javadsl.ManualTime;
import akka.testkit.typed.javadsl.TestKitJunitResource; import akka.testkit.typed.javadsl.TestKitJunitResource;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.scalatest.junit.JUnitSuite; import org.scalatest.junit.JUnitSuite;
import scala.concurrent.duration.Duration; import java.time.Duration;
import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Behaviors;
@ -34,7 +33,7 @@ public class ManualTimerExampleTest extends JUnitSuite {
public void testScheduleNonRepeatedTicks() { public void testScheduleNonRepeatedTicks() {
TestProbe<Tock> probe = testKit.createTestProbe(); TestProbe<Tock> probe = testKit.createTestProbe();
Behavior<Tick> behavior = Behaviors.withTimers(timer -> { Behavior<Tick> behavior = Behaviors.withTimers(timer -> {
timer.startSingleTimer("T", new Tick(), Duration.create(10, TimeUnit.MILLISECONDS)); timer.startSingleTimer("T", new Tick(), Duration.ofMillis(10));
return Behaviors.receive( (ctx, tick) -> { return Behaviors.receive( (ctx, tick) -> {
probe.ref().tell(new Tock()); probe.ref().tell(new Tock());
return Behaviors.same(); return Behaviors.same();
@ -43,12 +42,12 @@ public class ManualTimerExampleTest extends JUnitSuite {
testKit.spawn(behavior); testKit.spawn(behavior);
manualTime.expectNoMessageFor(Duration.create(9, TimeUnit.MILLISECONDS), probe); manualTime.expectNoMessageFor(Duration.ofMillis(9), probe);
manualTime.timePasses(Duration.create(2, TimeUnit.MILLISECONDS)); manualTime.timePasses(Duration.ofMillis(2));
probe.expectMessageClass(Tock.class); probe.expectMessageClass(Tock.class);
manualTime.expectNoMessageFor(Duration.create(10, TimeUnit.SECONDS), probe); manualTime.expectNoMessageFor(Duration.ofSeconds(10), probe);
} }

View file

@ -6,10 +6,8 @@ package akka.testkit.typed.javadsl;
import akka.actor.typed.ActorRef; import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem; import akka.actor.typed.ActorSystem;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit; import java.time.Duration;
public class TestProbeTest { public class TestProbeTest {
@ -21,12 +19,12 @@ public class TestProbeTest {
// ... something ... // ... something ...
return null; return null;
}); });
probe.awaitAssert(FiniteDuration.apply(3, TimeUnit.SECONDS), () -> { probe.awaitAssert(Duration.ofSeconds(3), () -> {
// ... something ... // ... something ...
return null; return null;
}); });
String awaitAssertResult = String awaitAssertResult =
probe.awaitAssert(FiniteDuration.apply(3, TimeUnit.SECONDS), FiniteDuration.apply(100, TimeUnit.MILLISECONDS), () -> { probe.awaitAssert(Duration.ofSeconds(3), Duration.ofMillis(100), () -> {
// ... something ... // ... something ...
return "some result"; return "some result";
}); });
@ -35,16 +33,16 @@ public class TestProbeTest {
probe.expectNoMessage(); probe.expectNoMessage();
ActorRef<String> ref = null; ActorRef<String> ref = null;
probe.expectTerminated(ref, FiniteDuration.apply(1, TimeUnit.SECONDS)); probe.expectTerminated(ref, Duration.ofSeconds(1));
FiniteDuration remaining = probe.remaining(); Duration remaining = probe.getRemaining();
probe.fishForMessage(FiniteDuration.apply(3, TimeUnit.SECONDS), "hint", (msg) -> { probe.fishForMessage(Duration.ofSeconds(3), "hint", (msg) -> {
if (msg.equals("one")) return FishingOutcomes.continueAndIgnore(); if (msg.equals("one")) return FishingOutcomes.continueAndIgnore();
else if (msg.equals("two")) return FishingOutcomes.complete(); else if (msg.equals("two")) return FishingOutcomes.complete();
else return FishingOutcomes.fail("error"); else return FishingOutcomes.fail("error");
}); });
String withinResult = probe.within(FiniteDuration.apply(3, TimeUnit.SECONDS), () -> { String withinResult = probe.within(Duration.ofSeconds(3), () -> {
// ... something ... // ... something ...
return "result"; return "result";
}); });