diff --git a/akka-actor/src/main/mima-filters/2.5.12.backwards.excludes b/akka-actor/src/main/mima-filters/2.5.12.backwards.excludes new file mode 100644 index 0000000000..0d0fd144f2 --- /dev/null +++ b/akka-actor/src/main/mima-filters/2.5.12.backwards.excludes @@ -0,0 +1,3 @@ +# #24646 java.time.Duration +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.AbstractActor#ActorContext.cancelReceiveTimeout") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.AbstractActor#ActorContext.setReceiveTimeout") \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/AbstractActor.scala b/akka-actor/src/main/scala/akka/actor/AbstractActor.scala index 8041ac8a2a..405bac6e49 100644 --- a/akka-actor/src/main/scala/akka/actor/AbstractActor.scala +++ b/akka-actor/src/main/scala/akka/actor/AbstractActor.scala @@ -10,6 +10,10 @@ import akka.japi.pf.ReceiveBuilder import scala.runtime.BoxedUnit import java.util.Optional +import akka.util.JavaDurationConverters + +import scala.concurrent.duration.Duration + /** * Java API: compatible with lambda expressions */ @@ -115,6 +119,35 @@ object AbstractActor { */ def become(behavior: Receive, discardOld: Boolean): Unit = become(behavior.onMessage.asInstanceOf[PartialFunction[Any, Unit]], discardOld) + + /** + * Defines the inactivity timeout after which the sending of a [[akka.actor.ReceiveTimeout]] message is triggered. + * When specified, the receive function should be able to handle a [[akka.actor.ReceiveTimeout]] message. + * 1 millisecond is the minimum supported timeout. + * + * Please note that the receive timeout might fire and enqueue the `ReceiveTimeout` message right after + * another message was enqueued; hence it is '''not guaranteed''' that upon reception of the receive + * timeout there must have been an idle period beforehand as configured via this method. + * + * Once set, the receive timeout stays in effect (i.e. continues firing repeatedly after inactivity + * periods). Pass in `Duration.Undefined` to switch off this feature. + * + * Messages marked with [[NotInfluenceReceiveTimeout]] will not reset the timer. This can be useful when + * `ReceiveTimeout` should be fired by external inactivity but not influenced by internal activity, + * e.g. scheduled tick messages. + * + * *Warning*: This method is not thread-safe and must not be accessed from threads other + * than the ordinary actor message processing thread, such as [[java.util.concurrent.CompletionStage]] and [[scala.concurrent.Future]] callbacks. + */ + def setReceiveTimeout(timeout: java.time.Duration): Unit = { + import JavaDurationConverters._ + setReceiveTimeout(timeout.asScala) + } + + /** + * Cancel the sending of receive timeout notifications. + */ + def cancelReceiveTimeout(): Unit = setReceiveTimeout(Duration.Undefined) } } diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/ReceiveTimeout.scala b/akka-actor/src/main/scala/akka/actor/dungeon/ReceiveTimeout.scala index ab7777d0df..35431c7241 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/ReceiveTimeout.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/ReceiveTimeout.scala @@ -4,9 +4,7 @@ package akka.actor.dungeon -import ReceiveTimeout.emptyReceiveTimeoutData import akka.actor.ActorCell -import akka.actor.ActorCell.emptyCancellable import akka.actor.Cancellable import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration @@ -40,7 +38,7 @@ private[akka] trait ReceiveTimeout { this: ActorCell ⇒ } - final def cancelReceiveTimeout(): Unit = + override final def cancelReceiveTimeout(): Unit = if (receiveTimeoutData._2 ne emptyCancellable) { receiveTimeoutData._2.cancel() receiveTimeoutData = (receiveTimeoutData._1, emptyCancellable) diff --git a/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java b/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java index 6010ec7aea..0c1dd56c9b 100644 --- a/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java +++ b/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java @@ -18,8 +18,8 @@ import akka.Done; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.time.Duration; import akka.testkit.TestActors; -import scala.concurrent.Await; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -47,7 +47,6 @@ import java.util.concurrent.CompletableFuture; //#import-gracefulStop import static akka.pattern.PatternsCS.gracefulStop; import akka.pattern.AskTimeoutException; -import scala.concurrent.duration.Duration; import java.util.concurrent.CompletionStage; //#import-gracefulStop @@ -73,8 +72,8 @@ public class ActorDocTest extends AbstractJavaTest { } @AfterClass - public static void afterClass() throws Exception { - Await.ready(system.terminate(), Duration.create(5, TimeUnit.SECONDS)); + public static void afterClass() { + TestKit.shutdownActorSystem(system); } static @@ -412,7 +411,7 @@ public class ActorDocTest extends AbstractJavaTest { //#gracefulStop try { CompletionStage stopped = - gracefulStop(actorRef, java.time.Duration.ofSeconds(5), Manager.SHUTDOWN); + gracefulStop(actorRef, Duration.ofSeconds(5), Manager.SHUTDOWN); stopped.toCompletableFuture().get(6, TimeUnit.SECONDS); // the actor has been stopped } catch (AskTimeoutException e) { @@ -535,7 +534,7 @@ public class ActorDocTest extends AbstractJavaTest { //#receive-timeout public ReceiveTimeoutActor() { // To set an initial delay - getContext().setReceiveTimeout(Duration.create(10, TimeUnit.SECONDS)); + getContext().setReceiveTimeout(Duration.ofSeconds(10)); } @Override @@ -543,7 +542,7 @@ public class ActorDocTest extends AbstractJavaTest { return receiveBuilder() .matchEquals("Hello", s -> { // To set in a response to a message - getContext().setReceiveTimeout(Duration.create(1, TimeUnit.SECONDS)); + getContext().setReceiveTimeout(Duration.ofSeconds(1)); //#receive-timeout target = getSender(); target.tell("Hello world", getSelf()); @@ -551,7 +550,7 @@ public class ActorDocTest extends AbstractJavaTest { }) .match(ReceiveTimeout.class, r -> { // To turn it off - getContext().setReceiveTimeout(Duration.Undefined()); + getContext().cancelReceiveTimeout(); //#receive-timeout target.tell("timeout", getSelf()); //#receive-timeout @@ -628,7 +627,7 @@ public class ActorDocTest extends AbstractJavaTest { actor.tell("foo", getRef()); actor.tell("foo", getRef()); expectMsgEquals("I am already angry?"); - expectNoMsg(Duration.create(1, TimeUnit.SECONDS)); + expectNoMessage(Duration.ofSeconds(1)); } }; } @@ -741,7 +740,7 @@ public class ActorDocTest extends AbstractJavaTest { { watch(b); system.stop(a); - assertEquals(expectMsgClass(java.time.Duration.ofSeconds(2), Terminated.class).actor(), b); + assertEquals(expectMsgClass(Duration.ofSeconds(2), Terminated.class).actor(), b); } }; } @@ -755,7 +754,7 @@ public class ActorDocTest extends AbstractJavaTest { ActorRef actorC = getRef(); //#ask-pipe - Timeout t = new Timeout(Duration.create(5, TimeUnit.SECONDS)); + Timeout t = Timeout.create(Duration.ofSeconds(5)); // using 1000ms timeout CompletableFuture future1 = @@ -791,7 +790,7 @@ public class ActorDocTest extends AbstractJavaTest { victim.tell(akka.actor.Kill.getInstance(), ActorRef.noSender()); // expecting the actor to indeed terminate: - expectTerminated(Duration.create(3, TimeUnit.SECONDS), victim); + expectTerminated(Duration.ofSeconds(3), victim); //#kill } }; @@ -806,7 +805,7 @@ public class ActorDocTest extends AbstractJavaTest { //#poison-pill victim.tell(akka.actor.PoisonPill.getInstance(), ActorRef.noSender()); //#poison-pill - expectTerminated(Duration.create(3, TimeUnit.SECONDS), victim); + expectTerminated(Duration.ofSeconds(3), victim); } }; } diff --git a/akka-docs/src/test/java/jdocs/actor/FaultHandlingDocSample.java b/akka-docs/src/test/java/jdocs/actor/FaultHandlingDocSample.java index 12ad84da64..636123c0f3 100644 --- a/akka-docs/src/test/java/jdocs/actor/FaultHandlingDocSample.java +++ b/akka-docs/src/test/java/jdocs/actor/FaultHandlingDocSample.java @@ -10,6 +10,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.time.Duration; import akka.actor.*; import akka.dispatch.Mapper; @@ -19,7 +20,6 @@ import akka.pattern.Patterns; import akka.util.Timeout; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import scala.concurrent.duration.Duration; import static akka.japi.Util.classTag; import static akka.actor.SupervisorStrategy.restart; @@ -67,7 +67,7 @@ public class FaultHandlingDocSample { public void preStart() { // If we don't get any progress within 15 seconds then the service // is unavailable - getContext().setReceiveTimeout(Duration.create("15 seconds")); + getContext().setReceiveTimeout(Duration.ofSeconds(15)); } @Override @@ -114,7 +114,7 @@ public class FaultHandlingDocSample { * The Worker supervise the CounterService. */ public static class Worker extends AbstractLoggingActor { - final Timeout askTimeout = new Timeout(Duration.create(5, "seconds")); + final Timeout askTimeout = Timeout.create(Duration.ofSeconds(5)); // The sender of the initial Start message will continuously be notified // about progress @@ -140,7 +140,7 @@ public class FaultHandlingDocSample { matchEquals(Start, x -> progressListener == null, x -> { progressListener = getSender(); getContext().getSystem().scheduler().schedule( - java.time.Duration.ZERO, java.time.Duration.ofSeconds(1L), getSelf(), Do, + Duration.ZERO, Duration.ofSeconds(1L), getSelf(), Do, getContext().dispatcher(), null ); }). @@ -232,7 +232,7 @@ public class FaultHandlingDocSample { // Restart the storage child when StorageException is thrown. // After 3 restarts within 5 seconds it will be stopped. private static final SupervisorStrategy strategy = - new OneForOneStrategy(3, Duration.create("5 seconds"), DeciderBuilder. + new OneForOneStrategy(3, scala.concurrent.duration.Duration.create("5 seconds"), DeciderBuilder. match(StorageException.class, e -> restart()). matchAny(o -> escalate()).build()); @@ -292,7 +292,7 @@ public class FaultHandlingDocSample { counter.tell(new UseStorage(null), getSelf()); // Try to re-establish storage after while getContext().getSystem().scheduler().scheduleOnce( - Duration.create(10, "seconds"), getSelf(), Reconnect, + Duration.ofSeconds(10), getSelf(), Reconnect, getContext().dispatcher(), null); }). matchEquals(Reconnect, o -> { diff --git a/akka-docs/src/test/java/jdocs/cluster/FactorialFrontend.java b/akka-docs/src/test/java/jdocs/cluster/FactorialFrontend.java index be48a0714c..74873c5281 100644 --- a/akka-docs/src/test/java/jdocs/cluster/FactorialFrontend.java +++ b/akka-docs/src/test/java/jdocs/cluster/FactorialFrontend.java @@ -8,7 +8,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.TimeUnit; +import java.time.Duration; import akka.actor.Props; import akka.cluster.metrics.AdaptiveLoadBalancingGroup; @@ -19,7 +19,6 @@ import akka.cluster.routing.ClusterRouterGroup; import akka.cluster.routing.ClusterRouterGroupSettings; import akka.cluster.routing.ClusterRouterPool; import akka.cluster.routing.ClusterRouterPoolSettings; -import scala.concurrent.duration.Duration; import akka.actor.ActorRef; import akka.actor.ReceiveTimeout; import akka.actor.AbstractActor; @@ -45,7 +44,7 @@ public class FactorialFrontend extends AbstractActor { @Override public void preStart() { sendJobs(); - getContext().setReceiveTimeout(Duration.create(10, TimeUnit.SECONDS)); + getContext().setReceiveTimeout(Duration.ofSeconds(10)); } @Override diff --git a/akka-docs/src/test/java/jdocs/cluster/StatsAggregator.java b/akka-docs/src/test/java/jdocs/cluster/StatsAggregator.java index 7f998f8121..bf811031a9 100644 --- a/akka-docs/src/test/java/jdocs/cluster/StatsAggregator.java +++ b/akka-docs/src/test/java/jdocs/cluster/StatsAggregator.java @@ -6,11 +6,10 @@ package jdocs.cluster; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; +import java.time.Duration; import jdocs.cluster.StatsMessages.JobFailed; import jdocs.cluster.StatsMessages.StatsResult; -import scala.concurrent.duration.Duration; import akka.actor.ActorRef; import akka.actor.ReceiveTimeout; import akka.actor.AbstractActor; @@ -29,7 +28,7 @@ public class StatsAggregator extends AbstractActor { @Override public void preStart() { - getContext().setReceiveTimeout(Duration.create(3, TimeUnit.SECONDS)); + getContext().setReceiveTimeout(Duration.ofSeconds(3)); } @Override diff --git a/akka-docs/src/test/java/jdocs/sharding/ClusterShardingTest.java b/akka-docs/src/test/java/jdocs/sharding/ClusterShardingTest.java index 8384ce3562..4ec19350e1 100644 --- a/akka-docs/src/test/java/jdocs/sharding/ClusterShardingTest.java +++ b/akka-docs/src/test/java/jdocs/sharding/ClusterShardingTest.java @@ -4,10 +4,9 @@ package jdocs.sharding; -import static java.util.concurrent.TimeUnit.SECONDS; import java.util.Optional; -import scala.concurrent.duration.Duration; +import java.time.Duration; import akka.actor.AbstractActor; import akka.actor.ActorInitializationException; @@ -17,7 +16,6 @@ import akka.actor.OneForOneStrategy; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.SupervisorStrategy; -import akka.actor.Terminated; import akka.actor.ReceiveTimeout; //#counter-extractor import akka.cluster.sharding.ShardRegion; @@ -31,7 +29,6 @@ import akka.cluster.sharding.ClusterShardingSettings; //#counter-start import akka.persistence.AbstractPersistentActor; -import akka.cluster.Cluster; import akka.japi.pf.DeciderBuilder; // Doc code, compile only @@ -200,7 +197,7 @@ public class ClusterShardingTest { @Override public void preStart() throws Exception { super.preStart(); - getContext().setReceiveTimeout(Duration.create(120, SECONDS)); + getContext().setReceiveTimeout(Duration.ofSeconds(120)); } void updateState(CounterChanged event) { diff --git a/akka-testkit/src/main/scala/akka/testkit/javadsl/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/javadsl/TestKit.scala index f2140682c3..d065bcd325 100644 --- a/akka-testkit/src/main/scala/akka/testkit/javadsl/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/javadsl/TestKit.scala @@ -660,6 +660,17 @@ class TestKit(system: ActorSystem) { */ def expectTerminated(max: Duration, target: ActorRef): Terminated = tp.expectTerminated(target, max) + /** + * Receive one message from the test actor and assert that it is the Terminated message of the given ActorRef. + * Before calling this method, you have to `watch` the target actor ref. + * Wait time is bounded by the given duration, with an AssertionFailure being thrown in case of timeout. + * + * @param max wait no more than max time, otherwise throw AssertionFailure + * @param target the actor ref expected to be Terminated + * @return the received Terminated message + */ + def expectTerminated(max: java.time.Duration, target: ActorRef): Terminated = tp.expectTerminated(target, max.asScala) + /** * Receive one message from the test actor and assert that it is the Terminated message of the given ActorRef. * Before calling this method, you have to `watch` the target actor ref.