Add pipeToSelf to typed ActorContext #26199

Implemented in terms of AdaptMessage, which makes sure to map the values on the actor's thread, in Scala.
This commit is contained in:
Dale Wijnand 2019-01-04 10:04:30 +01:00 committed by Johan Andrén
parent f8618b24b0
commit 1c370c1282
7 changed files with 192 additions and 15 deletions

View file

@ -0,0 +1,83 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.javadsl;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.Behavior;
import akka.actor.typed.Props;
import com.typesafe.config.ConfigFactory;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.junit.Assert.*;
public final class ActorContextPipeToSelfTest extends JUnitSuite {
@ClassRule
public static final TestKitJunitResource testKit =
new TestKitJunitResource(ConfigFactory.parseString(
"pipe-to-self-spec-dispatcher.executor = thread-pool-executor\n" +
"pipe-to-self-spec-dispatcher.type = PinnedDispatcher\n"
));
static final class Msg {
final String response;
final String selfName;
final String threadName;
Msg(final String response, final String selfName, final String threadName) {
this.response = response;
this.selfName = selfName;
this.threadName = threadName;
}
}
@Test public void handlesSuccess() {
assertEquals("ok: hi", responseFrom(CompletableFuture.completedFuture("hi")));
}
@Test public void handlesFailure() {
assertEquals("ko: boom", responseFrom(failedFuture(new RuntimeException("boom"))));
}
private CompletableFuture<String> failedFuture(final Throwable ex) {
final CompletableFuture<String> future = new CompletableFuture<>();
future.completeExceptionally(ex);
return future;
}
private String responseFrom(final CompletionStage<String> future) {
final TestProbe<Msg> probe = testKit.createTestProbe();
final Behavior<Msg> behavior = Behaviors.setup(context -> {
context.pipeToSelf(future, (string, exception) -> {
final String response;
if (string != null) response = String.format("ok: %s", string);
else if (exception != null) response = String.format("ko: %s", exception.getMessage());
else response = "???";
return new Msg(response, context.getSelf().path().name(), Thread.currentThread().getName());
});
return Behaviors.receiveMessage(msg -> {
probe.getRef().tell(msg);
return Behaviors.stopped();
});
});
final String name = "pipe-to-self-spec";
final Props props = Props.empty().withDispatcherFromConfig("pipe-to-self-spec-dispatcher");
testKit.spawn(behavior, name, props);
final Msg msg = probe.expectMessageClass(Msg.class);
assertEquals("pipe-to-self-spec", msg.selfName);
assertThat(msg.threadName, startsWith("ActorContextPipeToSelfTest-pipe-to-self-spec-dispatcher"));
return msg.response;
}
}

View file

@ -0,0 +1,62 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.scaladsl
import scala.concurrent.Future
import scala.util.control.NoStackTrace
import scala.util.{ Failure, Success }
import akka.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe }
import akka.actor.typed.Props
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
object ActorContextPipeToSelfSpec {
val config = ConfigFactory.parseString(
"""
|pipe-to-self-spec-dispatcher {
| executor = thread-pool-executor
| type = PinnedDispatcher
|}
""".stripMargin)
}
final class ActorContextPipeToSelfSpec extends ScalaTestWithActorTestKit(ActorContextPipeToSelfSpec.config)
with WordSpecLike {
"The Scala DSL ActorContext pipeToSelf" must {
"handle success" in { responseFrom(Future.successful("hi")) should ===("ok: hi") }
"handle failure" in { responseFrom(Future.failed(Fail)) should ===(s"ko: $Fail") }
}
object Fail extends NoStackTrace
private def responseFrom(future: Future[String]) = {
final case class Msg(response: String, selfName: String, threadName: String)
val probe = TestProbe[Msg]()
val behavior = Behaviors.setup[Msg] { context
context.pipeToSelf(future) {
case Success(s) Msg(s"ok: $s", context.self.path.name, Thread.currentThread().getName)
case Failure(e) Msg(s"ko: $e", context.self.path.name, Thread.currentThread().getName)
}
Behaviors.receiveMessage { msg
probe.ref ! msg
Behaviors.stopped
}
}
val name = "pipe-to-self-spec"
val props = Props.empty.withDispatcherFromConfig("pipe-to-self-spec-dispatcher")
spawn(behavior, name, props)
val msg = probe.expectMessageType[Msg]
msg.selfName should ===("pipe-to-self-spec")
msg.threadName should startWith("ActorContextPipeToSelfSpec-pipe-to-self-spec-dispatcher")
msg.response
}
}

View file

@ -9,13 +9,12 @@ import java.time.Duration
import java.util.function.{ Function JFunction }
import java.util.ArrayList
import java.util.Optional
import java.util.function
import java.util.concurrent.CompletionStage
import java.util.function.BiConsumer
import java.util.function.BiFunction
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.{ ExecutionContextExecutor, Future }
import scala.reflect.ClassTag
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import akka.annotation.InternalApi
import akka.util.OptionVal
@ -84,17 +83,31 @@ import akka.util.JavaDurationConverters._
// Scala API impl
override def ask[Req, Res](target: RecipientRef[Req])(createRequest: ActorRef[Res] Req)(mapResponse: Try[Res] T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit = {
import akka.actor.typed.scaladsl.AskPattern._
(target ? createRequest)(responseTimeout, system.scheduler).onComplete(res
self.asInstanceOf[ActorRef[AnyRef]] ! AdaptMessage(res, mapResponse)
)
pipeToSelf((target ? createRequest)(responseTimeout, system.scheduler))(mapResponse)
}
// Java API impl
def ask[Req, Res](resClass: Class[Res], target: RecipientRef[Req], responseTimeout: Duration, createRequest: function.Function[ActorRef[Res], Req], applyToResponse: BiFunction[Res, Throwable, T]): Unit = {
this.ask(target)(createRequest.apply) {
case Success(message) applyToResponse.apply(message, null)
case Failure(ex) applyToResponse.apply(null.asInstanceOf[Res], ex)
}(responseTimeout.asScala, ClassTag[Res](resClass))
def ask[Req, Res](resClass: Class[Res], target: RecipientRef[Req], responseTimeout: Duration, createRequest: JFunction[ActorRef[Res], Req], applyToResponse: BiFunction[Res, Throwable, T]): Unit = {
import akka.actor.typed.javadsl.AskPattern
val message = new akka.japi.function.Function[ActorRef[Res], Req] {
def apply(ref: ActorRef[Res]): Req = createRequest(ref)
}
pipeToSelf(AskPattern.ask(target, message, responseTimeout, system.scheduler), applyToResponse)
}
// Scala API impl
def pipeToSelf[Value](future: Future[Value])(mapResult: Try[Value] T): Unit = {
future.onComplete(value self.unsafeUpcast ! AdaptMessage(value, mapResult))
}
// Java API impl
def pipeToSelf[Value](future: CompletionStage[Value], applyToResult: BiFunction[Value, Throwable, T]): Unit = {
future.whenComplete(new BiConsumer[Value, Throwable] {
def accept(value: Value, ex: Throwable): Unit = {
if (value != null) self ! applyToResult.apply(value, null)
if (ex != null) self ! applyToResult.apply(null.asInstanceOf[Value], ex)
}
})
}
private[akka] override def spawnMessageAdapter[U](f: U T, name: String): ActorRef[U] =

View file

@ -19,7 +19,7 @@ import akka.annotation.InternalApi
/**
* INTERNAL API: Wrapping of messages that should be adapted by the included
* function. Used by `ActorContext.spawnMessageAdapter` so that the function is
* function. Used by `ActorContext.spawnMessageAdapter` and `ActorContext.ask` so that the function is
* applied in the "parent" actor (for better thread safety)..
*/
@InternalApi private[akka] final case class AdaptMessage[U, T](msg: U, adapter: U T) extends InternalMessage {

View file

@ -11,6 +11,7 @@ import akka.annotation.DoNotInherit
import akka.annotation.ApiMayChange
import akka.actor.typed._
import java.util.Optional
import java.util.concurrent.CompletionStage
import scala.concurrent.ExecutionContextExecutor
@ -278,4 +279,13 @@ trait ActorContext[T] extends TypedActorContext[T] {
createRequest: java.util.function.Function[ActorRef[Res], Req],
applyToResponse: BiFunction[Res, Throwable, T]): Unit
/**
* Sends the result of the given `CompletionStage` to this Actor (`self`), after adapted it with
* the given function.
*
* This method is thread-safe and can be called from other threads than the ordinary
* actor message processing thread, such as [[java.util.concurrent.CompletionStage]] callbacks.
*/
def pipeToSelf[Value](future: CompletionStage[Value], applyToResult: BiFunction[Value, Throwable, T]): Unit
}

View file

@ -30,6 +30,6 @@ import scala.compat.java8.FutureConverters._
*
*/
object AskPattern {
def ask[T, U](actor: ActorRef[T], message: JFunction[ActorRef[U], T], timeout: Duration, scheduler: Scheduler): CompletionStage[U] =
def ask[T, U](actor: RecipientRef[T], message: JFunction[ActorRef[U], T], timeout: Duration, scheduler: Scheduler): CompletionStage[U] =
(actor.?(message.apply)(timeout.asScala, scheduler)).toJava
}

View file

@ -8,7 +8,7 @@ import akka.actor.typed._
import akka.annotation.{ ApiMayChange, DoNotInherit }
import akka.util.Timeout
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.{ ExecutionContextExecutor, Future }
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
import scala.util.Try
@ -275,4 +275,13 @@ trait ActorContext[T] extends TypedActorContext[T] { this: akka.actor.typed.java
*/
def ask[Req, Res](target: RecipientRef[Req])(createRequest: ActorRef[Res] Req)(mapResponse: Try[Res] T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit
/**
* Sends the result of the given `Future` to this Actor (`self`), after adapted it with
* the given function.
*
* This method is thread-safe and can be called from other threads than the ordinary
* actor message processing thread, such as [[scala.concurrent.Future]] callbacks.
*/
def pipeToSelf[Value](future: Future[Value])(mapResult: Try[Value] T): Unit
}