diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorRef.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorRef.scala index 2c1b9c131a..1f3a7f53dc 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorRef.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorRef.scala @@ -60,8 +60,6 @@ object ActorRef { def !(msg: T): Unit = ref.tell(msg) } - // FIXME factory methods for below for Java (trait + object) - /** * INTERNAL API * diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala index 0f119638c6..ea00b486d0 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala @@ -139,8 +139,7 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions { * Ask the system guardian of this system to create an actor from the given * behavior and props and with the given name. The name does not need to * be unique since the guardian will prefix it with a running number when - * creating the child actor. The timeout sets the timeout used for the [[akka.actor.typed.scaladsl.AskPattern]] - * invocation when asking the guardian. + * creating the child actor. * * The returned Future of [[ActorRef]] may be converted into an [[ActorRef]] * to which messages can immediately be sent by using the [[ActorRef$.apply[T](s*]] @@ -156,8 +155,6 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions { } object ActorSystem { - import internal._ - /** * Scala API: Create an ActorSystem */ @@ -229,8 +226,6 @@ object ActorSystem { * This class is immutable. */ final class Settings(val config: Config, val untyped: a.ActorSystem.Settings, val name: String) { - import collection.JavaConverters._ - def this(_cl: ClassLoader, _config: Config, name: String) = this({ val config = _config.withFallback(ConfigFactory.defaultReference(_cl)) config.checkValid(ConfigFactory.defaultReference(_cl), "akka") @@ -240,22 +235,6 @@ final class Settings(val config: Config, val untyped: a.ActorSystem.Settings, va def this(untyped: a.ActorSystem.Settings) = this(untyped.config, untyped, untyped.name) private var foundSettings = List.empty[String] - private def found(name: String, value: String): Unit = foundSettings ::= s"$name = $value" - private def getS(name: String, path: String): String = { - val value = config.getString(path) - found(name, value) - value - } - private def getSL(name: String, path: String): List[String] = { - val value = config.getStringList(path) - found(name, value.toString) - value.asScala.toList - } - private def getI(name: String, path: String): Int = { - val value = config.getInt(path) - found(name, value.toString) - value - } foundSettings = foundSettings.reverse diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala index d659edec2a..37f8e5e6a6 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala @@ -290,7 +290,7 @@ object Behavior { case ext: ExtensibleBehavior[T] ⇒ val possiblyDeferredResult = msg match { case signal: Signal ⇒ ext.receiveSignal(ctx, signal) - case msg ⇒ ext.receiveMessage(ctx, msg.asInstanceOf[T]) + case m ⇒ ext.receiveMessage(ctx, m.asInstanceOf[T]) } undefer(possiblyDeferredResult, ctx) } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala index 69a5bd2a91..794fbbcc9a 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala @@ -24,7 +24,7 @@ import scala.concurrent.ExecutionContextExecutor case None ⇒ Optional.empty() } - override def getChildren: java.util.List[akka.actor.typed.ActorRef[Void]] = { + override def getChildren: java.util.List[ActorRef[Void]] = { val c = children val a = new ArrayList[ActorRef[Void]](c.size) val i = c.iterator @@ -57,10 +57,10 @@ import scala.concurrent.ExecutionContextExecutor internalSpawnAdapter(f, "") override def spawnAdapter[U](f: java.util.function.Function[U, T]): akka.actor.typed.ActorRef[U] = - internalSpawnAdapter(f.apply _, "") + internalSpawnAdapter(f.apply, "") override def spawnAdapter[U](f: java.util.function.Function[U, T], name: String): akka.actor.typed.ActorRef[U] = - internalSpawnAdapter(f.apply _, name) + internalSpawnAdapter(f.apply, name) /** * INTERNAL API: Needed to make Scala 2.12 compiler happy. diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala index e9683c18b5..0cfdbd9165 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala @@ -4,12 +4,10 @@ package akka.actor.typed package internal -import akka.actor.InvalidMessageException import akka.util.LineNumbers import akka.annotation.InternalApi import akka.actor.typed.{ ActorContext ⇒ AC } -import akka.actor.typed.scaladsl.{ ActorContext ⇒ SAC } -import akka.actor.typed.scaladsl.Actor +import akka.actor.typed.scaladsl.{ ActorContext ⇒ TAC } import scala.reflect.ClassTag import scala.annotation.tailrec @@ -67,19 +65,19 @@ import scala.annotation.tailrec } class ImmutableBehavior[T]( - val onMessage: (SAC[T], T) ⇒ Behavior[T], - onSignal: PartialFunction[(SAC[T], Signal), Behavior[T]] = Behavior.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]]) + val onMessage: (TAC[T], T) ⇒ Behavior[T], + onSignal: PartialFunction[(TAC[T], Signal), Behavior[T]] = Behavior.unhandledSignal.asInstanceOf[PartialFunction[(TAC[T], Signal), Behavior[T]]]) extends ExtensibleBehavior[T] { override def receiveSignal(ctx: AC[T], msg: Signal): Behavior[T] = - onSignal.applyOrElse((ctx.asScala, msg), Behavior.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]]) + onSignal.applyOrElse((ctx.asScala, msg), Behavior.unhandledSignal.asInstanceOf[PartialFunction[(TAC[T], Signal), Behavior[T]]]) override def receiveMessage(ctx: AC[T], msg: T) = onMessage(ctx.asScala, msg) override def toString = s"Immutable(${LineNumbers(onMessage)})" } def tap[T]( - onMessage: Function2[SAC[T], T, _], - onSignal: Function2[SAC[T], Signal, _], + onMessage: (TAC[T], T) ⇒ _, + onSignal: (TAC[T], Signal) ⇒ _, behavior: Behavior[T]): Behavior[T] = { intercept[T, T]( beforeMessage = (ctx, msg) ⇒ { @@ -90,8 +88,8 @@ import scala.annotation.tailrec onSignal(ctx, sig) true }, - afterMessage = (ctx, msg, b) ⇒ b, // TODO optimize by using more ConstantFun - afterSignal = (ctx, sig, b) ⇒ b, + afterMessage = (_, _, b) ⇒ b, // TODO optimize by using more ConstantFun + afterSignal = (_, _, b) ⇒ b, behavior)(ClassTag(classOf[Any])) } @@ -111,10 +109,10 @@ import scala.annotation.tailrec * different than the incoming message). */ def intercept[T, U <: Any: ClassTag]( - beforeMessage: Function2[SAC[U], U, T], - beforeSignal: Function2[SAC[T], Signal, Boolean], - afterMessage: Function3[SAC[T], T, Behavior[T], Behavior[T]], - afterSignal: Function3[SAC[T], Signal, Behavior[T], Behavior[T]], + beforeMessage: Function2[TAC[U], U, T], + beforeSignal: Function2[TAC[T], Signal, Boolean], + afterMessage: Function3[TAC[T], T, Behavior[T], Behavior[T]], + afterSignal: Function3[TAC[T], Signal, Behavior[T], Behavior[T]], behavior: Behavior[T], toStringPrefix: String = "Intercept"): Behavior[T] = { behavior match { @@ -130,10 +128,10 @@ import scala.annotation.tailrec } private final case class Intercept[T, U <: Any: ClassTag]( - beforeOnMessage: Function2[SAC[U], U, T], - beforeOnSignal: Function2[SAC[T], Signal, Boolean], - afterMessage: Function3[SAC[T], T, Behavior[T], Behavior[T]], - afterSignal: Function3[SAC[T], Signal, Behavior[T], Behavior[T]], + beforeOnMessage: Function2[TAC[U], U, T], + beforeOnSignal: Function2[TAC[T], Signal, Boolean], + afterMessage: Function3[TAC[T], T, Behavior[T], Behavior[T]], + afterSignal: Function3[TAC[T], Signal, Behavior[T], Behavior[T]], behavior: Behavior[T], toStringPrefix: String = "Intercept") extends ExtensibleBehavior[T] { @@ -162,7 +160,7 @@ import scala.annotation.tailrec override def receiveMessage(ctx: AC[T], msg: T): Behavior[T] = { msg match { case m: U ⇒ - val msg2 = beforeOnMessage(ctx.asScala.asInstanceOf[SAC[U]], m) + val msg2 = beforeOnMessage(ctx.asScala.asInstanceOf[TAC[U]], m) val next: Behavior[T] = if (msg2 == null) same diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/DeathWatch.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/DeathWatch.scala deleted file mode 100644 index 5c38032358..0000000000 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/DeathWatch.scala +++ /dev/null @@ -1,221 +0,0 @@ -/** - * Copyright (C) 2009-2017 Lightbend Inc. - */ - -package akka.actor.typed -package internal - -import akka.event.Logging.{ Warning, Debug } -import akka.event.AddressTerminatedTopic -import akka.event.Logging -import akka.actor.Address - -/* - * THOUGHTS - * - * - an ActorRef is a channel that allows sending messages — in particular it is NOT a sender! - * - a channel is scoped by the session it is part of - * - termination means that the session ends because sending further messages is pointless - * - this means that there is no ordering requirement between Terminated and any other received message - */ -private[typed] trait DeathWatch[T] { - - /* - * INTERFACE WITH ACTORCELL - */ - protected def system: ActorSystem[Nothing] - protected def self: ActorRefImpl[T] - protected def parent: ActorRefImpl[Nothing] - protected def behavior: Behavior[T] - protected def next(b: Behavior[T], msg: Any): Unit - protected def childrenMap: Map[String, ActorRefImpl[Nothing]] - protected def terminatingMap: Map[String, ActorRefImpl[Nothing]] - protected def isTerminating: Boolean - protected def ctx: ActorContext[T] - protected def maySend: Boolean - protected def publish(e: Logging.LogEvent): Unit - protected def clazz(obj: AnyRef): Class[_] - - protected def removeChild(actor: ActorRefImpl[Nothing]): Unit - protected def finishTerminate(): Unit - - type ARImpl = ActorRefImpl[Nothing] - - /** - * This map holds a [[None]] for actors for which we send a [[Terminated]] notification on termination, - * ``Some(message)`` for actors for which we send a custom termination message. - */ - private var watching = Map.empty[ARImpl, Option[T]] - private var watchedBy = Set.empty[ARImpl] - - final def watch[U](subject: ActorRef[U]): Unit = { - val a = subject.sorry - if (a != self && !watching.contains(a)) { - maintainAddressTerminatedSubscription(a) { - a.sendSystem(Watch(a, self)) - watching = watching.updated(a, None) - } - } - } - - final def watchWith[U](subject: ActorRef[U], msg: T): Unit = { - val a = subject.sorry - if (a != self && !watching.contains(a)) { - maintainAddressTerminatedSubscription(a) { - a.sendSystem(Watch(a, self)) - watching = watching.updated(a, Some(msg)) - } - } - } - - final def unwatch[U](_a: ActorRef[U]): Unit = { - val a = _a.sorry - if (a != self && watching.contains(a)) { - a.sendSystem(Unwatch(a, self)) - maintainAddressTerminatedSubscription(a) { - watching -= a - } - } - } - - /** - * When this actor is watching the subject of [[akka.actor.Terminated]] message - * it will be propagated to user's receive. - */ - protected def watchedActorTerminated(actor: ARImpl, failure: Throwable): Boolean = { - removeChild(actor) - watching.get(actor) match { - case None ⇒ // We're apparently no longer watching this actor. - case Some(optionalMessage) ⇒ - maintainAddressTerminatedSubscription(actor) { - watching -= actor - } - if (maySend) { - optionalMessage match { - case None ⇒ - val t = Terminated(actor)(failure) - next(Behavior.interpretSignal(behavior, ctx, t), t) - case Some(msg) ⇒ - next(Behavior.interpretMessage(behavior, ctx, msg), msg) - } - } - } - if (isTerminating && terminatingMap.isEmpty) { - finishTerminate() - false - } else true - } - - protected def tellWatchersWeDied(): Unit = - if (watchedBy.nonEmpty) { - try { - // Don't need to send to parent parent since it receives a DWN by default - def sendTerminated(ifLocal: Boolean)(watcher: ARImpl): Unit = - if (watcher.isLocal == ifLocal && watcher != parent) watcher.sendSystem(DeathWatchNotification(self, null)) - - /* - * It is important to notify the remote watchers first, otherwise RemoteDaemon might shut down, causing - * the remoting to shut down as well. At this point Terminated messages to remote watchers are no longer - * deliverable. - * - * The problematic case is: - * 1. Terminated is sent to RemoteDaemon - * 1a. RemoteDaemon is fast enough to notify the terminator actor in RemoteActorRefProvider - * 1b. The terminator is fast enough to enqueue the shutdown command in the remoting - * 2. Only at this point is the Terminated (to be sent remotely) enqueued in the mailbox of remoting - * - * If the remote watchers are notified first, then the mailbox of the Remoting will guarantee the correct order. - */ - watchedBy foreach sendTerminated(ifLocal = false) - watchedBy foreach sendTerminated(ifLocal = true) - } finally { - maintainAddressTerminatedSubscription() { - watchedBy = Set.empty - } - } - } - - protected def unwatchWatchedActors(): Unit = - if (watching.nonEmpty) { - maintainAddressTerminatedSubscription() { - try { - watching.foreach { case (watchee, _) ⇒ watchee.sendSystem(Unwatch(watchee, self)) } - } finally { - watching = Map.empty - } - } - } - - protected def addWatcher(watchee: ARImpl, watcher: ARImpl): Unit = { - val watcheeSelf = watchee == self - val watcherSelf = watcher == self - - if (watcheeSelf && !watcherSelf) { - if (!watchedBy.contains(watcher)) maintainAddressTerminatedSubscription(watcher) { - watchedBy += watcher - if (system.settings.untyped.DebugLifecycle) publish(Debug(self.path.toString, clazz(behavior), s"now watched by $watcher")) - } - } else if (!watcheeSelf && watcherSelf) { - watch(watchee) - } else { - publish(Warning(self.path.toString, clazz(behavior), "BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, self))) - } - } - - protected def remWatcher(watchee: ARImpl, watcher: ARImpl): Unit = { - val watcheeSelf = watchee == self - val watcherSelf = watcher == self - - if (watcheeSelf && !watcherSelf) { - if (watchedBy.contains(watcher)) maintainAddressTerminatedSubscription(watcher) { - watchedBy -= watcher - if (system.settings.untyped.DebugLifecycle) publish(Debug(self.path.toString, clazz(behavior), s"no longer watched by $watcher")) - } - } else if (!watcheeSelf && watcherSelf) { - unwatch(watchee) - } else { - publish(Warning(self.path.toString, clazz(behavior), "BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, self))) - } - } - - protected def addressTerminated(address: Address): Unit = { - // cleanup watchedBy since we know they are dead - maintainAddressTerminatedSubscription() { - for (a ← watchedBy; if a.path.address == address) watchedBy -= a - } - - for ((a, _) ← watching; if a.path.address == address) { - self.sendSystem(DeathWatchNotification(a, null)) - } - } - - /** - * Starts subscription to AddressTerminated if not already subscribing and the - * block adds a non-local ref to watching or watchedBy. - * Ends subscription to AddressTerminated if subscribing and the - * block removes the last non-local ref from watching and watchedBy. - */ - private def maintainAddressTerminatedSubscription[U](change: ARImpl = null)(block: ⇒ U): U = { - def isNonLocal(ref: ARImpl) = ref match { - case null ⇒ true - case a ⇒ !a.isLocal - } - - if (isNonLocal(change)) { - def hasNonLocalAddress: Boolean = ((watching.keysIterator exists isNonLocal) || (watchedBy exists isNonLocal)) - val had = hasNonLocalAddress - val result = block - val has = hasNonLocalAddress - if (had && !has) unsubscribeAddressTerminated() - else if (!had && has) subscribeAddressTerminated() - result - } else { - block - } - } - - // FIXME: these will need to be redone once remoting is integrated - private def unsubscribeAddressTerminated(): Unit = ??? - private def subscribeAddressTerminated(): Unit = ??? - -} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ExtensionsImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ExtensionsImpl.scala index 234f490fe6..6b459f79c3 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ExtensionsImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ExtensionsImpl.scala @@ -97,7 +97,7 @@ trait ExtensionsImpl extends Extensions { self: ActorSystem[_] ⇒ //Always notify listeners of the inProcess signal inProcessOfRegistration.countDown() } - case other ⇒ + case _ ⇒ //Someone else is in process of registering an extension for this Extension, retry registerExtension(ext) } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/MiscMessageSerializer.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/MiscMessageSerializer.scala index c7ef5773f0..96d6a9dc75 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/MiscMessageSerializer.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/MiscMessageSerializer.scala @@ -15,10 +15,10 @@ import akka.serialization.{ BaseSerializer, SerializerWithStringManifest } class MiscMessageSerializer(val system: akka.actor.ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer { private val resolver = ActorRefResolver(system.toTyped) - private val actorRefManifest = "a" + private val ActorRefManifest = "a" def manifest(o: AnyRef): String = o match { - case ref: ActorRef[_] ⇒ actorRefManifest + case ref: ActorRef[_] ⇒ ActorRefManifest case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]") } @@ -30,7 +30,7 @@ class MiscMessageSerializer(val system: akka.actor.ExtendedActorSystem) extends } def fromBinary(bytes: Array[Byte], manifest: String): ActorRef[Any] = manifest match { - case `actorRefManifest` ⇒ resolver.resolveActorRef(new String(bytes, StandardCharsets.UTF_8)) + case ActorRefManifest ⇒ resolver.resolveActorRef(new String(bytes, StandardCharsets.UTF_8)) case _ ⇒ throw new NotSerializableException( s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]") diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/SupervisionMechanics.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/SupervisionMechanics.scala deleted file mode 100644 index 759f86e040..0000000000 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/SupervisionMechanics.scala +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Copyright (C) 2016-2017 Lightbend Inc. - */ -package akka.actor.typed -package internal - -import scala.util.control.NonFatal -import akka.event.Logging -import akka.actor.typed.Behavior.{ DeferredBehavior, undefer, validateAsInitial } -import akka.actor.typed.Behavior.StoppedBehavior -import akka.util.OptionVal - -/** - * INTERNAL API - */ -private[typed] trait SupervisionMechanics[T] { - /* - * INTERFACE WITH ACTOR CELL - */ - protected def system: ActorSystem[Nothing] - protected def initialBehavior: Behavior[T] - protected def self: ActorRefImpl[T] - protected def parent: ActorRefImpl[Nothing] - protected def behavior: Behavior[T] - protected def behavior_=(b: Behavior[T]): Unit - protected def next(b: Behavior[T], msg: Any): Unit - protected def terminatingMap: Map[String, ActorRefImpl[Nothing]] - protected def stopAll(): Unit - protected def setTerminating(): Unit - protected def setClosed(): Unit - protected def maySend: Boolean - protected def ctx: ActorContext[T] - protected def publish(e: Logging.LogEvent): Unit - protected def clazz(obj: AnyRef): Class[_] - - // INTERFACE WITH DEATHWATCH - protected def addWatcher(watchee: ActorRefImpl[Nothing], watcher: ActorRefImpl[Nothing]): Unit - protected def remWatcher(watchee: ActorRefImpl[Nothing], watcher: ActorRefImpl[Nothing]): Unit - protected def watchedActorTerminated(actor: ActorRefImpl[Nothing], failure: Throwable): Boolean - protected def tellWatchersWeDied(): Unit - protected def unwatchWatchedActors(): Unit - - /** - * Process one system message and return whether further messages shall be processed. - */ - protected def processSignal(message: SystemMessage): Boolean = { - message match { - case Watch(watchee, watcher) ⇒ { addWatcher(watchee.sorryForNothing, watcher.sorryForNothing); true } - case Unwatch(watchee, watcher) ⇒ { remWatcher(watchee.sorryForNothing, watcher.sorryForNothing); true } - case DeathWatchNotification(a, f) ⇒ watchedActorTerminated(a.sorryForNothing, f) - case Create() ⇒ create() - case Terminate() ⇒ terminate() - case NoMessage ⇒ false // only here to suppress warning - } - } - - private[this] var _failed: Throwable = null - protected def failed: Throwable = _failed - - protected def fail(thr: Throwable): Unit = { - if (_failed eq null) _failed = thr - publish(Logging.Error(thr, self.path.toString, getClass, thr.getMessage)) - if (maySend) self.sendSystem(Terminate()) - } - - private def create(): Boolean = { - behavior = initialBehavior - if (system.settings.untyped.DebugLifecycle) - publish(Logging.Debug(self.path.toString, clazz(behavior), "started")) - behavior = validateAsInitial(undefer(behavior, ctx)) - if (!Behavior.isAlive(behavior)) self.sendSystem(Terminate()) - true - } - - private def terminate(): Boolean = { - setTerminating() - unwatchWatchedActors() - stopAll() - if (terminatingMap.isEmpty) { - finishTerminate() - false - } else true - } - - protected def finishTerminate(): Unit = { - val a = behavior - /* - * The following order is crucial for things to work properly. Only change this if you're very confident and lucky. - * - * - */ - try a match { - case null ⇒ // skip PostStop - case _: DeferredBehavior[_] ⇒ - // Do not undefer a DeferredBehavior as that may cause creation side-effects, which we do not want on termination. - case s: StoppedBehavior[_] ⇒ s.postStop match { - case OptionVal.Some(postStop) ⇒ Behavior.interpretSignal(postStop, ctx, PostStop) - case OptionVal.None ⇒ // no postStop behavior defined - } - case _ ⇒ Behavior.interpretSignal(a, ctx, PostStop) - } catch { case NonFatal(ex) ⇒ publish(Logging.Error(ex, self.path.toString, clazz(a), "failure during PostStop")) } - finally try tellWatchersWeDied() - finally try parent.sendSystem(DeathWatchNotification(self, failed)) - finally { - behavior = null - _failed = null - setClosed() - if (system.settings.untyped.DebugLifecycle) - publish(Logging.Debug(self.path.toString, clazz(a), "stopped")) - } - } -} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala index ee24411f5d..100b63956d 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorContextAdapter.scala @@ -27,7 +27,7 @@ import akka.actor.typed.Behavior.UntypedBehavior ActorContextAdapter.spawnAnonymous(untyped, behavior, props) override def spawn[U](behavior: Behavior[U], name: String, props: Props = Props.empty) = ActorContextAdapter.spawn(untyped, behavior, name, props) - override def stop[U](child: ActorRef[U]) = + override def stop[U](child: ActorRef[U]): Boolean = toUntyped(child) match { case f: akka.actor.FunctionRef ⇒ val cell = untyped.asInstanceOf[akka.actor.ActorCell] @@ -63,7 +63,6 @@ import akka.actor.typed.Behavior.UntypedBehavior val ref = cell.addFunctionRef((_, msg) ⇒ untyped.self ! f(msg.asInstanceOf[U]), _name) ActorRefAdapter[U](ref) } - } /** diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala index 4ee557f2e8..55e295054a 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala @@ -6,11 +6,8 @@ package akka.actor.typed.javadsl import java.util.function.{ Function ⇒ JFunction } import akka.annotation.DoNotInherit import akka.annotation.ApiMayChange -import akka.actor.typed.ActorRef -import akka.actor.typed.ActorSystem +import akka.actor.typed._ import java.util.Optional -import akka.actor.typed.Behavior -import akka.actor.typed.Props import scala.concurrent.duration.FiniteDuration import scala.concurrent.ExecutionContextExecutor @@ -30,7 +27,7 @@ import scala.concurrent.ExecutionContextExecutor * * An `ActorContext` in addition provides access to the Actor’s own identity (“`getSelf`”), * the [[ActorSystem]] it is part of, methods for querying the list of child Actors it - * created, access to [[Terminated DeathWatch]] and timed message scheduling. + * created, access to [[Terminated]] and timed message scheduling. */ @DoNotInherit @ApiMayChange diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala index d5525e6ded..bb1ecb870d 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala @@ -8,11 +8,7 @@ import scala.concurrent.duration.FiniteDuration import akka.annotation.ApiMayChange import akka.annotation.DoNotInherit -import akka.actor.typed.ActorRef -import akka.actor.typed.ActorSystem -import akka.actor.typed.Behavior -import akka.actor.typed.Props -import akka.actor.typed.EmptyProps +import akka.actor.typed._ /** * An Actor is given by the combination of a [[Behavior]] and a context in @@ -30,7 +26,7 @@ import akka.actor.typed.EmptyProps * * An `ActorContext` in addition provides access to the Actor’s own identity (“`self`”), * the [[ActorSystem]] it is part of, methods for querying the list of child Actors it - * created, access to [[Terminated DeathWatch]] and timed message scheduling. + * created, access to [[Terminated]] and timed message scheduling. */ @DoNotInherit @ApiMayChange @@ -90,7 +86,7 @@ trait ActorContext[T] { this: akka.actor.typed.javadsl.ActorContext[T] ⇒ def stop[U](child: ActorRef[U]): Boolean /** - * Register for [[Terminated]] notification once the Actor identified by the + * Register for [[akka.actor.typed.Terminated]] notification once the Actor identified by the * given [[ActorRef]] terminates. This message is also sent when the watched actor * is on a node that has been removed from the cluster when using akka-cluster * or has been marked unreachable when using akka-remote directly diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorSpec.scala index 505b91731b..a552eca8d9 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorSpec.scala @@ -17,7 +17,8 @@ import akka.persistence.typed.scaladsl.PersistentActor._ object PersistentActorSpec { - val config = ConfigFactory.parseString(""" + val config = ConfigFactory.parseString( + """ akka.persistence.journal.plugin = "akka.persistence.journal.inmem" """) @@ -109,6 +110,7 @@ object PersistentActorSpec { } class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eventually with StartSupport { + import PersistentActorSpec._ implicit val testSettings = TestKitSettings(system) @@ -215,9 +217,13 @@ class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eve // behavior is running as an untyped PersistentActor it's not possible to // wrap it in Actor.deferred or Actor.supervise pending + val probe = TestProbe[State] val behavior = Actor.supervise[Command](counter("c13")) .onFailure(SupervisorStrategy.restartWithBackoff(1.second, 10.seconds, 0.1)) val c = start(behavior) + c ! Increment + c ! GetValue(probe.ref) + probe.expectMsg(State(1, Vector(0))) } }