diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java index 96e6aa8154..b2b589cb60 100644 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java @@ -4,6 +4,7 @@ package jdocs.akka.typed; +import akka.Done; import akka.actor.testkit.typed.javadsl.LogCapturing; import akka.actor.testkit.typed.javadsl.TestKitJunitResource; import akka.actor.typed.ActorRef; @@ -19,10 +20,12 @@ import org.scalatest.junit.JUnitSuite; import java.net.URI; import java.time.Duration; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import static jdocs.akka.typed.InteractionPatternsTest.Samples.*; +import static org.junit.Assert.assertEquals; public class InteractionPatternsTest extends JUnitSuite { @@ -665,6 +668,130 @@ public class InteractionPatternsTest extends JUnitSuite { } } + interface PipeToSelfSample { + // #pipeToSelf + public interface CustomerDataAccess { + CompletionStage update(Customer customer); + } + + public class Customer { + public final String id; + public final long version; + public final String name; + public final String address; + + public Customer(String id, long version, String name, String address) { + this.id = id; + this.version = version; + this.name = name; + this.address = address; + } + } + + public class CustomerRepository extends AbstractBehavior { + + private static final int MAX_OPERATIONS_IN_PROGRESS = 10; + + interface Command {} + + public static class Update implements Command { + public final Customer customer; + public final ActorRef replyTo; + + public Update(Customer customer, ActorRef replyTo) { + this.customer = customer; + this.replyTo = replyTo; + } + } + + interface OperationResult {} + + public static class UpdateSuccess implements OperationResult { + public final String id; + + public UpdateSuccess(String id) { + this.id = id; + } + } + + public static class UpdateFailure implements OperationResult { + public final String id; + public final String reason; + + public UpdateFailure(String id, String reason) { + this.id = id; + this.reason = reason; + } + } + + private static class WrappedUpdateResult implements Command { + public final OperationResult result; + public final ActorRef replyTo; + + private WrappedUpdateResult(OperationResult result, ActorRef replyTo) { + this.result = result; + this.replyTo = replyTo; + } + } + + public static Behavior create(CustomerDataAccess dataAccess) { + return Behaviors.setup(context -> new CustomerRepository(context, dataAccess)); + } + + private final CustomerDataAccess dataAccess; + private int operationsInProgress = 0; + + private CustomerRepository(ActorContext context, CustomerDataAccess dataAccess) { + super(context); + this.dataAccess = dataAccess; + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(Update.class, this::onUpdate) + .onMessage(WrappedUpdateResult.class, this::onUpdateResult) + .build(); + } + + private Behavior onUpdate(Update command) { + if (operationsInProgress == MAX_OPERATIONS_IN_PROGRESS) { + command.replyTo.tell( + new UpdateFailure( + command.customer.id, + "Max " + MAX_OPERATIONS_IN_PROGRESS + " concurrent operations supported")); + } else { + // increase operationsInProgress counter + operationsInProgress++; + CompletionStage futureResult = dataAccess.update(command.customer); + getContext() + .pipeToSelf( + futureResult, + (ok, exc) -> { + if (exc == null) + return new WrappedUpdateResult( + new UpdateSuccess(command.customer.id), command.replyTo); + else + return new WrappedUpdateResult( + new UpdateFailure(command.customer.id, exc.getMessage()), + command.replyTo); + }); + } + return this; + } + + private Behavior onUpdateResult(WrappedUpdateResult wrapped) { + // decrease operationsInProgress counter + operationsInProgress--; + // send result to original requestor + wrapped.replyTo.tell(wrapped.result); + return this; + } + } + // #pipeToSelf + + } + @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(); @Rule public final LogCapturing logCapturing = new LogCapturing(); @@ -700,4 +827,29 @@ public class InteractionPatternsTest extends JUnitSuite { probe.expectNoMessage(); probe.expectMessage(Duration.ofSeconds(2), new Buncher.Batch(Arrays.asList(msgOne, msgTwo))); } + + @Test + public void testPipeToSelf() { + + PipeToSelfSample.CustomerDataAccess dataAccess = + new PipeToSelfSample.CustomerDataAccess() { + @Override + public CompletionStage update(PipeToSelfSample.Customer customer) { + return CompletableFuture.completedFuture(Done.getInstance()); + } + }; + + ActorRef repository = + testKit.spawn(PipeToSelfSample.CustomerRepository.create(dataAccess)); + TestProbe probe = + testKit.createTestProbe(PipeToSelfSample.CustomerRepository.OperationResult.class); + + repository.tell( + new PipeToSelfSample.CustomerRepository.Update( + new PipeToSelfSample.Customer("123", 1L, "Alice", "Fairy tail road 7"), + probe.getRef())); + assertEquals( + "123", + probe.expectMessageClass(PipeToSelfSample.CustomerRepository.UpdateSuccess.class).id); + } } diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala index 4bbf3af4db..aefa85ae8e 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala @@ -11,6 +11,7 @@ import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success +import akka.Done import akka.NotUsed import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.testkit.typed.scaladsl.LogCapturing @@ -427,4 +428,68 @@ class InteractionPatternsSpec extends ScalaTestWithActorTestKit with WordSpecLik result.futureValue shouldEqual CookieFabric.Cookies(5) } + + "contain a sample for pipeToSelf" in { + //#pipeToSelf + + trait CustomerDataAccess { + def update(value: Customer): Future[Done] + } + + final case class Customer(id: String, version: Long, name: String, address: String) + + object CustomerRepository { + sealed trait Command + + final case class Update(value: Customer, replyTo: ActorRef[UpdateResult]) extends Command + sealed trait UpdateResult + final case class UpdateSuccess(id: String) extends UpdateResult + final case class UpdateFailure(id: String, reason: String) extends UpdateResult + private final case class WrappedUpdateResult(result: UpdateResult, replyTo: ActorRef[UpdateResult]) + extends Command + + private val MaxOperationsInProgress = 10 + + def apply(dataAccess: CustomerDataAccess): Behavior[Command] = { + next(dataAccess, operationsInProgress = 0) + } + + private def next(dataAccess: CustomerDataAccess, operationsInProgress: Int): Behavior[Command] = { + Behaviors.receive { (context, command) => + command match { + case Update(value, replyTo) => + if (operationsInProgress == MaxOperationsInProgress) { + replyTo ! UpdateFailure(value.id, s"Max $MaxOperationsInProgress concurrent operations supported") + Behaviors.same + } else { + val futureResult = dataAccess.update(value) + context.pipeToSelf(futureResult) { + // map the Future value to a message, handled by this actor + case Success(_) => WrappedUpdateResult(UpdateSuccess(value.id), replyTo) + case Failure(e) => WrappedUpdateResult(UpdateFailure(value.id, e.getMessage), replyTo) + } + // increase operationsInProgress counter + next(dataAccess, operationsInProgress + 1) + } + + case WrappedUpdateResult(result, replyTo) => + // send result to original requestor + replyTo ! result + // decrease operationsInProgress counter + next(dataAccess, operationsInProgress - 1) + } + } + } + } + //#pipeToSelf + + val dataAccess = new CustomerDataAccess { + override def update(value: Customer): Future[Done] = Future.successful(Done) + } + + val repository = spawn(CustomerRepository(dataAccess)) + val probe = createTestProbe[CustomerRepository.UpdateResult]() + repository ! CustomerRepository.Update(Customer("123", 1L, "Alice", "Fairy tail road 7"), probe.ref) + probe.expectMessage(CustomerRepository.UpdateSuccess("123")) + } } diff --git a/akka-docs/src/main/paradox/typed/images/pipe-to-self.png b/akka-docs/src/main/paradox/typed/images/pipe-to-self.png new file mode 100644 index 0000000000..bdf0eef91c Binary files /dev/null and b/akka-docs/src/main/paradox/typed/images/pipe-to-self.png differ diff --git a/akka-docs/src/main/paradox/typed/interaction-patterns.md b/akka-docs/src/main/paradox/typed/interaction-patterns.md index 16e53f5cfc..f36d257f94 100644 --- a/akka-docs/src/main/paradox/typed/interaction-patterns.md +++ b/akka-docs/src/main/paradox/typed/interaction-patterns.md @@ -216,6 +216,40 @@ Java * There can only be a single response to one `ask` (see @ref:[per session child Actor](#per-session-child-actor)) * When `ask` times out, the receiving actor does not know and may still process it to completion, or even start processing it after the fact +## Send Future result to self + +When using an API that returns a @scala[`Future`]@java[`CompletionStage`] from an actor it's common that you would +like to use the value of the in the actor when the @scala[`Future`]@java[`CompletionStage`] is completed. For +this purpose the `ActorContext` provides a `pipeToSelf` method. + +**Example:** + +![pipe-to-self.png](./images/pipe-to-self.png) + +An actor, `CustomerRepository`, is invoking a method on `CustomerDataAccess` that returns a @scala[`Future`]@java[`CompletionStage`]. + +Scala +: @@snip [InteractionPatternsSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #pipeToSelf } + +Java +: @@snip [InteractionPatternsTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java) { #pipeToSelf } + +It could be tempting to just use @scala[`onComplete on the Future`]@java[`a callback on the CompletionStage`], but +that introduces the risk of accessing internal state of the actor that is not thread-safe from an external thread. +For example, the `numberOfPendingOperations` counter in above example can't be accessed from such callback. +Therefore it is better to map the result to a message and perform further processing when receiving that message. + +**Useful when:** + + * Accessing APIs that are returning @scala[`Future`]@java[`CompletionStage`] from an actor, such as a database or + an external service + * The actor needs to continue processing when the @scala[`Future`]@java[`CompletionStage`] has completed + * Keep context from the original request and use that when the @scala[`Future`]@java[`CompletionStage`] has completed, + for example an `replyTo` actor reference + +**Problems:** + + * Boilerplate of adding wrapper messages for the results ## Per session child Actor