Merge PatternsCS to Patterns (#26008)

* !act Move some Java API methods from PatternsCS to Patterns.

* Deprecate PatternCS and in favor of Patterns.
This commit is contained in:
kerr 2018-12-06 22:40:43 +08:00 committed by Patrik Nordwall
parent 4f100a1f1e
commit b7f3cbef94
16 changed files with 295 additions and 118 deletions

View file

@ -232,7 +232,7 @@ public class PatternsTest extends JUnitSuite {
TestProbe probe = new TestProbe(system); TestProbe probe = new TestProbe(system);
CompletableFuture<String> f = new CompletableFuture<>(); CompletableFuture<String> f = new CompletableFuture<>();
f.complete("ho!"); f.complete("ho!");
PatternsCS.pipe(f, ec).to(probe.ref()); Patterns.pipe(f, ec).to(probe.ref());
probe.expectMsg("ho!"); probe.expectMsg("ho!");
} }
@ -242,7 +242,7 @@ public class PatternsTest extends JUnitSuite {
ActorSelection selection = system.actorSelection(probe.ref().path()); ActorSelection selection = system.actorSelection(probe.ref().path());
CompletableFuture<String> f = new CompletableFuture<>(); CompletableFuture<String> f = new CompletableFuture<>();
f.complete("hi!"); f.complete("hi!");
PatternsCS.pipe(f, ec).to(selection); Patterns.pipe(f, ec).to(selection);
probe.expectMsg("hi!"); probe.expectMsg("hi!");
} }

View file

