diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 9747251806..b49ea69e1f 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -72,6 +72,8 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with implicit val ec = system.dispatcher import akka.routing.RoutingSpec._ + muteDeadLetters("DeathWatchNotification.*")() + "routers in general" must { "evict terminated routees" in { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 3c48b3a861..10408ac962 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -503,7 +503,11 @@ private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider, sender ! ActorIdentity(messageId, None) true case s: SelectChildName ⇒ - s.identifyRequest foreach { x ⇒ sender ! ActorIdentity(x.messageId, None) } + s.identifyRequest match { + case Some(identify) ⇒ sender ! ActorIdentity(identify.messageId, None) + case None ⇒ + eventStream.publish(DeadLetter(s.wrappedMessage, if (sender eq Actor.noSender) provider.deadLetters else sender, this)) + } true case _ ⇒ false } @@ -533,15 +537,7 @@ private[akka] class DeadLetterActorRef(_provider: ActorRefProvider, w.watcher.sendSystemMessage( DeathWatchNotification(w.watchee, existenceConfirmed = false, addressTerminated = false)) true - case w: Unwatch ⇒ true // Just ignore - case Identify(messageId) ⇒ - sender ! ActorIdentity(messageId, None) - true - case s: SelectChildName ⇒ - s.identifyRequest foreach { x ⇒ sender ! ActorIdentity(x.messageId, None) } - true - case NullMessage ⇒ true - case _ ⇒ false + case _ ⇒ super.specialHandle(msg, sender) } @throws(classOf[java.io.ObjectStreamException]) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 2268267275..cc369d1897 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -104,14 +104,18 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro sys.eventStream.publish(Mute(EventFilter.info(pattern = s))) } - Seq(".*received dead letter from.*ClientDisconnected", - ".*received dead letter from.*deadLetters.*PoisonPill", - ".*received dead letter from.*Disassociated", - ".*received dead letter from.*DisassociateUnderlying", - ".*received dead letter from.*HandleListenerRegistered", - ".*installing context org.jboss.netty.channel.DefaultChannelPipeline.*") foreach { s ⇒ - sys.eventStream.publish(Mute(EventFilter.warning(pattern = s))) - } + muteDeadLetters( + "Heartbeat.*", + "GossipEnvelope.*", + "ClusterMetricsChanged.*", + "Disassociated.*", + "DisassociateUnderlying.*", + "HandleListenerRegistered.*", + "PoisonPill.*", + "DeathWatchNotification.*", + "NullMessage.*", + "InboundPayload.*")(sys) + } } @@ -119,13 +123,9 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro if (!sys.log.isDebugEnabled) sys.eventStream.publish(Mute(EventFilter.error(pattern = ".*Marking.* as UNREACHABLE.*"))) - def muteDeadLetters(sys: ActorSystem = system): Unit = - if (!sys.log.isDebugEnabled) - sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter from.*"))) - override def afterAll(): Unit = { if (!log.isDebugEnabled) { - muteDeadLetters() + muteDeadLetters()() system.eventStream.setLogLevel(ErrorLevel) } super.afterAll() diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index f05eb51f35..bf5b3882e7 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -60,6 +60,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { // not MultiNodeClusterSpec.clusterConfig commonConfig(ConfigFactory.parseString(""" akka.test.cluster-stress-spec { + log-stats = off # scale the nr-of-nodes* settings with this factor nr-of-nodes-factor = 1 nr-of-nodes = 13 @@ -147,6 +148,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { private def getDuration(name: String): FiniteDuration = Duration(getMilliseconds(name), MILLISECONDS) + val logStats = getBoolean("log-stats") val nFactor = getInt("nr-of-nodes-factor") val totalNumberOfNodes = getInt("nr-of-nodes") * nFactor ensuring ( _ >= 10, "nr-of-nodes must be >= 10") @@ -211,7 +213,9 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { * expected results has been collected. It shuts down * itself when expected results has been collected. */ - class ClusterResultAggregator(title: String, expectedResults: Int, reportMetricsInterval: FiniteDuration) extends Actor with ActorLogging { + class ClusterResultAggregator(title: String, expectedResults: Int, settings: Settings) extends Actor with ActorLogging { + import settings.reportMetricsInterval + import settings.logStats val cluster = Cluster(context.system) var reportTo: Option[ActorRef] = None var results = Vector.empty[ClusterResult] @@ -242,12 +246,14 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { case PhiResult(from, phiValues) ⇒ phiValuesObservedByNode += from -> phiValues case StatsResult(from, stats) ⇒ clusterStatsObservedByNode += from -> stats case ReportTick ⇒ - log.info(s"[${title}] in progress\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}") + if (logStats) + log.info(s"[${title}] in progress\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}") case r: ClusterResult ⇒ results :+= r if (results.size == expectedResults) { val aggregated = AggregatedClusterResult(title, maxDuration, totalClusterStats) - log.info(s"[${title}] completed in [${aggregated.duration.toMillis}] ms\n${aggregated.clusterStats}\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}") + if (logStats) + log.info(s"[${title}] completed in [${aggregated.duration.toMillis}] ms\n${aggregated.clusterStats}\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}") reportTo foreach { _ ! aggregated } context stop self } @@ -665,6 +671,7 @@ abstract class StressSpec sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*PhiResult.*"))) sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*SendBatch.*"))) sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*ClusterStats.*"))) + muteDeadLetters("SimpleJob.*", "Tick.*", "AggregatedClusterResult.*")(sys) } val seedNodes = roles.take(numberOfSeedNodes) @@ -680,7 +687,7 @@ abstract class StressSpec def createResultAggregator(title: String, expectedResults: Int, includeInHistory: Boolean): Unit = { runOn(roles.head) { - val aggregator = system.actorOf(Props(new ClusterResultAggregator(title, expectedResults, reportMetricsInterval)), + val aggregator = system.actorOf(Props(new ClusterResultAggregator(title, expectedResults, settings)), name = "result" + step) if (includeInHistory) aggregator ! ReportTo(Some(clusterResultHistory)) else aggregator ! ReportTo(None) @@ -698,7 +705,9 @@ abstract class StressSpec identifyProbe.expectMsgType[ActorIdentity].ref } - lazy val clusterResultHistory = system.actorOf(Props[ClusterResultHistory], "resultHistory") + lazy val clusterResultHistory = + if (settings.logStats) system.actorOf(Props[ClusterResultHistory], "resultHistory") + else system.deadLetters lazy val phiObserver = system.actorOf(Props[PhiObserver], "phiObserver") @@ -953,9 +962,10 @@ abstract class StressSpec def awaitWorkResult: WorkResult = { val workResult = expectMsgType[WorkResult] - log.info("{} result, [{}] jobs/s, retried [{}] of [{}] msg", masterName, - workResult.jobsPerSecond.form, - workResult.retryCount, workResult.sendCount) + if (settings.logStats) + log.info("{} result, [{}] jobs/s, retried [{}] of [{}] msg", masterName, + workResult.jobsPerSecond.form, + workResult.retryCount, workResult.sendCount) master match { case Some(m) ⇒ watch(m) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala index 673afbcd1c..efb56fd5a2 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala @@ -28,10 +28,6 @@ object UnreachableNodeJoinsAgainMultiNodeConfig extends MultiNodeConfig { commonConfig(ConfigFactory.parseString( """ - # this setting is here to limit the number of retries and failures while the - # node is being blackholed - akka.remote.retry-gate-closed-for = 500 ms - akka.remote.log-remote-lifecycle-events = off akka.cluster.publish-stats-interval = 0s """).withFallback(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))) @@ -167,7 +163,6 @@ abstract class UnreachableNodeJoinsAgainSpec val victimAddress = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress system.shutdown() system.awaitTermination(10 seconds) - Thread.sleep(5000) // create new ActorSystem with same host:port val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s""" akka.remote.netty.tcp { @@ -178,7 +173,6 @@ abstract class UnreachableNodeJoinsAgainSpec try { Cluster(freshSystem).join(masterAddress) - Thread.sleep(5000) within(15 seconds) { awaitAssert(Cluster(freshSystem).readView.members.map(_.address) must contain(victimAddress)) awaitAssert(Cluster(freshSystem).readView.members.size must be(expectedNumberOfMembers)) diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala index 6815f754c7..086d4f2f6d 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -16,6 +16,7 @@ import akka.util.Timeout import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName } import akka.remote.RemoteActorRefProvider import akka.testkit._ +import akka.testkit.TestEvent._ import scala.concurrent.duration._ import akka.remote.testconductor.RoleName import akka.actor.RootActorPath @@ -101,7 +102,6 @@ abstract class MultiNodeConfig { if (_testTransport) ConfigFactory.parseString( """ akka.remote.netty.tcp.applied-adapters = [trttl, gremlin] - akka.remote.retry-gate-closed-for = 1 s """) else ConfigFactory.empty @@ -359,6 +359,14 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: */ def node(role: RoleName): ActorPath = RootActorPath(testConductor.getAddressFor(role).await) + def muteDeadLetters(endPatterns: String*)(sys: ActorSystem = system): Unit = + if (!sys.log.isDebugEnabled) { + def mute(suffix: String): Unit = + sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead.*" + suffix))) + if (endPatterns.isEmpty) mute("") + else endPatterns foreach mute + } + /** * Enrich `.await()` onto all Awaitables, using remaining duration from the innermost * enclosing `within` block or QueryTimeout. diff --git a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala index 17bc4dd7d2..4ba062def5 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala @@ -56,6 +56,8 @@ class RemoteWatcherSpec extends AkkaSpec( val remoteSystem = ActorSystem("RemoteSystem", system.settings.config) val remoteAddress = remoteSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + Seq(system, remoteSystem).foreach(muteDeadLetters("Disassociated.*", "DisassociateUnderlying.*")(_)) + override def afterTermination() { remoteSystem.shutdown() } diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala index bea24f76b1..06f2612053 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala @@ -54,8 +54,6 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re startup-timeout = 5 s - retry-gate-closed-for = 0 s - use-passive-connections = on } """) diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala index 699cc23a67..2bf7945e3c 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala @@ -16,7 +16,6 @@ object AkkaProtocolStressTest { #loglevel = DEBUG actor.provider = "akka.remote.RemoteActorRefProvider" - remote.retry-gate-closed-for = 0 s remote.log-remote-lifecycle-events = on remote.transport-failure-detector { diff --git a/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala b/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala index 90853ca5d0..0064249c5a 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala @@ -37,7 +37,6 @@ object SystemMessageDeliveryStressTest { #loglevel = DEBUG actor.provider = "akka.remote.RemoteActorRefProvider" - remote.retry-gate-closed-for = 0 s remote.log-remote-lifecycle-events = on remote.failure-detector { diff --git a/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala index 80abedc9a6..ae73303a0c 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala @@ -18,7 +18,6 @@ object ThrottlerTransportAdapterSpec { actor.provider = "akka.remote.RemoteActorRefProvider" remote.netty.tcp.hostname = "localhost" - remote.retry-gate-closed-for = 0 s remote.log-remote-lifecycle-events = off remote.netty.tcp.applied-adapters = ["trttl"] diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 4cc59eca6e..e5e9e1b67c 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -15,6 +15,7 @@ import com.typesafe.config.{ Config, ConfigFactory } import java.util.concurrent.TimeoutException import akka.dispatch.Dispatchers import akka.pattern.ask +import akka.testkit.TestEvent._ object AkkaSpec { val testConf: Config = ConfigFactory.parseString(""" @@ -93,4 +94,12 @@ abstract class AkkaSpec(_system: ActorSystem) override def expectedTestDuration: FiniteDuration = 60 seconds + def muteDeadLetters(endPatterns: String*)(sys: ActorSystem = system): Unit = + if (!sys.log.isDebugEnabled) { + def mute(suffix: String): Unit = + sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead.*" + suffix))) + if (endPatterns.isEmpty) mute("") + else endPatterns foreach mute + } + }