add java.time.Duration support for javadsl #24646

* add java.time.Duration support for askWithReplyto methods and add deprecated to method with akka Timeout as parameter type
This commit is contained in:
Guy Youansi 2018-06-27 01:12:11 +02:00 committed by Patrik Nordwall
parent b9aecb7f5a
commit eb5885e94e
5 changed files with 95 additions and 9 deletions

View file

@ -19,11 +19,9 @@ import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.scalatest.junit.JUnitSuite; import org.scalatest.junit.JUnitSuite;
import scala.concurrent.duration.Duration;
import scala.util.control.NoStackTrace;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit; import java.time.Duration;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
@ -145,7 +143,7 @@ public class LoggingAdapterTest extends JUnitSuite {
} }
void expectLog(final Object level, final String message, final Throwable cause, final String mdc) { void expectLog(final Object level, final String message, final Throwable cause, final String mdc) {
expectMsgPF(Duration.create(3, TimeUnit.SECONDS), "LogEvent", event -> { expectMsgPF(Duration.ofSeconds(3), "LogEvent", event -> {
LogEvent log = (LogEvent) event; LogEvent log = (LogEvent) event;
assertEquals(message, log.message()); assertEquals(message, log.message());
assertEquals(level, log.level()); assertEquals(level, log.level());

View file

@ -119,7 +119,7 @@ public class PatternsTest extends JUnitSuite {
.askWithReplyTo( .askWithReplyTo(
echo, echo,
replyTo -> new ExplicitAskTestActor.Message(expected, replyTo), replyTo -> new ExplicitAskTestActor.Message(expected, replyTo),
Timeout.apply(3, SECONDS)) Duration.ofSeconds(3))
.thenApply(o -> (String)o); .thenApply(o -> (String)o);
final String actual = response.toCompletableFuture().get(3, SECONDS); final String actual = response.toCompletableFuture().get(3, SECONDS);
@ -136,7 +136,8 @@ public class PatternsTest extends JUnitSuite {
.askWithReplyTo( .askWithReplyTo(
echo, echo,
replyTo -> new ExplicitAskTestActor.Message(expected, replyTo), replyTo -> new ExplicitAskTestActor.Message(expected, replyTo),
3000) 3000
)
.thenApply(o -> (String)o); .thenApply(o -> (String)o);
final String actual = response.toCompletableFuture().get(3, SECONDS); final String actual = response.toCompletableFuture().get(3, SECONDS);
@ -153,7 +154,8 @@ public class PatternsTest extends JUnitSuite {
.askWithReplyTo( .askWithReplyTo(
selection, selection,
replyTo -> new ExplicitAskTestActor.Message(expected, replyTo), replyTo -> new ExplicitAskTestActor.Message(expected, replyTo),
3000) 3000
)
.thenApply(o -> (String)o); .thenApply(o -> (String)o);
final String actual = response.toCompletableFuture().get(3, SECONDS); final String actual = response.toCompletableFuture().get(3, SECONDS);

View file

@ -318,9 +318,38 @@ object PatternsCS {
* f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
* }}} * }}}
*/ */
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.15")
def ask(actor: ActorRef, message: Any, timeout: Timeout): CompletionStage[AnyRef] = def ask(actor: ActorRef, message: Any, timeout: Timeout): CompletionStage[AnyRef] =
scalaAsk(actor, message)(timeout).toJava.asInstanceOf[CompletionStage[AnyRef]] scalaAsk(actor, message)(timeout).toJava.asInstanceOf[CompletionStage[AnyRef]]
/**
* <i>Java API for `akka.pattern.ask`:</i>
* Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The CompletionStage
* will be completed with an [[akka.pattern.AskTimeoutException]] after the
* given timeout has expired; this is independent from any timeout applied
* while awaiting a result for this future (i.e. in
* `Await.result(..., timeout)`).
*
* <b>Warning:</b>
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors object, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*
* <b>Recommended usage:</b>
*
* {{{
* final CompletionStage<Object> f = PatternsCS.ask(worker, request, duration);
* f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
* }}}
*/
def ask(actor: ActorRef, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] =
ask(actor, message, Timeout.create(timeout))
/** /**
* A variation of ask which allows to implement "replyTo" pattern by including * A variation of ask which allows to implement "replyTo" pattern by including
* sender reference in message. * sender reference in message.
@ -336,9 +365,28 @@ object PatternsCS {
* @param messageFactory function taking an actor ref and returning the message to be sent * @param messageFactory function taking an actor ref and returning the message to be sent
* @param timeout the timeout for the response before failing the returned completion operator * @param timeout the timeout for the response before failing the returned completion operator
*/ */
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.15")
def askWithReplyTo(actor: ActorRef, messageFactory: japi.function.Function[ActorRef, Any], timeout: Timeout): CompletionStage[AnyRef] = def askWithReplyTo(actor: ActorRef, messageFactory: japi.function.Function[ActorRef, Any], timeout: Timeout): CompletionStage[AnyRef] =
extended.ask(actor, messageFactory.apply _)(timeout).toJava.asInstanceOf[CompletionStage[AnyRef]] extended.ask(actor, messageFactory.apply _)(timeout).toJava.asInstanceOf[CompletionStage[AnyRef]]
/**
* A variation of ask which allows to implement "replyTo" pattern by including
* sender reference in message.
*
* {{{
* final CompletionStage<Object> f = PatternsCS.askWithReplyTo(
* worker,
* askSender -> new Request(askSender),
* timeout);
* }}}
*
* @param actor the actor to be asked
* @param messageFactory function taking an actor ref and returning the message to be sent
* @param timeout the timeout for the response before failing the returned completion stage
*/
def askWithReplyTo(actor: ActorRef, messageFactory: japi.function.Function[ActorRef, Any], timeout: java.time.Duration): CompletionStage[AnyRef] =
extended.ask(actor, messageFactory.apply _)(Timeout.create(timeout)).toJava.asInstanceOf[CompletionStage[AnyRef]]
/** /**
* <i>Java API for `akka.pattern.ask`:</i> * <i>Java API for `akka.pattern.ask`:</i>
* Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]] * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]]
@ -410,9 +458,38 @@ object PatternsCS {
* f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
* }}} * }}}
*/ */
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.15")
def ask(selection: ActorSelection, message: Any, timeout: Timeout): CompletionStage[AnyRef] = def ask(selection: ActorSelection, message: Any, timeout: Timeout): CompletionStage[AnyRef] =
scalaAsk(selection, message)(timeout).toJava.asInstanceOf[CompletionStage[AnyRef]] scalaAsk(selection, message)(timeout).toJava.asInstanceOf[CompletionStage[AnyRef]]
/**
* <i>Java API for `akka.pattern.ask`:</i>
* Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]]
* holding the eventual reply message; this means that the target [[akka.actor.ActorSelection]]
* needs to send the result to the `sender` reference provided. The CompletionStage
* will be completed with an [[akka.pattern.AskTimeoutException]] after the
* given timeout has expired; this is independent from any timeout applied
* while awaiting a result for this future (i.e. in
* `Await.result(..., timeout)`).
*
* <b>Warning:</b>
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors object, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*
* <b>Recommended usage:</b>
*
* {{{
* final CompletionStage<Object> f = PatternsCS.ask(selection, request, duration);
* f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
* }}}
*/
def ask(selection: ActorSelection, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] =
ask(selection, message, Timeout.create(timeout))
/** /**
* <i>Java API for `akka.pattern.ask`:</i> * <i>Java API for `akka.pattern.ask`:</i>
* Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]] * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]]

View file

@ -149,9 +149,8 @@ public class RecipeGlobalRateLimit extends RecipeTest {
final Flow<T, T, NotUsed> f = Flow.create(); final Flow<T, T, NotUsed> f = Flow.create();
return f.mapAsync(parallelism, element -> { return f.mapAsync(parallelism, element -> {
final Timeout triggerTimeout = Timeout.create(maxAllowedWait);
final CompletionStage<Object> limiterTriggerFuture = final CompletionStage<Object> limiterTriggerFuture =
PatternsCS.ask(limiter, Limiter.WANT_TO_PASS, triggerTimeout); PatternsCS.ask(limiter, Limiter.WANT_TO_PASS, maxAllowedWait);
return limiterTriggerFuture.thenApplyAsync(response -> element, system.dispatcher()); return limiterTriggerFuture.thenApplyAsync(response -> element, system.dispatcher());
}); });
} }

View file

@ -521,6 +521,16 @@ class TestKit(system: ActorSystem) {
}) })
} }
/**
* Receive one message from the test actor and assert that the given
* partial function accepts it. Wait time is bounded by the given duration,
* with an AssertionFailure being thrown in case of timeout.
*
* Use this variant to implement more complicated or conditional
* processing.
*/
def expectMsgPF[T](max: java.time.Duration, hint: String, f: JFunction[Any, T]): T = expectMsgPF(max.asScala, hint, f)
/** /**
* Same as `expectMsgClass(remainingOrDefault, c)`, but correctly treating the timeFactor. * Same as `expectMsgClass(remainingOrDefault, c)`, but correctly treating the timeFactor.
*/ */