Merge pull request #25511 from akka/java-time-duration-support-for-javadsl
add java.time.Duration support for javadsl #24646
This commit is contained in:
commit
8e3df9110a
5 changed files with 95 additions and 9 deletions
|
|
@ -19,11 +19,9 @@ import org.junit.Before;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.scalatest.junit.JUnitSuite;
|
import org.scalatest.junit.JUnitSuite;
|
||||||
import scala.concurrent.duration.Duration;
|
|
||||||
import scala.util.control.NoStackTrace;
|
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.time.Duration;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
@ -145,7 +143,7 @@ public class LoggingAdapterTest extends JUnitSuite {
|
||||||
}
|
}
|
||||||
|
|
||||||
void expectLog(final Object level, final String message, final Throwable cause, final String mdc) {
|
void expectLog(final Object level, final String message, final Throwable cause, final String mdc) {
|
||||||
expectMsgPF(Duration.create(3, TimeUnit.SECONDS), "LogEvent", event -> {
|
expectMsgPF(Duration.ofSeconds(3), "LogEvent", event -> {
|
||||||
LogEvent log = (LogEvent) event;
|
LogEvent log = (LogEvent) event;
|
||||||
assertEquals(message, log.message());
|
assertEquals(message, log.message());
|
||||||
assertEquals(level, log.level());
|
assertEquals(level, log.level());
|
||||||
|
|
|
||||||
|
|
@ -119,7 +119,7 @@ public class PatternsTest extends JUnitSuite {
|
||||||
.askWithReplyTo(
|
.askWithReplyTo(
|
||||||
echo,
|
echo,
|
||||||
replyTo -> new ExplicitAskTestActor.Message(expected, replyTo),
|
replyTo -> new ExplicitAskTestActor.Message(expected, replyTo),
|
||||||
Timeout.apply(3, SECONDS))
|
Duration.ofSeconds(3))
|
||||||
.thenApply(o -> (String)o);
|
.thenApply(o -> (String)o);
|
||||||
|
|
||||||
final String actual = response.toCompletableFuture().get(3, SECONDS);
|
final String actual = response.toCompletableFuture().get(3, SECONDS);
|
||||||
|
|
@ -136,7 +136,8 @@ public class PatternsTest extends JUnitSuite {
|
||||||
.askWithReplyTo(
|
.askWithReplyTo(
|
||||||
echo,
|
echo,
|
||||||
replyTo -> new ExplicitAskTestActor.Message(expected, replyTo),
|
replyTo -> new ExplicitAskTestActor.Message(expected, replyTo),
|
||||||
3000)
|
3000
|
||||||
|
)
|
||||||
.thenApply(o -> (String)o);
|
.thenApply(o -> (String)o);
|
||||||
|
|
||||||
final String actual = response.toCompletableFuture().get(3, SECONDS);
|
final String actual = response.toCompletableFuture().get(3, SECONDS);
|
||||||
|
|
@ -153,7 +154,8 @@ public class PatternsTest extends JUnitSuite {
|
||||||
.askWithReplyTo(
|
.askWithReplyTo(
|
||||||
selection,
|
selection,
|
||||||
replyTo -> new ExplicitAskTestActor.Message(expected, replyTo),
|
replyTo -> new ExplicitAskTestActor.Message(expected, replyTo),
|
||||||
3000)
|
3000
|
||||||
|
)
|
||||||
.thenApply(o -> (String)o);
|
.thenApply(o -> (String)o);
|
||||||
|
|
||||||
final String actual = response.toCompletableFuture().get(3, SECONDS);
|
final String actual = response.toCompletableFuture().get(3, SECONDS);
|
||||||
|
|
|
||||||
|
|
@ -318,9 +318,38 @@ object PatternsCS {
|
||||||
* f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
|
* f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
|
||||||
* }}}
|
* }}}
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.15")
|
||||||
def ask(actor: ActorRef, message: Any, timeout: Timeout): CompletionStage[AnyRef] =
|
def ask(actor: ActorRef, message: Any, timeout: Timeout): CompletionStage[AnyRef] =
|
||||||
scalaAsk(actor, message)(timeout).toJava.asInstanceOf[CompletionStage[AnyRef]]
|
scalaAsk(actor, message)(timeout).toJava.asInstanceOf[CompletionStage[AnyRef]]
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <i>Java API for `akka.pattern.ask`:</i>
|
||||||
|
* Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]]
|
||||||
|
* holding the eventual reply message; this means that the target actor
|
||||||
|
* needs to send the result to the `sender` reference provided. The CompletionStage
|
||||||
|
* will be completed with an [[akka.pattern.AskTimeoutException]] after the
|
||||||
|
* given timeout has expired; this is independent from any timeout applied
|
||||||
|
* while awaiting a result for this future (i.e. in
|
||||||
|
* `Await.result(..., timeout)`).
|
||||||
|
*
|
||||||
|
* <b>Warning:</b>
|
||||||
|
* When using future callbacks, inside actors you need to carefully avoid closing over
|
||||||
|
* the containing actor’s object, i.e. do not call methods or access mutable state
|
||||||
|
* on the enclosing actor from within the callback. This would break the actor
|
||||||
|
* encapsulation and may introduce synchronization bugs and race conditions because
|
||||||
|
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||||
|
* there is not yet a way to detect these illegal accesses at compile time.
|
||||||
|
*
|
||||||
|
* <b>Recommended usage:</b>
|
||||||
|
*
|
||||||
|
* {{{
|
||||||
|
* final CompletionStage<Object> f = PatternsCS.ask(worker, request, duration);
|
||||||
|
* f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
|
||||||
|
* }}}
|
||||||
|
*/
|
||||||
|
def ask(actor: ActorRef, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] =
|
||||||
|
ask(actor, message, Timeout.create(timeout))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A variation of ask which allows to implement "replyTo" pattern by including
|
* A variation of ask which allows to implement "replyTo" pattern by including
|
||||||
* sender reference in message.
|
* sender reference in message.
|
||||||
|
|
@ -336,9 +365,28 @@ object PatternsCS {
|
||||||
* @param messageFactory function taking an actor ref and returning the message to be sent
|
* @param messageFactory function taking an actor ref and returning the message to be sent
|
||||||
* @param timeout the timeout for the response before failing the returned completion operator
|
* @param timeout the timeout for the response before failing the returned completion operator
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.15")
|
||||||
def askWithReplyTo(actor: ActorRef, messageFactory: japi.function.Function[ActorRef, Any], timeout: Timeout): CompletionStage[AnyRef] =
|
def askWithReplyTo(actor: ActorRef, messageFactory: japi.function.Function[ActorRef, Any], timeout: Timeout): CompletionStage[AnyRef] =
|
||||||
extended.ask(actor, messageFactory.apply _)(timeout).toJava.asInstanceOf[CompletionStage[AnyRef]]
|
extended.ask(actor, messageFactory.apply _)(timeout).toJava.asInstanceOf[CompletionStage[AnyRef]]
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A variation of ask which allows to implement "replyTo" pattern by including
|
||||||
|
* sender reference in message.
|
||||||
|
*
|
||||||
|
* {{{
|
||||||
|
* final CompletionStage<Object> f = PatternsCS.askWithReplyTo(
|
||||||
|
* worker,
|
||||||
|
* askSender -> new Request(askSender),
|
||||||
|
* timeout);
|
||||||
|
* }}}
|
||||||
|
*
|
||||||
|
* @param actor the actor to be asked
|
||||||
|
* @param messageFactory function taking an actor ref and returning the message to be sent
|
||||||
|
* @param timeout the timeout for the response before failing the returned completion stage
|
||||||
|
*/
|
||||||
|
def askWithReplyTo(actor: ActorRef, messageFactory: japi.function.Function[ActorRef, Any], timeout: java.time.Duration): CompletionStage[AnyRef] =
|
||||||
|
extended.ask(actor, messageFactory.apply _)(Timeout.create(timeout)).toJava.asInstanceOf[CompletionStage[AnyRef]]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <i>Java API for `akka.pattern.ask`:</i>
|
* <i>Java API for `akka.pattern.ask`:</i>
|
||||||
* Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]]
|
* Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]]
|
||||||
|
|
@ -410,9 +458,38 @@ object PatternsCS {
|
||||||
* f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
|
* f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
|
||||||
* }}}
|
* }}}
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.15")
|
||||||
def ask(selection: ActorSelection, message: Any, timeout: Timeout): CompletionStage[AnyRef] =
|
def ask(selection: ActorSelection, message: Any, timeout: Timeout): CompletionStage[AnyRef] =
|
||||||
scalaAsk(selection, message)(timeout).toJava.asInstanceOf[CompletionStage[AnyRef]]
|
scalaAsk(selection, message)(timeout).toJava.asInstanceOf[CompletionStage[AnyRef]]
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <i>Java API for `akka.pattern.ask`:</i>
|
||||||
|
* Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]]
|
||||||
|
* holding the eventual reply message; this means that the target [[akka.actor.ActorSelection]]
|
||||||
|
* needs to send the result to the `sender` reference provided. The CompletionStage
|
||||||
|
* will be completed with an [[akka.pattern.AskTimeoutException]] after the
|
||||||
|
* given timeout has expired; this is independent from any timeout applied
|
||||||
|
* while awaiting a result for this future (i.e. in
|
||||||
|
* `Await.result(..., timeout)`).
|
||||||
|
*
|
||||||
|
* <b>Warning:</b>
|
||||||
|
* When using future callbacks, inside actors you need to carefully avoid closing over
|
||||||
|
* the containing actor’s object, i.e. do not call methods or access mutable state
|
||||||
|
* on the enclosing actor from within the callback. This would break the actor
|
||||||
|
* encapsulation and may introduce synchronization bugs and race conditions because
|
||||||
|
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||||
|
* there is not yet a way to detect these illegal accesses at compile time.
|
||||||
|
*
|
||||||
|
* <b>Recommended usage:</b>
|
||||||
|
*
|
||||||
|
* {{{
|
||||||
|
* final CompletionStage<Object> f = PatternsCS.ask(selection, request, duration);
|
||||||
|
* f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
|
||||||
|
* }}}
|
||||||
|
*/
|
||||||
|
def ask(selection: ActorSelection, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] =
|
||||||
|
ask(selection, message, Timeout.create(timeout))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <i>Java API for `akka.pattern.ask`:</i>
|
* <i>Java API for `akka.pattern.ask`:</i>
|
||||||
* Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]]
|
* Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]]
|
||||||
|
|
|
||||||
|
|
@ -149,9 +149,8 @@ public class RecipeGlobalRateLimit extends RecipeTest {
|
||||||
final Flow<T, T, NotUsed> f = Flow.create();
|
final Flow<T, T, NotUsed> f = Flow.create();
|
||||||
|
|
||||||
return f.mapAsync(parallelism, element -> {
|
return f.mapAsync(parallelism, element -> {
|
||||||
final Timeout triggerTimeout = Timeout.create(maxAllowedWait);
|
|
||||||
final CompletionStage<Object> limiterTriggerFuture =
|
final CompletionStage<Object> limiterTriggerFuture =
|
||||||
PatternsCS.ask(limiter, Limiter.WANT_TO_PASS, triggerTimeout);
|
PatternsCS.ask(limiter, Limiter.WANT_TO_PASS, maxAllowedWait);
|
||||||
return limiterTriggerFuture.thenApplyAsync(response -> element, system.dispatcher());
|
return limiterTriggerFuture.thenApplyAsync(response -> element, system.dispatcher());
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -521,6 +521,16 @@ class TestKit(system: ActorSystem) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Receive one message from the test actor and assert that the given
|
||||||
|
* partial function accepts it. Wait time is bounded by the given duration,
|
||||||
|
* with an AssertionFailure being thrown in case of timeout.
|
||||||
|
*
|
||||||
|
* Use this variant to implement more complicated or conditional
|
||||||
|
* processing.
|
||||||
|
*/
|
||||||
|
def expectMsgPF[T](max: java.time.Duration, hint: String, f: JFunction[Any, T]): T = expectMsgPF(max.asScala, hint, f)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Same as `expectMsgClass(remainingOrDefault, c)`, but correctly treating the timeFactor.
|
* Same as `expectMsgClass(remainingOrDefault, c)`, but correctly treating the timeFactor.
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue