=typ Remove the AskResponse message. (#25971)
* =typ Remove the AskResponse message. * =typ import `akka.actor` as `akka.untyped` but not `akka.a`.
This commit is contained in:
parent
f1b9adb0c4
commit
3012154ae6
4 changed files with 18 additions and 36 deletions
|
|
@ -85,7 +85,7 @@ import akka.util.JavaDurationConverters._
|
|||
override def ask[Req, Res](target: RecipientRef[Req])(createRequest: ActorRef[Res] ⇒ Req)(mapResponse: Try[Res] ⇒ T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit = {
|
||||
import akka.actor.typed.scaladsl.AskPattern._
|
||||
(target ? createRequest)(responseTimeout, system.scheduler).onComplete(res ⇒
|
||||
self.asInstanceOf[ActorRef[AnyRef]] ! new AskResponse(res, mapResponse)
|
||||
self.asInstanceOf[ActorRef[AnyRef]] ! AdaptMessage(res, mapResponse)
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,19 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.actor.typed.internal
|
||||
|
||||
import akka.annotation.InternalApi
|
||||
|
||||
import scala.util.Try
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
* Message wrapper used to allow ActorContext.ask to map the response inside the asking actor.
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] final class AskResponse[U, T](result: Try[U], adapter: Try[U] ⇒ T) {
|
||||
def adapt(): T = adapter(result)
|
||||
}
|
||||
|
|
@ -6,19 +6,22 @@ package akka.actor.typed.internal
|
|||
|
||||
import akka.annotation.InternalApi
|
||||
|
||||
/**
|
||||
* A marker trait for internal messages.
|
||||
*/
|
||||
@InternalApi sealed trait InternalMessage
|
||||
|
||||
/**
|
||||
* INTERNAL API: Wrapping of messages that should be adapted by
|
||||
* adapters registered with `ActorContext.messageAdapter`.
|
||||
*/
|
||||
@InternalApi private[akka] final case class AdaptWithRegisteredMessageAdapter[U](msg: U)
|
||||
@InternalApi private[akka] final case class AdaptWithRegisteredMessageAdapter[U](msg: U) extends InternalMessage
|
||||
|
||||
/**
|
||||
* INTERNAL API: Wrapping of messages that should be adapted by the included
|
||||
* function. Used by `ActorContext.spawnMessageAdapter` so that the function is
|
||||
* applied in the "parent" actor (for better thread safetey)..
|
||||
* applied in the "parent" actor (for better thread safety)..
|
||||
*/
|
||||
@InternalApi private[akka] final case class AdaptMessage[U, T](msg: U, adapter: U ⇒ T) {
|
||||
@InternalApi private[akka] final case class AdaptMessage[U, T](msg: U, adapter: U ⇒ T) extends InternalMessage {
|
||||
def adapt(): T = adapter(msg)
|
||||
}
|
||||
|
||||
// FIXME move AskResponse in other PR
|
||||
|
|
|
|||
|
|
@ -11,14 +11,14 @@ import scala.util.Failure
|
|||
import scala.util.Success
|
||||
import scala.util.Try
|
||||
|
||||
import akka.{ actor ⇒ a }
|
||||
import akka.{ actor ⇒ untyped }
|
||||
import akka.annotation.InternalApi
|
||||
import akka.util.OptionVal
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[typed] class ActorAdapter[T](_initialBehavior: Behavior[T]) extends a.Actor with a.ActorLogging {
|
||||
@InternalApi private[typed] class ActorAdapter[T](_initialBehavior: Behavior[T]) extends untyped.Actor with untyped.ActorLogging {
|
||||
import Behavior._
|
||||
|
||||
protected var behavior: Behavior[T] = _initialBehavior
|
||||
|
|
@ -32,12 +32,12 @@ import akka.util.OptionVal
|
|||
* Failures from failed children, that were stopped through untyped supervision, this is what allows us to pass
|
||||
* child exception in Terminated for direct children.
|
||||
*/
|
||||
private var failures: Map[a.ActorRef, Throwable] = Map.empty
|
||||
private var failures: Map[untyped.ActorRef, Throwable] = Map.empty
|
||||
|
||||
def receive = running
|
||||
def receive: Receive = running
|
||||
|
||||
def running: Receive = {
|
||||
case a.Terminated(ref) ⇒
|
||||
case untyped.Terminated(ref) ⇒
|
||||
val msg =
|
||||
if (failures contains ref) {
|
||||
val ex = failures(ref)
|
||||
|
|
@ -45,10 +45,8 @@ import akka.util.OptionVal
|
|||
Terminated(ActorRefAdapter(ref))(ex)
|
||||
} else Terminated(ActorRefAdapter(ref))(null)
|
||||
next(Behavior.interpretSignal(behavior, ctx, msg), msg)
|
||||
case a.ReceiveTimeout ⇒
|
||||
case untyped.ReceiveTimeout ⇒
|
||||
next(Behavior.interpretMessage(behavior, ctx, ctx.receiveTimeoutMsg), ctx.receiveTimeoutMsg)
|
||||
case wrapped: AskResponse[Any, T] @unchecked ⇒
|
||||
withSafelyAdapted(() ⇒ wrapped.adapt())(handleMessage)
|
||||
case wrapped: AdaptMessage[Any, T] @unchecked ⇒
|
||||
withSafelyAdapted(() ⇒ wrapped.adapt()) {
|
||||
case AdaptWithRegisteredMessageAdapter(msg) ⇒
|
||||
|
|
@ -121,11 +119,11 @@ import akka.util.OptionVal
|
|||
case other ⇒ super.unhandled(other)
|
||||
}
|
||||
|
||||
override val supervisorStrategy = a.OneForOneStrategy() {
|
||||
override val supervisorStrategy = untyped.OneForOneStrategy() {
|
||||
case ex ⇒
|
||||
val ref = sender()
|
||||
if (context.asInstanceOf[a.ActorCell].isWatching(ref)) failures = failures.updated(ref, ex)
|
||||
a.SupervisorStrategy.Stop
|
||||
if (context.asInstanceOf[untyped.ActorCell].isWatching(ref)) failures = failures.updated(ref, ex)
|
||||
untyped.SupervisorStrategy.Stop
|
||||
}
|
||||
|
||||
override def preStart(): Unit =
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue