fix warnings in remote and cluster
This commit is contained in:
parent 15ebe8f082
commit 82b8238a9c
10 changed files with 48 additions and 48 deletions
@@ -163,7 +163,7 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
     withDispatcher(context.props.dispatcher), name = "heartbeatReceiver")
 
   def receive = {
-    case msg @ GetClusterCoreRef ⇒ coreSupervisor forward msg
+    case msg: GetClusterCoreRef.type ⇒ coreSupervisor forward msg
     case AddOnMemberUpListener(code) ⇒
       context.actorOf(Props(classOf[OnMemberUpListener], code).withDeploy(Deploy.local))
     case PublisherCreated(publisher) ⇒
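Note: the only change in this hunk swaps a binding pattern on the case object (msg @ GetClusterCoreRef) for a singleton type pattern (msg: GetClusterCoreRef.type); which compiler warning this silenced is not visible from the diff alone. A standalone sketch of the two equivalent forms, with hypothetical names:

  // Standalone sketch with hypothetical names, not the Akka sources.
  object CaseObjectMatchSketch {
    case object GetRef // stands in for a request message defined as a case object

    def handleByBinding(msg: Any): String = msg match {
      case m @ GetRef ⇒ "forward " + m // binds the singleton after matching it by value
      case _          ⇒ "ignore"
    }

    def handleByType(msg: Any): String = msg match {
      case m: GetRef.type ⇒ "forward " + m // matches via the singleton's type instead
      case _              ⇒ "ignore"
    }

    def main(args: Array[String]): Unit = {
      assert(handleByBinding(GetRef) == handleByType(GetRef)) // both accept the same message
    }
  }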
@@ -659,6 +659,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
       case Same ⇒ gossipStats.incrementSameCount
       case Newer ⇒ gossipStats.incrementNewerCount
       case Older ⇒ gossipStats.incrementOlderCount
+      case Ignored ⇒ gossipStats // included in receivedGossipCount
     }
   }
 
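Note: the added Ignored alternative completes the match over the gossip comparison outcomes. Assuming Same/Newer/Older/Ignored are cases of a sealed type, the missing alternative is exactly what triggers a "match may not be exhaustive" warning; a self-contained sketch of that situation, with hypothetical names:

  // Hypothetical sealed hierarchy; the real Akka comparison type may differ.
  object ExhaustiveMatchSketch {
    sealed trait Comparison
    case object Same extends Comparison
    case object Newer extends Comparison
    case object Older extends Comparison
    case object Ignored extends Comparison

    // Leaving out the Ignored alternative makes the compiler warn that the match
    // may not be exhaustive; adding it keeps the match total.
    def bump(c: Comparison, count: Int): Int = c match {
      case Same    ⇒ count + 1
      case Newer   ⇒ count + 1
      case Older   ⇒ count + 1
      case Ignored ⇒ count // counted elsewhere, mirroring the comment in the diff
    }
  }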
@@ -246,21 +246,21 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
   class ClusterResultAggregator(title: String, expectedResults: Int, settings: Settings) extends Actor with ActorLogging {
     import settings.reportMetricsInterval
     import settings.infolog
-    val cluster = Cluster(context.system)
-    var reportTo: Option[ActorRef] = None
-    var results = Vector.empty[ClusterResult]
-    var nodeMetrics = Set.empty[NodeMetrics]
-    var phiValuesObservedByNode = {
+    private val cluster = Cluster(context.system)
+    private var reportTo: Option[ActorRef] = None
+    private var results = Vector.empty[ClusterResult]
+    private var nodeMetrics = Set.empty[NodeMetrics]
+    private var phiValuesObservedByNode = {
       import akka.cluster.Member.addressOrdering
       immutable.SortedMap.empty[Address, immutable.SortedSet[PhiValue]]
     }
-    var clusterStatsObservedByNode = {
+    private var clusterStatsObservedByNode = {
       import akka.cluster.Member.addressOrdering
       immutable.SortedMap.empty[Address, CurrentInternalStats]
     }
 
     import context.dispatcher
-    val reportMetricsTask = context.system.scheduler.schedule(
+    private val reportMetricsTask = context.system.scheduler.schedule(
       reportMetricsInterval, reportMetricsInterval, self, ReportTick)
 
     // subscribe to ClusterMetricsChanged, re-subscribe when restart
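Note: most of the remaining changes in this spec only narrow test-helper fields and methods to private, which keeps mutable actor state off the public surface without changing behaviour. A trivial sketch of the pattern, with hypothetical names:

  import akka.actor.{ Actor, ActorRef }

  // Hypothetical aggregator, showing only the shape of the visibility change.
  class ResultCollector extends Actor {
    private var reportTo: Option[ActorRef] = None // mutable state stays internal to the actor
    private var results = Vector.empty[String]

    def receive = {
      case subscriber: ActorRef ⇒ reportTo = Some(subscriber)
      case result: String       ⇒ results :+= result
    }
  }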
@@ -441,9 +441,9 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
   }
 
   class StatsObserver extends Actor {
-    val cluster = Cluster(context.system)
-    var reportTo: Option[ActorRef] = None
-    var startStats: Option[GossipStats] = None
+    private val cluster = Cluster(context.system)
+    private var reportTo: Option[ActorRef] = None
+    private var startStats: Option[GossipStats] = None
 
     override def preStart(): Unit = cluster.subscribe(self, classOf[CurrentInternalStats])
     override def postStop(): Unit = cluster.unsubscribe(self)
@@ -37,7 +37,7 @@ object ClusterConsistentHashingRouterMultiJvmSpec extends MultiNodeConfig {
   val third = role("third")
 
   commonConfig(debugConfig(on = false).
-    withFallback(ConfigFactory.parseString("""
+    withFallback(ConfigFactory.parseString(s"""
      common-router-settings = {
        router = consistent-hashing-pool
        nr-of-instances = 10
@@ -48,9 +48,9 @@ object ClusterConsistentHashingRouterMultiJvmSpec extends MultiNodeConfig {
      }
 
      akka.actor.deployment {
-       /router1 = ${common-router-settings}
-       /router3 = ${common-router-settings}
-       /router4 = ${common-router-settings}
+       /router1 = $${common-router-settings}
+       /router3 = $${common-router-settings}
+       /router4 = $${common-router-settings}
      }
    """)).
    withFallback(MultiNodeClusterSpec.clusterConfig))
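Note: the two router hunks belong together. Once the config string is built with the s-interpolator (parseString(s"""…)), every HOCON substitution inside it must be written as $${…} so that Scala emits a literal ${…} for the config parser to resolve. A standalone sketch of the escaping rule, with hypothetical keys:

  import com.typesafe.config.ConfigFactory

  // Standalone sketch of the escaping rule above; key names here are hypothetical.
  object InterpolationSketch {
    def main(args: Array[String]): Unit = {
      val instances = 10 // spliced in by Scala's s-interpolator
      val config = ConfigFactory.parseString(s"""
        common-router-settings {
          nr-of-instances = $instances
        }
        router1 = $${common-router-settings}
      """).resolve() // resolve() expands the HOCON substitution

      // $$ in an s-string yields a literal $, so HOCON still sees ${common-router-settings}
      assert(config.getInt("router1.nr-of-instances") == 10)
    }
  }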
@@ -49,14 +49,14 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with Matchers {
   val dd = UniqueAddress(Address("akka.tcp", "sys", "dd", 2552), 4)
   val ee = UniqueAddress(Address("akka.tcp", "sys", "ee", 2552), 5)
 
-  def emptyState: ClusterHeartbeatSenderState = emptyState(aa)
+  private def emptyState: ClusterHeartbeatSenderState = emptyState(aa)
 
-  def emptyState(selfUniqueAddress: UniqueAddress) = ClusterHeartbeatSenderState(
+  private def emptyState(selfUniqueAddress: UniqueAddress) = ClusterHeartbeatSenderState(
     ring = HeartbeatNodeRing(selfUniqueAddress, Set(selfUniqueAddress), Set.empty, monitoredByNrOfMembers = 3),
     oldReceiversNowUnreachable = Set.empty[UniqueAddress],
     failureDetector = new DefaultFailureDetectorRegistry[Address](() ⇒ new FailureDetectorStub))
 
-  def fd(state: ClusterHeartbeatSenderState, node: UniqueAddress): FailureDetectorStub =
+  private def fd(state: ClusterHeartbeatSenderState, node: UniqueAddress): FailureDetectorStub =
     state.failureDetector.asInstanceOf[DefaultFailureDetectorRegistry[Address]].failureDetector(node.address).
       get.asInstanceOf[FailureDetectorStub]
@@ -23,10 +23,10 @@ class HeartbeatNodeRingPerfSpec extends WordSpec with Matchers {
 
   val heartbeatNodeRing = createHeartbeatNodeRingOfSize(nodesSize)
 
-  def checkThunkForRing(ring: HeartbeatNodeRing, thunk: HeartbeatNodeRing ⇒ Unit, times: Int): Unit =
+  private def checkThunkForRing(ring: HeartbeatNodeRing, thunk: HeartbeatNodeRing ⇒ Unit, times: Int): Unit =
     for (i ← 1 to times) thunk(ring)
 
-  def myReceivers(ring: HeartbeatNodeRing): Unit = {
+  private def myReceivers(ring: HeartbeatNodeRing): Unit = {
     val r = HeartbeatNodeRing(ring.selfAddress, ring.nodes, Set.empty, ring.monitoredByNrOfMembers)
     r.myReceivers.isEmpty should be(false)
   }
@@ -16,7 +16,7 @@ class ReachabilityPerfSpec extends WordSpec with Matchers {
   val address = Address("akka.tcp", "sys", "a", 2552)
   val node = Address("akka.tcp", "sys", "a", 2552)
 
-  def createReachabilityOfSize(base: Reachability, size: Int): Reachability =
+  private def createReachabilityOfSize(base: Reachability, size: Int): Reachability =
     (base /: (1 to size)) {
       case (r, i) ⇒
         val observer = UniqueAddress(address.copy(host = Some("node-" + i)), i)
@@ -25,7 +25,7 @@ class ReachabilityPerfSpec extends WordSpec with Matchers {
         r.unreachable(observer, subject).reachable(observer, subject)
     }
 
-  def addUnreachable(base: Reachability, count: Int): Reachability = {
+  private def addUnreachable(base: Reachability, count: Int): Reachability = {
     val observers = base.allObservers.take(count)
     val subjects = Stream.continually(base.allObservers).flatten.iterator
     (base /: observers) {
@@ -39,43 +39,43 @@ class ReachabilityPerfSpec extends WordSpec with Matchers {
   val reachability3 = addUnreachable(reachability1, nodesSize / 2)
   val allowed = reachability1.allObservers
 
-  def checkThunkFor(r1: Reachability, r2: Reachability, thunk: (Reachability, Reachability) ⇒ Unit, times: Int): Unit = {
+  private def checkThunkFor(r1: Reachability, r2: Reachability, thunk: (Reachability, Reachability) ⇒ Unit, times: Int): Unit = {
     for (i ← 1 to times) {
       thunk(Reachability(r1.records, r1.versions), Reachability(r2.records, r2.versions))
     }
   }
 
-  def checkThunkFor(r1: Reachability, thunk: Reachability ⇒ Unit, times: Int): Unit = {
+  private def checkThunkFor(r1: Reachability, thunk: Reachability ⇒ Unit, times: Int): Unit = {
     for (i ← 1 to times) {
       thunk(Reachability(r1.records, r1.versions))
     }
   }
 
-  def merge(expectedRecords: Int)(r1: Reachability, r2: Reachability): Unit = {
+  private def merge(expectedRecords: Int)(r1: Reachability, r2: Reachability): Unit = {
     r1.merge(allowed, r2).records.size should be(expectedRecords)
   }
 
-  def checkStatus(r1: Reachability): Unit = {
+  private def checkStatus(r1: Reachability): Unit = {
     val record = r1.records.head
     r1.status(record.observer, record.subject) should be(record.status)
   }
 
-  def checkAggregatedStatus(r1: Reachability): Unit = {
+  private def checkAggregatedStatus(r1: Reachability): Unit = {
     val record = r1.records.head
     r1.status(record.subject) should be(record.status)
   }
 
-  def allUnreachableOrTerminated(r1: Reachability): Unit = {
+  private def allUnreachableOrTerminated(r1: Reachability): Unit = {
     val record = r1.records.head
     r1.allUnreachableOrTerminated.isEmpty should be(false)
   }
 
-  def allUnreachable(r1: Reachability): Unit = {
+  private def allUnreachable(r1: Reachability): Unit = {
     val record = r1.records.head
     r1.allUnreachable.isEmpty should be(false)
   }
 
-  def recordsFrom(r1: Reachability): Unit = {
+  private def recordsFrom(r1: Reachability): Unit = {
     r1.allObservers.foreach { o ⇒
       r1.recordsFrom(o) should not be be(null)
     }
@@ -193,10 +193,11 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress)
     case Event(ToServer(msg), d @ Data(Some(channel), None)) ⇒
       channel.write(msg)
       val token = msg match {
-        case EnterBarrier(barrier, timeout) ⇒ barrier
-        case GetAddress(node) ⇒ node.name
+        case EnterBarrier(barrier, timeout) ⇒ Some(barrier -> sender())
+        case GetAddress(node) ⇒ Some(node.name -> sender())
+        case _ ⇒ None
       }
-      stay using d.copy(runningOp = Some(token -> sender()))
+      stay using d.copy(runningOp = token)
     case Event(ToServer(op), Data(channel, Some((token, _)))) ⇒
       log.error("cannot write {} while waiting for {}", op, token)
       stay
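Note: in the old version the match on msg had no fallback, so any other ToServer payload was unhandled; the new version computes the complete Option[(String, ActorRef)] value inside the match, with case _ ⇒ None when no reply is expected, and stores it directly as runningOp. A reduced sketch of the before/after shape, with hypothetical message types and a String standing in for the ActorRef:

  // Hypothetical message protocol, reduced to the shape of the change above.
  object RunningOpSketch {
    sealed trait Op
    final case class EnterBarrier(name: String) extends Op
    final case class GetAddress(node: String) extends Op
    final case class Hello(txt: String) extends Op

    // A total match: messages that expect no reply map to None instead of being unhandled.
    def tokenFor(msg: Op, replyTo: String): Option[(String, String)] = msg match {
      case EnterBarrier(name) ⇒ Some(name -> replyTo)
      case GetAddress(node)   ⇒ Some(node -> replyTo)
      case _                  ⇒ None
    }

    def main(args: Array[String]): Unit = {
      assert(tokenFor(EnterBarrier("start"), "client-1") == Some("start" -> "client-1"))
      assert(tokenFor(Hello("hi"), "client-1").isEmpty)
    }
  }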
@@ -1,11 +1,9 @@
 package akka.remote
 
-import language.postfixOps
 import scala.concurrent.duration._
 import com.typesafe.config.ConfigFactory
 import akka.actor._
 import akka.testkit._
-import akka.remote.AddressUidExtension
 import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
 import akka.remote.testconductor.RoleName
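Note: dropping import language.postfixOps fits the duration changes further down: 1 seconds is postfix operator syntax and needs that import (or -language:postfixOps) to compile without a feature warning, whereas 1.seconds does not. The akka.remote.AddressUidExtension import appears to have been unused, presumably why it was removed. A minimal sketch of the dotted duration form:

  import scala.concurrent.duration._

  // Minimal sketch: the dotted form needs no postfixOps feature import.
  object DurationSketch {
    val short = 1.seconds   // via the DurationInt enrichment, no feature warning
    val poll = 500.millis

    def main(args: Array[String]): Unit = {
      assert(short.toMillis == 1000L)
      assert(poll < short) // FiniteDuration is ordered, so bounds read naturally
    }
  }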
@@ -586,16 +586,16 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
     """).withFallback(config)
   val otherSelection = thisSystem.actorSelection(s"akka.tcp://other-system@localhost:${otherAddress.getPort}/user/echo")
   otherSelection.tell("ping", probeSender)
-  probe.expectNoMsg(1 seconds)
+  probe.expectNoMsg(1.seconds)
   val otherSystem = ActorSystem("other-system", otherConfig)
   try {
     muteSystem(otherSystem)
-    probe.expectNoMsg(2 seconds)
+    probe.expectNoMsg(2.seconds)
     otherSystem.actorOf(Props[Echo2], "echo")
-    within(5 seconds) {
+    within(5.seconds) {
       awaitAssert {
         otherSelection.tell("ping", probeSender)
-        assert(probe.expectMsgType[(String, ActorRef)](500 millis)._1 == "pong")
+        assert(probe.expectMsgType[(String, ActorRef)](500.millis)._1 == "pong")
       }
     }
   } finally {
@@ -624,18 +624,18 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
     """).withFallback(config)
   val otherSelection = thisSystem.actorSelection(s"akka.tcp://other-system@localhost:${otherAddress.getPort}/user/echo")
   otherSelection.tell("ping", thisSender)
-  thisProbe.expectNoMsg(1 seconds)
+  thisProbe.expectNoMsg(1.seconds)
   val otherSystem = ActorSystem("other-system", otherConfig)
   try {
     muteSystem(otherSystem)
-    thisProbe.expectNoMsg(2 seconds)
+    thisProbe.expectNoMsg(2.seconds)
     val otherProbe = new TestProbe(otherSystem)
     val otherSender = otherProbe.ref
     val thisSelection = otherSystem.actorSelection(s"akka.tcp://this-system@localhost:${port(thisSystem, "tcp")}/user/echo")
-    within(5 seconds) {
+    within(5.seconds) {
       awaitAssert {
         thisSelection.tell("ping", otherSender)
-        assert(otherProbe.expectMsgType[(String, ActorRef)](500 millis)._1 == "pong")
+        assert(otherProbe.expectMsgType[(String, ActorRef)](500.millis)._1 == "pong")
       }
     }
   } finally {
@@ -63,7 +63,7 @@ object SystemMessageDeliveryStressTest {
     }
     """)
 
-  class SystemMessageSequenceVerifier(system: ActorSystem, testActor: ActorRef) extends MinimalActorRef {
+  private[akka] class SystemMessageSequenceVerifier(system: ActorSystem, testActor: ActorRef) extends MinimalActorRef {
     val provider = RARP(system).provider
     val path = provider.tempPath()