Remove death watch and supervision mechanics (#24198)
These were missed in #24176 Also some removal of unused imports and a couple compiler warnings
This commit is contained in:
parent
bfbdf89776
commit
2aab0762dd
13 changed files with 39 additions and 399 deletions
|
|
@ -60,8 +60,6 @@ object ActorRef {
|
|||
def !(msg: T): Unit = ref.tell(msg)
|
||||
}
|
||||
|
||||
// FIXME factory methods for below for Java (trait + object)
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
|
|
|
|||
|
|
@ -139,8 +139,7 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions {
|
|||
* Ask the system guardian of this system to create an actor from the given
|
||||
* behavior and props and with the given name. The name does not need to
|
||||
* be unique since the guardian will prefix it with a running number when
|
||||
* creating the child actor. The timeout sets the timeout used for the [[akka.actor.typed.scaladsl.AskPattern]]
|
||||
* invocation when asking the guardian.
|
||||
* creating the child actor.
|
||||
*
|
||||
* The returned Future of [[ActorRef]] may be converted into an [[ActorRef]]
|
||||
* to which messages can immediately be sent by using the [[ActorRef$.apply[T](s*]]
|
||||
|
|
@ -156,8 +155,6 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions {
|
|||
}
|
||||
|
||||
object ActorSystem {
|
||||
import internal._
|
||||
|
||||
/**
|
||||
* Scala API: Create an ActorSystem
|
||||
*/
|
||||
|
|
@ -229,8 +226,6 @@ object ActorSystem {
|
|||
* This class is immutable.
|
||||
*/
|
||||
final class Settings(val config: Config, val untyped: a.ActorSystem.Settings, val name: String) {
|
||||
import collection.JavaConverters._
|
||||
|
||||
def this(_cl: ClassLoader, _config: Config, name: String) = this({
|
||||
val config = _config.withFallback(ConfigFactory.defaultReference(_cl))
|
||||
config.checkValid(ConfigFactory.defaultReference(_cl), "akka")
|
||||
|
|
@ -240,22 +235,6 @@ final class Settings(val config: Config, val untyped: a.ActorSystem.Settings, va
|
|||
def this(untyped: a.ActorSystem.Settings) = this(untyped.config, untyped, untyped.name)
|
||||
|
||||
private var foundSettings = List.empty[String]
|
||||
private def found(name: String, value: String): Unit = foundSettings ::= s"$name = $value"
|
||||
private def getS(name: String, path: String): String = {
|
||||
val value = config.getString(path)
|
||||
found(name, value)
|
||||
value
|
||||
}
|
||||
private def getSL(name: String, path: String): List[String] = {
|
||||
val value = config.getStringList(path)
|
||||
found(name, value.toString)
|
||||
value.asScala.toList
|
||||
}
|
||||
private def getI(name: String, path: String): Int = {
|
||||
val value = config.getInt(path)
|
||||
found(name, value.toString)
|
||||
value
|
||||
}
|
||||
|
||||
foundSettings = foundSettings.reverse
|
||||
|
||||
|
|
|
|||
|
|
@ -290,7 +290,7 @@ object Behavior {
|
|||
case ext: ExtensibleBehavior[T] ⇒
|
||||
val possiblyDeferredResult = msg match {
|
||||
case signal: Signal ⇒ ext.receiveSignal(ctx, signal)
|
||||
case msg ⇒ ext.receiveMessage(ctx, msg.asInstanceOf[T])
|
||||
case m ⇒ ext.receiveMessage(ctx, m.asInstanceOf[T])
|
||||
}
|
||||
undefer(possiblyDeferredResult, ctx)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ import scala.concurrent.ExecutionContextExecutor
|
|||
case None ⇒ Optional.empty()
|
||||
}
|
||||
|
||||
override def getChildren: java.util.List[akka.actor.typed.ActorRef[Void]] = {
|
||||
override def getChildren: java.util.List[ActorRef[Void]] = {
|
||||
val c = children
|
||||
val a = new ArrayList[ActorRef[Void]](c.size)
|
||||
val i = c.iterator
|
||||
|
|
@ -57,10 +57,10 @@ import scala.concurrent.ExecutionContextExecutor
|
|||
internalSpawnAdapter(f, "")
|
||||
|
||||
override def spawnAdapter[U](f: java.util.function.Function[U, T]): akka.actor.typed.ActorRef[U] =
|
||||
internalSpawnAdapter(f.apply _, "")
|
||||
internalSpawnAdapter(f.apply, "")
|
||||
|
||||
override def spawnAdapter[U](f: java.util.function.Function[U, T], name: String): akka.actor.typed.ActorRef[U] =
|
||||
internalSpawnAdapter(f.apply _, name)
|
||||
internalSpawnAdapter(f.apply, name)
|
||||
|
||||
/**
|
||||
* INTERNAL API: Needed to make Scala 2.12 compiler happy.
|
||||
|
|
|
|||
|
|
@ -4,12 +4,10 @@
|
|||
package akka.actor.typed
|
||||
package internal
|
||||
|
||||
import akka.actor.InvalidMessageException
|
||||
import akka.util.LineNumbers
|
||||
import akka.annotation.InternalApi
|
||||
import akka.actor.typed.{ ActorContext ⇒ AC }
|
||||
import akka.actor.typed.scaladsl.{ ActorContext ⇒ SAC }
|
||||
import akka.actor.typed.scaladsl.Actor
|
||||
import akka.actor.typed.scaladsl.{ ActorContext ⇒ TAC }
|
||||
|
||||
import scala.reflect.ClassTag
|
||||
import scala.annotation.tailrec
|
||||
|
|
@ -67,19 +65,19 @@ import scala.annotation.tailrec
|
|||
}
|
||||
|
||||
class ImmutableBehavior[T](
|
||||
val onMessage: (SAC[T], T) ⇒ Behavior[T],
|
||||
onSignal: PartialFunction[(SAC[T], Signal), Behavior[T]] = Behavior.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]])
|
||||
val onMessage: (TAC[T], T) ⇒ Behavior[T],
|
||||
onSignal: PartialFunction[(TAC[T], Signal), Behavior[T]] = Behavior.unhandledSignal.asInstanceOf[PartialFunction[(TAC[T], Signal), Behavior[T]]])
|
||||
extends ExtensibleBehavior[T] {
|
||||
|
||||
override def receiveSignal(ctx: AC[T], msg: Signal): Behavior[T] =
|
||||
onSignal.applyOrElse((ctx.asScala, msg), Behavior.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]])
|
||||
onSignal.applyOrElse((ctx.asScala, msg), Behavior.unhandledSignal.asInstanceOf[PartialFunction[(TAC[T], Signal), Behavior[T]]])
|
||||
override def receiveMessage(ctx: AC[T], msg: T) = onMessage(ctx.asScala, msg)
|
||||
override def toString = s"Immutable(${LineNumbers(onMessage)})"
|
||||
}
|
||||
|
||||
def tap[T](
|
||||
onMessage: Function2[SAC[T], T, _],
|
||||
onSignal: Function2[SAC[T], Signal, _],
|
||||
onMessage: (TAC[T], T) ⇒ _,
|
||||
onSignal: (TAC[T], Signal) ⇒ _,
|
||||
behavior: Behavior[T]): Behavior[T] = {
|
||||
intercept[T, T](
|
||||
beforeMessage = (ctx, msg) ⇒ {
|
||||
|
|
@ -90,8 +88,8 @@ import scala.annotation.tailrec
|
|||
onSignal(ctx, sig)
|
||||
true
|
||||
},
|
||||
afterMessage = (ctx, msg, b) ⇒ b, // TODO optimize by using more ConstantFun
|
||||
afterSignal = (ctx, sig, b) ⇒ b,
|
||||
afterMessage = (_, _, b) ⇒ b, // TODO optimize by using more ConstantFun
|
||||
afterSignal = (_, _, b) ⇒ b,
|
||||
behavior)(ClassTag(classOf[Any]))
|
||||
}
|
||||
|
||||
|
|
@ -111,10 +109,10 @@ import scala.annotation.tailrec
|
|||
* different than the incoming message).
|
||||
*/
|
||||
def intercept[T, U <: Any: ClassTag](
|
||||
beforeMessage: Function2[SAC[U], U, T],
|
||||
beforeSignal: Function2[SAC[T], Signal, Boolean],
|
||||
afterMessage: Function3[SAC[T], T, Behavior[T], Behavior[T]],
|
||||
afterSignal: Function3[SAC[T], Signal, Behavior[T], Behavior[T]],
|
||||
beforeMessage: Function2[TAC[U], U, T],
|
||||
beforeSignal: Function2[TAC[T], Signal, Boolean],
|
||||
afterMessage: Function3[TAC[T], T, Behavior[T], Behavior[T]],
|
||||
afterSignal: Function3[TAC[T], Signal, Behavior[T], Behavior[T]],
|
||||
behavior: Behavior[T],
|
||||
toStringPrefix: String = "Intercept"): Behavior[T] = {
|
||||
behavior match {
|
||||
|
|
@ -130,10 +128,10 @@ import scala.annotation.tailrec
|
|||
}
|
||||
|
||||
private final case class Intercept[T, U <: Any: ClassTag](
|
||||
beforeOnMessage: Function2[SAC[U], U, T],
|
||||
beforeOnSignal: Function2[SAC[T], Signal, Boolean],
|
||||
afterMessage: Function3[SAC[T], T, Behavior[T], Behavior[T]],
|
||||
afterSignal: Function3[SAC[T], Signal, Behavior[T], Behavior[T]],
|
||||
beforeOnMessage: Function2[TAC[U], U, T],
|
||||
beforeOnSignal: Function2[TAC[T], Signal, Boolean],
|
||||
afterMessage: Function3[TAC[T], T, Behavior[T], Behavior[T]],
|
||||
afterSignal: Function3[TAC[T], Signal, Behavior[T], Behavior[T]],
|
||||
behavior: Behavior[T],
|
||||
toStringPrefix: String = "Intercept") extends ExtensibleBehavior[T] {
|
||||
|
||||
|
|
@ -162,7 +160,7 @@ import scala.annotation.tailrec
|
|||
override def receiveMessage(ctx: AC[T], msg: T): Behavior[T] = {
|
||||
msg match {
|
||||
case m: U ⇒
|
||||
val msg2 = beforeOnMessage(ctx.asScala.asInstanceOf[SAC[U]], m)
|
||||
val msg2 = beforeOnMessage(ctx.asScala.asInstanceOf[TAC[U]], m)
|
||||
val next: Behavior[T] =
|
||||
if (msg2 == null)
|
||||
same
|
||||
|
|
|
|||
|
|
@ -1,221 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.actor.typed
|
||||
package internal
|
||||
|
||||
import akka.event.Logging.{ Warning, Debug }
|
||||
import akka.event.AddressTerminatedTopic
|
||||
import akka.event.Logging
|
||||
import akka.actor.Address
|
||||
|
||||
/*
|
||||
* THOUGHTS
|
||||
*
|
||||
* - an ActorRef is a channel that allows sending messages — in particular it is NOT a sender!
|
||||
* - a channel is scoped by the session it is part of
|
||||
* - termination means that the session ends because sending further messages is pointless
|
||||
* - this means that there is no ordering requirement between Terminated and any other received message
|
||||
*/
|
||||
private[typed] trait DeathWatch[T] {
|
||||
|
||||
/*
|
||||
* INTERFACE WITH ACTORCELL
|
||||
*/
|
||||
protected def system: ActorSystem[Nothing]
|
||||
protected def self: ActorRefImpl[T]
|
||||
protected def parent: ActorRefImpl[Nothing]
|
||||
protected def behavior: Behavior[T]
|
||||
protected def next(b: Behavior[T], msg: Any): Unit
|
||||
protected def childrenMap: Map[String, ActorRefImpl[Nothing]]
|
||||
protected def terminatingMap: Map[String, ActorRefImpl[Nothing]]
|
||||
protected def isTerminating: Boolean
|
||||
protected def ctx: ActorContext[T]
|
||||
protected def maySend: Boolean
|
||||
protected def publish(e: Logging.LogEvent): Unit
|
||||
protected def clazz(obj: AnyRef): Class[_]
|
||||
|
||||
protected def removeChild(actor: ActorRefImpl[Nothing]): Unit
|
||||
protected def finishTerminate(): Unit
|
||||
|
||||
type ARImpl = ActorRefImpl[Nothing]
|
||||
|
||||
/**
|
||||
* This map holds a [[None]] for actors for which we send a [[Terminated]] notification on termination,
|
||||
* ``Some(message)`` for actors for which we send a custom termination message.
|
||||
*/
|
||||
private var watching = Map.empty[ARImpl, Option[T]]
|
||||
private var watchedBy = Set.empty[ARImpl]
|
||||
|
||||
final def watch[U](subject: ActorRef[U]): Unit = {
|
||||
val a = subject.sorry
|
||||
if (a != self && !watching.contains(a)) {
|
||||
maintainAddressTerminatedSubscription(a) {
|
||||
a.sendSystem(Watch(a, self))
|
||||
watching = watching.updated(a, None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final def watchWith[U](subject: ActorRef[U], msg: T): Unit = {
|
||||
val a = subject.sorry
|
||||
if (a != self && !watching.contains(a)) {
|
||||
maintainAddressTerminatedSubscription(a) {
|
||||
a.sendSystem(Watch(a, self))
|
||||
watching = watching.updated(a, Some(msg))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final def unwatch[U](_a: ActorRef[U]): Unit = {
|
||||
val a = _a.sorry
|
||||
if (a != self && watching.contains(a)) {
|
||||
a.sendSystem(Unwatch(a, self))
|
||||
maintainAddressTerminatedSubscription(a) {
|
||||
watching -= a
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* When this actor is watching the subject of [[akka.actor.Terminated]] message
|
||||
* it will be propagated to user's receive.
|
||||
*/
|
||||
protected def watchedActorTerminated(actor: ARImpl, failure: Throwable): Boolean = {
|
||||
removeChild(actor)
|
||||
watching.get(actor) match {
|
||||
case None ⇒ // We're apparently no longer watching this actor.
|
||||
case Some(optionalMessage) ⇒
|
||||
maintainAddressTerminatedSubscription(actor) {
|
||||
watching -= actor
|
||||
}
|
||||
if (maySend) {
|
||||
optionalMessage match {
|
||||
case None ⇒
|
||||
val t = Terminated(actor)(failure)
|
||||
next(Behavior.interpretSignal(behavior, ctx, t), t)
|
||||
case Some(msg) ⇒
|
||||
next(Behavior.interpretMessage(behavior, ctx, msg), msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
if (isTerminating && terminatingMap.isEmpty) {
|
||||
finishTerminate()
|
||||
false
|
||||
} else true
|
||||
}
|
||||
|
||||
protected def tellWatchersWeDied(): Unit =
|
||||
if (watchedBy.nonEmpty) {
|
||||
try {
|
||||
// Don't need to send to parent parent since it receives a DWN by default
|
||||
def sendTerminated(ifLocal: Boolean)(watcher: ARImpl): Unit =
|
||||
if (watcher.isLocal == ifLocal && watcher != parent) watcher.sendSystem(DeathWatchNotification(self, null))
|
||||
|
||||
/*
|
||||
* It is important to notify the remote watchers first, otherwise RemoteDaemon might shut down, causing
|
||||
* the remoting to shut down as well. At this point Terminated messages to remote watchers are no longer
|
||||
* deliverable.
|
||||
*
|
||||
* The problematic case is:
|
||||
* 1. Terminated is sent to RemoteDaemon
|
||||
* 1a. RemoteDaemon is fast enough to notify the terminator actor in RemoteActorRefProvider
|
||||
* 1b. The terminator is fast enough to enqueue the shutdown command in the remoting
|
||||
* 2. Only at this point is the Terminated (to be sent remotely) enqueued in the mailbox of remoting
|
||||
*
|
||||
* If the remote watchers are notified first, then the mailbox of the Remoting will guarantee the correct order.
|
||||
*/
|
||||
watchedBy foreach sendTerminated(ifLocal = false)
|
||||
watchedBy foreach sendTerminated(ifLocal = true)
|
||||
} finally {
|
||||
maintainAddressTerminatedSubscription() {
|
||||
watchedBy = Set.empty
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected def unwatchWatchedActors(): Unit =
|
||||
if (watching.nonEmpty) {
|
||||
maintainAddressTerminatedSubscription() {
|
||||
try {
|
||||
watching.foreach { case (watchee, _) ⇒ watchee.sendSystem(Unwatch(watchee, self)) }
|
||||
} finally {
|
||||
watching = Map.empty
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected def addWatcher(watchee: ARImpl, watcher: ARImpl): Unit = {
|
||||
val watcheeSelf = watchee == self
|
||||
val watcherSelf = watcher == self
|
||||
|
||||
if (watcheeSelf && !watcherSelf) {
|
||||
if (!watchedBy.contains(watcher)) maintainAddressTerminatedSubscription(watcher) {
|
||||
watchedBy += watcher
|
||||
if (system.settings.untyped.DebugLifecycle) publish(Debug(self.path.toString, clazz(behavior), s"now watched by $watcher"))
|
||||
}
|
||||
} else if (!watcheeSelf && watcherSelf) {
|
||||
watch(watchee)
|
||||
} else {
|
||||
publish(Warning(self.path.toString, clazz(behavior), "BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, self)))
|
||||
}
|
||||
}
|
||||
|
||||
protected def remWatcher(watchee: ARImpl, watcher: ARImpl): Unit = {
|
||||
val watcheeSelf = watchee == self
|
||||
val watcherSelf = watcher == self
|
||||
|
||||
if (watcheeSelf && !watcherSelf) {
|
||||
if (watchedBy.contains(watcher)) maintainAddressTerminatedSubscription(watcher) {
|
||||
watchedBy -= watcher
|
||||
if (system.settings.untyped.DebugLifecycle) publish(Debug(self.path.toString, clazz(behavior), s"no longer watched by $watcher"))
|
||||
}
|
||||
} else if (!watcheeSelf && watcherSelf) {
|
||||
unwatch(watchee)
|
||||
} else {
|
||||
publish(Warning(self.path.toString, clazz(behavior), "BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, self)))
|
||||
}
|
||||
}
|
||||
|
||||
protected def addressTerminated(address: Address): Unit = {
|
||||
// cleanup watchedBy since we know they are dead
|
||||
maintainAddressTerminatedSubscription() {
|
||||
for (a ← watchedBy; if a.path.address == address) watchedBy -= a
|
||||
}
|
||||
|
||||
for ((a, _) ← watching; if a.path.address == address) {
|
||||
self.sendSystem(DeathWatchNotification(a, null))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts subscription to AddressTerminated if not already subscribing and the
|
||||
* block adds a non-local ref to watching or watchedBy.
|
||||
* Ends subscription to AddressTerminated if subscribing and the
|
||||
* block removes the last non-local ref from watching and watchedBy.
|
||||
*/
|
||||
private def maintainAddressTerminatedSubscription[U](change: ARImpl = null)(block: ⇒ U): U = {
|
||||
def isNonLocal(ref: ARImpl) = ref match {
|
||||
case null ⇒ true
|
||||
case a ⇒ !a.isLocal
|
||||
}
|
||||
|
||||
if (isNonLocal(change)) {
|
||||
def hasNonLocalAddress: Boolean = ((watching.keysIterator exists isNonLocal) || (watchedBy exists isNonLocal))
|
||||
val had = hasNonLocalAddress
|
||||
val result = block
|
||||
val has = hasNonLocalAddress
|
||||
if (had && !has) unsubscribeAddressTerminated()
|
||||
else if (!had && has) subscribeAddressTerminated()
|
||||
result
|
||||
} else {
|
||||
block
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: these will need to be redone once remoting is integrated
|
||||
private def unsubscribeAddressTerminated(): Unit = ???
|
||||
private def subscribeAddressTerminated(): Unit = ???
|
||||
|
||||
}
|
||||
|
|
@ -97,7 +97,7 @@ trait ExtensionsImpl extends Extensions { self: ActorSystem[_] ⇒
|
|||
//Always notify listeners of the inProcess signal
|
||||
inProcessOfRegistration.countDown()
|
||||
}
|
||||
case other ⇒
|
||||
case _ ⇒
|
||||
//Someone else is in process of registering an extension for this Extension, retry
|
||||
registerExtension(ext)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,10 +15,10 @@ import akka.serialization.{ BaseSerializer, SerializerWithStringManifest }
|
|||
class MiscMessageSerializer(val system: akka.actor.ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer {
|
||||
|
||||
private val resolver = ActorRefResolver(system.toTyped)
|
||||
private val actorRefManifest = "a"
|
||||
private val ActorRefManifest = "a"
|
||||
|
||||
def manifest(o: AnyRef): String = o match {
|
||||
case ref: ActorRef[_] ⇒ actorRefManifest
|
||||
case ref: ActorRef[_] ⇒ ActorRefManifest
|
||||
case _ ⇒
|
||||
throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]")
|
||||
}
|
||||
|
|
@ -30,7 +30,7 @@ class MiscMessageSerializer(val system: akka.actor.ExtendedActorSystem) extends
|
|||
}
|
||||
|
||||
def fromBinary(bytes: Array[Byte], manifest: String): ActorRef[Any] = manifest match {
|
||||
case `actorRefManifest` ⇒ resolver.resolveActorRef(new String(bytes, StandardCharsets.UTF_8))
|
||||
case ActorRefManifest ⇒ resolver.resolveActorRef(new String(bytes, StandardCharsets.UTF_8))
|
||||
case _ ⇒
|
||||
throw new NotSerializableException(
|
||||
s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")
|
||||
|
|
|
|||
|
|
@ -1,112 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
package akka.actor.typed
|
||||
package internal
|
||||
|
||||
import scala.util.control.NonFatal
|
||||
import akka.event.Logging
|
||||
import akka.actor.typed.Behavior.{ DeferredBehavior, undefer, validateAsInitial }
|
||||
import akka.actor.typed.Behavior.StoppedBehavior
|
||||
import akka.util.OptionVal
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[typed] trait SupervisionMechanics[T] {
|
||||
/*
|
||||
* INTERFACE WITH ACTOR CELL
|
||||
*/
|
||||
protected def system: ActorSystem[Nothing]
|
||||
protected def initialBehavior: Behavior[T]
|
||||
protected def self: ActorRefImpl[T]
|
||||
protected def parent: ActorRefImpl[Nothing]
|
||||
protected def behavior: Behavior[T]
|
||||
protected def behavior_=(b: Behavior[T]): Unit
|
||||
protected def next(b: Behavior[T], msg: Any): Unit
|
||||
protected def terminatingMap: Map[String, ActorRefImpl[Nothing]]
|
||||
protected def stopAll(): Unit
|
||||
protected def setTerminating(): Unit
|
||||
protected def setClosed(): Unit
|
||||
protected def maySend: Boolean
|
||||
protected def ctx: ActorContext[T]
|
||||
protected def publish(e: Logging.LogEvent): Unit
|
||||
protected def clazz(obj: AnyRef): Class[_]
|
||||
|
||||
// INTERFACE WITH DEATHWATCH
|
||||
protected def addWatcher(watchee: ActorRefImpl[Nothing], watcher: ActorRefImpl[Nothing]): Unit
|
||||
protected def remWatcher(watchee: ActorRefImpl[Nothing], watcher: ActorRefImpl[Nothing]): Unit
|
||||
protected def watchedActorTerminated(actor: ActorRefImpl[Nothing], failure: Throwable): Boolean
|
||||
protected def tellWatchersWeDied(): Unit
|
||||
protected def unwatchWatchedActors(): Unit
|
||||
|
||||
/**
|
||||
* Process one system message and return whether further messages shall be processed.
|
||||
*/
|
||||
protected def processSignal(message: SystemMessage): Boolean = {
|
||||
message match {
|
||||
case Watch(watchee, watcher) ⇒ { addWatcher(watchee.sorryForNothing, watcher.sorryForNothing); true }
|
||||
case Unwatch(watchee, watcher) ⇒ { remWatcher(watchee.sorryForNothing, watcher.sorryForNothing); true }
|
||||
case DeathWatchNotification(a, f) ⇒ watchedActorTerminated(a.sorryForNothing, f)
|
||||
case Create() ⇒ create()
|
||||
case Terminate() ⇒ terminate()
|
||||
case NoMessage ⇒ false // only here to suppress warning
|
||||
}
|
||||
}
|
||||
|
||||
private[this] var _failed: Throwable = null
|
||||
protected def failed: Throwable = _failed
|
||||
|
||||
protected def fail(thr: Throwable): Unit = {
|
||||
if (_failed eq null) _failed = thr
|
||||
publish(Logging.Error(thr, self.path.toString, getClass, thr.getMessage))
|
||||
if (maySend) self.sendSystem(Terminate())
|
||||
}
|
||||
|
||||
private def create(): Boolean = {
|
||||
behavior = initialBehavior
|
||||
if (system.settings.untyped.DebugLifecycle)
|
||||
publish(Logging.Debug(self.path.toString, clazz(behavior), "started"))
|
||||
behavior = validateAsInitial(undefer(behavior, ctx))
|
||||
if (!Behavior.isAlive(behavior)) self.sendSystem(Terminate())
|
||||
true
|
||||
}
|
||||
|
||||
private def terminate(): Boolean = {
|
||||
setTerminating()
|
||||
unwatchWatchedActors()
|
||||
stopAll()
|
||||
if (terminatingMap.isEmpty) {
|
||||
finishTerminate()
|
||||
false
|
||||
} else true
|
||||
}
|
||||
|
||||
protected def finishTerminate(): Unit = {
|
||||
val a = behavior
|
||||
/*
|
||||
* The following order is crucial for things to work properly. Only change this if you're very confident and lucky.
|
||||
*
|
||||
*
|
||||
*/
|
||||
try a match {
|
||||
case null ⇒ // skip PostStop
|
||||
case _: DeferredBehavior[_] ⇒
|
||||
// Do not undefer a DeferredBehavior as that may cause creation side-effects, which we do not want on termination.
|
||||
case s: StoppedBehavior[_] ⇒ s.postStop match {
|
||||
case OptionVal.Some(postStop) ⇒ Behavior.interpretSignal(postStop, ctx, PostStop)
|
||||
case OptionVal.None ⇒ // no postStop behavior defined
|
||||
}
|
||||
case _ ⇒ Behavior.interpretSignal(a, ctx, PostStop)
|
||||
} catch { case NonFatal(ex) ⇒ publish(Logging.Error(ex, self.path.toString, clazz(a), "failure during PostStop")) }
|
||||
finally try tellWatchersWeDied()
|
||||
finally try parent.sendSystem(DeathWatchNotification(self, failed))
|
||||
finally {
|
||||
behavior = null
|
||||
_failed = null
|
||||
setClosed()
|
||||
if (system.settings.untyped.DebugLifecycle)
|
||||
publish(Logging.Debug(self.path.toString, clazz(a), "stopped"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -27,7 +27,7 @@ import akka.actor.typed.Behavior.UntypedBehavior
|
|||
ActorContextAdapter.spawnAnonymous(untyped, behavior, props)
|
||||
override def spawn[U](behavior: Behavior[U], name: String, props: Props = Props.empty) =
|
||||
ActorContextAdapter.spawn(untyped, behavior, name, props)
|
||||
override def stop[U](child: ActorRef[U]) =
|
||||
override def stop[U](child: ActorRef[U]): Boolean =
|
||||
toUntyped(child) match {
|
||||
case f: akka.actor.FunctionRef ⇒
|
||||
val cell = untyped.asInstanceOf[akka.actor.ActorCell]
|
||||
|
|
@ -63,7 +63,6 @@ import akka.actor.typed.Behavior.UntypedBehavior
|
|||
val ref = cell.addFunctionRef((_, msg) ⇒ untyped.self ! f(msg.asInstanceOf[U]), _name)
|
||||
ActorRefAdapter[U](ref)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -6,11 +6,8 @@ package akka.actor.typed.javadsl
|
|||
import java.util.function.{ Function ⇒ JFunction }
|
||||
import akka.annotation.DoNotInherit
|
||||
import akka.annotation.ApiMayChange
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.ActorSystem
|
||||
import akka.actor.typed._
|
||||
import java.util.Optional
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.Props
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.concurrent.ExecutionContextExecutor
|
||||
|
||||
|
|
@ -30,7 +27,7 @@ import scala.concurrent.ExecutionContextExecutor
|
|||
*
|
||||
* An `ActorContext` in addition provides access to the Actor’s own identity (“`getSelf`”),
|
||||
* the [[ActorSystem]] it is part of, methods for querying the list of child Actors it
|
||||
* created, access to [[Terminated DeathWatch]] and timed message scheduling.
|
||||
* created, access to [[Terminated]] and timed message scheduling.
|
||||
*/
|
||||
@DoNotInherit
|
||||
@ApiMayChange
|
||||
|
|
|
|||
|
|
@ -8,11 +8,7 @@ import scala.concurrent.duration.FiniteDuration
|
|||
|
||||
import akka.annotation.ApiMayChange
|
||||
import akka.annotation.DoNotInherit
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.ActorSystem
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.Props
|
||||
import akka.actor.typed.EmptyProps
|
||||
import akka.actor.typed._
|
||||
|
||||
/**
|
||||
* An Actor is given by the combination of a [[Behavior]] and a context in
|
||||
|
|
@ -30,7 +26,7 @@ import akka.actor.typed.EmptyProps
|
|||
*
|
||||
* An `ActorContext` in addition provides access to the Actor’s own identity (“`self`”),
|
||||
* the [[ActorSystem]] it is part of, methods for querying the list of child Actors it
|
||||
* created, access to [[Terminated DeathWatch]] and timed message scheduling.
|
||||
* created, access to [[Terminated]] and timed message scheduling.
|
||||
*/
|
||||
@DoNotInherit
|
||||
@ApiMayChange
|
||||
|
|
@ -90,7 +86,7 @@ trait ActorContext[T] { this: akka.actor.typed.javadsl.ActorContext[T] ⇒
|
|||
def stop[U](child: ActorRef[U]): Boolean
|
||||
|
||||
/**
|
||||
* Register for [[Terminated]] notification once the Actor identified by the
|
||||
* Register for [[akka.actor.typed.Terminated]] notification once the Actor identified by the
|
||||
* given [[ActorRef]] terminates. This message is also sent when the watched actor
|
||||
* is on a node that has been removed from the cluster when using akka-cluster
|
||||
* or has been marked unreachable when using akka-remote directly
|
||||
|
|
|
|||
|
|
@ -17,7 +17,8 @@ import akka.persistence.typed.scaladsl.PersistentActor._
|
|||
|
||||
object PersistentActorSpec {
|
||||
|
||||
val config = ConfigFactory.parseString("""
|
||||
val config = ConfigFactory.parseString(
|
||||
"""
|
||||
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
|
||||
""")
|
||||
|
||||
|
|
@ -109,6 +110,7 @@ object PersistentActorSpec {
|
|||
}
|
||||
|
||||
class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eventually with StartSupport {
|
||||
|
||||
import PersistentActorSpec._
|
||||
|
||||
implicit val testSettings = TestKitSettings(system)
|
||||
|
|
@ -215,9 +217,13 @@ class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eve
|
|||
// behavior is running as an untyped PersistentActor it's not possible to
|
||||
// wrap it in Actor.deferred or Actor.supervise
|
||||
pending
|
||||
val probe = TestProbe[State]
|
||||
val behavior = Actor.supervise[Command](counter("c13"))
|
||||
.onFailure(SupervisorStrategy.restartWithBackoff(1.second, 10.seconds, 0.1))
|
||||
val c = start(behavior)
|
||||
c ! Increment
|
||||
c ! GetValue(probe.ref)
|
||||
probe.expectMsg(State(1, Vector(0)))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue