From eb5885e94e62d79535a0ca5b16d95405a8d5c200 Mon Sep 17 00:00:00 2001 From: Guy Youansi Date: Wed, 27 Jun 2018 01:12:11 +0200 Subject: [PATCH] add java.time.Duration support for javadsl #24646 * add java.time.Duration support for askWithReplyto methods and add deprecated to method with akka Timeout as parameter type --- .../java/akka/event/LoggingAdapterTest.java | 6 +- .../test/java/akka/pattern/PatternsTest.java | 8 +- .../main/scala/akka/pattern/Patterns.scala | 77 +++++++++++++++++++ .../cookbook/RecipeGlobalRateLimit.java | 3 +- .../scala/akka/testkit/javadsl/TestKit.scala | 10 +++ 5 files changed, 95 insertions(+), 9 deletions(-) diff --git a/akka-actor-tests/src/test/java/akka/event/LoggingAdapterTest.java b/akka-actor-tests/src/test/java/akka/event/LoggingAdapterTest.java index 77933c7d9a..b66732bb16 100644 --- a/akka-actor-tests/src/test/java/akka/event/LoggingAdapterTest.java +++ b/akka-actor-tests/src/test/java/akka/event/LoggingAdapterTest.java @@ -19,11 +19,9 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.scalatest.junit.JUnitSuite; -import scala.concurrent.duration.Duration; -import scala.util.control.NoStackTrace; import java.util.*; -import java.util.concurrent.TimeUnit; +import java.time.Duration; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -145,7 +143,7 @@ public class LoggingAdapterTest extends JUnitSuite { } void expectLog(final Object level, final String message, final Throwable cause, final String mdc) { - expectMsgPF(Duration.create(3, TimeUnit.SECONDS), "LogEvent", event -> { + expectMsgPF(Duration.ofSeconds(3), "LogEvent", event -> { LogEvent log = (LogEvent) event; assertEquals(message, log.message()); assertEquals(level, log.level()); diff --git a/akka-actor-tests/src/test/java/akka/pattern/PatternsTest.java b/akka-actor-tests/src/test/java/akka/pattern/PatternsTest.java index d31d5a7dba..b27cedc176 100644 --- a/akka-actor-tests/src/test/java/akka/pattern/PatternsTest.java +++ b/akka-actor-tests/src/test/java/akka/pattern/PatternsTest.java @@ -119,7 +119,7 @@ public class PatternsTest extends JUnitSuite { .askWithReplyTo( echo, replyTo -> new ExplicitAskTestActor.Message(expected, replyTo), - Timeout.apply(3, SECONDS)) + Duration.ofSeconds(3)) .thenApply(o -> (String)o); final String actual = response.toCompletableFuture().get(3, SECONDS); @@ -136,7 +136,8 @@ public class PatternsTest extends JUnitSuite { .askWithReplyTo( echo, replyTo -> new ExplicitAskTestActor.Message(expected, replyTo), - 3000) + 3000 + ) .thenApply(o -> (String)o); final String actual = response.toCompletableFuture().get(3, SECONDS); @@ -153,7 +154,8 @@ public class PatternsTest extends JUnitSuite { .askWithReplyTo( selection, replyTo -> new ExplicitAskTestActor.Message(expected, replyTo), - 3000) + 3000 + ) .thenApply(o -> (String)o); final String actual = response.toCompletableFuture().get(3, SECONDS); diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala index e4ba1616d5..d7fc651153 100644 --- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -318,9 +318,38 @@ object PatternsCS { * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); * }}} */ + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.15") def ask(actor: ActorRef, message: Any, timeout: Timeout): CompletionStage[AnyRef] = scalaAsk(actor, message)(timeout).toJava.asInstanceOf[CompletionStage[AnyRef]] + /** + * Java API for `akka.pattern.ask`: + * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]] + * holding the eventual reply message; this means that the target actor + * needs to send the result to the `sender` reference provided. The CompletionStage + * will be completed with an [[akka.pattern.AskTimeoutException]] after the + * given timeout has expired; this is independent from any timeout applied + * while awaiting a result for this future (i.e. in + * `Await.result(..., timeout)`). + * + * Warning: + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s object, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + * + * Recommended usage: + * + * {{{ + * final CompletionStage f = PatternsCS.ask(worker, request, duration); + * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); + * }}} + */ + def ask(actor: ActorRef, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] = + ask(actor, message, Timeout.create(timeout)) + /** * A variation of ask which allows to implement "replyTo" pattern by including * sender reference in message. @@ -336,9 +365,28 @@ object PatternsCS { * @param messageFactory function taking an actor ref and returning the message to be sent * @param timeout the timeout for the response before failing the returned completion operator */ + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.15") def askWithReplyTo(actor: ActorRef, messageFactory: japi.function.Function[ActorRef, Any], timeout: Timeout): CompletionStage[AnyRef] = extended.ask(actor, messageFactory.apply _)(timeout).toJava.asInstanceOf[CompletionStage[AnyRef]] + /** + * A variation of ask which allows to implement "replyTo" pattern by including + * sender reference in message. + * + * {{{ + * final CompletionStage f = PatternsCS.askWithReplyTo( + * worker, + * askSender -> new Request(askSender), + * timeout); + * }}} + * + * @param actor the actor to be asked + * @param messageFactory function taking an actor ref and returning the message to be sent + * @param timeout the timeout for the response before failing the returned completion stage + */ + def askWithReplyTo(actor: ActorRef, messageFactory: japi.function.Function[ActorRef, Any], timeout: java.time.Duration): CompletionStage[AnyRef] = + extended.ask(actor, messageFactory.apply _)(Timeout.create(timeout)).toJava.asInstanceOf[CompletionStage[AnyRef]] + /** * Java API for `akka.pattern.ask`: * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]] @@ -410,9 +458,38 @@ object PatternsCS { * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); * }}} */ + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.15") def ask(selection: ActorSelection, message: Any, timeout: Timeout): CompletionStage[AnyRef] = scalaAsk(selection, message)(timeout).toJava.asInstanceOf[CompletionStage[AnyRef]] + /** + * Java API for `akka.pattern.ask`: + * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]] + * holding the eventual reply message; this means that the target [[akka.actor.ActorSelection]] + * needs to send the result to the `sender` reference provided. The CompletionStage + * will be completed with an [[akka.pattern.AskTimeoutException]] after the + * given timeout has expired; this is independent from any timeout applied + * while awaiting a result for this future (i.e. in + * `Await.result(..., timeout)`). + * + * Warning: + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s object, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + * + * Recommended usage: + * + * {{{ + * final CompletionStage f = PatternsCS.ask(selection, request, duration); + * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); + * }}} + */ + def ask(selection: ActorSelection, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] = + ask(selection, message, Timeout.create(timeout)) + /** * Java API for `akka.pattern.ask`: * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]] diff --git a/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeGlobalRateLimit.java b/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeGlobalRateLimit.java index ecc43aab92..08490975b6 100644 --- a/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeGlobalRateLimit.java +++ b/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeGlobalRateLimit.java @@ -149,9 +149,8 @@ public class RecipeGlobalRateLimit extends RecipeTest { final Flow f = Flow.create(); return f.mapAsync(parallelism, element -> { - final Timeout triggerTimeout = Timeout.create(maxAllowedWait); final CompletionStage limiterTriggerFuture = - PatternsCS.ask(limiter, Limiter.WANT_TO_PASS, triggerTimeout); + PatternsCS.ask(limiter, Limiter.WANT_TO_PASS, maxAllowedWait); return limiterTriggerFuture.thenApplyAsync(response -> element, system.dispatcher()); }); } diff --git a/akka-testkit/src/main/scala/akka/testkit/javadsl/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/javadsl/TestKit.scala index a4539b1896..1a40c95b18 100644 --- a/akka-testkit/src/main/scala/akka/testkit/javadsl/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/javadsl/TestKit.scala @@ -521,6 +521,16 @@ class TestKit(system: ActorSystem) { }) } + /** + * Receive one message from the test actor and assert that the given + * partial function accepts it. Wait time is bounded by the given duration, + * with an AssertionFailure being thrown in case of timeout. + * + * Use this variant to implement more complicated or conditional + * processing. + */ + def expectMsgPF[T](max: java.time.Duration, hint: String, f: JFunction[Any, T]): T = expectMsgPF(max.asScala, hint, f) + /** * Same as `expectMsgClass(remainingOrDefault, c)`, but correctly treating the timeFactor. */