Merge pull request #25610 from akka/wip-24463-EntityRef.ask-patriknw

Introduce RecipientRef interface for ActorRef and EntityRef, #24463
This commit is contained in:
Patrik Nordwall 2018-09-21 15:16:18 +02:00 committed by GitHub
commit 546f1634c9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 223 additions and 56 deletions

View file

@ -12,25 +12,34 @@ import akka.annotation.InternalApi
import akka.util.Timeout import akka.util.Timeout
import akka.{ actor a } import akka.{ actor a }
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import scala.compat.java8.FutureConverters import scala.compat.java8.FutureConverters
import scala.concurrent._ import scala.concurrent._
import akka.actor.ActorRefProvider
import akka.actor.typed.internal.InternalRecipientRef
/** /**
* INTERNAL API * INTERNAL API
*/ */
@InternalApi private[akka] final class ActorSystemStub(val name: String) @InternalApi private[akka] final class ActorSystemStub(val name: String)
extends ActorSystem[Nothing] with ActorRef[Nothing] with ActorRefImpl[Nothing] { extends ActorSystem[Nothing] with ActorRef[Nothing] with ActorRefImpl[Nothing] with InternalRecipientRef[Nothing] {
override val path: a.ActorPath = a.RootActorPath(a.Address("akka", name)) / "user" override val path: a.ActorPath = a.RootActorPath(a.Address("akka", name)) / "user"
override val settings: Settings = new Settings(getClass.getClassLoader, ConfigFactory.empty, name) override val settings: Settings = new Settings(getClass.getClassLoader, ConfigFactory.empty, name)
override def tell(msg: Nothing): Unit = throw new RuntimeException("must not send message to ActorSystemStub") override def tell(msg: Nothing): Unit = throw new UnsupportedOperationException("must not send message to ActorSystemStub")
// impl ActorRefImpl
override def isLocal: Boolean = true override def isLocal: Boolean = true
// impl ActorRefImpl
override def sendSystem(signal: akka.actor.typed.internal.SystemMessage): Unit = override def sendSystem(signal: akka.actor.typed.internal.SystemMessage): Unit =
throw new RuntimeException("must not send SYSTEM message to ActorSystemStub") throw new UnsupportedOperationException("must not send SYSTEM message to ActorSystemStub")
// impl InternalRecipientRef, ask not supported
override def provider: ActorRefProvider = throw new UnsupportedOperationException("no provider")
// impl InternalRecipientRef
def isTerminated: Boolean = whenTerminated.isCompleted
val deadLettersInbox = new DebugRef[Any](path.parent / "deadLetters", true) val deadLettersInbox = new DebugRef[Any](path.parent / "deadLetters", true)
override def deadLetters[U]: akka.actor.typed.ActorRef[U] = deadLettersInbox override def deadLetters[U]: akka.actor.typed.ActorRef[U] = deadLettersInbox

View file

@ -10,14 +10,16 @@ import akka.actor.typed.ActorRef
import akka.actor.typed.internal.{ ActorRefImpl, SystemMessage } import akka.actor.typed.internal.{ ActorRefImpl, SystemMessage }
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.{ actor a } import akka.{ actor a }
import scala.annotation.tailrec import scala.annotation.tailrec
import akka.actor.ActorRefProvider
import akka.actor.typed.internal.InternalRecipientRef
/** /**
* INTERNAL API * INTERNAL API
*/ */
@InternalApi private[akka] final class DebugRef[T](override val path: a.ActorPath, override val isLocal: Boolean) @InternalApi private[akka] final class DebugRef[T](override val path: a.ActorPath, override val isLocal: Boolean)
extends ActorRef[T] with ActorRefImpl[T] { extends ActorRef[T] with ActorRefImpl[T] with InternalRecipientRef[T] {
private val q = new ConcurrentLinkedQueue[Either[SystemMessage, T]] private val q = new ConcurrentLinkedQueue[Either[SystemMessage, T]]
@ -58,4 +60,9 @@ import scala.annotation.tailrec
} }
rec(Nil) rec(Nil)
} }
// impl InternalRecipientRef, ask not supported
override def provider: ActorRefProvider = throw new UnsupportedOperationException("no provider")
// impl InternalRecipientRef
def isTerminated: Boolean = false
} }

View file

