From d6e5b0a46b4d83c9889f7bc79e198fb5f48e4da3 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 25 Sep 2012 14:12:04 +0200 Subject: [PATCH 01/22] Terminate remote-deployed actors when system shutdown, see #2552 * Termination hook mechanism in system guardian to be able to notify RemoteSystemDaemon and wait for it to terminate children * Stopping the children will trigger ordinary death watch mechanism, in for example routers * Note bug in RemoteSystemDaemon, watch of children was not done properly, which might have been a memory leak for remote deployed actors. --- .../src/main/scala/akka/actor/ActorRef.scala | 8 +++++ .../scala/akka/actor/ActorRefProvider.scala | 33 +++++++++++++++++-- .../main/scala/akka/remote/RemoteDaemon.scala | 24 ++++++++++++-- 3 files changed, 59 insertions(+), 6 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 615c4ef92e..b8b2164403 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -13,6 +13,7 @@ import scala.annotation.tailrec import java.util.concurrent.{ ConcurrentHashMap } import akka.event.LoggingAdapter import scala.concurrent.forkjoin.ThreadLocalRandom +import scala.collection.JavaConverters /** * Immutable and serializable handle to an actor, which may or may not reside @@ -516,4 +517,11 @@ private[akka] class VirtualPathContainer( } } } + + def hasChildren: Boolean = !children.isEmpty + + def allChildren: Iterable[ActorRef] = { + import scala.collection.JavaConverters._ + children.values.asScala + } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 3f96bd839c..5aa6049c74 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -300,6 +300,23 @@ trait ActorRefFactory { */ private[akka] case class StopChild(child: ActorRef) +/** + * INTERNAL API + */ +private[akka] object Guardian { + /** + * For the purpose of orderly shutdown it's possible + * to register interest in the termination of systemGuardian + * and receive a notification [[akka.actor.Guardian.TerminationHook]] + * before systemGuardian is stopped. The registered hook is supposed + * to reply with [[akka.actor.Guardian.TerminationHookDone]] and the + * systemGuardian will not stop until all registered hooks have replied. + */ + case object RegisterTerminationHook + case object TerminationHook + case object TerminationHookDone +} + /** * Local ActorRef provider. */ @@ -374,11 +391,21 @@ class LocalActorRefProvider( } private class Guardian(override val supervisorStrategy: SupervisorStrategy, isSystem: Boolean) extends Actor { + import Guardian._ + + var terminationHooks = Set.empty[ActorRef] def receive = { - case Terminated(_) ⇒ if (isSystem) eventStream.stopDefaultLoggers(); context.stop(self) - case StopChild(child) ⇒ context.stop(child) - case m ⇒ deadLetters ! DeadLetter(m, sender, self) + case Terminated(_) ⇒ terminationHooks foreach { _ ! TerminationHook }; stopWhenAllTerminationHooksDone() + case StopChild(child) ⇒ context.stop(child) + case RegisterTerminationHook ⇒ terminationHooks += sender + case TerminationHookDone ⇒ terminationHooks -= sender; stopWhenAllTerminationHooksDone() + case m ⇒ deadLetters ! DeadLetter(m, sender, self) + } + + def stopWhenAllTerminationHooksDone(): Unit = if (terminationHooks.isEmpty) { + if (isSystem) eventStream.stopDefaultLoggers() + context.stop(self) } // guardian MUST NOT lose its children during restart diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index 53023687c0..ccdeee439b 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -24,6 +24,12 @@ private[akka] case class DaemonMsgCreate(props: Props, deploy: Deploy, path: Str private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath, _parent: InternalActorRef, _log: LoggingAdapter) extends VirtualPathContainer(system.provider, _path, _parent, _log) { + import akka.actor.Guardian._ + + system.provider.systemGuardian.tell(RegisterTerminationHook, this) + + @volatile private var terminating = false + /** * Find the longest matching path which we know about and return that ref * (or ask that ref to continue searching if elements are left). @@ -50,7 +56,7 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath } override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match { - case message: DaemonMsg ⇒ + case message: DaemonMsg if !terminating ⇒ log.debug("Received command [{}] to RemoteSystemDaemon on [{}]", message, path.address) message match { case DaemonMsgCreate(props, deploy, path, supervisor) ⇒ @@ -63,18 +69,30 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef], path, systemService = false, Some(deploy), lookupDeploy = true, async = false) addChild(subpath.mkString("/"), actor) - this.sendSystemMessage(Watch(actor, this)) + actor.sendSystemMessage(Watch(actor, this)) case _ ⇒ log.error("remote path does not match path from message [{}]", message) } } + case message: DaemonMsg if terminating ⇒ + log.debug("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, path.address) + case Terminated(child: ActorRefWithCell) if child.asInstanceOf[ActorRefScope].isLocal ⇒ removeChild(child.path.elements.drop(1).mkString("/")) + terminationHookDoneWhenNoChildren() case t: Terminated ⇒ - case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this) + case TerminationHook ⇒ + terminating = true + terminationHookDoneWhenNoChildren() + allChildren foreach system.stop + + case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this) } + def terminationHookDoneWhenNoChildren(): Unit = if (terminating && !hasChildren) + system.provider.systemGuardian.tell(TerminationHookDone, this) + } From fd757fb680485c997b55a4ceda80e961666af77a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 25 Sep 2012 15:08:08 +0200 Subject: [PATCH 02/22] Terminate remote deployed actors when parent node goes down, see #2551 * RemoteSystemDaemon listens to AddressTerminated and stops matching children --- .../src/main/scala/akka/remote/RemoteDaemon.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index ccdeee439b..ac07e2211c 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -5,7 +5,7 @@ package akka.remote import scala.annotation.tailrec -import akka.actor.{ VirtualPathContainer, Terminated, Deploy, Props, Nobody, LocalActorRef, InternalActorRef, Address, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor } +import akka.actor.{ VirtualPathContainer, Terminated, Deploy, Props, Nobody, LocalActorRef, InternalActorRef, Address, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor, AddressTerminated } import akka.event.LoggingAdapter import akka.dispatch.Watch import akka.actor.ActorRefWithCell @@ -26,9 +26,11 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath import akka.actor.Guardian._ + @volatile private var terminating = false + system.provider.systemGuardian.tell(RegisterTerminationHook, this) - @volatile private var terminating = false + system.eventStream.subscribe(this, classOf[AddressTerminated]) /** * Find the longest matching path which we know about and return that ref @@ -89,6 +91,9 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath terminationHookDoneWhenNoChildren() allChildren foreach system.stop + case AddressTerminated(address) ⇒ + allChildren filter { _.asInstanceOf[InternalActorRef].getParent.path.address == address } foreach system.stop + case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this) } From 490d723f8135233f72dc37a2a8996c1675a5e8f1 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 25 Sep 2012 16:30:20 +0200 Subject: [PATCH 03/22] Incorporate review feedback, see 2551 --- akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index ac07e2211c..581ac94498 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -92,7 +92,7 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath allChildren foreach system.stop case AddressTerminated(address) ⇒ - allChildren filter { _.asInstanceOf[InternalActorRef].getParent.path.address == address } foreach system.stop + allChildren foreach { case a: InternalActorRef if a.getParent.path.address == address ⇒ system.stop(a) } case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this) } From a38694bd69e6733c7bcae24702754f31702d8d1a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 25 Sep 2012 18:35:07 +0200 Subject: [PATCH 04/22] Remove race when terminating, see #2552 --- .../main/scala/akka/remote/RemoteDaemon.scala | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index ccdeee439b..eb13428ca5 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -10,6 +10,7 @@ import akka.event.LoggingAdapter import akka.dispatch.Watch import akka.actor.ActorRefWithCell import akka.actor.ActorRefScope +import akka.util.Switch private[akka] sealed trait DaemonMsg private[akka] case class DaemonMsgCreate(props: Props, deploy: Deploy, path: String, supervisor: ActorRef) extends DaemonMsg @@ -28,7 +29,7 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath system.provider.systemGuardian.tell(RegisterTerminationHook, this) - @volatile private var terminating = false + private val terminating = new Switch(false) /** * Find the longest matching path which we know about and return that ref @@ -56,7 +57,7 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath } override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match { - case message: DaemonMsg if !terminating ⇒ + case message: DaemonMsg ⇒ log.debug("Received command [{}] to RemoteSystemDaemon on [{}]", message, path.address) message match { case DaemonMsgCreate(props, deploy, path, supervisor) ⇒ @@ -66,18 +67,19 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath // TODO RK canonicalize path so as not to duplicate it always #1446 val subpath = elems.drop(1) val path = this.path / subpath - val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef], - path, systemService = false, Some(deploy), lookupDeploy = true, async = false) - addChild(subpath.mkString("/"), actor) - actor.sendSystemMessage(Watch(actor, this)) + terminating.fold( + log.debug("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, path.address)) { + + val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef], + path, systemService = false, Some(deploy), lookupDeploy = true, async = false) + addChild(subpath.mkString("/"), actor) + actor.sendSystemMessage(Watch(actor, this)) + } case _ ⇒ log.error("remote path does not match path from message [{}]", message) } } - case message: DaemonMsg if terminating ⇒ - log.debug("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, path.address) - case Terminated(child: ActorRefWithCell) if child.asInstanceOf[ActorRefScope].isLocal ⇒ removeChild(child.path.elements.drop(1).mkString("/")) terminationHookDoneWhenNoChildren() @@ -85,14 +87,15 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath case t: Terminated ⇒ case TerminationHook ⇒ - terminating = true - terminationHookDoneWhenNoChildren() - allChildren foreach system.stop + terminating.switchOn { + terminationHookDoneWhenNoChildren() + allChildren foreach system.stop + } case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this) } - def terminationHookDoneWhenNoChildren(): Unit = if (terminating && !hasChildren) + def terminationHookDoneWhenNoChildren(): Unit = if (terminating.isOn && !hasChildren) system.provider.systemGuardian.tell(TerminationHookDone, this) } From 8956523d5fa3cf62daf799f2f1a6ec1ed4ec9b24 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 26 Sep 2012 14:02:21 +0200 Subject: [PATCH 05/22] Clean up remote-deployed children when target node goes down, see #2550 * ClusterActorRefProvider watch remote deployed actors and sends ChildTerminated when AddressTerminated * Added addressTerminated flag to Terminated to know when the Terminated was generated from a AddressTerminated * Extra removal of child when the Terminated originates from AddressTerminated to support immediate creation of child with same name --- .../src/main/scala/akka/actor/Actor.scala | 4 +- .../src/main/scala/akka/actor/ActorCell.scala | 9 ++- .../scala/akka/actor/dungeon/DeathWatch.scala | 2 +- .../cluster/ClusterActorRefProvider.scala | 56 ++++++++++++++++++- .../akka/cluster/ClusterDeathWatchSpec.scala | 45 +++++++++++++++ .../ClusterConsistentHashingRouterSpec.scala | 23 ++++++-- .../main/scala/akka/remote/netty/Client.scala | 7 ++- 7 files changed, 134 insertions(+), 12 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index b5f2093f90..08a4a84598 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -67,7 +67,9 @@ case object Kill extends Kill { * to another actor you should send the information in your own message. */ @SerialVersionUID(1L) -case class Terminated private[akka] (@BeanProperty actor: ActorRef)(@BeanProperty val existenceConfirmed: Boolean) extends AutoReceivedMessage +case class Terminated private[akka] (@BeanProperty actor: ActorRef)( + @BeanProperty val existenceConfirmed: Boolean, + @BeanProperty val addressTerminated: Boolean = false) extends AutoReceivedMessage /** * INTERNAL API diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 504ec5f762..2a28e4e8be 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -380,8 +380,13 @@ private[akka] class ActorCell( publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg)) msg.message match { - case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid) - case t: Terminated ⇒ watchedActorTerminated(t) + case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid) + case t: Terminated ⇒ + // when a parent is watching a child and it terminates due to AddressTerminated, + // it should be removed to support immediate creation of child with same name + if (t.addressTerminated) + childrenRefs.getByRef(t.actor) foreach { crs ⇒ removeChildAndGetStateChange(crs.child) } + watchedActorTerminated(t) case AddressTerminated(address) ⇒ addressTerminated(address) case Kill ⇒ throw new ActorKilledException("Kill") case PoisonPill ⇒ self.stop() diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala index 5407afc2c8..2b8ea2322b 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala @@ -118,7 +118,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ // existenceConfirmed = false because we could have been watching a // non-local ActorRef that had never resolved before the other node went down for (a ← watching; if a.path.address == address) { - self ! Terminated(a)(existenceConfirmed = false) + self ! Terminated(a)(existenceConfirmed = false, addressTerminated = true) } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala index 03d9982bd1..5a679fe555 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala @@ -5,19 +5,30 @@ package akka.cluster import com.typesafe.config.Config import akka.ConfigurationException +import akka.actor.Actor +import akka.actor.ActorPath +import akka.actor.ActorRef import akka.actor.ActorSystem +import akka.actor.ActorSystemImpl import akka.actor.Deploy import akka.actor.DynamicAccess +import akka.actor.InternalActorRef import akka.actor.NoScopeGiven +import akka.actor.Props import akka.actor.Scheduler import akka.actor.Scope +import akka.actor.Terminated import akka.cluster.routing.ClusterRouterConfig +import akka.cluster.routing.ClusterRouterSettings +import akka.dispatch.ChildTerminated import akka.event.EventStream import akka.remote.RemoteActorRefProvider import akka.remote.RemoteDeployer import akka.remote.routing.RemoteRouterConfig -import akka.cluster.routing.ClusterRouterSettings +/** + * INTERNAL API + */ class ClusterActorRefProvider( _systemName: String, _settings: ActorSystem.Settings, @@ -26,10 +37,53 @@ class ClusterActorRefProvider( _dynamicAccess: DynamicAccess) extends RemoteActorRefProvider( _systemName, _settings, _eventStream, _scheduler, _dynamicAccess) { + @volatile private var remoteDeploymentWatcher: ActorRef = _ + + override def init(system: ActorSystemImpl): Unit = { + super.init(system) + + remoteDeploymentWatcher = system.systemActorOf(Props[RemoteDeploymentWatcher], "RemoteDeploymentWatcher") + } + override val deployer: ClusterDeployer = new ClusterDeployer(settings, dynamicAccess) + /** + * This method is overridden here to keep track of remote deployed actors to + * be able to clean up corresponding child references. + */ + override def useActorOnNode(path: ActorPath, props: Props, deploy: Deploy, supervisor: ActorRef): Unit = { + super.useActorOnNode(path, props, deploy, supervisor) + remoteDeploymentWatcher ! (actorFor(path), supervisor) + } + } +/** + * INTERNAL API + * + * Responsible for cleaning up child references of remote deployed actors when remote node + * goes down (jvm crash, network failure), i.e. triggered by [[akka.actor.AddressTerminated]]. + */ +private[akka] class RemoteDeploymentWatcher extends Actor { + var supervisors = Map.empty[ActorRef, InternalActorRef] + + def receive = { + case (a: ActorRef, supervisor: InternalActorRef) ⇒ + supervisors += (a -> supervisor) + context.watch(a) + + case t @ Terminated(a) if supervisors isDefinedAt a ⇒ + // send extra ChildTerminated to the supervisor so that it will remove the child + if (t.addressTerminated) supervisors(a).sendSystemMessage(ChildTerminated(a)) + supervisors -= a + } +} + +/** + * INTERNAL API + * + * Deployer of cluster aware routers. + */ private[akka] class ClusterDeployer(_settings: ActorSystem.Settings, _pm: DynamicAccess) extends RemoteDeployer(_settings, _pm) { override def parseConfig(path: String, config: Config): Option[Deploy] = { super.parseConfig(path, config) match { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala index 2c891531b5..b8554c995b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala @@ -14,6 +14,9 @@ import akka.actor.Address import akka.actor.RootActorPath import akka.actor.Terminated import akka.actor.Address +import akka.remote.RemoteActorRef +import java.util.concurrent.TimeoutException +import akka.actor.ActorSystemImpl object ClusterDeathWatchMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -22,6 +25,12 @@ object ClusterDeathWatchMultiJvmSpec extends MultiNodeConfig { val fourth = role("fourth") commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) + + deployOn(fourth, """/hello.remote = "@first@" """) + + class Hello extends Actor { + def receive = Actor.emptyBehavior + } } class ClusterDeathWatchMultiJvmNode1 extends ClusterDeathWatchSpec @@ -114,5 +123,41 @@ abstract class ClusterDeathWatchSpec enterBarrier("after-3") } + "be able to shutdown system when using remote deployed actor on node that crash" taggedAs LongRunningTest in { + runOn(fourth) { + val hello = system.actorOf(Props[Hello], "hello") + hello.isInstanceOf[RemoteActorRef] must be(true) + hello.path.address must be(address(first)) + watch(hello) + enterBarrier("hello-deployed") + + markNodeAsUnavailable(first) + val t = expectMsgType[Terminated] + t.actor must be(hello) + + enterBarrier("first-unavailable") + + system.shutdown() + val timeout = remaining + try system.awaitTermination(timeout) catch { + case _: TimeoutException ⇒ + fail("Failed to stop [%s] within [%s] \n%s".format(system.name, timeout, + system.asInstanceOf[ActorSystemImpl].printTree)) + } + } + + runOn(first, second, third) { + enterBarrier("hello-deployed") + enterBarrier("first-unavailable") + runOn(first) { + // fourth system will be shutdown, remove to not participate in barriers any more + testConductor.removeNode(fourth) + } + + enterBarrier("after-4") + } + + } + } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala index c39edd8a13..2aee4c1f50 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala @@ -104,9 +104,24 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC expectMsg(destinationA) } enterBarrier("after-2") + + runOn(first) { + testConductor.shutdown(second, 0) + enterBarrier("second-shutdown") + + println("## sleeping in master") + Thread.sleep(15000) + println("## done sleeping in master") + } + + runOn(second, third) { + enterBarrier("second-shutdown") + Thread.sleep(15000) + } + } - "deploy routees to new member nodes in the cluster" taggedAs LongRunningTest in { + "deploy routees to new member nodes in the cluster" taggedAs LongRunningTest ignore { awaitClusterUp(first, second, third) @@ -121,7 +136,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC enterBarrier("after-3") } - "deploy programatically defined routees to the member nodes in the cluster" taggedAs LongRunningTest in { + "deploy programatically defined routees to the member nodes in the cluster" taggedAs LongRunningTest ignore { runOn(first) { val router2 = system.actorOf(Props[Echo].withRouter(ClusterRouterConfig(local = ConsistentHashingRouter(), settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 2))), "router2") @@ -135,7 +150,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC enterBarrier("after-4") } - "handle combination of configured router and programatically defined hashMapping" taggedAs LongRunningTest in { + "handle combination of configured router and programatically defined hashMapping" taggedAs LongRunningTest ignore { runOn(first) { def hashMapping: ConsistentHashMapping = { case s: String ⇒ s @@ -149,7 +164,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC enterBarrier("after-5") } - "handle combination of configured router and programatically defined hashMapping and ClusterRouterConfig" taggedAs LongRunningTest in { + "handle combination of configured router and programatically defined hashMapping and ClusterRouterConfig" taggedAs LongRunningTest ignore { runOn(first) { def hashMapping: ConsistentHashMapping = { case s: String ⇒ s diff --git a/akka-remote/src/main/scala/akka/remote/netty/Client.scala b/akka-remote/src/main/scala/akka/remote/netty/Client.scala index 0b66cd3b7c..62bf26e675 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Client.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Client.scala @@ -213,11 +213,12 @@ private[akka] class ActiveRemoteClient private[akka] ( reconnectionTimeWindowStart = System.currentTimeMillis true } else { - val timeLeft = (settings.ReconnectionTimeWindow.toMillis - (System.currentTimeMillis - reconnectionTimeWindowStart)) > 0 - if (timeLeft) + val timeLeft = (settings.ReconnectionTimeWindow.toMillis - (System.currentTimeMillis - reconnectionTimeWindowStart)) + val hasTimeLeft = timeLeft > 0 + if (hasTimeLeft) log.info("Will try to reconnect to remote server for another [{}] milliseconds", timeLeft) - timeLeft + hasTimeLeft } } From c6dd37607cf154c68934cfc7d844e7a1f425d522 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 26 Sep 2012 14:11:19 +0200 Subject: [PATCH 06/22] Revert wrong changes in ClusterConsistentHashingRouterSpec --- .../ClusterConsistentHashingRouterSpec.scala | 23 ++++--------------- 1 file changed, 4 insertions(+), 19 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala index 2aee4c1f50..c39edd8a13 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala @@ -104,24 +104,9 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC expectMsg(destinationA) } enterBarrier("after-2") - - runOn(first) { - testConductor.shutdown(second, 0) - enterBarrier("second-shutdown") - - println("## sleeping in master") - Thread.sleep(15000) - println("## done sleeping in master") - } - - runOn(second, third) { - enterBarrier("second-shutdown") - Thread.sleep(15000) - } - } - "deploy routees to new member nodes in the cluster" taggedAs LongRunningTest ignore { + "deploy routees to new member nodes in the cluster" taggedAs LongRunningTest in { awaitClusterUp(first, second, third) @@ -136,7 +121,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC enterBarrier("after-3") } - "deploy programatically defined routees to the member nodes in the cluster" taggedAs LongRunningTest ignore { + "deploy programatically defined routees to the member nodes in the cluster" taggedAs LongRunningTest in { runOn(first) { val router2 = system.actorOf(Props[Echo].withRouter(ClusterRouterConfig(local = ConsistentHashingRouter(), settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 2))), "router2") @@ -150,7 +135,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC enterBarrier("after-4") } - "handle combination of configured router and programatically defined hashMapping" taggedAs LongRunningTest ignore { + "handle combination of configured router and programatically defined hashMapping" taggedAs LongRunningTest in { runOn(first) { def hashMapping: ConsistentHashMapping = { case s: String ⇒ s @@ -164,7 +149,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC enterBarrier("after-5") } - "handle combination of configured router and programatically defined hashMapping and ClusterRouterConfig" taggedAs LongRunningTest ignore { + "handle combination of configured router and programatically defined hashMapping and ClusterRouterConfig" taggedAs LongRunningTest in { runOn(first) { def hashMapping: ConsistentHashMapping = { case s: String ⇒ s From 85a4743a47ba2f3fc0ae9d3cbd5536fab626e409 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 27 Sep 2012 08:33:54 +0200 Subject: [PATCH 07/22] Correct logic in Switch.whileOff, see #2569 --- akka-actor/src/main/scala/akka/util/LockUtil.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-actor/src/main/scala/akka/util/LockUtil.scala b/akka-actor/src/main/scala/akka/util/LockUtil.scala index 91c837063a..8279fcc7df 100644 --- a/akka-actor/src/main/scala/akka/util/LockUtil.scala +++ b/akka-actor/src/main/scala/akka/util/LockUtil.scala @@ -115,7 +115,7 @@ class Switch(startAsOn: Boolean = false) { * Be careful of longrunning or blocking within the provided action as it can lead to deadlocks or bad performance */ def whileOff(action: ⇒ Unit): Boolean = synchronized { - if (switch.get) { + if (!switch.get) { action true } else false From 7779e493129203ff951545e6e0a2f11f5cc1e51a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 27 Sep 2012 08:35:01 +0200 Subject: [PATCH 08/22] Use whileOff instead of fold to avoid confusion, see #2552 --- .../src/main/scala/akka/remote/RemoteDaemon.scala | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index 0ef6b054cb..4bf4f18e7a 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -69,14 +69,13 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath // TODO RK canonicalize path so as not to duplicate it always #1446 val subpath = elems.drop(1) val path = this.path / subpath - terminating.fold( - log.debug("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, path.address)) { - - val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef], - path, systemService = false, Some(deploy), lookupDeploy = true, async = false) - addChild(subpath.mkString("/"), actor) - actor.sendSystemMessage(Watch(actor, this)) - } + val isTerminating = !terminating.whileOff { + val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef], + path, systemService = false, Some(deploy), lookupDeploy = true, async = false) + addChild(subpath.mkString("/"), actor) + actor.sendSystemMessage(Watch(actor, this)) + } + if (isTerminating) log.error("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, path.address) case _ ⇒ log.error("remote path does not match path from message [{}]", message) } From 7d47091e59c0ddce3f2aacd5b14c0d5de370069b Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 27 Sep 2012 08:57:38 +0200 Subject: [PATCH 09/22] Add some extra safety to Guardian termination hook, see #2552 --- .../scala/akka/actor/ActorRefProvider.scala | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 5aa6049c74..d0cfb91a04 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -396,11 +396,27 @@ class LocalActorRefProvider( var terminationHooks = Set.empty[ActorRef] def receive = { - case Terminated(_) ⇒ terminationHooks foreach { _ ! TerminationHook }; stopWhenAllTerminationHooksDone() - case StopChild(child) ⇒ context.stop(child) - case RegisterTerminationHook ⇒ terminationHooks += sender - case TerminationHookDone ⇒ terminationHooks -= sender; stopWhenAllTerminationHooksDone() - case m ⇒ deadLetters ! DeadLetter(m, sender, self) + case Terminated(a) if terminationHooks.contains(a) ⇒ terminationHooks -= a + case Terminated(_) ⇒ + context.become(terminating) + terminationHooks foreach { _ ! TerminationHook } + stopWhenAllTerminationHooksDone() + case StopChild(child) ⇒ context.stop(child) + case RegisterTerminationHook if sender != context.system.deadLetters ⇒ + terminationHooks += sender + context watch sender + case m ⇒ deadLetters ! DeadLetter(m, sender, self) + } + + def terminating: Receive = { + case Terminated(a) if terminationHooks.contains(a) ⇒ stopWhenAllTerminationHooksDone(a) + case TerminationHookDone ⇒ stopWhenAllTerminationHooksDone(sender) + case m ⇒ deadLetters ! DeadLetter(m, sender, self) + } + + def stopWhenAllTerminationHooksDone(remove: ActorRef): Unit = { + terminationHooks -= sender + stopWhenAllTerminationHooksDone() } def stopWhenAllTerminationHooksDone(): Unit = if (terminationHooks.isEmpty) { From e7a075a0d973f39f9d607b6dfd5f44eaf8f055e3 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 27 Sep 2012 09:20:52 +0200 Subject: [PATCH 10/22] foreachChild instead of allChildren, see #2552 --- akka-actor/src/main/scala/akka/actor/ActorRef.scala | 8 ++++---- akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index b8b2164403..65e8568832 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -10,7 +10,7 @@ import java.lang.{ UnsupportedOperationException, IllegalStateException } import akka.serialization.{ Serialization, JavaSerializer } import akka.event.EventStream import scala.annotation.tailrec -import java.util.concurrent.{ ConcurrentHashMap } +import java.util.concurrent.ConcurrentHashMap import akka.event.LoggingAdapter import scala.concurrent.forkjoin.ThreadLocalRandom import scala.collection.JavaConverters @@ -520,8 +520,8 @@ private[akka] class VirtualPathContainer( def hasChildren: Boolean = !children.isEmpty - def allChildren: Iterable[ActorRef] = { - import scala.collection.JavaConverters._ - children.values.asScala + def foreachChild(f: ActorRef ⇒ Unit) = { + val iter = children.values.iterator + while (iter.hasNext) f(iter.next) } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index 4bf4f18e7a..83c316f56e 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -90,11 +90,11 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath case TerminationHook ⇒ terminating.switchOn { terminationHookDoneWhenNoChildren() - allChildren foreach system.stop + foreachChild { system.stop(_) } } case AddressTerminated(address) ⇒ - allChildren foreach { case a: InternalActorRef if a.getParent.path.address == address ⇒ system.stop(a) } + foreachChild { case a: InternalActorRef if a.getParent.path.address == address ⇒ system.stop(a) } case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this) } From 2e343eca2441cd0e13ed2a77abff60f5c4eba3f8 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 27 Sep 2012 14:28:40 +0200 Subject: [PATCH 11/22] Perform Terminate removeChild under the terminating lock also, see #2552 --- .../src/main/scala/akka/remote/RemoteDaemon.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index 83c316f56e..b899bdbdd1 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -82,15 +82,17 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath } case Terminated(child: ActorRefWithCell) if child.asInstanceOf[ActorRefScope].isLocal ⇒ - removeChild(child.path.elements.drop(1).mkString("/")) - terminationHookDoneWhenNoChildren() + terminating.locked { + removeChild(child.path.elements.drop(1).mkString("/")) + terminationHookDoneWhenNoChildren() + } case t: Terminated ⇒ case TerminationHook ⇒ terminating.switchOn { terminationHookDoneWhenNoChildren() - foreachChild { system.stop(_) } + foreachChild { system.stop } } case AddressTerminated(address) ⇒ @@ -99,7 +101,8 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this) } - def terminationHookDoneWhenNoChildren(): Unit = if (terminating.isOn && !hasChildren) - system.provider.systemGuardian.tell(TerminationHookDone, this) + def terminationHookDoneWhenNoChildren(): Unit = terminating.whileOn { + if (!hasChildren) system.provider.systemGuardian.tell(TerminationHookDone, this) + } } From 72b94f994e7acee3fb6e1936040377766e178d6c Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 27 Sep 2012 14:28:52 +0200 Subject: [PATCH 12/22] Verify proper shutdown when remote deployed actor, see #2552 --- .../akka/remote/testkit/MultiNodeSpec.scala | 17 ++++-- .../akka/remote/NewRemoteActorSpec.scala | 53 ++++++++++++++----- 2 files changed, 53 insertions(+), 17 deletions(-) diff --git a/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala index ceb1dfbe15..e457542e89 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -12,7 +12,7 @@ import akka.actor._ import akka.util.Timeout import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName } import akka.remote.RemoteActorRefProvider -import akka.testkit.TestKit +import akka.testkit._ import scala.concurrent.{ Await, Awaitable } import scala.util.control.NonFatal import scala.concurrent.util.Duration @@ -246,14 +246,23 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: } } system.shutdown() - try system.awaitTermination(5 seconds) catch { + val shutdownTimeout = 5.seconds.dilated + try system.awaitTermination(shutdownTimeout) catch { case _: TimeoutException ⇒ - system.log.warning("Failed to stop [{}] within 5 seconds", system.name) - println(system.asInstanceOf[ActorSystemImpl].printTree) + val msg = "Failed to stop [%s] within [%s] \n%s".format(system.name, shutdownTimeout, + system.asInstanceOf[ActorSystemImpl].printTree) + if (verifySystemShutdown) throw new RuntimeException(msg) + else system.log.warning(msg) } atTermination() } + /** + * Override this and return `true` to assert that the + * shutdown of the `ActorSystem` was done properly. + */ + def verifySystemShutdown: Boolean = false + /* * Test Class Interface */ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala index 6b3f081968..f7820ae8d3 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala @@ -3,14 +3,17 @@ */ package akka.remote +import language.postfixOps import com.typesafe.config.ConfigFactory - import akka.actor.Actor import akka.actor.ActorRef import akka.actor.Props import akka.pattern.ask -import testkit.{STMultiNodeSpec, MultiNodeConfig, MultiNodeSpec} +import testkit.{ STMultiNodeSpec, MultiNodeConfig, MultiNodeSpec } import akka.testkit._ +import akka.actor.Terminated +import scala.concurrent.util.duration._ +import com.typesafe.config.ConfigFactory object NewRemoteActorMultiJvmSpec extends MultiNodeConfig { @@ -20,12 +23,16 @@ object NewRemoteActorMultiJvmSpec extends MultiNodeConfig { } } - commonConfig(debugConfig(on = false)) + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString("akka.remote.log-remote-lifecycle-events = off"))) val master = role("master") val slave = role("slave") - deployOn(master, """/service-hello.remote = "@slave@" """) + deployOn(master, """ + /service-hello.remote = "@slave@" + /service-hello3.remote = "@slave@" + """) deployOnAll("""/service-hello2.remote = "@slave@" """) } @@ -37,7 +44,10 @@ class NewRemoteActorSpec extends MultiNodeSpec(NewRemoteActorMultiJvmSpec) with STMultiNodeSpec with ImplicitSender with DefaultTimeout { import NewRemoteActorMultiJvmSpec._ - def initialParticipants = 2 + def initialParticipants = roles.size + + // ensure that system shutdown is successful + override def verifySystemShutdown = true "A new remote actor" must { "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" taggedAs LongRunningTest in { @@ -45,14 +55,11 @@ class NewRemoteActorSpec extends MultiNodeSpec(NewRemoteActorMultiJvmSpec) runOn(master) { val actor = system.actorOf(Props[SomeActor], "service-hello") actor.isInstanceOf[RemoteActorRef] must be(true) + actor.path.address must be(node(slave).address) val slaveAddress = testConductor.getAddressFor(slave).await actor ! "identify" expectMsgType[ActorRef].path.address must equal(slaveAddress) - - // shut down the actor before we let the other node(s) shut down so we don't try to send - // "Terminate" to a shut down node - system.stop(actor) } enterBarrier("done") @@ -63,17 +70,37 @@ class NewRemoteActorSpec extends MultiNodeSpec(NewRemoteActorMultiJvmSpec) runOn(master) { val actor = system.actorOf(Props[SomeActor], "service-hello2") actor.isInstanceOf[RemoteActorRef] must be(true) + actor.path.address must be(node(slave).address) val slaveAddress = testConductor.getAddressFor(slave).await actor ! "identify" expectMsgType[ActorRef].path.address must equal(slaveAddress) - - // shut down the actor before we let the other node(s) shut down so we don't try to send - // "Terminate" to a shut down node - system.stop(actor) } enterBarrier("done") } + + "be able to shutdown system when using remote deployed actor" taggedAs LongRunningTest in within(10 seconds) { + runOn(master) { + val actor = system.actorOf(Props[SomeActor], "service-hello3") + actor.isInstanceOf[RemoteActorRef] must be(true) + actor.path.address must be(node(slave).address) + watch(actor) + + enterBarrier("deployed") + + // master system is supposed to be shutdown after slave + // this should be triggered by slave system shutdown + expectMsgPF(remaining) { case Terminated(`actor`) ⇒ true } + } + + runOn(slave) { + enterBarrier("deployed") + } + + // Important that this is the last test. + // It must not be any barriers here. + // verifySystemShutdown = true will ensure that system shutdown is successful + } } } From e22b3ed201fbd60d0497d94c7668cfca0eaf1217 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 27 Sep 2012 15:06:55 +0200 Subject: [PATCH 13/22] Add test for Switch, see #2569 --- .../src/test/scala/akka/util/SwitchSpec.scala | 99 +++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 akka-actor-tests/src/test/scala/akka/util/SwitchSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/util/SwitchSpec.scala b/akka-actor-tests/src/test/scala/akka/util/SwitchSpec.scala new file mode 100644 index 0000000000..cf272cba88 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/util/SwitchSpec.scala @@ -0,0 +1,99 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.util + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +class SwitchSpec extends WordSpec with MustMatchers { + + "Switch" must { + + "on and off" in { + val s = new Switch(false) + s.isOff must be(true) + s.isOn must be(false) + + s.switchOn("hello") must be(true) + s.isOn must be(true) + s.isOff must be(false) + s.switchOn("hello") must be(false) + s.isOn must be(true) + s.isOff must be(false) + + s.switchOff("hello") must be(true) + s.isOff must be(true) + s.isOn must be(false) + s.switchOff("hello") must be(false) + s.isOff must be(true) + s.isOn must be(false) + } + + "revert when exception" in { + val s = new Switch(false) + intercept[RuntimeException] { + s.switchOn(throw new RuntimeException) + } + s.isOff must be(true) + } + + "run action without locking" in { + val s = new Switch(false) + s.ifOffYield("yes") must be(Some("yes")) + s.ifOnYield("no") must be(None) + s.ifOff("yes") must be(true) + s.ifOn("no") must be(false) + + s.switchOn() + s.ifOnYield("yes") must be(Some("yes")) + s.ifOffYield("no") must be(None) + s.ifOn("yes") must be(true) + s.ifOff("no") must be(false) + } + + "run action with locking" in { + val s = new Switch(false) + s.whileOffYield("yes") must be(Some("yes")) + s.whileOnYield("no") must be(None) + s.whileOff("yes") must be(true) + s.whileOn("no") must be(false) + + s.switchOn() + s.whileOnYield("yes") must be(Some("yes")) + s.whileOffYield("no") must be(None) + s.whileOn("yes") must be(true) + s.whileOff("no") must be(false) + } + + "run first or second action depending on state" in { + val s = new Switch(false) + s.fold("on")("off") must be("off") + s.switchOn() + s.fold("on")("off") must be("on") + } + + "do proper locking" in { + val s = new Switch(false) + + s.locked { + Thread.sleep(500) + s.switchOn() + s.isOn must be(true) + } + + val latch = new CountDownLatch(1) + new Thread { + override def run(): Unit = { + s.switchOff() + latch.countDown() + } + }.start() + + latch.await(5, TimeUnit.SECONDS) + s.isOff must be(true) + } + } +} From 6f727ecd29ad82cccb556e1c63442c7e165a45dc Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 27 Sep 2012 17:07:59 +0200 Subject: [PATCH 14/22] Incorporate review feedback, see #2552 --- .../src/main/scala/akka/actor/ActorRefProvider.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index d0cfb91a04..e50f1378c1 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -396,8 +396,14 @@ class LocalActorRefProvider( var terminationHooks = Set.empty[ActorRef] def receive = { - case Terminated(a) if terminationHooks.contains(a) ⇒ terminationHooks -= a + case Terminated(a) if terminationHooks.contains(a) ⇒ + // a registered, and watched termination hook terminated before + // termination process of guardian has started + terminationHooks -= a case Terminated(_) ⇒ + // time for the guardian to stop, but first notify all the + // termination hooks, they will reply with TerminationHookDone + // and when all are done the guardian is stopped context.become(terminating) terminationHooks foreach { _ ! TerminationHook } stopWhenAllTerminationHooksDone() @@ -415,7 +421,7 @@ class LocalActorRefProvider( } def stopWhenAllTerminationHooksDone(remove: ActorRef): Unit = { - terminationHooks -= sender + terminationHooks -= remove stopWhenAllTerminationHooksDone() } From ba7a18dde5b141dbb6bdf9e26a79dd253ed9d66f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 28 Sep 2012 11:18:15 +0200 Subject: [PATCH 15/22] Incorporate review feedback, see #2550 --- .../test/scala/akka/actor/ActorSystemSpec.scala | 2 +- .../src/main/scala/akka/actor/Actor.scala | 9 ++++++++- .../src/main/scala/akka/actor/ActorCell.scala | 17 +++++++++++++---- .../src/main/scala/akka/actor/ActorRef.scala | 4 ++-- .../scala/akka/actor/dungeon/DeathWatch.scala | 2 +- .../main/scala/akka/pattern/AskSupport.scala | 4 ++-- .../akka/cluster/ClusterActorRefProvider.scala | 2 ++ 7 files changed, 29 insertions(+), 11 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index 9c5dd95dc5..e18e0d56d6 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -215,7 +215,7 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt EventFilter[Exception]("hello", occurrences = 1) intercept { a ! "die" } - val t = probe.expectMsg(Terminated(a)(true)) + val t = probe.expectMsg(Terminated(a)(existenceConfirmed = true, addressTerminated = false)) t.existenceConfirmed must be(true) } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 08a4a84598..633458953e 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -65,11 +65,18 @@ case object Kill extends Kill { * Terminated message can't be forwarded to another actor, since that actor * might not be watching the subject. Instead, if you need to forward Terminated * to another actor you should send the information in your own message. + * + * @param actor the watched actor that terminated + * @param existenceConfirmed is false when the Terminated message was not sent + * directly from the watched actor, but derived from another source, such as + * when watching a non-local ActorRef, which might not have been resolved + * @param addressTerminated the Terminated message was derived from + * that the remote node hosting the watched actor was detected as unreachable */ @SerialVersionUID(1L) case class Terminated private[akka] (@BeanProperty actor: ActorRef)( @BeanProperty val existenceConfirmed: Boolean, - @BeanProperty val addressTerminated: Boolean = false) extends AutoReceivedMessage + @BeanProperty val addressTerminated: Boolean) extends AutoReceivedMessage /** * INTERNAL API diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 2a28e4e8be..5ec4545fd1 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -382,10 +382,7 @@ private[akka] class ActorCell( msg.message match { case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid) case t: Terminated ⇒ - // when a parent is watching a child and it terminates due to AddressTerminated, - // it should be removed to support immediate creation of child with same name - if (t.addressTerminated) - childrenRefs.getByRef(t.actor) foreach { crs ⇒ removeChildAndGetStateChange(crs.child) } + if (t.addressTerminated) removeChildWhenToAddressTerminated(t.actor) watchedActorTerminated(t) case AddressTerminated(address) ⇒ addressTerminated(address) case Kill ⇒ throw new ActorKilledException("Kill") @@ -396,6 +393,18 @@ private[akka] class ActorCell( } } + /** + * When a parent is watching a child and it terminates due to AddressTerminated, + * it should be removed to support immediate creation of child with same name. + * + * For remote deployed actors ChildTerminated should be sent to the supervisor + * to clean up child references of remote deployed actors when remote node + * goes down, i.e. triggered by AddressTerminated, but that is the responsibility + * of the ActorRefProvider to handle that scenario. + */ + private def removeChildWhenToAddressTerminated(child: ActorRef): Unit = + childrenRefs.getByRef(child) foreach { crs ⇒ removeChildAndGetStateChange(crs.child) } + final def receiveMessage(msg: Any): Unit = behaviorStack.head.applyOrElse(msg, actor.unhandled) /* diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 615c4ef92e..90030bd565 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -442,7 +442,7 @@ private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider, protected def specialHandle(msg: Any): Boolean = msg match { case w: Watch ⇒ if (w.watchee == this && w.watcher != this) - w.watcher ! Terminated(w.watchee)(existenceConfirmed = false) + w.watcher ! Terminated(w.watchee)(existenceConfirmed = false, addressTerminated = false) true case _: Unwatch ⇒ true // Just ignore case _ ⇒ false @@ -467,7 +467,7 @@ private[akka] class DeadLetterActorRef(_provider: ActorRefProvider, override protected def specialHandle(msg: Any): Boolean = msg match { case w: Watch ⇒ if (w.watchee != this && w.watcher != this) - w.watcher ! Terminated(w.watchee)(existenceConfirmed = false) + w.watcher ! Terminated(w.watchee)(existenceConfirmed = false, addressTerminated = false) true case w: Unwatch ⇒ true // Just ignore case NullMessage ⇒ true diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala index 2b8ea2322b..70f79f1d48 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala @@ -49,7 +49,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ protected def tellWatchersWeDied(actor: Actor): Unit = { if (!watchedBy.isEmpty) { - val terminated = Terminated(self)(existenceConfirmed = true) + val terminated = Terminated(self)(existenceConfirmed = true, addressTerminated = false) try { watchedBy foreach { watcher ⇒ diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 92e34a2294..704ce43d8d 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -261,7 +261,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide case _: Terminate ⇒ stop() case Watch(watchee, watcher) ⇒ if (watchee == this && watcher != this) { - if (!addWatcher(watcher)) watcher ! Terminated(watchee)(existenceConfirmed = true) + if (!addWatcher(watcher)) watcher ! Terminated(watchee)(existenceConfirmed = true, addressTerminated = false) } else System.err.println("BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, this)) case Unwatch(watchee, watcher) ⇒ if (watchee == this && watcher != this) remWatcher(watcher) @@ -280,7 +280,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide result tryComplete Failure(new ActorKilledException("Stopped")) val watchers = clearWatchers() if (!watchers.isEmpty) { - val termination = Terminated(this)(existenceConfirmed = true) + val termination = Terminated(this)(existenceConfirmed = true, addressTerminated = false) watchers foreach { w ⇒ try w.tell(termination, this) catch { case NonFatal(t) ⇒ /* FIXME LOG THIS */ } } } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala index 5a679fe555..024dfdc00c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala @@ -76,6 +76,8 @@ private[akka] class RemoteDeploymentWatcher extends Actor { // send extra ChildTerminated to the supervisor so that it will remove the child if (t.addressTerminated) supervisors(a).sendSystemMessage(ChildTerminated(a)) supervisors -= a + + case _: Terminated ⇒ } } From b20c32049abd04909c515cb396610ef76cc75dad Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 28 Sep 2012 11:44:28 +0200 Subject: [PATCH 16/22] Use deadline for reconnect window in remote Client --- .../main/scala/akka/remote/netty/Client.scala | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/netty/Client.scala b/akka-remote/src/main/scala/akka/remote/netty/Client.scala index 62bf26e675..4a391af5f0 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Client.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Client.scala @@ -20,6 +20,7 @@ import akka.actor.{ DeadLetter, Address, ActorRef } import akka.util.Switch import scala.util.control.NonFatal import org.jboss.netty.handler.ssl.SslHandler +import scala.concurrent.util.Deadline /** * This is the abstract baseclass for netty remote clients, currently there's only an @@ -106,7 +107,7 @@ private[akka] class ActiveRemoteClient private[akka] ( private[remote] var openChannels: DefaultChannelGroup = _ @volatile - private var reconnectionTimeWindowStart = 0L + private var reconnectionDeadline: Option[Deadline] = None def notifyListeners(msg: RemoteLifeCycleEvent): Unit = netty.notifyListeners(msg) @@ -208,21 +209,18 @@ private[akka] class ActiveRemoteClient private[akka] ( log.debug("[{}] has been shut down", name) } - private[akka] def isWithinReconnectionTimeWindow: Boolean = { - if (reconnectionTimeWindowStart == 0L) { - reconnectionTimeWindowStart = System.currentTimeMillis + private[akka] def isWithinReconnectionTimeWindow: Boolean = reconnectionDeadline match { + case None ⇒ + reconnectionDeadline = Some(Deadline.now + settings.ReconnectionTimeWindow) true - } else { - val timeLeft = (settings.ReconnectionTimeWindow.toMillis - (System.currentTimeMillis - reconnectionTimeWindowStart)) - val hasTimeLeft = timeLeft > 0 + case Some(deadline) ⇒ + val hasTimeLeft = deadline.hasTimeLeft if (hasTimeLeft) - log.info("Will try to reconnect to remote server for another [{}] milliseconds", timeLeft) - + log.info("Will try to reconnect to remote server for another [{}] milliseconds", deadline.timeLeft.toMillis) hasTimeLeft - } } - private[akka] def resetReconnectionTimeWindow = reconnectionTimeWindowStart = 0L + private[akka] def resetReconnectionTimeWindow = reconnectionDeadline = None } @ChannelHandler.Sharable From dec3184dfa8ec8db5d500b48028f5247dc888bb1 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 28 Sep 2012 12:24:17 +0200 Subject: [PATCH 17/22] Separate class for SystemGuardian, see #2552 * Also checks that the Terminated originates for user guardian --- .../scala/akka/actor/ActorRefProvider.scala | 35 ++++++++++++++----- .../main/scala/akka/remote/RemoteDaemon.scala | 2 +- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index e50f1378c1..006e5c1f8a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -303,7 +303,7 @@ private[akka] case class StopChild(child: ActorRef) /** * INTERNAL API */ -private[akka] object Guardian { +private[akka] object SystemGuardian { /** * For the purpose of orderly shutdown it's possible * to register interest in the termination of systemGuardian @@ -390,8 +390,26 @@ class LocalActorRefProvider( } } - private class Guardian(override val supervisorStrategy: SupervisorStrategy, isSystem: Boolean) extends Actor { - import Guardian._ + /* + * Root and user guardian + */ + private class Guardian(override val supervisorStrategy: SupervisorStrategy) extends Actor { + + def receive = { + case Terminated(_) ⇒ context.stop(self) + case StopChild(child) ⇒ context.stop(child) + case m ⇒ deadLetters ! DeadLetter(m, sender, self) + } + + // guardian MUST NOT lose its children during restart + override def preRestart(cause: Throwable, msg: Option[Any]) {} + } + + /** + * System guardian + */ + private class SystemGuardian(override val supervisorStrategy: SupervisorStrategy) extends Actor { + import SystemGuardian._ var terminationHooks = Set.empty[ActorRef] @@ -400,13 +418,14 @@ class LocalActorRefProvider( // a registered, and watched termination hook terminated before // termination process of guardian has started terminationHooks -= a - case Terminated(_) ⇒ + case Terminated(`guardian`) ⇒ // time for the guardian to stop, but first notify all the // termination hooks, they will reply with TerminationHookDone // and when all are done the guardian is stopped context.become(terminating) terminationHooks foreach { _ ! TerminationHook } stopWhenAllTerminationHooksDone() + case Terminated(_) ⇒ case StopChild(child) ⇒ context.stop(child) case RegisterTerminationHook if sender != context.system.deadLetters ⇒ terminationHooks += sender @@ -426,7 +445,7 @@ class LocalActorRefProvider( } def stopWhenAllTerminationHooksDone(): Unit = if (terminationHooks.isEmpty) { - if (isSystem) eventStream.stopDefaultLoggers() + eventStream.stopDefaultLoggers() context.stop(self) } @@ -484,7 +503,7 @@ class LocalActorRefProvider( protected def systemGuardianStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy lazy val rootGuardian: LocalActorRef = - new LocalActorRef(system, Props(new Guardian(rootGuardianStrategy, isSystem = false)), theOneWhoWalksTheBubblesOfSpaceTime, rootPath) { + new LocalActorRef(system, Props(new Guardian(rootGuardianStrategy)), theOneWhoWalksTheBubblesOfSpaceTime, rootPath) { override def getParent: InternalActorRef = this override def getSingleChild(name: String): InternalActorRef = name match { case "temp" ⇒ tempContainer @@ -496,7 +515,7 @@ class LocalActorRefProvider( lazy val guardian: LocalActorRef = { val cell = rootGuardian.underlying cell.reserveChild("user") - val ref = new LocalActorRef(system, Props(new Guardian(guardianStrategy, isSystem = false)), rootGuardian, rootPath / "user") + val ref = new LocalActorRef(system, Props(new Guardian(guardianStrategy)), rootGuardian, rootPath / "user") cell.initChild(ref) ref } @@ -504,7 +523,7 @@ class LocalActorRefProvider( lazy val systemGuardian: LocalActorRef = { val cell = rootGuardian.underlying cell.reserveChild("system") - val ref = new LocalActorRef(system, Props(new Guardian(systemGuardianStrategy, isSystem = true)), rootGuardian, rootPath / "system") + val ref = new LocalActorRef(system, Props(new SystemGuardian(systemGuardianStrategy)), rootGuardian, rootPath / "system") cell.initChild(ref) ref } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index b899bdbdd1..60a2e7b4b0 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -25,7 +25,7 @@ private[akka] case class DaemonMsgCreate(props: Props, deploy: Deploy, path: Str private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath, _parent: InternalActorRef, _log: LoggingAdapter) extends VirtualPathContainer(system.provider, _path, _parent, _log) { - import akka.actor.Guardian._ + import akka.actor.SystemGuardian._ private val terminating = new Switch(false) From 5e0da159de16c994bb133fb5dba4b256b6786d68 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 28 Sep 2012 17:37:50 +0200 Subject: [PATCH 18/22] Adjust order of receive of Terminated in SystemGuardian, see #2552 --- .../scala/akka/actor/ActorRefProvider.scala | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 006e5c1f8a..d60a46d497 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -414,18 +414,17 @@ class LocalActorRefProvider( var terminationHooks = Set.empty[ActorRef] def receive = { - case Terminated(a) if terminationHooks.contains(a) ⇒ - // a registered, and watched termination hook terminated before - // termination process of guardian has started - terminationHooks -= a case Terminated(`guardian`) ⇒ - // time for the guardian to stop, but first notify all the + // time for the systemGuardian to stop, but first notify all the // termination hooks, they will reply with TerminationHookDone - // and when all are done the guardian is stopped + // and when all are done the systemGuardian is stopped context.become(terminating) terminationHooks foreach { _ ! TerminationHook } stopWhenAllTerminationHooksDone() - case Terminated(_) ⇒ + case Terminated(a) ⇒ + // a registered, and watched termination hook terminated before + // termination process of guardian has started + terminationHooks -= a case StopChild(child) ⇒ context.stop(child) case RegisterTerminationHook if sender != context.system.deadLetters ⇒ terminationHooks += sender @@ -434,9 +433,9 @@ class LocalActorRefProvider( } def terminating: Receive = { - case Terminated(a) if terminationHooks.contains(a) ⇒ stopWhenAllTerminationHooksDone(a) + case Terminated(a) ⇒ stopWhenAllTerminationHooksDone(a) case TerminationHookDone ⇒ stopWhenAllTerminationHooksDone(sender) - case m ⇒ deadLetters ! DeadLetter(m, sender, self) + case m ⇒ deadLetters ! DeadLetter(m, sender, self) } def stopWhenAllTerminationHooksDone(remove: ActorRef): Unit = { From 3a3af8f3fb2775d74ca850ab312bbb42f7bcc033 Mon Sep 17 00:00:00 2001 From: Helena Edelson Date: Fri, 28 Sep 2012 13:06:26 -0600 Subject: [PATCH 19/22] #2576 Correct usage of alumni to alumnus on Team docs page --- akka-docs/rst/dev/team.rst | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/akka-docs/rst/dev/team.rst b/akka-docs/rst/dev/team.rst index 06aab5ee72..279e2c93bd 100644 --- a/akka-docs/rst/dev/team.rst +++ b/akka-docs/rst/dev/team.rst @@ -19,18 +19,18 @@ Martin Krasser Committer krasserm AT googlemail DOT com Raymond Roestenburg Committer Piotr Gabryanczyk Committer Helena Edelson Committer helena AT helenaedelson DOT com -Debasish Ghosh Alumni dghosh AT acm DOT org -Ross McDonald Alumni rossajmcd AT gmail DOT com -Eckhart Hertzler Alumni -Mikael Högqvist Alumni -Tim Perrett Alumni -Jeanfrancois Arcand Alumni jfarcand AT apache DOT org -Jan Van Besien Alumni -Michael Kober Alumni -Peter Veentjer Alumni -Irmo Manie Alumni -Heiko Seeberger Alumni -Hiram Chirino Alumni -Scott Clasen Alumni +Debasish Ghosh Alumnus dghosh AT acm DOT org +Ross McDonald Alumnus rossajmcd AT gmail DOT com +Eckhart Hertzler Alumnus +Mikael Högqvist Alumnus +Tim Perrett Alumnus +Jeanfrancois Arcand Alumnus jfarcand AT apache DOT org +Jan Van Besien Alumnus +Michael Kober Alumnus +Peter Veentjer Alumnus +Irmo Manie Alumnus +Heiko Seeberger Alumnus +Hiram Chirino Alumnus +Scott Clasen Alumnus =================== ========================== ==================================== \ No newline at end of file From d1df32451613a6df95804d7910f0acfe01480851 Mon Sep 17 00:00:00 2001 From: Helena Edelson Date: Fri, 28 Sep 2012 13:28:38 -0600 Subject: [PATCH 20/22] #2577 Minor cleanup of 'What is Akka' in docs --- akka-docs/rst/intro/what-is-akka.rst | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/akka-docs/rst/intro/what-is-akka.rst b/akka-docs/rst/intro/what-is-akka.rst index b4e83d98a2..dc351b0d22 100644 --- a/akka-docs/rst/intro/what-is-akka.rst +++ b/akka-docs/rst/intro/what-is-akka.rst @@ -38,24 +38,23 @@ See :ref:`actors-scala` and :ref:`untyped-actors-java` Fault Tolerance --------------- -Fault tolerance through supervisor hierarchies with "let-it-crash" -semantics. Excellent for writing highly fault-tolerant systems that never stop, -systems that self-heal. Supervisor hierarchies can span over multiple JVMs to -provide truly fault-tolerant systems. + - Supervisor hierarchies with "let-it-crash" semantics. + - Supervisor hierarchies can span over multiple JVMs to provide truly fault-tolerant systems. + - Excellent for writing highly fault-tolerant systems that self-heal and never stop. See :ref:`fault-tolerance-scala` and :ref:`fault-tolerance-java` Location Transparency --------------------- Everything in Akka is designed to work in a distributed environment: all -interactions of actors use purely message passing and everything is asynchronous. +interactions of actors use pure message passing and everything is asynchronous. For an overview of the remoting see :ref:`remoting` Transactors ----------- -Transactors combine actors and STM (Software Transactional Memory) into transactional actors. +Transactors combine actors and Software Transactional Memory (STM) into transactional actors. It allows you to compose atomic message flows with automatic retry and rollback. See :ref:`transactors-scala` and :ref:`transactors-java` @@ -86,10 +85,10 @@ consisted of two things: - Cluster support for Akka - Monitoring & Management (formerly called Atmos) -Cloudy Akka have been discontinued and the Cluster support is now being moved into the -Open Source version of Akka (the upcoming Akka 2.1), while the Monitoring & Management -(Atmos) is now rebranded into Typesafe Console and is part of the commercial subscription -for the Typesafe Stack (see below for details). +Cloudy Akka has been discontinued and the Cluster support is now being moved into the +Open Source version of Akka (the upcoming Akka 2.1), while Monitoring & Management +(Atmos) has been re-branded as the Typesafe Console, which is now part of the commercial +subscription for the Typesafe Stack (see below for details). Typesafe Stack ============== From 65b67080e22f59176e4a413660ec1bb7c6c69048 Mon Sep 17 00:00:00 2001 From: Helena Edelson Date: Fri, 28 Sep 2012 14:02:02 -0600 Subject: [PATCH 21/22] #2578 Minor cleanup of 'Why Akka' in docs --- akka-docs/rst/intro/why-akka.rst | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/akka-docs/rst/intro/why-akka.rst b/akka-docs/rst/intro/why-akka.rst index af5c0a7401..85789fdf19 100644 --- a/akka-docs/rst/intro/why-akka.rst +++ b/akka-docs/rst/intro/why-akka.rst @@ -14,8 +14,8 @@ Akka is an unified runtime and programming model for: One thing to learn and admin, with high cohesion and coherent semantics. -Akka is a very scalable piece of software, not only in the performance sense, -but in the size of applications it is useful for. The core of Akka, akka-actor, +Akka is a very scalable piece of software, not only in the context of performance +but also in the size of applications it is useful for. The core of Akka, akka-actor, is very small and easily dropped into an existing project where you need asynchronicity and lockless concurrency without hassle. @@ -31,15 +31,23 @@ job. What's a good use-case for Akka? -------------------------------- -We see Akka being adopted by many large organizations in a big range of industries -all from investment and merchant banking, retail and social media, simulation, -gaming and betting, automobile and traffic systems, health care, data analytics -and much more. Any system that have the need for high-throughput and low latency +We see Akka being adopted by many large organizations in a big range of industries: + +- Investment and Merchant Banking +- Retail +- Social Media +- Simulation +- Gaming and Betting +- Automobile and Traffic Systems +- Health Care +- Data Analytics + +and much more. Any system with the need for high-throughput and low latency is a good candidate for using Akka. -Actors lets you manage service failures (Supervisors), load management (back-off -strategies, timeouts and processing-isolation), both horizontal and vertical -scalability (add more cores and/or add more machines). +Actors let you manage service failures (Supervisors), load management (back-off +strategies, timeouts and processing-isolation), as well as both horizontal and +vertical scalability (add more cores and/or add more machines). Here's what some of the Akka users have to say about how they are using Akka: http://stackoverflow.com/questions/4493001/good-use-case-for-akka From 5070c9ffd480610e48fefaae21eca8a29d527ee0 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 2 Oct 2012 09:52:30 +0200 Subject: [PATCH 22/22] Additional assert, see #2550 --- akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index e18e0d56d6..f7a9844c9d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -217,6 +217,7 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt } val t = probe.expectMsg(Terminated(a)(existenceConfirmed = true, addressTerminated = false)) t.existenceConfirmed must be(true) + t.addressTerminated must be(false) } "shut down when /user escalates" in {