From 824e4196e793e9053ed0b53a9ca50edf6508ae3b Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 12 Sep 2018 14:06:32 +0200 Subject: [PATCH] Introduce RecipientRef interface for ActorRef and EntityRef, #24463 * to make it possible to use ActorContext.ask also with EntityRef destinations * this abstraction will probably be useful for other things, where a full ActorRef is not required * a RecipientRef only has to support tell (and ask), while a full ActorRef is watchable (destination has a lifecycle) and location transparent (serializable) --- .../typed/internal/ActorSystemStub.scala | 17 ++++++-- .../testkit/typed/internal/DebugRef.scala | 11 +++++- .../typed/internal/StubbedActorContext.scala | 14 +++++-- .../typed/scaladsl/ActorContextAskSpec.scala | 11 +++--- .../scala/akka/actor/typed/ActorRef.scala | 29 +++++++++++++- .../scala/akka/actor/typed/ActorSystem.scala | 6 +-- .../typed/internal/ActorContextImpl.scala | 8 ++-- .../actor/typed/internal/ActorRefImpl.scala | 2 +- .../typed/internal/InternalRecipientRef.scala | 26 +++++++++++++ .../internal/adapter/ActorRefAdapter.scala | 11 +++++- .../internal/adapter/ActorSystemAdapter.scala | 17 ++++++-- .../actor/typed/javadsl/ActorContext.scala | 4 +- .../actor/typed/scaladsl/ActorContext.scala | 4 +- .../actor/typed/scaladsl/AskPattern.scala | 34 +++++++++------- .../typed/scaladsl/adapter/package.scala | 13 +++++++ .../typed/internal/ClusterShardingImpl.scala | 16 +++++++- .../typed/javadsl/ClusterSharding.scala | 7 +++- .../typed/scaladsl/ClusterSharding.scala | 10 ++++- .../typed/scaladsl/ClusterShardingSpec.scala | 39 +++++++++++++++++-- 19 files changed, 223 insertions(+), 56 deletions(-) create mode 100644 akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalRecipientRef.scala diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/ActorSystemStub.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/ActorSystemStub.scala index 1770dce6f7..d7fc3985eb 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/ActorSystemStub.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/ActorSystemStub.scala @@ -12,25 +12,34 @@ import akka.annotation.InternalApi import akka.util.Timeout import akka.{ actor ⇒ a } import com.typesafe.config.ConfigFactory - import scala.compat.java8.FutureConverters import scala.concurrent._ +import akka.actor.ActorRefProvider +import akka.actor.typed.internal.InternalRecipientRef + /** * INTERNAL API */ @InternalApi private[akka] final class ActorSystemStub(val name: String) - extends ActorSystem[Nothing] with ActorRef[Nothing] with ActorRefImpl[Nothing] { + extends ActorSystem[Nothing] with ActorRef[Nothing] with ActorRefImpl[Nothing] with InternalRecipientRef[Nothing] { override val path: a.ActorPath = a.RootActorPath(a.Address("akka", name)) / "user" override val settings: Settings = new Settings(getClass.getClassLoader, ConfigFactory.empty, name) - override def tell(msg: Nothing): Unit = throw new RuntimeException("must not send message to ActorSystemStub") + override def tell(msg: Nothing): Unit = throw new UnsupportedOperationException("must not send message to ActorSystemStub") + // impl ActorRefImpl override def isLocal: Boolean = true + // impl ActorRefImpl override def sendSystem(signal: akka.actor.typed.internal.SystemMessage): Unit = - throw new RuntimeException("must not send SYSTEM message to ActorSystemStub") + throw new UnsupportedOperationException("must not send SYSTEM message to ActorSystemStub") + + // impl InternalRecipientRef, ask not supported + override def provider: ActorRefProvider = throw new UnsupportedOperationException("no provider") + // impl InternalRecipientRef + def isTerminated: Boolean = whenTerminated.isCompleted val deadLettersInbox = new DebugRef[Any](path.parent / "deadLetters", true) override def deadLetters[U]: akka.actor.typed.ActorRef[U] = deadLettersInbox diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/DebugRef.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/DebugRef.scala index e6d7a0fd1b..3401d9cbec 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/DebugRef.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/DebugRef.scala @@ -10,14 +10,16 @@ import akka.actor.typed.ActorRef import akka.actor.typed.internal.{ ActorRefImpl, SystemMessage } import akka.annotation.InternalApi import akka.{ actor ⇒ a } - import scala.annotation.tailrec +import akka.actor.ActorRefProvider +import akka.actor.typed.internal.InternalRecipientRef + /** * INTERNAL API */ @InternalApi private[akka] final class DebugRef[T](override val path: a.ActorPath, override val isLocal: Boolean) - extends ActorRef[T] with ActorRefImpl[T] { + extends ActorRef[T] with ActorRefImpl[T] with InternalRecipientRef[T] { private val q = new ConcurrentLinkedQueue[Either[SystemMessage, T]] @@ -58,4 +60,9 @@ import scala.annotation.tailrec } rec(Nil) } + + // impl InternalRecipientRef, ask not supported + override def provider: ActorRefProvider = throw new UnsupportedOperationException("no provider") + // impl InternalRecipientRef + def isTerminated: Boolean = false } diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/StubbedActorContext.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/StubbedActorContext.scala index a96e798f32..b6a74796c2 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/StubbedActorContext.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/StubbedActorContext.scala @@ -14,7 +14,6 @@ import akka.event.Logging import akka.event.Logging.LogLevel import akka.util.{ Helpers, OptionVal } import akka.{ actor ⇒ untyped } - import java.util.concurrent.ThreadLocalRandom.{ current ⇒ rnd } import scala.collection.JavaConverters._ @@ -22,6 +21,8 @@ import scala.collection.immutable.TreeMap import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration.FiniteDuration +import akka.actor.ActorRefProvider + /** * INTERNAL API * @@ -32,15 +33,22 @@ import scala.concurrent.duration.FiniteDuration private[akka] final class FunctionRef[-T]( override val path: ActorPath, send: (T, FunctionRef[T]) ⇒ Unit) - extends ActorRef[T] with ActorRefImpl[T] { + extends ActorRef[T] with ActorRefImpl[T] with InternalRecipientRef[T] { override def tell(msg: T): Unit = { if (msg == null) throw InvalidMessageException("[null] is not an allowed message") send(msg, this) } - override def sendSystem(signal: SystemMessage): Unit = {} + // impl ActorRefImpl + override def sendSystem(signal: SystemMessage): Unit = () + // impl ActorRefImpl override def isLocal = true + + // impl InternalRecipientRef, ask not supported + override def provider: ActorRefProvider = throw new UnsupportedOperationException("no provider") + // impl InternalRecipientRef + def isTerminated: Boolean = false } final case class CapturedLogEvent(logLevel: LogLevel, message: String, diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextAskSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextAskSpec.scala index a44ba50d7b..483bbecc57 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextAskSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextAskSpec.scala @@ -47,9 +47,9 @@ class ActorContextAskSpec extends ScalaTestWithActorTestKit(ActorContextAskSpec. Behaviors.same }, "ping-pong", Props.empty.withDispatcherFromConfig("ping-pong-dispatcher")) - val probe = TestProbe[AnyRef]() + val probe = TestProbe[Pong]() - val snitch = Behaviors.setup[Pong] { (ctx) ⇒ + val snitch = Behaviors.setup[Pong] { ctx ⇒ // Timeout comes from TypedAkkaSpec @@ -58,10 +58,9 @@ class ActorContextAskSpec extends ScalaTestWithActorTestKit(ActorContextAskSpec. case Failure(ex) ⇒ throw ex } - Behaviors.receive { - case (ctx, pong: Pong) ⇒ - probe.ref ! pong - Behaviors.same + Behaviors.receiveMessage { pong ⇒ + probe.ref ! pong + Behaviors.same } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorRef.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorRef.scala index 48b58a8095..2c6b592db7 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorRef.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorRef.scala @@ -6,11 +6,12 @@ package akka.actor.typed import akka.annotation.InternalApi import akka.{ actor ⇒ a } - import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.Future import scala.util.Success +import akka.actor.typed.internal.InternalRecipientRef + /** * An ActorRef is the identity or address of an Actor instance. It is valid * only during the Actor’s lifetime and allows messages to be sent to that @@ -20,7 +21,7 @@ import scala.util.Success * [[akka.event.EventStream]] on a best effort basis * (i.e. this delivery is not reliable). */ -trait ActorRef[-T] extends java.lang.Comparable[ActorRef[_]] with java.io.Serializable { +trait ActorRef[-T] extends RecipientRef[T] with java.lang.Comparable[ActorRef[_]] with java.io.Serializable { this: InternalRecipientRef[T] ⇒ /** * Send a message to the Actor referenced by this ActorRef using *at-most-once* * messaging semantics. @@ -102,3 +103,27 @@ private[akka] final case class SerializedActorRef[T] private (address: String) { resolver.resolveActorRef(address) } } + +/** + * FIXME doc + * - not serializable + * - not watchable + */ +trait RecipientRef[-T] { this: InternalRecipientRef[T] ⇒ + /** + * Send a message to the destination referenced by this `RecipientRef` using *at-most-once* + * messaging semantics. + */ + def tell(msg: T): Unit +} + +object RecipientRef { + + implicit final class RecipientRefOps[-T](val ref: RecipientRef[T]) extends AnyVal { + /** + * Send a message to the destination referenced by this `RecipientRef` using *at-most-once* + * messaging semantics. + */ + def !(msg: T): Unit = ref.tell(msg) + } +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala index 229b30f8fb..98b36be609 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala @@ -9,14 +9,14 @@ import java.util.concurrent.{ CompletionStage, ThreadFactory } import akka.actor.setup.ActorSystemSetup import com.typesafe.config.{ Config, ConfigFactory } - import scala.concurrent.{ ExecutionContextExecutor, Future } + import akka.actor.typed.internal.adapter.{ ActorSystemAdapter, PropsAdapter } import akka.util.Timeout import akka.annotation.DoNotInherit import akka.annotation.ApiMayChange - import akka.actor.BootstrapSetup +import akka.actor.typed.internal.InternalRecipientRef import akka.actor.typed.internal.adapter.GuardianActorAdapter import akka.actor.typed.receptionist.Receptionist @@ -31,7 +31,7 @@ import akka.actor.typed.receptionist.Receptionist */ @DoNotInherit @ApiMayChange -abstract class ActorSystem[-T] extends ActorRef[T] with Extensions { +abstract class ActorSystem[-T] extends ActorRef[T] with Extensions { this: InternalRecipientRef[T] ⇒ /** * The name of this actor system, used to distinguish multiple ones within * the same JVM & class loader. diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala index 06c1d45df2..f4d070cd27 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala @@ -82,16 +82,16 @@ import akka.util.JavaDurationConverters._ spawnAnonymous(behavior, Props.empty) // Scala API impl - override def ask[Req, Res](otherActor: ActorRef[Req])(createRequest: ActorRef[Res] ⇒ Req)(mapResponse: Try[Res] ⇒ T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit = { + override def ask[Req, Res](target: RecipientRef[Req])(createRequest: ActorRef[Res] ⇒ Req)(mapResponse: Try[Res] ⇒ T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit = { import akka.actor.typed.scaladsl.AskPattern._ - (otherActor ? createRequest)(responseTimeout, system.scheduler).onComplete(res ⇒ + (target ? createRequest)(responseTimeout, system.scheduler).onComplete(res ⇒ self.asInstanceOf[ActorRef[AnyRef]] ! new AskResponse(res, mapResponse) ) } // Java API impl - def ask[Req, Res](resClass: Class[Res], otherActor: ActorRef[Req], responseTimeout: Timeout, createRequest: function.Function[ActorRef[Res], Req], applyToResponse: BiFunction[Res, Throwable, T]): Unit = { - this.ask(otherActor)(createRequest.apply) { + def ask[Req, Res](resClass: Class[Res], target: RecipientRef[Req], responseTimeout: Timeout, createRequest: function.Function[ActorRef[Res], Req], applyToResponse: BiFunction[Res, Throwable, T]): Unit = { + this.ask(target)(createRequest.apply) { case Success(message) ⇒ applyToResponse.apply(message, null) case Failure(ex) ⇒ applyToResponse.apply(null.asInstanceOf[Res], ex) }(responseTimeout, ClassTag[Res](resClass)) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorRefImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorRefImpl.scala index 11f2b2aadf..b4690b8346 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorRefImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorRefImpl.scala @@ -13,7 +13,7 @@ import scala.annotation.unchecked.uncheckedVariance * available in the package object, enabling `ref.toImpl` (or `ref.toImplN` * for `ActorRef[Nothing]`—Scala refuses to infer `Nothing` as a type parameter). */ -private[akka] trait ActorRefImpl[-T] extends ActorRef[T] { +private[akka] trait ActorRefImpl[-T] extends ActorRef[T] { this: InternalRecipientRef[T] ⇒ def sendSystem(signal: SystemMessage): Unit def isLocal: Boolean diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalRecipientRef.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalRecipientRef.scala new file mode 100644 index 0000000000..3a5f45321a --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalRecipientRef.scala @@ -0,0 +1,26 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.actor.typed.internal + +import akka.actor.ActorRefProvider +import akka.actor.typed.RecipientRef +import akka.annotation.InternalApi + +/** + * INTERNAL API + */ +@InternalApi private[akka] trait InternalRecipientRef[-T] extends RecipientRef[T] { + + /** + * Get a reference to the actor ref provider which created this ref. + */ + def provider: ActorRefProvider + + /** + * @return `true` if the actor is locally known to be terminated, `false` if alive or uncertain. + */ + def isTerminated: Boolean + +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorRefAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorRefAdapter.scala index 43b70bec44..9daf61254d 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorRefAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorRefAdapter.scala @@ -6,6 +6,7 @@ package akka.actor.typed package internal package adapter +import akka.actor.ActorRefProvider import akka.actor.InvalidMessageException import akka.{ actor ⇒ a } import akka.annotation.InternalApi @@ -15,7 +16,7 @@ import akka.dispatch.sysmsg * INTERNAL API */ @InternalApi private[typed] class ActorRefAdapter[-T](val untyped: a.InternalActorRef) - extends ActorRef[T] with internal.ActorRefImpl[T] { + extends ActorRef[T] with internal.ActorRefImpl[T] with internal.InternalRecipientRef[T] { override def path: a.ActorPath = untyped.path @@ -23,10 +24,18 @@ import akka.dispatch.sysmsg if (msg == null) throw new InvalidMessageException("[null] is not an allowed message") untyped ! msg } + + // impl ActorRefImpl override def isLocal: Boolean = untyped.isLocal + // impl ActorRefImpl override def sendSystem(signal: internal.SystemMessage): Unit = ActorRefAdapter.sendSystemMessage(untyped, signal) + // impl InternalRecipientRef + override def provider: ActorRefProvider = untyped.provider + // impl InternalRecipientRef + def isTerminated: Boolean = untyped.isTerminated + @throws(classOf[java.io.ObjectStreamException]) private def writeReplace(): AnyRef = SerializedActorRef[T](this) } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala index 2cddb4d2ee..06c7d2d196 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala @@ -12,15 +12,16 @@ import akka.actor import akka.actor.ExtendedActorSystem import akka.actor.InvalidMessageException import akka.{ actor ⇒ a } - import scala.concurrent.ExecutionContextExecutor + import akka.util.Timeout - import scala.concurrent.Future -import akka.annotation.InternalApi +import akka.annotation.InternalApi import scala.compat.java8.FutureConverters +import akka.actor.ActorRefProvider + /** * INTERNAL API. Lightweight wrapper for presenting an untyped ActorSystem to a Behavior (via the context). * Therefore it does not have a lot of vals, only the whenTerminated Future is cached after @@ -29,7 +30,8 @@ import scala.compat.java8.FutureConverters * most circumstances. */ @InternalApi private[akka] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl) - extends ActorSystem[T] with ActorRef[T] with internal.ActorRefImpl[T] with ExtensionsImpl { + extends ActorSystem[T] with ActorRef[T] with internal.ActorRefImpl[T] with internal.InternalRecipientRef[T] with ExtensionsImpl { + untyped.assertInitialized() import ActorRefAdapter.sendSystemMessage @@ -40,9 +42,16 @@ import scala.compat.java8.FutureConverters untyped.guardian ! msg } + // impl ActorRefImpl override def isLocal: Boolean = true + // impl ActorRefImpl override def sendSystem(signal: internal.SystemMessage): Unit = sendSystemMessage(untyped.guardian, signal) + // impl InternalRecipientRef + override def provider: ActorRefProvider = untyped.provider + // impl InternalRecipientRef + def isTerminated: Boolean = whenTerminated.isCompleted + final override val path: a.ActorPath = a.RootActorPath(a.Address("akka", untyped.name)) / "user" override def toString: String = untyped.toString diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala index 86e136b558..14055503b7 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala @@ -260,7 +260,7 @@ trait ActorContext[T] extends akka.actor.typed.ActorContext[T] { * * @param createRequest A function that creates a message for the other actor, containing the provided `ActorRef[Res]` that * the other actor can send a message back through. - * @param applyToResponse Transforms the response from the `otherActor` into a message this actor understands. + * @param applyToResponse Transforms the response from the `target` into a message this actor understands. * Will be invoked with either the response message or an AskTimeoutException failed or * potentially another exception if the remote actor is untyped and sent a * [[akka.actor.Status.Failure]] as response. The returned message of type `T` is then @@ -274,7 +274,7 @@ trait ActorContext[T] extends akka.actor.typed.ActorContext[T] { */ def ask[Req, Res]( resClass: Class[Res], - otherActor: ActorRef[Req], + target: RecipientRef[Req], responseTimeout: Timeout, createRequest: java.util.function.Function[ActorRef[Res], Req], applyToResponse: BiFunction[Res, Throwable, T]): Unit diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala index 028dcd8e9f..18ccccc0f7 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala @@ -265,7 +265,7 @@ trait ActorContext[T] extends akka.actor.typed.ActorContext[T] { this: akka.acto * * @param createRequest A function that creates a message for the other actor, containing the provided `ActorRef[Res]` that * the other actor can send a message back through. - * @param mapResponse Transforms the response from the `otherActor` into a message this actor understands. + * @param mapResponse Transforms the response from the `target` into a message this actor understands. * Should be a pure function but is executed inside the actor when the response arrives * so can safely touch the actor internals. If this function throws an exception it is * just as if the normal message receiving logic would throw. @@ -273,6 +273,6 @@ trait ActorContext[T] extends akka.actor.typed.ActorContext[T] { this: akka.acto * @tparam Req The request protocol, what the other actor accepts * @tparam Res The response protocol, what the other actor sends back */ - def ask[Req, Res](otherActor: ActorRef[Req])(createRequest: ActorRef[Res] ⇒ Req)(mapResponse: Try[Res] ⇒ T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit + def ask[Req, Res](target: RecipientRef[Req])(createRequest: ActorRef[Res] ⇒ Req)(mapResponse: Try[Res] ⇒ T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala index 36568b3ebf..ef19432cba 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala @@ -6,15 +6,17 @@ package akka.actor.typed.scaladsl import java.util.concurrent.TimeoutException -import akka.actor.{ Address, InternalActorRef, RootActorPath, Scheduler } +import akka.actor.{ Address, RootActorPath, Scheduler } import akka.actor.typed.ActorRef import akka.actor.typed.internal.{ adapter ⇒ adapt } import akka.annotation.InternalApi import akka.pattern.PromiseActorRef import akka.util.Timeout - import scala.concurrent.Future +import akka.actor.typed.RecipientRef +import akka.actor.typed.internal.InternalRecipientRef + /** * The ask-pattern implements the initiator side of a request–reply protocol. * The `?` operator is pronounced as "ask". @@ -26,7 +28,7 @@ object AskPattern { /** * See [[?]] */ - implicit final class Askable[T](val ref: ActorRef[T]) extends AnyVal { + implicit final class Askable[T](val ref: RecipientRef[T]) extends AnyVal { /** * The ask-pattern implements the initiator side of a request–reply protocol. * The `?` operator is pronounced as "ask". @@ -49,36 +51,38 @@ object AskPattern { * implicit val scheduler = system.scheduler * implicit val timeout = Timeout(3.seconds) * val target: ActorRef[Request] = ... - * val f: Future[Reply] = target ? ref => (Request("hello", ref)) + * val f: Future[Reply] = target ? replyTo => (Request("hello", replyTo)) * }}} */ - def ?[U](f: ActorRef[U] ⇒ T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = { + def ?[U](replyTo: ActorRef[U] ⇒ T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = { // We do not currently use the implicit scheduler, but want to require it // because it might be needed when we move to a 'native' typed runtime, see #24219 ref match { - case a: adapt.ActorRefAdapter[_] ⇒ askUntyped(ref, a.untyped, timeout, f) - case a: adapt.ActorSystemAdapter[_] ⇒ askUntyped(ref, a.untyped.guardian, timeout, f) - case a ⇒ throw new IllegalStateException("Only expect actor references to be ActorRefAdapter or ActorSystemAdapter until native system is implemented: " + a.getClass) + case a: InternalRecipientRef[_] ⇒ askUntyped(a, timeout, replyTo) + case a ⇒ throw new IllegalStateException( + "Only expect references to be RecipientRef, ActorRefAdapter or ActorSystemAdapter until " + + "native system is implemented: " + a.getClass) } } } private val onTimeout: String ⇒ Throwable = msg ⇒ new TimeoutException(msg) - private final class PromiseRef[U](target: ActorRef[_], untyped: InternalActorRef, timeout: Timeout) { + private final class PromiseRef[U](target: InternalRecipientRef[_], timeout: Timeout) { // Note: _promiseRef mustn't have a type pattern, since it can be null private[this] val (_ref: ActorRef[U], _future: Future[U], _promiseRef) = - if (untyped.isTerminated) + if (target.isTerminated) ( - adapt.ActorRefAdapter[U](untyped.provider.deadLetters), + adapt.ActorRefAdapter[U](target.provider.deadLetters), Future.failed[U](new TimeoutException(s"Recipient[$target] had already been terminated.")), null) else if (timeout.duration.length <= 0) ( - adapt.ActorRefAdapter[U](untyped.provider.deadLetters), + adapt.ActorRefAdapter[U](target.provider.deadLetters), Future.failed[U](new IllegalArgumentException(s"Timeout length must be positive, question not sent to [$target]")), null) else { - val a = PromiseActorRef(untyped.provider, timeout, target, "unknown", onTimeout = onTimeout) + // messageClassName "unknown' is set later, after applying the message factory + val a = PromiseActorRef(target.provider, timeout, target, "unknown", onTimeout = onTimeout) val b = adapt.ActorRefAdapter[U](a) (b, a.result.future.asInstanceOf[Future[U]], a) } @@ -88,8 +92,8 @@ object AskPattern { val promiseRef: PromiseActorRef = _promiseRef } - private def askUntyped[T, U](target: ActorRef[T], untyped: InternalActorRef, timeout: Timeout, f: ActorRef[U] ⇒ T): Future[U] = { - val p = new PromiseRef[U](target, untyped, timeout) + private def askUntyped[T, U](target: InternalRecipientRef[T], timeout: Timeout, f: ActorRef[U] ⇒ T): Future[U] = { + val p = new PromiseRef[U](target, timeout) val m = f(p.ref) if (p.promiseRef ne null) p.promiseRef.messageClassName = m.getClass.getName target ! m diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/adapter/package.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/adapter/package.scala index 695d5d03df..34083a3b2a 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/adapter/package.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/adapter/package.scala @@ -98,6 +98,19 @@ package object adapter { def toUntyped: akka.actor.ActorRef = ActorRefAdapter.toUntyped(ref) } + /** + * Extension methods added to [[akka.actor.ActorRef]]. + */ + implicit class UntypedActorRefOps(val ref: akka.actor.ActorRef) extends AnyVal { + + /** + * Adapt the untyped `ActorRef` to typed `ActorRef[T]`. There is also an + * automatic implicit conversion for this, but this more explicit variant might + * sometimes be preferred. + */ + def toTyped[T]: ActorRef[T] = ActorRefAdapter(ref) + } + /** * Implicit conversion from untyped [[akka.actor.ActorRef]] to typed [[akka.actor.typed.ActorRef]]. */ diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala index a10d5ae926..7a4478bd56 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala @@ -12,6 +12,7 @@ import java.util.concurrent.ConcurrentHashMap import scala.compat.java8.FutureConverters._ import scala.concurrent.Future +import akka.actor.ActorRefProvider import akka.actor.ExtendedActorSystem import akka.actor.InternalActorRef import akka.actor.Scheduler @@ -20,6 +21,7 @@ import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior import akka.actor.typed.Props +import akka.actor.typed.internal.InternalRecipientRef import akka.actor.typed.internal.adapter.ActorRefAdapter import akka.actor.typed.internal.adapter.ActorSystemAdapter import akka.actor.typed.scaladsl.Behaviors @@ -222,7 +224,7 @@ import akka.util.Timeout */ @InternalApi private[akka] final class EntityRefImpl[M](shardRegion: akka.actor.ActorRef, entityId: String, scheduler: Scheduler) - extends javadsl.EntityRef[M] with scaladsl.EntityRef[M] { + extends javadsl.EntityRef[M] with scaladsl.EntityRef[M] with InternalRecipientRef[M] { override def tell(msg: M): Unit = shardRegion ! ShardingEnvelope(entityId, msg) @@ -267,6 +269,18 @@ import akka.util.Timeout val promiseRef: PromiseActorRef = _promiseRef } + // impl InternalRecipientRef + override def provider: ActorRefProvider = { + import akka.actor.typed.scaladsl.adapter._ + shardRegion.toTyped.asInstanceOf[InternalRecipientRef[_]].provider + } + + // impl InternalRecipientRef + def isTerminated: Boolean = { + import akka.actor.typed.scaladsl.adapter._ + shardRegion.toTyped.asInstanceOf[InternalRecipientRef[_]].isTerminated + } + } /** diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala index f138e00cef..a959d85627 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala @@ -12,7 +12,9 @@ import java.util.function.BiFunction import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior +import akka.actor.typed.RecipientRef import akka.actor.typed.Props +import akka.actor.typed.internal.InternalRecipientRef import akka.annotation.DoNotInherit import akka.annotation.InternalApi import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy @@ -333,7 +335,7 @@ object EntityTypeKey { * * Not for user extension. */ -@DoNotInherit abstract class EntityRef[M] { scaladslSelf: scaladsl.EntityRef[M] ⇒ +@DoNotInherit abstract class EntityRef[M] extends RecipientRef[M] { scaladslSelf: scaladsl.EntityRef[M] with InternalRecipientRef[M] ⇒ /** * Send a message to the entity referenced by this EntityRef using *at-most-once* @@ -344,6 +346,9 @@ object EntityTypeKey { /** * Allows to "ask" the [[EntityRef]] for a reply. * See [[akka.actor.typed.javadsl.AskPattern]] for a complete write-up of this pattern + * + * Note that if you are inside of an actor you should prefer [[akka.actor.typed.javadsl.ActorContext.ask]] + * as that provides better safety. */ def ask[U](message: JFunction[ActorRef[U], M], timeout: Timeout): CompletionStage[U] diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala index 91e48d529b..9826aaea76 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala @@ -16,7 +16,9 @@ import akka.actor.typed.Behavior import akka.actor.typed.Extension import akka.actor.typed.ExtensionId import akka.actor.typed.ExtensionSetup +import akka.actor.typed.RecipientRef import akka.actor.typed.Props +import akka.actor.typed.internal.InternalRecipientRef import akka.annotation.DoNotInherit import akka.annotation.InternalApi import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy @@ -330,7 +332,7 @@ object EntityTypeKey { * [[ActorRef]] and watch it in case such notification is desired. * Not for user extension. */ -@DoNotInherit trait EntityRef[M] { +@DoNotInherit trait EntityRef[M] extends RecipientRef[M] { this: InternalRecipientRef[M] ⇒ /** * Send a message to the entity referenced by this EntityRef using *at-most-once* @@ -360,6 +362,9 @@ object EntityTypeKey { * Allows to "ask" the [[EntityRef]] for a reply. * See [[akka.actor.typed.scaladsl.AskPattern]] for a complete write-up of this pattern * + * Note that if you are inside of an actor you should prefer [[akka.actor.typed.scaladsl.ActorContext.ask]] + * as that provides better safety. + * * Example usage: * {{{ * case class Request(msg: String, replyTo: ActorRef[Reply]) @@ -378,6 +383,9 @@ object EntityTypeKey { * Allows to "ask" the [[EntityRef]] for a reply. * See [[akka.actor.typed.scaladsl.AskPattern]] for a complete write-up of this pattern * + * Note that if you are inside of an actor you should prefer [[akka.actor.typed.scaladsl.ActorContext.ask]] + * as that provides better safety. + * * Example usage: * {{{ * case class Request(msg: String, replyTo: ActorRef[Reply]) diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala index efb090c28e..ded7df579a 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala @@ -7,28 +7,29 @@ package akka.cluster.sharding.typed.scaladsl import java.nio.charset.StandardCharsets import scala.concurrent.duration._ +import scala.util.Failure +import scala.util.Success import akka.Done import akka.actor.ExtendedActorSystem import akka.actor.testkit.typed.scaladsl.ActorTestKit import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.testkit.typed.scaladsl.TestProbe import akka.actor.typed.ActorRef import akka.actor.typed.ActorRefResolver import akka.actor.typed.ActorSystem -import akka.actor.typed.Props import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ import akka.cluster.MemberStatus -import akka.cluster.sharding.typed.{ ClusterShardingSettings, ShardingEnvelope, ShardingMessageExtractor } +import akka.cluster.sharding.typed.ShardingEnvelope +import akka.cluster.sharding.typed.ShardingMessageExtractor import akka.cluster.typed.Cluster import akka.cluster.typed.Join import akka.cluster.typed.Leave import akka.serialization.SerializerWithStringManifest -import akka.actor.testkit.typed.scaladsl.TestProbe import akka.util.Timeout import com.typesafe.config.ConfigFactory import org.scalatest.WordSpecLike -import org.scalatest.time.Span object ClusterShardingSpec { val config = ConfigFactory.parseString( @@ -66,6 +67,7 @@ object ClusterShardingSpec { sealed trait TestProtocol extends java.io.Serializable final case class ReplyPlz(toMe: ActorRef[String]) extends TestProtocol final case class WhoAreYou(replyTo: ActorRef[String]) extends TestProtocol + final case class WhoAreYou2(x: Int, replyTo: ActorRef[String]) extends TestProtocol final case class StopPlz() extends TestProtocol final case class PassivatePlz() extends TestProtocol @@ -127,6 +129,8 @@ object ClusterShardingSpec { } } + final case class TheReply(s: String) + } class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.config) with WordSpecLike { @@ -312,6 +316,33 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. bobRef ! StopPlz() } + "EntityRef - ActorContext.ask" in { + val aliceRef = sharding.entityRefFor(typeKey, "alice") + + val p = TestProbe[TheReply]() + + spawn( + Behaviors.setup[TheReply] { ctx ⇒ + // FIXME is the implicit ClassTag difficult to use? + // it works fine when there is a single parameter apply, + // but trouble when more parameters and this doesn't compile + //ctx.ask(aliceRef)(x => WhoAreYou(x)) { + ctx.ask(aliceRef)(WhoAreYou) { + case Success(name) ⇒ TheReply(name) + case Failure(ex) ⇒ TheReply(ex.getMessage) + } + + Behaviors.receiveMessage[TheReply] { reply ⇒ + p.ref ! reply + Behaviors.same + } + }) + + p.expectMessageType[TheReply].s should startWith("I'm alice") + + aliceRef ! StopPlz() + } + "handle untyped StartEntity message" in { // it is normally using envolopes, but the untyped StartEntity message can be sent internally, // e.g. for remember entities