@ -69,6 +69,36 @@ object Patterns {
*/ */
def ask(actor: ActorRef, message: Any, timeout: Timeout): Future[AnyRef] = scalaAsk(actor, message)(timeout).asInstanceOf[Future[AnyRef]] def ask(actor: ActorRef, message: Any, timeout: Timeout): Future[AnyRef] = scalaAsk(actor, message)(timeout).asInstanceOf[Future[AnyRef]]
/**
* <i>Java API for `akka.pattern.ask`:</i>
* Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided.
*
* The CompletionStage will be completed with an [[akka.pattern.AskTimeoutException]] after the
* given timeout has expired; this is independent from any timeout applied
* while awaiting a result for this future (i.e. in
* `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the
* recipient actor didn't send a reply.
*
* <b>Warning:</b>
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors object, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*
* <b>Recommended usage:</b>
*
* {{{
* final CompletionStage<Object> f = PatternsCS.ask(worker, request, duration);
* f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
* }}}
*/
def ask(actor: ActorRef, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] =
scalaAsk(actor, message)(timeout.asScala).toJava.asInstanceOf[CompletionStage[AnyRef]]
/** /**
* A variation of ask which allows to implement "replyTo" pattern by including * A variation of ask which allows to implement "replyTo" pattern by including
* sender reference in message. * sender reference in message.
@ -83,6 +113,24 @@ object Patterns {
def askWithReplyTo(actor: ActorRef, messageFactory: japi.Function[ActorRef, Any], timeout: Timeout): Future[AnyRef] = def askWithReplyTo(actor: ActorRef, messageFactory: japi.Function[ActorRef, Any], timeout: Timeout): Future[AnyRef] =
extended.ask(actor, messageFactory.apply _)(timeout).asInstanceOf[Future[AnyRef]] extended.ask(actor, messageFactory.apply _)(timeout).asInstanceOf[Future[AnyRef]]
/**
* A variation of ask which allows to implement "replyTo" pattern by including
* sender reference in message.
*
* {{{
* final CompletionStage<Object> f = PatternsCS.askWithReplyTo(
* worker,
* askSender -> new Request(askSender),
* timeout);
* }}}
*
* @param actor the actor to be asked
* @param messageFactory function taking an actor ref and returning the message to be sent
* @param timeout the timeout for the response before failing the returned completion stage
*/
def askWithReplyTo(actor: ActorRef, messageFactory: japi.function.Function[ActorRef, Any], timeout: java.time.Duration): CompletionStage[AnyRef] =
extended.ask(actor, messageFactory.apply _)(Timeout.create(timeout)).toJava.asInstanceOf[CompletionStage[AnyRef]]
/** /**
* <i>Java API for `akka.pattern.ask`:</i> * <i>Java API for `akka.pattern.ask`:</i>
* Sends a message asynchronously and returns a [[scala.concurrent.Future]] * Sends a message asynchronously and returns a [[scala.concurrent.Future]]
@ -165,6 +213,36 @@ object Patterns {
def ask(selection: ActorSelection, message: Any, timeout: Timeout): Future[AnyRef] = def ask(selection: ActorSelection, message: Any, timeout: Timeout): Future[AnyRef] =
scalaAsk(selection, message)(timeout).asInstanceOf[Future[AnyRef]] scalaAsk(selection, message)(timeout).asInstanceOf[Future[AnyRef]]
/**
* <i>Java API for `akka.pattern.ask`:</i>
* Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]]
* holding the eventual reply message; this means that the target [[akka.actor.ActorSelection]]
* needs to send the result to the `sender` reference provided.
*
* The CompletionStage will be completed with an [[akka.pattern.AskTimeoutException]] after the
* given timeout has expired; this is independent from any timeout applied
* while awaiting a result for this future (i.e. in
* `Await.result(..., timeout)`). A typical reason for `AskTimeoutException` is that the
* recipient actor didn't send a reply.
*
* <b>Warning:</b>
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors object, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*
* <b>Recommended usage:</b>
*
* {{{
* final CompletionStage<Object> f = PatternsCS.ask(selection, request, duration);
* f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
* }}}
*/
def ask(selection: ActorSelection, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] =
scalaAsk(selection, message)(timeout.asScala).toJava.asInstanceOf[CompletionStage[AnyRef]]
/** /**
* <i>Java API for `akka.pattern.ask`:</i> * <i>Java API for `akka.pattern.ask`:</i>
* Sends a message asynchronously and returns a [[scala.concurrent.Future]] * Sends a message asynchronously and returns a [[scala.concurrent.Future]]
@ -213,6 +291,20 @@ object Patterns {
def askWithReplyTo(selection: ActorSelection, messageFactory: japi.Function[ActorRef, Any], timeoutMillis: Long): Future[AnyRef] = def askWithReplyTo(selection: ActorSelection, messageFactory: japi.Function[ActorRef, Any], timeoutMillis: Long): Future[AnyRef] =
extended.ask(selection, messageFactory.apply _)(Timeout(timeoutMillis.millis)).asInstanceOf[Future[AnyRef]] extended.ask(selection, messageFactory.apply _)(Timeout(timeoutMillis.millis)).asInstanceOf[Future[AnyRef]]
/**
* A variation of ask which allows to implement "replyTo" pattern by including
* sender reference in message.
*
* {{{
* final CompletionStage<Object> f = Patterns.askWithReplyTo(
* selection,
* replyTo -> new Request(replyTo),
* timeout);
* }}}
*/
def askWithReplyTo(selection: ActorSelection, messageFactory: japi.Function[ActorRef, Any], timeout: java.time.Duration): CompletionStage[AnyRef] =
extended.ask(selection, messageFactory.apply _)(timeout.asScala).toJava.asInstanceOf[CompletionStage[AnyRef]]
/** /**
* Register an onComplete callback on this [[scala.concurrent.Future]] to send * Register an onComplete callback on this [[scala.concurrent.Future]] to send
* the result to the given [[akka.actor.ActorRef]] or [[akka.actor.ActorSelection]]. * the result to the given [[akka.actor.ActorRef]] or [[akka.actor.ActorSelection]].
@ -232,6 +324,25 @@ object Patterns {
*/ */
def pipe[T](future: Future[T], context: ExecutionContext): PipeableFuture[T] = scalaPipe(future)(context) def pipe[T](future: Future[T], context: ExecutionContext): PipeableFuture[T] = scalaPipe(future)(context)
/**
* When this [[java.util.concurrent.CompletionStage]] finishes, send its result to the given
* [[akka.actor.ActorRef]] or [[akka.actor.ActorSelection]].
* Returns the original CompletionStage to allow method chaining.
* If the future was completed with failure it is sent as a [[akka.actor.Status.Failure]]
* to the recipient.
*
* <b>Recommended usage example:</b>
*
* {{{
* final CompletionStage<Object> f = PatternsCS.ask(worker, request, timeout);
* // apply some transformation (i.e. enrich with request info)
* final CompletionStage<Object> transformed = f.thenApply(result -> { ... });
* // send it on to the next operator
* PatternsCS.pipe(transformed, context).to(nextActor);
* }}}
*/
def pipe[T](future: CompletionStage[T], context: ExecutionContext): PipeableCompletionStage[T] = pipeCompletionStage(future)(context)
/** /**
* Returns a [[scala.concurrent.Future]] that will be completed with success (value `true`) when * Returns a [[scala.concurrent.Future]] that will be completed with success (value `true`) when
* existing messages of the target actor has been processed and the actor has been * existing messages of the target actor has been processed and the actor has been
@ -245,6 +356,19 @@ object Patterns {
def gracefulStop(target: ActorRef, timeout: FiniteDuration): Future[java.lang.Boolean] = def gracefulStop(target: ActorRef, timeout: FiniteDuration): Future[java.lang.Boolean] =
scalaGracefulStop(target, timeout).asInstanceOf[Future[java.lang.Boolean]] scalaGracefulStop(target, timeout).asInstanceOf[Future[java.lang.Boolean]]
/**
* Returns a [[java.util.concurrent.CompletionStage]] that will be completed with success (value `true`) when
* existing messages of the target actor has been processed and the actor has been
* terminated.
*
* Useful when you need to wait for termination or compose ordered termination of several actors.
*
* If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]]
* is completed with failure [[akka.pattern.AskTimeoutException]].
*/
def gracefulStop(target: ActorRef, timeout: java.time.Duration): CompletionStage[java.lang.Boolean] =
scalaGracefulStop(target, timeout.asScala).toJava.asInstanceOf[CompletionStage[java.lang.Boolean]]
/** /**
* Returns a [[scala.concurrent.Future]] that will be completed with success (value `true`) when * Returns a [[scala.concurrent.Future]] that will be completed with success (value `true`) when
* existing messages of the target actor has been processed and the actor has been * existing messages of the target actor has been processed and the actor has been
@ -261,6 +385,22 @@ object Patterns {
def gracefulStop(target: ActorRef, timeout: FiniteDuration, stopMessage: Any): Future[java.lang.Boolean] = def gracefulStop(target: ActorRef, timeout: FiniteDuration, stopMessage: Any): Future[java.lang.Boolean] =
scalaGracefulStop(target, timeout, stopMessage).asInstanceOf[Future[java.lang.Boolean]] scalaGracefulStop(target, timeout, stopMessage).asInstanceOf[Future[java.lang.Boolean]]
/**
* Returns a [[java.util.concurrent.CompletionStage]] that will be completed with success (value `true`) when
* existing messages of the target actor has been processed and the actor has been
* terminated.
*
* Useful when you need to wait for termination or compose ordered termination of several actors.
*
* If you want to invoke specialized stopping logic on your target actor instead of PoisonPill, you can pass your
* stop command as `stopMessage` parameter
*
* If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]]
* is completed with failure [[akka.pattern.AskTimeoutException]].
*/
def gracefulStop(target: ActorRef, timeout: java.time.Duration, stopMessage: Any): CompletionStage[java.lang.Boolean] =
scalaGracefulStop(target, timeout.asScala, stopMessage).toJava.asInstanceOf[CompletionStage[java.lang.Boolean]]
/** /**
* Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided Callable * Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided Callable
* after the specified duration. * after the specified duration.
@ -268,6 +408,13 @@ object Patterns {
def after[T](duration: FiniteDuration, scheduler: Scheduler, context: ExecutionContext, value: Callable[Future[T]]): Future[T] = def after[T](duration: FiniteDuration, scheduler: Scheduler, context: ExecutionContext, value: Callable[Future[T]]): Future[T] =
scalaAfter(duration, scheduler)(value.call())(context) scalaAfter(duration, scheduler)(value.call())(context)
/**
* Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided Callable
* after the specified duration.
*/
def after[T](duration: java.time.Duration, scheduler: Scheduler, context: ExecutionContext, value: Callable[CompletionStage[T]]): CompletionStage[T] =
afterCompletionStage(duration.asScala, scheduler)(value.call())(context)
/** /**
* Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided Callable * Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided Callable
* after the specified duration. * after the specified duration.
@ -275,6 +422,13 @@ object Patterns {
def after[T](duration: FiniteDuration, scheduler: Scheduler, context: ExecutionContext, value: Future[T]): Future[T] = def after[T](duration: FiniteDuration, scheduler: Scheduler, context: ExecutionContext, value: Future[T]): Future[T] =
scalaAfter(duration, scheduler)(value)(context) scalaAfter(duration, scheduler)(value)(context)
/**
* Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided value
* after the specified duration.
*/
def after[T](duration: java.time.Duration, scheduler: Scheduler, context: ExecutionContext, value: CompletionStage[T]): CompletionStage[T] =
afterCompletionStage(duration.asScala, scheduler)(value)(context)
/** /**
* Returns an internally retrying [[scala.concurrent.Future]] * Returns an internally retrying [[scala.concurrent.Future]]
* The first attempt will be made immediately, and each subsequent attempt will be made after 'delay'. * The first attempt will be made immediately, and each subsequent attempt will be made after 'delay'.
@ -286,6 +440,17 @@ object Patterns {
def retry[T](attempt: Callable[Future[T]], attempts: Int, delay: FiniteDuration, scheduler: Scheduler, def retry[T](attempt: Callable[Future[T]], attempts: Int, delay: FiniteDuration, scheduler: Scheduler,
context: ExecutionContext): Future[T] = context: ExecutionContext): Future[T] =
scalaRetry(() attempt.call, attempts, delay)(context, scheduler) scalaRetry(() attempt.call, attempts, delay)(context, scheduler)
/**
* Returns an internally retrying [[java.util.concurrent.CompletionStage]]
* The first attempt will be made immediately, and each subsequent attempt will be made after 'delay'.
* A scheduler (eg context.system.scheduler) must be provided to delay each retry
* If attempts are exhausted the returned completion operator is simply the result of invoking attempt.
* Note that the attempt function will be invoked on the given execution context for subsequent tries
* and therefore must be thread safe (not touch unsafe mutable state).
*/
def retry[T](attempt: Callable[CompletionStage[T]], attempts: Int, delay: java.time.Duration, scheduler: Scheduler, ec: ExecutionContext): CompletionStage[T] =
scalaRetry(() attempt.call().toScala, attempts, delay.asScala)(ec, scheduler).toJava
} }
/** /**
@ -293,6 +458,7 @@ object Patterns {
* *
* For working with Scala [[scala.concurrent.Future]] from Java you may want to use [[akka.pattern.Patterns]] instead. * For working with Scala [[scala.concurrent.Future]] from Java you may want to use [[akka.pattern.Patterns]] instead.
*/ */
@deprecated("Use Patterns instead.", since = "2.5.19")
object PatternsCS { object PatternsCS {
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.japi import akka.japi
@ -359,6 +525,7 @@ object PatternsCS {
* f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
* }}} * }}}
*/ */
@deprecated("Use Patterns.ask instead.", since = "2.5.19")
def ask(actor: ActorRef, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] = def ask(actor: ActorRef, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] =
ask(actor, message, Timeout.create(timeout)) ask(actor, message, Timeout.create(timeout))
@ -396,6 +563,7 @@ object PatternsCS {
* @param messageFactory function taking an actor ref and returning the message to be sent * @param messageFactory function taking an actor ref and returning the message to be sent
* @param timeout the timeout for the response before failing the returned completion stage * @param timeout the timeout for the response before failing the returned completion stage
*/ */
@deprecated("Use Pattens.askWithReplyTo instead.", since = "2.5.19")
def askWithReplyTo(actor: ActorRef, messageFactory: japi.function.Function[ActorRef, Any], timeout: java.time.Duration): CompletionStage[AnyRef] = def askWithReplyTo(actor: ActorRef, messageFactory: japi.function.Function[ActorRef, Any], timeout: java.time.Duration): CompletionStage[AnyRef] =
extended.ask(actor, messageFactory.apply _)(Timeout.create(timeout)).toJava.asInstanceOf[CompletionStage[AnyRef]] extended.ask(actor, messageFactory.apply _)(Timeout.create(timeout)).toJava.asInstanceOf[CompletionStage[AnyRef]]
@ -426,6 +594,7 @@ object PatternsCS {
* f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
* }}} * }}}
*/ */
@deprecated("Use Pattens.ask which accepts java.time.Duration instead.", since = "2.5.19")
def ask(actor: ActorRef, message: Any, timeoutMillis: Long): CompletionStage[AnyRef] = def ask(actor: ActorRef, message: Any, timeoutMillis: Long): CompletionStage[AnyRef] =
scalaAsk(actor, message)(new Timeout(timeoutMillis, TimeUnit.MILLISECONDS)).toJava.asInstanceOf[CompletionStage[AnyRef]] scalaAsk(actor, message)(new Timeout(timeoutMillis, TimeUnit.MILLISECONDS)).toJava.asInstanceOf[CompletionStage[AnyRef]]
@ -444,6 +613,7 @@ object PatternsCS {
* @param messageFactory function taking an actor ref to reply to and returning the message to be sent * @param messageFactory function taking an actor ref to reply to and returning the message to be sent
* @param timeoutMillis the timeout for the response before failing the returned completion operator * @param timeoutMillis the timeout for the response before failing the returned completion operator
*/ */
@deprecated("Use Pattens.askWithReplyTo which accepts java.time.Duration instead.", since = "2.5.19")
def askWithReplyTo(actor: ActorRef, messageFactory: japi.function.Function[ActorRef, Any], timeoutMillis: Long): CompletionStage[AnyRef] = def askWithReplyTo(actor: ActorRef, messageFactory: japi.function.Function[ActorRef, Any], timeoutMillis: Long): CompletionStage[AnyRef] =
askWithReplyTo(actor, messageFactory, Timeout(timeoutMillis.millis)) askWithReplyTo(actor, messageFactory, Timeout(timeoutMillis.millis))
@ -505,6 +675,7 @@ object PatternsCS {
* f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
* }}} * }}}
*/ */
@deprecated("Use Patterns.ask instead.", since = "2.5.19")
def ask(selection: ActorSelection, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] = def ask(selection: ActorSelection, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] =
ask(selection, message, Timeout.create(timeout)) ask(selection, message, Timeout.create(timeout))
@ -535,6 +706,7 @@ object PatternsCS {
* f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
* }}} * }}}
*/ */
@deprecated("Use Pattens.ask which accepts java.time.Duration instead.", since = "2.5.19")
def ask(selection: ActorSelection, message: Any, timeoutMillis: Long): CompletionStage[AnyRef] = def ask(selection: ActorSelection, message: Any, timeoutMillis: Long): CompletionStage[AnyRef] =
scalaAsk(selection, message)(new Timeout(timeoutMillis, TimeUnit.MILLISECONDS)).toJava.asInstanceOf[CompletionStage[AnyRef]] scalaAsk(selection, message)(new Timeout(timeoutMillis, TimeUnit.MILLISECONDS)).toJava.asInstanceOf[CompletionStage[AnyRef]]
@ -549,6 +721,7 @@ object PatternsCS {
* timeout); * timeout);
* }}} * }}}
*/ */
@deprecated("Use Pattens.askWithReplyTo which accepts java.time.Duration instead.", since = "2.5.19")
def askWithReplyTo(selection: ActorSelection, messageFactory: japi.Function[ActorRef, Any], timeoutMillis: Long): CompletionStage[AnyRef] = def askWithReplyTo(selection: ActorSelection, messageFactory: japi.Function[ActorRef, Any], timeoutMillis: Long): CompletionStage[AnyRef] =
extended.ask(selection, messageFactory.apply _)(Timeout(timeoutMillis.millis)).toJava.asInstanceOf[CompletionStage[AnyRef]] extended.ask(selection, messageFactory.apply _)(Timeout(timeoutMillis.millis)).toJava.asInstanceOf[CompletionStage[AnyRef]]
@ -569,6 +742,7 @@ object PatternsCS {
* PatternsCS.pipe(transformed, context).to(nextActor); * PatternsCS.pipe(transformed, context).to(nextActor);
* }}} * }}}
*/ */
@deprecated("Use Patterns.pipe instead.", since = "2.5.19")
def pipe[T](future: CompletionStage[T], context: ExecutionContext): PipeableCompletionStage[T] = pipeCompletionStage(future)(context) def pipe[T](future: CompletionStage[T], context: ExecutionContext): PipeableCompletionStage[T] = pipeCompletionStage(future)(context)
/** /**
@ -595,6 +769,7 @@ object PatternsCS {
* If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]] * If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]]
* is completed with failure [[akka.pattern.AskTimeoutException]]. * is completed with failure [[akka.pattern.AskTimeoutException]].
*/ */
@deprecated("Use Patterns.gracefulStop instead.", since = "2.5.19")
def gracefulStop(target: ActorRef, timeout: java.time.Duration): CompletionStage[java.lang.Boolean] = def gracefulStop(target: ActorRef, timeout: java.time.Duration): CompletionStage[java.lang.Boolean] =
scalaGracefulStop(target, timeout.asScala).toJava.asInstanceOf[CompletionStage[java.lang.Boolean]] scalaGracefulStop(target, timeout.asScala).toJava.asInstanceOf[CompletionStage[java.lang.Boolean]]
@ -628,6 +803,7 @@ object PatternsCS {
* If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]] * If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]]
* is completed with failure [[akka.pattern.AskTimeoutException]]. * is completed with failure [[akka.pattern.AskTimeoutException]].
*/ */
@deprecated("Use Patterns.gracefulStop instead.", since = "2.5.19")
def gracefulStop(target: ActorRef, timeout: java.time.Duration, stopMessage: Any): CompletionStage[java.lang.Boolean] = def gracefulStop(target: ActorRef, timeout: java.time.Duration, stopMessage: Any): CompletionStage[java.lang.Boolean] =
scalaGracefulStop(target, timeout.asScala, stopMessage).toJava.asInstanceOf[CompletionStage[java.lang.Boolean]] scalaGracefulStop(target, timeout.asScala, stopMessage).toJava.asInstanceOf[CompletionStage[java.lang.Boolean]]
@ -643,6 +819,7 @@ object PatternsCS {
* Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided Callable * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided Callable
* after the specified duration. * after the specified duration.
*/ */
@deprecated("Use Patterns.after instead.", since = "2.5.19")
def after[T](duration: java.time.Duration, scheduler: Scheduler, context: ExecutionContext, value: Callable[CompletionStage[T]]): CompletionStage[T] = def after[T](duration: java.time.Duration, scheduler: Scheduler, context: ExecutionContext, value: Callable[CompletionStage[T]]): CompletionStage[T] =
afterCompletionStage(duration.asScala, scheduler)(value.call())(context) afterCompletionStage(duration.asScala, scheduler)(value.call())(context)
@ -658,6 +835,7 @@ object PatternsCS {
* Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided value * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided value
* after the specified duration. * after the specified duration.
*/ */
@deprecated("Use Patterns.after instead.", since = "2.5.19")
def after[T](duration: java.time.Duration, scheduler: Scheduler, context: ExecutionContext, value: CompletionStage[T]): CompletionStage[T] = def after[T](duration: java.time.Duration, scheduler: Scheduler, context: ExecutionContext, value: CompletionStage[T]): CompletionStage[T] =
afterCompletionStage(duration.asScala, scheduler)(value)(context) afterCompletionStage(duration.asScala, scheduler)(value)(context)
@ -669,6 +847,7 @@ object PatternsCS {
* Note that the attempt function will be invoked on the given execution context for subsequent tries * Note that the attempt function will be invoked on the given execution context for subsequent tries
* and therefore must be thread safe (not touch unsafe mutable state). * and therefore must be thread safe (not touch unsafe mutable state).
*/ */
@deprecated("Use Patterns.retry instead.", since = "2.5.19")
def retry[T](attempt: Callable[CompletionStage[T]], attempts: Int, delay: java.time.Duration, scheduler: Scheduler, ec: ExecutionContext): CompletionStage[T] = def retry[T](attempt: Callable[CompletionStage[T]], attempts: Int, delay: java.time.Duration, scheduler: Scheduler, ec: ExecutionContext): CompletionStage[T] =
scalaRetry(() attempt.call().toScala, attempts, delay.asScala)(ec, scheduler).toJava scalaRetry(() attempt.call().toScala, attempts, delay.asScala)(ec, scheduler).toJava
} }

View file

@ -39,13 +39,13 @@ import akka.actor.ActorSelection;
import akka.actor.Identify; import akka.actor.Identify;
//#import-identify //#import-identify
//#import-ask //#import-ask
import static akka.pattern.PatternsCS.ask; import static akka.pattern.Patterns.ask;
import static akka.pattern.PatternsCS.pipe; import static akka.pattern.Patterns.pipe;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
//#import-ask //#import-ask
//#import-gracefulStop //#import-gracefulStop
import static akka.pattern.PatternsCS.gracefulStop; import static akka.pattern.Patterns.gracefulStop;
import akka.pattern.AskTimeoutException; import akka.pattern.AskTimeoutException;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
@ -80,7 +80,7 @@ public class ActorDocTest extends AbstractJavaTest {
//#context-actorOf //#context-actorOf
public class FirstActor extends AbstractActor { public class FirstActor extends AbstractActor {
final ActorRef child = getContext().actorOf(Props.create(MyActor.class), "myChild"); final ActorRef child = getContext().actorOf(Props.create(MyActor.class), "myChild");
//#plus-some-behavior //#plus-some-behavior
@Override @Override
public Receive createReceive() { public Receive createReceive() {
@ -102,15 +102,15 @@ public class ActorDocTest extends AbstractJavaTest {
} }
//#createReceive //#createReceive
} }
static static
//#well-structured //#well-structured
public class WellStructuredActor extends AbstractActor { public class WellStructuredActor extends AbstractActor {
public static class Msg1 {} public static class Msg1 {}
public static class Msg2 {} public static class Msg2 {}
public static class Msg3 {} public static class Msg3 {}
@Override @Override
public Receive createReceive() { public Receive createReceive() {
return receiveBuilder() return receiveBuilder()
@ -119,29 +119,29 @@ public class ActorDocTest extends AbstractJavaTest {
.match(Msg3.class, this::receiveMsg3) .match(Msg3.class, this::receiveMsg3)
.build(); .build();
} }
private void receiveMsg1(Msg1 msg) { private void receiveMsg1(Msg1 msg) {
// actual work // actual work
} }
private void receiveMsg2(Msg2 msg) { private void receiveMsg2(Msg2 msg) {
// actual work // actual work
} }
private void receiveMsg3(Msg3 msg) { private void receiveMsg3(Msg3 msg) {
// actual work // actual work
} }
} }
//#well-structured //#well-structured
static static
//#optimized //#optimized
public class OptimizedActor extends UntypedAbstractActor { public class OptimizedActor extends UntypedAbstractActor {
public static class Msg1 {} public static class Msg1 {}
public static class Msg2 {} public static class Msg2 {}
public static class Msg3 {} public static class Msg3 {}
@Override @Override
public void onReceive(Object msg) throws Exception { public void onReceive(Object msg) throws Exception {
if (msg instanceof Msg1) if (msg instanceof Msg1)
@ -153,15 +153,15 @@ public class ActorDocTest extends AbstractJavaTest {
else else
unhandled(msg); unhandled(msg);
} }
private void receiveMsg1(Msg1 msg) { private void receiveMsg1(Msg1 msg) {
// actual work // actual work
} }
private void receiveMsg2(Msg2 msg) { private void receiveMsg2(Msg2 msg) {
// actual work // actual work
} }
private void receiveMsg3(Msg3 msg) { private void receiveMsg3(Msg3 msg) {
// actual work // actual work
} }
@ -174,7 +174,7 @@ public class ActorDocTest extends AbstractJavaTest {
public ActorWithArgs(String args) { public ActorWithArgs(String args) {
this.args = args; this.args = args;
} }
@Override @Override
public Receive createReceive() { public Receive createReceive() {
return receiveBuilder().matchAny(x -> { }).build(); return receiveBuilder().matchAny(x -> { }).build();
@ -197,11 +197,11 @@ public class ActorDocTest extends AbstractJavaTest {
} }
private final Integer magicNumber; private final Integer magicNumber;
public DemoActor(Integer magicNumber) { public DemoActor(Integer magicNumber) {
this.magicNumber = magicNumber; this.magicNumber = magicNumber;
} }
@Override @Override
public Receive createReceive() { public Receive createReceive() {
return receiveBuilder() return receiveBuilder()
@ -243,7 +243,7 @@ public class ActorDocTest extends AbstractJavaTest {
return from; return from;
} }
} }
@Override @Override
public Receive createReceive() { public Receive createReceive() {
return receiveBuilder() return receiveBuilder()
@ -255,23 +255,23 @@ public class ActorDocTest extends AbstractJavaTest {
} }
//#messages-in-companion //#messages-in-companion
public static class LifecycleMethods extends AbstractActor { public static class LifecycleMethods extends AbstractActor {
@Override @Override
public Receive createReceive() { public Receive createReceive() {
return AbstractActor.emptyBehavior(); return AbstractActor.emptyBehavior();
} }
/* /*
* This section must be kept in sync with the actual Actor trait. * This section must be kept in sync with the actual Actor trait.
* *
* BOYSCOUT RULE: whenever you read this, verify that! * BOYSCOUT RULE: whenever you read this, verify that!
*/ */
//#lifecycle-callbacks //#lifecycle-callbacks
public void preStart() { public void preStart() {
} }
public void preRestart(Throwable reason, Optional<Object> message) { public void preRestart(Throwable reason, Optional<Object> message) {
for (ActorRef each : getContext().getChildren()) { for (ActorRef each : getContext().getChildren()) {
getContext().unwatch(each); getContext().unwatch(each);
@ -279,25 +279,25 @@ public class ActorDocTest extends AbstractJavaTest {
} }
postStop(); postStop();
} }
public void postRestart(Throwable reason) { public void postRestart(Throwable reason) {
preStart(); preStart();
} }
public void postStop() { public void postStop() {
} }
//#lifecycle-callbacks //#lifecycle-callbacks
} }
public static class Hook extends AbstractActor { public static class Hook extends AbstractActor {
ActorRef target = null; ActorRef target = null;
@Override @Override
public Receive createReceive() { public Receive createReceive() {
return AbstractActor.emptyBehavior(); return AbstractActor.emptyBehavior();
} }
//#preStart //#preStart
@Override @Override
public void preStart() { public void preStart() {
@ -345,7 +345,7 @@ public class ActorDocTest extends AbstractJavaTest {
} }
public static class ReplyException extends AbstractActor { public static class ReplyException extends AbstractActor {
@Override @Override
public Receive createReceive() { public Receive createReceive() {
return receiveBuilder() return receiveBuilder()
@ -397,7 +397,7 @@ public class ActorDocTest extends AbstractJavaTest {
.matchEquals("job", s -> .matchEquals("job", s ->
getSender().tell("service unavailable, shutting down", getSelf()) getSender().tell("service unavailable, shutting down", getSelf())
) )
.match(Terminated.class, t -> t.actor().equals(worker), t -> .match(Terminated.class, t -> t.actor().equals(worker), t ->
getContext().stop(getSelf()) getContext().stop(getSelf())
) )
.build(); .build();
@ -536,7 +536,7 @@ public class ActorDocTest extends AbstractJavaTest {
// To set an initial delay // To set an initial delay
getContext().setReceiveTimeout(Duration.ofSeconds(10)); getContext().setReceiveTimeout(Duration.ofSeconds(10));
} }
@Override @Override
public Receive createReceive() { public Receive createReceive() {
return receiveBuilder() return receiveBuilder()
@ -598,11 +598,11 @@ public class ActorDocTest extends AbstractJavaTest {
}) })
.build(); .build();
} }
@Override @Override
public Receive createReceive() { public Receive createReceive() {
return receiveBuilder() return receiveBuilder()
.matchEquals("foo", s -> .matchEquals("foo", s ->
getContext().become(angry) getContext().become(angry)
) )
.matchEquals("bar", s -> .matchEquals("bar", s ->
@ -669,7 +669,7 @@ public class ActorDocTest extends AbstractJavaTest {
public WatchActor() { public WatchActor() {
getContext().watch(child); // <-- this is the only call needed for registration getContext().watch(child); // <-- this is the only call needed for registration
} }
@Override @Override
public Receive createReceive() { public Receive createReceive() {
return receiveBuilder() return receiveBuilder()
@ -706,7 +706,7 @@ public class ActorDocTest extends AbstractJavaTest {
ActorSelection selection = getContext().actorSelection("/user/another"); ActorSelection selection = getContext().actorSelection("/user/another");
selection.tell(new Identify(identifyId), getSelf()); selection.tell(new Identify(identifyId), getSelf());
} }
@Override @Override
public Receive createReceive() { public Receive createReceive() {
return receiveBuilder() return receiveBuilder()
@ -720,7 +720,7 @@ public class ActorDocTest extends AbstractJavaTest {
}) })
.build(); .build();
} }
final AbstractActor.Receive active(final ActorRef another) { final AbstractActor.Receive active(final ActorRef another) {
return receiveBuilder() return receiveBuilder()
.match(Terminated.class, t -> t.actor().equals(another), t -> .match(Terminated.class, t -> t.actor().equals(another), t ->
@ -744,7 +744,7 @@ public class ActorDocTest extends AbstractJavaTest {
} }
}; };
} }
@Test @Test
public void usePatternsAskPipe() { public void usePatternsAskPipe() {
new TestKit(system) { new TestKit(system) {
@ -754,11 +754,11 @@ public class ActorDocTest extends AbstractJavaTest {
ActorRef actorC = getRef(); ActorRef actorC = getRef();
//#ask-pipe //#ask-pipe
Timeout t = Timeout.create(Duration.ofSeconds(5)); final Duration t = Duration.ofSeconds(5);
// using 1000ms timeout // using 1000ms timeout
CompletableFuture<Object> future1 = CompletableFuture<Object> future1 =
ask(actorA, "request", 1000).toCompletableFuture(); ask(actorA, "request", Duration.ofMillis(1000)).toCompletableFuture();
// using timeout from above // using timeout from above
CompletableFuture<Object> future2 = CompletableFuture<Object> future2 =
@ -774,12 +774,12 @@ public class ActorDocTest extends AbstractJavaTest {
pipe(transformed, system.dispatcher()).to(actorC); pipe(transformed, system.dispatcher()).to(actorC);
//#ask-pipe //#ask-pipe
expectMsgEquals(new Result("request", "another request")); expectMsgEquals(new Result("request", "another request"));
} }
}; };
} }
@Test @Test
public void useKill() { public void useKill() {
new TestKit(system) { new TestKit(system) {
@ -788,14 +788,14 @@ public class ActorDocTest extends AbstractJavaTest {
watch(victim); watch(victim);
//#kill //#kill
victim.tell(akka.actor.Kill.getInstance(), ActorRef.noSender()); victim.tell(akka.actor.Kill.getInstance(), ActorRef.noSender());
// expecting the actor to indeed terminate: // expecting the actor to indeed terminate:
expectTerminated(Duration.ofSeconds(3), victim); expectTerminated(Duration.ofSeconds(3), victim);
//#kill //#kill
} }
}; };
} }
@Test @Test
public void usePoisonPill() { public void usePoisonPill() {
new TestKit(system) { new TestKit(system) {
@ -809,7 +809,7 @@ public class ActorDocTest extends AbstractJavaTest {
} }
}; };
} }
@Test @Test
public void coordinatedShutdown() { public void coordinatedShutdown() {
final ActorRef someActor = system.actorOf(Props.create(FirstActor.class)); final ActorRef someActor = system.actorOf(Props.create(FirstActor.class));
@ -817,7 +817,7 @@ public class ActorDocTest extends AbstractJavaTest {
CoordinatedShutdown.get(system).addTask( CoordinatedShutdown.get(system).addTask(
CoordinatedShutdown.PhaseBeforeServiceUnbind(), "someTaskName", CoordinatedShutdown.PhaseBeforeServiceUnbind(), "someTaskName",
() -> { () -> {
return akka.pattern.PatternsCS.ask(someActor, "stop", new Timeout(5, TimeUnit.SECONDS)) return akka.pattern.Patterns.ask(someActor, "stop", Duration.ofSeconds(5))
.thenApply(reply -> Done.getInstance()); .thenApply(reply -> Done.getInstance());
}); });
//#coordinated-shutdown-addTask //#coordinated-shutdown-addTask

View file

@ -7,14 +7,13 @@ package jdocs.actor.io.dns;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.io.Dns; import akka.io.Dns;
import akka.io.IO;
import akka.io.dns.DnsProtocol; import akka.io.dns.DnsProtocol;
import static akka.pattern.PatternsCS.ask; import static akka.pattern.Patterns.ask;
import static akka.pattern.PatternsCS.pipe;
import scala.Option; import scala.Option;
import java.time.Duration;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
@ -23,7 +22,7 @@ public class DnsCompileOnlyDocTest {
ActorSystem system = ActorSystem.create(); ActorSystem system = ActorSystem.create();
ActorRef actorRef = null; ActorRef actorRef = null;
long timeout = 1000; final Duration timeout = Duration.ofMillis(1000L);
//#resolve //#resolve
Option<Dns.Resolved> initial = Dns.get(system).cache().resolve("google.com", system, actorRef); Option<Dns.Resolved> initial = Dns.get(system).cache().resolve("google.com", system, actorRef);

View file

@ -4,16 +4,17 @@
package jdocs.camel; package jdocs.camel;
import java.time.Duration;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import akka.pattern.Patterns;
import akka.testkit.javadsl.TestKit; import akka.testkit.javadsl.TestKit;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.Props; import akka.actor.Props;
import akka.camel.CamelMessage; import akka.camel.CamelMessage;
import akka.pattern.PatternsCS;
public class ProducerTestBase { public class ProducerTestBase {
public void tellJmsProducer() { public void tellJmsProducer() {
@ -33,7 +34,8 @@ public class ProducerTestBase {
ActorSystem system = ActorSystem.create("some-system"); ActorSystem system = ActorSystem.create("some-system");
Props props = Props.create(FirstProducer.class); Props props = Props.create(FirstProducer.class);
ActorRef producer = system.actorOf(props,"myproducer"); ActorRef producer = system.actorOf(props,"myproducer");
CompletionStage<Object> future = PatternsCS.ask(producer, "some request", 1000); CompletionStage<Object> future = Patterns.ask(producer, "some request",
Duration.ofMillis(1000L));
//#AskProducer //#AskProducer
system.stop(producer); system.stop(producer);
TestKit.shutdownActorSystem(system); TestKit.shutdownActorSystem(system);

View file

@ -12,7 +12,7 @@ import java.time.Duration;
import akka.pattern.CircuitBreaker; import akka.pattern.CircuitBreaker;
import akka.event.Logging; import akka.event.Logging;
import static akka.pattern.PatternsCS.pipe; import static akka.pattern.Patterns.pipe;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;

View file

@ -8,7 +8,7 @@ import java.math.BigInteger;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import static akka.pattern.PatternsCS.pipe; import static akka.pattern.Patterns.pipe;
//#backend //#backend
public class FactorialBackend extends AbstractActor { public class FactorialBackend extends AbstractActor {

View file

@ -7,7 +7,6 @@ package jdocs.future;
//#imports1 //#imports1
import akka.dispatch.*; import akka.dispatch.*;
import jdocs.AbstractJavaTest; import jdocs.AbstractJavaTest;
import scala.Function0;
import scala.concurrent.ExecutionContext; import scala.concurrent.ExecutionContext;
import scala.concurrent.Future; import scala.concurrent.Future;
import scala.concurrent.Await; import scala.concurrent.Await;
@ -64,15 +63,15 @@ import java.util.Arrays;
//#imports8 //#imports8
import static akka.pattern.PatternsCS.retry; import static akka.pattern.Patterns.retry;
//#imports8 //#imports8
//#imports-ask //#imports-ask
import static akka.pattern.PatternsCS.ask; import static akka.pattern.Patterns.ask;
//#imports-ask //#imports-ask
//#imports-pipe //#imports-pipe
import static akka.pattern.PatternsCS.pipe; import static akka.pattern.Patterns.pipe;
//#imports-pipe //#imports-pipe
@ -124,11 +123,11 @@ public class FutureDocTest extends AbstractJavaTest {
//#pipe-to-usage //#pipe-to-usage
public class ActorUsingPipeTo extends AbstractActor { public class ActorUsingPipeTo extends AbstractActor {
ActorRef target; ActorRef target;
Timeout timeout; Duration timeout;
ActorUsingPipeTo(ActorRef target) { ActorUsingPipeTo(ActorRef target) {
this.target = target; this.target = target;
this.timeout = Timeout.create(Duration.ofSeconds(5)); this.timeout = Duration.ofSeconds(5);
} }
@Override @Override
@ -216,7 +215,7 @@ public class FutureDocTest extends AbstractJavaTest {
public class UserProxyActor extends AbstractActor { public class UserProxyActor extends AbstractActor {
ActorRef userActor; ActorRef userActor;
ActorRef userActivityActor; ActorRef userActivityActor;
Timeout timeout = Timeout.create(Duration.ofSeconds(5)); Duration timeout = Duration.ofSeconds(5);
UserProxyActor(ActorRef userActor, ActorRef userActivityActor) { UserProxyActor(ActorRef userActor, ActorRef userActivityActor) {
this.userActor = userActor; this.userActor = userActor;

View file

@ -19,17 +19,16 @@ import akka.actor.Status;
import akka.actor.SupervisorStrategy; import akka.actor.SupervisorStrategy;
import akka.actor.Terminated; import akka.actor.Terminated;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import akka.pattern.PatternsCS; import akka.pattern.Patterns;
import akka.util.Timeout;
public class SupervisedAsk { public class SupervisedAsk {
private static class AskParam { private static class AskParam {
Props props; Props props;
Object message; Object message;
Timeout timeout; Duration timeout;
AskParam(Props props, Object message, Timeout timeout) { AskParam(Props props, Object message, Duration timeout) {
this.props = props; this.props = props;
this.message = message; this.message = message;
this.timeout = timeout; this.timeout = timeout;
@ -77,7 +76,7 @@ public class SupervisedAsk {
getContext().watch(targetActor); getContext().watch(targetActor);
targetActor.forward(askParam.message, getContext()); targetActor.forward(askParam.message, getContext());
Scheduler scheduler = getContext().getSystem().scheduler(); Scheduler scheduler = getContext().getSystem().scheduler();
timeoutMessage = scheduler.scheduleOnce(askParam.timeout.duration(), timeoutMessage = scheduler.scheduleOnce(askParam.timeout,
getSelf(), new AskTimeout(), getContext().dispatcher(), null); getSelf(), new AskTimeout(), getContext().dispatcher(), null);
}) })
.match(Terminated.class, message -> { .match(Terminated.class, message -> {
@ -97,13 +96,13 @@ public class SupervisedAsk {
} }
public static CompletionStage<Object> askOf(ActorRef supervisorCreator, Props props, public static CompletionStage<Object> askOf(ActorRef supervisorCreator, Props props,
Object message, Timeout timeout) { Object message, Duration timeout) {
AskParam param = new AskParam(props, message, timeout); AskParam param = new AskParam(props, message, timeout);
return PatternsCS.ask(supervisorCreator, param, timeout); return Patterns.ask(supervisorCreator, param, timeout);
} }
synchronized public static ActorRef createSupervisorCreator( synchronized public static ActorRef createSupervisorCreator(
ActorRefFactory factory) { ActorRefFactory factory) {
return factory.actorOf(Props.create(AskSupervisorCreator.class)); return factory.actorOf(Props.create(AskSupervisorCreator.class));
} }
} }

View file

@ -11,12 +11,14 @@ import akka.actor.AbstractActor;
import akka.util.Timeout; import akka.util.Timeout;
import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.FiniteDuration;
import java.time.Duration;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
public class SupervisedAskSpec { public class SupervisedAskSpec {
public Object execute(Class<? extends AbstractActor> someActor, public Object execute(Class<? extends AbstractActor> someActor,
Object message, Timeout timeout, ActorRefFactory actorSystem) Object message, Duration timeout, ActorRefFactory actorSystem)
throws Exception { throws Exception {
// example usage // example usage
try { try {
@ -24,8 +26,7 @@ public class SupervisedAskSpec {
.createSupervisorCreator(actorSystem); .createSupervisorCreator(actorSystem);
CompletionStage<Object> finished = SupervisedAsk.askOf(supervisorCreator, CompletionStage<Object> finished = SupervisedAsk.askOf(supervisorCreator,
Props.create(someActor), message, timeout); Props.create(someActor), message, timeout);
FiniteDuration d = timeout.duration(); return finished.toCompletableFuture().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
return finished.toCompletableFuture().get(d.length(), d.unit());
} catch (Exception e) { } catch (Exception e) {
// exception propagated by supervision // exception propagated by supervision
throw e; throw e;

View file

@ -4,7 +4,9 @@
package jdocs.persistence; package jdocs.persistence;
import static akka.pattern.PatternsCS.ask; import static akka.pattern.Patterns.ask;
import java.time.Duration;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
@ -18,9 +20,7 @@ import akka.persistence.query.*;
import akka.stream.ActorMaterializer; import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink; import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source; import akka.stream.javadsl.Source;
import akka.util.Timeout;
import com.typesafe.config.ConfigFactory;
import docs.persistence.query.MyEventsByTagPublisher; import docs.persistence.query.MyEventsByTagPublisher;
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscriber;
import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.FiniteDuration;
@ -34,13 +34,13 @@ public class PersistenceQueryDocTest {
final ActorSystem system = ActorSystem.create(); final ActorSystem system = ActorSystem.create();
final ActorMaterializer mat = ActorMaterializer.create(system); final ActorMaterializer mat = ActorMaterializer.create(system);
static static
//#advanced-journal-query-types //#advanced-journal-query-types
public class RichEvent { public class RichEvent {
public final Set<String >tags; public final Set<String >tags;
public final Object payload; public final Object payload;
public RichEvent(Set<String> tags, Object payload) { public RichEvent(Set<String> tags, Object payload) {
this.tags = tags; this.tags = tags;
this.payload = payload; this.payload = payload;
@ -70,7 +70,7 @@ public class PersistenceQueryDocTest {
public MyReadJournalProvider(ExtendedActorSystem system, Config config) { public MyReadJournalProvider(ExtendedActorSystem system, Config config) {
this.javadslReadJournal = new MyJavadslReadJournal(system, config); this.javadslReadJournal = new MyJavadslReadJournal(system, config);
} }
@Override @Override
public MyScaladslReadJournal scaladslReadJournal() { public MyScaladslReadJournal scaladslReadJournal() {
return new MyScaladslReadJournal(javadslReadJournal); return new MyScaladslReadJournal(javadslReadJournal);
@ -79,13 +79,13 @@ public class PersistenceQueryDocTest {
@Override @Override
public MyJavadslReadJournal javadslReadJournal() { public MyJavadslReadJournal javadslReadJournal() {
return this.javadslReadJournal; return this.javadslReadJournal;
} }
} }
//#my-read-journal //#my-read-journal
static static
//#my-read-journal //#my-read-journal
public class MyJavadslReadJournal implements public class MyJavadslReadJournal implements
akka.persistence.query.javadsl.ReadJournal, akka.persistence.query.javadsl.ReadJournal,
akka.persistence.query.javadsl.EventsByTagQuery, akka.persistence.query.javadsl.EventsByTagQuery,
akka.persistence.query.javadsl.EventsByPersistenceIdQuery, akka.persistence.query.javadsl.EventsByPersistenceIdQuery,
@ -95,8 +95,8 @@ public class PersistenceQueryDocTest {
private final FiniteDuration refreshInterval; private final FiniteDuration refreshInterval;
public MyJavadslReadJournal(ExtendedActorSystem system, Config config) { public MyJavadslReadJournal(ExtendedActorSystem system, Config config) {
refreshInterval = refreshInterval =
FiniteDuration.create(config.getDuration("refresh-interval", FiniteDuration.create(config.getDuration("refresh-interval",
TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
} }
@ -136,13 +136,13 @@ public class PersistenceQueryDocTest {
// implement in a similar way as eventsByTag // implement in a similar way as eventsByTag
throw new UnsupportedOperationException("Not implemented yet"); throw new UnsupportedOperationException("Not implemented yet");
} }
@Override @Override
public Source<String, NotUsed> currentPersistenceIds() { public Source<String, NotUsed> currentPersistenceIds() {
// implement in a similar way as eventsByTag // implement in a similar way as eventsByTag
throw new UnsupportedOperationException("Not implemented yet"); throw new UnsupportedOperationException("Not implemented yet");
} }
// possibility to add more plugin specific queries // possibility to add more plugin specific queries
//#advanced-journal-query-definition //#advanced-journal-query-definition
@ -154,16 +154,16 @@ public class PersistenceQueryDocTest {
} }
//#my-read-journal //#my-read-journal
static static
//#my-read-journal //#my-read-journal
public class MyScaladslReadJournal implements public class MyScaladslReadJournal implements
akka.persistence.query.scaladsl.ReadJournal, akka.persistence.query.scaladsl.ReadJournal,
akka.persistence.query.scaladsl.EventsByTagQuery, akka.persistence.query.scaladsl.EventsByTagQuery,
akka.persistence.query.scaladsl.EventsByPersistenceIdQuery, akka.persistence.query.scaladsl.EventsByPersistenceIdQuery,
akka.persistence.query.scaladsl.PersistenceIdsQuery, akka.persistence.query.scaladsl.PersistenceIdsQuery,
akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery { akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery {
private final MyJavadslReadJournal javadslReadJournal; private final MyJavadslReadJournal javadslReadJournal;
public MyScaladslReadJournal(MyJavadslReadJournal javadslReadJournal) { public MyScaladslReadJournal(MyJavadslReadJournal javadslReadJournal) {
@ -179,7 +179,7 @@ public class PersistenceQueryDocTest {
@Override @Override
public akka.stream.scaladsl.Source<EventEnvelope, NotUsed> eventsByPersistenceId( public akka.stream.scaladsl.Source<EventEnvelope, NotUsed> eventsByPersistenceId(
String persistenceId, long fromSequenceNr, long toSequenceNr) { String persistenceId, long fromSequenceNr, long toSequenceNr) {
return javadslReadJournal.eventsByPersistenceId(persistenceId, fromSequenceNr, return javadslReadJournal.eventsByPersistenceId(persistenceId, fromSequenceNr,
toSequenceNr).asScala(); toSequenceNr).asScala();
} }
@ -187,12 +187,12 @@ public class PersistenceQueryDocTest {
public akka.stream.scaladsl.Source<String, NotUsed> persistenceIds() { public akka.stream.scaladsl.Source<String, NotUsed> persistenceIds() {
return javadslReadJournal.persistenceIds().asScala(); return javadslReadJournal.persistenceIds().asScala();
} }
@Override @Override
public akka.stream.scaladsl.Source<String, NotUsed> currentPersistenceIds() { public akka.stream.scaladsl.Source<String, NotUsed> currentPersistenceIds() {
return javadslReadJournal.currentPersistenceIds().asScala(); return javadslReadJournal.currentPersistenceIds().asScala();
} }
// possibility to add more plugin specific queries // possibility to add more plugin specific queries
public akka.stream.scaladsl.Source<RichEvent, QueryMetadata> byTagsWithMeta( public akka.stream.scaladsl.Source<RichEvent, QueryMetadata> byTagsWithMeta(
@ -306,13 +306,13 @@ public class PersistenceQueryDocTest {
"infinite: " + meta.infinite); "infinite: " + meta.infinite);
return meta; return meta;
}); });
events.map(event -> { events.map(event -> {
System.out.println("Event payload: " + event.payload); System.out.println("Event payload: " + event.payload);
return event.payload; return event.payload;
}).runWith(Sink.ignore(), mat); }).runWith(Sink.ignore(), mat);
//#advanced-journal-query-usage //#advanced-journal-query-usage
} }
@ -408,7 +408,7 @@ public class PersistenceQueryDocTest {
//#projection-into-different-store-actor-run //#projection-into-different-store-actor-run
final Timeout timeout = Timeout.apply(3, TimeUnit.SECONDS); final Duration timeout = Duration.ofSeconds(3);
final MyResumableProjection bidProjection = new MyResumableProjection("bid"); final MyResumableProjection bidProjection = new MyResumableProjection("bid");
@ -451,17 +451,17 @@ public class PersistenceQueryDocTest {
public TheOneWhoWritesToQueryJournal() { public TheOneWhoWritesToQueryJournal() {
store = new ExampleStore(); store = new ExampleStore();
} }
@Override @Override
public Receive createReceive() { public Receive createReceive() {
return receiveBuilder() return receiveBuilder()
.matchAny(message -> { .matchAny(message -> {
state = updateState(state, message); state = updateState(state, message);
// example saving logic that requires state to become ready: // example saving logic that requires state to become ready:
if (state.readyToSave()) if (state.readyToSave())
store.save(Record.of(state)); store.save(Record.of(state));
}) })
.build(); .build();
} }

View file

@ -9,7 +9,7 @@ import akka.actor.AbstractActor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.Props; import akka.actor.Props;
import akka.pattern.PatternsCS; import akka.pattern.Patterns;
import akka.stream.*; import akka.stream.*;
import akka.stream.javadsl.*; import akka.stream.javadsl.*;
import akka.testkit.javadsl.TestKit; import akka.testkit.javadsl.TestKit;
@ -59,7 +59,7 @@ public class FlowStreamRefsDocTest extends AbstractJavaTest {
Source<String, NotUsed> logs = streamLogs(requestLogs.streamId); Source<String, NotUsed> logs = streamLogs(requestLogs.streamId);
CompletionStage<SourceRef<String>> logsRef = logs.runWith(StreamRefs.sourceRef(), mat); CompletionStage<SourceRef<String>> logsRef = logs.runWith(StreamRefs.sourceRef(), mat);
PatternsCS.pipe(logsRef.thenApply(ref -> new LogsOffer(ref)), context().dispatcher()) Patterns.pipe(logsRef.thenApply(ref -> new LogsOffer(ref)), context().dispatcher())
.to(sender()); .to(sender());
} }
@ -111,7 +111,7 @@ public class FlowStreamRefsDocTest extends AbstractJavaTest {
Sink<String, NotUsed> sink = logsSinkFor(prepare.id); Sink<String, NotUsed> sink = logsSinkFor(prepare.id);
CompletionStage<SinkRef<String>> sinkRef = StreamRefs.<String>sinkRef().to(sink).run(mat); CompletionStage<SinkRef<String>> sinkRef = StreamRefs.<String>sinkRef().to(sink).run(mat);
PatternsCS.pipe(sinkRef.thenApply(ref -> new MeasurementsSinkReady(prepare.id, ref)), context().dispatcher()) Patterns.pipe(sinkRef.thenApply(ref -> new MeasurementsSinkReady(prepare.id, ref)), context().dispatcher())
.to(sender()); .to(sender());
}) })
.build(); .build();

View file

@ -20,7 +20,6 @@ import jdocs.stream.TwitterStreamQuickstartDocTest.Model.Tweet;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
import java.time.Duration; import java.time.Duration;
import java.util.Arrays; import java.util.Arrays;
@ -33,7 +32,7 @@ import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static akka.pattern.PatternsCS.ask; import static akka.pattern.Patterns.ask;
import static jdocs.stream.TwitterStreamQuickstartDocTest.Model.AKKA; import static jdocs.stream.TwitterStreamQuickstartDocTest.Model.AKKA;
import static jdocs.stream.TwitterStreamQuickstartDocTest.Model.tweets; import static jdocs.stream.TwitterStreamQuickstartDocTest.Model.tweets;
import static junit.framework.TestCase.assertTrue; import static junit.framework.TestCase.assertTrue;
@ -602,7 +601,7 @@ public class IntegrationDocTest extends AbstractJavaTest {
final RunnableGraph<NotUsed> saveTweets = final RunnableGraph<NotUsed> saveTweets =
akkaTweets akkaTweets
.mapAsync(4, tweet -> ask(database, new Save(tweet), 300)) .mapAsync(4, tweet -> ask(database, new Save(tweet), Duration.ofMillis(300L)))
.to(Sink.ignore()); .to(Sink.ignore());
//#save-tweets //#save-tweets

View file

@ -97,7 +97,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest {
final CompletionStage<List<List<Integer>>> future = sourceUnderTest final CompletionStage<List<List<Integer>>> future = sourceUnderTest
.grouped(2) .grouped(2)
.runWith(Sink.head(), mat); .runWith(Sink.head(), mat);
akka.pattern.PatternsCS.pipe(future, system.dispatcher()).to(probe.getRef()); akka.pattern.Patterns.pipe(future, system.dispatcher()).to(probe.getRef());
probe.expectMsg(Duration.ofSeconds(3), Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4))); probe.expectMsg(Duration.ofSeconds(3), Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4)));
//#pipeto-testprobe //#pipeto-testprobe
} }
@ -200,7 +200,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest {
public void testSourceAndTestSink() throws Exception { public void testSourceAndTestSink() throws Exception {
//#test-source-and-sink //#test-source-and-sink
final Flow<Integer, Integer, NotUsed> flowUnderTest = Flow.of(Integer.class) final Flow<Integer, Integer, NotUsed> flowUnderTest = Flow.of(Integer.class)
.mapAsyncUnordered(2, sleep -> akka.pattern.PatternsCS.after( .mapAsyncUnordered(2, sleep -> akka.pattern.Patterns.after(
Duration.ofMillis(10), Duration.ofMillis(10),
system.scheduler(), system.scheduler(),
system.dispatcher(), system.dispatcher(),

View file

@ -6,13 +6,12 @@ package jdocs.stream.javadsl.cookbook;
import akka.NotUsed; import akka.NotUsed;
import akka.actor.*; import akka.actor.*;
import akka.pattern.PatternsCS; import akka.pattern.Patterns;
import akka.stream.*; import akka.stream.*;
import akka.stream.javadsl.*; import akka.stream.javadsl.*;
import akka.stream.testkit.TestSubscriber; import akka.stream.testkit.TestSubscriber;
import akka.stream.testkit.javadsl.TestSink; import akka.stream.testkit.javadsl.TestSink;
import akka.testkit.javadsl.TestKit; import akka.testkit.javadsl.TestKit;
import akka.util.Timeout;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -83,7 +82,7 @@ public class RecipeGlobalRateLimit extends RecipeTest {
getContext().getSystem().dispatcher(), getContext().getSystem().dispatcher(),
getSelf()); getSelf());
} }
@Override @Override
public Receive createReceive() { public Receive createReceive() {
return open(); return open();
@ -150,7 +149,7 @@ public class RecipeGlobalRateLimit extends RecipeTest {
return f.mapAsync(parallelism, element -> { return f.mapAsync(parallelism, element -> {
final CompletionStage<Object> limiterTriggerFuture = final CompletionStage<Object> limiterTriggerFuture =
PatternsCS.ask(limiter, Limiter.WANT_TO_PASS, maxAllowedWait); Patterns.ask(limiter, Limiter.WANT_TO_PASS, maxAllowedWait);
return limiterTriggerFuture.thenApplyAsync(response -> element, system.dispatcher()); return limiterTriggerFuture.thenApplyAsync(response -> element, system.dispatcher());
}); });
} }

View file

@ -6,7 +6,7 @@ package jdocs.testkit;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import akka.pattern.PatternsCS; import akka.pattern.Patterns;
import jdocs.AbstractJavaTest; import jdocs.AbstractJavaTest;
import org.junit.Assert; import org.junit.Assert;
import akka.japi.JavaPartialFunction; import akka.japi.JavaPartialFunction;
@ -99,7 +99,7 @@ public class TestKitDocTest extends AbstractJavaTest {
//#test-behavior //#test-behavior
final Props props = Props.create(MyActor.class); final Props props = Props.create(MyActor.class);
final TestActorRef<MyActor> ref = TestActorRef.create(system, props, "testB"); final TestActorRef<MyActor> ref = TestActorRef.create(system, props, "testB");
final CompletableFuture<Object> future = PatternsCS.ask(ref, "say42", 3000).toCompletableFuture(); final CompletableFuture<Object> future = Patterns.ask(ref, "say42", Duration.ofMillis(3000)).toCompletableFuture();
assertTrue(future.isDone()); assertTrue(future.isDone());
assertEquals(42, future.get()); assertEquals(42, future.get());
//#test-behavior //#test-behavior