@ -14,7 +14,6 @@ import akka.event.Logging
import akka.event.Logging.LogLevel import akka.event.Logging.LogLevel
import akka.util.{ Helpers, OptionVal } import akka.util.{ Helpers, OptionVal }
import akka.{ actor untyped } import akka.{ actor untyped }
import java.util.concurrent.ThreadLocalRandom.{ current rnd } import java.util.concurrent.ThreadLocalRandom.{ current rnd }
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
@ -22,6 +21,8 @@ import scala.collection.immutable.TreeMap
import scala.concurrent.ExecutionContextExecutor import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorRefProvider
/** /**
* INTERNAL API * INTERNAL API
* *
@ -32,15 +33,22 @@ import scala.concurrent.duration.FiniteDuration
private[akka] final class FunctionRef[-T]( private[akka] final class FunctionRef[-T](
override val path: ActorPath, override val path: ActorPath,
send: (T, FunctionRef[T]) Unit) send: (T, FunctionRef[T]) Unit)
extends ActorRef[T] with ActorRefImpl[T] { extends ActorRef[T] with ActorRefImpl[T] with InternalRecipientRef[T] {
override def tell(msg: T): Unit = { override def tell(msg: T): Unit = {
if (msg == null) throw InvalidMessageException("[null] is not an allowed message") if (msg == null) throw InvalidMessageException("[null] is not an allowed message")
send(msg, this) send(msg, this)
} }
override def sendSystem(signal: SystemMessage): Unit = {} // impl ActorRefImpl
override def sendSystem(signal: SystemMessage): Unit = ()
// impl ActorRefImpl
override def isLocal = true override def isLocal = true
// impl InternalRecipientRef, ask not supported
override def provider: ActorRefProvider = throw new UnsupportedOperationException("no provider")
// impl InternalRecipientRef
def isTerminated: Boolean = false
} }
final case class CapturedLogEvent(logLevel: LogLevel, message: String, final case class CapturedLogEvent(logLevel: LogLevel, message: String,

View file

@ -47,9 +47,9 @@ class ActorContextAskSpec extends ScalaTestWithActorTestKit(ActorContextAskSpec.
Behaviors.same Behaviors.same
}, "ping-pong", Props.empty.withDispatcherFromConfig("ping-pong-dispatcher")) }, "ping-pong", Props.empty.withDispatcherFromConfig("ping-pong-dispatcher"))
val probe = TestProbe[AnyRef]() val probe = TestProbe[Pong]()
val snitch = Behaviors.setup[Pong] { (ctx) val snitch = Behaviors.setup[Pong] { ctx
// Timeout comes from TypedAkkaSpec // Timeout comes from TypedAkkaSpec
@ -58,10 +58,9 @@ class ActorContextAskSpec extends ScalaTestWithActorTestKit(ActorContextAskSpec.
case Failure(ex) throw ex case Failure(ex) throw ex
} }
Behaviors.receive { Behaviors.receiveMessage { pong
case (ctx, pong: Pong) probe.ref ! pong
probe.ref ! pong Behaviors.same
Behaviors.same
} }
} }

View file

@ -6,11 +6,12 @@ package akka.actor.typed
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.{ actor a } import akka.{ actor a }
import scala.annotation.unchecked.uncheckedVariance import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.Future import scala.concurrent.Future
import scala.util.Success import scala.util.Success
import akka.actor.typed.internal.InternalRecipientRef
/** /**
* An ActorRef is the identity or address of an Actor instance. It is valid * An ActorRef is the identity or address of an Actor instance. It is valid
* only during the Actors lifetime and allows messages to be sent to that * only during the Actors lifetime and allows messages to be sent to that
@ -20,7 +21,7 @@ import scala.util.Success
* [[akka.event.EventStream]] on a best effort basis * [[akka.event.EventStream]] on a best effort basis
* (i.e. this delivery is not reliable). * (i.e. this delivery is not reliable).
*/ */
trait ActorRef[-T] extends java.lang.Comparable[ActorRef[_]] with java.io.Serializable { trait ActorRef[-T] extends RecipientRef[T] with java.lang.Comparable[ActorRef[_]] with java.io.Serializable { this: InternalRecipientRef[T]
/** /**
* Send a message to the Actor referenced by this ActorRef using *at-most-once* * Send a message to the Actor referenced by this ActorRef using *at-most-once*
* messaging semantics. * messaging semantics.
@ -102,3 +103,27 @@ private[akka] final case class SerializedActorRef[T] private (address: String) {
resolver.resolveActorRef(address) resolver.resolveActorRef(address)
} }
} }
/**
* FIXME doc
* - not serializable
* - not watchable
*/
trait RecipientRef[-T] { this: InternalRecipientRef[T]
/**
* Send a message to the destination referenced by this `RecipientRef` using *at-most-once*
* messaging semantics.
*/
def tell(msg: T): Unit
}
object RecipientRef {
implicit final class RecipientRefOps[-T](val ref: RecipientRef[T]) extends AnyVal {
/**
* Send a message to the destination referenced by this `RecipientRef` using *at-most-once*
* messaging semantics.
*/
def !(msg: T): Unit = ref.tell(msg)
}
}

View file

@ -9,14 +9,14 @@ import java.util.concurrent.{ CompletionStage, ThreadFactory }
import akka.actor.setup.ActorSystemSetup import akka.actor.setup.ActorSystemSetup
import com.typesafe.config.{ Config, ConfigFactory } import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.{ ExecutionContextExecutor, Future } import scala.concurrent.{ ExecutionContextExecutor, Future }
import akka.actor.typed.internal.adapter.{ ActorSystemAdapter, PropsAdapter } import akka.actor.typed.internal.adapter.{ ActorSystemAdapter, PropsAdapter }
import akka.util.Timeout import akka.util.Timeout
import akka.annotation.DoNotInherit import akka.annotation.DoNotInherit
import akka.annotation.ApiMayChange import akka.annotation.ApiMayChange
import akka.actor.BootstrapSetup import akka.actor.BootstrapSetup
import akka.actor.typed.internal.InternalRecipientRef
import akka.actor.typed.internal.adapter.GuardianActorAdapter import akka.actor.typed.internal.adapter.GuardianActorAdapter
import akka.actor.typed.receptionist.Receptionist import akka.actor.typed.receptionist.Receptionist
@ -31,7 +31,7 @@ import akka.actor.typed.receptionist.Receptionist
*/ */
@DoNotInherit @DoNotInherit
@ApiMayChange @ApiMayChange
abstract class ActorSystem[-T] extends ActorRef[T] with Extensions { abstract class ActorSystem[-T] extends ActorRef[T] with Extensions { this: InternalRecipientRef[T]
/** /**
* The name of this actor system, used to distinguish multiple ones within * The name of this actor system, used to distinguish multiple ones within
* the same JVM & class loader. * the same JVM & class loader.

View file

@ -82,16 +82,16 @@ import akka.util.JavaDurationConverters._
spawnAnonymous(behavior, Props.empty) spawnAnonymous(behavior, Props.empty)
// Scala API impl // Scala API impl
override def ask[Req, Res](otherActor: ActorRef[Req])(createRequest: ActorRef[Res] Req)(mapResponse: Try[Res] T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit = { override def ask[Req, Res](target: RecipientRef[Req])(createRequest: ActorRef[Res] Req)(mapResponse: Try[Res] T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit = {
import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.AskPattern._
(otherActor ? createRequest)(responseTimeout, system.scheduler).onComplete(res (target ? createRequest)(responseTimeout, system.scheduler).onComplete(res
self.asInstanceOf[ActorRef[AnyRef]] ! new AskResponse(res, mapResponse) self.asInstanceOf[ActorRef[AnyRef]] ! new AskResponse(res, mapResponse)
) )
} }
// Java API impl // Java API impl
def ask[Req, Res](resClass: Class[Res], otherActor: ActorRef[Req], responseTimeout: Timeout, createRequest: function.Function[ActorRef[Res], Req], applyToResponse: BiFunction[Res, Throwable, T]): Unit = { def ask[Req, Res](resClass: Class[Res], target: RecipientRef[Req], responseTimeout: Timeout, createRequest: function.Function[ActorRef[Res], Req], applyToResponse: BiFunction[Res, Throwable, T]): Unit = {
this.ask(otherActor)(createRequest.apply) { this.ask(target)(createRequest.apply) {
case Success(message) applyToResponse.apply(message, null) case Success(message) applyToResponse.apply(message, null)
case Failure(ex) applyToResponse.apply(null.asInstanceOf[Res], ex) case Failure(ex) applyToResponse.apply(null.asInstanceOf[Res], ex)
}(responseTimeout, ClassTag[Res](resClass)) }(responseTimeout, ClassTag[Res](resClass))

View file

@ -13,7 +13,7 @@ import scala.annotation.unchecked.uncheckedVariance
* available in the package object, enabling `ref.toImpl` (or `ref.toImplN` * available in the package object, enabling `ref.toImpl` (or `ref.toImplN`
* for `ActorRef[Nothing]`Scala refuses to infer `Nothing` as a type parameter). * for `ActorRef[Nothing]`Scala refuses to infer `Nothing` as a type parameter).
*/ */
private[akka] trait ActorRefImpl[-T] extends ActorRef[T] { private[akka] trait ActorRefImpl[-T] extends ActorRef[T] { this: InternalRecipientRef[T]
def sendSystem(signal: SystemMessage): Unit def sendSystem(signal: SystemMessage): Unit
def isLocal: Boolean def isLocal: Boolean

View file

@ -0,0 +1,26 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.internal
import akka.actor.ActorRefProvider
import akka.actor.typed.RecipientRef
import akka.annotation.InternalApi
/**
* INTERNAL API
*/
@InternalApi private[akka] trait InternalRecipientRef[-T] extends RecipientRef[T] {
/**
* Get a reference to the actor ref provider which created this ref.
*/
def provider: ActorRefProvider
/**
* @return `true` if the actor is locally known to be terminated, `false` if alive or uncertain.
*/
def isTerminated: Boolean
}

View file

@ -6,6 +6,7 @@ package akka.actor.typed
package internal package internal
package adapter package adapter
import akka.actor.ActorRefProvider
import akka.actor.InvalidMessageException import akka.actor.InvalidMessageException
import akka.{ actor a } import akka.{ actor a }
import akka.annotation.InternalApi import akka.annotation.InternalApi
@ -15,7 +16,7 @@ import akka.dispatch.sysmsg
* INTERNAL API * INTERNAL API
*/ */
@InternalApi private[typed] class ActorRefAdapter[-T](val untyped: a.InternalActorRef) @InternalApi private[typed] class ActorRefAdapter[-T](val untyped: a.InternalActorRef)
extends ActorRef[T] with internal.ActorRefImpl[T] { extends ActorRef[T] with internal.ActorRefImpl[T] with internal.InternalRecipientRef[T] {
override def path: a.ActorPath = untyped.path override def path: a.ActorPath = untyped.path
@ -23,10 +24,18 @@ import akka.dispatch.sysmsg
if (msg == null) throw new InvalidMessageException("[null] is not an allowed message") if (msg == null) throw new InvalidMessageException("[null] is not an allowed message")
untyped ! msg untyped ! msg
} }
// impl ActorRefImpl
override def isLocal: Boolean = untyped.isLocal override def isLocal: Boolean = untyped.isLocal
// impl ActorRefImpl
override def sendSystem(signal: internal.SystemMessage): Unit = override def sendSystem(signal: internal.SystemMessage): Unit =
ActorRefAdapter.sendSystemMessage(untyped, signal) ActorRefAdapter.sendSystemMessage(untyped, signal)
// impl InternalRecipientRef
override def provider: ActorRefProvider = untyped.provider
// impl InternalRecipientRef
def isTerminated: Boolean = untyped.isTerminated
@throws(classOf[java.io.ObjectStreamException]) @throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = SerializedActorRef[T](this) private def writeReplace(): AnyRef = SerializedActorRef[T](this)
} }

View file

@ -12,15 +12,16 @@ import akka.actor
import akka.actor.ExtendedActorSystem import akka.actor.ExtendedActorSystem
import akka.actor.InvalidMessageException import akka.actor.InvalidMessageException
import akka.{ actor a } import akka.{ actor a }
import scala.concurrent.ExecutionContextExecutor import scala.concurrent.ExecutionContextExecutor
import akka.util.Timeout import akka.util.Timeout
import scala.concurrent.Future import scala.concurrent.Future
import akka.annotation.InternalApi
import akka.annotation.InternalApi
import scala.compat.java8.FutureConverters import scala.compat.java8.FutureConverters
import akka.actor.ActorRefProvider
/** /**
* INTERNAL API. Lightweight wrapper for presenting an untyped ActorSystem to a Behavior (via the context). * INTERNAL API. Lightweight wrapper for presenting an untyped ActorSystem to a Behavior (via the context).
* Therefore it does not have a lot of vals, only the whenTerminated Future is cached after * Therefore it does not have a lot of vals, only the whenTerminated Future is cached after
@ -29,7 +30,8 @@ import scala.compat.java8.FutureConverters
* most circumstances. * most circumstances.
*/ */
@InternalApi private[akka] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl) @InternalApi private[akka] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl)
extends ActorSystem[T] with ActorRef[T] with internal.ActorRefImpl[T] with ExtensionsImpl { extends ActorSystem[T] with ActorRef[T] with internal.ActorRefImpl[T] with internal.InternalRecipientRef[T] with ExtensionsImpl {
untyped.assertInitialized() untyped.assertInitialized()
import ActorRefAdapter.sendSystemMessage import ActorRefAdapter.sendSystemMessage
@ -40,9 +42,16 @@ import scala.compat.java8.FutureConverters
untyped.guardian ! msg untyped.guardian ! msg
} }
// impl ActorRefImpl
override def isLocal: Boolean = true override def isLocal: Boolean = true
// impl ActorRefImpl
override def sendSystem(signal: internal.SystemMessage): Unit = sendSystemMessage(untyped.guardian, signal) override def sendSystem(signal: internal.SystemMessage): Unit = sendSystemMessage(untyped.guardian, signal)
// impl InternalRecipientRef
override def provider: ActorRefProvider = untyped.provider
// impl InternalRecipientRef
def isTerminated: Boolean = whenTerminated.isCompleted
final override val path: a.ActorPath = a.RootActorPath(a.Address("akka", untyped.name)) / "user" final override val path: a.ActorPath = a.RootActorPath(a.Address("akka", untyped.name)) / "user"
override def toString: String = untyped.toString override def toString: String = untyped.toString

View file

@ -260,7 +260,7 @@ trait ActorContext[T] extends akka.actor.typed.ActorContext[T] {
* *
* @param createRequest A function that creates a message for the other actor, containing the provided `ActorRef[Res]` that * @param createRequest A function that creates a message for the other actor, containing the provided `ActorRef[Res]` that
* the other actor can send a message back through. * the other actor can send a message back through.
* @param applyToResponse Transforms the response from the `otherActor` into a message this actor understands. * @param applyToResponse Transforms the response from the `target` into a message this actor understands.
* Will be invoked with either the response message or an AskTimeoutException failed or * Will be invoked with either the response message or an AskTimeoutException failed or
* potentially another exception if the remote actor is untyped and sent a * potentially another exception if the remote actor is untyped and sent a
* [[akka.actor.Status.Failure]] as response. The returned message of type `T` is then * [[akka.actor.Status.Failure]] as response. The returned message of type `T` is then
@ -274,7 +274,7 @@ trait ActorContext[T] extends akka.actor.typed.ActorContext[T] {
*/ */
def ask[Req, Res]( def ask[Req, Res](
resClass: Class[Res], resClass: Class[Res],
otherActor: ActorRef[Req], target: RecipientRef[Req],
responseTimeout: Timeout, responseTimeout: Timeout,
createRequest: java.util.function.Function[ActorRef[Res], Req], createRequest: java.util.function.Function[ActorRef[Res], Req],
applyToResponse: BiFunction[Res, Throwable, T]): Unit applyToResponse: BiFunction[Res, Throwable, T]): Unit

View file

@ -265,7 +265,7 @@ trait ActorContext[T] extends akka.actor.typed.ActorContext[T] { this: akka.acto
* *
* @param createRequest A function that creates a message for the other actor, containing the provided `ActorRef[Res]` that * @param createRequest A function that creates a message for the other actor, containing the provided `ActorRef[Res]` that
* the other actor can send a message back through. * the other actor can send a message back through.
* @param mapResponse Transforms the response from the `otherActor` into a message this actor understands. * @param mapResponse Transforms the response from the `target` into a message this actor understands.
* Should be a pure function but is executed inside the actor when the response arrives * Should be a pure function but is executed inside the actor when the response arrives
* so can safely touch the actor internals. If this function throws an exception it is * so can safely touch the actor internals. If this function throws an exception it is
* just as if the normal message receiving logic would throw. * just as if the normal message receiving logic would throw.
@ -273,6 +273,6 @@ trait ActorContext[T] extends akka.actor.typed.ActorContext[T] { this: akka.acto
* @tparam Req The request protocol, what the other actor accepts * @tparam Req The request protocol, what the other actor accepts
* @tparam Res The response protocol, what the other actor sends back * @tparam Res The response protocol, what the other actor sends back
*/ */
def ask[Req, Res](otherActor: ActorRef[Req])(createRequest: ActorRef[Res] Req)(mapResponse: Try[Res] T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit def ask[Req, Res](target: RecipientRef[Req])(createRequest: ActorRef[Res] Req)(mapResponse: Try[Res] T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit
} }

View file

@ -6,15 +6,17 @@ package akka.actor.typed.scaladsl
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import akka.actor.{ Address, InternalActorRef, RootActorPath, Scheduler } import akka.actor.{ Address, RootActorPath, Scheduler }
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.actor.typed.internal.{ adapter adapt } import akka.actor.typed.internal.{ adapter adapt }
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.pattern.PromiseActorRef import akka.pattern.PromiseActorRef
import akka.util.Timeout import akka.util.Timeout
import scala.concurrent.Future import scala.concurrent.Future
import akka.actor.typed.RecipientRef
import akka.actor.typed.internal.InternalRecipientRef
/** /**
* The ask-pattern implements the initiator side of a requestreply protocol. * The ask-pattern implements the initiator side of a requestreply protocol.
* The `?` operator is pronounced as "ask". * The `?` operator is pronounced as "ask".
@ -26,7 +28,7 @@ object AskPattern {
/** /**
* See [[?]] * See [[?]]
*/ */
implicit final class Askable[T](val ref: ActorRef[T]) extends AnyVal { implicit final class Askable[T](val ref: RecipientRef[T]) extends AnyVal {
/** /**
* The ask-pattern implements the initiator side of a requestreply protocol. * The ask-pattern implements the initiator side of a requestreply protocol.
* The `?` operator is pronounced as "ask". * The `?` operator is pronounced as "ask".
@ -49,36 +51,38 @@ object AskPattern {
* implicit val scheduler = system.scheduler * implicit val scheduler = system.scheduler
* implicit val timeout = Timeout(3.seconds) * implicit val timeout = Timeout(3.seconds)
* val target: ActorRef[Request] = ... * val target: ActorRef[Request] = ...
* val f: Future[Reply] = target ? ref => (Request("hello", ref)) * val f: Future[Reply] = target ? replyTo => (Request("hello", replyTo))
* }}} * }}}
*/ */
def ?[U](f: ActorRef[U] T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = { def ?[U](replyTo: ActorRef[U] T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = {
// We do not currently use the implicit scheduler, but want to require it // We do not currently use the implicit scheduler, but want to require it
// because it might be needed when we move to a 'native' typed runtime, see #24219 // because it might be needed when we move to a 'native' typed runtime, see #24219
ref match { ref match {
case a: adapt.ActorRefAdapter[_] askUntyped(ref, a.untyped, timeout, f) case a: InternalRecipientRef[_] askUntyped(a, timeout, replyTo)
case a: adapt.ActorSystemAdapter[_] askUntyped(ref, a.untyped.guardian, timeout, f) case a throw new IllegalStateException(
case a throw new IllegalStateException("Only expect actor references to be ActorRefAdapter or ActorSystemAdapter until native system is implemented: " + a.getClass) "Only expect references to be RecipientRef, ActorRefAdapter or ActorSystemAdapter until " +
"native system is implemented: " + a.getClass)
} }
} }
} }
private val onTimeout: String Throwable = msg new TimeoutException(msg) private val onTimeout: String Throwable = msg new TimeoutException(msg)
private final class PromiseRef[U](target: ActorRef[_], untyped: InternalActorRef, timeout: Timeout) { private final class PromiseRef[U](target: InternalRecipientRef[_], timeout: Timeout) {
// Note: _promiseRef mustn't have a type pattern, since it can be null // Note: _promiseRef mustn't have a type pattern, since it can be null
private[this] val (_ref: ActorRef[U], _future: Future[U], _promiseRef) = private[this] val (_ref: ActorRef[U], _future: Future[U], _promiseRef) =
if (untyped.isTerminated) if (target.isTerminated)
( (
adapt.ActorRefAdapter[U](untyped.provider.deadLetters), adapt.ActorRefAdapter[U](target.provider.deadLetters),
Future.failed[U](new TimeoutException(s"Recipient[$target] had already been terminated.")), null) Future.failed[U](new TimeoutException(s"Recipient[$target] had already been terminated.")), null)
else if (timeout.duration.length <= 0) else if (timeout.duration.length <= 0)
( (
adapt.ActorRefAdapter[U](untyped.provider.deadLetters), adapt.ActorRefAdapter[U](target.provider.deadLetters),
Future.failed[U](new IllegalArgumentException(s"Timeout length must be positive, question not sent to [$target]")), null) Future.failed[U](new IllegalArgumentException(s"Timeout length must be positive, question not sent to [$target]")), null)
else { else {
val a = PromiseActorRef(untyped.provider, timeout, target, "unknown", onTimeout = onTimeout) // messageClassName "unknown' is set later, after applying the message factory
val a = PromiseActorRef(target.provider, timeout, target, "unknown", onTimeout = onTimeout)
val b = adapt.ActorRefAdapter[U](a) val b = adapt.ActorRefAdapter[U](a)
(b, a.result.future.asInstanceOf[Future[U]], a) (b, a.result.future.asInstanceOf[Future[U]], a)
} }
@ -88,8 +92,8 @@ object AskPattern {
val promiseRef: PromiseActorRef = _promiseRef val promiseRef: PromiseActorRef = _promiseRef
} }
private def askUntyped[T, U](target: ActorRef[T], untyped: InternalActorRef, timeout: Timeout, f: ActorRef[U] T): Future[U] = { private def askUntyped[T, U](target: InternalRecipientRef[T], timeout: Timeout, f: ActorRef[U] T): Future[U] = {
val p = new PromiseRef[U](target, untyped, timeout) val p = new PromiseRef[U](target, timeout)
val m = f(p.ref) val m = f(p.ref)
if (p.promiseRef ne null) p.promiseRef.messageClassName = m.getClass.getName if (p.promiseRef ne null) p.promiseRef.messageClassName = m.getClass.getName
target ! m target ! m

View file

@ -98,6 +98,19 @@ package object adapter {
def toUntyped: akka.actor.ActorRef = ActorRefAdapter.toUntyped(ref) def toUntyped: akka.actor.ActorRef = ActorRefAdapter.toUntyped(ref)
} }
/**
* Extension methods added to [[akka.actor.ActorRef]].
*/
implicit class UntypedActorRefOps(val ref: akka.actor.ActorRef) extends AnyVal {
/**
* Adapt the untyped `ActorRef` to typed `ActorRef[T]`. There is also an
* automatic implicit conversion for this, but this more explicit variant might
* sometimes be preferred.
*/
def toTyped[T]: ActorRef[T] = ActorRefAdapter(ref)
}
/** /**
* Implicit conversion from untyped [[akka.actor.ActorRef]] to typed [[akka.actor.typed.ActorRef]]. * Implicit conversion from untyped [[akka.actor.ActorRef]] to typed [[akka.actor.typed.ActorRef]].
*/ */

View file

@ -12,6 +12,7 @@ import java.util.concurrent.ConcurrentHashMap
import scala.compat.java8.FutureConverters._ import scala.compat.java8.FutureConverters._
import scala.concurrent.Future import scala.concurrent.Future
import akka.actor.ActorRefProvider
import akka.actor.ExtendedActorSystem import akka.actor.ExtendedActorSystem
import akka.actor.InternalActorRef import akka.actor.InternalActorRef
import akka.actor.Scheduler import akka.actor.Scheduler
@ -20,6 +21,7 @@ import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior import akka.actor.typed.Behavior
import akka.actor.typed.Props import akka.actor.typed.Props
import akka.actor.typed.internal.InternalRecipientRef
import akka.actor.typed.internal.adapter.ActorRefAdapter import akka.actor.typed.internal.adapter.ActorRefAdapter
import akka.actor.typed.internal.adapter.ActorSystemAdapter import akka.actor.typed.internal.adapter.ActorSystemAdapter
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
@ -222,7 +224,7 @@ import akka.util.Timeout
*/ */
@InternalApi private[akka] final class EntityRefImpl[M](shardRegion: akka.actor.ActorRef, entityId: String, @InternalApi private[akka] final class EntityRefImpl[M](shardRegion: akka.actor.ActorRef, entityId: String,
scheduler: Scheduler) scheduler: Scheduler)
extends javadsl.EntityRef[M] with scaladsl.EntityRef[M] { extends javadsl.EntityRef[M] with scaladsl.EntityRef[M] with InternalRecipientRef[M] {
override def tell(msg: M): Unit = override def tell(msg: M): Unit =
shardRegion ! ShardingEnvelope(entityId, msg) shardRegion ! ShardingEnvelope(entityId, msg)
@ -267,6 +269,18 @@ import akka.util.Timeout
val promiseRef: PromiseActorRef = _promiseRef val promiseRef: PromiseActorRef = _promiseRef
} }
// impl InternalRecipientRef
override def provider: ActorRefProvider = {
import akka.actor.typed.scaladsl.adapter._
shardRegion.toTyped.asInstanceOf[InternalRecipientRef[_]].provider
}
// impl InternalRecipientRef
def isTerminated: Boolean = {
import akka.actor.typed.scaladsl.adapter._
shardRegion.toTyped.asInstanceOf[InternalRecipientRef[_]].isTerminated
}
} }
/** /**

View file

@ -12,7 +12,9 @@ import java.util.function.BiFunction
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior import akka.actor.typed.Behavior
import akka.actor.typed.RecipientRef
import akka.actor.typed.Props import akka.actor.typed.Props
import akka.actor.typed.internal.InternalRecipientRef
import akka.annotation.DoNotInherit import akka.annotation.DoNotInherit
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
@ -333,7 +335,7 @@ object EntityTypeKey {
* *
* Not for user extension. * Not for user extension.
*/ */
@DoNotInherit abstract class EntityRef[M] { scaladslSelf: scaladsl.EntityRef[M] @DoNotInherit abstract class EntityRef[M] extends RecipientRef[M] { scaladslSelf: scaladsl.EntityRef[M] with InternalRecipientRef[M]
/** /**
* Send a message to the entity referenced by this EntityRef using *at-most-once* * Send a message to the entity referenced by this EntityRef using *at-most-once*
@ -344,6 +346,9 @@ object EntityTypeKey {
/** /**
* Allows to "ask" the [[EntityRef]] for a reply. * Allows to "ask" the [[EntityRef]] for a reply.
* See [[akka.actor.typed.javadsl.AskPattern]] for a complete write-up of this pattern * See [[akka.actor.typed.javadsl.AskPattern]] for a complete write-up of this pattern
*
* Note that if you are inside of an actor you should prefer [[akka.actor.typed.javadsl.ActorContext.ask]]
* as that provides better safety.
*/ */
def ask[U](message: JFunction[ActorRef[U], M], timeout: Timeout): CompletionStage[U] def ask[U](message: JFunction[ActorRef[U], M], timeout: Timeout): CompletionStage[U]

View file

@ -16,7 +16,9 @@ import akka.actor.typed.Behavior
import akka.actor.typed.Extension import akka.actor.typed.Extension
import akka.actor.typed.ExtensionId import akka.actor.typed.ExtensionId
import akka.actor.typed.ExtensionSetup import akka.actor.typed.ExtensionSetup
import akka.actor.typed.RecipientRef
import akka.actor.typed.Props import akka.actor.typed.Props
import akka.actor.typed.internal.InternalRecipientRef
import akka.annotation.DoNotInherit import akka.annotation.DoNotInherit
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
@ -330,7 +332,7 @@ object EntityTypeKey {
* [[ActorRef]] and watch it in case such notification is desired. * [[ActorRef]] and watch it in case such notification is desired.
* Not for user extension. * Not for user extension.
*/ */
@DoNotInherit trait EntityRef[M] { @DoNotInherit trait EntityRef[M] extends RecipientRef[M] { this: InternalRecipientRef[M]
/** /**
* Send a message to the entity referenced by this EntityRef using *at-most-once* * Send a message to the entity referenced by this EntityRef using *at-most-once*
@ -360,6 +362,9 @@ object EntityTypeKey {
* Allows to "ask" the [[EntityRef]] for a reply. * Allows to "ask" the [[EntityRef]] for a reply.
* See [[akka.actor.typed.scaladsl.AskPattern]] for a complete write-up of this pattern * See [[akka.actor.typed.scaladsl.AskPattern]] for a complete write-up of this pattern
* *
* Note that if you are inside of an actor you should prefer [[akka.actor.typed.scaladsl.ActorContext.ask]]
* as that provides better safety.
*
* Example usage: * Example usage:
* {{{ * {{{
* case class Request(msg: String, replyTo: ActorRef[Reply]) * case class Request(msg: String, replyTo: ActorRef[Reply])
@ -378,6 +383,9 @@ object EntityTypeKey {
* Allows to "ask" the [[EntityRef]] for a reply. * Allows to "ask" the [[EntityRef]] for a reply.
* See [[akka.actor.typed.scaladsl.AskPattern]] for a complete write-up of this pattern * See [[akka.actor.typed.scaladsl.AskPattern]] for a complete write-up of this pattern
* *
* Note that if you are inside of an actor you should prefer [[akka.actor.typed.scaladsl.ActorContext.ask]]
* as that provides better safety.
*
* Example usage: * Example usage:
* {{{ * {{{
* case class Request(msg: String, replyTo: ActorRef[Reply]) * case class Request(msg: String, replyTo: ActorRef[Reply])

View file

@ -7,28 +7,29 @@ package akka.cluster.sharding.typed.scaladsl
import java.nio.charset.StandardCharsets import java.nio.charset.StandardCharsets
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import akka.Done import akka.Done
import akka.actor.ExtendedActorSystem import akka.actor.ExtendedActorSystem
import akka.actor.testkit.typed.scaladsl.ActorTestKit import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.actor.typed.ActorRefResolver import akka.actor.typed.ActorRefResolver
import akka.actor.typed.ActorSystem import akka.actor.typed.ActorSystem
import akka.actor.typed.Props
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._
import akka.cluster.MemberStatus import akka.cluster.MemberStatus
import akka.cluster.sharding.typed.{ ClusterShardingSettings, ShardingEnvelope, ShardingMessageExtractor } import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.ShardingMessageExtractor
import akka.cluster.typed.Cluster import akka.cluster.typed.Cluster
import akka.cluster.typed.Join import akka.cluster.typed.Join
import akka.cluster.typed.Leave import akka.cluster.typed.Leave
import akka.serialization.SerializerWithStringManifest import akka.serialization.SerializerWithStringManifest
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.util.Timeout import akka.util.Timeout
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike import org.scalatest.WordSpecLike
import org.scalatest.time.Span
object ClusterShardingSpec { object ClusterShardingSpec {
val config = ConfigFactory.parseString( val config = ConfigFactory.parseString(
@ -66,6 +67,7 @@ object ClusterShardingSpec {
sealed trait TestProtocol extends java.io.Serializable sealed trait TestProtocol extends java.io.Serializable
final case class ReplyPlz(toMe: ActorRef[String]) extends TestProtocol final case class ReplyPlz(toMe: ActorRef[String]) extends TestProtocol
final case class WhoAreYou(replyTo: ActorRef[String]) extends TestProtocol final case class WhoAreYou(replyTo: ActorRef[String]) extends TestProtocol
final case class WhoAreYou2(x: Int, replyTo: ActorRef[String]) extends TestProtocol
final case class StopPlz() extends TestProtocol final case class StopPlz() extends TestProtocol
final case class PassivatePlz() extends TestProtocol final case class PassivatePlz() extends TestProtocol
@ -127,6 +129,8 @@ object ClusterShardingSpec {
} }
} }
final case class TheReply(s: String)
} }
class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.config) with WordSpecLike { class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.config) with WordSpecLike {
@ -312,6 +316,33 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
bobRef ! StopPlz() bobRef ! StopPlz()
} }
"EntityRef - ActorContext.ask" in {
val aliceRef = sharding.entityRefFor(typeKey, "alice")
val p = TestProbe[TheReply]()
spawn(
Behaviors.setup[TheReply] { ctx
// FIXME is the implicit ClassTag difficult to use?
// it works fine when there is a single parameter apply,
// but trouble when more parameters and this doesn't compile
//ctx.ask(aliceRef)(x => WhoAreYou(x)) {
ctx.ask(aliceRef)(WhoAreYou) {
case Success(name) TheReply(name)
case Failure(ex) TheReply(ex.getMessage)
}
Behaviors.receiveMessage[TheReply] { reply
p.ref ! reply
Behaviors.same
}
})
p.expectMessageType[TheReply].s should startWith("I'm alice")
aliceRef ! StopPlz()
}
"handle untyped StartEntity message" in { "handle untyped StartEntity message" in {
// it is normally using envolopes, but the untyped StartEntity message can be sent internally, // it is normally using envolopes, but the untyped StartEntity message can be sent internally,
// e.g. for remember entities // e.g. for remember entities