diff --git a/akka-actor-tests/src/test/java/akka/actor/ActorSelectionTest.java b/akka-actor-tests/src/test/java/akka/actor/ActorSelectionTest.java index aa54d0757b..78253e019b 100644 --- a/akka-actor-tests/src/test/java/akka/actor/ActorSelectionTest.java +++ b/akka-actor-tests/src/test/java/akka/actor/ActorSelectionTest.java @@ -9,7 +9,7 @@ import akka.testkit.AkkaSpec; import org.junit.ClassRule; import org.junit.Test; import org.scalatest.junit.JUnitSuite; -import scala.concurrent.duration.FiniteDuration; +import java.time.Duration; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; @@ -28,7 +28,7 @@ public class ActorSelectionTest extends JUnitSuite { public void testResolveOneCS() throws Exception { ActorRef actorRef = system.actorOf(Props.create(JavaAPITestActor.class), "ref1"); ActorSelection selection = system.actorSelection("user/ref1"); - FiniteDuration timeout = new FiniteDuration(10, TimeUnit.MILLISECONDS); + Duration timeout = Duration.ofMillis(10); CompletionStage cs = selection.resolveOneCS(timeout); diff --git a/akka-actor-tests/src/test/java/akka/actor/InboxJavaAPITest.java b/akka-actor-tests/src/test/java/akka/actor/InboxJavaAPITest.java index 698a13c99a..5983f9dc20 100644 --- a/akka-actor-tests/src/test/java/akka/actor/InboxJavaAPITest.java +++ b/akka-actor-tests/src/test/java/akka/actor/InboxJavaAPITest.java @@ -4,14 +4,13 @@ package akka.actor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.time.Duration; import org.junit.ClassRule; import org.junit.Test; import akka.testkit.AkkaJUnitActorSystemResource; import akka.testkit.AkkaSpec; import org.scalatest.junit.JUnitSuite; -import scala.concurrent.duration.FiniteDuration; public class InboxJavaAPITest extends JUnitSuite { @@ -24,7 +23,7 @@ public class InboxJavaAPITest extends JUnitSuite { @Test(expected = TimeoutException.class) public void mustBeAbleToThrowTimeoutException() throws TimeoutException { Inbox inbox = Inbox.create(system); - inbox.receive(new FiniteDuration(10, TimeUnit.MILLISECONDS)); + inbox.receive(Duration.ofMillis(10)); } } diff --git a/akka-actor-tests/src/test/java/akka/pattern/CircuitBreakerTest.java b/akka-actor-tests/src/test/java/akka/pattern/CircuitBreakerTest.java index ab3858ff9c..5d82a47355 100644 --- a/akka-actor-tests/src/test/java/akka/pattern/CircuitBreakerTest.java +++ b/akka-actor-tests/src/test/java/akka/pattern/CircuitBreakerTest.java @@ -7,18 +7,18 @@ package akka.pattern; import akka.actor.*; import akka.testkit.AkkaJUnitActorSystemResource; import akka.testkit.AkkaSpec; +import akka.util.JavaDurationConverters; import org.junit.ClassRule; import org.junit.Test; import org.scalatest.junit.JUnitSuite; import scala.compat.java8.FutureConverters; import scala.concurrent.Await; -import scala.concurrent.duration.FiniteDuration; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; +import java.time.Duration; import static org.junit.Assert.assertEquals; @@ -32,20 +32,20 @@ public class CircuitBreakerTest extends JUnitSuite { @Test public void useCircuitBreakerWithCompletableFuture() throws Exception { - final FiniteDuration fiveSeconds = FiniteDuration.create(5, TimeUnit.SECONDS); - final FiniteDuration fiveHundredMillis = FiniteDuration.create(500, TimeUnit.MILLISECONDS); + final Duration fiveSeconds = Duration.ofSeconds(5); + final Duration fiveHundredMillis = Duration.ofMillis(500); final CircuitBreaker breaker = new CircuitBreaker(system.dispatcher(), system.scheduler(), 1, fiveSeconds, fiveHundredMillis); final CompletableFuture f = new CompletableFuture<>(); f.complete("hello"); final CompletionStage res = breaker.callWithCircuitBreakerCS(() -> f); - assertEquals("hello", Await.result(FutureConverters.toScala(res), fiveSeconds)); + assertEquals("hello", Await.result(FutureConverters.toScala(res), JavaDurationConverters.asFiniteDuration(fiveSeconds))); } @Test public void useCircuitBreakerWithCompletableFutureAndCustomDefineFailure() throws Exception { - final FiniteDuration fiveSeconds = FiniteDuration.create(5, TimeUnit.SECONDS); - final FiniteDuration fiveHundredMillis = FiniteDuration.create(500, TimeUnit.MILLISECONDS); + final Duration fiveSeconds = Duration.ofSeconds(5); + final Duration fiveHundredMillis = Duration.ofMillis(500); final CircuitBreaker breaker = new CircuitBreaker(system.dispatcher(), system.scheduler(), 1, fiveSeconds, fiveHundredMillis); final BiFunction, Optional, java.lang.Boolean> fn = @@ -54,7 +54,7 @@ public class CircuitBreakerTest extends JUnitSuite { final CompletableFuture f = new CompletableFuture<>(); f.complete("hello"); final CompletionStage res = breaker.callWithCircuitBreakerCS(() -> f, fn); - assertEquals("hello", Await.result(FutureConverters.toScala(res), fiveSeconds)); + assertEquals("hello", Await.result(FutureConverters.toScala(res), JavaDurationConverters.asFiniteDuration(fiveSeconds))); assertEquals(1, breaker.currentFailureCount()); } } diff --git a/akka-actor/src/main/scala/akka/actor/AbstractFSM.scala b/akka-actor/src/main/scala/akka/actor/AbstractFSM.scala index de8a06bf45..c13bfa0496 100644 --- a/akka-actor/src/main/scala/akka/actor/AbstractFSM.scala +++ b/akka-actor/src/main/scala/akka/actor/AbstractFSM.scala @@ -430,7 +430,7 @@ abstract class AbstractFSM[S, D] extends FSM[S, D] { */ final def setTimer(name: String, msg: Any, timeout: java.time.Duration): Unit = { import JavaDurationConverters._ - setTimer(name, msg, timeout.asScala) + setTimer(name, msg, timeout.asScala, false) } /** diff --git a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala index c75fa9f040..0363a5933d 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala @@ -8,6 +8,7 @@ import scala.concurrent.duration._ import akka.pattern.ask import scala.concurrent.Await import akka.util.Helpers.ConfigOps +import akka.util.JavaDurationConverters._ /** * This object contains elements which make writing actors and related code @@ -125,6 +126,15 @@ abstract class Inbox { @throws(classOf[java.util.concurrent.TimeoutException]) def receive(max: FiniteDuration): Any + /** + * Receive the next message from this Inbox. This call will return immediately + * if the internal actor previously received a message, or it will block for + * up to the specified duration to await reception of a message. If no message + * is received a [[java.util.concurrent.TimeoutException]] will be raised. + */ + @throws(classOf[java.util.concurrent.TimeoutException]) + def receive(max: java.time.Duration): Any = receive(max.asScala) + /** * Have the internal actor watch the target actor. When the target actor * terminates a [[Terminated]] message will be received. diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 19b66f97a5..ffabce0171 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -5,7 +5,6 @@ package akka.actor import akka.util.JavaDurationConverters - import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import scala.util.control.NoStackTrace @@ -233,6 +232,7 @@ trait Scheduler { import JavaDurationConverters._ scheduleOnce(delay.asScala, runnable)(executor) } + /** * The maximum supported task frequency of this scheduler, i.e. the inverse * of the minimum time interval between executions of a recurring task, in Hz. diff --git a/akka-actor/src/main/scala/akka/actor/Timers.scala b/akka-actor/src/main/scala/akka/actor/Timers.scala index 7e93282820..2de5b95413 100644 --- a/akka-actor/src/main/scala/akka/actor/Timers.scala +++ b/akka-actor/src/main/scala/akka/actor/Timers.scala @@ -4,6 +4,7 @@ package akka.actor +import akka.util.JavaDurationConverters._ import scala.concurrent.duration.FiniteDuration import akka.annotation.DoNotInherit import akka.util.OptionVal @@ -92,6 +93,18 @@ abstract class AbstractActorWithTimers extends AbstractActor with Timers { */ def startPeriodicTimer(key: Any, msg: Any, interval: FiniteDuration): Unit + /** + * Start a periodic timer that will send `msg` to the `self` actor at + * a fixed `interval`. + * + * Each timer has a key and if a new timer with same key is started + * the previous is cancelled and it's guaranteed that a message from the + * previous timer is not received, even though it might already be enqueued + * in the mailbox when the new timer is started. + */ + def startPeriodicTimer(key: Any, msg: Any, interval: java.time.Duration): Unit = + startPeriodicTimer(key, msg, interval.asScala) + /** * Start a timer that will send `msg` once to the `self` actor after * the given `timeout`. @@ -103,6 +116,18 @@ abstract class AbstractActorWithTimers extends AbstractActor with Timers { */ def startSingleTimer(key: Any, msg: Any, timeout: FiniteDuration): Unit + /** + * Start a timer that will send `msg` once to the `self` actor after + * the given `timeout`. + * + * Each timer has a key and if a new timer with same key is started + * the previous is cancelled and it's guaranteed that a message from the + * previous timer is not received, even though it might already be enqueued + * in the mailbox when the new timer is started. + */ + def startSingleTimer(key: Any, msg: Any, timeout: java.time.Duration): Unit = + startSingleTimer(key, msg, timeout.asScala) + /** * Check if a timer with a given `key` is active. */ diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index a1a2d7f74a..56265620cb 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -15,6 +15,7 @@ import scala.collection.immutable import scala.collection.JavaConverters._ import akka.util.{ ByteString, Helpers } import akka.util.Helpers.Requiring +import akka.util.JavaDurationConverters._ import akka.actor._ import java.lang.{ Iterable ⇒ JIterable } import java.nio.file.Path @@ -669,6 +670,25 @@ object TcpMessage { timeout: FiniteDuration, pullMode: Boolean): Command = Connect(remoteAddress, Option(localAddress), options, Option(timeout), pullMode) + /** + * The Connect message is sent to the TCP manager actor, which is obtained via + * [[TcpExt#getManager]]. Either the manager replies with a [[Tcp.CommandFailed]] + * or the actor handling the new connection replies with a [[Tcp.Connected]] + * message. + * + * @param remoteAddress is the address to connect to + * @param localAddress optionally specifies a specific address to bind to + * @param options Please refer to [[TcpSO]] for a list of all supported options. + * @param timeout is the desired connection timeout, `null` means "no timeout" + * @param pullMode enables pull based reading from the connection + */ + def connect( + remoteAddress: InetSocketAddress, + localAddress: InetSocketAddress, + options: JIterable[SocketOption], + timeout: java.time.Duration, + pullMode: Boolean): Command = connect(remoteAddress, localAddress, options, timeout.asScala, pullMode) + /** * Connect to the given `remoteAddress` without binding to a local address and without * specifying options. diff --git a/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala b/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala index abd548bf24..9acf8a49f8 100644 --- a/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala +++ b/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala @@ -5,6 +5,7 @@ package akka.pattern import scala.concurrent.duration.{ Duration, FiniteDuration } +import akka.util.JavaDurationConverters._ import akka.actor.{ Props, OneForOneStrategy, SupervisorStrategy } /** @@ -79,6 +80,60 @@ object Backoff { randomFactor: Double): BackoffOptions = BackoffOptionsImpl(RestartImpliesFailure, childProps, childName, minBackoff, maxBackoff, randomFactor) + /** + * Back-off options for creating a back-off supervisor actor that expects a child actor to restart on failure. + * + * This explicit supervisor behaves similarly to the normal implicit supervision where + * if an actor throws an exception, the decider on the supervisor will decide when to + * `Stop`, `Restart`, `Escalate`, `Resume` the child actor. + * + * When the `Restart` directive is specified, the supervisor will delay the restart + * using an exponential back off strategy (bounded by minBackoff and maxBackoff). + * + * This supervisor is intended to be transparent to both the child actor and external actors. + * Where external actors can send messages to the supervisor as if it was the child and the + * messages will be forwarded. And when the child is `Terminated`, the supervisor is also + * `Terminated`. + * Transparent to the child means that the child does not have to be aware that it is being + * supervised specifically by this actor. Just like it does + * not need to know when it is being supervised by the usual implicit supervisors. + * The only caveat is that the `ActorRef` of the child is not stable, so any user storing the + * `sender()` `ActorRef` from the child response may eventually not be able to communicate with + * the stored `ActorRef`. In general all messages to the child should be directed through this actor. + * + * An example of where this supervisor might be used is when you may have an actor that is + * responsible for continuously polling on a server for some resource that sometimes may be down. + * Instead of hammering the server continuously when the resource is unavailable, the actor will + * be restarted with an exponentially increasing back off until the resource is available again. + * + * '''*** + * This supervisor should not be used with `Akka Persistence` child actors. + * `Akka Persistence` actors shutdown unconditionally on `persistFailure()`s rather + * than throw an exception on a failure like normal actors. + * [[#onStop]] should be used instead for cases where the child actor + * terminates itself as a failure signal instead of the normal behavior of throwing an exception. + * ***''' + * You can define another + * supervision strategy by using `akka.pattern.BackoffOptions.withSupervisorStrategy` on [[akka.pattern.BackoffOptions]]. + * + * @param childProps the [[akka.actor.Props]] of the child actor that + * will be started and supervised + * @param childName name of the child actor + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + */ + def onFailure( + childProps: Props, + childName: String, + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double): BackoffOptions = + onFailure(childProps, childName, minBackoff.asScala, maxBackoff.asScala, randomFactor) + /** * Back-off options for creating a back-off supervisor actor that expects a child actor to stop on failure. * @@ -139,6 +194,68 @@ object Backoff { maxBackoff: FiniteDuration, randomFactor: Double): BackoffOptions = BackoffOptionsImpl(StopImpliesFailure, childProps, childName, minBackoff, maxBackoff, randomFactor) + + /** + * Back-off options for creating a back-off supervisor actor that expects a child actor to stop on failure. + * + * This actor can be used to supervise a child actor and start it again + * after a back-off duration if the child actor is stopped. + * + * This is useful in situations where the re-start of the child actor should be + * delayed e.g. in order to give an external resource time to recover before the + * child actor tries contacting it again (after being restarted). + * + * Specifically this pattern is useful for persistent actors, + * which are stopped in case of persistence failures. + * Just restarting them immediately would probably fail again (since the data + * store is probably unavailable). It is better to try again after a delay. + * + * It supports exponential back-off between the given `minBackoff` and + * `maxBackoff` durations. For example, if `minBackoff` is 3 seconds and + * `maxBackoff` 30 seconds the start attempts will be delayed with + * 3, 6, 12, 24, 30, 30 seconds. The exponential back-off counter is reset + * if the actor is not terminated within the `minBackoff` duration. + * + * In addition to the calculated exponential back-off an additional + * random delay based the given `randomFactor` is added, e.g. 0.2 adds up to 20% + * delay. The reason for adding a random delay is to avoid that all failing + * actors hit the backend resource at the same time. + * + * You can retrieve the current child `ActorRef` by sending `BackoffSupervisor.GetCurrentChild` + * message to this actor and it will reply with [[akka.pattern.BackoffSupervisor.CurrentChild]] + * containing the `ActorRef` of the current child, if any. + * + * The `BackoffSupervisor`delegates all messages from the child to the parent of the + * `BackoffSupervisor`, with the supervisor as sender. + * + * The `BackoffSupervisor` forwards all other messages to the child, if it is currently running. + * + * The child can stop itself and send a [[akka.actor.PoisonPill]] to the parent supervisor + * if it wants to do an intentional stop. + * + * Exceptions in the child are handled with the default supervisionStrategy, which can be changed by using + * [[BackoffOptions#withSupervisorStrategy]] or [[BackoffOptions#withDefaultStoppingStrategy]]. A + * `Restart` will perform a normal immediate restart of the child. A `Stop` will + * stop the child, but it will be started again after the back-off duration. + * + * @param childProps the [[akka.actor.Props]] of the child actor that + * will be started and supervised + * @param childName name of the child actor + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + */ + def onStop( + childProps: Props, + childName: String, + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double): BackoffOptions = + onStop(childProps, childName, minBackoff.asScala, maxBackoff.asScala, randomFactor) + } /** diff --git a/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala b/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala index 4b7d19ef7b..5145b43bfb 100644 --- a/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala +++ b/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala @@ -16,6 +16,7 @@ import akka.actor.SupervisorStrategy.Directive import akka.actor.SupervisorStrategy.Escalate import akka.actor.OneForOneStrategy import akka.actor.SupervisorStrategy +import akka.util.JavaDurationConverters._ import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.util.Try @@ -48,6 +49,32 @@ object BackoffSupervisor { propsWithSupervisorStrategy(childProps, childName, minBackoff, maxBackoff, randomFactor, SupervisorStrategy.defaultStrategy) } + /** + * Props for creating a [[BackoffSupervisor]] actor. + * + * Exceptions in the child are handled with the default supervision strategy, i.e. + * most exceptions will immediately restart the child. You can define another + * supervision strategy by using [[#propsWithSupervisorStrategy]]. + * + * @param childProps the [[akka.actor.Props]] of the child actor that + * will be started and supervised + * @param childName name of the child actor + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + */ + def props( + childProps: Props, + childName: String, + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double): Props = { + props(childProps, childName, minBackoff.asScala, maxBackoff.asScala, randomFactor) + } + /** * Props for creating a [[BackoffSupervisor]] actor with a custom * supervision strategy. @@ -82,6 +109,37 @@ object BackoffSupervisor { Props(new BackoffSupervisor(childProps, childName, minBackoff, maxBackoff, randomFactor, strategy)) } + /** + * Props for creating a [[BackoffSupervisor]] actor with a custom + * supervision strategy. + * + * Exceptions in the child are handled with the given `supervisionStrategy`. A + * `Restart` will perform a normal immediate restart of the child. A `Stop` will + * stop the child, but it will be started again after the back-off duration. + * + * @param childProps the [[akka.actor.Props]] of the child actor that + * will be started and supervised + * @param childName name of the child actor + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param strategy the supervision strategy to use for handling exceptions + * in the child. As the BackoffSupervisor creates a separate actor to handle the + * backoff process, only a [[OneForOneStrategy]] makes sense here. + */ + def propsWithSupervisorStrategy( + childProps: Props, + childName: String, + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double, + strategy: SupervisorStrategy): Props = { + propsWithSupervisorStrategy(childProps, childName, minBackoff.asScala, maxBackoff.asScala, randomFactor, strategy) + } + /** * Props for creating a [[BackoffSupervisor]] actor from [[BackoffOptions]]. * diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala index f429cad38c..06aaec3534 100644 --- a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala @@ -10,6 +10,7 @@ import java.util.function.Consumer import akka.AkkaException import akka.actor.Scheduler +import akka.util.JavaDurationConverters._ import akka.util.Unsafe import scala.util.control.NoStackTrace @@ -57,9 +58,25 @@ object CircuitBreaker { * @param callTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to consider a call a failure * @param resetTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to attempt to close the circuit */ + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def create(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker = apply(scheduler, maxFailures, callTimeout, resetTimeout) + /** + * Java API: Create a new CircuitBreaker. + * + * Callbacks run in caller's thread when using withSyncCircuitBreaker, and in same ExecutionContext as the passed + * in Future when using withCircuitBreaker. To use another ExecutionContext for the callbacks you can specify the + * executor in the constructor. + * + * @param scheduler Reference to Akka scheduler + * @param maxFailures Maximum number of failures before opening the circuit + * @param callTimeout [[java.time.Duration]] of time after which to consider a call a failure + * @param resetTimeout [[java.time.Duration]] of time after which to attempt to close the circuit + */ + def create(scheduler: Scheduler, maxFailures: Int, callTimeout: java.time.Duration, resetTimeout: java.time.Duration): CircuitBreaker = + apply(scheduler, maxFailures, callTimeout.asScala, resetTimeout.asScala) + private val exceptionAsFailure: Try[_] ⇒ Boolean = { case _: Success[_] ⇒ false case _ ⇒ true @@ -113,10 +130,15 @@ class CircuitBreaker( require(exponentialBackoffFactor >= 1.0, "factor must be >= 1.0") + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def this(executor: ExecutionContext, scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration) = { this(scheduler, maxFailures, callTimeout, resetTimeout, 36500.days, 1.0)(executor) } + def this(executor: ExecutionContext, scheduler: Scheduler, maxFailures: Int, callTimeout: java.time.Duration, resetTimeout: java.time.Duration) = { + this(scheduler, maxFailures, callTimeout.asScala, resetTimeout.asScala, 36500.days, 1.0)(executor) + } + // add the old constructor to make it binary compatible def this(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration)(implicit executor: ExecutionContext) = { this(scheduler, maxFailures, callTimeout, resetTimeout, 36500.days, 1.0)(executor) @@ -132,6 +154,16 @@ class CircuitBreaker( new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout, maxResetTimeout, 2.0)(executor) } + /** + * The `resetTimeout` will be increased exponentially for each failed attempt to close the circuit. + * The default exponential backoff factor is 2. + * + * @param maxResetTimeout the upper bound of resetTimeout + */ + def withExponentialBackoff(maxResetTimeout: java.time.Duration): CircuitBreaker = { + withExponentialBackoff(maxResetTimeout.asScala) + } + /** * Holds reference to current state of CircuitBreaker - *access only via helper methods* */ diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala index f2bd90e861..f719ebb68e 100644 --- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -7,6 +7,7 @@ package akka.pattern import java.util.concurrent.{ Callable, CompletionStage, TimeUnit } import akka.actor.{ ActorSelection, Scheduler } +import akka.util.JavaDurationConverters._ import scala.compat.java8.FutureConverters._ import scala.concurrent.ExecutionContext @@ -512,9 +513,23 @@ object PatternsCS { * If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]] * is completed with failure [[akka.pattern.AskTimeoutException]]. */ + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def gracefulStop(target: ActorRef, timeout: FiniteDuration): CompletionStage[java.lang.Boolean] = scalaGracefulStop(target, timeout).toJava.asInstanceOf[CompletionStage[java.lang.Boolean]] + /** + * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with success (value `true`) when + * existing messages of the target actor has been processed and the actor has been + * terminated. + * + * Useful when you need to wait for termination or compose ordered termination of several actors. + * + * If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]] + * is completed with failure [[akka.pattern.AskTimeoutException]]. + */ + def gracefulStop(target: ActorRef, timeout: java.time.Duration): CompletionStage[java.lang.Boolean] = + scalaGracefulStop(target, timeout.asScala).toJava.asInstanceOf[CompletionStage[java.lang.Boolean]] + /** * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with success (value `true`) when * existing messages of the target actor has been processed and the actor has been @@ -528,23 +543,56 @@ object PatternsCS { * If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]] * is completed with failure [[akka.pattern.AskTimeoutException]]. */ + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def gracefulStop(target: ActorRef, timeout: FiniteDuration, stopMessage: Any): CompletionStage[java.lang.Boolean] = scalaGracefulStop(target, timeout, stopMessage).toJava.asInstanceOf[CompletionStage[java.lang.Boolean]] + /** + * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with success (value `true`) when + * existing messages of the target actor has been processed and the actor has been + * terminated. + * + * Useful when you need to wait for termination or compose ordered termination of several actors. + * + * If you want to invoke specialized stopping logic on your target actor instead of PoisonPill, you can pass your + * stop command as `stopMessage` parameter + * + * If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]] + * is completed with failure [[akka.pattern.AskTimeoutException]]. + */ + def gracefulStop(target: ActorRef, timeout: java.time.Duration, stopMessage: Any): CompletionStage[java.lang.Boolean] = + scalaGracefulStop(target, timeout.asScala, stopMessage).toJava.asInstanceOf[CompletionStage[java.lang.Boolean]] + /** * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided Callable * after the specified duration. */ + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def after[T](duration: FiniteDuration, scheduler: Scheduler, context: ExecutionContext, value: Callable[CompletionStage[T]]): CompletionStage[T] = afterCompletionStage(duration, scheduler)(value.call())(context) + /** + * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided Callable + * after the specified duration. + */ + def after[T](duration: java.time.Duration, scheduler: Scheduler, context: ExecutionContext, value: Callable[CompletionStage[T]]): CompletionStage[T] = + afterCompletionStage(duration.asScala, scheduler)(value.call())(context) + /** * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided value * after the specified duration. */ + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def after[T](duration: FiniteDuration, scheduler: Scheduler, context: ExecutionContext, value: CompletionStage[T]): CompletionStage[T] = afterCompletionStage(duration, scheduler)(value)(context) + /** + * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided value + * after the specified duration. + */ + def after[T](duration: java.time.Duration, scheduler: Scheduler, context: ExecutionContext, value: CompletionStage[T]): CompletionStage[T] = + afterCompletionStage(duration.asScala, scheduler)(value)(context) + /** * Returns an internally retrying [[java.util.concurrent.CompletionStage]] * The first attempt will be made immediately, and each subsequent attempt will be made after 'delay'. @@ -553,6 +601,6 @@ object PatternsCS { * Note that the attempt function will be invoked on the given execution context for subsequent tries * and therefore must be thread safe (not touch unsafe mutable state). */ - def retry[T](attempt: Callable[CompletionStage[T]], attempts: Int, delay: FiniteDuration, scheduler: Scheduler, ec: ExecutionContext): CompletionStage[T] = - scalaRetry(() ⇒ attempt.call().toScala, attempts, delay)(ec, scheduler).toJava + def retry[T](attempt: Callable[CompletionStage[T]], attempts: Int, delay: java.time.Duration, scheduler: Scheduler, ec: ExecutionContext): CompletionStage[T] = + scalaRetry(() ⇒ attempt.call().toScala, attempts, delay.asScala)(ec, scheduler).toJava } diff --git a/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala b/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala index d81ad92a9f..30ea8a52d4 100644 --- a/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala +++ b/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala @@ -15,9 +15,9 @@ import akka.pattern.ask import akka.pattern.pipe import akka.dispatch.ExecutionContexts import scala.concurrent.duration.FiniteDuration -import scala.concurrent.duration._ import akka.util.Timeout import akka.util.Helpers.ConfigOps +import akka.util.JavaDurationConverters._ import akka.actor.ActorSystem import scala.concurrent.Future import java.util.concurrent.TimeoutException @@ -119,6 +119,14 @@ final case class ScatterGatherFirstCompletedPool( */ def this(nr: Int, within: FiniteDuration) = this(nrOfInstances = nr, within = within) + /** + * Java API + * @param nr initial number of routees in the pool + * @param within expecting at least one reply within this duration, otherwise + * it will reply with [[akka.pattern.AskTimeoutException]] in a [[akka.actor.Status.Failure]] + */ + def this(nr: Int, within: java.time.Duration) = this(nr, within.asScala) + override def createRouter(system: ActorSystem): Router = new Router(ScatterGatherFirstCompletedRoutingLogic(within)) override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances @@ -186,6 +194,16 @@ final case class ScatterGatherFirstCompletedGroup( def this(routeePaths: java.lang.Iterable[String], within: FiniteDuration) = this(paths = immutableSeq(routeePaths), within = within) + /** + * Java API + * @param routeePaths string representation of the actor paths of the routees, messages are + * sent with [[akka.actor.ActorSelection]] to these paths + * @param within expecting at least one reply within this duration, otherwise + * it will reply with [[akka.pattern.AskTimeoutException]] in a [[akka.actor.Status.Failure]] + */ + def this(routeePaths: java.lang.Iterable[String], within: java.time.Duration) = + this(immutableSeq(routeePaths), within.asScala) + override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths override def createRouter(system: ActorSystem): Router = new Router(ScatterGatherFirstCompletedRoutingLogic(within)) diff --git a/akka-actor/src/main/scala/akka/routing/TailChopping.scala b/akka-actor/src/main/scala/akka/routing/TailChopping.scala index ceb5418439..660bdd599d 100644 --- a/akka-actor/src/main/scala/akka/routing/TailChopping.scala +++ b/akka-actor/src/main/scala/akka/routing/TailChopping.scala @@ -14,6 +14,7 @@ import akka.japi.Util.immutableSeq import scala.concurrent.{ ExecutionContext, Promise } import akka.pattern.{ AskTimeoutException, ask, pipe } import scala.concurrent.duration._ +import akka.util.JavaDurationConverters._ import akka.util.Timeout import akka.util.Helpers.ConfigOps @@ -168,6 +169,16 @@ final case class TailChoppingPool( def this(nr: Int, within: FiniteDuration, interval: FiniteDuration) = this(nrOfInstances = nr, within = within, interval = interval) + /** + * Java API + * @param nr initial number of routees in the pool + * @param within expecting at least one reply within this duration, otherwise + * it will reply with [[akka.pattern.AskTimeoutException]] in a [[akka.actor.Status.Failure]] + * @param interval duration after which next routee will be picked + */ + def this(nr: Int, within: java.time.Duration, interval: java.time.Duration) = + this(nr, within.asScala, interval.asScala) + override def createRouter(system: ActorSystem): Router = new Router(TailChoppingRoutingLogic(system.scheduler, within, interval, system.dispatchers.lookup(routerDispatcher))) @@ -250,6 +261,17 @@ final case class TailChoppingGroup( def this(routeePaths: java.lang.Iterable[String], within: FiniteDuration, interval: FiniteDuration) = this(paths = immutableSeq(routeePaths), within = within, interval = interval) + /** + * Java API + * @param routeePaths string representation of the actor paths of the routees, messages are + * sent with [[akka.actor.ActorSelection]] to these paths + * @param within expecting at least one reply within this duration, otherwise + * it will reply with [[akka.pattern.AskTimeoutException]] in a [[akka.actor.Status.Failure]] + * @param interval duration after which next routee will be picked + */ + def this(routeePaths: java.lang.Iterable[String], within: java.time.Duration, interval: java.time.Duration) = + this(immutableSeq(routeePaths), within.asScala, interval.asScala) + override def createRouter(system: ActorSystem): Router = new Router(TailChoppingRoutingLogic(system.scheduler, within, interval, system.dispatchers.lookup(routerDispatcher))) diff --git a/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java b/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java index 9ca911c973..6010ec7aea 100644 --- a/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java +++ b/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java @@ -412,7 +412,7 @@ public class ActorDocTest extends AbstractJavaTest { //#gracefulStop try { CompletionStage stopped = - gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), Manager.SHUTDOWN); + gracefulStop(actorRef, java.time.Duration.ofSeconds(5), Manager.SHUTDOWN); stopped.toCompletableFuture().get(6, TimeUnit.SECONDS); // the actor has been stopped } catch (AskTimeoutException e) { diff --git a/akka-docs/src/test/java/jdocs/actor/FaultHandlingDocSample.java b/akka-docs/src/test/java/jdocs/actor/FaultHandlingDocSample.java index a58283d23a..12ad84da64 100644 --- a/akka-docs/src/test/java/jdocs/actor/FaultHandlingDocSample.java +++ b/akka-docs/src/test/java/jdocs/actor/FaultHandlingDocSample.java @@ -26,7 +26,6 @@ import static akka.actor.SupervisorStrategy.restart; import static akka.actor.SupervisorStrategy.stop; import static akka.actor.SupervisorStrategy.escalate; -import static akka.pattern.Patterns.ask; import static akka.pattern.Patterns.pipe; import static jdocs.actor.FaultHandlingDocSample.WorkerApi.*; @@ -141,7 +140,7 @@ public class FaultHandlingDocSample { matchEquals(Start, x -> progressListener == null, x -> { progressListener = getSender(); getContext().getSystem().scheduler().schedule( - Duration.Zero(), Duration.create(1, "second"), getSelf(), Do, + java.time.Duration.ZERO, java.time.Duration.ofSeconds(1L), getSelf(), Do, getContext().dispatcher(), null ); }). diff --git a/akka-docs/src/test/java/jdocs/actor/InboxDocTest.java b/akka-docs/src/test/java/jdocs/actor/InboxDocTest.java index 3d00926fce..4e16199571 100644 --- a/akka-docs/src/test/java/jdocs/actor/InboxDocTest.java +++ b/akka-docs/src/test/java/jdocs/actor/InboxDocTest.java @@ -4,15 +4,12 @@ package jdocs.actor; -import java.util.concurrent.TimeUnit; - import akka.testkit.AkkaJUnitActorSystemResource; import jdocs.AbstractJavaTest; import akka.testkit.javadsl.TestKit; import org.junit.ClassRule; import org.junit.Test; -import scala.concurrent.duration.Duration; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Inbox; @@ -20,6 +17,8 @@ import akka.actor.PoisonPill; import akka.actor.Terminated; import akka.testkit.AkkaSpec; +import java.time.Duration; + public class InboxDocTest extends AbstractJavaTest { @ClassRule @@ -40,7 +39,7 @@ public class InboxDocTest extends AbstractJavaTest { probe.send(probe.getLastSender(), "world"); //#inbox try { - assert inbox.receive(Duration.create(1, TimeUnit.SECONDS)).equals("world"); + assert inbox.receive(Duration.ofSeconds(1)).equals("world"); } catch (java.util.concurrent.TimeoutException e) { // timeout } @@ -56,7 +55,7 @@ public class InboxDocTest extends AbstractJavaTest { inbox.watch(target); target.tell(PoisonPill.getInstance(), ActorRef.noSender()); try { - assert inbox.receive(Duration.create(1, TimeUnit.SECONDS)) instanceof Terminated; + assert inbox.receive(Duration.ofSeconds(1)) instanceof Terminated; } catch (java.util.concurrent.TimeoutException e) { // timeout } diff --git a/akka-docs/src/test/java/jdocs/actor/SchedulerDocTest.java b/akka-docs/src/test/java/jdocs/actor/SchedulerDocTest.java index 2260ddc7f8..c89c54419e 100644 --- a/akka-docs/src/test/java/jdocs/actor/SchedulerDocTest.java +++ b/akka-docs/src/test/java/jdocs/actor/SchedulerDocTest.java @@ -7,8 +7,7 @@ package jdocs.actor; //#imports1 import akka.actor.Props; import jdocs.AbstractJavaTest; -import scala.concurrent.duration.Duration; -import java.util.concurrent.TimeUnit; +import java.time.Duration; //#imports1 //#imports2 @@ -34,12 +33,12 @@ public class SchedulerDocTest extends AbstractJavaTest { @Test public void scheduleOneOffTask() { //#schedule-one-off-message - system.scheduler().scheduleOnce(Duration.create(50, TimeUnit.MILLISECONDS), + system.scheduler().scheduleOnce(Duration.ofMillis(50), testActor, "foo", system.dispatcher(), null); //#schedule-one-off-message //#schedule-one-off-thunk - system.scheduler().scheduleOnce(Duration.create(50, TimeUnit.MILLISECONDS), + system.scheduler().scheduleOnce(Duration.ofMillis(50), new Runnable() { @Override public void run() { @@ -67,8 +66,8 @@ public class SchedulerDocTest extends AbstractJavaTest { //This will schedule to send the Tick-message //to the tickActor after 0ms repeating every 50ms - Cancellable cancellable = system.scheduler().schedule(Duration.Zero(), - Duration.create(50, TimeUnit.MILLISECONDS), tickActor, "Tick", + Cancellable cancellable = system.scheduler().schedule(Duration.ZERO, + Duration.ofMillis(50), tickActor, "Tick", system.dispatcher(), null); //This cancels further Ticks to be sent diff --git a/akka-docs/src/test/java/jdocs/actor/TimerDocTest.java b/akka-docs/src/test/java/jdocs/actor/TimerDocTest.java index 0c5d6d5f58..f8a734c8f4 100644 --- a/akka-docs/src/test/java/jdocs/actor/TimerDocTest.java +++ b/akka-docs/src/test/java/jdocs/actor/TimerDocTest.java @@ -5,8 +5,7 @@ package jdocs.actor; //#timers -import java.util.concurrent.TimeUnit; -import scala.concurrent.duration.Duration; +import java.time.Duration; import akka.actor.AbstractActorWithTimers; //#timers @@ -24,8 +23,7 @@ public class TimerDocTest { } public MyActor() { - getTimers().startSingleTimer(TICK_KEY, new FirstTick(), - Duration.create(500, TimeUnit.MILLISECONDS)); + getTimers().startSingleTimer(TICK_KEY, new FirstTick(), Duration.ofMillis(500)); } @Override @@ -33,8 +31,7 @@ public class TimerDocTest { return receiveBuilder() .match(FirstTick.class, message -> { // do something useful here - getTimers().startPeriodicTimer(TICK_KEY, new Tick(), - Duration.create(1, TimeUnit.SECONDS)); + getTimers().startPeriodicTimer(TICK_KEY, new Tick(), Duration.ofSeconds(1)); }) .match(Tick.class, message -> { // do something useful here diff --git a/akka-docs/src/test/java/jdocs/actor/fsm/Buncher.java b/akka-docs/src/test/java/jdocs/actor/fsm/Buncher.java index 4563a9b34a..af085360dc 100644 --- a/akka-docs/src/test/java/jdocs/actor/fsm/Buncher.java +++ b/akka-docs/src/test/java/jdocs/actor/fsm/Buncher.java @@ -11,7 +11,7 @@ import akka.japi.pf.UnitMatch; import java.util.Arrays; import java.util.LinkedList; import java.util.List; -import scala.concurrent.duration.Duration; +import java.time.Duration; //#simple-imports import static jdocs.actor.fsm.Buncher.Data; @@ -45,7 +45,7 @@ public class Buncher extends AbstractFSM { state(Idle, Active, () -> {/* Do something here */})); //#transition-elided - when(Active, Duration.create(1, "second"), + when(Active, Duration.ofSeconds(1L), matchEvent(Arrays.asList(Flush.class, StateTimeout()), Todo.class, (event, todo) -> goTo(Idle).using(todo.copy(new LinkedList<>())))); diff --git a/akka-docs/src/test/java/jdocs/actor/fsm/FSMDocTest.java b/akka-docs/src/test/java/jdocs/actor/fsm/FSMDocTest.java index 7e0de06dfd..43d20b2b0a 100644 --- a/akka-docs/src/test/java/jdocs/actor/fsm/FSMDocTest.java +++ b/akka-docs/src/test/java/jdocs/actor/fsm/FSMDocTest.java @@ -75,7 +75,7 @@ public class FSMDocTest extends AbstractJavaTest { //#transition-syntax onTransition( matchState(Active, Idle, () -> setTimer("timeout", - Tick, Duration.create(1, SECONDS), true)). + Tick, java.time.Duration.ofSeconds(1L), true)). state(Active, null, () -> cancelTimer("timeout")). state(null, Idle, (f, t) -> log().info("entering Idle from " + f))); //#transition-syntax diff --git a/akka-docs/src/test/java/jdocs/circuitbreaker/DangerousJavaActor.java b/akka-docs/src/test/java/jdocs/circuitbreaker/DangerousJavaActor.java index bdba8e4ba4..a1b03d5693 100644 --- a/akka-docs/src/test/java/jdocs/circuitbreaker/DangerousJavaActor.java +++ b/akka-docs/src/test/java/jdocs/circuitbreaker/DangerousJavaActor.java @@ -8,7 +8,7 @@ package jdocs.circuitbreaker; import akka.actor.AbstractActor; import akka.event.LoggingAdapter; -import scala.concurrent.duration.Duration; +import java.time.Duration; import akka.pattern.CircuitBreaker; import akka.event.Logging; @@ -27,7 +27,7 @@ public class DangerousJavaActor extends AbstractActor { public DangerousJavaActor() { this.breaker = new CircuitBreaker( getContext().dispatcher(), getContext().system().scheduler(), - 5, Duration.create(10, "s"), Duration.create(1, "m")) + 5, Duration.ofSeconds(10), Duration.ofMinutes(1)) .addOnOpenListener(this::notifyMeOnOpen); } diff --git a/akka-docs/src/test/java/jdocs/circuitbreaker/EvenNoFailureJavaExample.java b/akka-docs/src/test/java/jdocs/circuitbreaker/EvenNoFailureJavaExample.java index 0b7ee405a4..482578d85a 100644 --- a/akka-docs/src/test/java/jdocs/circuitbreaker/EvenNoFailureJavaExample.java +++ b/akka-docs/src/test/java/jdocs/circuitbreaker/EvenNoFailureJavaExample.java @@ -6,7 +6,7 @@ package jdocs.circuitbreaker; import akka.actor.AbstractActor; import akka.pattern.CircuitBreaker; -import scala.concurrent.duration.Duration; +import java.time.Duration; import java.util.Optional; import java.util.function.BiFunction; @@ -18,7 +18,7 @@ public class EvenNoFailureJavaExample extends AbstractActor { public EvenNoFailureJavaExample() { this.breaker = new CircuitBreaker( getContext().dispatcher(), getContext().system().scheduler(), - 5, Duration.create(10, "s"), Duration.create(1, "m")); + 5, Duration.ofSeconds(10), Duration.ofMinutes(1)); } public int luckyNumber() { diff --git a/akka-docs/src/test/java/jdocs/circuitbreaker/TellPatternJavaActor.java b/akka-docs/src/test/java/jdocs/circuitbreaker/TellPatternJavaActor.java index 4405a54a51..81c6399f4d 100644 --- a/akka-docs/src/test/java/jdocs/circuitbreaker/TellPatternJavaActor.java +++ b/akka-docs/src/test/java/jdocs/circuitbreaker/TellPatternJavaActor.java @@ -6,12 +6,11 @@ package jdocs.circuitbreaker; import akka.actor.ActorRef; import akka.actor.ReceiveTimeout; -import akka.actor.AbstractActor.Receive; import akka.actor.AbstractActor; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.pattern.CircuitBreaker; -import scala.concurrent.duration.Duration; +import java.time.Duration; public class TellPatternJavaActor extends AbstractActor { @@ -23,7 +22,7 @@ public class TellPatternJavaActor extends AbstractActor { this.target = targetActor; this.breaker = new CircuitBreaker( getContext().dispatcher(), getContext().system().scheduler(), - 5, Duration.create(10, "s"), Duration.create(1, "m")) + 5, Duration.ofSeconds(10), Duration.ofMinutes(1)) .onOpen(new Runnable() { public void run() { notifyMeOnOpen(); diff --git a/akka-docs/src/test/java/jdocs/cluster/StatsSampleClient.java b/akka-docs/src/test/java/jdocs/cluster/StatsSampleClient.java index 0804a3a59c..77af89ba5d 100644 --- a/akka-docs/src/test/java/jdocs/cluster/StatsSampleClient.java +++ b/akka-docs/src/test/java/jdocs/cluster/StatsSampleClient.java @@ -8,14 +8,12 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.TimeUnit; import jdocs.cluster.StatsMessages.JobFailed; import jdocs.cluster.StatsMessages.StatsJob; import jdocs.cluster.StatsMessages.StatsResult; import java.util.concurrent.ThreadLocalRandom; -import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; +import java.time.Duration; import akka.actor.ActorSelection; import akka.actor.Address; import akka.actor.Cancellable; @@ -40,7 +38,7 @@ public class StatsSampleClient extends AbstractActor { public StatsSampleClient(String servicePath) { this.servicePath = servicePath; - FiniteDuration interval = Duration.create(2, TimeUnit.SECONDS); + Duration interval = Duration.ofMillis(2); tickTask = getContext() .getSystem() .scheduler() diff --git a/akka-docs/src/test/java/jdocs/ddata/DataBot.java b/akka-docs/src/test/java/jdocs/ddata/DataBot.java index e57af5dd90..eeb32152fb 100644 --- a/akka-docs/src/test/java/jdocs/ddata/DataBot.java +++ b/akka-docs/src/test/java/jdocs/ddata/DataBot.java @@ -5,9 +5,7 @@ package jdocs.ddata; //#data-bot -import static java.util.concurrent.TimeUnit.SECONDS; - -import scala.concurrent.duration.Duration; +import java.time.Duration; import java.util.concurrent.ThreadLocalRandom; import akka.actor.AbstractActor; @@ -37,7 +35,7 @@ public class DataBot extends AbstractActor { private final Cluster node = Cluster.get(getContext().getSystem()); private final Cancellable tickTask = getContext().getSystem().scheduler().schedule( - Duration.create(5, SECONDS), Duration.create(5, SECONDS), getSelf(), TICK, + Duration.ofSeconds(5), Duration.ofSeconds(5), getSelf(), TICK, getContext().dispatcher(), getSelf()); private final Key> dataKey = ORSetKey.create("key"); diff --git a/akka-docs/src/test/java/jdocs/future/FutureDocTest.java b/akka-docs/src/test/java/jdocs/future/FutureDocTest.java index 91c1e3111e..1b5c4886f9 100644 --- a/akka-docs/src/test/java/jdocs/future/FutureDocTest.java +++ b/akka-docs/src/test/java/jdocs/future/FutureDocTest.java @@ -552,7 +552,7 @@ public class FutureDocTest extends AbstractJavaTest { //#retry final ExecutionContext ec = system.dispatcher(); Callable> attempt = () -> CompletableFuture.completedFuture("test"); - CompletionStage retriedFuture = retry(attempt, 3, Duration.create(200, "millis"), system.scheduler(), ec); + CompletionStage retriedFuture = retry(attempt, 3, java.time.Duration.ofMillis(200), system.scheduler(), ec); //#retry retriedFuture.toCompletableFuture().get(2, SECONDS); diff --git a/akka-docs/src/test/java/jdocs/io/IODocTest.java b/akka-docs/src/test/java/jdocs/io/IODocTest.java index a1294d0c2e..faf285946e 100644 --- a/akka-docs/src/test/java/jdocs/io/IODocTest.java +++ b/akka-docs/src/test/java/jdocs/io/IODocTest.java @@ -16,6 +16,7 @@ import akka.io.Tcp; import akka.io.TcpMessage; import akka.io.TcpSO; import akka.util.ByteString; +import java.time.Duration; //#imports public class IODocTest { @@ -41,7 +42,8 @@ public class IODocTest { 1234); final List options = new ArrayList(); options.add(TcpSO.keepAlive(true)); - tcp.tell(TcpMessage.connect(remoteAddr, localAddr, options, null, false), getSelf()); + Duration timeout = null; + tcp.tell(TcpMessage.connect(remoteAddr, localAddr, options, timeout, false), getSelf()); //#connect-with-options }) //#connected diff --git a/akka-docs/src/test/java/jdocs/io/JavaReadBackPressure.java b/akka-docs/src/test/java/jdocs/io/JavaReadBackPressure.java index 5e796b7736..f1a18becf2 100644 --- a/akka-docs/src/test/java/jdocs/io/JavaReadBackPressure.java +++ b/akka-docs/src/test/java/jdocs/io/JavaReadBackPressure.java @@ -11,6 +11,7 @@ import akka.io.Inet; import akka.io.Tcp; import akka.io.TcpMessage; import akka.util.ByteString; +import java.time.Duration; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -59,8 +60,9 @@ public class JavaReadBackPressure { private void demonstrateConnect() { //#pull-mode-connect final List options = new ArrayList(); + Duration timeout = null; tcp.tell( - TcpMessage.connect(new InetSocketAddress("localhost", 3000), null, options, null, true), + TcpMessage.connect(new InetSocketAddress("localhost", 3000), null, options, timeout, true), getSelf() ); //#pull-mode-connect diff --git a/akka-docs/src/test/java/jdocs/pattern/BackoffSupervisorDocTest.java b/akka-docs/src/test/java/jdocs/pattern/BackoffSupervisorDocTest.java index b7b05a5aa9..009c47c377 100644 --- a/akka-docs/src/test/java/jdocs/pattern/BackoffSupervisorDocTest.java +++ b/akka-docs/src/test/java/jdocs/pattern/BackoffSupervisorDocTest.java @@ -9,11 +9,9 @@ import akka.pattern.Backoff; import akka.pattern.BackoffSupervisor; import akka.testkit.TestActors.EchoActor; //#backoff-imports -import scala.concurrent.duration.Duration; +import java.time.Duration; //#backoff-imports -import java.util.concurrent.TimeUnit; - public class BackoffSupervisorDocTest { void exampleStop (ActorSystem system) { @@ -24,8 +22,8 @@ public class BackoffSupervisorDocTest { Backoff.onStop( childProps, "myEcho", - Duration.create(3, TimeUnit.SECONDS), - Duration.create(30, TimeUnit.SECONDS), + Duration.ofSeconds(3), + Duration.ofSeconds(30), 0.2)); // adds 20% "noise" to vary the intervals slightly system.actorOf(supervisorProps, "echoSupervisor"); @@ -40,8 +38,8 @@ public class BackoffSupervisorDocTest { Backoff.onFailure( childProps, "myEcho", - Duration.create(3, TimeUnit.SECONDS), - Duration.create(30, TimeUnit.SECONDS), + Duration.ofSeconds(3), + Duration.ofSeconds(30), 0.2)); // adds 20% "noise" to vary the intervals slightly system.actorOf(supervisorProps, "echoSupervisor"); diff --git a/akka-docs/src/test/java/jdocs/persistence/LambdaPersistenceDocTest.java b/akka-docs/src/test/java/jdocs/persistence/LambdaPersistenceDocTest.java index bacd65456c..1bf92658b4 100644 --- a/akka-docs/src/test/java/jdocs/persistence/LambdaPersistenceDocTest.java +++ b/akka-docs/src/test/java/jdocs/persistence/LambdaPersistenceDocTest.java @@ -8,11 +8,10 @@ import akka.actor.*; import akka.japi.Procedure; import akka.pattern.BackoffSupervisor; import akka.persistence.*; -import scala.concurrent.duration.Duration; +import java.time.Duration; import java.io.Serializable; import java.util.Optional; -import java.util.concurrent.TimeUnit; public class LambdaPersistenceDocTest { @@ -122,8 +121,8 @@ public class LambdaPersistenceDocTest { final Props props = BackoffSupervisor.props( childProps, "myActor", - Duration.create(3, TimeUnit.SECONDS), - Duration.create(30, TimeUnit.SECONDS), + Duration.ofSeconds(3), + Duration.ofSeconds(30), 0.2); getContext().actorOf(props, "mySupervisor"); super.preStart(); diff --git a/akka-docs/src/test/java/jdocs/persistence/query/MyEventsByTagJavaPublisher.java b/akka-docs/src/test/java/jdocs/persistence/query/MyEventsByTagJavaPublisher.java index db3414216e..f076ea7760 100644 --- a/akka-docs/src/test/java/jdocs/persistence/query/MyEventsByTagJavaPublisher.java +++ b/akka-docs/src/test/java/jdocs/persistence/query/MyEventsByTagJavaPublisher.java @@ -17,14 +17,13 @@ import akka.actor.Props; import akka.persistence.query.EventEnvelope; import akka.stream.actor.ActorPublisherMessage.Cancel; -import scala.concurrent.duration.FiniteDuration; - import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import java.time.Duration; import static java.util.stream.Collectors.toList; @@ -47,7 +46,7 @@ class MyEventsByTagJavaPublisher extends AbstractActorPublisher { public MyEventsByTagJavaPublisher(Connection connection, String tag, Long offset, - FiniteDuration refreshInterval) { + Duration refreshInterval) { this.connection = connection; this.tag = tag; this.currentOffset = offset; @@ -72,7 +71,7 @@ class MyEventsByTagJavaPublisher extends AbstractActorPublisher { } public static Props props(Connection conn, String tag, Long offset, - FiniteDuration refreshInterval) { + Duration refreshInterval) { return Props.create(() -> new MyEventsByTagJavaPublisher(conn, tag, offset, refreshInterval)); } diff --git a/akka-docs/src/test/java/jdocs/routing/RouterDocTest.java b/akka-docs/src/test/java/jdocs/routing/RouterDocTest.java index 4a26affb98..07cd7f2b54 100644 --- a/akka-docs/src/test/java/jdocs/routing/RouterDocTest.java +++ b/akka-docs/src/test/java/jdocs/routing/RouterDocTest.java @@ -278,7 +278,7 @@ public class RouterDocTest extends AbstractJavaTest { //#scatter-gather-pool-1 //#scatter-gather-pool-2 - FiniteDuration within = FiniteDuration.create(10, TimeUnit.SECONDS); + java.time.Duration within = java.time.Duration.ofSeconds(10); ActorRef router18 = getContext().actorOf(new ScatterGatherFirstCompletedPool(5, within).props( Props.create(Worker.class)), "router18"); diff --git a/akka-docs/src/test/java/jdocs/stream/StreamTestKitDocTest.java b/akka-docs/src/test/java/jdocs/stream/StreamTestKitDocTest.java index ebcfd5d5ec..32bb5f009e 100644 --- a/akka-docs/src/test/java/jdocs/stream/StreamTestKitDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/StreamTestKitDocTest.java @@ -12,6 +12,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.time.Duration; import akka.NotUsed; import jdocs.AbstractJavaTest; @@ -207,7 +208,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest { //#test-source-and-sink final Flow flowUnderTest = Flow.of(Integer.class) .mapAsyncUnordered(2, sleep -> akka.pattern.PatternsCS.after( - FiniteDuration.create(10, TimeUnit.MILLISECONDS), + Duration.ofMillis(10), system.scheduler(), system.dispatcher(), CompletableFuture.completedFuture(sleep) diff --git a/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeGlobalRateLimit.java b/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeGlobalRateLimit.java index 90e8d7aeb3..6ab5adef1e 100644 --- a/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeGlobalRateLimit.java +++ b/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeGlobalRateLimit.java @@ -57,7 +57,7 @@ public class RecipeGlobalRateLimit extends RecipeTest { public static final ReplenishTokens REPLENISH_TOKENS = new ReplenishTokens(); private final int maxAvailableTokens; - private final FiniteDuration tokenRefreshPeriod; + private final Duration tokenRefreshPeriod; private final int tokenRefreshAmount; private final List waitQueue = new ArrayList<>(); @@ -65,13 +65,13 @@ public class RecipeGlobalRateLimit extends RecipeTest { private int permitTokens; - public static Props props(int maxAvailableTokens, FiniteDuration tokenRefreshPeriod, + public static Props props(int maxAvailableTokens, Duration tokenRefreshPeriod, int tokenRefreshAmount) { return Props.create(Limiter.class, maxAvailableTokens, tokenRefreshPeriod, tokenRefreshAmount); } - private Limiter(int maxAvailableTokens, FiniteDuration tokenRefreshPeriod, + private Limiter(int maxAvailableTokens, Duration tokenRefreshPeriod, int tokenRefreshAmount) { this.maxAvailableTokens = maxAvailableTokens; this.tokenRefreshPeriod = tokenRefreshPeriod; @@ -162,7 +162,7 @@ public class RecipeGlobalRateLimit extends RecipeTest { { // Use a large period and emulate the timer by hand instead - ActorRef limiter = system.actorOf(Limiter.props(2, new FiniteDuration(100, TimeUnit.DAYS), 1), "limiter"); + ActorRef limiter = system.actorOf(Limiter.props(2, Duration.ofDays(100), 1), "limiter"); final Iterator e1 = new Iterator() { @Override