From b7f3cbef94e7eee6985a2d741945c9c8a9d76f89 Mon Sep 17 00:00:00 2001 From: kerr Date: Thu, 6 Dec 2018 22:40:43 +0800 Subject: [PATCH] Merge PatternsCS to Patterns (#26008) * !act Move some Java API methods from PatternsCS to Patterns. * Deprecate PatternCS and in favor of Patterns. --- .../test/java/akka/pattern/PatternsTest.java | 4 +- .../main/scala/akka/pattern/Patterns.scala | 179 ++++++++++++++++++ .../test/java/jdocs/actor/ActorDocTest.java | 98 +++++----- .../actor/io/dns/DnsCompileOnlyDocTest.java | 7 +- .../java/jdocs/camel/ProducerTestBase.java | 6 +- .../circuitbreaker/DangerousJavaActor.java | 2 +- .../java/jdocs/cluster/FactorialBackend.java | 2 +- .../test/java/jdocs/future/FutureDocTest.java | 13 +- .../java/jdocs/pattern/SupervisedAsk.java | 15 +- .../java/jdocs/pattern/SupervisedAskSpec.java | 7 +- .../persistence/PersistenceQueryDocTest.java | 54 +++--- .../jdocs/stream/FlowStreamRefsDocTest.java | 6 +- .../java/jdocs/stream/IntegrationDocTest.java | 5 +- .../jdocs/stream/StreamTestKitDocTest.java | 4 +- .../cookbook/RecipeGlobalRateLimit.java | 7 +- .../java/jdocs/testkit/TestKitDocTest.java | 4 +- 16 files changed, 295 insertions(+), 118 deletions(-) diff --git a/akka-actor-tests/src/test/java/akka/pattern/PatternsTest.java b/akka-actor-tests/src/test/java/akka/pattern/PatternsTest.java index b27cedc176..2e316fb5df 100644 --- a/akka-actor-tests/src/test/java/akka/pattern/PatternsTest.java +++ b/akka-actor-tests/src/test/java/akka/pattern/PatternsTest.java @@ -232,7 +232,7 @@ public class PatternsTest extends JUnitSuite { TestProbe probe = new TestProbe(system); CompletableFuture f = new CompletableFuture<>(); f.complete("ho!"); - PatternsCS.pipe(f, ec).to(probe.ref()); + Patterns.pipe(f, ec).to(probe.ref()); probe.expectMsg("ho!"); } @@ -242,7 +242,7 @@ public class PatternsTest extends JUnitSuite { ActorSelection selection = system.actorSelection(probe.ref().path()); CompletableFuture f = new CompletableFuture<>(); f.complete("hi!"); - PatternsCS.pipe(f, ec).to(selection); + Patterns.pipe(f, ec).to(selection); probe.expectMsg("hi!"); } diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala index c33fd2ceca..fde836919c 100644 --- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -69,6 +69,36 @@ object Patterns { */ def ask(actor: ActorRef, message: Any, timeout: Timeout): Future[AnyRef] = scalaAsk(actor, message)(timeout).asInstanceOf[Future[AnyRef]] + /** + * Java API for `akka.pattern.ask`: + * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]] + * holding the eventual reply message; this means that the target actor + * needs to send the result to the `sender` reference provided. + * + * The CompletionStage will be completed with an [[akka.pattern.AskTimeoutException]] after the + * given timeout has expired; this is independent from any timeout applied + * while awaiting a result for this future (i.e. in + * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the + * recipient actor didn't send a reply. + * + * Warning: + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s object, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + * + * Recommended usage: + * + * {{{ + * final CompletionStage f = PatternsCS.ask(worker, request, duration); + * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); + * }}} + */ + def ask(actor: ActorRef, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] = + scalaAsk(actor, message)(timeout.asScala).toJava.asInstanceOf[CompletionStage[AnyRef]] + /** * A variation of ask which allows to implement "replyTo" pattern by including * sender reference in message. @@ -83,6 +113,24 @@ object Patterns { def askWithReplyTo(actor: ActorRef, messageFactory: japi.Function[ActorRef, Any], timeout: Timeout): Future[AnyRef] = extended.ask(actor, messageFactory.apply _)(timeout).asInstanceOf[Future[AnyRef]] + /** + * A variation of ask which allows to implement "replyTo" pattern by including + * sender reference in message. + * + * {{{ + * final CompletionStage f = PatternsCS.askWithReplyTo( + * worker, + * askSender -> new Request(askSender), + * timeout); + * }}} + * + * @param actor the actor to be asked + * @param messageFactory function taking an actor ref and returning the message to be sent + * @param timeout the timeout for the response before failing the returned completion stage + */ + def askWithReplyTo(actor: ActorRef, messageFactory: japi.function.Function[ActorRef, Any], timeout: java.time.Duration): CompletionStage[AnyRef] = + extended.ask(actor, messageFactory.apply _)(Timeout.create(timeout)).toJava.asInstanceOf[CompletionStage[AnyRef]] + /** * Java API for `akka.pattern.ask`: * Sends a message asynchronously and returns a [[scala.concurrent.Future]] @@ -165,6 +213,36 @@ object Patterns { def ask(selection: ActorSelection, message: Any, timeout: Timeout): Future[AnyRef] = scalaAsk(selection, message)(timeout).asInstanceOf[Future[AnyRef]] + /** + * Java API for `akka.pattern.ask`: + * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]] + * holding the eventual reply message; this means that the target [[akka.actor.ActorSelection]] + * needs to send the result to the `sender` reference provided. + * + * The CompletionStage will be completed with an [[akka.pattern.AskTimeoutException]] after the + * given timeout has expired; this is independent from any timeout applied + * while awaiting a result for this future (i.e. in + * `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the + * recipient actor didn't send a reply. + * + * Warning: + * When using future callbacks, inside actors you need to carefully avoid closing over + * the containing actor’s object, i.e. do not call methods or access mutable state + * on the enclosing actor from within the callback. This would break the actor + * encapsulation and may introduce synchronization bugs and race conditions because + * the callback will be scheduled concurrently to the enclosing actor. Unfortunately + * there is not yet a way to detect these illegal accesses at compile time. + * + * Recommended usage: + * + * {{{ + * final CompletionStage f = PatternsCS.ask(selection, request, duration); + * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); + * }}} + */ + def ask(selection: ActorSelection, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] = + scalaAsk(selection, message)(timeout.asScala).toJava.asInstanceOf[CompletionStage[AnyRef]] + /** * Java API for `akka.pattern.ask`: * Sends a message asynchronously and returns a [[scala.concurrent.Future]] @@ -213,6 +291,20 @@ object Patterns { def askWithReplyTo(selection: ActorSelection, messageFactory: japi.Function[ActorRef, Any], timeoutMillis: Long): Future[AnyRef] = extended.ask(selection, messageFactory.apply _)(Timeout(timeoutMillis.millis)).asInstanceOf[Future[AnyRef]] + /** + * A variation of ask which allows to implement "replyTo" pattern by including + * sender reference in message. + * + * {{{ + * final CompletionStage f = Patterns.askWithReplyTo( + * selection, + * replyTo -> new Request(replyTo), + * timeout); + * }}} + */ + def askWithReplyTo(selection: ActorSelection, messageFactory: japi.Function[ActorRef, Any], timeout: java.time.Duration): CompletionStage[AnyRef] = + extended.ask(selection, messageFactory.apply _)(timeout.asScala).toJava.asInstanceOf[CompletionStage[AnyRef]] + /** * Register an onComplete callback on this [[scala.concurrent.Future]] to send * the result to the given [[akka.actor.ActorRef]] or [[akka.actor.ActorSelection]]. @@ -232,6 +324,25 @@ object Patterns { */ def pipe[T](future: Future[T], context: ExecutionContext): PipeableFuture[T] = scalaPipe(future)(context) + /** + * When this [[java.util.concurrent.CompletionStage]] finishes, send its result to the given + * [[akka.actor.ActorRef]] or [[akka.actor.ActorSelection]]. + * Returns the original CompletionStage to allow method chaining. + * If the future was completed with failure it is sent as a [[akka.actor.Status.Failure]] + * to the recipient. + * + * Recommended usage example: + * + * {{{ + * final CompletionStage f = PatternsCS.ask(worker, request, timeout); + * // apply some transformation (i.e. enrich with request info) + * final CompletionStage transformed = f.thenApply(result -> { ... }); + * // send it on to the next operator + * PatternsCS.pipe(transformed, context).to(nextActor); + * }}} + */ + def pipe[T](future: CompletionStage[T], context: ExecutionContext): PipeableCompletionStage[T] = pipeCompletionStage(future)(context) + /** * Returns a [[scala.concurrent.Future]] that will be completed with success (value `true`) when * existing messages of the target actor has been processed and the actor has been @@ -245,6 +356,19 @@ object Patterns { def gracefulStop(target: ActorRef, timeout: FiniteDuration): Future[java.lang.Boolean] = scalaGracefulStop(target, timeout).asInstanceOf[Future[java.lang.Boolean]] + /** + * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with success (value `true`) when + * existing messages of the target actor has been processed and the actor has been + * terminated. + * + * Useful when you need to wait for termination or compose ordered termination of several actors. + * + * If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]] + * is completed with failure [[akka.pattern.AskTimeoutException]]. + */ + def gracefulStop(target: ActorRef, timeout: java.time.Duration): CompletionStage[java.lang.Boolean] = + scalaGracefulStop(target, timeout.asScala).toJava.asInstanceOf[CompletionStage[java.lang.Boolean]] + /** * Returns a [[scala.concurrent.Future]] that will be completed with success (value `true`) when * existing messages of the target actor has been processed and the actor has been @@ -261,6 +385,22 @@ object Patterns { def gracefulStop(target: ActorRef, timeout: FiniteDuration, stopMessage: Any): Future[java.lang.Boolean] = scalaGracefulStop(target, timeout, stopMessage).asInstanceOf[Future[java.lang.Boolean]] + /** + * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with success (value `true`) when + * existing messages of the target actor has been processed and the actor has been + * terminated. + * + * Useful when you need to wait for termination or compose ordered termination of several actors. + * + * If you want to invoke specialized stopping logic on your target actor instead of PoisonPill, you can pass your + * stop command as `stopMessage` parameter + * + * If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]] + * is completed with failure [[akka.pattern.AskTimeoutException]]. + */ + def gracefulStop(target: ActorRef, timeout: java.time.Duration, stopMessage: Any): CompletionStage[java.lang.Boolean] = + scalaGracefulStop(target, timeout.asScala, stopMessage).toJava.asInstanceOf[CompletionStage[java.lang.Boolean]] + /** * Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided Callable * after the specified duration. @@ -268,6 +408,13 @@ object Patterns { def after[T](duration: FiniteDuration, scheduler: Scheduler, context: ExecutionContext, value: Callable[Future[T]]): Future[T] = scalaAfter(duration, scheduler)(value.call())(context) + /** + * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided Callable + * after the specified duration. + */ + def after[T](duration: java.time.Duration, scheduler: Scheduler, context: ExecutionContext, value: Callable[CompletionStage[T]]): CompletionStage[T] = + afterCompletionStage(duration.asScala, scheduler)(value.call())(context) + /** * Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided Callable * after the specified duration. @@ -275,6 +422,13 @@ object Patterns { def after[T](duration: FiniteDuration, scheduler: Scheduler, context: ExecutionContext, value: Future[T]): Future[T] = scalaAfter(duration, scheduler)(value)(context) + /** + * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided value + * after the specified duration. + */ + def after[T](duration: java.time.Duration, scheduler: Scheduler, context: ExecutionContext, value: CompletionStage[T]): CompletionStage[T] = + afterCompletionStage(duration.asScala, scheduler)(value)(context) + /** * Returns an internally retrying [[scala.concurrent.Future]] * The first attempt will be made immediately, and each subsequent attempt will be made after 'delay'. @@ -286,6 +440,17 @@ object Patterns { def retry[T](attempt: Callable[Future[T]], attempts: Int, delay: FiniteDuration, scheduler: Scheduler, context: ExecutionContext): Future[T] = scalaRetry(() ⇒ attempt.call, attempts, delay)(context, scheduler) + + /** + * Returns an internally retrying [[java.util.concurrent.CompletionStage]] + * The first attempt will be made immediately, and each subsequent attempt will be made after 'delay'. + * A scheduler (eg context.system.scheduler) must be provided to delay each retry + * If attempts are exhausted the returned completion operator is simply the result of invoking attempt. + * Note that the attempt function will be invoked on the given execution context for subsequent tries + * and therefore must be thread safe (not touch unsafe mutable state). + */ + def retry[T](attempt: Callable[CompletionStage[T]], attempts: Int, delay: java.time.Duration, scheduler: Scheduler, ec: ExecutionContext): CompletionStage[T] = + scalaRetry(() ⇒ attempt.call().toScala, attempts, delay.asScala)(ec, scheduler).toJava } /** @@ -293,6 +458,7 @@ object Patterns { * * For working with Scala [[scala.concurrent.Future]] from Java you may want to use [[akka.pattern.Patterns]] instead. */ +@deprecated("Use Patterns instead.", since = "2.5.19") object PatternsCS { import akka.actor.ActorRef import akka.japi @@ -359,6 +525,7 @@ object PatternsCS { * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); * }}} */ + @deprecated("Use Patterns.ask instead.", since = "2.5.19") def ask(actor: ActorRef, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] = ask(actor, message, Timeout.create(timeout)) @@ -396,6 +563,7 @@ object PatternsCS { * @param messageFactory function taking an actor ref and returning the message to be sent * @param timeout the timeout for the response before failing the returned completion stage */ + @deprecated("Use Pattens.askWithReplyTo instead.", since = "2.5.19") def askWithReplyTo(actor: ActorRef, messageFactory: japi.function.Function[ActorRef, Any], timeout: java.time.Duration): CompletionStage[AnyRef] = extended.ask(actor, messageFactory.apply _)(Timeout.create(timeout)).toJava.asInstanceOf[CompletionStage[AnyRef]] @@ -426,6 +594,7 @@ object PatternsCS { * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); * }}} */ + @deprecated("Use Pattens.ask which accepts java.time.Duration instead.", since = "2.5.19") def ask(actor: ActorRef, message: Any, timeoutMillis: Long): CompletionStage[AnyRef] = scalaAsk(actor, message)(new Timeout(timeoutMillis, TimeUnit.MILLISECONDS)).toJava.asInstanceOf[CompletionStage[AnyRef]] @@ -444,6 +613,7 @@ object PatternsCS { * @param messageFactory function taking an actor ref to reply to and returning the message to be sent * @param timeoutMillis the timeout for the response before failing the returned completion operator */ + @deprecated("Use Pattens.askWithReplyTo which accepts java.time.Duration instead.", since = "2.5.19") def askWithReplyTo(actor: ActorRef, messageFactory: japi.function.Function[ActorRef, Any], timeoutMillis: Long): CompletionStage[AnyRef] = askWithReplyTo(actor, messageFactory, Timeout(timeoutMillis.millis)) @@ -505,6 +675,7 @@ object PatternsCS { * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); * }}} */ + @deprecated("Use Patterns.ask instead.", since = "2.5.19") def ask(selection: ActorSelection, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] = ask(selection, message, Timeout.create(timeout)) @@ -535,6 +706,7 @@ object PatternsCS { * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); * }}} */ + @deprecated("Use Pattens.ask which accepts java.time.Duration instead.", since = "2.5.19") def ask(selection: ActorSelection, message: Any, timeoutMillis: Long): CompletionStage[AnyRef] = scalaAsk(selection, message)(new Timeout(timeoutMillis, TimeUnit.MILLISECONDS)).toJava.asInstanceOf[CompletionStage[AnyRef]] @@ -549,6 +721,7 @@ object PatternsCS { * timeout); * }}} */ + @deprecated("Use Pattens.askWithReplyTo which accepts java.time.Duration instead.", since = "2.5.19") def askWithReplyTo(selection: ActorSelection, messageFactory: japi.Function[ActorRef, Any], timeoutMillis: Long): CompletionStage[AnyRef] = extended.ask(selection, messageFactory.apply _)(Timeout(timeoutMillis.millis)).toJava.asInstanceOf[CompletionStage[AnyRef]] @@ -569,6 +742,7 @@ object PatternsCS { * PatternsCS.pipe(transformed, context).to(nextActor); * }}} */ + @deprecated("Use Patterns.pipe instead.", since = "2.5.19") def pipe[T](future: CompletionStage[T], context: ExecutionContext): PipeableCompletionStage[T] = pipeCompletionStage(future)(context) /** @@ -595,6 +769,7 @@ object PatternsCS { * If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]] * is completed with failure [[akka.pattern.AskTimeoutException]]. */ + @deprecated("Use Patterns.gracefulStop instead.", since = "2.5.19") def gracefulStop(target: ActorRef, timeout: java.time.Duration): CompletionStage[java.lang.Boolean] = scalaGracefulStop(target, timeout.asScala).toJava.asInstanceOf[CompletionStage[java.lang.Boolean]] @@ -628,6 +803,7 @@ object PatternsCS { * If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]] * is completed with failure [[akka.pattern.AskTimeoutException]]. */ + @deprecated("Use Patterns.gracefulStop instead.", since = "2.5.19") def gracefulStop(target: ActorRef, timeout: java.time.Duration, stopMessage: Any): CompletionStage[java.lang.Boolean] = scalaGracefulStop(target, timeout.asScala, stopMessage).toJava.asInstanceOf[CompletionStage[java.lang.Boolean]] @@ -643,6 +819,7 @@ object PatternsCS { * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided Callable * after the specified duration. */ + @deprecated("Use Patterns.after instead.", since = "2.5.19") def after[T](duration: java.time.Duration, scheduler: Scheduler, context: ExecutionContext, value: Callable[CompletionStage[T]]): CompletionStage[T] = afterCompletionStage(duration.asScala, scheduler)(value.call())(context) @@ -658,6 +835,7 @@ object PatternsCS { * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided value * after the specified duration. */ + @deprecated("Use Patterns.after instead.", since = "2.5.19") def after[T](duration: java.time.Duration, scheduler: Scheduler, context: ExecutionContext, value: CompletionStage[T]): CompletionStage[T] = afterCompletionStage(duration.asScala, scheduler)(value)(context) @@ -669,6 +847,7 @@ object PatternsCS { * Note that the attempt function will be invoked on the given execution context for subsequent tries * and therefore must be thread safe (not touch unsafe mutable state). */ + @deprecated("Use Patterns.retry instead.", since = "2.5.19") def retry[T](attempt: Callable[CompletionStage[T]], attempts: Int, delay: java.time.Duration, scheduler: Scheduler, ec: ExecutionContext): CompletionStage[T] = scalaRetry(() ⇒ attempt.call().toScala, attempts, delay.asScala)(ec, scheduler).toJava } diff --git a/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java b/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java index d3c4e06153..3106cd49aa 100644 --- a/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java +++ b/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java @@ -39,13 +39,13 @@ import akka.actor.ActorSelection; import akka.actor.Identify; //#import-identify //#import-ask -import static akka.pattern.PatternsCS.ask; -import static akka.pattern.PatternsCS.pipe; +import static akka.pattern.Patterns.ask; +import static akka.pattern.Patterns.pipe; import java.util.concurrent.CompletableFuture; //#import-ask //#import-gracefulStop -import static akka.pattern.PatternsCS.gracefulStop; +import static akka.pattern.Patterns.gracefulStop; import akka.pattern.AskTimeoutException; import java.util.concurrent.CompletionStage; @@ -80,7 +80,7 @@ public class ActorDocTest extends AbstractJavaTest { //#context-actorOf public class FirstActor extends AbstractActor { final ActorRef child = getContext().actorOf(Props.create(MyActor.class), "myChild"); - + //#plus-some-behavior @Override public Receive createReceive() { @@ -102,15 +102,15 @@ public class ActorDocTest extends AbstractJavaTest { } //#createReceive } - - static + + static //#well-structured public class WellStructuredActor extends AbstractActor { - + public static class Msg1 {} public static class Msg2 {} public static class Msg3 {} - + @Override public Receive createReceive() { return receiveBuilder() @@ -119,29 +119,29 @@ public class ActorDocTest extends AbstractJavaTest { .match(Msg3.class, this::receiveMsg3) .build(); } - + private void receiveMsg1(Msg1 msg) { // actual work } - + private void receiveMsg2(Msg2 msg) { // actual work } - + private void receiveMsg3(Msg3 msg) { // actual work } } //#well-structured - - static + + static //#optimized public class OptimizedActor extends UntypedAbstractActor { - + public static class Msg1 {} public static class Msg2 {} public static class Msg3 {} - + @Override public void onReceive(Object msg) throws Exception { if (msg instanceof Msg1) @@ -153,15 +153,15 @@ public class ActorDocTest extends AbstractJavaTest { else unhandled(msg); } - + private void receiveMsg1(Msg1 msg) { // actual work } - + private void receiveMsg2(Msg2 msg) { // actual work } - + private void receiveMsg3(Msg3 msg) { // actual work } @@ -174,7 +174,7 @@ public class ActorDocTest extends AbstractJavaTest { public ActorWithArgs(String args) { this.args = args; } - + @Override public Receive createReceive() { return receiveBuilder().matchAny(x -> { }).build(); @@ -197,11 +197,11 @@ public class ActorDocTest extends AbstractJavaTest { } private final Integer magicNumber; - + public DemoActor(Integer magicNumber) { this.magicNumber = magicNumber; } - + @Override public Receive createReceive() { return receiveBuilder() @@ -243,7 +243,7 @@ public class ActorDocTest extends AbstractJavaTest { return from; } } - + @Override public Receive createReceive() { return receiveBuilder() @@ -255,23 +255,23 @@ public class ActorDocTest extends AbstractJavaTest { } //#messages-in-companion - + public static class LifecycleMethods extends AbstractActor { @Override public Receive createReceive() { return AbstractActor.emptyBehavior(); } - + /* * This section must be kept in sync with the actual Actor trait. - * + * * BOYSCOUT RULE: whenever you read this, verify that! */ //#lifecycle-callbacks public void preStart() { } - + public void preRestart(Throwable reason, Optional message) { for (ActorRef each : getContext().getChildren()) { getContext().unwatch(each); @@ -279,25 +279,25 @@ public class ActorDocTest extends AbstractJavaTest { } postStop(); } - + public void postRestart(Throwable reason) { preStart(); } - + public void postStop() { } //#lifecycle-callbacks - + } - + public static class Hook extends AbstractActor { ActorRef target = null; - + @Override public Receive createReceive() { return AbstractActor.emptyBehavior(); } - + //#preStart @Override public void preStart() { @@ -345,7 +345,7 @@ public class ActorDocTest extends AbstractJavaTest { } public static class ReplyException extends AbstractActor { - + @Override public Receive createReceive() { return receiveBuilder() @@ -397,7 +397,7 @@ public class ActorDocTest extends AbstractJavaTest { .matchEquals("job", s -> getSender().tell("service unavailable, shutting down", getSelf()) ) - .match(Terminated.class, t -> t.actor().equals(worker), t -> + .match(Terminated.class, t -> t.actor().equals(worker), t -> getContext().stop(getSelf()) ) .build(); @@ -536,7 +536,7 @@ public class ActorDocTest extends AbstractJavaTest { // To set an initial delay getContext().setReceiveTimeout(Duration.ofSeconds(10)); } - + @Override public Receive createReceive() { return receiveBuilder() @@ -598,11 +598,11 @@ public class ActorDocTest extends AbstractJavaTest { }) .build(); } - + @Override public Receive createReceive() { return receiveBuilder() - .matchEquals("foo", s -> + .matchEquals("foo", s -> getContext().become(angry) ) .matchEquals("bar", s -> @@ -669,7 +669,7 @@ public class ActorDocTest extends AbstractJavaTest { public WatchActor() { getContext().watch(child); // <-- this is the only call needed for registration } - + @Override public Receive createReceive() { return receiveBuilder() @@ -706,7 +706,7 @@ public class ActorDocTest extends AbstractJavaTest { ActorSelection selection = getContext().actorSelection("/user/another"); selection.tell(new Identify(identifyId), getSelf()); } - + @Override public Receive createReceive() { return receiveBuilder() @@ -720,7 +720,7 @@ public class ActorDocTest extends AbstractJavaTest { }) .build(); } - + final AbstractActor.Receive active(final ActorRef another) { return receiveBuilder() .match(Terminated.class, t -> t.actor().equals(another), t -> @@ -744,7 +744,7 @@ public class ActorDocTest extends AbstractJavaTest { } }; } - + @Test public void usePatternsAskPipe() { new TestKit(system) { @@ -754,11 +754,11 @@ public class ActorDocTest extends AbstractJavaTest { ActorRef actorC = getRef(); //#ask-pipe - Timeout t = Timeout.create(Duration.ofSeconds(5)); + final Duration t = Duration.ofSeconds(5); // using 1000ms timeout CompletableFuture future1 = - ask(actorA, "request", 1000).toCompletableFuture(); + ask(actorA, "request", Duration.ofMillis(1000)).toCompletableFuture(); // using timeout from above CompletableFuture future2 = @@ -774,12 +774,12 @@ public class ActorDocTest extends AbstractJavaTest { pipe(transformed, system.dispatcher()).to(actorC); //#ask-pipe - + expectMsgEquals(new Result("request", "another request")); } }; } - + @Test public void useKill() { new TestKit(system) { @@ -788,14 +788,14 @@ public class ActorDocTest extends AbstractJavaTest { watch(victim); //#kill victim.tell(akka.actor.Kill.getInstance(), ActorRef.noSender()); - + // expecting the actor to indeed terminate: expectTerminated(Duration.ofSeconds(3), victim); //#kill } }; } - + @Test public void usePoisonPill() { new TestKit(system) { @@ -809,7 +809,7 @@ public class ActorDocTest extends AbstractJavaTest { } }; } - + @Test public void coordinatedShutdown() { final ActorRef someActor = system.actorOf(Props.create(FirstActor.class)); @@ -817,7 +817,7 @@ public class ActorDocTest extends AbstractJavaTest { CoordinatedShutdown.get(system).addTask( CoordinatedShutdown.PhaseBeforeServiceUnbind(), "someTaskName", () -> { - return akka.pattern.PatternsCS.ask(someActor, "stop", new Timeout(5, TimeUnit.SECONDS)) + return akka.pattern.Patterns.ask(someActor, "stop", Duration.ofSeconds(5)) .thenApply(reply -> Done.getInstance()); }); //#coordinated-shutdown-addTask diff --git a/akka-docs/src/test/java/jdocs/actor/io/dns/DnsCompileOnlyDocTest.java b/akka-docs/src/test/java/jdocs/actor/io/dns/DnsCompileOnlyDocTest.java index 7735257c04..af892f0467 100644 --- a/akka-docs/src/test/java/jdocs/actor/io/dns/DnsCompileOnlyDocTest.java +++ b/akka-docs/src/test/java/jdocs/actor/io/dns/DnsCompileOnlyDocTest.java @@ -7,14 +7,13 @@ package jdocs.actor.io.dns; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.io.Dns; -import akka.io.IO; import akka.io.dns.DnsProtocol; -import static akka.pattern.PatternsCS.ask; -import static akka.pattern.PatternsCS.pipe; +import static akka.pattern.Patterns.ask; import scala.Option; +import java.time.Duration; import java.util.concurrent.CompletionStage; @@ -23,7 +22,7 @@ public class DnsCompileOnlyDocTest { ActorSystem system = ActorSystem.create(); ActorRef actorRef = null; - long timeout = 1000; + final Duration timeout = Duration.ofMillis(1000L); //#resolve Option initial = Dns.get(system).cache().resolve("google.com", system, actorRef); diff --git a/akka-docs/src/test/java/jdocs/camel/ProducerTestBase.java b/akka-docs/src/test/java/jdocs/camel/ProducerTestBase.java index 9a11771e62..5a197a9b04 100644 --- a/akka-docs/src/test/java/jdocs/camel/ProducerTestBase.java +++ b/akka-docs/src/test/java/jdocs/camel/ProducerTestBase.java @@ -4,16 +4,17 @@ package jdocs.camel; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletionStage; +import akka.pattern.Patterns; import akka.testkit.javadsl.TestKit; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.camel.CamelMessage; -import akka.pattern.PatternsCS; public class ProducerTestBase { public void tellJmsProducer() { @@ -33,7 +34,8 @@ public class ProducerTestBase { ActorSystem system = ActorSystem.create("some-system"); Props props = Props.create(FirstProducer.class); ActorRef producer = system.actorOf(props,"myproducer"); - CompletionStage future = PatternsCS.ask(producer, "some request", 1000); + CompletionStage future = Patterns.ask(producer, "some request", + Duration.ofMillis(1000L)); //#AskProducer system.stop(producer); TestKit.shutdownActorSystem(system); diff --git a/akka-docs/src/test/java/jdocs/circuitbreaker/DangerousJavaActor.java b/akka-docs/src/test/java/jdocs/circuitbreaker/DangerousJavaActor.java index e4355f871e..d3650e3a2b 100644 --- a/akka-docs/src/test/java/jdocs/circuitbreaker/DangerousJavaActor.java +++ b/akka-docs/src/test/java/jdocs/circuitbreaker/DangerousJavaActor.java @@ -12,7 +12,7 @@ import java.time.Duration; import akka.pattern.CircuitBreaker; import akka.event.Logging; -import static akka.pattern.PatternsCS.pipe; +import static akka.pattern.Patterns.pipe; import java.util.concurrent.CompletableFuture; diff --git a/akka-docs/src/test/java/jdocs/cluster/FactorialBackend.java b/akka-docs/src/test/java/jdocs/cluster/FactorialBackend.java index eeadab1b87..4f39bb49e4 100644 --- a/akka-docs/src/test/java/jdocs/cluster/FactorialBackend.java +++ b/akka-docs/src/test/java/jdocs/cluster/FactorialBackend.java @@ -8,7 +8,7 @@ import java.math.BigInteger; import java.util.concurrent.CompletableFuture; import akka.actor.AbstractActor; -import static akka.pattern.PatternsCS.pipe; +import static akka.pattern.Patterns.pipe; //#backend public class FactorialBackend extends AbstractActor { diff --git a/akka-docs/src/test/java/jdocs/future/FutureDocTest.java b/akka-docs/src/test/java/jdocs/future/FutureDocTest.java index 2861100d10..f58b88af28 100644 --- a/akka-docs/src/test/java/jdocs/future/FutureDocTest.java +++ b/akka-docs/src/test/java/jdocs/future/FutureDocTest.java @@ -7,7 +7,6 @@ package jdocs.future; //#imports1 import akka.dispatch.*; import jdocs.AbstractJavaTest; -import scala.Function0; import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.Await; @@ -64,15 +63,15 @@ import java.util.Arrays; //#imports8 -import static akka.pattern.PatternsCS.retry; +import static akka.pattern.Patterns.retry; //#imports8 //#imports-ask -import static akka.pattern.PatternsCS.ask; +import static akka.pattern.Patterns.ask; //#imports-ask //#imports-pipe -import static akka.pattern.PatternsCS.pipe; +import static akka.pattern.Patterns.pipe; //#imports-pipe @@ -124,11 +123,11 @@ public class FutureDocTest extends AbstractJavaTest { //#pipe-to-usage public class ActorUsingPipeTo extends AbstractActor { ActorRef target; - Timeout timeout; + Duration timeout; ActorUsingPipeTo(ActorRef target) { this.target = target; - this.timeout = Timeout.create(Duration.ofSeconds(5)); + this.timeout = Duration.ofSeconds(5); } @Override @@ -216,7 +215,7 @@ public class FutureDocTest extends AbstractJavaTest { public class UserProxyActor extends AbstractActor { ActorRef userActor; ActorRef userActivityActor; - Timeout timeout = Timeout.create(Duration.ofSeconds(5)); + Duration timeout = Duration.ofSeconds(5); UserProxyActor(ActorRef userActor, ActorRef userActivityActor) { this.userActor = userActor; diff --git a/akka-docs/src/test/java/jdocs/pattern/SupervisedAsk.java b/akka-docs/src/test/java/jdocs/pattern/SupervisedAsk.java index b970f4333f..26b9fe6bbe 100644 --- a/akka-docs/src/test/java/jdocs/pattern/SupervisedAsk.java +++ b/akka-docs/src/test/java/jdocs/pattern/SupervisedAsk.java @@ -19,17 +19,16 @@ import akka.actor.Status; import akka.actor.SupervisorStrategy; import akka.actor.Terminated; import akka.actor.AbstractActor; -import akka.pattern.PatternsCS; -import akka.util.Timeout; +import akka.pattern.Patterns; public class SupervisedAsk { private static class AskParam { Props props; Object message; - Timeout timeout; + Duration timeout; - AskParam(Props props, Object message, Timeout timeout) { + AskParam(Props props, Object message, Duration timeout) { this.props = props; this.message = message; this.timeout = timeout; @@ -77,7 +76,7 @@ public class SupervisedAsk { getContext().watch(targetActor); targetActor.forward(askParam.message, getContext()); Scheduler scheduler = getContext().getSystem().scheduler(); - timeoutMessage = scheduler.scheduleOnce(askParam.timeout.duration(), + timeoutMessage = scheduler.scheduleOnce(askParam.timeout, getSelf(), new AskTimeout(), getContext().dispatcher(), null); }) .match(Terminated.class, message -> { @@ -97,13 +96,13 @@ public class SupervisedAsk { } public static CompletionStage askOf(ActorRef supervisorCreator, Props props, - Object message, Timeout timeout) { + Object message, Duration timeout) { AskParam param = new AskParam(props, message, timeout); - return PatternsCS.ask(supervisorCreator, param, timeout); + return Patterns.ask(supervisorCreator, param, timeout); } synchronized public static ActorRef createSupervisorCreator( ActorRefFactory factory) { return factory.actorOf(Props.create(AskSupervisorCreator.class)); } -} \ No newline at end of file +} diff --git a/akka-docs/src/test/java/jdocs/pattern/SupervisedAskSpec.java b/akka-docs/src/test/java/jdocs/pattern/SupervisedAskSpec.java index 2a02ca106a..8f2d47bd35 100644 --- a/akka-docs/src/test/java/jdocs/pattern/SupervisedAskSpec.java +++ b/akka-docs/src/test/java/jdocs/pattern/SupervisedAskSpec.java @@ -11,12 +11,14 @@ import akka.actor.AbstractActor; import akka.util.Timeout; import scala.concurrent.duration.FiniteDuration; +import java.time.Duration; import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; public class SupervisedAskSpec { public Object execute(Class someActor, - Object message, Timeout timeout, ActorRefFactory actorSystem) + Object message, Duration timeout, ActorRefFactory actorSystem) throws Exception { // example usage try { @@ -24,8 +26,7 @@ public class SupervisedAskSpec { .createSupervisorCreator(actorSystem); CompletionStage finished = SupervisedAsk.askOf(supervisorCreator, Props.create(someActor), message, timeout); - FiniteDuration d = timeout.duration(); - return finished.toCompletableFuture().get(d.length(), d.unit()); + return finished.toCompletableFuture().get(timeout.toMillis(), TimeUnit.MILLISECONDS); } catch (Exception e) { // exception propagated by supervision throw e; diff --git a/akka-docs/src/test/java/jdocs/persistence/PersistenceQueryDocTest.java b/akka-docs/src/test/java/jdocs/persistence/PersistenceQueryDocTest.java index 40eee55d5c..52284b4d3c 100644 --- a/akka-docs/src/test/java/jdocs/persistence/PersistenceQueryDocTest.java +++ b/akka-docs/src/test/java/jdocs/persistence/PersistenceQueryDocTest.java @@ -4,7 +4,9 @@ package jdocs.persistence; -import static akka.pattern.PatternsCS.ask; +import static akka.pattern.Patterns.ask; + +import java.time.Duration; import java.util.HashSet; import java.util.Set; @@ -18,9 +20,7 @@ import akka.persistence.query.*; import akka.stream.ActorMaterializer; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; -import akka.util.Timeout; -import com.typesafe.config.ConfigFactory; import docs.persistence.query.MyEventsByTagPublisher; import org.reactivestreams.Subscriber; import scala.concurrent.duration.FiniteDuration; @@ -34,13 +34,13 @@ public class PersistenceQueryDocTest { final ActorSystem system = ActorSystem.create(); final ActorMaterializer mat = ActorMaterializer.create(system); - + static //#advanced-journal-query-types public class RichEvent { public final Settags; public final Object payload; - + public RichEvent(Set tags, Object payload) { this.tags = tags; this.payload = payload; @@ -70,7 +70,7 @@ public class PersistenceQueryDocTest { public MyReadJournalProvider(ExtendedActorSystem system, Config config) { this.javadslReadJournal = new MyJavadslReadJournal(system, config); } - + @Override public MyScaladslReadJournal scaladslReadJournal() { return new MyScaladslReadJournal(javadslReadJournal); @@ -79,13 +79,13 @@ public class PersistenceQueryDocTest { @Override public MyJavadslReadJournal javadslReadJournal() { return this.javadslReadJournal; - } + } } //#my-read-journal - + static //#my-read-journal - public class MyJavadslReadJournal implements + public class MyJavadslReadJournal implements akka.persistence.query.javadsl.ReadJournal, akka.persistence.query.javadsl.EventsByTagQuery, akka.persistence.query.javadsl.EventsByPersistenceIdQuery, @@ -95,8 +95,8 @@ public class PersistenceQueryDocTest { private final FiniteDuration refreshInterval; public MyJavadslReadJournal(ExtendedActorSystem system, Config config) { - refreshInterval = - FiniteDuration.create(config.getDuration("refresh-interval", + refreshInterval = + FiniteDuration.create(config.getDuration("refresh-interval", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); } @@ -136,13 +136,13 @@ public class PersistenceQueryDocTest { // implement in a similar way as eventsByTag throw new UnsupportedOperationException("Not implemented yet"); } - + @Override public Source currentPersistenceIds() { // implement in a similar way as eventsByTag throw new UnsupportedOperationException("Not implemented yet"); } - + // possibility to add more plugin specific queries //#advanced-journal-query-definition @@ -154,16 +154,16 @@ public class PersistenceQueryDocTest { } //#my-read-journal - + static //#my-read-journal - public class MyScaladslReadJournal implements + public class MyScaladslReadJournal implements akka.persistence.query.scaladsl.ReadJournal, akka.persistence.query.scaladsl.EventsByTagQuery, akka.persistence.query.scaladsl.EventsByPersistenceIdQuery, akka.persistence.query.scaladsl.PersistenceIdsQuery, akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery { - + private final MyJavadslReadJournal javadslReadJournal; public MyScaladslReadJournal(MyJavadslReadJournal javadslReadJournal) { @@ -179,7 +179,7 @@ public class PersistenceQueryDocTest { @Override public akka.stream.scaladsl.Source eventsByPersistenceId( String persistenceId, long fromSequenceNr, long toSequenceNr) { - return javadslReadJournal.eventsByPersistenceId(persistenceId, fromSequenceNr, + return javadslReadJournal.eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr).asScala(); } @@ -187,12 +187,12 @@ public class PersistenceQueryDocTest { public akka.stream.scaladsl.Source persistenceIds() { return javadslReadJournal.persistenceIds().asScala(); } - + @Override public akka.stream.scaladsl.Source currentPersistenceIds() { return javadslReadJournal.currentPersistenceIds().asScala(); } - + // possibility to add more plugin specific queries public akka.stream.scaladsl.Source byTagsWithMeta( @@ -306,13 +306,13 @@ public class PersistenceQueryDocTest { "infinite: " + meta.infinite); return meta; }); - + events.map(event -> { - System.out.println("Event payload: " + event.payload); + System.out.println("Event payload: " + event.payload); return event.payload; }).runWith(Sink.ignore(), mat); - - + + //#advanced-journal-query-usage } @@ -408,7 +408,7 @@ public class PersistenceQueryDocTest { //#projection-into-different-store-actor-run - final Timeout timeout = Timeout.apply(3, TimeUnit.SECONDS); + final Duration timeout = Duration.ofSeconds(3); final MyResumableProjection bidProjection = new MyResumableProjection("bid"); @@ -451,17 +451,17 @@ public class PersistenceQueryDocTest { public TheOneWhoWritesToQueryJournal() { store = new ExampleStore(); } - + @Override public Receive createReceive() { return receiveBuilder() .matchAny(message -> { state = updateState(state, message); - + // example saving logic that requires state to become ready: if (state.readyToSave()) store.save(Record.of(state)); - + }) .build(); } diff --git a/akka-docs/src/test/java/jdocs/stream/FlowStreamRefsDocTest.java b/akka-docs/src/test/java/jdocs/stream/FlowStreamRefsDocTest.java index 3974f0bad2..8a610767f6 100644 --- a/akka-docs/src/test/java/jdocs/stream/FlowStreamRefsDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/FlowStreamRefsDocTest.java @@ -9,7 +9,7 @@ import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; -import akka.pattern.PatternsCS; +import akka.pattern.Patterns; import akka.stream.*; import akka.stream.javadsl.*; import akka.testkit.javadsl.TestKit; @@ -59,7 +59,7 @@ public class FlowStreamRefsDocTest extends AbstractJavaTest { Source logs = streamLogs(requestLogs.streamId); CompletionStage> logsRef = logs.runWith(StreamRefs.sourceRef(), mat); - PatternsCS.pipe(logsRef.thenApply(ref -> new LogsOffer(ref)), context().dispatcher()) + Patterns.pipe(logsRef.thenApply(ref -> new LogsOffer(ref)), context().dispatcher()) .to(sender()); } @@ -111,7 +111,7 @@ public class FlowStreamRefsDocTest extends AbstractJavaTest { Sink sink = logsSinkFor(prepare.id); CompletionStage> sinkRef = StreamRefs.sinkRef().to(sink).run(mat); - PatternsCS.pipe(sinkRef.thenApply(ref -> new MeasurementsSinkReady(prepare.id, ref)), context().dispatcher()) + Patterns.pipe(sinkRef.thenApply(ref -> new MeasurementsSinkReady(prepare.id, ref)), context().dispatcher()) .to(sender()); }) .build(); diff --git a/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java b/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java index 87e3ac7841..3b61e6c0c7 100644 --- a/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java @@ -20,7 +20,6 @@ import jdocs.stream.TwitterStreamQuickstartDocTest.Model.Tweet; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import scala.concurrent.duration.FiniteDuration; import java.time.Duration; import java.util.Arrays; @@ -33,7 +32,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import static akka.pattern.PatternsCS.ask; +import static akka.pattern.Patterns.ask; import static jdocs.stream.TwitterStreamQuickstartDocTest.Model.AKKA; import static jdocs.stream.TwitterStreamQuickstartDocTest.Model.tweets; import static junit.framework.TestCase.assertTrue; @@ -602,7 +601,7 @@ public class IntegrationDocTest extends AbstractJavaTest { final RunnableGraph saveTweets = akkaTweets - .mapAsync(4, tweet -> ask(database, new Save(tweet), 300)) + .mapAsync(4, tweet -> ask(database, new Save(tweet), Duration.ofMillis(300L))) .to(Sink.ignore()); //#save-tweets diff --git a/akka-docs/src/test/java/jdocs/stream/StreamTestKitDocTest.java b/akka-docs/src/test/java/jdocs/stream/StreamTestKitDocTest.java index 6bafe8d57b..6d1d0190ea 100644 --- a/akka-docs/src/test/java/jdocs/stream/StreamTestKitDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/StreamTestKitDocTest.java @@ -97,7 +97,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest { final CompletionStage>> future = sourceUnderTest .grouped(2) .runWith(Sink.head(), mat); - akka.pattern.PatternsCS.pipe(future, system.dispatcher()).to(probe.getRef()); + akka.pattern.Patterns.pipe(future, system.dispatcher()).to(probe.getRef()); probe.expectMsg(Duration.ofSeconds(3), Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4))); //#pipeto-testprobe } @@ -200,7 +200,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest { public void testSourceAndTestSink() throws Exception { //#test-source-and-sink final Flow flowUnderTest = Flow.of(Integer.class) - .mapAsyncUnordered(2, sleep -> akka.pattern.PatternsCS.after( + .mapAsyncUnordered(2, sleep -> akka.pattern.Patterns.after( Duration.ofMillis(10), system.scheduler(), system.dispatcher(), diff --git a/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeGlobalRateLimit.java b/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeGlobalRateLimit.java index 93369f1af0..9cb838f2d1 100644 --- a/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeGlobalRateLimit.java +++ b/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeGlobalRateLimit.java @@ -6,13 +6,12 @@ package jdocs.stream.javadsl.cookbook; import akka.NotUsed; import akka.actor.*; -import akka.pattern.PatternsCS; +import akka.pattern.Patterns; import akka.stream.*; import akka.stream.javadsl.*; import akka.stream.testkit.TestSubscriber; import akka.stream.testkit.javadsl.TestSink; import akka.testkit.javadsl.TestKit; -import akka.util.Timeout; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -83,7 +82,7 @@ public class RecipeGlobalRateLimit extends RecipeTest { getContext().getSystem().dispatcher(), getSelf()); } - + @Override public Receive createReceive() { return open(); @@ -150,7 +149,7 @@ public class RecipeGlobalRateLimit extends RecipeTest { return f.mapAsync(parallelism, element -> { final CompletionStage limiterTriggerFuture = - PatternsCS.ask(limiter, Limiter.WANT_TO_PASS, maxAllowedWait); + Patterns.ask(limiter, Limiter.WANT_TO_PASS, maxAllowedWait); return limiterTriggerFuture.thenApplyAsync(response -> element, system.dispatcher()); }); } diff --git a/akka-docs/src/test/java/jdocs/testkit/TestKitDocTest.java b/akka-docs/src/test/java/jdocs/testkit/TestKitDocTest.java index 456eebaca2..49fc748d70 100644 --- a/akka-docs/src/test/java/jdocs/testkit/TestKitDocTest.java +++ b/akka-docs/src/test/java/jdocs/testkit/TestKitDocTest.java @@ -6,7 +6,7 @@ package jdocs.testkit; import static org.junit.Assert.*; -import akka.pattern.PatternsCS; +import akka.pattern.Patterns; import jdocs.AbstractJavaTest; import org.junit.Assert; import akka.japi.JavaPartialFunction; @@ -99,7 +99,7 @@ public class TestKitDocTest extends AbstractJavaTest { //#test-behavior final Props props = Props.create(MyActor.class); final TestActorRef ref = TestActorRef.create(system, props, "testB"); - final CompletableFuture future = PatternsCS.ask(ref, "say42", 3000).toCompletableFuture(); + final CompletableFuture future = Patterns.ask(ref, "say42", Duration.ofMillis(3000)).toCompletableFuture(); assertTrue(future.isDone()); assertEquals(42, future.get()); //#test-behavior