From 82b8238a9cb99461ab1d0dc53b16c9f3206af3d3 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Fri, 30 Jan 2015 16:34:27 +0100 Subject: [PATCH] fix warnings in remote and cluster --- .../scala/akka/cluster/ClusterDaemon.scala | 11 +++++----- .../scala/akka/cluster/StressSpec.scala | 20 +++++++++---------- .../ClusterConsistentHashingRouterSpec.scala | 8 ++++---- .../ClusterHeartbeatSenderStateSpec.scala | 6 +++--- .../cluster/HeartbeatNodeRingPerfSpec.scala | 4 ++-- .../akka/cluster/ReachabilityPerfSpec.scala | 20 +++++++++---------- .../akka/remote/testconductor/Player.scala | 7 ++++--- .../PiercingShouldKeepQuarantineSpec.scala | 2 -- .../test/scala/akka/remote/RemotingSpec.scala | 16 +++++++-------- .../SystemMessageDeliveryStressTest.scala | 2 +- 10 files changed, 48 insertions(+), 48 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index cf50f8a54f..e4cf868a85 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -163,7 +163,7 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac withDispatcher(context.props.dispatcher), name = "heartbeatReceiver") def receive = { - case msg @ GetClusterCoreRef ⇒ coreSupervisor forward msg + case msg: GetClusterCoreRef.type ⇒ coreSupervisor forward msg case AddOnMemberUpListener(code) ⇒ context.actorOf(Props(classOf[OnMemberUpListener], code).withDeploy(Deploy.local)) case PublisherCreated(publisher) ⇒ @@ -655,10 +655,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with if (statsEnabled) { gossipStats = gossipType match { - case Merge ⇒ gossipStats.incrementMergeCount - case Same ⇒ gossipStats.incrementSameCount - case Newer ⇒ gossipStats.incrementNewerCount - case Older ⇒ gossipStats.incrementOlderCount + case Merge ⇒ gossipStats.incrementMergeCount + case Same ⇒ gossipStats.incrementSameCount + case Newer ⇒ gossipStats.incrementNewerCount + case Older ⇒ gossipStats.incrementOlderCount + case Ignored ⇒ gossipStats // included in receivedGossipCount } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index cc8d60c8bd..28a7e04a46 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -246,21 +246,21 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { class ClusterResultAggregator(title: String, expectedResults: Int, settings: Settings) extends Actor with ActorLogging { import settings.reportMetricsInterval import settings.infolog - val cluster = Cluster(context.system) - var reportTo: Option[ActorRef] = None - var results = Vector.empty[ClusterResult] - var nodeMetrics = Set.empty[NodeMetrics] - var phiValuesObservedByNode = { + private val cluster = Cluster(context.system) + private var reportTo: Option[ActorRef] = None + private var results = Vector.empty[ClusterResult] + private var nodeMetrics = Set.empty[NodeMetrics] + private var phiValuesObservedByNode = { import akka.cluster.Member.addressOrdering immutable.SortedMap.empty[Address, immutable.SortedSet[PhiValue]] } - var clusterStatsObservedByNode = { + private var clusterStatsObservedByNode = { import akka.cluster.Member.addressOrdering immutable.SortedMap.empty[Address, CurrentInternalStats] } import context.dispatcher - val reportMetricsTask = context.system.scheduler.schedule( + private val reportMetricsTask = context.system.scheduler.schedule( reportMetricsInterval, reportMetricsInterval, self, ReportTick) // subscribe to ClusterMetricsChanged, re-subscribe when restart @@ -441,9 +441,9 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { } class StatsObserver extends Actor { - val cluster = Cluster(context.system) - var reportTo: Option[ActorRef] = None - var startStats: Option[GossipStats] = None + private val cluster = Cluster(context.system) + private var reportTo: Option[ActorRef] = None + private var startStats: Option[GossipStats] = None override def preStart(): Unit = cluster.subscribe(self, classOf[CurrentInternalStats]) override def postStop(): Unit = cluster.unsubscribe(self) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala index 7d3bddde94..0e0a15828f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala @@ -37,7 +37,7 @@ object ClusterConsistentHashingRouterMultiJvmSpec extends MultiNodeConfig { val third = role("third") commonConfig(debugConfig(on = false). - withFallback(ConfigFactory.parseString(""" + withFallback(ConfigFactory.parseString(s""" common-router-settings = { router = consistent-hashing-pool nr-of-instances = 10 @@ -48,9 +48,9 @@ object ClusterConsistentHashingRouterMultiJvmSpec extends MultiNodeConfig { } akka.actor.deployment { - /router1 = ${common-router-settings} - /router3 = ${common-router-settings} - /router4 = ${common-router-settings} + /router1 = $${common-router-settings} + /router3 = $${common-router-settings} + /router4 = $${common-router-settings} } """)). withFallback(MultiNodeClusterSpec.clusterConfig)) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala index 184cfea15c..4400182108 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala @@ -49,14 +49,14 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with Matchers { val dd = UniqueAddress(Address("akka.tcp", "sys", "dd", 2552), 4) val ee = UniqueAddress(Address("akka.tcp", "sys", "ee", 2552), 5) - def emptyState: ClusterHeartbeatSenderState = emptyState(aa) + private def emptyState: ClusterHeartbeatSenderState = emptyState(aa) - def emptyState(selfUniqueAddress: UniqueAddress) = ClusterHeartbeatSenderState( + private def emptyState(selfUniqueAddress: UniqueAddress) = ClusterHeartbeatSenderState( ring = HeartbeatNodeRing(selfUniqueAddress, Set(selfUniqueAddress), Set.empty, monitoredByNrOfMembers = 3), oldReceiversNowUnreachable = Set.empty[UniqueAddress], failureDetector = new DefaultFailureDetectorRegistry[Address](() ⇒ new FailureDetectorStub)) - def fd(state: ClusterHeartbeatSenderState, node: UniqueAddress): FailureDetectorStub = + private def fd(state: ClusterHeartbeatSenderState, node: UniqueAddress): FailureDetectorStub = state.failureDetector.asInstanceOf[DefaultFailureDetectorRegistry[Address]].failureDetector(node.address). get.asInstanceOf[FailureDetectorStub] diff --git a/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingPerfSpec.scala b/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingPerfSpec.scala index a4012c2938..5564f3ddfc 100644 --- a/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingPerfSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingPerfSpec.scala @@ -23,10 +23,10 @@ class HeartbeatNodeRingPerfSpec extends WordSpec with Matchers { val heartbeatNodeRing = createHeartbeatNodeRingOfSize(nodesSize) - def checkThunkForRing(ring: HeartbeatNodeRing, thunk: HeartbeatNodeRing ⇒ Unit, times: Int): Unit = + private def checkThunkForRing(ring: HeartbeatNodeRing, thunk: HeartbeatNodeRing ⇒ Unit, times: Int): Unit = for (i ← 1 to times) thunk(ring) - def myReceivers(ring: HeartbeatNodeRing): Unit = { + private def myReceivers(ring: HeartbeatNodeRing): Unit = { val r = HeartbeatNodeRing(ring.selfAddress, ring.nodes, Set.empty, ring.monitoredByNrOfMembers) r.myReceivers.isEmpty should be(false) } diff --git a/akka-cluster/src/test/scala/akka/cluster/ReachabilityPerfSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ReachabilityPerfSpec.scala index c79b27a0cb..ae5f6c439e 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ReachabilityPerfSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ReachabilityPerfSpec.scala @@ -16,7 +16,7 @@ class ReachabilityPerfSpec extends WordSpec with Matchers { val address = Address("akka.tcp", "sys", "a", 2552) val node = Address("akka.tcp", "sys", "a", 2552) - def createReachabilityOfSize(base: Reachability, size: Int): Reachability = + private def createReachabilityOfSize(base: Reachability, size: Int): Reachability = (base /: (1 to size)) { case (r, i) ⇒ val observer = UniqueAddress(address.copy(host = Some("node-" + i)), i) @@ -25,7 +25,7 @@ class ReachabilityPerfSpec extends WordSpec with Matchers { r.unreachable(observer, subject).reachable(observer, subject) } - def addUnreachable(base: Reachability, count: Int): Reachability = { + private def addUnreachable(base: Reachability, count: Int): Reachability = { val observers = base.allObservers.take(count) val subjects = Stream.continually(base.allObservers).flatten.iterator (base /: observers) { @@ -39,43 +39,43 @@ class ReachabilityPerfSpec extends WordSpec with Matchers { val reachability3 = addUnreachable(reachability1, nodesSize / 2) val allowed = reachability1.allObservers - def checkThunkFor(r1: Reachability, r2: Reachability, thunk: (Reachability, Reachability) ⇒ Unit, times: Int): Unit = { + private def checkThunkFor(r1: Reachability, r2: Reachability, thunk: (Reachability, Reachability) ⇒ Unit, times: Int): Unit = { for (i ← 1 to times) { thunk(Reachability(r1.records, r1.versions), Reachability(r2.records, r2.versions)) } } - def checkThunkFor(r1: Reachability, thunk: Reachability ⇒ Unit, times: Int): Unit = { + private def checkThunkFor(r1: Reachability, thunk: Reachability ⇒ Unit, times: Int): Unit = { for (i ← 1 to times) { thunk(Reachability(r1.records, r1.versions)) } } - def merge(expectedRecords: Int)(r1: Reachability, r2: Reachability): Unit = { + private def merge(expectedRecords: Int)(r1: Reachability, r2: Reachability): Unit = { r1.merge(allowed, r2).records.size should be(expectedRecords) } - def checkStatus(r1: Reachability): Unit = { + private def checkStatus(r1: Reachability): Unit = { val record = r1.records.head r1.status(record.observer, record.subject) should be(record.status) } - def checkAggregatedStatus(r1: Reachability): Unit = { + private def checkAggregatedStatus(r1: Reachability): Unit = { val record = r1.records.head r1.status(record.subject) should be(record.status) } - def allUnreachableOrTerminated(r1: Reachability): Unit = { + private def allUnreachableOrTerminated(r1: Reachability): Unit = { val record = r1.records.head r1.allUnreachableOrTerminated.isEmpty should be(false) } - def allUnreachable(r1: Reachability): Unit = { + private def allUnreachable(r1: Reachability): Unit = { val record = r1.records.head r1.allUnreachable.isEmpty should be(false) } - def recordsFrom(r1: Reachability): Unit = { + private def recordsFrom(r1: Reachability): Unit = { r1.allObservers.foreach { o ⇒ r1.recordsFrom(o) should not be be(null) } diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala index 6e81c91372..29d5864d4a 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala @@ -193,10 +193,11 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress) case Event(ToServer(msg), d @ Data(Some(channel), None)) ⇒ channel.write(msg) val token = msg match { - case EnterBarrier(barrier, timeout) ⇒ barrier - case GetAddress(node) ⇒ node.name + case EnterBarrier(barrier, timeout) ⇒ Some(barrier -> sender()) + case GetAddress(node) ⇒ Some(node.name -> sender()) + case _ ⇒ None } - stay using d.copy(runningOp = Some(token -> sender())) + stay using d.copy(runningOp = token) case Event(ToServer(op), Data(channel, Some((token, _)))) ⇒ log.error("cannot write {} while waiting for {}", op, token) stay diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala index 2da78d76cc..03db880499 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala @@ -1,11 +1,9 @@ package akka.remote -import language.postfixOps import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory import akka.actor._ import akka.testkit._ -import akka.remote.AddressUidExtension import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } import akka.remote.testconductor.RoleName diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index 6100c64f8d..adb2c117ab 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -586,16 +586,16 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D """).withFallback(config) val otherSelection = thisSystem.actorSelection(s"akka.tcp://other-system@localhost:${otherAddress.getPort}/user/echo") otherSelection.tell("ping", probeSender) - probe.expectNoMsg(1 seconds) + probe.expectNoMsg(1.seconds) val otherSystem = ActorSystem("other-system", otherConfig) try { muteSystem(otherSystem) - probe.expectNoMsg(2 seconds) + probe.expectNoMsg(2.seconds) otherSystem.actorOf(Props[Echo2], "echo") - within(5 seconds) { + within(5.seconds) { awaitAssert { otherSelection.tell("ping", probeSender) - assert(probe.expectMsgType[(String, ActorRef)](500 millis)._1 == "pong") + assert(probe.expectMsgType[(String, ActorRef)](500.millis)._1 == "pong") } } } finally { @@ -624,18 +624,18 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D """).withFallback(config) val otherSelection = thisSystem.actorSelection(s"akka.tcp://other-system@localhost:${otherAddress.getPort}/user/echo") otherSelection.tell("ping", thisSender) - thisProbe.expectNoMsg(1 seconds) + thisProbe.expectNoMsg(1.seconds) val otherSystem = ActorSystem("other-system", otherConfig) try { muteSystem(otherSystem) - thisProbe.expectNoMsg(2 seconds) + thisProbe.expectNoMsg(2.seconds) val otherProbe = new TestProbe(otherSystem) val otherSender = otherProbe.ref val thisSelection = otherSystem.actorSelection(s"akka.tcp://this-system@localhost:${port(thisSystem, "tcp")}/user/echo") - within(5 seconds) { + within(5.seconds) { awaitAssert { thisSelection.tell("ping", otherSender) - assert(otherProbe.expectMsgType[(String, ActorRef)](500 millis)._1 == "pong") + assert(otherProbe.expectMsgType[(String, ActorRef)](500.millis)._1 == "pong") } } } finally { diff --git a/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala b/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala index 021281c183..50109d58f9 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala @@ -63,7 +63,7 @@ object SystemMessageDeliveryStressTest { } """) - class SystemMessageSequenceVerifier(system: ActorSystem, testActor: ActorRef) extends MinimalActorRef { + private[akka] class SystemMessageSequenceVerifier(system: ActorSystem, testActor: ActorRef) extends MinimalActorRef { val provider = RARP(system).provider val path = provider.tempPath()